feat(storage): introduce storage proof worker pool

- Added configuration for maximum and minimum storage proof workers.
- Implemented a worker pool for processing storage proof tasks, improving efficiency by reusing transactions.
- Updated `ProofTaskManager` to handle storage proof tasks via a dedicated channel.
- Enhanced metrics to track storage proof requests and fallback scenarios.
- Adjusted existing tests to accommodate the new storage worker functionality.
This commit is contained in:
Yong Kang
2025-10-07 06:35:27 +00:00
parent 691b14bfca
commit badc3b7fb9
6 changed files with 505 additions and 316 deletions

View File

@@ -32,6 +32,15 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
/// Maximum number of storage proof workers
const MAX_STORAGE_PROOF_WORKERS: usize = 12;
/// Minimum number of storage proof workers
const MIN_STORAGE_PROOF_WORKERS: usize = 2;
/// Default ratio of storage proof workers to max_proof_task_concurrency
const DEFAULT_STORAGE_PROOF_WORKER_RATIO: f32 = 0.5;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -121,8 +130,9 @@ pub struct TreeConfig {
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of dedicated storage proof workers.
/// If None, defaults to half of max_proof_task_concurrency.
storage_proof_workers: Option<usize>,
}
impl Default for TreeConfig {
@@ -149,7 +159,7 @@ impl Default for TreeConfig {
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
storage_proof_workers: None,
}
}
}
@@ -179,7 +189,7 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
storage_proof_workers: Option<usize>,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
Self {
@@ -204,7 +214,7 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
storage_proof_workers,
}
}
@@ -472,14 +482,30 @@ impl TreeConfig {
self.prewarm_max_concurrency
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
/// Get the number of storage proof workers.
///
/// Defaults to half of max_proof_task_concurrency, clamped to valid range.
pub fn storage_proof_workers(&self) -> usize {
    match self.storage_proof_workers {
        // An explicitly configured value wins unchanged.
        Some(configured) => configured,
        None => {
            // Derive the count from the proof-task concurrency using the
            // default ratio, then bound it to the supported worker range.
            let scaled = (self.max_proof_task_concurrency as f32 *
                DEFAULT_STORAGE_PROOF_WORKER_RATIO) as usize;
            scaled.clamp(MIN_STORAGE_PROOF_WORKERS, MAX_STORAGE_PROOF_WORKERS)
        }
    }
}
/// Setter for the number of storage proof worker threads.
pub const fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
self.storage_worker_count = storage_worker_count;
/// Set the number of storage proof workers explicitly.
///
/// Value is clamped to [MIN_STORAGE_PROOF_WORKERS, MAX_STORAGE_PROOF_WORKERS].
pub const fn with_storage_proof_workers(mut self, workers: usize) -> Self {
    // `Ord::clamp` cannot be called in a const fn, so bound the value by hand.
    let bounded = if workers > MAX_STORAGE_PROOF_WORKERS {
        MAX_STORAGE_PROOF_WORKERS
    } else if workers < MIN_STORAGE_PROOF_WORKERS {
        MIN_STORAGE_PROOF_WORKERS
    } else {
        workers
    };
    self.storage_proof_workers = Some(bounded);
    self
}
}

View File

@@ -203,19 +203,14 @@ where
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let storage_worker_count = config.storage_worker_count();
let proof_task = match ProofTaskManager::new(
let storage_worker_count = config.storage_proof_workers();
let proof_task = ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
max_proof_task_concurrency,
storage_worker_count,
) {
Ok(task) => task,
Err(error) => {
return Err((error, transactions, env, provider_builder));
}
};
);
// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.

View File

@@ -1236,9 +1236,8 @@ mod tests {
config.consistent_view.clone(),
task_ctx,
1,
1,
)
.expect("Failed to create ProofTaskManager");
1, // storage_worker_count: 1 for tests
);
let channel = channel();
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)

View File

@@ -448,8 +448,7 @@ mod tests {
let task_ctx =
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
let proof_task =
ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1, 1)
.unwrap();
ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1, 1);
let proof_task_handle = proof_task.handle();
// keep the join handle around to make sure it does not return any errors

View File

