From ce4d4bd43dbbb0d0cc64c582697544e6a023eec6 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 18 Jul 2024 09:30:43 +0200 Subject: [PATCH] feat: add canonical in memory state (#9588) --- crates/engine/tree/src/tree/mod.rs | 162 +++++++++++++----- .../provider/src/providers/chain_info.rs | 34 ++-- crates/storage/provider/src/providers/mod.rs | 10 +- 3 files changed, 139 insertions(+), 67 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 5f603d56df..0ab987a330 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -4,6 +4,7 @@ use crate::{ engine::{DownloadRequest, EngineApiEvent, FromEngine}, persistence::PersistenceHandle, }; +use parking_lot::RwLock; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, }; @@ -19,10 +20,11 @@ use reth_payload_primitives::PayloadTypes; use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ Address, Block, BlockNumber, GotExpected, Receipts, Requests, SealedBlock, - SealedBlockWithSenders, B256, U256, + SealedBlockWithSenders, SealedHeader, B256, U256, }; use reth_provider::{ - BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider, + providers::ChainInfoTracker, BlockReader, ExecutionOutcome, StateProvider, + StateProviderFactory, StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_rpc_types::{ @@ -155,9 +157,9 @@ impl TreeState { /// Container type for in memory state data. #[derive(Debug, Default)] pub struct InMemoryStateImpl { - blocks: HashMap>, - numbers: HashMap, - pending: Option, + blocks: RwLock>>, + numbers: RwLock>, + pending: RwLock>, } impl InMemoryStateImpl { @@ -166,29 +168,94 @@ impl InMemoryStateImpl { numbers: HashMap, pending: Option, ) -> Self { - Self { blocks, numbers, pending } + Self { + blocks: RwLock::new(blocks), + numbers: RwLock::new(numbers), + pending: RwLock::new(pending), + } } } impl InMemoryState for InMemoryStateImpl { fn state_by_hash(&self, hash: B256) -> Option> { - self.blocks.get(&hash).cloned() + self.blocks.read().get(&hash).cloned() } fn state_by_number(&self, number: u64) -> Option> { - self.numbers.get(&number).and_then(|hash| self.blocks.get(hash).cloned()) + self.numbers.read().get(&number).and_then(|hash| self.blocks.read().get(hash).cloned()) } - fn current_head(&self) -> Option<(BlockNumber, B256)> { - self.numbers.iter().max_by_key(|(&number, _)| number).map(|(&number, &hash)| (number, hash)) - } - - fn pending_block_hash(&self) -> Option { - self.pending.as_ref().map(|state| state.hash()) + fn head_state(&self) -> Option> { + self.numbers + .read() + .iter() + .max_by_key(|(&number, _)| number) + .and_then(|(_, hash)| self.blocks.read().get(hash).cloned()) } fn pending_state(&self) -> Option> { - self.pending.as_ref().map(|state| Arc::new(State(state.0.clone()))) + self.pending.read().as_ref().map(|state| Arc::new(State(state.0.clone()))) + } +} + +/// Inner type to provide in memory state. It includes a chain tracker to be +/// advanced internally by the tree. +#[derive(Debug)] +struct CanonicalInMemoryStateInner { + chain_info_tracker: ChainInfoTracker, + in_memory_state: InMemoryStateImpl, +} + +/// This type is responsible for providing the blocks, receipts, and state for +/// all canonical blocks not on disk yet and keeps track of the block range that +/// is in memory. +#[derive(Debug, Clone)] +pub struct CanonicalInMemoryState { + inner: Arc, +} + +impl CanonicalInMemoryState { + fn new( + blocks: HashMap>, + numbers: HashMap, + pending: Option, + ) -> Self { + let in_memory_state = InMemoryStateImpl::new(blocks, numbers, pending); + let head_state = in_memory_state.head_state(); + let header = match head_state { + Some(state) => state.block().block().header.clone(), + None => SealedHeader::default(), + }; + let chain_info_tracker = ChainInfoTracker::new(header); + let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state }; + + Self { inner: Arc::new(inner) } + } + + fn with_header(header: SealedHeader) -> Self { + let chain_info_tracker = ChainInfoTracker::new(header); + let in_memory_state = InMemoryStateImpl::default(); + let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state }; + + Self { inner: Arc::new(inner) } + } +} + +impl InMemoryState for CanonicalInMemoryState { + fn state_by_hash(&self, hash: B256) -> Option> { + self.inner.in_memory_state.state_by_hash(hash) + } + + fn state_by_number(&self, number: u64) -> Option> { + self.inner.in_memory_state.state_by_number(number) + } + + fn head_state(&self) -> Option> { + self.inner.in_memory_state.head_state() + } + + fn pending_state(&self) -> Option> { + self.inner.in_memory_state.pending_state() } } @@ -316,7 +383,7 @@ pub struct EngineApiTreeHandlerImpl { persistence_state: PersistenceState, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, - canonical_in_memory_state: InMemoryStateImpl, + canonical_in_memory_state: CanonicalInMemoryState, _marker: PhantomData, } @@ -335,6 +402,7 @@ where incoming: Receiver>>, outgoing: UnboundedSender, state: EngineApiTreeState, + header: SealedHeader, persistence: PersistenceHandle, ) -> Self { Self { @@ -348,7 +416,7 @@ where persistence_state: PersistenceState::default(), is_pipeline_active: false, state, - canonical_in_memory_state: InMemoryStateImpl::default(), + canonical_in_memory_state: CanonicalInMemoryState::with_header(header), _marker: PhantomData, } } @@ -361,6 +429,7 @@ where payload_validator: ExecutionPayloadValidator, incoming: Receiver>>, state: EngineApiTreeState, + header: SealedHeader, persistence: PersistenceHandle, ) -> UnboundedSender { let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel(); @@ -372,6 +441,7 @@ where incoming, outgoing.clone(), state, + header, persistence, ); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); @@ -935,17 +1005,15 @@ trait InMemoryState: Send + Sync { fn state_by_hash(&self, hash: B256) -> Option>; /// Returns the state for a given block number. fn state_by_number(&self, number: u64) -> Option>; - /// Returns the current chain head. - fn current_head(&self) -> Option<(BlockNumber, B256)>; - /// Returns the pending block hash. - fn pending_block_hash(&self) -> Option; + /// Returns the current chain head state. + fn head_state(&self) -> Option>; /// Returns the pending state corresponding to the current head plus one, /// from the payload received in newPayload that does not have a FCU yet. fn pending_state(&self) -> Option>; } /// State after applying the given block. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct State(ExecutedBlock); impl State { @@ -1047,6 +1115,7 @@ mod tests { forkchoice_state_tracker: ForkchoiceStateTracker::default(), }; + let header = blocks.first().unwrap().block().header.clone(); let mut tree = EngineApiTreeHandlerImpl::new( provider, executor_factory, @@ -1055,12 +1124,13 @@ mod tests { to_tree_rx, from_tree_tx, engine_api_tree_state, + header, persistence_handle, ); let last_executed_block = blocks.last().unwrap().clone(); let pending = Some(State::new(last_executed_block)); tree.canonical_in_memory_state = - InMemoryStateImpl::new(state_by_hash, hash_by_number, pending); + CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending); TestHarness { tree, to_tree_tx, blocks, sf_action_rx } } @@ -1103,12 +1173,20 @@ mod tests { let expected_state = State::new(executed_block.clone()); - let actual_state_by_hash = - tree.canonical_in_memory_state.state_by_hash(sealed_block.hash()).unwrap(); + let actual_state_by_hash = tree + .canonical_in_memory_state + .inner + .in_memory_state + .state_by_hash(sealed_block.hash()) + .unwrap(); assert_eq!(expected_state, *actual_state_by_hash); - let actual_state_by_number = - tree.canonical_in_memory_state.state_by_number(sealed_block.number).unwrap(); + let actual_state_by_number = tree + .canonical_in_memory_state + .inner + .in_memory_state + .state_by_number(sealed_block.number) + .unwrap(); assert_eq!(expected_state, *actual_state_by_number); } } @@ -1145,28 +1223,23 @@ mod tests { } #[tokio::test] - async fn test_in_memory_state_impl_current_head() { + async fn test_in_memory_state_impl_head_state() { + let mut state_by_hash = HashMap::new(); let mut hash_by_number = HashMap::new(); - let hash1 = B256::random(); - let hash2 = B256::random(); + let state1 = Arc::new(create_mock_state(1)); + let state2 = Arc::new(create_mock_state(2)); + let hash1 = state1.hash(); + let hash2 = state2.hash(); hash_by_number.insert(1, hash1); hash_by_number.insert(2, hash2); + state_by_hash.insert(hash1, state1); + state_by_hash.insert(hash2, state2); - let in_memory_state = InMemoryStateImpl::new(HashMap::new(), hash_by_number, None); + let in_memory_state = InMemoryStateImpl::new(state_by_hash, hash_by_number, None); + let head_state = in_memory_state.head_state().unwrap(); - assert_eq!(in_memory_state.current_head(), Some((2, hash2))); - } - - #[tokio::test] - async fn test_in_memory_state_impl_pending_block_hash() { - let number = rand::thread_rng().gen::(); - let pending_state = create_mock_state(number); - let pending_hash = pending_state.hash(); - - let in_memory_state = - InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state)); - - assert_eq!(in_memory_state.pending_block_hash(), Some(pending_hash)); + assert_eq!(head_state.hash(), hash2); + assert_eq!(head_state.number(), 2); } #[tokio::test] @@ -1189,7 +1262,6 @@ mod tests { async fn test_in_memory_state_impl_no_pending_state() { let in_memory_state = InMemoryStateImpl::new(HashMap::new(), HashMap::new(), None); - assert_eq!(in_memory_state.pending_block_hash(), None); assert_eq!(in_memory_state.pending_state(), None); } diff --git a/crates/storage/provider/src/providers/chain_info.rs b/crates/storage/provider/src/providers/chain_info.rs index c696fefea9..40ca080901 100644 --- a/crates/storage/provider/src/providers/chain_info.rs +++ b/crates/storage/provider/src/providers/chain_info.rs @@ -12,13 +12,13 @@ use tokio::sync::watch; /// Tracks the chain info: canonical head, safe block, finalized block. #[derive(Debug, Clone)] -pub(crate) struct ChainInfoTracker { +pub struct ChainInfoTracker { inner: Arc, } impl ChainInfoTracker { /// Create a new chain info container for the given canonical head. - pub(crate) fn new(head: SealedHeader) -> Self { + pub fn new(head: SealedHeader) -> Self { let (finalized_block, _) = watch::channel(None); let (safe_block, _) = watch::channel(None); Self { @@ -34,73 +34,73 @@ impl ChainInfoTracker { } /// Returns the [`ChainInfo`] for the canonical head. - pub(crate) fn chain_info(&self) -> ChainInfo { + pub fn chain_info(&self) -> ChainInfo { let inner = self.inner.canonical_head.read(); ChainInfo { best_hash: inner.hash(), best_number: inner.number } } /// Update the timestamp when we received a forkchoice update. - pub(crate) fn on_forkchoice_update_received(&self) { + pub fn on_forkchoice_update_received(&self) { self.inner.last_forkchoice_update.write().replace(Instant::now()); } /// Returns the instant when we received the latest forkchoice update. - pub(crate) fn last_forkchoice_update_received_at(&self) -> Option { + pub fn last_forkchoice_update_received_at(&self) -> Option { *self.inner.last_forkchoice_update.read() } /// Update the timestamp when we exchanged a transition configuration. - pub(crate) fn on_transition_configuration_exchanged(&self) { + pub fn on_transition_configuration_exchanged(&self) { self.inner.last_transition_configuration_exchange.write().replace(Instant::now()); } /// Returns the instant when we exchanged the transition configuration last time. - pub(crate) fn last_transition_configuration_exchanged_at(&self) -> Option { + pub fn last_transition_configuration_exchanged_at(&self) -> Option { *self.inner.last_transition_configuration_exchange.read() } /// Returns the canonical head of the chain. - pub(crate) fn get_canonical_head(&self) -> SealedHeader { + pub fn get_canonical_head(&self) -> SealedHeader { self.inner.canonical_head.read().clone() } /// Returns the safe header of the chain. - pub(crate) fn get_safe_header(&self) -> Option { + pub fn get_safe_header(&self) -> Option { self.inner.safe_block.borrow().clone() } /// Returns the finalized header of the chain. - pub(crate) fn get_finalized_header(&self) -> Option { + pub fn get_finalized_header(&self) -> Option { self.inner.finalized_block.borrow().clone() } /// Returns the canonical head of the chain. #[allow(dead_code)] - pub(crate) fn get_canonical_num_hash(&self) -> BlockNumHash { + pub fn get_canonical_num_hash(&self) -> BlockNumHash { self.inner.canonical_head.read().num_hash() } /// Returns the canonical head of the chain. - pub(crate) fn get_canonical_block_number(&self) -> BlockNumber { + pub fn get_canonical_block_number(&self) -> BlockNumber { self.inner.canonical_head_number.load(Ordering::Relaxed) } /// Returns the safe header of the chain. #[allow(dead_code)] - pub(crate) fn get_safe_num_hash(&self) -> Option { + pub fn get_safe_num_hash(&self) -> Option { let h = self.inner.safe_block.borrow(); h.as_ref().map(|h| h.num_hash()) } /// Returns the finalized header of the chain. #[allow(dead_code)] - pub(crate) fn get_finalized_num_hash(&self) -> Option { + pub fn get_finalized_num_hash(&self) -> Option { let h = self.inner.finalized_block.borrow(); h.as_ref().map(|h| h.num_hash()) } /// Sets the canonical head of the chain. - pub(crate) fn set_canonical_head(&self, header: SealedHeader) { + pub fn set_canonical_head(&self, header: SealedHeader) { let number = header.number; *self.inner.canonical_head.write() = header; @@ -109,14 +109,14 @@ impl ChainInfoTracker { } /// Sets the safe header of the chain. - pub(crate) fn set_safe(&self, header: SealedHeader) { + pub fn set_safe(&self, header: SealedHeader) { self.inner.safe_block.send_modify(|h| { let _ = h.replace(header); }); } /// Sets the finalized header of the chain. - pub(crate) fn set_finalized(&self, header: SealedHeader) { + pub fn set_finalized(&self, header: SealedHeader) { self.inner.finalized_block.send_modify(|h| { let _ = h.replace(header); }); diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 2b9175307c..330c880c7e 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -55,7 +55,7 @@ mod bundle_state_provider; pub use bundle_state_provider::BundleStateProvider; mod chain_info; -use chain_info::ChainInfoTracker; +pub use chain_info::ChainInfoTracker; mod consistent_view; use alloy_rpc_types_engine::ForkchoiceState; @@ -74,8 +74,8 @@ pub struct BlockchainProvider { tree: Arc, /// Tracks the chain info wrt forkchoice updates chain_info: ChainInfoTracker, - // TODO: In-memory state for recent blocks and pending state. - //in_memory_state: Arc, + // TODO: replace chain_info with CanonicalInMemoryState. + //canonical_in_memory_state: CanonicalInMemoryState, } impl Clone for BlockchainProvider { @@ -84,8 +84,8 @@ impl Clone for BlockchainProvider { database: self.database.clone(), tree: self.tree.clone(), chain_info: self.chain_info.clone(), - // TODO: add in_memory_state - // in_memory_state: self.in_memory_state.clone(), + // TODO: add canonical_in_memory_state + // canonical_in_memory_state: self.canonical_in_memory_state.clone(), } } }