refactor: replace spawn_proof_workers with ProofTaskManagerHandle

- Updated the code to utilize ProofTaskManagerHandle for spawning proof workers instead of the deprecated spawn_proof_workers function.
- This change enhances code clarity and maintainability by consolidating the worker management logic within the ProofTaskManagerHandle struct.
This commit is contained in:
Yong Kang
2025-10-13 09:19:28 +00:00
parent 6a4766ce47
commit f9e167ec39
4 changed files with 90 additions and 88 deletions

View File

@@ -32,7 +32,7 @@ use reth_provider::{
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::{
proof_task::{spawn_proof_workers, ProofTaskCtx},
proof_task::{ProofTaskCtx, ProofTaskManagerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
@@ -203,7 +203,7 @@ where
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_handle = match spawn_proof_workers(
let proof_handle = match ProofTaskManagerHandle::new(
self.executor.handle().clone(),
consistent_view,
task_ctx,

View File

@@ -1212,7 +1212,7 @@ mod tests {
DatabaseProviderFactory,
};
use reth_trie::{MultiProof, TrieInput};
use reth_trie_parallel::proof_task::{spawn_proof_workers, ProofTaskCtx};
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManagerHandle};
use revm_primitives::{B256, U256};
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1228,7 +1228,7 @@ mod tests {
);
let consistent_view = ConsistentDbView::new(factory, None);
let proof_handle =
spawn_proof_workers(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
ProofTaskManagerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
.expect("Failed to spawn proof workers");
let channel = channel();

View File

@@ -224,7 +224,7 @@ impl ParallelProof {
#[cfg(test)]
mod tests {
use super::*;
use crate::proof_task::{spawn_proof_workers, ProofTaskCtx};
use crate::proof_task::{ProofTaskCtx, ProofTaskManagerHandle};
use alloy_primitives::{
keccak256,
map::{B256Set, DefaultHashBuilder, HashMap},
@@ -307,7 +307,8 @@ mod tests {
let task_ctx =
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
let proof_task_handle =
spawn_proof_workers(rt.handle().clone(), consistent_view, task_ctx, 1, 1).unwrap();
ProofTaskManagerHandle::new(rt.handle().clone(), consistent_view, task_ctx, 1, 1)
.unwrap();
let parallel_result = ParallelProof::new(
Default::default(),

View File

@@ -90,82 +90,6 @@ enum StorageWorkerJob {
},
}
/// Spawns storage and account worker pools with dedicated database transactions.
///
/// Returns a handle for submitting proof tasks to the worker pools.
/// Workers run until the last handle is dropped.
///
/// # Parameters
/// - `executor`: Tokio runtime handle for spawning blocking tasks
/// - `view`: Consistent database view for creating transactions
/// - `task_ctx`: Shared context with trie updates and prefix sets
/// - `storage_worker_count`: Number of storage workers to spawn
/// - `account_worker_count`: Number of account workers to spawn
pub fn spawn_proof_workers<Factory>(
executor: Handle,
view: ConsistentDbView<Factory>,
task_ctx: ProofTaskCtx,
storage_worker_count: usize,
account_worker_count: usize,
) -> ProviderResult<ProofTaskManagerHandle>
where
Factory: DatabaseProviderFactory<Provider: BlockReader>,
{
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
tracing::info!(
target: "trie::proof_task",
storage_worker_count,
account_worker_count,
"Spawning proof worker pools"
);
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let provider_ro = view.provider_ro()?;
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx_clone = storage_work_rx.clone();
executor
.spawn_blocking(move || storage_worker_loop(proof_task_tx, work_rx_clone, worker_id));
tracing::debug!(
target: "trie::proof_task",
worker_id,
"Storage worker spawned successfully"
);
}
// Spawn account workers
for worker_id in 0..account_worker_count {
let provider_ro = view.provider_ro()?;
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
executor.spawn_blocking(move || {
account_worker_loop(proof_task_tx, work_rx_clone, storage_work_tx_clone, worker_id)
});
tracing::debug!(
target: "trie::proof_task",
worker_id,
"Account worker spawned successfully"
);
}
Ok(ProofTaskManagerHandle::new(
storage_work_tx,
account_work_tx,
Arc::new(AtomicUsize::new(0)),
#[cfg(feature = "metrics")]
Arc::new(ProofTaskMetrics::default()),
))
}
/// Worker loop for storage trie operations.
///
/// # Lifecycle
@@ -921,11 +845,87 @@ pub struct ProofTaskManagerHandle {
}
impl ProofTaskManagerHandle {
/// Spawns storage and account worker pools with dedicated database transactions.
///
/// Returns a handle for submitting proof tasks to the worker pools.
/// Workers run until the last handle is dropped.
///
/// # Parameters
/// - `executor`: Tokio runtime handle for spawning blocking tasks
/// - `view`: Consistent database view for creating transactions
/// - `task_ctx`: Shared context with trie updates and prefix sets
/// - `storage_worker_count`: Number of storage workers to spawn
/// - `account_worker_count`: Number of account workers to spawn
pub fn new<Factory>(
executor: Handle,
view: ConsistentDbView<Factory>,
task_ctx: ProofTaskCtx,
storage_worker_count: usize,
account_worker_count: usize,
) -> ProviderResult<Self>
where
Factory: DatabaseProviderFactory<Provider: BlockReader>,
{
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
tracing::info!(
target: "trie::proof_task",
storage_worker_count,
account_worker_count,
"Spawning proof worker pools"
);
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let provider_ro = view.provider_ro()?;
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx_clone = storage_work_rx.clone();
executor.spawn_blocking(move || {
storage_worker_loop(proof_task_tx, work_rx_clone, worker_id)
});
tracing::debug!(
target: "trie::proof_task",
worker_id,
"Storage worker spawned successfully"
);
}
// Spawn account workers
for worker_id in 0..account_worker_count {
let provider_ro = view.provider_ro()?;
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
executor.spawn_blocking(move || {
account_worker_loop(proof_task_tx, work_rx_clone, storage_work_tx_clone, worker_id)
});
tracing::debug!(
target: "trie::proof_task",
worker_id,
"Account worker spawned successfully"
);
}
Ok(Self::new_handle(
storage_work_tx,
account_work_tx,
Arc::new(AtomicUsize::new(0)),
#[cfg(feature = "metrics")]
Arc::new(ProofTaskMetrics::default()),
))
}
/// Creates a new [`ProofTaskManagerHandle`] with direct access to worker pools.
///
/// This is an internal constructor used by `spawn_proof_workers`.
/// External users should call `spawn_proof_workers` to create handles.
fn new(
/// This is an internal constructor used for creating handles.
fn new_handle(
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
account_work_tx: CrossbeamSender<AccountWorkerJob>,
active_handles: Arc<AtomicUsize>,
@@ -1017,7 +1017,7 @@ impl ProofTaskManagerHandle {
impl Clone for ProofTaskManagerHandle {
fn clone(&self) -> Self {
Self::new(
Self::new_handle(
self.storage_work_tx.clone(),
self.account_work_tx.clone(),
self.active_handles.clone(),
@@ -1120,7 +1120,7 @@ mod tests {
)
}
/// Ensures `spawn_proof_workers` spawns workers correctly.
/// Ensures `ProofTaskManagerHandle::new` spawns workers correctly.
#[test]
fn spawn_proof_workers_creates_handle() {
let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
@@ -1130,7 +1130,8 @@ mod tests {
let view = ConsistentDbView::new(factory, None);
let ctx = test_ctx();
let proof_handle = spawn_proof_workers(handle.clone(), view, ctx, 5, 3).unwrap();
let proof_handle =
ProofTaskManagerHandle::new(handle.clone(), view, ctx, 5, 3).unwrap();
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();