From 48c42f563059739e7ae820c3922bd69e26f9dec5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 23 Jul 2024 14:05:54 +0200 Subject: [PATCH] feat: some progress on FCU handling (#9727) --- crates/engine/tree/src/engine.rs | 7 ++ crates/engine/tree/src/tree/mod.rs | 153 +++++++++++++++++++++++++---- 2 files changed, 140 insertions(+), 20 deletions(-) diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index a6cb75271e..eeac5e5d75 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -251,3 +251,10 @@ pub enum DownloadRequest { /// Download the given range of blocks. BlockRange(B256, u64), } + +impl DownloadRequest { + /// Returns a [`DownloadRequest`] for a single block. + pub fn single_block(hash: B256) -> Self { + Self::BlockSet(HashSet::from([hash])) + } +} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index cc2e0c70d9..2ea849b499 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -13,7 +13,7 @@ use reth_blockchain_tree::{ error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus, }; use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; -use reth_chain_state::{BlockState, CanonicalInMemoryState, ExecutedBlock}; +use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock}; use reth_consensus::{Consensus, PostExecutionInput}; use reth_engine_primitives::EngineTypes; use reth_errors::{ConsensusError, ProviderResult}; @@ -26,7 +26,8 @@ use reth_primitives::{ SealedBlockWithSenders, SealedHeader, B256, U256, }; use reth_provider::{ - BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider, + BlockReader, CanonStateNotification, ExecutionOutcome, StateProvider, StateProviderFactory, + StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_rpc_types::{ @@ -59,19 +60,36 @@ const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256; const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; /// Keeps track of the state of the tree. +/// +/// ## Invariants +/// +/// - This only stores blocks that are connected to the canonical chain. +/// - All executed blocks are valid and have been executed. #[derive(Debug, Default)] pub struct TreeState { - /// All executed blocks by hash. + /// __All__ executed blocks by block hash. + /// + /// This includes blocks of all forks. blocks_by_hash: HashMap, /// Executed blocks grouped by their respective block number. + /// + /// This maps unique block number to all known blocks for that height. blocks_by_number: BTreeMap>, - /// Pending state not yet applied - pending: Option>, - /// Block number and hash of the current head. - current_head: Option<(BlockNumber, B256)>, + /// Currently tracked canonical head of the chain. + current_canonical_head: BlockNumHash, } impl TreeState { + /// Returns a new, empty tree state that points to the given canonical head. + fn new(current_canonical_head: BlockNumHash) -> Self { + Self { + blocks_by_hash: HashMap::new(), + blocks_by_number: BTreeMap::new(), + current_canonical_head, + } + } + + /// Returns the block by hash. fn block_by_hash(&self, hash: B256) -> Option> { self.blocks_by_hash.get(&hash).map(|b| b.block.clone()) } @@ -114,6 +132,38 @@ impl TreeState { pub(crate) fn max_block_number(&self) -> BlockNumber { *self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0 } + + /// Returns the block number of the pending block: `head + 1` + const fn pending_block_number(&self) -> BlockNumber { + self.current_canonical_head.number + 1 + } + + /// Updates the canonical head to the given block. + fn set_canonical_head(&mut self, new_head: BlockNumHash) { + self.current_canonical_head = new_head; + } + + /// Returns the tracked canonical head. + const fn canonical_head(&self) -> &BlockNumHash { + &self.current_canonical_head + } + + /// Returns the block hash of the canonical head. + const fn canonical_block_hash(&self) -> B256 { + self.canonical_head().hash + } + + /// Returns the new chain for the given head. + /// + /// This also handles reorgs. + // TODO: this type needs to include more info, like missing block etc. + fn on_new_head(&self, new_head: B256) -> Option { + let new_head_block = self.blocks_by_hash.get(&new_head)?; + + // TODO walk the chain back and connect to canonical chain or detect reorg + + None + } } /// Tracks the state of the engine api internals. @@ -133,11 +183,15 @@ pub struct EngineApiTreeState { } impl EngineApiTreeState { - fn new(block_buffer_limit: u32, max_invalid_header_cache_length: u32) -> Self { + fn new( + block_buffer_limit: u32, + max_invalid_header_cache_length: u32, + canonical_block: BlockNumHash, + ) -> Self { Self { invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length), buffer: BlockBuffer::new(block_buffer_limit), - tree_state: TreeState::default(), + tree_state: TreeState::new(canonical_block), forkchoice_state_tracker: ForkchoiceStateTracker::default(), } } @@ -312,15 +366,16 @@ where persistence: PersistenceHandle, payload_builder: PayloadBuilderHandle, ) -> UnboundedReceiver { + let best_block_number = provider.best_block_number().unwrap_or(0); + let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); + let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel(); let state = EngineApiTreeState::new( DEFAULT_BLOCK_BUFFER_LIMIT, DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, + header.num_hash(), ); - let best_block_number = provider.best_block_number().unwrap_or(0); - let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); - let task = Self::new( provider, executor_provider, @@ -390,14 +445,17 @@ where }, FromEngine::Request(request) => match request { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - let output = self.on_forkchoice_updated(state, payload_attrs); + let mut output = self.on_forkchoice_updated(state, payload_attrs); - if let Ok(res) = &output { + if let Ok(res) = &mut output { // emit an event about the handled FCU self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated( state, res.outcome.forkchoice_status(), )); + + // handle the event if any + self.on_maybe_tree_event(res.event.take()); } if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) { @@ -459,7 +517,8 @@ where .block(&sync_target_state.finalized_block_hash) .map(|block| block.number); - // TODO: state housekeeping + // TODO(mattsse): state housekeeping, this needs to update the tracked canonical state and + // attempt to make the current target canonical if we have all the blocks buffered // The block number that the backfill finished at - if the progress or newest // finalized is None then we can't check the distance anyways. @@ -489,6 +548,13 @@ where // TODO: implement state updates and shift canonical state } + /// Convenience function to handle an optional tree event. + fn on_maybe_tree_event(&self, event: Option) { + if let Some(event) = event { + self.on_tree_event(event); + } + } + /// Handles a tree event. fn on_tree_event(&self, event: TreeEvent) { match event { @@ -572,6 +638,10 @@ where } /// Return state provider with reference to in-memory blocks that overlay database state. + /// + /// This merges the state of all blocks that are part of the chain that the requested block is + /// the head of. This includes all blocks that connect back to the canonical block on disk. + // TODO: return error if the chain has gaps fn state_provider( &self, hash: B256, @@ -1143,13 +1213,56 @@ where ) -> ProviderResult> { if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? { self.state.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status()); - - // TODO: make canonical and process payload attributes if valid - return Ok(TreeOutcome::new(on_updated)) } - todo!() + let valid_outcome = |head| { + TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new( + PayloadStatusEnum::Valid, + Some(head), + ))) + }; + + // Process the forkchoice update by trying to make the head block canonical + // + // We can only process this forkchoice update if: + // - we have the `head` block + // - the head block is part of a chain that is connected to the canonical chain. This + // includes reorgs. + // + // Performing a FCU involves: + // - marking the FCU's head block as canonical + // - updating in memory state to reflect the new canonical chain + // - updating canonical state trackers + // - emitting a canonicalization event for the new chain (including reorg) + // - if we have payload attributes, delegate them to the payload service + + // 1. ensure we have a new head block + if self.state.tree_state.canonical_block_hash() == state.head_block_hash { + // the head block is already canonical + return Ok(valid_outcome(state.head_block_hash)) + } + + // 2. ensure we can apply a new chain update for the head block + if let Some(update) = self.state.tree_state.on_new_head(state.head_block_hash) { + // update the tracked canonical head + self.state.tree_state.set_canonical_head(update.tip().num_hash()); + // TODO + // update inmemory state + // update trackers + // emit notification + // validate and handle payload attributes + + return Ok(valid_outcome(state.head_block_hash)) + } + + // 3. we don't have the block to perform the update + let target = self.lowest_buffered_ancestor_or(state.head_block_hash); + + Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status( + PayloadStatusEnum::Syncing, + ))) + .with_event(TreeEvent::Download(DownloadRequest::single_block(target)))) } } @@ -1190,7 +1303,7 @@ mod tests { use super::*; use crate::static_files::StaticFileAction; use reth_beacon_consensus::EthBeaconConsensus; - use reth_chain_state::test_utils::get_executed_blocks; + use reth_chain_state::{test_utils::get_executed_blocks, BlockState}; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::test_utils::MockExecutorProvider;