From 2209381de9b48313d4b9f89f5ec84b78c3333c36 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 23 Jul 2024 18:17:53 +0200 Subject: [PATCH] feat: add new BlockchainProvider type (#9656) --- crates/chain-state/src/in_memory.rs | 101 ++- .../src/providers/blockchain_provider.rs | 812 ++++++++++++++++++ crates/storage/provider/src/providers/mod.rs | 3 + 3 files changed, 907 insertions(+), 9 deletions(-) create mode 100644 crates/storage/provider/src/providers/blockchain_provider.rs diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 9e7a6a4528..7888e87a56 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -1,12 +1,19 @@ //! Types for tracking the canonical chain state in memory. -use crate::ChainInfoTracker; +use crate::{CanonStateNotificationSender, CanonStateNotifications, ChainInfoTracker}; use parking_lot::RwLock; use reth_chainspec::ChainInfo; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{Address, BlockNumHash, Receipts, SealedBlock, SealedHeader, B256}; +use reth_primitives::{ + Address, BlockNumHash, Header, Receipt, Receipts, SealedBlock, SealedBlockWithSenders, + SealedHeader, B256, +}; use reth_trie::{updates::TrieUpdates, HashedPostState}; use std::{collections::HashMap, sync::Arc, time::Instant}; +use tokio::sync::broadcast; + +/// Size of the broadcast channel used to notify canonical state events. +const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256; /// Container type for in memory state data. #[derive(Debug, Default)] @@ -61,6 +68,7 @@ impl InMemoryState { pub(crate) struct CanonicalInMemoryStateInner { pub(crate) chain_info_tracker: ChainInfoTracker, pub(crate) in_memory_state: InMemoryState, + pub(crate) canon_state_notification_sender: CanonStateNotificationSender, } /// This type is responsible for providing the blocks, receipts, and state for @@ -85,7 +93,14 @@ impl CanonicalInMemoryState { None => SealedHeader::default(), }; let chain_info_tracker = ChainInfoTracker::new(header); - let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state }; + let (canon_state_notification_sender, _canon_state_notification_receiver) = + broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); + + let inner = CanonicalInMemoryStateInner { + chain_info_tracker, + in_memory_state, + canon_state_notification_sender, + }; Self { inner: Arc::new(inner) } } @@ -94,7 +109,13 @@ impl CanonicalInMemoryState { pub fn with_head(head: SealedHeader) -> Self { let chain_info_tracker = ChainInfoTracker::new(head); let in_memory_state = InMemoryState::default(); - let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state }; + let (canon_state_notification_sender, _canon_state_notification_receiver) = + broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE); + let inner = CanonicalInMemoryStateInner { + chain_info_tracker, + in_memory_state, + canon_state_notification_sender, + }; Self { inner: Arc::new(inner) } } @@ -196,6 +217,40 @@ impl CanonicalInMemoryState { pub fn get_safe_header(&self) -> Option { self.inner.chain_info_tracker.get_safe_header() } + + /// Returns the `SealedHeader` corresponding to the pending state. + pub fn pending_sealed_header(&self) -> Option { + self.pending_state().map(|h| h.block().block().header.clone()) + } + + /// Returns the `Header` corresponding to the pending state. + pub fn pending_header(&self) -> Option
{ + self.pending_sealed_header().map(|sealed_header| sealed_header.unseal()) + } + + /// Returns the `SealedBlock` corresponding to the pending state. + pub fn pending_block(&self) -> Option { + self.pending_state().map(|block_state| block_state.block().block().clone()) + } + + /// Returns the `SealedBlockWithSenders` corresponding to the pending state. + pub fn pending_block_with_senders(&self) -> Option { + self.pending_state() + .and_then(|block_state| block_state.block().block().clone().seal_with_senders()) + } + + /// Returns a tuple with the `SealedBlock` corresponding to the pending + /// state and a vector of its `Receipt`s. + pub fn pending_block_and_receipts(&self) -> Option<(SealedBlock, Vec)> { + self.pending_state().map(|block_state| { + (block_state.block().block().clone(), block_state.executed_block_receipts()) + }) + } + + /// Subscribe to new blocks events. + pub fn subscribe_canon_state(&self) -> CanonStateNotifications { + self.inner.canon_state_notification_sender.subscribe() + } } /// State after applying the given block. @@ -209,25 +264,53 @@ impl BlockState { Self(executed_block) } - pub(crate) fn block(&self) -> ExecutedBlock { + /// Returns the executed block that determines the state. + pub fn block(&self) -> ExecutedBlock { self.0.clone() } - pub(crate) fn hash(&self) -> B256 { + /// Returns the hash of executed block that determines the state. + pub fn hash(&self) -> B256 { self.0.block().hash() } - pub(crate) fn number(&self) -> u64 { + /// Returns the block number of executed block that determines the state. + pub fn number(&self) -> u64 { self.0.block().number } - pub(crate) fn state_root(&self) -> B256 { + /// Returns the state root after applying the executed block that determines + /// the state. + pub fn state_root(&self) -> B256 { self.0.block().header.state_root } - pub(crate) fn receipts(&self) -> &Receipts { + /// Returns the `Receipts` of executed block that determines the state. + pub fn receipts(&self) -> &Receipts { &self.0.execution_outcome().receipts } + + /// Returns a vector of `Receipt` of executed block that determines the state. + /// We assume that the `Receipts` in the executed block `ExecutionOutcome` + /// has only one element corresponding to the executed block associated to + /// the state. + pub fn executed_block_receipts(&self) -> Vec { + let receipts = self.receipts(); + + debug_assert!( + receipts.receipt_vec.len() <= 1, + "Expected at most one block's worth of receipts, found {}", + receipts.receipt_vec.len() + ); + + receipts + .receipt_vec + .first() + .map(|block_receipts| { + block_receipts.iter().filter_map(|opt_receipt| opt_receipt.clone()).collect() + }) + .unwrap_or_default() + } } /// Represents an executed block stored in-memory. diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs new file mode 100644 index 0000000000..f80ceb9c84 --- /dev/null +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -0,0 +1,812 @@ +use crate::{ + providers::BundleStateProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, + BlockReader, BlockReaderIdExt, BlockSource, BlockchainTreePendingStateProvider, + CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions, ChainSpecProvider, + ChangeSetReader, DatabaseProviderFactory, DatabaseProviderRO, EvmEnvProvider, + FullExecutionDataProvider, HeaderProvider, ProviderError, ProviderFactory, + PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, + StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory, + TransactionVariant, TransactionsProvider, WithdrawalsProvider, +}; +use alloy_rpc_types_engine::ForkchoiceState; +use reth_chain_state::CanonicalInMemoryState; +use reth_chainspec::{ChainInfo, ChainSpec}; +use reth_db_api::{ + database::Database, + models::{AccountBeforeTx, StoredBlockBodyIndices}, +}; +use reth_evm::ConfigureEvmEnv; +use reth_primitives::{ + Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumber, + BlockNumberOrTag, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders, + SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, + Withdrawal, Withdrawals, B256, U256, +}; +use reth_prune_types::{PruneCheckpoint, PruneSegment}; +use reth_stages_types::{StageCheckpoint, StageId}; +use reth_storage_errors::provider::ProviderResult; +use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg}; +use std::{ + ops::{RangeBounds, RangeInclusive}, + sync::Arc, + time::Instant, +}; +use tracing::trace; + +use super::StaticFileProvider; + +/// The main type for interacting with the blockchain. +/// +/// This type serves as the main entry point for interacting with the blockchain and provides data +/// from database storage and from the blockchain tree (pending state etc.) It is a simple wrapper +/// type that holds an instance of the database and the blockchain tree. +#[allow(missing_debug_implementations)] +pub struct BlockchainProvider2 { + /// Provider type used to access the database. + database: ProviderFactory, + /// Tracks the chain info wrt forkchoice updates and in memory canonical + /// state. + canonical_in_memory_state: CanonicalInMemoryState, +} + +impl Clone for BlockchainProvider2 { + fn clone(&self) -> Self { + Self { + database: self.database.clone(), + canonical_in_memory_state: self.canonical_in_memory_state.clone(), + } + } +} + +impl BlockchainProvider2 { + /// Create new provider instance that wraps the database and the blockchain tree, using the + /// provided latest header to initialize the chain info tracker. + pub fn with_latest(database: ProviderFactory, latest: SealedHeader) -> Self { + Self { database, canonical_in_memory_state: CanonicalInMemoryState::with_head(latest) } + } +} + +impl BlockchainProvider2 +where + DB: Database, +{ + /// Create a new provider using only the database, fetching the latest header from + /// the database to initialize the provider. + pub fn new(database: ProviderFactory) -> ProviderResult { + let provider = database.provider()?; + let best: ChainInfo = provider.chain_info()?; + match provider.header_by_number(best.best_number)? { + Some(header) => { + drop(provider); + Ok(Self::with_latest(database, header.seal(best.best_hash))) + } + None => Err(ProviderError::HeaderNotFound(best.best_number.into())), + } + } +} + +impl BlockchainProvider2 +where + DB: Database, +{ + /// Ensures that the given block number is canonical (synced) + /// + /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are + /// out of range and would lead to invalid results, mainly during initial sync. + /// + /// Verifying the `block_number` would be expensive since we need to lookup sync table + /// Instead, we ensure that the `block_number` is within the range of the + /// [`Self::best_block_number`] which is updated when a block is synced. + #[inline] + fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> { + let latest = self.best_block_number()?; + if block_number > latest { + Err(ProviderError::HeaderNotFound(block_number.into())) + } else { + Ok(()) + } + } +} + +impl DatabaseProviderFactory for BlockchainProvider2 +where + DB: Database, +{ + fn database_provider_ro(&self) -> ProviderResult> { + self.database.provider() + } +} + +impl StaticFileProviderFactory for BlockchainProvider2 { + fn static_file_provider(&self) -> StaticFileProvider { + self.database.static_file_provider() + } +} + +impl HeaderProvider for BlockchainProvider2 +where + DB: Database, +{ + fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + self.database.header(block_hash) + } + + fn header_by_number(&self, num: BlockNumber) -> ProviderResult> { + self.database.header_by_number(num) + } + + fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + self.database.header_td(hash) + } + + fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult> { + self.database.header_td_by_number(number) + } + + fn headers_range(&self, range: impl RangeBounds) -> ProviderResult> { + self.database.headers_range(range) + } + + fn sealed_header(&self, number: BlockNumber) -> ProviderResult> { + self.database.sealed_header(number) + } + + fn sealed_headers_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.database.sealed_headers_range(range) + } + + fn sealed_headers_while( + &self, + range: impl RangeBounds, + predicate: impl FnMut(&SealedHeader) -> bool, + ) -> ProviderResult> { + self.database.sealed_headers_while(range, predicate) + } +} + +impl BlockHashReader for BlockchainProvider2 +where + DB: Database, +{ + fn block_hash(&self, number: u64) -> ProviderResult> { + self.database.block_hash(number) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + self.database.canonical_hashes_range(start, end) + } +} + +impl BlockNumReader for BlockchainProvider2 +where + DB: Database, +{ + fn chain_info(&self) -> ProviderResult { + Ok(self.canonical_in_memory_state.chain_info()) + } + + fn best_block_number(&self) -> ProviderResult { + Ok(self.canonical_in_memory_state.get_canonical_block_number()) + } + + fn last_block_number(&self) -> ProviderResult { + self.database.last_block_number() + } + + fn block_number(&self, hash: B256) -> ProviderResult> { + self.database.block_number(hash) + } +} + +impl BlockIdReader for BlockchainProvider2 +where + DB: Database, +{ + fn pending_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block_num_hash()) + } + + fn safe_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.get_safe_num_hash()) + } + + fn finalized_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.get_finalized_num_hash()) + } +} + +impl BlockReader for BlockchainProvider2 +where + DB: Database, +{ + fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult> { + let block = match source { + BlockSource::Any | BlockSource::Canonical => { + // check in memory first + // Note: it's fine to return the unsealed block because the caller already has + // the hash + let mut block = self + .canonical_in_memory_state + .state_by_hash(hash) + .map(|block_state| block_state.block().block().clone().unseal()); + + if block.is_none() { + block = self.database.block_by_hash(hash)?; + } + block + } + BlockSource::Pending => { + self.canonical_in_memory_state.pending_block().map(|block| block.unseal()) + } + }; + + Ok(block) + } + + fn block(&self, id: BlockHashOrNumber) -> ProviderResult> { + match id { + BlockHashOrNumber::Hash(hash) => self.find_block_by_hash(hash, BlockSource::Any), + BlockHashOrNumber::Number(num) => self.database.block_by_number(num), + } + } + + fn pending_block(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block()) + } + + fn pending_block_with_senders(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block_with_senders()) + } + + fn pending_block_and_receipts(&self) -> ProviderResult)>> { + Ok(self.canonical_in_memory_state.pending_block_and_receipts()) + } + + fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult>> { + self.database.ommers(id) + } + + fn block_body_indices( + &self, + number: BlockNumber, + ) -> ProviderResult> { + self.database.block_body_indices(number) + } + + /// Returns the block with senders with matching number or hash from database. + /// + /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid + /// hashes, since they would need to be calculated on the spot, and we want fast querying.** + /// + /// Returns `None` if block is not found. + fn block_with_senders( + &self, + id: BlockHashOrNumber, + transaction_kind: TransactionVariant, + ) -> ProviderResult> { + self.database.block_with_senders(id, transaction_kind) + } + + fn sealed_block_with_senders( + &self, + id: BlockHashOrNumber, + transaction_kind: TransactionVariant, + ) -> ProviderResult> { + self.database.sealed_block_with_senders(id, transaction_kind) + } + + fn block_range(&self, range: RangeInclusive) -> ProviderResult> { + self.database.block_range(range) + } + + fn block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.database.block_with_senders_range(range) + } + + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.database.sealed_block_with_senders_range(range) + } +} + +impl TransactionsProvider for BlockchainProvider2 +where + DB: Database, +{ + fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult> { + self.database.transaction_id(tx_hash) + } + + fn transaction_by_id(&self, id: TxNumber) -> ProviderResult> { + self.database.transaction_by_id(id) + } + + fn transaction_by_id_no_hash( + &self, + id: TxNumber, + ) -> ProviderResult> { + self.database.transaction_by_id_no_hash(id) + } + + fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult> { + self.database.transaction_by_hash(hash) + } + + fn transaction_by_hash_with_meta( + &self, + tx_hash: TxHash, + ) -> ProviderResult> { + self.database.transaction_by_hash_with_meta(tx_hash) + } + + fn transaction_block(&self, id: TxNumber) -> ProviderResult> { + self.database.transaction_block(id) + } + + fn transactions_by_block( + &self, + id: BlockHashOrNumber, + ) -> ProviderResult>> { + self.database.transactions_by_block(id) + } + + fn transactions_by_block_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.database.transactions_by_block_range(range) + } + + fn transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.database.transactions_by_tx_range(range) + } + + fn senders_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.database.senders_by_tx_range(range) + } + + fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { + self.database.transaction_sender(id) + } +} + +impl ReceiptProvider for BlockchainProvider2 +where + DB: Database, +{ + fn receipt(&self, id: TxNumber) -> ProviderResult> { + self.database.receipt(id) + } + + fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult> { + self.database.receipt_by_hash(hash) + } + + fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult>> { + self.database.receipts_by_block(block) + } + + fn receipts_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.database.receipts_by_tx_range(range) + } +} + +impl ReceiptProviderIdExt for BlockchainProvider2 +where + DB: Database, +{ + fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult>> { + match block { + BlockId::Hash(rpc_block_hash) => { + let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?; + if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) { + let block_state = self + .canonical_in_memory_state + .state_by_hash(rpc_block_hash.block_hash) + .ok_or(ProviderError::StateForHashNotFound(rpc_block_hash.block_hash))?; + receipts = Some(block_state.executed_block_receipts()); + } + Ok(receipts) + } + BlockId::Number(num_tag) => match num_tag { + BlockNumberOrTag::Pending => Ok(self + .canonical_in_memory_state + .pending_state() + .map(|block_state| block_state.executed_block_receipts())), + _ => { + if let Some(num) = self.convert_block_number(num_tag)? { + self.receipts_by_block(num.into()) + } else { + Ok(None) + } + } + }, + } + } +} + +impl WithdrawalsProvider for BlockchainProvider2 +where + DB: Database, +{ + fn withdrawals_by_block( + &self, + id: BlockHashOrNumber, + timestamp: u64, + ) -> ProviderResult> { + self.database.withdrawals_by_block(id, timestamp) + } + + fn latest_withdrawal(&self) -> ProviderResult> { + self.database.latest_withdrawal() + } +} + +impl RequestsProvider for BlockchainProvider2 +where + DB: Database, +{ + fn requests_by_block( + &self, + id: BlockHashOrNumber, + timestamp: u64, + ) -> ProviderResult> { + self.database.requests_by_block(id, timestamp) + } +} + +impl StageCheckpointReader for BlockchainProvider2 +where + DB: Database, +{ + fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult> { + self.database.provider()?.get_stage_checkpoint(id) + } + + fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult>> { + self.database.provider()?.get_stage_checkpoint_progress(id) + } + + fn get_all_checkpoints(&self) -> ProviderResult> { + self.database.provider()?.get_all_checkpoints() + } +} + +impl EvmEnvProvider for BlockchainProvider2 +where + DB: Database, +{ + fn fill_env_at( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + block_env: &mut BlockEnv, + at: BlockHashOrNumber, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv, + { + self.database.provider()?.fill_env_at(cfg, block_env, at, evm_config) + } + + fn fill_env_with_header( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + block_env: &mut BlockEnv, + header: &Header, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv, + { + self.database.provider()?.fill_env_with_header(cfg, block_env, header, evm_config) + } + + fn fill_cfg_env_at( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + at: BlockHashOrNumber, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv, + { + self.database.provider()?.fill_cfg_env_at(cfg, at, evm_config) + } + + fn fill_cfg_env_with_header( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + header: &Header, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv, + { + self.database.provider()?.fill_cfg_env_with_header(cfg, header, evm_config) + } +} + +impl PruneCheckpointReader for BlockchainProvider2 +where + DB: Database, +{ + fn get_prune_checkpoint( + &self, + segment: PruneSegment, + ) -> ProviderResult> { + self.database.provider()?.get_prune_checkpoint(segment) + } + + fn get_prune_checkpoints(&self) -> ProviderResult> { + self.database.provider()?.get_prune_checkpoints() + } +} + +impl ChainSpecProvider for BlockchainProvider2 +where + DB: Send + Sync, +{ + fn chain_spec(&self) -> Arc { + self.database.chain_spec() + } +} + +impl StateProviderFactory for BlockchainProvider2 +where + DB: Database, +{ + /// Storage provider for latest block + fn latest(&self) -> ProviderResult { + trace!(target: "providers::blockchain", "Getting latest block state provider"); + self.database.latest() + } + + fn history_by_block_number( + &self, + block_number: BlockNumber, + ) -> ProviderResult { + trace!(target: "providers::blockchain", ?block_number, "Getting history by block number"); + self.ensure_canonical_block(block_number)?; + self.database.history_by_block_number(block_number) + } + + fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult { + trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash"); + self.database.history_by_block_hash(block_hash) + } + + fn state_by_block_hash(&self, block: BlockHash) -> ProviderResult { + trace!(target: "providers::blockchain", ?block, "Getting state by block hash"); + let mut state = self.history_by_block_hash(block); + + // we failed to get the state by hash, from disk, hash block be the pending block + if state.is_err() { + if let Ok(Some(pending)) = self.pending_state_by_hash(block) { + // we found pending block by hash + state = Ok(pending) + } + } + + state + } + + /// Returns the state provider for pending state. + /// + /// If there's no pending block available then the latest state provider is returned: + /// [`Self::latest`] + fn pending(&self) -> ProviderResult { + trace!(target: "providers::blockchain", "Getting provider for pending state"); + + // TODO: check in memory overlay https://github.com/paradigmxyz/reth/issues/9614 + + // fallback to latest state if the pending block is not available + self.latest() + } + + fn pending_state_by_hash(&self, _block_hash: B256) -> ProviderResult> { + // TODO: check in memory overlay https://github.com/paradigmxyz/reth/issues/9614 + + Ok(None) + } + + fn pending_with_provider( + &self, + bundle_state_data: Box, + ) -> ProviderResult { + let canonical_fork = bundle_state_data.canonical_fork(); + trace!(target: "providers::blockchain", ?canonical_fork, "Returning post state provider"); + + let state_provider = self.history_by_block_hash(canonical_fork.hash)?; + let bundle_state_provider = BundleStateProvider::new(state_provider, bundle_state_data); + Ok(Box::new(bundle_state_provider)) + } +} + +impl CanonChainTracker for BlockchainProvider2 +where + DB: Send + Sync, + Self: BlockReader, +{ + fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) { + // update timestamp + self.canonical_in_memory_state.on_forkchoice_update_received(); + } + + fn last_received_update_timestamp(&self) -> Option { + self.canonical_in_memory_state.last_received_update_timestamp() + } + + fn on_transition_configuration_exchanged(&self) { + self.canonical_in_memory_state.on_transition_configuration_exchanged(); + } + + fn last_exchanged_transition_configuration_timestamp(&self) -> Option { + self.canonical_in_memory_state.last_exchanged_transition_configuration_timestamp() + } + + fn set_canonical_head(&self, header: SealedHeader) { + self.canonical_in_memory_state.set_canonical_head(header); + } + + fn set_safe(&self, header: SealedHeader) { + self.canonical_in_memory_state.set_safe(header); + } + + fn set_finalized(&self, header: SealedHeader) { + self.canonical_in_memory_state.set_finalized(header); + } +} + +impl BlockReaderIdExt for BlockchainProvider2 +where + Self: BlockReader + BlockIdReader + ReceiptProviderIdExt, +{ + fn block_by_id(&self, id: BlockId) -> ProviderResult> { + match id { + BlockId::Number(num) => self.block_by_number_or_tag(num), + BlockId::Hash(hash) => { + // TODO: should we only apply this for the RPCs that are listed in EIP-1898? + // so not at the provider level? + // if we decide to do this at a higher level, then we can make this an automatic + // trait impl + if Some(true) == hash.require_canonical { + // check the database, canonical blocks are only stored in the database + self.find_block_by_hash(hash.block_hash, BlockSource::Canonical) + } else { + self.block_by_hash(hash.block_hash) + } + } + } + } + + fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult> { + Ok(match id { + BlockNumberOrTag::Latest => { + Some(self.canonical_in_memory_state.get_canonical_head().unseal()) + } + BlockNumberOrTag::Finalized => { + self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal()) + } + BlockNumberOrTag::Safe => { + self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal()) + } + BlockNumberOrTag::Earliest => self.header_by_number(0)?, + BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(), + + BlockNumberOrTag::Number(num) => self.header_by_number(num)?, + }) + } + + fn sealed_header_by_number_or_tag( + &self, + id: BlockNumberOrTag, + ) -> ProviderResult> { + match id { + BlockNumberOrTag::Latest => { + Ok(Some(self.canonical_in_memory_state.get_canonical_head())) + } + BlockNumberOrTag::Finalized => { + Ok(self.canonical_in_memory_state.get_finalized_header()) + } + BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()), + BlockNumberOrTag::Earliest => { + self.header_by_number(0)?.map_or_else(|| Ok(None), |h| Ok(Some(h.seal_slow()))) + } + BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()), + BlockNumberOrTag::Number(num) => { + self.header_by_number(num)?.map_or_else(|| Ok(None), |h| Ok(Some(h.seal_slow()))) + } + } + } + + fn sealed_header_by_id(&self, id: BlockId) -> ProviderResult> { + Ok(match id { + BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?, + BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(|h| h.seal_slow()), + }) + } + + fn header_by_id(&self, id: BlockId) -> ProviderResult> { + Ok(match id { + BlockId::Number(num) => self.header_by_number_or_tag(num)?, + BlockId::Hash(hash) => self.header(&hash.block_hash)?, + }) + } + + fn ommers_by_id(&self, id: BlockId) -> ProviderResult>> { + match id { + BlockId::Number(num) => self.ommers_by_number_or_tag(num), + BlockId::Hash(hash) => { + // TODO: EIP-1898 question, see above + // here it is not handled + self.ommers(BlockHashOrNumber::Hash(hash.block_hash)) + } + } + } +} + +impl BlockchainTreePendingStateProvider for BlockchainProvider2 +where + DB: Send + Sync, +{ + fn find_pending_state_provider( + &self, + _block_hash: BlockHash, + ) -> Option> { + // TODO: check in memory overlay https://github.com/paradigmxyz/reth/issues/9614 + None + } +} + +impl CanonStateSubscriptions for BlockchainProvider2 +where + DB: Send + Sync, +{ + fn subscribe_to_canonical_state(&self) -> CanonStateNotifications { + self.canonical_in_memory_state.subscribe_canon_state() + } +} + +impl ChangeSetReader for BlockchainProvider2 +where + DB: Database, +{ + fn account_block_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + self.database.provider()?.account_block_changeset(block_number) + } +} + +impl AccountReader for BlockchainProvider2 +where + DB: Database + Sync + Send, +{ + /// Get basic account information. + fn basic_account(&self, address: Address) -> ProviderResult> { + self.database.provider()?.basic_account(address) + } +} diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index d7aa5b1517..be6db2dcad 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -59,6 +59,9 @@ mod consistent_view; use alloy_rpc_types_engine::ForkchoiceState; pub use consistent_view::{ConsistentDbView, ConsistentViewError}; +mod blockchain_provider; +pub use blockchain_provider::BlockchainProvider2; + /// The main type for interacting with the blockchain. /// /// This type serves as the main entry point for interacting with the blockchain and provides data