mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
chore: centralize thread::spawn to share tokio handles (#21754)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10248,6 +10248,7 @@ dependencies = [
|
||||
"reth-static-file-types",
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-tasks",
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
"reth-trie",
|
||||
|
||||
@@ -11,6 +11,7 @@ use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_provider::providers::ProviderNodeTypes;
|
||||
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
thread,
|
||||
@@ -230,7 +231,7 @@ impl Command {
|
||||
thread::scope(|s| {
|
||||
let handles: Vec<_> = (0..num_threads)
|
||||
.map(|thread_id| {
|
||||
s.spawn(move || {
|
||||
spawn_scoped_os_thread(s, "db-state-worker", move || {
|
||||
loop {
|
||||
// Get next chunk to process
|
||||
let chunk_idx = {
|
||||
|
||||
@@ -83,22 +83,7 @@ impl CliRunner {
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
// complete.
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(tokio_runtime);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -137,19 +122,7 @@ impl CliRunner {
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
// Shutdown the runtime on a separate thread
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(tokio_runtime);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -179,13 +152,7 @@ impl CliRunner {
|
||||
tokio_runtime
|
||||
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
|
||||
|
||||
// drop the tokio runtime on a separate thread because drop blocks until its pools
|
||||
// (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
|
||||
// the current thread but we want to exit right away.
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || drop(tokio_runtime))
|
||||
.unwrap();
|
||||
tokio_shutdown(tokio_runtime, false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -321,3 +288,27 @@ where
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
|
||||
///
|
||||
/// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
/// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
/// complete.
|
||||
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
// Shutdown the runtime on a separate thread
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(rt);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
if wait {
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
|
||||
use reth_stages_api::{MetricEvent, MetricEventsSender};
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc::{Receiver, SendError, Sender},
|
||||
@@ -264,14 +265,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
// spawn the persistence service
|
||||
let db_service =
|
||||
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
|
||||
let join_handle = std::thread::Builder::new()
|
||||
.name("persistence".to_string())
|
||||
.spawn(|| {
|
||||
if let Err(err) = db_service.run() {
|
||||
error!(target: "engine::persistence", ?err, "Persistence service failed");
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
let join_handle = spawn_os_thread("persistence", || {
|
||||
if let Err(err) = db_service.run() {
|
||||
error!(target: "engine::persistence", ?err, "Persistence service failed");
|
||||
}
|
||||
});
|
||||
|
||||
PersistenceHandle {
|
||||
sender: db_service_tx,
|
||||
|
||||
@@ -37,6 +37,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages_api::ControlFlow;
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use revm::state::EvmState;
|
||||
use state::TreeState;
|
||||
@@ -431,7 +432,7 @@ where
|
||||
changeset_cache,
|
||||
);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
std::thread::Builder::new().name("engine".to_string()).spawn(|| task.run()).unwrap();
|
||||
spawn_os_thread("engine", || task.run());
|
||||
(incoming, outgoing)
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ use reth_ethereum_primitives::{Block, EthPrimitives};
|
||||
use reth_evm_ethereum::MockEvmConfig;
|
||||
use reth_primitives_traits::Block as _;
|
||||
use reth_provider::test_utils::MockEthProvider;
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
str::FromStr,
|
||||
@@ -538,10 +539,7 @@ async fn test_tree_persist_blocks() {
|
||||
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
|
||||
.collect();
|
||||
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
|
||||
std::thread::Builder::new()
|
||||
.name("Engine Task".to_string())
|
||||
.spawn(|| test_harness.tree.run())
|
||||
.unwrap();
|
||||
spawn_os_thread("engine", || test_harness.tree.run());
|
||||
|
||||
// send a message to the tree to enter the main loop.
|
||||
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
|
||||
@@ -1989,10 +1987,7 @@ mod forkchoice_updated_tests {
|
||||
let action_rx = test_harness.action_rx;
|
||||
|
||||
// Spawn tree in background thread
|
||||
std::thread::Builder::new()
|
||||
.name("Engine Task".to_string())
|
||||
.spawn(|| test_harness.tree.run())
|
||||
.unwrap();
|
||||
spawn_os_thread("engine", || test_harness.tree.run());
|
||||
|
||||
// Send terminate request
|
||||
to_tree_tx
|
||||
|
||||
@@ -31,6 +31,7 @@ reth-codecs.workspace = true
|
||||
reth-chain-state.workspace = true
|
||||
reth-node-types.workspace = true
|
||||
reth-static-file-types.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
# ethereum
|
||||
alloy-eips.workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
|
||||
@@ -64,6 +64,7 @@ use reth_storage_api::{
|
||||
StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
|
||||
};
|
||||
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use reth_trie::{
|
||||
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
|
||||
HashedPostStateSorted, StoredNibbles,
|
||||
@@ -552,28 +553,20 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
|
||||
thread::scope(|s| {
|
||||
// SF writes
|
||||
let sf_handle = thread::Builder::new()
|
||||
.name("static-files".into())
|
||||
.spawn_scoped(s, || {
|
||||
let start = Instant::now();
|
||||
sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
})
|
||||
// Same panic happens in `scope.spawn`
|
||||
.expect("failed to spawn thread");
|
||||
let sf_handle = spawn_scoped_os_thread(s, "static-files", || {
|
||||
let start = Instant::now();
|
||||
sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
});
|
||||
|
||||
// RocksDB writes
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
|
||||
thread::Builder::new()
|
||||
.name("rocksdb".into())
|
||||
.spawn_scoped(s, || {
|
||||
let start = Instant::now();
|
||||
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
})
|
||||
// Same panic happens in `scope.spawn`
|
||||
.expect("failed to spawn thread")
|
||||
spawn_scoped_os_thread(s, "rocksdb", || {
|
||||
let start = Instant::now();
|
||||
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
})
|
||||
});
|
||||
|
||||
// MDBX writes
|
||||
|
||||
@@ -21,6 +21,7 @@ use reth_storage_errors::{
|
||||
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
|
||||
provider::{ProviderError, ProviderResult},
|
||||
};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use rocksdb::{
|
||||
BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
|
||||
DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
|
||||
@@ -1152,13 +1153,21 @@ impl RocksDBProvider {
|
||||
let handles: Vec<_> = [
|
||||
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
|
||||
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
|
||||
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
|
||||
ctx.storage_settings
|
||||
.account_history_in_rocksdb
|
||||
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
|
||||
ctx.storage_settings
|
||||
.storages_history_in_rocksdb
|
||||
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
|
||||
.then(|| {
|
||||
spawn_scoped_os_thread(s, "rocksdb-tx-hash", || {
|
||||
self.write_tx_hash_numbers(blocks, tx_nums, &ctx)
|
||||
})
|
||||
}),
|
||||
ctx.storage_settings.account_history_in_rocksdb.then(|| {
|
||||
spawn_scoped_os_thread(s, "rocksdb-account-history", || {
|
||||
self.write_account_history(blocks, &ctx)
|
||||
})
|
||||
}),
|
||||
ctx.storage_settings.storages_history_in_rocksdb.then(|| {
|
||||
spawn_scoped_os_thread(s, "rocksdb-storage-history", || {
|
||||
self.write_storage_history(blocks, &ctx)
|
||||
})
|
||||
}),
|
||||
]
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
|
||||
@@ -47,6 +47,7 @@ use reth_storage_api::{
|
||||
StorageChangeSetReader, StorageSettingsCache,
|
||||
};
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
@@ -669,15 +670,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
where
|
||||
F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env,
|
||||
{
|
||||
thread::Builder::new()
|
||||
.name(segment.as_short_str().into())
|
||||
.spawn_scoped(scope, move || {
|
||||
let mut w = self.get_writer(first_block_number, segment)?;
|
||||
f(&mut w)?;
|
||||
w.sync_all()
|
||||
})
|
||||
// Same panic happens in `scope.spawn`
|
||||
.expect("failed to spawn thread")
|
||||
spawn_scoped_os_thread(scope, segment.as_short_str(), move || {
|
||||
let mut w = self.get_writer(first_block_number, segment)?;
|
||||
f(&mut w)?;
|
||||
w.sync_all()
|
||||
})
|
||||
}
|
||||
|
||||
/// Writes all static file data for multiple blocks in parallel per-segment.
|
||||
|
||||
@@ -30,6 +30,7 @@ use std::{
|
||||
Arc, OnceLock,
|
||||
},
|
||||
task::{ready, Context, Poll},
|
||||
thread,
|
||||
};
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
@@ -48,6 +49,51 @@ pub mod pool;
|
||||
/// Global [`TaskExecutor`] instance that can be accessed from anywhere.
|
||||
static GLOBAL_EXECUTOR: OnceLock<TaskExecutor> = OnceLock::new();
|
||||
|
||||
/// Spawns an OS thread with the current tokio runtime context propagated.
|
||||
///
|
||||
/// This function captures the current tokio runtime handle (if available) and enters it
|
||||
/// in the newly spawned thread. This ensures that code running in the spawned thread can
|
||||
/// use [`Handle::current()`], [`Handle::spawn_blocking()`], and other tokio utilities that
|
||||
/// require a runtime context.
|
||||
#[track_caller]
|
||||
pub fn spawn_os_thread<F, T>(name: &str, f: F) -> thread::JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let handle = Handle::try_current().ok();
|
||||
thread::Builder::new()
|
||||
.name(name.to_string())
|
||||
.spawn(move || {
|
||||
let _guard = handle.as_ref().map(Handle::enter);
|
||||
f()
|
||||
})
|
||||
.unwrap_or_else(|e| panic!("failed to spawn thread {name:?}: {e}"))
|
||||
}
|
||||
|
||||
/// Spawns a scoped OS thread with the current tokio runtime context propagated.
|
||||
///
|
||||
/// This is the scoped thread version of [`spawn_os_thread`], for use with [`std::thread::scope`].
|
||||
#[track_caller]
|
||||
pub fn spawn_scoped_os_thread<'scope, 'env, F, T>(
|
||||
scope: &'scope thread::Scope<'scope, 'env>,
|
||||
name: &str,
|
||||
f: F,
|
||||
) -> thread::ScopedJoinHandle<'scope, T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'scope,
|
||||
T: Send + 'scope,
|
||||
{
|
||||
let handle = Handle::try_current().ok();
|
||||
thread::Builder::new()
|
||||
.name(name.to_string())
|
||||
.spawn_scoped(scope, move || {
|
||||
let _guard = handle.as_ref().map(Handle::enter);
|
||||
f()
|
||||
})
|
||||
.unwrap_or_else(|e| panic!("failed to spawn scoped thread {name:?}: {e}"))
|
||||
}
|
||||
|
||||
/// A type that can spawn tasks.
|
||||
///
|
||||
/// The main purpose of this type is to abstract over [`TaskExecutor`] so it's more convenient to
|
||||
|
||||
Reference in New Issue
Block a user