From 3289cd79bde9b036aee781fcba20dcaf778f135e Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 4 Sep 2024 15:00:07 -0400 Subject: [PATCH] feat(tree): make on_new_head and is_fork aware of disk (#10551) --- crates/engine/tree/src/tree/mod.rs | 312 +++++++++++++++++------------ 1 file changed, 186 insertions(+), 126 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 21ba30cbc5..ed13fb4540 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -162,33 +162,6 @@ impl TreeState { } } - /// Determines if the given block is part of a fork by checking that these - /// conditions are true: - /// * walking back from the target hash to verify that the target hash is not part of an - /// extension of the canonical chain. - /// * walking back from the current head to verify that the target hash is not already part of - /// the canonical chain. - fn is_fork(&self, target_hash: B256) -> bool { - // verify that the given hash is not part of an extension of the canon chain. - let mut current_hash = target_hash; - while let Some(current_block) = self.block_by_hash(current_hash) { - if current_block.hash() == self.canonical_block_hash() { - return false - } - current_hash = current_block.header.parent_hash; - } - - // verify that the given hash is not already part of the canon chain - current_hash = self.canonical_block_hash(); - while let Some(current_block) = self.block_by_hash(current_hash) { - if current_block.hash() == target_hash { - return false - } - current_hash = current_block.header.parent_hash; - } - true - } - /// Remove single executed block by its hash. /// /// ## Returns @@ -336,75 +309,6 @@ impl TreeState { const fn canonical_block_number(&self) -> BlockNumber { self.canonical_head().number } - - /// Returns the new chain for the given head. - /// - /// This also handles reorgs. - /// - /// Note: This does not update the tracked state and instead returns the new chain based on the - /// given head. - fn on_new_head(&self, new_head: B256) -> Option { - let new_head_block = self.blocks_by_hash.get(&new_head)?; - let new_head_number = new_head_block.block.number; - let current_canonical_number = self.current_canonical_head.number; - - let mut new_chain = vec![new_head_block.clone()]; - let mut current_hash = new_head_block.block.parent_hash; - let mut current_number = new_head_number - 1; - - // Walk back the new chain until we reach a block we know about - while current_number > current_canonical_number { - if let Some(block) = self.blocks_by_hash.get(¤t_hash) { - new_chain.push(block.clone()); - current_hash = block.block.parent_hash; - current_number -= 1; - } else { - return None; // We don't have the full chain - } - } - - if current_hash == self.current_canonical_head.hash { - new_chain.reverse(); - - // Simple extension of the current chain - return Some(NewCanonicalChain::Commit { new: new_chain }); - } - - // We have a reorg. Walk back both chains to find the fork point. - let mut old_chain = Vec::new(); - let mut old_hash = self.current_canonical_head.hash; - - while old_hash != current_hash { - if let Some(block) = self.blocks_by_hash.get(&old_hash) { - old_chain.push(block.clone()); - old_hash = block.block.parent_hash; - } else { - // This shouldn't happen as we're walking back the canonical chain - warn!(target: "consensus::engine", invalid_hash=?old_hash, "Canonical block not found in TreeState"); - return None; - } - - if old_hash == current_hash { - // We've found the fork point - break; - } - - if let Some(block) = self.blocks_by_hash.get(¤t_hash) { - if self.is_fork(block.block.hash()) { - new_chain.push(block.clone()); - current_hash = block.block.parent_hash; - } - } else { - // This shouldn't happen as we've already walked this path - warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState"); - return None; - } - } - new_chain.reverse(); - old_chain.reverse(); - - Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }) - } } /// Tracks the state of the engine api internals. @@ -482,7 +386,12 @@ impl TreeEvent { #[derive(Debug)] pub enum TreeAction { /// Make target canonical. - MakeCanonical(B256), + MakeCanonical { + /// The sync target head hash + sync_target_head: B256, + /// The sync target finalized hash + sync_target_finalized: Option, + }, } /// The engine API tree handler implementation. @@ -836,14 +745,144 @@ where let mut outcome = TreeOutcome::new(status); if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) { + // NOTE: if we are in this branch, `is_sync_target_head` has returned true, + // meaning a sync target state exists, so we can safely unwrap + let sync_target = self + .state + .forkchoice_state_tracker + .sync_target_state() + .expect("sync target must exist"); + + // if the hash is zero then we should act like there is no finalized hash + let sync_target_finalized = (!sync_target.finalized_block_hash.is_zero()) + .then_some(sync_target.finalized_block_hash); + // if the block is valid and it is the sync target head, make it canonical - outcome = - outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash))); + outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { + sync_target_head: block_hash, + sync_target_finalized, + })); } Ok(outcome) } + /// Returns the new chain for the given head. + /// + /// This also handles reorgs. + /// + /// Note: This does not update the tracked state and instead returns the new chain based on the + /// given head. + fn on_new_head( + &self, + new_head: B256, + finalized_block: Option, + ) -> ProviderResult> { + // get the executed new head block + let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else { + return Ok(None) + }; + + let new_head_number = new_head_block.block.number; + let current_canonical_number = self.state.tree_state.current_canonical_head.number; + + let mut new_chain = vec![new_head_block.clone()]; + let mut current_hash = new_head_block.block.parent_hash; + let mut current_number = new_head_number - 1; + + // Walk back the new chain until we reach a block we know about + // + // This is only done for in-memory blocks, because we should not have persisted any blocks + // that are _above_ the current canonical head. + while current_number > current_canonical_number { + if let Some(block) = self.executed_block_by_hash(current_hash)? { + new_chain.push(block.clone()); + current_hash = block.block.parent_hash; + current_number -= 1; + } else { + warn!(target: "consensus::engine", current_hash=?current_hash, "Sidechain block not found in TreeState"); + // This should never happen as we're walking back a chain that should connect to + // the canonical chain + return Ok(None); + } + } + + // If we have reached the current canonical head by walking back from the target, then we + // know this represents an extension of the canonical chain. + if current_hash == self.state.tree_state.current_canonical_head.hash { + new_chain.reverse(); + + // Simple extension of the current chain + return Ok(Some(NewCanonicalChain::Commit { new: new_chain })); + } + + // We have a reorg. Walk back both chains to find the fork point. + let mut old_chain = Vec::new(); + let mut old_hash = self.state.tree_state.current_canonical_head.hash; + + while old_hash != current_hash { + if let Some(block) = self.executed_block_by_hash(old_hash)? { + old_chain.push(block.clone()); + old_hash = block.block.header.parent_hash; + } else { + // This shouldn't happen as we're walking back the canonical chain + warn!(target: "consensus::engine", current_hash=?old_hash, "Canonical block not found in TreeState"); + return Ok(None); + } + + if old_hash == current_hash { + // We've found the fork point + break; + } + + if let Some(block) = self.executed_block_by_hash(current_hash)? { + if self.is_fork(block.block.hash(), finalized_block)? { + new_chain.push(block.clone()); + current_hash = block.block.parent_hash; + } + } else { + // This shouldn't happen as we've already walked this path + warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState"); + return Ok(None); + } + } + new_chain.reverse(); + old_chain.reverse(); + + Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain })) + } + + /// Determines if the given block is part of a fork by checking that these + /// conditions are true: + /// * walking back from the target hash to verify that the target hash is not part of an + /// extension of the canonical chain. + /// * walking back from the current head to verify that the target hash is not already part of + /// the canonical chain. + fn is_fork(&self, target_hash: B256, finalized_hash: Option) -> ProviderResult { + // verify that the given hash is not part of an extension of the canon chain. + let mut current_hash = target_hash; + while let Some(current_block) = self.sealed_header_by_hash(current_hash)? { + if current_block.hash() == self.state.tree_state.canonical_block_hash() { + return Ok(false) + } + current_hash = current_block.parent_hash; + } + + // verify that the given hash is not already part of the canon chain + current_hash = self.state.tree_state.canonical_block_hash(); + while let Some(current_block) = self.sealed_header_by_hash(current_hash)? { + if Some(current_hash) == finalized_hash { + return Ok(true) + } + + if current_block.hash() == target_hash { + return Ok(false) + } + current_hash = current_block.parent_hash; + } + Ok(true) + } + /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid /// chain. @@ -914,8 +953,11 @@ where return Ok(valid_outcome(state.head_block_hash)) } + let finalized_block_opt = + (!state.finalized_block_hash.is_zero()).then_some(state.finalized_block_hash); + // 2. ensure we can apply a new chain update for the head block - if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) { + if let Some(chain_update) = self.on_new_head(state.head_block_hash, finalized_block_opt)? { let tip = chain_update.tip().header.clone(); self.on_canonical_chain_update(chain_update); @@ -1222,8 +1264,10 @@ where /// Attempts to make the given target canonical. /// /// This will update the tracked canonical in memory state and do the necessary housekeeping. - fn make_canonical(&mut self, target: B256) { - if let Some(chain_update) = self.state.tree_state.on_new_head(target) { + fn make_canonical(&mut self, target: B256, finalized: Option) { + // TODO: propagate errors, after https://github.com/paradigmxyz/reth/pull/10276 is merged, + // since it makes it easier + if let Ok(Some(chain_update)) = self.on_new_head(target, finalized) { self.on_canonical_chain_update(chain_update); } } @@ -1239,8 +1283,8 @@ where fn on_tree_event(&mut self, event: TreeEvent) { match event { TreeEvent::TreeAction(action) => match action { - TreeAction::MakeCanonical(target) => { - self.make_canonical(target); + TreeAction::MakeCanonical { sync_target_head, sync_target_finalized } => { + self.make_canonical(sync_target_head, sync_target_finalized); } }, TreeEvent::BackfillAction(action) => { @@ -1606,7 +1650,12 @@ where if self.is_sync_target_head(child_num_hash.hash) && matches!(res, InsertPayloadOk2::Inserted(BlockStatus2::Valid)) { - self.make_canonical(child_num_hash.hash); + // we are using the sync target here because we're trying to make the sync + // target canonical + let sync_target_finalized = + self.state.forkchoice_state_tracker.sync_target_finalized(); + + self.make_canonical(child_num_hash.hash, sync_target_finalized); } } Err(err) => { @@ -1842,11 +1891,15 @@ where Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid)) => { if self.is_sync_target_head(block_num_hash.hash) { trace!(target: "engine", "appended downloaded sync target block"); + let sync_target_finalized = + self.state.forkchoice_state_tracker.sync_target_finalized(); + // we just inserted the current sync target block, we can try to make it // canonical - return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical( - block_num_hash.hash, - )))) + return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { + sync_target_head: block_num_hash.hash, + sync_target_finalized, + }))) } trace!(target: "engine", "appended downloaded block"); self.try_connect_buffered_blocks(block_num_hash)?; @@ -2003,8 +2056,12 @@ where self.state.tree_state.insert_executed(executed); self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64); + // we are checking that this is a fork block compared to the current `SYNCING` forkchoice + // state. + let finalized = self.state.forkchoice_state_tracker.sync_target_finalized(); + // emit insert event - let engine_event = if self.state.tree_state.is_fork(block_hash) { + let engine_event = if self.is_fork(block_hash, finalized)? { BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block) } else { BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, start.elapsed()) @@ -3005,17 +3062,18 @@ mod tests { #[tokio::test] async fn test_tree_state_on_new_head() { - let mut tree_state = TreeState::new(BlockNumHash::default()); + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec); let mut test_block_builder = TestBlockBuilder::default(); let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect(); for block in &blocks { - tree_state.insert_executed(block.clone()); + test_harness.tree.state.tree_state.insert_executed(block.clone()); } // set block 3 as the current canonical head - tree_state.set_canonical_head(blocks[2].block.num_hash()); + test_harness.tree.state.tree_state.set_canonical_head(blocks[2].block.num_hash()); // create a fork from block 2 let fork_block_3 = @@ -3025,12 +3083,12 @@ mod tests { let fork_block_5 = test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash()); - tree_state.insert_executed(fork_block_3.clone()); - tree_state.insert_executed(fork_block_4.clone()); - tree_state.insert_executed(fork_block_5.clone()); + test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone()); + test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone()); + test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone()); // normal (non-reorg) case - let result = tree_state.on_new_head(blocks[4].block.hash()); + let result = test_harness.tree.on_new_head(blocks[4].block.hash(), None).unwrap(); assert!(matches!(result, Some(NewCanonicalChain::Commit { .. }))); if let Some(NewCanonicalChain::Commit { new }) = result { assert_eq!(new.len(), 2); @@ -3039,7 +3097,7 @@ mod tests { } // reorg case - let result = tree_state.on_new_head(fork_block_5.block.hash()); + let result = test_harness.tree.on_new_head(fork_block_5.block.hash(), None).unwrap(); assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. }))); if let Some(NewCanonicalChain::Reorg { new, old }) = result { assert_eq!(new.len(), 3); @@ -3056,26 +3114,27 @@ mod tests { async fn test_tree_state_on_new_head_deep_fork() { reth_tracing::init_test_tracing(); - let mut tree_state = TreeState::new(BlockNumHash::default()); + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec); let mut test_block_builder = TestBlockBuilder::default(); let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect(); for block in &blocks { - tree_state.insert_executed(block.clone()); + test_harness.tree.state.tree_state.insert_executed(block.clone()); } // set last block as the current canonical head let last_block = blocks.last().unwrap().block.clone(); - tree_state.set_canonical_head(last_block.num_hash()); + test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash()); // create a fork chain from last_block let chain_a = test_block_builder.create_fork(&last_block, 10); let chain_b = test_block_builder.create_fork(&last_block, 10); for block in &chain_a { - tree_state.insert_executed(ExecutedBlock { + test_harness.tree.state.tree_state.insert_executed(ExecutedBlock { block: Arc::new(block.block.clone()), senders: Arc::new(block.senders.clone()), execution_output: Arc::new(ExecutionOutcome::default()), @@ -3083,10 +3142,10 @@ mod tests { trie: Arc::new(TrieUpdates::default()), }); } - tree_state.set_canonical_head(chain_a.last().unwrap().num_hash()); + test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash()); for block in &chain_b { - tree_state.insert_executed(ExecutedBlock { + test_harness.tree.state.tree_state.insert_executed(ExecutedBlock { block: Arc::new(block.block.clone()), senders: Arc::new(block.senders.clone()), execution_output: Arc::new(ExecutionOutcome::default()), @@ -3096,7 +3155,8 @@ mod tests { } // reorg case - let result = tree_state.on_new_head(chain_b.first().unwrap().block.hash()); + let result = + test_harness.tree.on_new_head(chain_b.first().unwrap().block.hash(), None).unwrap(); assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. }))); if let Some(NewCanonicalChain::Reorg { new, old }) = result { assert_eq!(new.len(), 1); @@ -3412,7 +3472,7 @@ mod tests { .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())) .unwrap(); - test_harness.check_fork_chain_insertion(remaining).await; + test_harness.check_canon_chain_insertion(remaining).await; // check canonical chain committed event with the hash of the latest block test_harness.check_canon_commit(main_chain_last_hash).await; @@ -3519,7 +3579,7 @@ mod tests { test_harness.check_canon_head(chain_b_tip_hash); // verify that chain A is now considered a fork - assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash())); + assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash(), None).unwrap()); } #[tokio::test]