From 386b774ed5a21205933bc4e62a3ead8c5156beef Mon Sep 17 00:00:00 2001 From: Elaela Solis Date: Wed, 4 Feb 2026 16:00:37 -0300 Subject: [PATCH] refactor: use spawn_os_thread for better tokio integration (#21788) Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com> --- Cargo.lock | 1 + crates/cli/commands/src/init_state/without_evm.rs | 10 ++++++++-- crates/stages/stages/Cargo.toml | 1 + crates/stages/stages/src/stages/sender_recovery.rs | 2 +- .../provider/src/providers/static_file/manager.rs | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a46ba783db..78fe63fef0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10827,6 +10827,7 @@ dependencies = [ "reth-static-file-types", "reth-storage-api", "reth-storage-errors", + "reth-tasks", "reth-testing-utils", "reth-tracing", "reth-trie", diff --git a/crates/cli/commands/src/init_state/without_evm.rs b/crates/cli/commands/src/init_state/without_evm.rs index 2d36bb67cf..5583bb389c 100644 --- a/crates/cli/commands/src/init_state/without_evm.rs +++ b/crates/cli/commands/src/init_state/without_evm.rs @@ -122,7 +122,13 @@ where } let tx_clone = tx.clone(); let provider = sf_provider.clone(); - std::thread::spawn(move || { + let thread_name = match segment { + StaticFileSegment::Transactions => "init-state-txs", + StaticFileSegment::Receipts => "init-state-receipts", + StaticFileSegment::TransactionSenders => "init-state-senders", + _ => "init-state-segment", + }; + reth_tasks::spawn_os_thread(thread_name, move || { let result = provider.latest_writer(segment).and_then(|mut writer| { for block_num in 1..=target_height { writer.increment_block(block_num)?; @@ -136,7 +142,7 @@ where // Spawn job for appending empty headers let provider = sf_provider.clone(); - std::thread::spawn(move || { + reth_tasks::spawn_os_thread("init-state-headers", move || { let result = provider.latest_writer(StaticFileSegment::Headers).and_then(|mut writer| { for block_num in 1..=target_height { // TODO: should we fill with real parent_hash? diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index e7be76cd7c..63a6ca17c0 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -38,6 +38,7 @@ reth-storage-errors.workspace = true reth-revm.workspace = true reth-stages-api.workspace = true reth-static-file-types.workspace = true +reth-tasks.workspace = true reth-trie = { workspace = true, features = ["metrics"] } reth-trie-db = { workspace = true, features = ["metrics"] } diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index 67df242168..a50fd3396b 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -295,7 +295,7 @@ where // // However, using `std::thread::spawn` allows us to utilize the timeout grace // period to complete some work without throwing errors during the shutdown. - std::thread::spawn(move || { + reth_tasks::spawn_os_thread("sender-recovery", move || { while let Ok(chunks) = tx_receiver.recv() { for (chunk_range, recovered_senders_tx) in chunks { // Read the raw value, and let the rayon worker to decompress & decode. diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 1ccc4771ec..cd2cfb216d 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -276,7 +276,7 @@ impl StaticFileProvider { /// receive `update_index` notifications from a node that appends/truncates data. pub fn watch_directory(&self) { let provider = self.clone(); - std::thread::spawn(move || { + reth_tasks::spawn_os_thread("sf-watch", move || { let (tx, rx) = std::sync::mpsc::channel(); let mut watcher = RecommendedWatcher::new( move |res| tx.send(res).unwrap(),