feat(tree): make on_new_head and is_fork aware of disk (#10551)

This commit is contained in:
Dan Cline
2024-09-04 15:00:07 -04:00
committed by GitHub
parent 8151c9a3c7
commit 3289cd79bd

View File

@@ -162,33 +162,6 @@ impl TreeState {
}
}
/// Determines if the given block is part of a fork by checking that these
/// conditions are true:
/// * walking back from the target hash to verify that the target hash is not part of an
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
fn is_fork(&self, target_hash: B256) -> bool {
// verify that the given hash is not part of an extension of the canon chain.
let mut current_hash = target_hash;
while let Some(current_block) = self.block_by_hash(current_hash) {
if current_block.hash() == self.canonical_block_hash() {
return false
}
current_hash = current_block.header.parent_hash;
}
// verify that the given hash is not already part of the canon chain
current_hash = self.canonical_block_hash();
while let Some(current_block) = self.block_by_hash(current_hash) {
if current_block.hash() == target_hash {
return false
}
current_hash = current_block.header.parent_hash;
}
true
}
/// Remove single executed block by its hash.
///
/// ## Returns
@@ -336,75 +309,6 @@ impl TreeState {
const fn canonical_block_number(&self) -> BlockNumber {
self.canonical_head().number
}
/// Returns the new chain for the given head.
///
/// This also handles reorgs.
///
/// Note: This does not update the tracked state and instead returns the new chain based on the
/// given head.
fn on_new_head(&self, new_head: B256) -> Option<NewCanonicalChain> {
let new_head_block = self.blocks_by_hash.get(&new_head)?;
let new_head_number = new_head_block.block.number;
let current_canonical_number = self.current_canonical_head.number;
let mut new_chain = vec![new_head_block.clone()];
let mut current_hash = new_head_block.block.parent_hash;
let mut current_number = new_head_number - 1;
// Walk back the new chain until we reach a block we know about
while current_number > current_canonical_number {
if let Some(block) = self.blocks_by_hash.get(&current_hash) {
new_chain.push(block.clone());
current_hash = block.block.parent_hash;
current_number -= 1;
} else {
return None; // We don't have the full chain
}
}
if current_hash == self.current_canonical_head.hash {
new_chain.reverse();
// Simple extension of the current chain
return Some(NewCanonicalChain::Commit { new: new_chain });
}
// We have a reorg. Walk back both chains to find the fork point.
let mut old_chain = Vec::new();
let mut old_hash = self.current_canonical_head.hash;
while old_hash != current_hash {
if let Some(block) = self.blocks_by_hash.get(&old_hash) {
old_chain.push(block.clone());
old_hash = block.block.parent_hash;
} else {
// This shouldn't happen as we're walking back the canonical chain
warn!(target: "consensus::engine", invalid_hash=?old_hash, "Canonical block not found in TreeState");
return None;
}
if old_hash == current_hash {
// We've found the fork point
break;
}
if let Some(block) = self.blocks_by_hash.get(&current_hash) {
if self.is_fork(block.block.hash()) {
new_chain.push(block.clone());
current_hash = block.block.parent_hash;
}
} else {
// This shouldn't happen as we've already walked this path
warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState");
return None;
}
}
new_chain.reverse();
old_chain.reverse();
Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain })
}
}
/// Tracks the state of the engine api internals.
@@ -482,7 +386,12 @@ impl TreeEvent {
#[derive(Debug)]
pub enum TreeAction {
/// Make target canonical.
MakeCanonical(B256),
MakeCanonical {
/// The sync target head hash
sync_target_head: B256,
/// The sync target finalized hash
sync_target_finalized: Option<B256>,
},
}
/// The engine API tree handler implementation.
@@ -836,14 +745,144 @@ where
let mut outcome = TreeOutcome::new(status);
if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
// NOTE: if we are in this branch, `is_sync_target_head` has returned true,
// meaning a sync target state exists, so we can safely unwrap
let sync_target = self
.state
.forkchoice_state_tracker
.sync_target_state()
.expect("sync target must exist");
// if the hash is zero then we should act like there is no finalized hash
let sync_target_finalized = (!sync_target.finalized_block_hash.is_zero())
.then_some(sync_target.finalized_block_hash);
// if the block is valid and it is the sync target head, make it canonical
outcome =
outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash)));
outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_hash,
sync_target_finalized,
}));
}
Ok(outcome)
}
/// Returns the new chain for the given head.
///
/// This also handles reorgs.
///
/// Note: This does not update the tracked state and instead returns the new chain based on the
/// given head.
fn on_new_head(
&self,
new_head: B256,
finalized_block: Option<B256>,
) -> ProviderResult<Option<NewCanonicalChain>> {
// get the executed new head block
let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
return Ok(None)
};
let new_head_number = new_head_block.block.number;
let current_canonical_number = self.state.tree_state.current_canonical_head.number;
let mut new_chain = vec![new_head_block.clone()];
let mut current_hash = new_head_block.block.parent_hash;
let mut current_number = new_head_number - 1;
// Walk back the new chain until we reach a block we know about
//
// This is only done for in-memory blocks, because we should not have persisted any blocks
// that are _above_ the current canonical head.
while current_number > current_canonical_number {
if let Some(block) = self.executed_block_by_hash(current_hash)? {
new_chain.push(block.clone());
current_hash = block.block.parent_hash;
current_number -= 1;
} else {
warn!(target: "consensus::engine", current_hash=?current_hash, "Sidechain block not found in TreeState");
// This should never happen as we're walking back a chain that should connect to
// the canonical chain
return Ok(None);
}
}
// If we have reached the current canonical head by walking back from the target, then we
// know this represents an extension of the canonical chain.
if current_hash == self.state.tree_state.current_canonical_head.hash {
new_chain.reverse();
// Simple extension of the current chain
return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
}
// We have a reorg. Walk back both chains to find the fork point.
let mut old_chain = Vec::new();
let mut old_hash = self.state.tree_state.current_canonical_head.hash;
while old_hash != current_hash {
if let Some(block) = self.executed_block_by_hash(old_hash)? {
old_chain.push(block.clone());
old_hash = block.block.header.parent_hash;
} else {
// This shouldn't happen as we're walking back the canonical chain
warn!(target: "consensus::engine", current_hash=?old_hash, "Canonical block not found in TreeState");
return Ok(None);
}
if old_hash == current_hash {
// We've found the fork point
break;
}
if let Some(block) = self.executed_block_by_hash(current_hash)? {
if self.is_fork(block.block.hash(), finalized_block)? {
new_chain.push(block.clone());
current_hash = block.block.parent_hash;
}
} else {
// This shouldn't happen as we've already walked this path
warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState");
return Ok(None);
}
}
new_chain.reverse();
old_chain.reverse();
Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
}
/// Determines if the given block is part of a fork by checking that these
/// conditions are true:
/// * walking back from the target hash to verify that the target hash is not part of an
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
fn is_fork(&self, target_hash: B256, finalized_hash: Option<B256>) -> ProviderResult<bool> {
// verify that the given hash is not part of an extension of the canon chain.
let mut current_hash = target_hash;
while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
if current_block.hash() == self.state.tree_state.canonical_block_hash() {
return Ok(false)
}
current_hash = current_block.parent_hash;
}
// verify that the given hash is not already part of the canon chain
current_hash = self.state.tree_state.canonical_block_hash();
while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
if Some(current_hash) == finalized_hash {
return Ok(true)
}
if current_block.hash() == target_hash {
return Ok(false)
}
current_hash = current_block.parent_hash;
}
Ok(true)
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
@@ -914,8 +953,11 @@ where
return Ok(valid_outcome(state.head_block_hash))
}
let finalized_block_opt =
(!state.finalized_block_hash.is_zero()).then_some(state.finalized_block_hash);
// 2. ensure we can apply a new chain update for the head block
if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) {
if let Some(chain_update) = self.on_new_head(state.head_block_hash, finalized_block_opt)? {
let tip = chain_update.tip().header.clone();
self.on_canonical_chain_update(chain_update);
@@ -1222,8 +1264,10 @@ where
/// Attempts to make the given target canonical.
///
/// This will update the tracked canonical in memory state and do the necessary housekeeping.
fn make_canonical(&mut self, target: B256) {
if let Some(chain_update) = self.state.tree_state.on_new_head(target) {
fn make_canonical(&mut self, target: B256, finalized: Option<B256>) {
// TODO: propagate errors, after https://github.com/paradigmxyz/reth/pull/10276 is merged,
// since it makes it easier
if let Ok(Some(chain_update)) = self.on_new_head(target, finalized) {
self.on_canonical_chain_update(chain_update);
}
}
@@ -1239,8 +1283,8 @@ where
fn on_tree_event(&mut self, event: TreeEvent) {
match event {
TreeEvent::TreeAction(action) => match action {
TreeAction::MakeCanonical(target) => {
self.make_canonical(target);
TreeAction::MakeCanonical { sync_target_head, sync_target_finalized } => {
self.make_canonical(sync_target_head, sync_target_finalized);
}
},
TreeEvent::BackfillAction(action) => {
@@ -1606,7 +1650,12 @@ where
if self.is_sync_target_head(child_num_hash.hash) &&
matches!(res, InsertPayloadOk2::Inserted(BlockStatus2::Valid))
{
self.make_canonical(child_num_hash.hash);
// we are using the sync target here because we're trying to make the sync
// target canonical
let sync_target_finalized =
self.state.forkchoice_state_tracker.sync_target_finalized();
self.make_canonical(child_num_hash.hash, sync_target_finalized);
}
}
Err(err) => {
@@ -1842,11 +1891,15 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid)) => {
if self.is_sync_target_head(block_num_hash.hash) {
trace!(target: "engine", "appended downloaded sync target block");
let sync_target_finalized =
self.state.forkchoice_state_tracker.sync_target_finalized();
// we just inserted the current sync target block, we can try to make it
// canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical(
block_num_hash.hash,
))))
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
sync_target_finalized,
})))
}
trace!(target: "engine", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash)?;
@@ -2003,8 +2056,12 @@ where
self.state.tree_state.insert_executed(executed);
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// we are checking that this is a fork block compared to the current `SYNCING` forkchoice
// state.
let finalized = self.state.forkchoice_state_tracker.sync_target_finalized();
// emit insert event
let engine_event = if self.state.tree_state.is_fork(block_hash) {
let engine_event = if self.is_fork(block_hash, finalized)? {
BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block)
} else {
BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, start.elapsed())
@@ -3005,17 +3062,18 @@ mod tests {
#[tokio::test]
async fn test_tree_state_on_new_head() {
let mut tree_state = TreeState::new(BlockNumHash::default());
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::default();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
for block in &blocks {
tree_state.insert_executed(block.clone());
test_harness.tree.state.tree_state.insert_executed(block.clone());
}
// set block 3 as the current canonical head
tree_state.set_canonical_head(blocks[2].block.num_hash());
test_harness.tree.state.tree_state.set_canonical_head(blocks[2].block.num_hash());
// create a fork from block 2
let fork_block_3 =
@@ -3025,12 +3083,12 @@ mod tests {
let fork_block_5 =
test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash());
tree_state.insert_executed(fork_block_3.clone());
tree_state.insert_executed(fork_block_4.clone());
tree_state.insert_executed(fork_block_5.clone());
test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
// normal (non-reorg) case
let result = tree_state.on_new_head(blocks[4].block.hash());
let result = test_harness.tree.on_new_head(blocks[4].block.hash(), None).unwrap();
assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
if let Some(NewCanonicalChain::Commit { new }) = result {
assert_eq!(new.len(), 2);
@@ -3039,7 +3097,7 @@ mod tests {
}
// reorg case
let result = tree_state.on_new_head(fork_block_5.block.hash());
let result = test_harness.tree.on_new_head(fork_block_5.block.hash(), None).unwrap();
assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
if let Some(NewCanonicalChain::Reorg { new, old }) = result {
assert_eq!(new.len(), 3);
@@ -3056,26 +3114,27 @@ mod tests {
async fn test_tree_state_on_new_head_deep_fork() {
reth_tracing::init_test_tracing();
let mut tree_state = TreeState::new(BlockNumHash::default());
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::default();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
for block in &blocks {
tree_state.insert_executed(block.clone());
test_harness.tree.state.tree_state.insert_executed(block.clone());
}
// set last block as the current canonical head
let last_block = blocks.last().unwrap().block.clone();
tree_state.set_canonical_head(last_block.num_hash());
test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
// create a fork chain from last_block
let chain_a = test_block_builder.create_fork(&last_block, 10);
let chain_b = test_block_builder.create_fork(&last_block, 10);
for block in &chain_a {
tree_state.insert_executed(ExecutedBlock {
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
block: Arc::new(block.block.clone()),
senders: Arc::new(block.senders.clone()),
execution_output: Arc::new(ExecutionOutcome::default()),
@@ -3083,10 +3142,10 @@ mod tests {
trie: Arc::new(TrieUpdates::default()),
});
}
tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
for block in &chain_b {
tree_state.insert_executed(ExecutedBlock {
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
block: Arc::new(block.block.clone()),
senders: Arc::new(block.senders.clone()),
execution_output: Arc::new(ExecutionOutcome::default()),
@@ -3096,7 +3155,8 @@ mod tests {
}
// reorg case
let result = tree_state.on_new_head(chain_b.first().unwrap().block.hash());
let result =
test_harness.tree.on_new_head(chain_b.first().unwrap().block.hash(), None).unwrap();
assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
if let Some(NewCanonicalChain::Reorg { new, old }) = result {
assert_eq!(new.len(), 1);
@@ -3412,7 +3472,7 @@ mod tests {
.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
.unwrap();
test_harness.check_fork_chain_insertion(remaining).await;
test_harness.check_canon_chain_insertion(remaining).await;
// check canonical chain committed event with the hash of the latest block
test_harness.check_canon_commit(main_chain_last_hash).await;
@@ -3519,7 +3579,7 @@ mod tests {
test_harness.check_canon_head(chain_b_tip_hash);
// verify that chain A is now considered a fork
assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash()));
assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash(), None).unwrap());
}
#[tokio::test]