mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: spawn proof workers in a separate thread (#21780)
This commit is contained in:
@@ -168,101 +168,112 @@ impl ProofWorkerHandle {
|
||||
"Spawning proof worker pools"
|
||||
);
|
||||
|
||||
let parent_span =
|
||||
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
|
||||
.entered();
|
||||
// Spawn storage workers
|
||||
for worker_id in 0..storage_worker_count {
|
||||
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
|
||||
let task_ctx_clone = task_ctx.clone();
|
||||
let work_rx_clone = storage_work_rx.clone();
|
||||
let storage_available_workers_clone = storage_available_workers.clone();
|
||||
let cached_storage_roots = cached_storage_roots.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
let cursor_metrics = ProofTaskCursorMetrics::new();
|
||||
|
||||
let _guard = span.enter();
|
||||
let worker = StorageProofWorker::new(
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
worker_id,
|
||||
storage_available_workers_clone,
|
||||
cached_storage_roots,
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics,
|
||||
#[cfg(feature = "metrics")]
|
||||
cursor_metrics,
|
||||
)
|
||||
.with_v2_proofs(v2_proofs_enabled);
|
||||
if let Err(error) = worker.run() {
|
||||
error!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?error,
|
||||
"Storage worker failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(parent_span);
|
||||
|
||||
let parent_span =
|
||||
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
|
||||
.entered();
|
||||
// Spawn account workers
|
||||
for worker_id in 0..account_worker_count {
|
||||
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
|
||||
let task_ctx_clone = task_ctx.clone();
|
||||
let work_rx_clone = account_work_rx.clone();
|
||||
let storage_work_tx_clone = storage_work_tx.clone();
|
||||
let account_available_workers_clone = account_available_workers.clone();
|
||||
let cached_storage_roots = cached_storage_roots.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
let cursor_metrics = ProofTaskCursorMetrics::new();
|
||||
|
||||
let _guard = span.enter();
|
||||
let worker = AccountProofWorker::new(
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
worker_id,
|
||||
storage_work_tx_clone,
|
||||
account_available_workers_clone,
|
||||
cached_storage_roots,
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics,
|
||||
#[cfg(feature = "metrics")]
|
||||
cursor_metrics,
|
||||
)
|
||||
.with_v2_proofs(v2_proofs_enabled);
|
||||
if let Err(error) = worker.run() {
|
||||
error!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?error,
|
||||
"Account worker failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(parent_span);
|
||||
|
||||
Self {
|
||||
storage_work_tx,
|
||||
let this = Self {
|
||||
storage_work_tx: storage_work_tx.clone(),
|
||||
account_work_tx,
|
||||
storage_available_workers,
|
||||
account_available_workers,
|
||||
storage_available_workers: storage_available_workers.clone(),
|
||||
account_available_workers: account_available_workers.clone(),
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
v2_proofs_enabled,
|
||||
}
|
||||
};
|
||||
|
||||
// Clone for the first spawn_blocking (storage workers)
|
||||
let task_ctx_for_storage = task_ctx.clone();
|
||||
let executor_for_storage = executor.clone();
|
||||
let cached_storage_roots_for_storage = cached_storage_roots.clone();
|
||||
|
||||
executor_for_storage.clone().spawn_blocking(move || {
|
||||
let parent_span =
|
||||
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
|
||||
.entered();
|
||||
// Spawn storage workers
|
||||
for worker_id in 0..storage_worker_count {
|
||||
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
|
||||
let task_ctx_clone = task_ctx_for_storage.clone();
|
||||
let work_rx_clone = storage_work_rx.clone();
|
||||
let storage_available_workers_clone = storage_available_workers.clone();
|
||||
let cached_storage_roots = cached_storage_roots_for_storage.clone();
|
||||
|
||||
executor_for_storage.spawn_blocking(move || {
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
let cursor_metrics = ProofTaskCursorMetrics::new();
|
||||
|
||||
let _guard = span.enter();
|
||||
let worker = StorageProofWorker::new(
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
worker_id,
|
||||
storage_available_workers_clone,
|
||||
cached_storage_roots,
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics,
|
||||
#[cfg(feature = "metrics")]
|
||||
cursor_metrics,
|
||||
)
|
||||
.with_v2_proofs(v2_proofs_enabled);
|
||||
if let Err(error) = worker.run() {
|
||||
error!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?error,
|
||||
"Storage worker failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(parent_span);
|
||||
});
|
||||
|
||||
executor.clone().spawn_blocking(move || {
|
||||
let parent_span =
|
||||
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
|
||||
.entered();
|
||||
// Spawn account workers
|
||||
for worker_id in 0..account_worker_count {
|
||||
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
|
||||
let task_ctx_clone = task_ctx.clone();
|
||||
let work_rx_clone = account_work_rx.clone();
|
||||
let storage_work_tx_clone = storage_work_tx.clone();
|
||||
let account_available_workers_clone = account_available_workers.clone();
|
||||
let cached_storage_roots = cached_storage_roots.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
let cursor_metrics = ProofTaskCursorMetrics::new();
|
||||
|
||||
let _guard = span.enter();
|
||||
let worker = AccountProofWorker::new(
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
worker_id,
|
||||
storage_work_tx_clone,
|
||||
account_available_workers_clone,
|
||||
cached_storage_roots,
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics,
|
||||
#[cfg(feature = "metrics")]
|
||||
cursor_metrics,
|
||||
)
|
||||
.with_v2_proofs(v2_proofs_enabled);
|
||||
if let Err(error) = worker.run() {
|
||||
error!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?error,
|
||||
"Account worker failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(parent_span);
|
||||
});
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
/// Returns whether V2 storage proofs are enabled for this worker pool.
|
||||
|
||||
Reference in New Issue
Block a user