fix(proof_v2): make sure that all storage proofs are delivered (#21611)

Co-authored-by: Brian Picciano <me@mediocregopher.com>
This commit is contained in:
Arsenii Kulikov
2026-01-30 15:21:17 +04:00
committed by GitHub
parent 0470c65e6c
commit 20f48b1e50

View File

@@ -54,7 +54,11 @@ pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
Dispatched {
hashed_address: B256,
account: Account,
proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
/// The receiver for the storage proof result. This is an `Option` so that `encode` can
/// take ownership of the receiver, preventing the `Drop` impl from trying to receive on
/// it again.
proof_result_rx:
Option<Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>>,
/// Shared storage proof results.
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
/// Shared stats for tracking wait time and counts.
@@ -78,13 +82,58 @@ pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
},
}
impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
fn drop(&mut self) {
// If this is a Dispatched encoder that was never consumed via encode(), we need to
// receive the storage proof result to avoid losing it.
let res = if let Self::Dispatched {
hashed_address,
proof_result_rx,
storage_proof_results,
stats,
..
} = self
{
// Take the receiver out - if it's None (already consumed by encode), nothing to do
let Some(proof_result_rx) = proof_result_rx.take() else { return };
(|| -> Result<(), StateProofError> {
let rx = proof_result_rx?;
let wait_start = Instant::now();
let msg = rx.recv().map_err(|_| {
StateProofError::Database(DatabaseError::Other(format!(
"Storage proof channel closed for {hashed_address:?}",
)))
})?;
let result = msg.result?;
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { proof, .. } = result else {
panic!("StorageProofResult is not V2: {result:?}")
};
storage_proof_results.borrow_mut().insert(*hashed_address, proof);
Ok(())
})()
} else {
return;
};
if let Err(err) = res {
tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop");
}
}
}
impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
{
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
let (account, root) = match self {
fn encode(mut self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
let (account, root) = match &mut self {
Self::Dispatched {
hashed_address,
account,
@@ -94,6 +143,12 @@ where
storage_calculator,
cached_storage_roots,
} => {
let hashed_address = *hashed_address;
let account = *account;
// Take the receiver so Drop won't try to receive on it again
let proof_result_rx = proof_result_rx
.take()
.expect("encode called on already-consumed Dispatched encoder");
let wait_start = Instant::now();
let result = proof_result_rx?
.recv()
@@ -136,8 +191,10 @@ where
(account, root)
}
Self::FromCache { account, root } => (account, root),
Self::FromCache { account, root } => (*account, *root),
Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
let hashed_address = *hashed_address;
let account = *account;
let mut calculator = storage_calculator.borrow_mut();
let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?;
let storage_root = calculator
@@ -266,7 +323,7 @@ where
return AsyncAccountDeferredValueEncoder::Dispatched {
hashed_address,
account,
proof_result_rx: Ok(rx),
proof_result_rx: Some(Ok(rx)),
storage_proof_results: self.storage_proof_results.clone(),
stats: self.stats.clone(),
storage_calculator: self.storage_calculator.clone(),