mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf: background init of workers (#19012)
This commit is contained in:
@@ -166,8 +166,6 @@ where
|
||||
///
|
||||
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
|
||||
/// canceling)
|
||||
///
|
||||
/// Returns an error with the original transactions iterator if proof worker spawning fails.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
|
||||
&mut self,
|
||||
@@ -179,7 +177,7 @@ where
|
||||
config: &TreeConfig,
|
||||
) -> Result<
|
||||
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
|
||||
(reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
|
||||
(ParallelStateRootError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
|
||||
>
|
||||
where
|
||||
P: DatabaseProviderFactory<Provider: BlockReader>
|
||||
@@ -203,18 +201,13 @@ where
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
|
||||
let proof_handle = match ProofWorkerHandle::new(
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
consistent_view,
|
||||
task_ctx,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
) {
|
||||
Ok(handle) => handle,
|
||||
Err(error) => {
|
||||
return Err((error, transactions, env, provider_builder));
|
||||
}
|
||||
};
|
||||
);
|
||||
|
||||
// We set it to half of the proof task concurrency, because often for each multiproof we
|
||||
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
|
||||
|
||||
@@ -553,7 +553,7 @@ impl MultiproofManager {
|
||||
|
||||
let proof_result: Result<DecodedMultiProof, ParallelStateRootError> = (|| {
|
||||
let receiver = account_proof_worker_handle
|
||||
.queue_account_multiproof(input)
|
||||
.dispatch_account_multiproof(input)
|
||||
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
|
||||
|
||||
receiver
|
||||
@@ -1228,8 +1228,7 @@ mod tests {
|
||||
);
|
||||
let consistent_view = ConsistentDbView::new(factory, None);
|
||||
let proof_handle =
|
||||
ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
|
||||
.expect("Failed to spawn proof workers");
|
||||
ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1);
|
||||
let channel = channel();
|
||||
|
||||
MultiProofTask::new(config, executor, proof_handle, channel.0, 1, None)
|
||||
|
||||
@@ -193,7 +193,7 @@ impl ParallelProof {
|
||||
|
||||
let receiver = self
|
||||
.proof_worker_handle
|
||||
.queue_account_multiproof(input)
|
||||
.dispatch_account_multiproof(input)
|
||||
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
|
||||
|
||||
// Wait for account multiproof result from worker
|
||||
@@ -307,7 +307,7 @@ mod tests {
|
||||
let task_ctx =
|
||||
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
|
||||
let proof_worker_handle =
|
||||
ProofWorkerHandle::new(rt.handle().clone(), consistent_view, task_ctx, 1, 1).unwrap();
|
||||
ProofWorkerHandle::new(rt.handle().clone(), consistent_view, task_ctx, 1, 1);
|
||||
|
||||
let parallel_result = ParallelProof::new(
|
||||
Default::default(),
|
||||
|
||||
@@ -29,7 +29,6 @@ use reth_db_api::transaction::DbTx;
|
||||
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
|
||||
use reth_provider::{
|
||||
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
|
||||
ProviderResult,
|
||||
};
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
use reth_trie::{
|
||||
@@ -112,14 +111,20 @@ enum StorageWorkerJob {
|
||||
/// # Shutdown
|
||||
///
|
||||
/// Worker shuts down when the crossbeam channel closes (all senders dropped).
|
||||
fn storage_worker_loop<Tx>(
|
||||
proof_tx: ProofTaskTx<Tx>,
|
||||
fn storage_worker_loop<Factory>(
|
||||
view: ConsistentDbView<Factory>,
|
||||
task_ctx: ProofTaskCtx,
|
||||
work_rx: CrossbeamReceiver<StorageWorkerJob>,
|
||||
worker_id: usize,
|
||||
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
|
||||
) where
|
||||
Tx: DbTx,
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader>,
|
||||
{
|
||||
// Create db transaction before entering work loop
|
||||
let provider =
|
||||
view.provider_ro().expect("Storage worker failed to initialize: database unavailable");
|
||||
let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id);
|
||||
|
||||
tracing::debug!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
@@ -258,15 +263,21 @@ fn storage_worker_loop<Tx>(
|
||||
/// # Shutdown
|
||||
///
|
||||
/// Worker shuts down when the crossbeam channel closes (all senders dropped).
|
||||
fn account_worker_loop<Tx>(
|
||||
proof_tx: ProofTaskTx<Tx>,
|
||||
fn account_worker_loop<Factory>(
|
||||
view: ConsistentDbView<Factory>,
|
||||
task_ctx: ProofTaskCtx,
|
||||
work_rx: CrossbeamReceiver<AccountWorkerJob>,
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
worker_id: usize,
|
||||
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
|
||||
) where
|
||||
Tx: DbTx,
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader>,
|
||||
{
|
||||
// Create db transaction before entering work loop
|
||||
let provider =
|
||||
view.provider_ro().expect("Account worker failed to initialize: database unavailable");
|
||||
let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id);
|
||||
|
||||
tracing::debug!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
@@ -308,7 +319,7 @@ fn account_worker_loop<Tx>(
|
||||
);
|
||||
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
|
||||
|
||||
let storage_proof_receivers = match queue_storage_proofs(
|
||||
let storage_proof_receivers = match dispatch_storage_proofs(
|
||||
&storage_work_tx,
|
||||
&input.targets,
|
||||
&mut storage_prefix_sets,
|
||||
@@ -568,7 +579,7 @@ where
|
||||
/// computation. This enables interleaved parallelism for better performance.
|
||||
///
|
||||
/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
|
||||
fn queue_storage_proofs(
|
||||
fn dispatch_storage_proofs(
|
||||
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
|
||||
targets: &MultiProofTargets,
|
||||
storage_prefix_sets: &mut B256Map<PrefixSet>,
|
||||
@@ -864,9 +875,9 @@ impl ProofWorkerHandle {
|
||||
task_ctx: ProofTaskCtx,
|
||||
storage_worker_count: usize,
|
||||
account_worker_count: usize,
|
||||
) -> ProviderResult<Self>
|
||||
) -> Self
|
||||
where
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader>,
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
|
||||
{
|
||||
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
|
||||
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
|
||||
@@ -880,9 +891,8 @@ impl ProofWorkerHandle {
|
||||
|
||||
// Spawn storage workers
|
||||
for worker_id in 0..storage_worker_count {
|
||||
let provider_ro = view.provider_ro()?;
|
||||
let tx = provider_ro.into_tx();
|
||||
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
|
||||
let view_clone = view.clone();
|
||||
let task_ctx_clone = task_ctx.clone();
|
||||
let work_rx_clone = storage_work_rx.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
@@ -890,7 +900,8 @@ impl ProofWorkerHandle {
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
|
||||
storage_worker_loop(
|
||||
proof_task_tx,
|
||||
view_clone,
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
worker_id,
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -907,9 +918,8 @@ impl ProofWorkerHandle {
|
||||
|
||||
// Spawn account workers
|
||||
for worker_id in 0..account_worker_count {
|
||||
let provider_ro = view.provider_ro()?;
|
||||
let tx = provider_ro.into_tx();
|
||||
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
|
||||
let view_clone = view.clone();
|
||||
let task_ctx_clone = task_ctx.clone();
|
||||
let work_rx_clone = account_work_rx.clone();
|
||||
let storage_work_tx_clone = storage_work_tx.clone();
|
||||
|
||||
@@ -918,7 +928,8 @@ impl ProofWorkerHandle {
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
|
||||
account_worker_loop(
|
||||
proof_task_tx,
|
||||
view_clone,
|
||||
task_ctx_clone,
|
||||
work_rx_clone,
|
||||
storage_work_tx_clone,
|
||||
worker_id,
|
||||
@@ -934,7 +945,7 @@ impl ProofWorkerHandle {
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Self::new_handle(storage_work_tx, account_work_tx))
|
||||
Self::new_handle(storage_work_tx, account_work_tx)
|
||||
}
|
||||
|
||||
/// Creates a new [`ProofWorkerHandle`] with direct access to worker pools.
|
||||
@@ -963,7 +974,7 @@ impl ProofWorkerHandle {
|
||||
}
|
||||
|
||||
/// Queue an account multiproof computation
|
||||
pub fn queue_account_multiproof(
|
||||
pub fn dispatch_account_multiproof(
|
||||
&self,
|
||||
input: AccountMultiproofInput,
|
||||
) -> Result<Receiver<AccountMultiproofResult>, ProviderError> {
|
||||
@@ -1091,7 +1102,7 @@ mod tests {
|
||||
let view = ConsistentDbView::new(factory, None);
|
||||
let ctx = test_ctx();
|
||||
|
||||
let proof_handle = ProofWorkerHandle::new(handle.clone(), view, ctx, 5, 3).unwrap();
|
||||
let proof_handle = ProofWorkerHandle::new(handle.clone(), view, ctx, 5, 3);
|
||||
|
||||
// Verify handle can be cloned
|
||||
let _cloned_handle = proof_handle.clone();
|
||||
|
||||
Reference in New Issue
Block a user