refactor: use spawn_os_thread for better tokio integration (#21788)

Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
Elaela Solis
2026-02-04 16:00:37 -03:00
committed by GitHub
parent 20d94027eb
commit 386b774ed5
5 changed files with 12 additions and 4 deletions

1
Cargo.lock generated
View File

@@ -10827,6 +10827,7 @@ dependencies = [
"reth-static-file-types",
"reth-storage-api",
"reth-storage-errors",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
"reth-trie",

View File

@@ -122,7 +122,13 @@ where
}
let tx_clone = tx.clone();
let provider = sf_provider.clone();
std::thread::spawn(move || {
let thread_name = match segment {
StaticFileSegment::Transactions => "init-state-txs",
StaticFileSegment::Receipts => "init-state-receipts",
StaticFileSegment::TransactionSenders => "init-state-senders",
_ => "init-state-segment",
};
reth_tasks::spawn_os_thread(thread_name, move || {
let result = provider.latest_writer(segment).and_then(|mut writer| {
for block_num in 1..=target_height {
writer.increment_block(block_num)?;
@@ -136,7 +142,7 @@ where
// Spawn job for appending empty headers
let provider = sf_provider.clone();
std::thread::spawn(move || {
reth_tasks::spawn_os_thread("init-state-headers", move || {
let result = provider.latest_writer(StaticFileSegment::Headers).and_then(|mut writer| {
for block_num in 1..=target_height {
// TODO: should we fill with real parent_hash?

View File

@@ -38,6 +38,7 @@ reth-storage-errors.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-static-file-types.workspace = true
reth-tasks.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-db = { workspace = true, features = ["metrics"] }

View File

@@ -295,7 +295,7 @@ where
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
reth_tasks::spawn_os_thread("sender-recovery", move || {
while let Ok(chunks) = tx_receiver.recv() {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.

View File

@@ -276,7 +276,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
/// receive `update_index` notifications from a node that appends/truncates data.
pub fn watch_directory(&self) {
let provider = self.clone();
std::thread::spawn(move || {
reth_tasks::spawn_os_thread("sf-watch", move || {
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(
move |res| tx.send(res).unwrap(),