diff --git a/crates/trie/parallel/src/value_encoder.rs b/crates/trie/parallel/src/value_encoder.rs index e2d046edee..90549dd0a1 100644 --- a/crates/trie/parallel/src/value_encoder.rs +++ b/crates/trie/parallel/src/value_encoder.rs @@ -54,7 +54,11 @@ pub(crate) enum AsyncAccountDeferredValueEncoder { Dispatched { hashed_address: B256, account: Account, - proof_result_rx: Result, DatabaseError>, + /// The receiver for the storage proof result. This is an `Option` so that `encode` can + /// take ownership of the receiver, preventing the `Drop` impl from trying to receive on + /// it again. + proof_result_rx: + Option, DatabaseError>>, /// Shared storage proof results. storage_proof_results: Rc>>>, /// Shared stats for tracking wait time and counts. @@ -78,13 +82,58 @@ pub(crate) enum AsyncAccountDeferredValueEncoder { }, } +impl Drop for AsyncAccountDeferredValueEncoder { + fn drop(&mut self) { + // If this is a Dispatched encoder that was never consumed via encode(), we need to + // receive the storage proof result to avoid losing it. + let res = if let Self::Dispatched { + hashed_address, + proof_result_rx, + storage_proof_results, + stats, + .. + } = self + { + // Take the receiver out - if it's None (already consumed by encode), nothing to do + let Some(proof_result_rx) = proof_result_rx.take() else { return }; + + (|| -> Result<(), StateProofError> { + let rx = proof_result_rx?; + + let wait_start = Instant::now(); + let msg = rx.recv().map_err(|_| { + StateProofError::Database(DatabaseError::Other(format!( + "Storage proof channel closed for {hashed_address:?}", + ))) + })?; + let result = msg.result?; + + stats.borrow_mut().storage_wait_time += wait_start.elapsed(); + + let StorageProofResult::V2 { proof, .. } = result else { + panic!("StorageProofResult is not V2: {result:?}") + }; + + storage_proof_results.borrow_mut().insert(*hashed_address, proof); + Ok(()) + })() + } else { + return; + }; + + if let Err(err) = res { + tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop"); + } + } +} + impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder where TC: TrieStorageCursor, HC: HashedStorageCursor, { - fn encode(self, buf: &mut Vec) -> Result<(), StateProofError> { - let (account, root) = match self { + fn encode(mut self, buf: &mut Vec) -> Result<(), StateProofError> { + let (account, root) = match &mut self { Self::Dispatched { hashed_address, account, @@ -94,6 +143,12 @@ where storage_calculator, cached_storage_roots, } => { + let hashed_address = *hashed_address; + let account = *account; + // Take the receiver so Drop won't try to receive on it again + let proof_result_rx = proof_result_rx + .take() + .expect("encode called on already-consumed Dispatched encoder"); let wait_start = Instant::now(); let result = proof_result_rx? .recv() @@ -136,8 +191,10 @@ where (account, root) } - Self::FromCache { account, root } => (account, root), + Self::FromCache { account, root } => (*account, *root), Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => { + let hashed_address = *hashed_address; + let account = *account; let mut calculator = storage_calculator.borrow_mut(); let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?; let storage_root = calculator @@ -266,7 +323,7 @@ where return AsyncAccountDeferredValueEncoder::Dispatched { hashed_address, account, - proof_result_rx: Ok(rx), + proof_result_rx: Some(Ok(rx)), storage_proof_results: self.storage_proof_results.clone(), stats: self.stats.clone(), storage_calculator: self.storage_calculator.clone(),