diff --git a/Cargo.lock b/Cargo.lock
index 0e0e88a6b4..e0dbc11147 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10248,6 +10248,7 @@ dependencies = [
  "reth-static-file-types",
  "reth-storage-api",
  "reth-storage-errors",
+ "reth-tasks",
  "reth-testing-utils",
  "reth-tracing",
  "reth-trie",
diff --git a/crates/cli/commands/src/db/state.rs b/crates/cli/commands/src/db/state.rs
index 35881e7cd2..b28e4c2647 100644
--- a/crates/cli/commands/src/db/state.rs
+++ b/crates/cli/commands/src/db/state.rs
@@ -11,6 +11,7 @@ use reth_db_common::DbTool;
 use reth_node_builder::NodeTypesWithDB;
 use reth_provider::providers::ProviderNodeTypes;
 use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
+use reth_tasks::spawn_scoped_os_thread;
 use std::{
     collections::BTreeSet,
     thread,
@@ -230,7 +231,7 @@ impl Command {
         thread::scope(|s| {
             let handles: Vec<_> = (0..num_threads)
                 .map(|thread_id| {
-                    s.spawn(move || {
+                    spawn_scoped_os_thread(s, "db-state-worker", move || {
                         loop {
                             // Get next chunk to process
                             let chunk_idx = {
diff --git a/crates/cli/runner/src/lib.rs b/crates/cli/runner/src/lib.rs
index 1668746fcb..3ea3446a03 100644
--- a/crates/cli/runner/src/lib.rs
+++ b/crates/cli/runner/src/lib.rs
@@ -83,22 +83,7 @@ impl CliRunner {
             task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
         }
 
-        // `drop(tokio_runtime)` would block the current thread until its pools
-        // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
-        // it on a separate thread and wait for up to 5 seconds for this operation to
-        // complete.
-        let (tx, rx) = mpsc::channel();
-        std::thread::Builder::new()
-            .name("tokio-runtime-shutdown".to_string())
-            .spawn(move || {
-                drop(tokio_runtime);
-                let _ = tx.send(());
-            })
-            .unwrap();
-
-        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
-            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
-        });
+        tokio_shutdown(tokio_runtime, true);
 
         command_res
     }
@@ -137,19 +122,7 @@ impl CliRunner {
             task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
         }
 
-        // Shutdown the runtime on a separate thread
-        let (tx, rx) = mpsc::channel();
-        std::thread::Builder::new()
-            .name("tokio-runtime-shutdown".to_string())
-            .spawn(move || {
-                drop(tokio_runtime);
-                let _ = tx.send(());
-            })
-            .unwrap();
-
-        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
-            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
-        });
+        tokio_shutdown(tokio_runtime, true);
 
         command_res
     }
@@ -179,13 +152,7 @@ impl CliRunner {
         tokio_runtime
             .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
 
-        // drop the tokio runtime on a separate thread because drop blocks until its pools
-        // (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
-        // the current thread but we want to exit right away.
-        std::thread::Builder::new()
-            .name("tokio-runtime-shutdown".to_string())
-            .spawn(move || drop(tokio_runtime))
-            .unwrap();
+        tokio_shutdown(tokio_runtime, false);
 
         Ok(())
     }
@@ -321,3 +288,27 @@ where
 
     Ok(())
 }
+
+/// Shuts down the given tokio runtime on a separate thread and, if `wait` is
+/// set, waits up to 5 seconds for the shutdown to complete.
+///
+/// `drop(tokio_runtime)` would block the current thread until the runtime's
+/// pools (including the blocking pool) are shut down. Since we want to exit as
+/// soon as possible, we hand the drop off to a dedicated thread instead.
+fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
+    // Shutdown the runtime on a separate thread
+    let (tx, rx) = mpsc::channel();
+    std::thread::Builder::new()
+        .name("tokio-shutdown".to_string())
+        .spawn(move || {
+            drop(rt);
+            let _ = tx.send(());
+        })
+        .unwrap();
+
+    if wait {
+        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
+            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
+        });
+    }
+}
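[review note] For context, the pattern `tokio_shutdown` factors out, as a minimal standalone sketch; the `main` scaffolding and the log message here are illustrative assumptions, not part of this patch:

```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async { /* run the application to completion */ });

    // Dropping a runtime joins its worker and blocking pools, which can stall
    // process exit if a blocking task is still running. Hand the drop to a
    // throwaway thread and bound how long we wait for it.
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        drop(rt);
        let _ = tx.send(());
    });
    if rx.recv_timeout(Duration::from_secs(5)).is_err() {
        eprintln!("tokio runtime shutdown timed out; exiting anyway");
    }
}
```

The channel is the only synchronization point: on the `wait == false` path the receiver is simply dropped and the failed `send` is ignored, so the shutdown thread becomes fire-and-forget.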
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index dbac81f7e7..905c4fbb78 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -11,6 +11,7 @@ use reth_provider::{
 };
 use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
 use reth_stages_api::{MetricEvent, MetricEventsSender};
+use reth_tasks::spawn_os_thread;
 use std::{
     sync::{
         mpsc::{Receiver, SendError, Sender},
@@ -264,14 +265,11 @@ impl PersistenceHandle
         // spawn the persistence service
         let db_service =
             PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
-        let join_handle = std::thread::Builder::new()
-            .name("persistence".to_string())
-            .spawn(|| {
-                if let Err(err) = db_service.run() {
-                    error!(target: "engine::persistence", ?err, "Persistence service failed");
-                }
-            })
-            .unwrap();
+        let join_handle = spawn_os_thread("persistence", || {
+            if let Err(err) = db_service.run() {
+                error!(target: "engine::persistence", ?err, "Persistence service failed");
+            }
+        });
 
         PersistenceHandle {
             sender: db_service_tx,
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index 21daf3c38a..d2f7ad551d 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -37,6 +37,7 @@ use reth_provider::{
 };
 use reth_revm::database::StateProviderDatabase;
 use reth_stages_api::ControlFlow;
+use reth_tasks::spawn_os_thread;
 use reth_trie_db::ChangesetCache;
 use revm::state::EvmState;
 use state::TreeState;
@@ -431,7 +432,7 @@ where
             changeset_cache,
         );
         let incoming = task.incoming_tx.clone();
-        std::thread::Builder::new().name("engine".to_string()).spawn(|| task.run()).unwrap();
+        spawn_os_thread("engine", || task.run());
         (incoming, outgoing)
     }
 
diff --git a/crates/engine/tree/src/tree/tests.rs b/crates/engine/tree/src/tree/tests.rs
index b2ea8272a0..1903888b03 100644
--- a/crates/engine/tree/src/tree/tests.rs
+++ b/crates/engine/tree/src/tree/tests.rs
@@ -28,6 +28,7 @@ use reth_ethereum_primitives::{Block, EthPrimitives};
 use reth_evm_ethereum::MockEvmConfig;
 use reth_primitives_traits::Block as _;
 use reth_provider::test_utils::MockEthProvider;
+use reth_tasks::spawn_os_thread;
 use std::{
     collections::BTreeMap,
     str::FromStr,
@@ -538,10 +539,7 @@ async fn test_tree_persist_blocks() {
         .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
         .collect();
     let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
-    std::thread::Builder::new()
-        .name("Engine Task".to_string())
-        .spawn(|| test_harness.tree.run())
-        .unwrap();
+    spawn_os_thread("engine", || test_harness.tree.run());
 
     // send a message to the tree to enter the main loop.
     test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
@@ -1989,10 +1987,7 @@ mod forkchoice_updated_tests {
         let action_rx = test_harness.action_rx;
 
         // Spawn tree in background thread
-        std::thread::Builder::new()
-            .name("Engine Task".to_string())
-            .spawn(|| test_harness.tree.run())
-            .unwrap();
+        spawn_os_thread("engine", || test_harness.tree.run());
 
         // Send terminate request
         to_tree_tx
diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml
index d21ff69804..96a4f715b5 100644
--- a/crates/storage/provider/Cargo.toml
+++ b/crates/storage/provider/Cargo.toml
@@ -31,6 +31,7 @@ reth-codecs.workspace = true
 reth-chain-state.workspace = true
 reth-node-types.workspace = true
 reth-static-file-types.workspace = true
+reth-tasks.workspace = true
 # ethereum
 alloy-eips.workspace = true
 alloy-primitives.workspace = true
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index f9533a42cd..99816fe91a 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -64,6 +64,7 @@ use reth_storage_api::{
     StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
 };
 use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
+use reth_tasks::spawn_scoped_os_thread;
 use reth_trie::{
     updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
     HashedPostStateSorted, StoredNibbles,
@@ -552,28 +553,20 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N>
 
             // Static file writes
-            let sf_handle = thread::Builder::new()
-                .name("static-files".into())
-                .spawn_scoped(s, || {
-                    let start = Instant::now();
-                    sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
-                    Ok::<_, ProviderError>(start.elapsed())
-                })
-                // Same panic happens in `scope.spawn`
-                .expect("failed to spawn thread");
+            let sf_handle = spawn_scoped_os_thread(s, "static-files", || {
+                let start = Instant::now();
+                sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
+                Ok::<_, ProviderError>(start.elapsed())
+            });
 
             // RocksDB writes
             #[cfg(all(unix, feature = "rocksdb"))]
             let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
-                thread::Builder::new()
-                    .name("rocksdb".into())
-                    .spawn_scoped(s, || {
-                        let start = Instant::now();
-                        rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
-                        Ok::<_, ProviderError>(start.elapsed())
-                    })
-                    // Same panic happens in `scope.spawn`
-                    .expect("failed to spawn thread")
+                spawn_scoped_os_thread(s, "rocksdb", || {
+                    let start = Instant::now();
+                    rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
+                    Ok::<_, ProviderError>(start.elapsed())
+                })
             });
 
             // MDBX writes
diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs
index 32576e6cf5..69360c8a32 100644
--- a/crates/storage/provider/src/providers/rocksdb/provider.rs
+++ b/crates/storage/provider/src/providers/rocksdb/provider.rs
@@ -21,6 +21,7 @@ use reth_storage_errors::{
     db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
     provider::{ProviderError, ProviderResult},
 };
+use reth_tasks::spawn_scoped_os_thread;
 use rocksdb::{
     BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
     DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
@@ -1152,13 +1153,21 @@ impl RocksDBProvider {
         let handles: Vec<_> = [
             (ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
                 ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
-                .then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
-            ctx.storage_settings
-                .account_history_in_rocksdb
-                .then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
-            ctx.storage_settings
-                .storages_history_in_rocksdb
-                .then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
+                .then(|| {
+                    spawn_scoped_os_thread(s, "rocksdb-tx-hash", || {
+                        self.write_tx_hash_numbers(blocks, tx_nums, &ctx)
+                    })
+                }),
+            ctx.storage_settings.account_history_in_rocksdb.then(|| {
+                spawn_scoped_os_thread(s, "rocksdb-account-history", || {
+                    self.write_account_history(blocks, &ctx)
+                })
+            }),
+            ctx.storage_settings.storages_history_in_rocksdb.then(|| {
+                spawn_scoped_os_thread(s, "rocksdb-storage-history", || {
+                    self.write_storage_history(blocks, &ctx)
+                })
+            }),
         ]
         .into_iter()
         .enumerate()
diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs
index 7f772d4414..f5a35234fe 100644
--- a/crates/storage/provider/src/providers/static_file/manager.rs
+++ b/crates/storage/provider/src/providers/static_file/manager.rs
@@ -47,6 +47,7 @@ use reth_storage_api::{
     StorageChangeSetReader, StorageSettingsCache,
 };
 use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
+use reth_tasks::spawn_scoped_os_thread;
 use std::{
     collections::BTreeMap,
     fmt::Debug,
@@ -669,15 +670,11 @@ impl StaticFileProvider {
     where
         F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env,
     {
-        thread::Builder::new()
-            .name(segment.as_short_str().into())
-            .spawn_scoped(scope, move || {
-                let mut w = self.get_writer(first_block_number, segment)?;
-                f(&mut w)?;
-                w.sync_all()
-            })
-            // Same panic happens in `scope.spawn`
-            .expect("failed to spawn thread")
+        spawn_scoped_os_thread(scope, segment.as_short_str(), move || {
+            let mut w = self.get_writer(first_block_number, segment)?;
+            f(&mut w)?;
+            w.sync_all()
+        })
     }
 
     /// Writes all static file data for multiple blocks in parallel per-segment.
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index 905c91e11b..4294f3d40c 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -30,6 +30,7 @@ use std::{
         Arc, OnceLock,
     },
     task::{ready, Context, Poll},
+    thread,
 };
 use tokio::{
     runtime::Handle,
@@ -48,6 +49,51 @@ pub mod pool;
 /// Global [`TaskExecutor`] instance that can be accessed from anywhere.
 static GLOBAL_EXECUTOR: OnceLock<TaskExecutor> = OnceLock::new();
 
+/// Spawns an OS thread with the current tokio runtime context propagated.
+///
+/// This function captures the current tokio runtime handle (if available) and enters it
+/// in the newly spawned thread. This ensures that code running in the spawned thread can
+/// use [`Handle::current()`], [`Handle::spawn_blocking()`], and other tokio utilities that
+/// require a runtime context.
+#[track_caller]
+pub fn spawn_os_thread<F, T>(name: &str, f: F) -> thread::JoinHandle<T>
+where
+    F: FnOnce() -> T + Send + 'static,
+    T: Send + 'static,
+{
+    let handle = Handle::try_current().ok();
+    thread::Builder::new()
+        .name(name.to_string())
+        .spawn(move || {
+            let _guard = handle.as_ref().map(Handle::enter);
+            f()
+        })
+        .unwrap_or_else(|e| panic!("failed to spawn thread {name:?}: {e}"))
+}
+
+/// Spawns a scoped OS thread with the current tokio runtime context propagated.
+///
+/// This is the scoped thread version of [`spawn_os_thread`], for use with [`std::thread::scope`].
+#[track_caller]
+pub fn spawn_scoped_os_thread<'scope, 'env, F, T>(
+    scope: &'scope thread::Scope<'scope, 'env>,
+    name: &str,
+    f: F,
+) -> thread::ScopedJoinHandle<'scope, T>
+where
+    F: FnOnce() -> T + Send + 'scope,
+    T: Send + 'scope,
+{
+    let handle = Handle::try_current().ok();
+    thread::Builder::new()
+        .name(name.to_string())
+        .spawn_scoped(scope, move || {
+            let _guard = handle.as_ref().map(Handle::enter);
+            f()
+        })
+        .unwrap_or_else(|e| panic!("failed to spawn scoped thread {name:?}: {e}"))
+}
+
 /// A type that can spawn tasks.
 ///
 /// The main purpose of this type is to abstract over [`TaskExecutor`] so it's more convenient to
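[review note] A usage sketch of the two new helpers, assuming a multi-thread tokio runtime; the thread names and closure bodies here are illustrative:

```rust
use reth_tasks::{spawn_os_thread, spawn_scoped_os_thread};
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    // Plain OS thread: inherits the caller's runtime context, so tokio
    // utilities that need a runtime are usable inside the closure.
    let join = spawn_os_thread("worker", || {
        // Without the propagated context, Handle::current() would panic here.
        Handle::current().block_on(async { 40 + 2 })
    });
    assert_eq!(join.join().unwrap(), 42);

    // Scoped variant: may borrow from the enclosing scope, mirroring
    // `std::thread::scope` semantics.
    let data = vec![1u64, 2, 3];
    let sum = std::thread::scope(|s| {
        spawn_scoped_os_thread(s, "scoped-worker", || data.iter().sum::<u64>())
            .join()
            .unwrap()
    });
    assert_eq!(sum, 6);
}
```

Entering the captured handle via a guard (rather than storing it) keeps the helpers' signatures identical to `thread::Builder::spawn`/`spawn_scoped` apart from the name argument, so call sites like the persistence service and engine task convert mechanically.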