refactor(proof_task): enhance comments and adjust queue capacity logic

- Improved comments in `ProofTaskManager` and related functions for better clarity on task management and processing.
- Updated queue capacity calculation to use 4x buffering, reducing fallback to slower on-demand execution during burst loads.
- Removed redundant variable assignments to streamline the code.
This commit is contained in:
Yong Kang
2025-10-07 07:53:38 +00:00
parent 29d48d4e1e
commit 3fb97c6977
2 changed files with 10 additions and 40 deletions

View File

@@ -196,7 +196,8 @@ where
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
// Default to half of max concurrency, leaving room for on-demand tasks (Accountproof and blinded nodes)
// Default to half of max concurrency, leaving room for on-demand tasks (Accountproof and
// blinded nodes)
let storage_worker_count = (max_proof_task_concurrency / 2)
.max(1)
.min(max_proof_task_concurrency.saturating_sub(1));

View File

@@ -88,8 +88,6 @@ struct StorageProofJob {
#[derive(Debug)]
pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
/// Sender for storage proof tasks to worker pool.
///
/// Queue capacity = `storage_worker_count` * 2 (for 2x buffering)
storage_work_tx: CrossbeamSender<StorageProofJob>,
/// Number of storage workers successfully spawned.
@@ -177,19 +175,16 @@ fn storage_worker_loop<Tx>(
);
let mut proofs_processed = 0u64;
let start_time = Instant::now();
while let Ok(StorageProofJob { input, result_sender }) = work_rx.recv() {
let hashed_address = input.hashed_address;
let prefix_set_len = input.prefix_set.len();
let target_slots_len = input.target_slots.len();
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len,
target_slots = target_slots_len,
prefix_set_len = input.prefix_set.len(),
target_slots = input.target_slots.len(),
"Processing storage proof"
);
@@ -214,26 +209,16 @@ fn storage_worker_loop<Tx>(
worker_id,
hashed_address = ?hashed_address,
proof_time_us = proof_elapsed.as_micros(),
prefix_set_len,
target_slots = target_slots_len,
total_processed = proofs_processed,
"Storage proof completed"
);
}
// Channel closed - graceful shutdown
let total_elapsed = start_time.elapsed();
tracing::info!(
target: "trie::proof_task",
worker_id,
proofs_processed,
uptime_secs = total_elapsed.as_secs(),
avg_proof_time_ms = if proofs_processed > 0 {
total_elapsed.as_millis() / proofs_processed as u128
} else {
0
},
"Storage proof worker shutting down"
);
}
@@ -270,7 +255,9 @@ where
);
}
let queue_capacity = planned_workers.saturating_mul(2).max(1);
// Use 4x buffering to prevent queue saturation under burst load.
// Deeper queue reduces fallback to slower on-demand execution when workers are busy.
let queue_capacity = planned_workers.saturating_mul(4).max(1);
let (storage_work_tx, storage_work_rx) = bounded::<StorageProofJob>(queue_capacity);
tracing::info!(
@@ -328,26 +315,11 @@ where
spawned = spawned_workers,
"Spawned fewer storage workers than requested"
);
} else {
tracing::info!(
target: "trie::proof_task",
spawned_workers,
queue_capacity,
"Storage worker pool initialized successfully"
);
}
// Allocate remaining capacity to on-demand pool.
let max_on_demand_txs = max_concurrency.saturating_sub(spawned_workers);
tracing::debug!(
target: "trie::proof_task",
max_on_demand_txs,
storage_workers = spawned_workers,
total_capacity = max_concurrency,
"Configured on-demand transaction pool for blinded nodes"
);
Self {
storage_work_tx,
storage_worker_count: spawned_workers,
@@ -569,6 +541,7 @@ impl<Tx> ProofTaskTx<Tx>
where
Tx: DbTx,
{
#[inline]
fn create_factories(&self) -> ProofFactories<'_, Tx> {
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(&self.tx),
@@ -588,6 +561,7 @@ where
/// Borrows self immutably to allow transaction reuse across multiple calls.
/// Used by storage workers in the worker pool to avoid transaction creation
/// overhead on each proof computation.
#[inline]
fn compute_storage_proof(&self, input: StorageProofInput) -> StorageProofResult {
let (trie_cursor_factory, hashed_cursor_factory) = self.create_factories();
@@ -599,8 +573,6 @@ where
with_branch_node_masks,
multi_added_removed_keys,
} = input;
let prefix_set_len = prefix_set.len();
let target_slots_len = target_slots.len();
// Get or create added/removed keys context
let multi_added_removed_keys =
@@ -611,14 +583,13 @@ where
target: "trie::proof_task",
"Storage proof calculation",
hashed_address = ?hashed_address,
// Worker ID embedded in ProofTaskTx for trace correlation
worker_id = self.id,
);
let _guard = span.enter();
let proof_start = Instant::now();
// Compute raw storage multiproof (identical to original)
// Compute raw storage multiproof
let raw_proof_result =
StorageProof::new_hashed(trie_cursor_factory, hashed_cursor_factory, hashed_address)
.with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied()))
@@ -640,8 +611,6 @@ where
trace!(
target: "trie::proof_task",
hashed_address = ?hashed_address,
prefix_set_len,
target_slots = target_slots_len,
proof_time_us = proof_start.elapsed().as_micros(),
worker_id = self.id,
"Completed storage proof calculation"