Add panic recovery to proof workers

- Updated `execute_account_multiproof_worker` to return its result directly instead of sending it through a channel, simplifying the signature and moving result delivery (including panic-to-error conversion) into the worker loop.
- Wrapped storage and account multiproof execution in panic recovery, so a panicking task is reported to the requester as an error instead of killing the worker thread (first sketch below).
- Storage proof fetching now attempts a non-blocking receive before falling back to a blocking one, tracking whether each proof was ready immediately or had to be waited on (second sketch below).
- Introduced `decoded_multiproof_with_stats`, which returns the multiproof together with the trie statistics for performance monitoring (usage sketch below).
- Added a test that validates the new storage proof metrics, asserting the immediate/blocked counts and the immediate percentage across a multi-account multiproof.
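
For context, the panic recovery follows the standard `catch_unwind` worker-loop pattern. The sketch below is a simplified illustration, not the code from this diff: `String` stands in for `ParallelStateRootError`, and the `error!` logging is reduced to a comment.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

use crossbeam_channel::Sender;

/// Run one job with panic isolation: a panic inside `compute` is caught and
/// converted into an `Err` for the requester, so the worker thread survives
/// and keeps draining its queue instead of dying with the job.
fn run_job_isolated<T>(
    compute: impl FnOnce() -> Result<T, String>,
    result_tx: &Sender<Result<T, String>>,
) {
    let outcome = catch_unwind(AssertUnwindSafe(compute));
    let final_result = match outcome {
        Ok(result) => result,
        // The real workers log the panic via `error!` before converting it.
        Err(_panic) => Err("proof task panicked".to_string()),
    };
    // A failed send just means the requester gave up waiting; ignore it.
    let _ = result_tx.send(final_result);
}
```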
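
The non-blocking receive logic distills to the pattern below. Again a sketch under simplified types: `ReadinessTracker` is a hypothetical stand-in for the counters that feed `ParallelTrieStats`.

```rust
use crossbeam_channel::{Receiver, RecvError, TryRecvError};

/// Hypothetical stand-in for the immediate/blocked counters behind
/// `ParallelTrieStats`.
#[derive(Default)]
struct ReadinessTracker {
    immediate: usize,
    blocked: usize,
}

/// Receive a value, recording whether it was already buffered (non-blocking
/// `try_recv` succeeded) or whether we had to block on `recv`.
fn recv_with_tracking<T>(
    receiver: &Receiver<T>,
    tracker: &mut ReadinessTracker,
) -> Result<T, RecvError> {
    match receiver.try_recv() {
        Ok(value) => {
            tracker.immediate += 1;
            Ok(value)
        }
        Err(TryRecvError::Empty) => {
            // Proof not ready yet: count it as blocked, then wait.
            tracker.blocked += 1;
            receiver.recv()
        }
        Err(TryRecvError::Disconnected) => Err(RecvError),
    }
}
```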
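
And a hypothetical consumer of the new stats-returning method; the accessor names mirror those exercised by the test below, while the construction of `ParallelProof` and the import path for `ParallelTrieStats` are elided.

```rust
/// Log how many storage proofs were already ready when the account trie walk
/// reached them (import for `ParallelTrieStats` omitted; setup elided).
fn log_storage_proof_readiness(stats: &ParallelTrieStats) {
    let immediate = stats.storage_proofs_immediate();
    let blocked = stats.storage_proofs_blocked();
    tracing::debug!(
        target: "trie::parallel_proof",
        immediate,
        blocked,
        percent_immediate = stats.storage_proofs_immediate_percentage(),
        "Storage proof readiness"
    );
}
```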
Yong Kang · 2025-10-02 10:34:17 +00:00
parent 47ded5b56b · commit e13a7b23ed
2 changed files with 300 additions and 30 deletions

@@ -105,11 +105,30 @@ where
TrieElement::Leaf(hashed_address, account) => {
// Fetch storage proof on-demand (blocks if not yet ready)
let decoded_storage_multiproof = match storage_receivers.remove(&hashed_address) {
Some(receiver) => match receiver.recv() {
Ok(Ok(proof)) => proof,
Ok(Err(e)) => return Err(e),
Err(_) => return Err(storage_channel_closed_error(&hashed_address)),
},
Some(receiver) => {
// Try non-blocking receive first to track if proof was ready
match receiver.try_recv() {
Ok(Ok(proof)) => {
tracker.inc_storage_proof_immediate();
proof
}
Ok(Err(e)) => return Err(e),
Err(crossbeam_channel::TryRecvError::Empty) => {
// Proof not ready yet, block and wait
tracker.inc_storage_proof_blocked();
match receiver.recv() {
Ok(Ok(proof)) => proof,
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(storage_channel_closed_error(&hashed_address))
}
}
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
return Err(storage_channel_closed_error(&hashed_address))
}
}
}
// Since we do not store all intermediate nodes in the database, there might
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
@@ -411,6 +430,90 @@ where
Ok(multiproof)
}
/// Generate a state multiproof according to specified targets, returning stats.
///
/// This is identical to [`Self::decoded_multiproof`] but also returns the trie statistics
/// for analysis and testing.
pub fn decoded_multiproof_with_stats(
self,
targets: MultiProofTargets,
) -> Result<(DecodedMultiProof, ParallelTrieStats), ParallelStateRootError> {
// Extend prefix sets with targets
let mut prefix_sets = (*self.prefix_sets).clone();
prefix_sets.extend(TriePrefixSetsMut {
account_prefix_set: PrefixSetMut::from(targets.keys().copied().map(Nibbles::unpack)),
storage_prefix_sets: targets
.iter()
.filter(|&(_hashed_address, slots)| !slots.is_empty())
.map(|(hashed_address, slots)| {
(*hashed_address, PrefixSetMut::from(slots.iter().map(Nibbles::unpack)))
})
.collect(),
destroyed_accounts: Default::default(),
});
// Freeze prefix sets for storage root targets
let frozen_prefix_sets = prefix_sets.freeze();
let storage_root_targets = StorageRootTargets::new(
frozen_prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
frozen_prefix_sets.storage_prefix_sets.clone(),
);
let storage_root_targets_len = storage_root_targets.len();
trace!(
target: "trie::parallel_proof",
total_targets = storage_root_targets_len,
"Starting parallel proof generation"
);
// stores the receiver for the storage proof outcome for the hashed addresses
// this way we can lazily await the outcome when we iterate over the map
let mut storage_proof_receivers: B256Map<
crossbeam_channel::Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>>,
> = B256Map::with_capacity_and_hasher(storage_root_targets.len(), Default::default());
for (hashed_address, prefix_set) in
storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
{
let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default();
let receiver = self.queue_storage_proof(hashed_address, prefix_set, target_slots);
// store the receiver for that result with the hashed address so we can await this in
// place when we iterate over the trie
storage_proof_receivers.insert(hashed_address, receiver);
}
let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&self.nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&self.state_sorted,
);
// Build account multiproof with on-demand storage proof fetching
let (multiproof, stats) = build_account_multiproof_with_storage(
trie_cursor_factory,
hashed_cursor_factory,
targets,
frozen_prefix_sets,
storage_proof_receivers, // ← Pass receivers directly for on-demand fetching
self.collect_branch_node_masks,
self.multi_added_removed_keys,
)?;
#[cfg(feature = "metrics")]
self.metrics.record(stats);
Ok((multiproof, stats))
}
}
#[cfg(test)]
@@ -749,4 +852,108 @@ mod tests {
drop(proof_task_handle);
rt.block_on(join_handle).unwrap().expect("proof task should succeed");
}
/// Test that storage proof metrics are properly tracked
#[test]
fn storage_proof_metrics_tracking() {
let factory = create_test_provider_factory();
let consistent_view = ConsistentDbView::new(factory.clone(), None);
let mut rng = rand::rng();
// Create state with multiple accounts having storage
let state = (0..10)
.map(|_| {
let address = Address::random();
let account =
Account { balance: U256::from(rng.random::<u64>()), ..Default::default() };
// All accounts have storage to ensure we get metrics
let storage: HashMap<B256, U256, DefaultHashBuilder> = (0..5)
.map(|_| {
(
B256::from(U256::from(rng.random::<u64>())),
U256::from(rng.random::<u64>()),
)
})
.collect();
(address, (account, storage))
})
.collect::<HashMap<_, _, DefaultHashBuilder>>();
{
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.insert_account_for_hashing(
state.iter().map(|(address, (account, _))| (*address, Some(*account))),
)
.unwrap();
provider_rw
.insert_storage_for_hashing(state.iter().map(|(address, (_, storage))| {
(
*address,
storage
.iter()
.map(|(slot, value)| StorageEntry { key: *slot, value: *value }),
)
}))
.unwrap();
provider_rw.commit().unwrap();
}
// Create targets for all accounts
let mut targets = MultiProofTargets::default();
for (address, (_, storage)) in &state {
let hashed_address = keccak256(*address);
let target_slots = storage.iter().take(3).map(|(slot, _)| *slot).collect();
targets.insert(hashed_address, target_slots);
}
let rt = Runtime::new().unwrap();
let task_ctx =
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
let proof_task = ProofTaskManager::new(
rt.handle().clone(),
consistent_view.clone(),
task_ctx,
2, // storage_worker_count
1, // account_worker_count
1, // max_concurrency
)
.unwrap();
let proof_task_handle = proof_task.handle();
let join_handle = rt.spawn_blocking(move || proof_task.run());
let (result, stats) = ParallelProof::new(
consistent_view,
Default::default(),
Default::default(),
Default::default(),
proof_task_handle.clone(),
)
.decoded_multiproof_with_stats(targets.clone())
.unwrap();
// Verify we got a valid result
assert!(!result.account_subtree.is_empty());
// Verify metrics are tracked
let total_proofs = stats.storage_proofs_immediate() + stats.storage_proofs_blocked();
assert_eq!(total_proofs, 10, "Should track all 10 storage proofs (immediate + blocked)");
// At least one category should have proofs
assert!(
stats.storage_proofs_immediate() > 0 || stats.storage_proofs_blocked() > 0,
"Should have at least some storage proof activity"
);
// Percentage should be valid (0-100)
let percentage = stats.storage_proofs_immediate_percentage();
assert!(
(0.0..=100.0).contains(&percentage),
"Percentage should be between 0-100, got {percentage}"
);
drop(proof_task_handle);
rt.block_on(join_handle).unwrap().expect("proof task should succeed");
}
}

@@ -44,7 +44,7 @@ use std::{
time::Instant,
};
use tokio::runtime::Handle;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::ProofTaskMetrics;
@@ -63,19 +63,16 @@ fn storage_manager_closed_error() -> ParallelStateRootError {
/// This function coordinates with the storage manager to compute storage proofs for all
/// accounts in the multiproof, then assembles the final account multiproof by fetching
/// storage proofs on-demand during account trie traversal.
///
/// Returns the multiproof result. The caller is responsible for sending via `result_sender`.
fn execute_account_multiproof_worker<Tx: DbTx>(
input: AccountMultiproofInput<Tx>,
targets: MultiProofTargets,
prefix_sets: TriePrefixSets,
collect_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
storage_proof_handle: ProofTaskManagerHandle<Tx>,
proof_tx: &ProofTaskTx<Tx>,
) {
let AccountMultiproofInput {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
storage_proof_handle,
result_sender,
} = input;
) -> Result<DecodedMultiProof, ParallelStateRootError> {
// Queue ALL storage proof requests to storage manager
let mut storage_receivers: B256Map<
CrossbeamReceiver<Result<DecodedStorageMultiProof, ParallelStateRootError>>,
@@ -104,8 +101,7 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
?address,
"Storage manager closed, cannot queue proof"
);
let _ = result_sender.send(Err(storage_manager_closed_error()));
return;
return Err(storage_manager_closed_error());
}
storage_receivers.insert(*address, receiver);
@@ -122,14 +118,10 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
storage_receivers, // ← Pass receivers directly for on-demand fetching
collect_branch_node_masks,
multi_added_removed_keys,
);
)?;
if result_sender.send(result.map(|(multiproof, _stats)| multiproof)).is_err() {
warn!(
target: "trie::proof_task",
"Account multiproof result discarded - receiver dropped"
);
}
// Return just the multiproof (discard stats - caller doesn't need them)
Ok(result.0)
}
/// Proof job dispatched to the storage worker pool.
@@ -199,6 +191,9 @@ where
/// multiproof workers upfront that reuse database transactions for the entire block lifetime
/// (prewarm.rs pattern).
///
/// Worker counts are clamped to at least 1 to prevent deadlocks. Configuring zero workers
/// for either pool would cause queued work to never be processed.
///
/// Returns an error if the consistent view provider fails to create a read-only transaction.
pub fn new(
executor: Handle,
@@ -208,6 +203,10 @@ where
account_worker_count: usize,
max_concurrency: usize,
) -> ProviderResult<Self> {
// Clamp worker counts to at least 1 to prevent deadlocks
let storage_worker_count = storage_worker_count.max(1);
let account_worker_count = account_worker_count.max(1);
let (tx_sender, proof_task_rx) = channel();
let (storage_work_tx, storage_work_rx) = crossbeam_channel::unbounded();
let (account_work_tx, account_work_rx) = crossbeam_channel::unbounded();
@@ -250,8 +249,29 @@ where
metrics_clone.record_storage_wait_time(job.enqueued_at.elapsed());
}
let result = proof_tx.storage_proof_internal(&job.input);
let _ = job.result_tx.send(result);
// Wrap proof computation in panic recovery to prevent zombie workers
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
proof_tx.storage_proof_internal(&job.input)
}));
let final_result = match result {
Ok(Ok(proof)) => Ok(proof),
Ok(Err(e)) => Err(e),
Err(_panic_info) => {
error!(
target: "trie::proof_pool",
worker_id,
hashed_address = ?job.input.hashed_address,
"Storage proof task panicked, converting to error"
);
Err(ParallelStateRootError::Other(format!(
"storage proof task panicked for {}",
job.input.hashed_address
)))
}
};
let _ = job.result_tx.send(final_result);
}
debug!(target: "trie::proof_pool", worker_id, "Storage proof worker shutdown");
@@ -290,8 +310,51 @@ where
metrics_clone.record_account_wait_time(job.enqueued_at.elapsed());
}
// Execute account multiproof (blocks on storage proofs)
execute_account_multiproof_worker(job.input, &proof_tx);
// Destructure input to extract result_sender and pass fields to worker
let AccountMultiproofInput {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
storage_proof_handle,
result_sender,
} = job.input;
// Wrap account multiproof execution in panic recovery
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
execute_account_multiproof_worker(
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
storage_proof_handle,
&proof_tx,
)
}));
// Convert panic to error and send result
let final_result = match result {
Ok(Ok(multiproof)) => Ok(multiproof),
Ok(Err(e)) => Err(e),
Err(_panic_info) => {
error!(
target: "trie::proof_pool",
worker_id,
"Account multiproof task panicked - converting to error"
);
Err(ParallelStateRootError::Other(
"account multiproof task panicked".into(),
))
}
};
if result_sender.send(final_result).is_err() {
warn!(
target: "trie::proof_pool",
worker_id,
"Account multiproof result discarded - receiver dropped"
);
}
}
debug!(target: "trie::proof_pool", worker_id, "Account proof worker shutdown");