@@ -10,7 +10,7 @@
use crate::root::ParallelStateRootError;
use alloy_primitives::{map::B256Set, B256};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use reth_db_api::transaction::DbTx;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{
@@ -49,96 +49,91 @@ use crate::proof_task_metrics::ProofTaskMetrics;
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Internal message for storage workers.
/// Internal message for storage proof workers.
///
/// This is NOT exposed publicly. External callers use `ProofTaskKind::StorageProof` or
/// `ProofTaskKind::BlindedStorageNode` which are routed through the manager's `std::mpsc` channel.
/// This is NOT exposed publicly. External callers still use `ProofTaskKind::StorageProof`
/// which is routed through the manager's std::mpsc channel.
#[derive(Debug)]
enum StorageWorkerJob {
/// Storage proof computation request
StorageProof {
/// Storage proof input parameters
input: StorageProofInput,
/// Channel to send result back to original caller
result_sender: Sender<StorageProofResult>,
},
/// Blinded storage node retrieval request
BlindedStorageNode {
/// Target account
account: B256,
/// Path to the storage node
path: Nibbles,
/// Channel to send result back to original caller
result_sender: Sender<TrieNodeProviderResult>,
},
}
impl StorageWorkerJob {
/// Sends an error back to the caller when worker pool is unavailable.
struct StorageProofJob {
/// Storage proof input parameters
input: StorageProofInput,
/// Channel to send result back to original caller
///
/// Returns `Ok(())` if the error was sent successfully, or `Err(())` if the receiver was
/// dropped.
fn send_worker_unavailable_error(&self) -> Result<(), ()> {
let error =
ParallelStateRootError::Other("Storage proof worker pool unavailable".to_string());
match self {
Self::StorageProof { result_sender, .. } => {
result_sender.send(Err(error)).map_err(|_| ())
}
Self::BlindedStorageNode { result_sender, .. } => result_sender
.send(Err(SparseTrieError::from(SparseTrieErrorKind::Other(Box::new(error)))))
.map_err(|_| ()),
}
}
/// This is the same std::mpsc::Sender that the external caller provided in
/// ProofTaskKind::StorageProof(input, sender).
result_sender: Sender<StorageProofResult>,
}
/// Manager for coordinating proof request execution across different task types.
/// Internal message for on-demand task execution.
///
/// # Architecture
/// These tasks are executed with lazily-created transactions that are
/// returned to the pool after use (same as current behavior).
#[derive(Debug)]
enum OnDemandTask {
/// Fetch a blinded account node by path
BlindedAccountNode(Nibbles, Sender<TrieNodeProviderResult>),
/// Fetch a blinded storage node by account and path
BlindedStorageNode(B256, Nibbles, Sender<TrieNodeProviderResult>),
}
/// A task that manages sending proof requests to worker pools and on-demand tasks.
///
/// This manager handles two distinct execution paths:
/// # Architecture (PR1: Storage Workers Only)
///
/// 1. **Storage Worker Pool** (for storage trie operations):
/// This manager maintains two execution paths:
///
/// 1. **Storage Worker Pool** (NEW):
/// - Pre-spawned workers with dedicated long-lived transactions
/// - Handles `StorageProof` and `BlindedStorageNode` requests
/// - Tasks queued via crossbeam unbounded channel
/// - Workers continuously process without transaction overhead
/// - Unbounded queue ensures all storage proofs benefit from transaction reuse
/// - Tasks queued via crossbeam bounded channel
/// - Workers continuously process without transaction return overhead
///
/// 2. **On-Demand Execution** (for account trie operations):
/// - Lazy transaction creation for `BlindedAccountNode` requests
/// - Transactions returned to pool after use for reuse
/// 2. **On-Demand Execution** (EXISTING):
/// - Lazy transaction creation for blinded node fetches
/// - Transactions returned to pool after use (original behavior)
/// - Same message-passing mechanism as before
///
/// # Public Interface
/// # External API
///
/// The public interface through `ProofTaskManagerHandle` allows external callers to:
/// - Submit tasks via `queue_task(ProofTaskKind)`
/// - Use standard `std::mpsc` message passing
/// - Receive consistent return types and error handling
/// The external API via `ProofTaskManagerHandle` is COMPLETELY UNCHANGED:
/// - `queue_task(ProofTaskKind)` signature identical
/// - Same std::mpsc message passing
/// - Same return types and error handling
///
/// All changes are internal routing optimizations.
#[derive(Debug)]
pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
/// Sender for storage worker jobs to worker pool.
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
// ==================== STORAGE WORKER POOL (NEW) ====================
/// Sender for storage proof tasks to worker pool.
///
/// Queue capacity = storage_worker_count * 2 (for 2x buffering)
storage_work_tx: CrossbeamSender<StorageProofJob>,
/// Number of storage workers successfully spawned.
///
/// May be less than requested if concurrency limits reduce the worker budget.
/// May be less than requested if transaction creation fails.
storage_worker_count: usize,
/// Max number of database transactions to create for on-demand account trie operations.
max_concurrency: usize,
// ==================== ON-DEMAND TRANSACTION POOL (REFACTORED) ====================
/// Maximum number of on-demand transactions for blinded node fetches.
///
/// Calculated as: max_concurrency - storage_worker_count
max_on_demand_txs: usize,
/// Number of database transactions created for on-demand operations.
total_transactions: usize,
/// Currently available on-demand transactions (reused after return).
///
/// Same lifecycle as before PR1.
on_demand_txs: Vec<ProofTaskTx<FactoryTx<Factory>>>,
/// Proof tasks pending execution (account trie operations only).
pending_tasks: VecDeque<ProofTaskKind>,
/// Total on-demand transactions created (for ID assignment).
on_demand_tx_count: usize,
/// The proof task transactions, containing owned cursor factories that are reused for proof
/// calculation (account trie operations only).
proof_task_txs: Vec<ProofTaskTx<FactoryTx<Factory>>>,
/// Queue of pending on-demand tasks waiting for available transaction.
///
/// Replaces the old `pending_tasks` VecDeque which held all task types.
/// TODO: Change to VecDeque<OnDemandTask> in Phase 8 when implementing proper task routing
pending_on_demand: VecDeque<ProofTaskKind>,
// ==================== SHARED RESOURCES ====================
/// Consistent view provider used for creating transactions on-demand.
view: ConsistentDbView<Factory>,
@@ -148,10 +143,15 @@ pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
/// The underlying handle from which to spawn proof tasks.
executor: Handle,
/// Receives proof task requests from [`ProofTaskManagerHandle`].
/// A receiver for new proof task messages from external callers.
///
/// This is the std::mpsc channel connected to ProofTaskManagerHandle.
/// UNCHANGED - maintains interface compatibility.
proof_task_rx: Receiver<ProofTaskMessage<FactoryTx<Factory>>>,
/// Internal channel for on-demand tasks to return transactions after use.
/// A sender for internal messaging (transaction returns).
///
/// Used by on-demand tasks to return transactions to pool.
tx_sender: Sender<ProofTaskMessage<FactoryTx<Factory>>>,
/// The number of active handles.
@@ -165,32 +165,39 @@ pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
metrics: ProofTaskMetrics,
}
/// Worker loop for storage trie operations.
/// Worker loop for storage proof computation.
///
/// # Lifecycle
///
/// Each worker:
/// 1. Receives `StorageWorkerJob` from crossbeam unbounded channel
/// 2. Computes result using its dedicated long-lived transaction
/// 3. Sends result directly to original caller via `std::mpsc`
/// 1. Receives `StorageProofJob` from crossbeam bounded channel
/// 2. Computes proof using its dedicated long-lived transaction
/// 3. Sends result directly to original caller via std::mpsc
/// 4. Repeats until channel closes (graceful shutdown)
///
/// # Transaction Reuse
///
/// Reuses the same transaction and cursor factories across multiple operations
/// to avoid transaction creation and cursor factory setup overhead.
/// The key optimization: the worker reuses the same `proof_tx` across ALL proofs,
/// avoiding the overhead of:
/// - Creating new database transactions
/// - Setting up cursor factories
/// - Returning transactions to a pool
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
/// continue operating and the system degrades gracefully.
/// If this function panics, the worker thread terminates but:
/// - Other workers continue operating
/// - The manager detects disconnection when trying to send
/// - System degrades gracefully rather than failing completely
///
/// # Shutdown
///
/// Worker shuts down when the crossbeam channel closes (all senders dropped).
/// Worker shuts down when:
/// - Crossbeam channel closes (all senders dropped)
/// - `ProofTaskManager::run()` drops `storage_work_tx` on terminate
fn storage_worker_loop<Tx>(
proof_tx: ProofTaskTx<Tx>,
work_rx: CrossbeamReceiver<StorageWorkerJob>,
work_rx: CrossbeamReceiver<StorageProofJob>,
worker_id: usize,
) where
Tx: DbTx,
@@ -198,112 +205,73 @@ fn storage_worker_loop<Tx>(
tracing::debug!(
target: "trie::proof_task",
worker_id,
"Storage worker started"
"Storage proof worker started"
);
// Create factories once at worker startup to avoid recreation overhead.
let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories();
let mut proofs_processed = 0u64;
let start_time = Instant::now();
// Create blinded provider factory once for all blinded node requests
let blinded_provider_factory = ProofTrieNodeProviderFactory::new(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
proof_tx.task_ctx.prefix_sets.clone(),
);
// Main worker loop: process jobs until channel closes
while let Ok(StorageProofJob { input, result_sender }) = work_rx.recv() {
let proof_start = Instant::now();
let mut storage_proofs_processed = 0u64;
let mut storage_nodes_processed = 0u64;
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?input.hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots = input.target_slots.len(),
"Processing storage proof"
);
while let Ok(job) = work_rx.recv() {
match job {
StorageWorkerJob::StorageProof { input, result_sender } => {
let hashed_address = input.hashed_address;
// ==================== CORE COMPUTATION ====================
// Compute storage proof using reused transaction
// This is the key difference from on-demand execution:
// - No transaction creation overhead
// - No transaction return message
// - Cursor factories reused across proofs
let result = proof_tx.compute_storage_proof(&input);
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots = input.target_slots.len(),
"Processing storage proof"
);
let proof_elapsed = proof_start.elapsed();
proofs_processed += 1;
let proof_start = Instant::now();
let result = proof_tx.compute_storage_proof(
input,
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
);
let proof_elapsed = proof_start.elapsed();
storage_proofs_processed += 1;
if result_sender.send(result).is_err() {
tracing::debug!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
storage_proofs_processed,
"Storage proof receiver dropped, discarding result"
);
}
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
proof_time_us = proof_elapsed.as_micros(),
total_processed = storage_proofs_processed,
"Storage proof completed"
);
}
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
trace!(
target: "trie::proof_task",
worker_id,
?account,
?path,
"Processing blinded storage node"
);
let start = Instant::now();
let result =
blinded_provider_factory.storage_node_provider(account).trie_node(&path);
let elapsed = start.elapsed();
storage_nodes_processed += 1;
if result_sender.send(result).is_err() {
tracing::debug!(
target: "trie::proof_task",
worker_id,
?account,
?path,
storage_nodes_processed,
"Blinded storage node receiver dropped, discarding result"
);
}
trace!(
target: "trie::proof_task",
worker_id,
?account,
?path,
elapsed_us = elapsed.as_micros(),
total_processed = storage_nodes_processed,
"Blinded storage node completed"
);
}
// ==================== RESULT DELIVERY ====================
// Send result directly to original caller's std::mpsc::Receiver
// If receiver is dropped (caller cancelled), log and continue
if let Err(_) = result_sender.send(result) {
tracing::debug!(
target: "trie::proof_task",
worker_id,
hashed_address = ?input.hashed_address,
proofs_processed,
"Storage proof receiver dropped, discarding result"
);
}
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?input.hashed_address,
proof_time_us = proof_elapsed.as_micros(),
total_processed = proofs_processed,
"Storage proof completed"
);
}
tracing::debug!(
// Channel closed - graceful shutdown
let total_elapsed = start_time.elapsed();
tracing::info!(
target: "trie::proof_task",
worker_id,
storage_proofs_processed,
storage_nodes_processed,
"Storage worker shutting down"
proofs_processed,
uptime_secs = total_elapsed.as_secs(),
avg_proof_time_ms = if proofs_processed > 0 {
total_elapsed.as_millis() / proofs_processed as u128
} else {
0
},
"Storage proof worker shutting down"
);
}
@@ -311,61 +279,155 @@ impl<Factory> ProofTaskManager<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader>,
{
/// Creates a new [`ProofTaskManager`] with pre-spawned storage proof workers.
/// Creates a new [`ProofTaskManager`] with the given configuration.
///
/// The `storage_worker_count` determines how many storage workers to spawn, and
/// `max_concurrency` determines the limit for on-demand operations (blinded account nodes).
/// These are now independent - storage workers are spawned as requested, and on-demand
/// operations use a separate concurrency pool for blinded account nodes.
/// Returns an error if the underlying provider fails to create the transactions required for
/// spawning workers.
/// # Arguments
///
/// * `executor` - Tokio runtime handle for spawning workers and tasks
/// * `view` - Consistent database view for creating read-only transactions
/// * `task_ctx` - Shared context (trie updates, hashed state, prefix sets)
/// * `max_concurrency` - Total transaction budget across all execution paths
/// * `storage_worker_count` - Number of storage proof workers to pre-spawn
///
/// # Transaction Budget Allocation
///
/// The total `max_concurrency` is split between two pools:
///
/// 1. **Storage Workers**: `storage_worker_count` transactions (pre-allocated)
/// 2. **On-Demand Pool**: `max_concurrency - storage_worker_count` (lazy)
///
/// Example:
/// ```text
/// max_concurrency = 8, storage_worker_count = 4
/// → 4 storage workers (pre-spawned)
/// → 4 on-demand transactions (created lazily for blinded nodes)
/// Total: 8 transactions max (same capacity as before)
/// ```
///
/// # Worker Spawn Resilience
///
/// If some workers fail to spawn (e.g., transaction creation error):
/// - Failed workers are logged and skipped
/// - On-demand pool is adjusted: `max_concurrency - actual_spawned_workers`
/// - System continues with fewer workers rather than failing entirely
///
/// # Panics
///
/// Does not panic. All errors are logged and handled gracefully.
pub fn new(
executor: Handle,
view: ConsistentDbView<Factory>,
task_ctx: ProofTaskCtx,
max_concurrency: usize,
storage_worker_count: usize,
) -> ProviderResult<Self> {
) -> Self {
// Create message channel for external callers (UNCHANGED)
let (tx_sender, proof_task_rx) = channel();
// Use unbounded channel to ensure all storage operations are queued to workers.
// This maintains transaction reuse benefits and avoids fallback to on-demand execution.
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
// ==================== STORAGE WORKER POOL SETUP ====================
// Queue capacity: 2x buffering to reduce contention
// If workers = 4, queue holds 8 tasks maximum
let queue_capacity = storage_worker_count.saturating_mul(2).max(1);
let (storage_work_tx, storage_work_rx) = bounded::<StorageProofJob>(queue_capacity);
tracing::info!(
target: "trie::proof_task",
storage_worker_count,
queue_capacity,
max_concurrency,
"Initializing storage worker pool with unbounded queue"
"Initializing storage proof worker pool"
);
// Spawn storage workers - each gets its own long-lived transaction
let mut spawned_workers = 0;
for worker_id in 0..storage_worker_count {
let provider_ro = view.provider_ro()?;
// Try to create transaction for this worker
match view.provider_ro() {
Ok(provider_ro) => {
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx = storage_work_rx.clone();
let tx = provider_ro.into_tx();
let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
let work_rx = storage_work_rx.clone();
// Spawn worker on tokio blocking pool
executor.spawn_blocking(move || {
storage_worker_loop(proof_task_tx, work_rx, worker_id)
});
executor.spawn_blocking(move || storage_worker_loop(proof_task_tx, work_rx, worker_id));
spawned_workers += 1;
spawned_workers += 1;
tracing::debug!(
target: "trie::proof_task",
worker_id,
spawned_workers,
"Storage worker spawned successfully"
);
}
Err(err) => {
// Non-fatal: log and continue with fewer workers
tracing::warn!(
target: "trie::proof_task",
worker_id,
?err,
requested = storage_worker_count,
spawned_workers,
"Failed to create transaction for storage worker, continuing with fewer workers"
);
}
}
}
tracing::debug!(
// Verify we spawned at least some workers
if spawned_workers == 0 {
tracing::error!(
target: "trie::proof_task",
requested = storage_worker_count,
"Failed to spawn any storage workers - all will use on-demand pool"
);
} else if spawned_workers < storage_worker_count {
tracing::warn!(
target: "trie::proof_task",
requested = storage_worker_count,
spawned = spawned_workers,
"Spawned fewer storage workers than requested"
);
} else {
tracing::info!(
target: "trie::proof_task",
worker_id,
spawned_workers,
"Storage worker spawned successfully"
queue_capacity,
"Storage worker pool initialized successfully"
);
}
Ok(Self {
// ==================== ON-DEMAND POOL SETUP ====================
// Calculate on-demand budget: remaining capacity after storage workers
// Ensure at least 1 on-demand transaction even if storage workers consume all budget
let max_on_demand_txs = max_concurrency.saturating_sub(spawned_workers).max(1);
tracing::debug!(
target: "trie::proof_task",
max_on_demand_txs,
storage_workers = spawned_workers,
total_capacity = max_concurrency,
"Configured on-demand transaction pool for blinded nodes"
);
// ==================== CONSTRUCT MANAGER ====================
Self {
// Storage worker pool
storage_work_tx,
storage_worker_count: spawned_workers,
max_concurrency,
total_transactions: 0,
pending_tasks: VecDeque::new(),
proof_task_txs: Vec::with_capacity(max_concurrency),
// On-demand pool
max_on_demand_txs,
on_demand_txs: Vec::with_capacity(max_on_demand_txs),
on_demand_tx_count: 0,
pending_on_demand: VecDeque::new(),
// Shared resources
view,
task_ctx,
executor,
@@ -375,10 +437,15 @@ where
#[cfg(feature = "metrics")]
metrics: ProofTaskMetrics::default(),
})
}
}
/// Returns a handle for sending new proof tasks to the [`ProofTaskManager`].
/// Returns a handle for sending new proof tasks to the manager.
///
/// # Interface Compatibility
///
/// This method is UNCHANGED from the original implementation. The returned
/// `ProofTaskManagerHandle` has the exact same public API as before PR1.
pub fn handle(&self) -> ProofTaskManagerHandle<FactoryTx<Factory>> {
ProofTaskManagerHandle::new(self.tx_sender.clone(), self.active_handles.clone())
}
@@ -390,22 +457,22 @@ where
{
/// Inserts the task into the pending tasks queue.
pub fn queue_proof_task(&mut self, task: ProofTaskKind) {
self.pending_tasks.push_back(task);
self.pending_on_demand.push_back(task);
}
/// Gets either the next available transaction, or creates a new one if all are in use and the
/// total number of transactions created is less than the max concurrency.
pub fn get_or_create_tx(&mut self) -> ProviderResult<Option<ProofTaskTx<FactoryTx<Factory>>>> {
if let Some(proof_task_tx) = self.proof_task_txs.pop() {
if let Some(proof_task_tx) = self.on_demand_txs.pop() {
return Ok(Some(proof_task_tx));
}
// if we can create a new tx within our concurrency limits, create one on-demand
if self.total_transactions < self.max_concurrency {
if self.on_demand_tx_count < self.max_on_demand_txs {
let provider_ro = self.view.provider_ro()?;
let tx = provider_ro.into_tx();
self.total_transactions += 1;
return Ok(Some(ProofTaskTx::new(tx, self.task_ctx.clone(), self.total_transactions)));
self.on_demand_tx_count += 1;
return Ok(Some(ProofTaskTx::new(tx, self.task_ctx.clone(), self.on_demand_tx_count)));
}
Ok(None)
@@ -417,11 +484,11 @@ where
/// This will return an error if a transaction must be created on-demand and the consistent view
/// provider fails.
pub fn try_spawn_next(&mut self) -> ProviderResult<()> {
let Some(task) = self.pending_tasks.pop_front() else { return Ok(()) };
let Some(task) = self.pending_on_demand.pop_front() else { return Ok(()) };
let Some(proof_task_tx) = self.get_or_create_tx()? else {
// if there are no txs available, requeue the proof task
self.pending_tasks.push_front(task);
self.pending_on_demand.push_front(task);
return Ok(())
};
@@ -443,99 +510,99 @@ where
///
/// # Task Routing
///
/// - **Storage Trie Operations** (`StorageProof` and `BlindedStorageNode`): Routed to
/// pre-spawned worker pool via unbounded channel.
/// - **Account Trie Operations** (`BlindedAccountNode`): Queued for on-demand execution via
/// `pending_tasks`.
/// - **Storage Proofs**: Routed to pre-spawned worker pool via bounded channel
/// - If channel is full, falls back to on-demand spawn
/// - **Blinded Nodes**: Queued for on-demand execution (original behavior)
///
/// # Shutdown
/// # Worker Pool Lifecycle
///
/// On termination, `storage_work_tx` is dropped, closing the channel and
/// signaling all workers to shut down gracefully.
/// On termination, `storage_work_tx` is dropped, closing the channel and signaling
/// all workers to shut down gracefully.
pub fn run(mut self) -> ProviderResult<()> {
loop {
match self.proof_task_rx.recv() {
Ok(message) => {
match message {
ProofTaskMessage::QueueTask(task) => match task {
ProofTaskKind::StorageProof(input, sender) => {
match self.storage_work_tx.send(StorageWorkerJob::StorageProof {
input,
result_sender: sender,
}) {
Ok(_) => {
tracing::trace!(
target: "trie::proof_task",
"Storage proof dispatched to worker pool"
);
ProofTaskMessage::QueueTask(task) => {
match task {
// ==================== STORAGE PROOF ROUTING ====================
ProofTaskKind::StorageProof(input, sender) => {
#[cfg(feature = "metrics")]
{
self.metrics.storage_proofs += 1;
}
Err(crossbeam_channel::SendError(job)) => {
tracing::error!(
target: "trie::proof_task",
storage_worker_count = self.storage_worker_count,
"Worker pool disconnected, cannot process storage proof"
);
// Send error back to caller
let _ = job.send_worker_unavailable_error();
// Try to send to worker pool first
match self
.storage_work_tx
.try_send(StorageProofJob { input, result_sender: sender })
{
Ok(_) => {
// Successfully queued to worker pool
tracing::trace!(
target: "trie::proof_task",
"Storage proof dispatched to worker pool"
);
}
Err(crossbeam_channel::TrySendError::Full(job)) => {
// Channel full - fall back to on-demand spawn
tracing::debug!(
target: "trie::proof_task",
"Worker pool queue full, spawning on-demand"
);
#[cfg(feature = "metrics")]
{
self.metrics.on_demand_fallback += 1;
}
// Queue for on-demand execution
self.pending_on_demand.push_back(
ProofTaskKind::StorageProof(
job.input,
job.result_sender,
),
);
}
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
// Workers shut down - this should not happen
tracing::error!(
target: "trie::proof_task",
"Worker pool disconnected unexpectedly"
);
return Err(reth_storage_errors::provider::ProviderError::Database(
reth_db_api::DatabaseError::Other("Worker pool disconnected".into())
))
}
}
}
// ==================== BLINDED NODE ROUTING ====================
ProofTaskKind::BlindedAccountNode(_, _) => {
#[cfg(feature = "metrics")]
{
self.metrics.account_nodes += 1;
}
self.queue_proof_task(task);
}
ProofTaskKind::BlindedStorageNode(_, _, _) => {
#[cfg(feature = "metrics")]
{
self.metrics.storage_nodes += 1;
}
self.queue_proof_task(task);
}
}
ProofTaskKind::BlindedStorageNode(account, path, sender) => {
#[cfg(feature = "metrics")]
{
self.metrics.storage_nodes += 1;
}
match self.storage_work_tx.send(
StorageWorkerJob::BlindedStorageNode {
account,
path,
result_sender: sender,
},
) {
Ok(_) => {
tracing::trace!(
target: "trie::proof_task",
?account,
?path,
"Blinded storage node dispatched to worker pool"
);
}
Err(crossbeam_channel::SendError(job)) => {
tracing::warn!(
target: "trie::proof_task",
storage_worker_count = self.storage_worker_count,
?account,
?path,
"Worker pool disconnected, cannot process blinded storage node"
);
// Send error back to caller
let _ = job.send_worker_unavailable_error();
}
}
}
ProofTaskKind::BlindedAccountNode(_, _) => {
// Route account trie operations to pending_tasks
#[cfg(feature = "metrics")]
{
self.metrics.account_nodes += 1;
}
self.queue_proof_task(task);
}
},
}
ProofTaskMessage::Transaction(tx) => {
// Return transaction to pending_tasks pool
self.proof_task_txs.push(tx);
// Return transaction to on-demand pool
self.on_demand_txs.push(tx);
}
ProofTaskMessage::Terminate => {
// Drop storage_work_tx to signal workers to shut down
drop(self.storage_work_tx);
tracing::debug!(
tracing::info!(
target: "trie::proof_task",
storage_worker_count = self.storage_worker_count,
"Shutting down proof task manager, signaling workers to terminate"
@@ -554,7 +621,7 @@ where
Err(_) => return Ok(()),
};
// Try spawning pending account trie tasks
// Try spawning on-demand tasks only (storage proofs handled by worker pool)
self.try_spawn_next()?;
}
}
@@ -607,14 +674,105 @@ where
(trie_cursor_factory, hashed_cursor_factory)
}
/// Compute storage proof with pre-created factories.
/// Compute storage proof without consuming self (for worker pool reuse).
///
/// Accepts cursor factories as parameters to allow reuse across multiple proofs.
/// Used by storage workers in the worker pool to avoid factory recreation
/// overhead on each proof computation.
#[inline]
fn compute_storage_proof(
&self,
/// # Purpose
///
/// This method enables transaction reuse in the storage worker pool. Unlike the
/// original `storage_proof(self, ...)` which consumes self and returns the
/// transaction to a pool, this method:
///
/// 1. Borrows self immutably
/// 2. Computes the proof using the owned transaction
/// 3. Returns only the result (transaction remains owned)
/// 4. Can be called repeatedly on the same ProofTaskTx instance
///
/// # Usage
///
/// This is called exclusively by storage workers in the worker pool. On-demand
/// execution still uses the original `storage_proof(self, ...)` method which
/// consumes self and returns the transaction.
///
/// # Performance
///
/// By reusing the same transaction and cursor factories across multiple proofs:
/// - Eliminates per-proof transaction creation overhead
/// - Avoids message passing to return transactions
/// - Reduces memory allocations for cursor factories
fn compute_storage_proof(&self, input: &StorageProofInput) -> StorageProofResult {
// ==================== SETUP ====================
// Create cursor factories (same as original implementation)
let (trie_cursor_factory, hashed_cursor_factory) = self.create_factories();
// Get or create added/removed keys context
let multi_added_removed_keys = input
.multi_added_removed_keys
.clone()
.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
let added_removed_keys = multi_added_removed_keys.get_storage(&input.hashed_address);
let span = tracing::trace_span!(
target: "trie::proof_task",
"Storage proof calculation",
hashed_address = ?input.hashed_address,
// Worker ID embedded in ProofTaskTx for trace correlation
worker_id = self.id,
);
let _guard = span.enter();
let target_slots_len = input.target_slots.len();
let proof_start = Instant::now();
// Compute raw storage multiproof (identical to original)
let raw_proof_result = StorageProof::new_hashed(
trie_cursor_factory,
hashed_cursor_factory,
input.hashed_address,
)
.with_prefix_set_mut(PrefixSetMut::from(input.prefix_set.iter().copied()))
.with_branch_node_masks(input.with_branch_node_masks)
.with_added_removed_keys(added_removed_keys)
.storage_multiproof(input.target_slots.clone())
.map_err(|e| ParallelStateRootError::Other(e.to_string()));
// Decode proof into DecodedStorageMultiProof
let decoded_result = raw_proof_result.and_then(|raw_proof| {
raw_proof.try_into().map_err(|e: alloy_rlp::Error| {
ParallelStateRootError::Other(format!(
"Failed to decode storage proof for {}: {}",
input.hashed_address, e
))
})
});
trace!(
target: "trie::proof_task",
hashed_address = ?input.hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots = target_slots_len,
proof_time_us = proof_start.elapsed().as_micros(),
worker_id = self.id,
"Completed storage proof calculation"
);
decoded_result
// NOTE: self is NOT consumed - transaction remains owned by worker
// No ProofTaskMessage::Transaction sent
}
/// Calculates a storage proof for the given hashed address, and desired prefix set.
///
/// **ON-DEMAND VARIANT** - Consumes self, returns transaction to pool.
///
/// This method is NO LONGER CALLED for storage proofs from the worker pool,
/// but is kept for:
/// 1. Backward compatibility with any direct callers
/// 2. Future use cases that need one-off storage proofs
/// 3. Tests that rely on the transaction return mechanism
fn storage_proof(
self,
input: StorageProofInput,
trie_cursor_factory: impl TrieCursorFactory,
hashed_cursor_factory: impl HashedCursorFactory,

View File

@@ -9,6 +9,10 @@ pub struct ProofTaskMetrics {
pub account_nodes: usize,
/// Count of blinded storage node requests.
pub storage_nodes: usize,
/// Count of storage proof requests routed to worker pool.
pub storage_proofs: usize,
/// Count of times worker pool was full and fell back to on-demand execution.
pub on_demand_fallback: usize,
}
impl ProofTaskMetrics {
@@ -16,6 +20,7 @@ impl ProofTaskMetrics {
/// Flushes the accumulated task counters into the underlying trie metrics.
pub fn record(&self) {
    self.task_metrics.record_account_nodes(self.account_nodes);
    self.task_metrics.record_storage_nodes(self.storage_nodes);
    self.task_metrics.record_storage_proofs(self.storage_proofs);
    // NOTE(review): the `on_demand_fallback` counter is accumulated but never
    // recorded here — confirm whether it should be exported via a dedicated
    // histogram as well.
}
}
@@ -27,6 +32,8 @@ pub struct ProofTaskTrieMetrics {
blinded_account_nodes: Histogram,
/// A histogram for the number of blinded storage nodes fetched.
blinded_storage_nodes: Histogram,
/// A histogram for the number of storage proofs computed via worker pool.
storage_proofs: Histogram,
}
impl ProofTaskTrieMetrics {
@@ -39,4 +46,9 @@ impl ProofTaskTrieMetrics {
/// Record the number of blinded storage nodes fetched.
pub fn record_storage_nodes(&self, count: usize) {
    let nodes = count as f64;
    self.blinded_storage_nodes.record(nodes);
}
/// Record how many storage proofs were computed via the worker pool.
pub fn record_storage_proofs(&self, count: usize) {
    let proofs = count as f64;
    self.storage_proofs.record(proofs);
}
}