mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
fix: handling of on-disk reorgs (#15046)
This commit is contained in:
@@ -8,7 +8,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash};
|
||||
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, NumHash};
|
||||
use alloy_primitives::{
|
||||
map::{HashMap, HashSet},
|
||||
BlockNumber, B256, U256,
|
||||
@@ -48,7 +48,6 @@ use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
|
||||
use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
|
||||
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{btree_map, hash_map, BTreeMap, VecDeque},
|
||||
fmt::Debug,
|
||||
ops::Bound,
|
||||
@@ -730,7 +729,6 @@ where
|
||||
let persistence_state = PersistenceState {
|
||||
last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
|
||||
rx: None,
|
||||
remove_above_state: VecDeque::new(),
|
||||
};
|
||||
|
||||
let (tx, outgoing) = unbounded_channel();
|
||||
@@ -1278,7 +1276,7 @@ where
|
||||
/// Helper method to remove blocks and set the persistence state. This ensures we keep track of
|
||||
/// the current persistence action while we're removing blocks.
|
||||
fn remove_blocks(&mut self, new_tip_num: u64) {
|
||||
debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
|
||||
debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
|
||||
if new_tip_num < self.persistence_state.last_persisted_block.number {
|
||||
debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -1348,7 +1346,7 @@ where
|
||||
}
|
||||
|
||||
if !self.persistence_state.in_progress() {
|
||||
if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
|
||||
if let Some(new_tip_num) = self.find_disk_reorg()? {
|
||||
self.remove_blocks(new_tip_num)
|
||||
} else if self.should_persist() {
|
||||
let blocks_to_persist = self.get_canonical_blocks_to_persist();
|
||||
@@ -1718,6 +1716,13 @@ where
|
||||
///
|
||||
/// Assumes that `finish` has been called on the `persistence_state` at least once
|
||||
fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
|
||||
// If we have an on-disk reorg, we need to handle it first before touching the in-memory
|
||||
// state.
|
||||
if let Some(remove_above) = self.find_disk_reorg()? {
|
||||
self.remove_blocks(remove_above);
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
|
||||
self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
|
||||
self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
|
||||
@@ -2086,39 +2091,50 @@ where
|
||||
None
|
||||
}
|
||||
|
||||
/// This determines whether or not we should remove blocks from the chain, based on a canonical
|
||||
/// chain update.
|
||||
///
|
||||
/// If the chain update is a reorg:
|
||||
/// * is the new chain behind the last persisted block, or
|
||||
/// * if the root of the new chain is at the same height as the last persisted block, is it a
|
||||
/// different block
|
||||
///
|
||||
/// If either of these are true, then this returns the height of the first block. Otherwise,
|
||||
/// this returns [`None`]. This should be used to check whether or not we should be sending a
|
||||
/// remove command to the persistence task.
|
||||
fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
|
||||
let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
|
||||
/// This method tries to detect whether on-disk and in-memory states have diverged. It might
|
||||
/// happen if a reorg is happening while we are persisting a block.
|
||||
fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
|
||||
let mut canonical = self.state.tree_state.current_canonical_head;
|
||||
let mut persisted = self.persistence_state.last_persisted_block;
|
||||
|
||||
let BlockNumHash { number: new_num, hash: new_hash } =
|
||||
new.first().map(|block| block.recovered_block().num_hash())?;
|
||||
let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
|
||||
Ok(self
|
||||
.sealed_header_by_hash(num_hash.hash)?
|
||||
.ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
|
||||
.parent_num_hash())
|
||||
};
|
||||
|
||||
match new_num.cmp(&self.persistence_state.last_persisted_block.number) {
|
||||
Ordering::Greater => {
|
||||
// new number is above the last persisted block so the reorg can be performed
|
||||
// entirely in memory
|
||||
None
|
||||
}
|
||||
Ordering::Equal => {
|
||||
// new number is the same, if the hash is the same then we should not need to remove
|
||||
// any blocks
|
||||
(self.persistence_state.last_persisted_block.hash != new_hash).then_some(new_num)
|
||||
}
|
||||
Ordering::Less => {
|
||||
// this means we are below the last persisted block and must remove on disk blocks
|
||||
Some(new_num)
|
||||
}
|
||||
// Happy path, canonical chain is ahead or equal to persisted chain.
|
||||
// Walk canonical chain back to make sure that it connects to persisted chain.
|
||||
while canonical.number > persisted.number {
|
||||
canonical = parent_num_hash(canonical)?;
|
||||
}
|
||||
|
||||
// If we've reached persisted tip by walking the canonical chain back, everything is fine.
|
||||
if canonical == persisted {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// At this point, we know that `persisted` block can't be reached by walking the canonical
|
||||
// chain back. In this case we need to truncate it to the first canonical block it connects
|
||||
// to.
|
||||
|
||||
// Firstly, walk back until we reach the same height as `canonical`.
|
||||
while persisted.number > canonical.number {
|
||||
persisted = parent_num_hash(persisted)?;
|
||||
}
|
||||
|
||||
debug_assert_eq!(persisted.number, canonical.number);
|
||||
|
||||
// Now walk both chains back until we find a common ancestor.
|
||||
while persisted.hash != canonical.hash {
|
||||
canonical = parent_num_hash(canonical)?;
|
||||
persisted = parent_num_hash(persisted)?;
|
||||
}
|
||||
|
||||
debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
|
||||
|
||||
Ok(Some(persisted.number))
|
||||
}
|
||||
|
||||
/// Invoked when the canonical chain has been updated.
|
||||
@@ -2128,13 +2144,6 @@ where
|
||||
trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
|
||||
let start = Instant::now();
|
||||
|
||||
// schedule a remove_above call if we have an on-disk reorg
|
||||
if let Some(height) = self.find_disk_reorg(&chain_update) {
|
||||
// calculate the new tip by subtracting one from the lowest part of the chain
|
||||
let new_tip_num = height.saturating_sub(1);
|
||||
self.persistence_state.schedule_removal(new_tip_num);
|
||||
}
|
||||
|
||||
// update the tracked canonical head
|
||||
self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
|
||||
|
||||
@@ -4134,8 +4143,8 @@ mod tests {
|
||||
test_harness = test_harness.with_blocks(blocks.clone());
|
||||
|
||||
let last_persisted_block_number = 3;
|
||||
test_harness.tree.persistence_state.last_persisted_block.number =
|
||||
last_persisted_block_number;
|
||||
test_harness.tree.persistence_state.last_persisted_block =
|
||||
blocks[last_persisted_block_number as usize].recovered_block.num_hash();
|
||||
|
||||
let persistence_threshold = 4;
|
||||
let memory_block_buffer_target = 3;
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_primitives::B256;
|
||||
use std::{collections::VecDeque, time::Instant};
|
||||
use std::time::Instant;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::{debug, trace};
|
||||
use tracing::trace;
|
||||
|
||||
/// The state of the persistence task.
|
||||
#[derive(Default, Debug)]
|
||||
@@ -15,9 +15,6 @@ pub struct PersistenceState {
|
||||
/// sent when done. A None value means there's no persistence task in progress.
|
||||
pub(crate) rx:
|
||||
Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
|
||||
/// The block above which blocks should be removed from disk, because there has been an on disk
|
||||
/// reorg.
|
||||
pub(crate) remove_above_state: VecDeque<u64>,
|
||||
}
|
||||
|
||||
impl PersistenceState {
|
||||
@@ -52,13 +49,6 @@ impl PersistenceState {
|
||||
self.rx.as_ref().map(|rx| &rx.2)
|
||||
}
|
||||
|
||||
/// Sets the `remove_above_state`, to the new tip number specified, only if it is less than the
|
||||
/// current `last_persisted_block_number`.
|
||||
pub(crate) fn schedule_removal(&mut self, new_tip_num: u64) {
|
||||
debug!(target: "engine::tree", ?new_tip_num, prev_remove_state=?self.remove_above_state, last_persisted_block=?self.last_persisted_block, "Scheduling removal");
|
||||
self.remove_above_state.push_back(new_tip_num);
|
||||
}
|
||||
|
||||
/// Sets state for a finished persistence task.
|
||||
pub(crate) fn finish(
|
||||
&mut self,
|
||||
|
||||
Reference in New Issue
Block a user