fix(engine): recompute trie updates for forked blocks (#16568)

This commit is contained in:
Alexey Shekhirin
2025-06-04 10:58:19 +01:00
committed by GitHub
parent fe5c6d80d5
commit 5106f64f76
15 changed files with 324 additions and 117 deletions

View File

@@ -1,6 +1,7 @@
//! Internal errors for the tree module.
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use reth_consensus::ConsensusError;
use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError};
use reth_evm::execute::InternalBlockExecutionError;
@@ -17,6 +18,20 @@ pub enum AdvancePersistenceError {
/// A provider error
#[error(transparent)]
Provider(#[from] ProviderError),
/// Missing ancestor.
///
/// This error occurs when we need to compute the state root for a block with missing trie
/// updates, but the ancestor block is not available. State root computation requires the state
/// from the parent block as a starting point.
///
/// A block may be missing the trie updates when it's a fork chain block building on top of the
/// historical database state. Since we don't store the historical trie state, we cannot
/// generate the trie updates for it until the database is unwound to the canonical chain.
///
/// Also see [`reth_chain_state::ExecutedTrieUpdates::Missing`].
#[error("Missing ancestor with hash {0}")]
MissingAncestor(B256),
}
#[derive(thiserror::Error)]

View File

@@ -20,7 +20,7 @@ use payload_processor::sparse_trie::StateRootComputeOutcome;
use persistence_state::CurrentPersistenceAction;
use precompile_cache::{CachedPrecompile, PrecompileCacheMap};
use reth_chain_state::{
CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
@@ -49,6 +49,7 @@ use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use state::TreeState;
use std::{
borrow::Cow,
fmt::Debug,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
@@ -727,11 +728,16 @@ where
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
///
/// The header is required as an arg, because we might be checking that the header is a fork
/// block before it's in the tree state and before it's in the database.
fn is_fork(&self, target_header: &SealedHeader<N::BlockHeader>) -> ProviderResult<bool> {
let target_hash = target_header.hash();
// verify that the given hash is not part of an extension of the canon chain.
let canonical_head = self.state.tree_state.canonical_head();
let mut current_hash = target_hash;
while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
let mut current_hash;
let mut current_block = Cow::Borrowed(target_header);
loop {
if current_block.hash() == canonical_head.hash {
return Ok(false)
}
@@ -740,6 +746,9 @@ where
break
}
current_hash = current_block.parent_hash();
let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
current_block = Cow::Owned(next_block);
}
// verify that the given hash is not already part of canonical chain stored in memory
@@ -755,6 +764,26 @@ where
Ok(true)
}
/// Check if the given block has any ancestors with missing trie updates.
fn has_ancestors_with_missing_trie_updates(
&self,
target_header: &SealedHeader<N::BlockHeader>,
) -> bool {
// Walk back through the chain starting from the parent of the target block
let mut current_hash = target_header.parent_hash();
while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
// Check if this block is missing trie updates
if block.trie.is_missing() {
return true;
}
// Move to the parent block
current_hash = block.recovered_block().parent_hash();
}
false
}
/// Returns the persisting kind for the input block.
fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
// Check that we're currently persisting.
@@ -1021,7 +1050,7 @@ where
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
} else if self.should_persist() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
self.persist_blocks(blocks_to_persist);
}
}
@@ -1348,9 +1377,20 @@ where
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. canonical_head - threshold]` . The expected
/// `(last_persisted_number .. canonical_head - threshold]`. The expected
/// order is oldest -> newest.
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
///
/// For those blocks that didn't have the trie updates calculated, runs the state root
/// calculation, and saves the trie updates.
///
/// Returns an error if the state root calculation fails.
fn get_canonical_blocks_to_persist(
&mut self,
) -> Result<Vec<ExecutedBlockWithTrieUpdates<N>>, AdvancePersistenceError> {
// We will calculate the state root using the database, so we need to be sure there are no
// changes
debug_assert!(!self.persistence_state.in_progress());
let mut blocks_to_persist = Vec::new();
let mut current_hash = self.state.tree_state.canonical_block_hash();
let last_persisted_number = self.persistence_state.last_persisted_block.number;
@@ -1373,10 +1413,51 @@ where
current_hash = block.recovered_block().parent_hash();
}
// reverse the order so that the oldest block comes first
// Reverse the order so that the oldest block comes first
blocks_to_persist.reverse();
blocks_to_persist
// Calculate missing trie updates
for block in &mut blocks_to_persist {
if block.trie.is_present() {
continue
}
debug!(
target: "engine::tree",
block = ?block.recovered_block().num_hash(),
"Calculating trie updates before persisting"
);
let provider = self
.state_provider_builder(block.recovered_block().parent_hash())?
.ok_or(AdvancePersistenceError::MissingAncestor(
block.recovered_block().parent_hash(),
))?
.build()?;
let mut trie_input = self.compute_trie_input(
self.persisting_kind_for(block.recovered_block().header()),
self.provider.database_provider_ro()?,
block.recovered_block().parent_hash(),
)?;
// Extend with block we are generating trie updates for.
trie_input.append_ref(block.hashed_state());
let (_root, updates) = provider.state_root_from_nodes_with_updates(trie_input)?;
debug_assert_eq!(_root, block.recovered_block().state_root());
// Update trie updates in both tree state and blocks to persist that we return
let trie_updates = Arc::new(updates);
let tree_state_block = self
.state
.tree_state
.blocks_by_hash
.get_mut(&block.recovered_block().hash())
.expect("blocks to persist are constructed from tree state blocks");
tree_state_block.trie.set_present(trie_updates.clone());
block.trie.set_present(trie_updates);
}
Ok(blocks_to_persist)
}
/// This clears the blocks from the in-memory tree state that have been persisted to the
@@ -1828,7 +1909,10 @@ where
.persisted_trie_updates
.get(&block.recovered_block.hash())
.cloned()?;
Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
Some(ExecutedBlockWithTrieUpdates {
block: block.clone(),
trie: ExecutedTrieUpdates::Present(trie),
})
})
.collect::<Vec<_>>();
self.reinsert_reorged_blocks(old);
@@ -2081,10 +2165,21 @@ where
let run_parallel_state_root =
persisting_kind.can_run_parallel_state_root() && !self.config.state_root_fallback();
// Use state root task only if:
// 1. No persistence is in progress
// 2. Config allows it
// 3. No ancestors with missing trie updates. If any exist, it will mean that every state
// root task proof calculation will include a lot of unrelated paths in the prefix sets.
// It's cheaper to run a parallel state root that does one walk over trie tables while
// accounting for the prefix sets.
let use_state_root_task = run_parallel_state_root &&
self.config.use_state_root_task() &&
!self.has_ancestors_with_missing_trie_updates(block.sealed_header());
// use prewarming background task
let header = block.clone_sealed_header();
let txs = block.clone_transactions_recovered().collect();
let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
let mut handle = if run_parallel_state_root && use_state_root_task {
// use background tasks for state root calc
let consistent_view =
ensure_ok!(ConsistentDbView::new_with_latest_tip(self.provider.clone()));
@@ -2093,7 +2188,7 @@ where
let trie_input_start = Instant::now();
let res = self.compute_trie_input(
persisting_kind,
consistent_view.clone(),
ensure_ok!(consistent_view.provider_ro()),
block.header().parent_hash(),
);
let trie_input = match res {
@@ -2167,7 +2262,7 @@ where
if run_parallel_state_root {
// if the new payload extends the current canonical chain, we attempt to use the
// background task or try to compute it in parallel
if self.config.use_state_root_task() {
if use_state_root_task {
match handle.state_root() {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
let elapsed = execution_finish.elapsed();
@@ -2248,13 +2343,22 @@ where
// terminate prewarming task with good state output
handle.terminate_caching(Some(output.state.clone()));
let is_fork = ensure_ok!(self.is_fork(block.sealed_header()));
// If the block is a fork, we don't save the trie updates, because they may be incorrect.
// Instead, they will be recomputed on persistence.
let trie_updates = if is_fork {
ExecutedTrieUpdates::Missing
} else {
ExecutedTrieUpdates::Present(Arc::new(trie_output))
};
let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
block: ExecutedBlock {
recovered_block: Arc::new(block),
execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
hashed_state: Arc::new(hashed_state),
},
trie: Arc::new(trie_output),
trie: trie_updates,
};
// if the parent is the canonical head, we can insert the block as the pending block
@@ -2269,10 +2373,6 @@ where
// emit insert event
let elapsed = start.elapsed();
let is_fork = match self.is_fork(block_num_hash.hash) {
Ok(val) => val,
Err(e) => return Err((e.into(), executed.block.recovered_block().clone())),
};
let engine_event = if is_fork {
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
} else {
@@ -2338,7 +2438,7 @@ where
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input =
self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
self.compute_trie_input(persisting_kind, consistent_view.provider_ro()?, parent_hash)?;
// Extend with block we are validating root for.
input.append_ref(hashed_state);
@@ -2360,15 +2460,14 @@ where
/// block.
/// 3. Once in-memory blocks are collected and optionally filtered, we compute the
/// [`HashedPostState`] from them.
fn compute_trie_input(
fn compute_trie_input<TP: DBProvider + BlockNumReader>(
&self,
persisting_kind: PersistingKind,
consistent_view: ConsistentDbView<P>,
provider: TP,
parent_hash: B256,
) -> Result<TrieInput, ParallelStateRootError> {
) -> ProviderResult<TrieInput> {
let mut input = TrieInput::default();
let provider = consistent_view.provider_ro()?;
let best_block_number = provider.best_block_number()?;
let (mut historical, mut blocks) = self
@@ -2439,9 +2538,9 @@ where
input.append(revert_state);
// Extend with contents of parent in-memory blocks.
for block in blocks.iter().rev() {
input.append_cached_ref(block.trie_updates(), block.hashed_state())
}
input.extend_with_blocks(
blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
);
Ok(input)
}
@@ -2809,7 +2908,7 @@ mod tests {
use reth_node_ethereum::EthereumEngineValidator;
use reth_primitives_traits::Block as _;
use reth_provider::test_utils::MockEthProvider;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::HashedPostState;
use std::{
collections::BTreeMap,
str::FromStr,
@@ -3589,7 +3688,7 @@ mod tests {
execution_output: Arc::new(ExecutionOutcome::default()),
hashed_state: Arc::new(HashedPostState::default()),
},
trie: Arc::new(TrieUpdates::default()),
trie: ExecutedTrieUpdates::empty(),
});
}
test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
@@ -3601,7 +3700,7 @@ mod tests {
execution_output: Arc::new(ExecutionOutcome::default()),
hashed_state: Arc::new(HashedPostState::default()),
},
trie: Arc::new(TrieUpdates::default()),
trie: ExecutedTrieUpdates::empty(),
});
}
@@ -3651,7 +3750,7 @@ mod tests {
.with_persistence_threshold(persistence_threshold)
.with_memory_block_buffer_target(memory_block_buffer_target);
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
let expected_blocks_to_persist_length: usize =
(canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
@@ -3672,7 +3771,7 @@ mod tests {
assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
// check that the fork block is not included in the blocks to persist
@@ -4057,7 +4156,7 @@ mod tests {
test_harness.check_canon_head(chain_b_tip_hash);
// verify that chain A is now considered a fork
assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
assert!(test_harness.tree.is_fork(chain_a.last().unwrap().sealed_header()).unwrap());
}
#[tokio::test]

View File

@@ -210,13 +210,20 @@ impl<N: NodePrimitives> TreeState<N> {
while let Some(executed) = self.blocks_by_hash.get(&current_block) {
current_block = executed.recovered_block().parent_hash();
if executed.recovered_block().number() <= upper_bound {
debug!(target: "engine::tree", num_hash=?executed.recovered_block().num_hash(), "Attempting to remove block walking back from the head");
if let Some((removed, _)) = self.remove_by_hash(executed.recovered_block().hash()) {
debug!(target: "engine::tree", num_hash=?removed.recovered_block().num_hash(), "Removed block walking back from the head");
let num_hash = executed.recovered_block().num_hash();
debug!(target: "engine::tree", ?num_hash, "Attempting to remove block walking back from the head");
if let Some((mut removed, _)) =
self.remove_by_hash(executed.recovered_block().hash())
{
debug!(target: "engine::tree", ?num_hash, "Removed block walking back from the head");
// finally, move the trie updates
let Some(trie_updates) = removed.trie.take_present() else {
debug!(target: "engine::tree", ?num_hash, "No trie updates found for persisted block");
continue;
};
self.persisted_trie_updates.insert(
removed.recovered_block().hash(),
(removed.recovered_block().number(), removed.trie),
(removed.recovered_block().number(), trie_updates),
);
}
}