From 08972ca5e42db0fec306eacfbe2c46dbf60f737b Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 12 May 2023 12:24:41 +0200 Subject: [PATCH] feat: add forkchoice update tracker type (#2555) Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> --- bin/reth/src/node/mod.rs | 4 +- crates/consensus/beacon/Cargo.toml | 1 + crates/consensus/beacon/src/engine/mod.rs | 101 +++++++++++++---- .../rpc-types/src/eth/engine/forkchoice.rs | 3 + crates/storage/provider/Cargo.toml | 4 +- crates/storage/provider/src/lib.rs | 9 +- .../provider/src/providers/chain_info.rs | 102 ++++++++++++++++++ crates/storage/provider/src/providers/mod.rs | 65 +++++++++-- .../storage/provider/src/traits/chain_info.rs | 17 +++ crates/storage/provider/src/traits/mod.rs | 3 + 10 files changed, 271 insertions(+), 38 deletions(-) create mode 100644 crates/storage/provider/src/providers/chain_info.rs create mode 100644 crates/storage/provider/src/traits/chain_info.rs diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 61b5f8c644..4e69b3b5ac 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -190,7 +190,7 @@ impl Command { // setup the blockchain provider let shareable_db = ShareableDatabase::new(Arc::clone(&db), Arc::clone(&self.chain)); - let blockchain_db = BlockchainProvider::new(shareable_db, blockchain_tree.clone()); + let blockchain_db = BlockchainProvider::new(shareable_db, blockchain_tree.clone())?; let transaction_pool = reth_transaction_pool::Pool::eth_pool( EthTransactionValidator::new(blockchain_db.clone(), Arc::clone(&self.chain)), @@ -341,7 +341,7 @@ impl Command { Arc::clone(&db), ctx.task_executor.clone(), pipeline, - blockchain_tree.clone(), + blockchain_db.clone(), self.debug.max_block, self.debug.continuous, payload_builder.clone(), diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 3e7e30eb87..e65a0e82a7 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -13,6 +13,7 @@ reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-stages = { path = "../../stages" } reth-db = { path = "../../storage/db" } +reth-provider = { path = "../../storage/provider" } reth-rpc-types = { path = "../../rpc/rpc-types" } reth-tasks = { path = "../../tasks" } reth-payload-builder = { path = "../../payload/builder" } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index ab7a6db453..2de37eeb63 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -11,6 +11,7 @@ use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{ listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256, }; +use reth_provider::{BlockProvider, BlockSource, CanonChainTracker, ProviderError}; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError, @@ -137,7 +138,7 @@ pub struct BeaconConsensusEngine where DB: Database, TS: TaskSpawner, - BT: BlockchainTreeEngine, + BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker, { /// The database handle. db: DB, @@ -147,8 +148,8 @@ where /// Must always be [Some] unless the state is being reevaluated. /// The pipeline is used for historical sync by setting the current forkchoice head. pipeline_state: Option>, - /// The blockchain tree used for live sync and reorg tracking. - blockchain_tree: BT, + /// The type we can use to query both the database and the blockchain tree. + blockchain: BT, /// The Engine API message receiver. engine_message_rx: UnboundedReceiverStream, /// A clone of the handle @@ -179,14 +180,14 @@ impl BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner, - BT: BlockchainTreeEngine + 'static, + BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + 'static, { /// Create a new instance of the [BeaconConsensusEngine]. pub fn new( db: DB, task_spawner: TS, pipeline: Pipeline, - blockchain_tree: BT, + blockchain: BT, max_block: Option, continuous: bool, payload_builder: PayloadBuilderHandle, @@ -196,7 +197,7 @@ where db, task_spawner, pipeline, - blockchain_tree, + blockchain, max_block, continuous, payload_builder, @@ -212,7 +213,7 @@ where db: DB, task_spawner: TS, pipeline: Pipeline, - blockchain_tree: BT, + blockchain: BT, max_block: Option, continuous: bool, payload_builder: PayloadBuilderHandle, @@ -224,7 +225,7 @@ where db, task_spawner, pipeline_state: Some(PipelineState::Idle(pipeline)), - blockchain_tree, + blockchain, engine_message_rx: UnboundedReceiverStream::new(rx), handle: handle.clone(), forkchoice_state: None, @@ -279,7 +280,7 @@ where return Some(H256::zero()) } - self.blockchain_tree.find_canonical_ancestor(parent_hash) + self.blockchain.find_canonical_ancestor(parent_hash) } /// Loads the header for the given `block_number` from the database. @@ -334,7 +335,7 @@ where let is_first_forkchoice = self.forkchoice_state.is_none(); self.forkchoice_state = Some(state); let status = if self.is_pipeline_idle() { - match self.blockchain_tree.make_canonical(&state.head_block_hash) { + match self.blockchain.make_canonical(&state.head_block_hash) { Ok(_) => { let head_block_number = self .get_block_number(state.head_block_hash)? @@ -354,9 +355,20 @@ where let header = self .load_header(head_block_number)? .expect("was canonicalized, so it exists"); - return Ok(self.process_payload_attributes(attrs, header, state)) + + let payload_response = + self.process_payload_attributes(attrs, header, state); + if payload_response.is_valid_update() { + // we will return VALID, so let's make sure the info tracker is + // properly updated + self.update_canon_chain(&state)?; + } + return Ok(payload_response) } + // we will return VALID, so let's make sure the info tracker is + // properly updated + self.update_canon_chain(&state)?; PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)) } Err(error) => { @@ -380,6 +392,42 @@ where Ok(OnForkChoiceUpdated::valid(status)) } + /// Sets the state of the canon chain tracker based on the given forkchoice update. This should + /// be called before issuing a VALID forkchoice update. + fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), BeaconEngineError> { + if !update.finalized_block_hash.is_zero() { + let finalized = self + .blockchain + .find_block_by_hash(update.finalized_block_hash, BlockSource::Any)? + .ok_or_else(|| { + Error::Provider(ProviderError::UnknownBlockHash(update.finalized_block_hash)) + })?; + self.blockchain.set_finalized(finalized.header.seal(update.finalized_block_hash)); + } + + if !update.safe_block_hash.is_zero() { + let safe = self + .blockchain + .find_block_by_hash(update.safe_block_hash, BlockSource::Any)? + .ok_or_else(|| { + Error::Provider(ProviderError::UnknownBlockHash(update.safe_block_hash)) + })?; + self.blockchain.set_safe(safe.header.seal(update.safe_block_hash)); + } + + // the consensus engine should ensure the head is not zero so we always update the head + let head = self + .blockchain + .find_block_by_hash(update.head_block_hash, BlockSource::Any)? + .ok_or_else(|| { + Error::Provider(ProviderError::UnknownBlockHash(update.head_block_hash)) + })?; + + self.blockchain.set_canonical_head(head.header.seal(update.head_block_hash)); + self.blockchain.on_forkchoice_update_received(update); + Ok(()) + } + /// Handler for a failed a forkchoice update due to a canonicalization error. /// /// This will determine if the state's head is invalid, and if so, return immediately. @@ -425,6 +473,9 @@ where } /// Validates the payload attributes with respect to the header and fork choice state. + /// + /// Note: At this point, the fork choice update is considered to be VALID, however, we can still + /// return an error if the payload attributes are invalid. fn process_payload_attributes( &self, attrs: PayloadAttributes, @@ -505,7 +556,7 @@ where let header = block.header.clone(); let status = if self.is_pipeline_idle() { - match self.blockchain_tree.insert_block_without_senders(block) { + match self.blockchain.insert_block_without_senders(block) { Ok(status) => { let mut latest_valid_hash = None; let status = match status { @@ -538,7 +589,7 @@ where PayloadStatus::new(status, latest_valid_hash) } } - } else if let Err(error) = self.blockchain_tree.buffer_block_without_sender(block) { + } else if let Err(error) = self.blockchain.buffer_block_without_sender(block) { // received a new payload while we're still syncing to the target let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); @@ -598,7 +649,7 @@ where let needs_pipeline_run = match self.get_block_number(state.finalized_block_hash)? { Some(number) => { // Attempt to restore the tree. - self.blockchain_tree.restore_canonical_hashes(number)?; + self.blockchain.restore_canonical_hashes(number)?; // After restoring the tree, check if the head block is missing. self.db @@ -645,7 +696,7 @@ impl Future for BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner + Unpin, - BT: BlockchainTreeEngine + Unpin + 'static, + BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static, { type Output = Result<(), BeaconEngineError>; @@ -672,7 +723,7 @@ where // Terminate the sync early if it's reached the maximum user // configured block. if is_valid_response { - let tip_number = this.blockchain_tree.canonical_tip().number; + let tip_number = this.blockchain.canonical_tip().number; if this.has_reached_max_block(tip_number) { return Poll::Ready(Ok(())) } @@ -809,7 +860,10 @@ mod tests { use reth_interfaces::test_utils::TestConsensus; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; - use reth_provider::{test_utils::TestExecutorFactory, Transaction}; + use reth_provider::{ + providers::BlockchainProvider, test_utils::TestExecutorFactory, ShareableDatabase, + Transaction, + }; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, sync::Arc, time::Duration}; @@ -821,7 +875,10 @@ mod tests { type TestBeaconConsensusEngine = BeaconConsensusEngine< Arc>, TokioTaskExecutor, - ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, + BlockchainProvider< + Arc>, + ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, + >, >; struct TestEnv { @@ -905,18 +962,22 @@ mod tests { .build(db.clone()); // Setup blockchain tree - let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); + let externals = + TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec.clone()); let config = BlockchainTreeConfig::new(1, 2, 3, 2); let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); let tree = ShareableBlockchainTree::new( BlockchainTree::new(externals, canon_state_notification_sender, config) .expect("failed to create tree"), ); + let shareable_db = ShareableDatabase::new(db.clone(), chain_spec.clone()); + let latest = chain_spec.genesis_header().seal_slow(); + let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest); let (engine, handle) = BeaconConsensusEngine::new( db.clone(), TokioTaskExecutor::default(), pipeline, - tree, + blockchain_provider, None, false, payload_builder, diff --git a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs index b97341225d..bf107cd839 100644 --- a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs +++ b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs @@ -22,8 +22,11 @@ pub type ForkChoiceUpdateResult = Result, +} + +impl ChainInfoTracker { + /// Create a new chain info container for the given canonical head. + pub(crate) fn new(head: SealedHeader) -> Self { + Self { + inner: Arc::new(ChainInfoInner { + last_forkchoice_update: RwLock::new(Instant::now()), + canonical_head: RwLock::new(head), + safe_block: RwLock::new(None), + finalized_block: RwLock::new(None), + }), + } + } + + /// Update the timestamp when we received a forkchoice update. + pub(crate) fn on_forkchoice_update_received(&self) { + *self.inner.last_forkchoice_update.write() = Instant::now(); + } + + /// Returns the instant when we received the latest forkchoice update. + #[allow(unused)] + pub(crate) fn last_forkchoice_update_received_at(&self) -> Instant { + *self.inner.last_forkchoice_update.read() + } + + /// Returns the canonical head of the chain. + #[allow(unused)] + pub(crate) fn get_canonical_head(&self) -> SealedHeader { + self.inner.canonical_head.read().clone() + } + + /// Returns the safe header of the chain. + #[allow(unused)] + pub(crate) fn get_safe_header(&self) -> Option { + self.inner.safe_block.read().clone() + } + + /// Returns the finalized header of the chain. + #[allow(unused)] + pub(crate) fn get_finalized_header(&self) -> Option { + self.inner.finalized_block.read().clone() + } + + /// Returns the canonical head of the chain. + #[allow(unused)] + pub(crate) fn get_canonical_num_hash(&self) -> BlockNumHash { + self.inner.canonical_head.read().num_hash() + } + + /// Returns the safe header of the chain. + #[allow(unused)] + pub(crate) fn get_safe_num_hash(&self) -> Option { + let h = self.inner.safe_block.read(); + h.as_ref().map(|h| h.num_hash()) + } + + /// Returns the finalized header of the chain. + #[allow(unused)] + pub(crate) fn get_finalized_num_hash(&self) -> Option { + let h = self.inner.finalized_block.read(); + h.as_ref().map(|h| h.num_hash()) + } + + /// Sets the canonical head of the chain. + pub(crate) fn set_canonical_head(&self, header: SealedHeader) { + *self.inner.canonical_head.write() = header; + } + + /// Sets the safe header of the chain. + pub(crate) fn set_safe(&self, header: SealedHeader) { + self.inner.safe_block.write().replace(header); + } + + /// Sets the finalized header of the chain. + pub(crate) fn set_finalized(&self, header: SealedHeader) { + self.inner.finalized_block.write().replace(header); + } +} + +/// Container type for all chain info fields +#[derive(Debug)] +struct ChainInfoInner { + /// Timestamp when we received the last fork choice update. + /// + /// This is mainly used to track if we're connected to a beacon node. + last_forkchoice_update: RwLock, + /// The canonical head of the chain. + canonical_head: RwLock, + /// The block that the beacon node considers safe. + safe_block: RwLock>, + /// The block that the beacon node considers finalized. + finalized_block: RwLock>, +} diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 92d4088b45..f80a44dff2 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -1,18 +1,20 @@ use crate::{ BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, - BlockchainTreePendingStateProvider, CanonStateNotifications, CanonStateSubscriptions, - EvmEnvProvider, HeaderProvider, PostStateDataProvider, ReceiptProvider, StateProviderBox, - StateProviderFactory, TransactionsProvider, WithdrawalsProvider, + BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, + CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError, + ReceiptProvider, StateProviderBox, StateProviderFactory, TransactionsProvider, + WithdrawalsProvider, }; use reth_db::database::Database; use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer}, - Result, + consensus::ForkchoiceState, + Error, Result, }; use reth_primitives::{ Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumber, ChainInfo, Header, - Receipt, SealedBlock, SealedBlockWithSenders, TransactionMeta, TransactionSigned, TxHash, - TxNumber, Withdrawal, H256, U256, + Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned, + TxHash, TxNumber, Withdrawal, H256, U256, }; use reth_revm_primitives::primitives::{BlockEnv, CfgEnv}; pub use state::{ @@ -25,10 +27,11 @@ use std::{ }; use tracing::trace; +mod chain_info; mod database; mod post_state_provider; mod state; -use crate::traits::BlockSource; +use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource}; pub use database::*; pub use post_state_provider::PostStateProvider; @@ -43,12 +46,30 @@ pub struct BlockchainProvider { database: ShareableDatabase, /// The blockchain tree instance. tree: Tree, + /// Tracks the chain info wrt forkchoice updates + chain_info: ChainInfoTracker, } impl BlockchainProvider { - /// Create new provider instance that wraps the database and the blockchain tree. - pub fn new(database: ShareableDatabase, tree: Tree) -> Self { - Self { database, tree } + /// Create new provider instance that wraps the database and the blockchain tree, using the + /// provided latest header to initialize the chain info tracker. + pub fn with_latest(database: ShareableDatabase, tree: Tree, latest: SealedHeader) -> Self { + Self { database, tree, chain_info: ChainInfoTracker::new(latest) } + } +} + +impl BlockchainProvider +where + DB: Database, +{ + /// Create a new provider using only the database and the tree, fetching the latest header from + /// the database to initialize the provider. + pub fn new(database: ShareableDatabase, tree: Tree) -> Result { + let best = database.chain_info()?; + match database.header_by_number(best.best_number)? { + Some(header) => Ok(Self::with_latest(database, tree, header.seal(best.best_hash))), + None => Err(Error::Provider(ProviderError::Header { number: best.best_number })), + } } } @@ -410,6 +431,30 @@ where } } +impl CanonChainTracker for BlockchainProvider +where + DB: Send + Sync, + Tree: Send + Sync, + Self: BlockProvider, +{ + fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) { + // update timestamp + self.chain_info.on_forkchoice_update_received(); + } + + fn set_finalized(&self, header: SealedHeader) { + self.chain_info.set_finalized(header); + } + + fn set_safe(&self, header: SealedHeader) { + self.chain_info.set_safe(header); + } + + fn set_canonical_head(&self, header: SealedHeader) { + self.chain_info.set_canonical_head(header); + } +} + impl BlockProviderIdExt for BlockchainProvider where Self: BlockProvider + BlockIdProvider, diff --git a/crates/storage/provider/src/traits/chain_info.rs b/crates/storage/provider/src/traits/chain_info.rs new file mode 100644 index 0000000000..6fdb338574 --- /dev/null +++ b/crates/storage/provider/src/traits/chain_info.rs @@ -0,0 +1,17 @@ +use reth_interfaces::consensus::ForkchoiceState; +use reth_primitives::SealedHeader; + +/// A type that can track updates related to fork choice updates. +pub trait CanonChainTracker: Send + Sync { + /// Notify the tracker about a received fork choice update. + fn on_forkchoice_update_received(&self, update: &ForkchoiceState); + + /// Sets the canonical head of the chain. + fn set_canonical_head(&self, header: SealedHeader); + + /// Sets the safe block of the chain. + fn set_safe(&self, header: SealedHeader); + + /// Sets the finalized block of the chain. + fn set_finalized(&self, header: SealedHeader); +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index ebaf060e5f..ea12466d72 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -15,6 +15,9 @@ pub use block_id::{BlockIdProvider, BlockNumProvider}; mod evm_env; pub use evm_env::EvmEnvProvider; +mod chain_info; +pub use chain_info::CanonChainTracker; + mod header; pub use header::HeaderProvider;