From 9cd32e516a3961711c81abf248daafe2757a5a2a Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 31 May 2023 17:06:01 +0100 Subject: [PATCH] feat: Add general `database::Provider` and move implementations from `ShareableDatabase` to it (#2902) --- crates/blockchain-tree/src/blockchain_tree.rs | 16 +- .../provider/src/providers/database.rs | 723 ------------------ .../provider/src/providers/database/mod.rs | 356 +++++++++ .../src/providers/database/provider.rs | 461 +++++++++++ crates/storage/provider/src/providers/mod.rs | 86 ++- .../storage/provider/src/test_utils/mock.rs | 9 + .../storage/provider/src/test_utils/noop.rs | 9 + crates/storage/provider/src/traits/block.rs | 6 + .../storage/provider/src/traits/block_id.rs | 5 +- 9 files changed, 905 insertions(+), 766 deletions(-) delete mode 100644 crates/storage/provider/src/providers/database.rs create mode 100644 crates/storage/provider/src/providers/database/mod.rs create mode 100644 crates/storage/provider/src/providers/database/provider.rs diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 04a67e818d..0473f6576d 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -164,7 +164,7 @@ impl BlockchainTree } // check if block is inside database - if self.externals.database().block_number(block.hash)?.is_some() { + if self.externals.database().provider()?.block_number(block.hash)?.is_some() { return Ok(Some(BlockStatus::Valid)) } @@ -346,9 +346,11 @@ impl BlockchainTree // https://github.com/paradigmxyz/reth/issues/1713 let db = self.externals.database(); + let provider = + db.provider().map_err(|err| InsertBlockError::new(block.block.clone(), err.into()))?; // Validate that the block is post merge - let parent_td = db + let parent_td = provider .header_td(&block.parent_hash) .map_err(|err| InsertBlockError::new(block.block.clone(), err.into()))? .ok_or_else(|| { @@ -366,7 +368,7 @@ impl BlockchainTree )) } - let parent_header = db + let parent_header = provider .header(&block.parent_hash) .map_err(|err| InsertBlockError::new(block.block.clone(), err.into()))? .ok_or_else(|| { @@ -399,6 +401,9 @@ impl BlockchainTree (BlockStatus::Accepted, chain) }; + // let go of `db` immutable borrow + drop(provider); + self.insert_chain(chain); self.try_connect_buffered_blocks(block_num_hash); Ok(block_status) @@ -816,7 +821,7 @@ impl BlockchainTree let mut header = None; if let Some(num) = self.block_indices.get_canonical_block_number(hash) { - header = self.externals.database().header_by_number(num)?; + header = self.externals.database().provider()?.header_by_number(num)?; } if header.is_none() && self.is_block_hash_inside_chain(*hash) { @@ -824,7 +829,7 @@ impl BlockchainTree } if header.is_none() { - header = self.externals.database().header(hash)? + header = self.externals.database().provider()?.header(hash)? } Ok(header.map(|header| header.seal(*hash))) @@ -857,6 +862,7 @@ impl BlockchainTree let td = self .externals .database() + .provider()? .header_td(block_hash)? .ok_or(BlockExecutionError::MissingTotalDifficulty { hash: *block_hash })?; if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(td, U256::ZERO) { diff --git a/crates/storage/provider/src/providers/database.rs b/crates/storage/provider/src/providers/database.rs deleted file mode 100644 index 553f399214..0000000000 --- a/crates/storage/provider/src/providers/database.rs +++ /dev/null @@ -1,723 +0,0 @@ -use crate::{ - providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, - traits::{BlockSource, ReceiptProvider}, - BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, HeaderProvider, - ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider, - WithdrawalsProvider, -}; -use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; -use reth_interfaces::Result; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - Block, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, ChainSpec, Head, Header, Receipt, - SealedBlock, SealedHeader, TransactionMeta, TransactionSigned, TxHash, TxNumber, Withdrawal, - H256, U256, -}; -use reth_revm_primitives::{ - config::revm_spec, - env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env}, - primitives::{BlockEnv, CfgEnv, SpecId}, -}; -use std::{ops::RangeBounds, sync::Arc}; -use tracing::trace; - -/// A common provider that fetches data from a database. -/// -/// This provider implements most provider or provider factory traits. -#[derive(Debug)] -pub struct ShareableDatabase { - /// Database - db: DB, - /// Chain spec - chain_spec: Arc, -} - -impl ShareableDatabase { - /// create new database provider - pub fn new(db: DB, chain_spec: Arc) -> Self { - Self { db, chain_spec } - } -} - -impl Clone for ShareableDatabase { - fn clone(&self) -> Self { - Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } - } -} - -impl ShareableDatabase { - /// Storage provider for latest block - pub fn latest(&self) -> Result> { - trace!(target: "providers::db", "Returning latest state provider"); - Ok(Box::new(LatestStateProvider::new(self.db.tx()?))) - } - - /// Storage provider for state at that given block - pub fn history_by_block_number( - &self, - mut block_number: BlockNumber, - ) -> Result> { - let tx = self.db.tx()?; - - if is_latest_block_number(&tx, block_number)? { - return Ok(Box::new(LatestStateProvider::new(tx))) - } - - // +1 as the changeset that we want is the one that was applied after this block. - block_number += 1; - - trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number"); - Ok(Box::new(HistoricalStateProvider::new(tx, block_number))) - } - - /// Storage provider for state at that given block hash - pub fn history_by_block_hash(&self, block_hash: BlockHash) -> Result> { - let tx = self.db.tx()?; - // get block number - let mut block_number = tx - .get::(block_hash)? - .ok_or(ProviderError::BlockHashNotFound(block_hash))?; - - if is_latest_block_number(&tx, block_number)? { - return Ok(Box::new(LatestStateProvider::new(tx))) - } - - // +1 as the changeset that we want is the one that was applied after this block. - // as the changeset contains old values. - block_number += 1; - - trace!(target: "providers::db", ?block_hash, "Returning historical state provider for block hash"); - Ok(Box::new(HistoricalStateProvider::new(tx, block_number))) - } - - /// Reads the block's ommers blocks and withdrawals. - /// - /// Note: these are mutually exclusive, after shanghai, this only returns withdrawals. Before - /// shanghai, this only returns ommers. - #[allow(clippy::type_complexity)] - fn read_block_ommers_and_withdrawals<'a, TX>( - &self, - tx: &TX, - block_number: u64, - timestamp: u64, - ) -> std::result::Result< - (Option>, Option>), - reth_interfaces::db::DatabaseError, - > - where - TX: DbTx<'a> + Send + Sync, - { - let mut ommers = None; - let mut withdrawals = None; - if self.chain_spec.is_shanghai_activated_at_timestamp(timestamp) { - withdrawals = read_withdrawals_by_number(tx, block_number)?; - } else { - ommers = tx.get::(block_number)?.map(|o| o.ommers); - } - Ok((ommers, withdrawals)) - } -} - -impl HeaderProvider for ShareableDatabase { - fn header(&self, block_hash: &BlockHash) -> Result> { - self.db.view(|tx| { - if let Some(num) = tx.get::(*block_hash)? { - Ok(tx.get::(num)?) - } else { - Ok(None) - } - })? - } - - fn header_by_number(&self, num: BlockNumber) -> Result> { - Ok(self.db.view(|tx| tx.get::(num))??) - } - - fn header_td(&self, hash: &BlockHash) -> Result> { - self.db.view(|tx| { - if let Some(num) = tx.get::(*hash)? { - Ok(tx.get::(num)?.map(|td| td.0)) - } else { - Ok(None) - } - })? - } - - fn header_td_by_number(&self, number: BlockNumber) -> Result> { - self.db.view(|tx| Ok(tx.get::(number)?.map(|td| td.0)))? - } - - fn headers_range(&self, range: impl RangeBounds) -> Result> { - self.db - .view(|tx| { - let mut cursor = tx.cursor_read::()?; - cursor - .walk_range(range)? - .map(|result| result.map(|(_, header)| header).map_err(Into::into)) - .collect::>>() - })? - .map_err(Into::into) - } - - fn sealed_headers_range( - &self, - range: impl RangeBounds, - ) -> Result> { - self.db - .view(|tx| -> Result<_> { - let mut headers = vec![]; - for entry in tx.cursor_read::()?.walk_range(range)? { - let (num, header) = entry?; - let hash = read_header_hash(tx, num)?; - headers.push(header.seal(hash)); - } - Ok(headers) - })? - .map_err(Into::into) - } - - fn sealed_header(&self, number: BlockNumber) -> Result> { - self.db - .view(|tx| -> Result<_> { - if let Some(header) = tx.get::(number)? { - let hash = read_header_hash(tx, number)?; - Ok(Some(header.seal(hash))) - } else { - Ok(None) - } - })? - .map_err(Into::into) - } -} - -impl BlockHashProvider for ShareableDatabase { - fn block_hash(&self, number: u64) -> Result> { - self.db.view(|tx| tx.get::(number))?.map_err(Into::into) - } - - fn canonical_hashes_range(&self, start: BlockNumber, end: BlockNumber) -> Result> { - let range = start..end; - self.db - .view(|tx| { - let mut cursor = tx.cursor_read::()?; - cursor - .walk_range(range)? - .map(|result| result.map(|(_, hash)| hash).map_err(Into::into)) - .collect::>>() - })? - .map_err(Into::into) - } -} - -impl BlockNumProvider for ShareableDatabase { - fn chain_info(&self) -> Result { - let best_number = self.best_block_number()?; - let best_hash = self.block_hash(best_number)?.unwrap_or_default(); - Ok(ChainInfo { best_hash, best_number }) - } - - fn best_block_number(&self) -> Result { - Ok(self.db.view(|tx| best_block_number(tx))??.unwrap_or_default()) - } - - fn block_number(&self, hash: H256) -> Result> { - self.db.view(|tx| read_block_number(tx, hash))?.map_err(Into::into) - } -} - -impl BlockProvider for ShareableDatabase { - fn find_block_by_hash(&self, hash: H256, source: BlockSource) -> Result> { - if source.is_database() { - self.block(hash.into()) - } else { - Ok(None) - } - } - - fn block(&self, id: BlockHashOrNumber) -> Result> { - let tx = self.db.tx()?; - if let Some(number) = convert_hash_or_number(&tx, id)? { - if let Some(header) = read_header(&tx, number)? { - // we check for shanghai first - let (ommers, withdrawals) = - self.read_block_ommers_and_withdrawals(&tx, number, header.timestamp)?; - - let transactions = read_transactions_by_number(&tx, number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(number))?; - - return Ok(Some(Block { - header, - body: transactions, - ommers: ommers.unwrap_or_default(), - withdrawals, - })) - } - } - - Ok(None) - } - - fn pending_block(&self) -> Result> { - Ok(None) - } - - fn ommers(&self, id: BlockHashOrNumber) -> Result>> { - let tx = self.db.tx()?; - if let Some(number) = convert_hash_or_number(&tx, id)? { - // TODO: this can be optimized to return empty Vec post-merge - let ommers = tx.get::(number)?.map(|o| o.ommers); - return Ok(ommers) - } - - Ok(None) - } -} - -impl TransactionsProvider for ShareableDatabase { - fn transaction_id(&self, tx_hash: TxHash) -> Result> { - self.db.view(|tx| tx.get::(tx_hash))?.map_err(Into::into) - } - - fn transaction_by_id(&self, id: TxNumber) -> Result> { - self.db - .view(|tx| tx.get::(id))? - .map_err(Into::into) - .map(|tx| tx.map(Into::into)) - } - - fn transaction_by_hash(&self, hash: TxHash) -> Result> { - self.db - .view(|tx| { - if let Some(id) = tx.get::(hash)? { - tx.get::(id) - } else { - Ok(None) - } - })? - .map_err(Into::into) - .map(|tx| tx.map(Into::into)) - } - - fn transaction_by_hash_with_meta( - &self, - tx_hash: TxHash, - ) -> Result> { - self.db - .view(|tx| -> Result<_> { - if let Some(transaction_id) = tx.get::(tx_hash)? { - if let Some(transaction) = tx.get::(transaction_id)? { - let mut transaction_cursor = - tx.cursor_read::()?; - if let Some(block_number) = - transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))? - { - if let Some((header, block_hash)) = - read_sealed_header(tx, block_number)? - { - if let Some(block_body) = - tx.get::(block_number)? - { - // the index of the tx in the block is the offset: - // len([start..tx_id]) - // SAFETY: `transaction_id` is always `>=` the block's first - // index - let index = transaction_id - block_body.first_tx_num(); - - let meta = TransactionMeta { - tx_hash, - index, - block_hash, - block_number, - base_fee: header.base_fee_per_gas, - }; - - return Ok(Some((transaction.into(), meta))) - } - } - } - } - } - - Ok(None) - })? - .map_err(Into::into) - } - - fn transaction_block(&self, id: TxNumber) -> Result> { - self.db - .view(|tx| { - let mut cursor = tx.cursor_read::()?; - cursor.seek(id).map(|b| b.map(|(_, bn)| bn)) - })? - .map_err(Into::into) - } - - fn transactions_by_block( - &self, - id: BlockHashOrNumber, - ) -> Result>> { - let tx = self.db.tx()?; - if let Some(number) = convert_hash_or_number(&tx, id)? { - return Ok(read_transactions_by_number(&tx, number)?) - } - Ok(None) - } - - fn transactions_by_block_range( - &self, - range: impl RangeBounds, - ) -> Result>> { - let tx = self.db.tx()?; - let mut results = Vec::default(); - let mut body_cursor = tx.cursor_read::()?; - let mut tx_cursor = tx.cursor_read::()?; - for entry in body_cursor.walk_range(range)? { - let (_, body) = entry?; - let tx_num_range = body.tx_num_range(); - if tx_num_range.is_empty() { - results.push(Vec::default()); - } else { - results.push( - tx_cursor - .walk_range(tx_num_range)? - .map(|result| result.map(|(_, tx)| tx.into())) - .collect::, _>>()?, - ); - } - } - Ok(results) - } -} - -impl ReceiptProvider for ShareableDatabase { - fn receipt(&self, id: TxNumber) -> Result> { - self.db.view(|tx| tx.get::(id))?.map_err(Into::into) - } - - fn receipt_by_hash(&self, hash: TxHash) -> Result> { - self.db - .view(|tx| { - if let Some(id) = tx.get::(hash)? { - tx.get::(id) - } else { - Ok(None) - } - })? - .map_err(Into::into) - } - - fn receipts_by_block(&self, block: BlockHashOrNumber) -> Result>> { - let tx = self.db.tx()?; - if let Some(number) = convert_hash_or_number(&tx, block)? { - if let Some(body) = tx.get::(number)? { - let tx_range = body.tx_num_range(); - return if tx_range.is_empty() { - Ok(Some(Vec::new())) - } else { - let mut tx_cursor = tx.cursor_read::()?; - let transactions = tx_cursor - .walk_range(tx_range)? - .map(|result| result.map(|(_, tx)| tx)) - .collect::, _>>()?; - Ok(Some(transactions)) - } - } - } - Ok(None) - } -} - -impl WithdrawalsProvider for ShareableDatabase { - fn withdrawals_by_block( - &self, - id: BlockHashOrNumber, - timestamp: u64, - ) -> Result>> { - if self.chain_spec.is_shanghai_activated_at_timestamp(timestamp) { - let tx = self.db.tx()?; - if let Some(number) = convert_hash_or_number(&tx, id)? { - // If we are past shanghai, then all blocks should have a withdrawal list, even if - // empty - let withdrawals = read_withdrawals_by_number(&tx, number)?.unwrap_or_default(); - return Ok(Some(withdrawals)) - } - } - Ok(None) - } - - fn latest_withdrawal(&self) -> Result> { - let latest_block_withdrawal = - self.db.view(|tx| tx.cursor_read::()?.last())?; - latest_block_withdrawal - .map(|block_withdrawal_pair| { - block_withdrawal_pair - .and_then(|(_, block_withdrawal)| block_withdrawal.withdrawals.last().cloned()) - }) - .map_err(Into::into) - } -} - -impl StageCheckpointProvider for ShareableDatabase { - fn get_stage_checkpoint(&self, id: StageId) -> Result> { - Ok(get_stage_checkpoint(&self.db.tx()?, id)?) - } -} - -impl EvmEnvProvider for ShareableDatabase { - fn fill_env_at( - &self, - cfg: &mut CfgEnv, - block_env: &mut BlockEnv, - at: BlockHashOrNumber, - ) -> Result<()> { - let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; - let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; - self.fill_env_with_header(cfg, block_env, &header) - } - - fn fill_env_with_header( - &self, - cfg: &mut CfgEnv, - block_env: &mut BlockEnv, - header: &Header, - ) -> Result<()> { - let total_difficulty = self - .header_td_by_number(header.number)? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; - fill_cfg_and_block_env(cfg, block_env, &self.chain_spec, header, total_difficulty); - Ok(()) - } - - fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockHashOrNumber) -> Result<()> { - let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; - let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; - - self.fill_block_env_with_header(block_env, &header) - } - - fn fill_block_env_with_header(&self, block_env: &mut BlockEnv, header: &Header) -> Result<()> { - let total_difficulty = self - .header_td_by_number(header.number)? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; - let spec_id = revm_spec( - &self.chain_spec, - Head { - number: header.number, - timestamp: header.timestamp, - difficulty: header.difficulty, - total_difficulty, - // Not required - hash: Default::default(), - }, - ); - let after_merge = spec_id >= SpecId::MERGE; - fill_block_env(block_env, &self.chain_spec, header, after_merge); - Ok(()) - } - - fn fill_cfg_env_at(&self, cfg: &mut CfgEnv, at: BlockHashOrNumber) -> Result<()> { - let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; - let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; - self.fill_cfg_env_with_header(cfg, &header) - } - - fn fill_cfg_env_with_header(&self, cfg: &mut CfgEnv, header: &Header) -> Result<()> { - let total_difficulty = self - .header_td_by_number(header.number)? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; - fill_cfg_env(cfg, &self.chain_spec, header, total_difficulty); - Ok(()) - } -} - -/// Returns the block number for the given block hash or number. -#[inline] -fn convert_hash_or_number<'a, TX>( - tx: &TX, - block: BlockHashOrNumber, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - match block { - BlockHashOrNumber::Hash(hash) => read_block_number(tx, hash), - BlockHashOrNumber::Number(number) => Ok(Some(number)), - } -} - -/// Reads the number for the given block hash. -#[inline] -fn read_block_number<'a, TX>( - tx: &TX, - hash: H256, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::(hash) -} - -/// Reads the hash for the given block number -/// -/// Returns an error if no matching entry is found. -#[inline] -fn read_header_hash<'a, TX>( - tx: &TX, - number: u64, -) -> std::result::Result -where - TX: DbTx<'a> + Send + Sync, -{ - match tx.get::(number)? { - Some(hash) => Ok(hash), - None => Err(ProviderError::HeaderNotFound(number.into()).into()), - } -} - -/// Fetches the Withdrawals that belong to the given block number -#[inline] -fn read_transactions_by_number<'a, TX>( - tx: &TX, - block_number: u64, -) -> std::result::Result>, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - if let Some(body) = tx.get::(block_number)? { - let tx_range = body.tx_num_range(); - return if tx_range.is_empty() { - Ok(Some(Vec::new())) - } else { - let mut tx_cursor = tx.cursor_read::()?; - let transactions = tx_cursor - .walk_range(tx_range)? - .map(|result| result.map(|(_, tx)| tx.into())) - .collect::, _>>()?; - Ok(Some(transactions)) - } - } - - Ok(None) -} - -/// Fetches the Withdrawals that belong to the given block number -#[inline] -fn read_withdrawals_by_number<'a, TX>( - tx: &TX, - block_number: u64, -) -> std::result::Result>, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::(block_number).map(|w| w.map(|w| w.withdrawals)) -} - -/// Fetches the corresponding header -#[inline] -fn read_header<'a, TX>( - tx: &TX, - block_number: u64, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::(block_number) -} - -/// Fetches Header and its hash -#[inline] -fn read_sealed_header<'a, TX>( - tx: &TX, - block_number: u64, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - let block_hash = match tx.get::(block_number)? { - Some(block_hash) => block_hash, - None => return Ok(None), - }; - match read_header(tx, block_number)? { - Some(header) => Ok(Some((header, block_hash))), - None => Ok(None), - } -} - -/// Fetches checks if the block number is the latest block number. -#[inline] -fn is_latest_block_number<'a, TX>( - tx: &TX, - block_number: BlockNumber, -) -> std::result::Result -where - TX: DbTx<'a> + Send + Sync, -{ - // check if the block number is the best block number - // there's always at least one header in the database (genesis) - let best = best_block_number(tx)?.unwrap_or_default(); - let last = last_canonical_header(tx)?.map(|(last, _)| last).unwrap_or_default(); - Ok(block_number == best && block_number == last) -} - -/// Fetches the best block number from the database. -#[inline] -fn best_block_number<'a, TX>( - tx: &TX, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::("Finish".to_string()) // TODO: - .map(|result| result.map(|checkpoint| checkpoint.block_number)) -} - -/// Fetches the last canonical header from the database. -#[inline] -fn last_canonical_header<'a, TX>( - tx: &TX, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.cursor_read::()?.last() -} - -/// Get checkpoint for the given stage. -#[inline] -pub fn get_stage_checkpoint<'a, TX>( - tx: &TX, - id: StageId, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::(id.to_string()) -} - -#[cfg(test)] -mod tests { - use super::ShareableDatabase; - use crate::BlockNumProvider; - use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; - use reth_primitives::{ChainSpecBuilder, H256}; - use std::sync::Arc; - - #[test] - fn common_history_provider() { - let chain_spec = ChainSpecBuilder::mainnet().build(); - let db = create_test_db::(EnvKind::RW); - let provider = ShareableDatabase::new(db, Arc::new(chain_spec)); - let _ = provider.latest(); - } - - #[test] - fn default_chain_info() { - let chain_spec = ChainSpecBuilder::mainnet().build(); - let db = create_test_db::(EnvKind::RW); - let provider = ShareableDatabase::new(db, Arc::new(chain_spec)); - - let chain_info = provider.chain_info().expect("should be ok"); - assert_eq!(chain_info.best_number, 0); - assert_eq!(chain_info.best_hash, H256::zero()); - } -} diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs new file mode 100644 index 0000000000..61762de4db --- /dev/null +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -0,0 +1,356 @@ +use crate::{ + providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, + traits::{BlockSource, ReceiptProvider}, + BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, HeaderProvider, + ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider, + WithdrawalsProvider, +}; +use reth_db::{database::Database, models::StoredBlockBodyIndices, tables, transaction::DbTx}; +use reth_interfaces::Result; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + Block, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, ChainSpec, Header, Receipt, + SealedBlock, SealedHeader, TransactionMeta, TransactionSigned, TxHash, TxNumber, Withdrawal, + H256, U256, +}; +use reth_revm_primitives::primitives::{BlockEnv, CfgEnv}; +use std::{ops::RangeBounds, sync::Arc}; +use tracing::trace; + +mod provider; +use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW}; + +/// A common provider that fetches data from a database. +/// +/// This provider implements most provider or provider factory traits. +#[derive(Debug)] +pub struct ShareableDatabase { + /// Database + db: DB, + /// Chain spec + chain_spec: Arc, +} + +impl ShareableDatabase { + /// Returns a provider with a created `DbTx` inside, which allows fetching data from the + /// database using different types of providers. Example: [`HeaderProvider`] + /// [`BlockHashProvider`] + pub fn provider(&self) -> Result> { + Ok(DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone())) + } + + /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating + /// data from the database using different types of providers. Example: [`HeaderProvider`] + /// [`BlockHashProvider`] + pub fn provider_rw(&self) -> Result> { + Ok(DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone())) + } +} + +impl ShareableDatabase { + /// create new database provider + pub fn new(db: DB, chain_spec: Arc) -> Self { + Self { db, chain_spec } + } +} + +impl Clone for ShareableDatabase { + fn clone(&self) -> Self { + Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } + } +} + +impl ShareableDatabase { + /// Storage provider for latest block + pub fn latest(&self) -> Result> { + trace!(target: "providers::db", "Returning latest state provider"); + Ok(Box::new(LatestStateProvider::new(self.db.tx()?))) + } + + /// Storage provider for state at that given block + pub fn history_by_block_number( + &self, + mut block_number: BlockNumber, + ) -> Result> { + let provider = self.provider()?; + + if block_number == provider.best_block_number().unwrap_or_default() && + block_number == provider.last_block_number().unwrap_or_default() + { + return Ok(Box::new(LatestStateProvider::new(provider.into_tx()))) + } + + // +1 as the changeset that we want is the one that was applied after this block. + block_number += 1; + + trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number"); + Ok(Box::new(HistoricalStateProvider::new(provider.into_tx(), block_number))) + } + + /// Storage provider for state at that given block hash + pub fn history_by_block_hash(&self, block_hash: BlockHash) -> Result> { + let provider = self.provider()?; + + let mut block_number = provider + .block_number(block_hash)? + .ok_or(ProviderError::BlockHashNotFound(block_hash))?; + + if block_number == provider.best_block_number().unwrap_or_default() && + block_number == provider.last_block_number().unwrap_or_default() + { + return Ok(Box::new(LatestStateProvider::new(provider.into_tx()))) + } + + // +1 as the changeset that we want is the one that was applied after this block. + // as the changeset contains old values. + block_number += 1; + + trace!(target: "providers::db", ?block_hash, "Returning historical state provider for block hash"); + Ok(Box::new(HistoricalStateProvider::new(provider.into_tx(), block_number))) + } +} + +impl HeaderProvider for ShareableDatabase { + fn header(&self, block_hash: &BlockHash) -> Result> { + self.provider()?.header(block_hash) + } + + fn header_by_number(&self, num: BlockNumber) -> Result> { + self.provider()?.header_by_number(num) + } + + fn header_td(&self, hash: &BlockHash) -> Result> { + self.provider()?.header_td(hash) + } + + fn header_td_by_number(&self, number: BlockNumber) -> Result> { + self.provider()?.header_td_by_number(number) + } + + fn headers_range(&self, range: impl RangeBounds) -> Result> { + self.provider()?.headers_range(range) + } + + fn sealed_headers_range( + &self, + range: impl RangeBounds, + ) -> Result> { + self.provider()?.sealed_headers_range(range) + } + + fn sealed_header(&self, number: BlockNumber) -> Result> { + self.provider()?.sealed_header(number) + } +} + +impl BlockHashProvider for ShareableDatabase { + fn block_hash(&self, number: u64) -> Result> { + self.provider()?.block_hash(number) + } + + fn canonical_hashes_range(&self, start: BlockNumber, end: BlockNumber) -> Result> { + self.provider()?.canonical_hashes_range(start, end) + } +} + +impl BlockNumProvider for ShareableDatabase { + fn chain_info(&self) -> Result { + self.provider()?.chain_info() + } + + fn best_block_number(&self) -> Result { + self.provider()?.best_block_number() + } + + fn last_block_number(&self) -> Result { + self.provider()?.last_block_number() + } + + fn block_number(&self, hash: H256) -> Result> { + self.provider()?.block_number(hash) + } +} + +impl BlockProvider for ShareableDatabase { + fn find_block_by_hash(&self, hash: H256, source: BlockSource) -> Result> { + self.provider()?.find_block_by_hash(hash, source) + } + + fn block(&self, id: BlockHashOrNumber) -> Result> { + self.provider()?.block(id) + } + + fn pending_block(&self) -> Result> { + self.provider()?.pending_block() + } + + fn ommers(&self, id: BlockHashOrNumber) -> Result>> { + self.provider()?.ommers(id) + } + + fn block_body_indices(&self, num: u64) -> Result> { + self.provider()?.block_body_indices(num) + } +} + +impl TransactionsProvider for ShareableDatabase { + fn transaction_id(&self, tx_hash: TxHash) -> Result> { + self.provider()?.transaction_id(tx_hash) + } + + fn transaction_by_id(&self, id: TxNumber) -> Result> { + self.provider()?.transaction_by_id(id) + } + + fn transaction_by_hash(&self, hash: TxHash) -> Result> { + self.provider()?.transaction_by_hash(hash) + } + + fn transaction_by_hash_with_meta( + &self, + tx_hash: TxHash, + ) -> Result> { + self.provider()?.transaction_by_hash_with_meta(tx_hash) + } + + fn transaction_block(&self, id: TxNumber) -> Result> { + self.provider()?.transaction_block(id) + } + + fn transactions_by_block( + &self, + id: BlockHashOrNumber, + ) -> Result>> { + self.provider()?.transactions_by_block(id) + } + + fn transactions_by_block_range( + &self, + range: impl RangeBounds, + ) -> Result>> { + self.provider()?.transactions_by_block_range(range) + } +} + +impl ReceiptProvider for ShareableDatabase { + fn receipt(&self, id: TxNumber) -> Result> { + self.provider()?.receipt(id) + } + + fn receipt_by_hash(&self, hash: TxHash) -> Result> { + self.provider()?.receipt_by_hash(hash) + } + + fn receipts_by_block(&self, block: BlockHashOrNumber) -> Result>> { + self.provider()?.receipts_by_block(block) + } +} + +impl WithdrawalsProvider for ShareableDatabase { + fn withdrawals_by_block( + &self, + id: BlockHashOrNumber, + timestamp: u64, + ) -> Result>> { + self.provider()?.withdrawals_by_block(id, timestamp) + } + + fn latest_withdrawal(&self) -> Result> { + self.provider()?.latest_withdrawal() + } +} + +impl StageCheckpointProvider for ShareableDatabase { + fn get_stage_checkpoint(&self, id: StageId) -> Result> { + self.provider()?.get_stage_checkpoint(id) + } +} + +impl EvmEnvProvider for ShareableDatabase { + fn fill_env_at( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + at: BlockHashOrNumber, + ) -> Result<()> { + self.provider()?.fill_env_at(cfg, block_env, at) + } + + fn fill_env_with_header( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + header: &Header, + ) -> Result<()> { + self.provider()?.fill_env_with_header(cfg, block_env, header) + } + + fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockHashOrNumber) -> Result<()> { + self.provider()?.fill_block_env_at(block_env, at) + } + + fn fill_block_env_with_header(&self, block_env: &mut BlockEnv, header: &Header) -> Result<()> { + self.provider()?.fill_block_env_with_header(block_env, header) + } + + fn fill_cfg_env_at(&self, cfg: &mut CfgEnv, at: BlockHashOrNumber) -> Result<()> { + self.provider()?.fill_cfg_env_at(cfg, at) + } + + fn fill_cfg_env_with_header(&self, cfg: &mut CfgEnv, header: &Header) -> Result<()> { + self.provider()?.fill_cfg_env_with_header(cfg, header) + } +} + +/// Get checkpoint for the given stage. +#[inline] +pub fn get_stage_checkpoint<'a, TX>( + tx: &TX, + id: StageId, +) -> std::result::Result, reth_interfaces::db::DatabaseError> +where + TX: DbTx<'a> + Send + Sync, +{ + tx.get::(id.to_string()) +} + +#[cfg(test)] +mod tests { + use super::ShareableDatabase; + use crate::{BlockHashProvider, BlockNumProvider}; + use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; + use reth_primitives::{ChainSpecBuilder, H256}; + use std::sync::Arc; + + #[test] + fn common_history_provider() { + let chain_spec = ChainSpecBuilder::mainnet().build(); + let db = create_test_db::(EnvKind::RW); + let provider = ShareableDatabase::new(db, Arc::new(chain_spec)); + let _ = provider.latest(); + } + + #[test] + fn default_chain_info() { + let chain_spec = ChainSpecBuilder::mainnet().build(); + let db = create_test_db::(EnvKind::RW); + let db = ShareableDatabase::new(db, Arc::new(chain_spec)); + let provider = db.provider().unwrap(); + + let chain_info = provider.chain_info().expect("should be ok"); + assert_eq!(chain_info.best_number, 0); + assert_eq!(chain_info.best_hash, H256::zero()); + } + + #[test] + fn provider_flow() { + let chain_spec = ChainSpecBuilder::mainnet().build(); + let db = create_test_db::(EnvKind::RW); + let db = ShareableDatabase::new(db, Arc::new(chain_spec)); + let provider = db.provider().unwrap(); + provider.block_hash(0).unwrap(); + let provider_rw = db.provider_rw().unwrap(); + provider_rw.block_hash(0).unwrap(); + provider.block_hash(0).unwrap(); + } +} diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs new file mode 100644 index 0000000000..a769c58102 --- /dev/null +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -0,0 +1,461 @@ +use crate::{ + traits::{BlockSource, ReceiptProvider}, + BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, HeaderProvider, + ProviderError, StageCheckpointProvider, TransactionsProvider, WithdrawalsProvider, +}; +use reth_db::{ + cursor::DbCursorRO, + database::DatabaseGAT, + models::StoredBlockBodyIndices, + tables, + transaction::{DbTx, DbTxMut}, +}; +use reth_interfaces::Result; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + Block, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, ChainSpec, Head, Header, Receipt, + SealedBlock, SealedHeader, TransactionMeta, TransactionSigned, TxHash, TxNumber, Withdrawal, + H256, U256, +}; +use reth_revm_primitives::{ + config::revm_spec, + env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env}, + primitives::{BlockEnv, CfgEnv, SpecId}, +}; +use std::{ops::RangeBounds, sync::Arc}; + +/// A [`DatabaseProvider`] that holds a read-only database transaction. +pub(crate) type DatabaseProviderRO<'this, DB> = + DatabaseProvider<'this, >::TX>; + +/// A [`DatabaseProvider`] that holds a read-write database transaction. +pub(crate) type DatabaseProviderRW<'this, DB> = + DatabaseProvider<'this, >::TXMut>; + +/// A provider struct that fetchs data from the database. +/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashProvider`] +#[derive(Debug)] +pub struct DatabaseProvider<'this, TX> +where + Self: 'this, +{ + /// Database transaction. + tx: TX, + /// Chain spec + chain_spec: Arc, + _phantom_data: std::marker::PhantomData<&'this ()>, +} + +impl<'this, TX: DbTxMut<'this>> DatabaseProvider<'this, TX> { + /// Creates a provider with an inner read-write transaction. + pub fn new_rw(tx: TX, chain_spec: Arc) -> Self { + Self { tx, chain_spec, _phantom_data: std::marker::PhantomData } + } +} + +impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> { + /// Creates a provider with an inner read-only transaction. + pub fn new(tx: TX, chain_spec: Arc) -> Self { + Self { tx, chain_spec, _phantom_data: std::marker::PhantomData } + } + + /// Consume `DbTx` or `DbTxMut`. + pub fn into_tx(self) -> TX { + self.tx + } +} + +impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { + /// Commit database transaction. + pub fn commit(self) -> Result { + Ok(self.tx.commit()?) + } +} + +impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> { + fn header(&self, block_hash: &BlockHash) -> Result> { + if let Some(num) = self.block_number(*block_hash)? { + Ok(self.header_by_number(num)?) + } else { + Ok(None) + } + } + + fn header_by_number(&self, num: BlockNumber) -> Result> { + Ok(self.tx.get::(num)?) + } + + fn header_td(&self, block_hash: &BlockHash) -> Result> { + if let Some(num) = self.block_number(*block_hash)? { + self.header_td_by_number(num) + } else { + Ok(None) + } + } + + fn header_td_by_number(&self, number: BlockNumber) -> Result> { + Ok(self.tx.get::(number)?.map(|td| td.0)) + } + + fn headers_range(&self, range: impl RangeBounds) -> Result> { + let mut cursor = self.tx.cursor_read::()?; + cursor + .walk_range(range)? + .map(|result| result.map(|(_, header)| header).map_err(Into::into)) + .collect::>>() + } + + fn sealed_headers_range( + &self, + range: impl RangeBounds, + ) -> Result> { + let mut headers = vec![]; + for entry in self.tx.cursor_read::()?.walk_range(range)? { + let (number, header) = entry?; + let hash = self + .block_hash(number)? + .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?; + headers.push(header.seal(hash)); + } + Ok(headers) + } + + fn sealed_header(&self, number: BlockNumber) -> Result> { + if let Some(header) = self.header_by_number(number)? { + let hash = self + .block_hash(number)? + .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?; + Ok(Some(header.seal(hash))) + } else { + Ok(None) + } + } +} + +impl<'this, TX: DbTx<'this>> BlockHashProvider for DatabaseProvider<'this, TX> { + fn block_hash(&self, number: u64) -> Result> { + Ok(self.tx.get::(number)?) + } + + fn canonical_hashes_range(&self, start: BlockNumber, end: BlockNumber) -> Result> { + let range = start..end; + let mut cursor = self.tx.cursor_read::()?; + cursor + .walk_range(range)? + .map(|result| result.map(|(_, hash)| hash).map_err(Into::into)) + .collect::>>() + } +} + +impl<'this, TX: DbTx<'this>> BlockNumProvider for DatabaseProvider<'this, TX> { + fn chain_info(&self) -> Result { + let best_number = self.best_block_number()?; + let best_hash = self.block_hash(best_number)?.unwrap_or_default(); + Ok(ChainInfo { best_hash, best_number }) + } + + fn best_block_number(&self) -> Result { + Ok(self + .get_stage_checkpoint(StageId::Finish)? + .map(|checkpoint| checkpoint.block_number) + .unwrap_or_default()) + } + + fn last_block_number(&self) -> Result { + Ok(self.tx.cursor_read::()?.last()?.unwrap_or_default().0) + } + + fn block_number(&self, hash: H256) -> Result> { + Ok(self.tx.get::(hash)?) + } +} + +impl<'this, TX: DbTx<'this>> BlockProvider for DatabaseProvider<'this, TX> { + fn find_block_by_hash(&self, hash: H256, source: BlockSource) -> Result> { + if source.is_database() { + self.block(hash.into()) + } else { + Ok(None) + } + } + + fn block(&self, id: BlockHashOrNumber) -> Result> { + if let Some(number) = self.convert_hash_or_number(id)? { + if let Some(header) = self.header_by_number(number)? { + let withdrawals = self.withdrawals_by_block(number.into(), header.timestamp)?; + let ommers = if withdrawals.is_none() { self.ommers(number.into())? } else { None } + .unwrap_or_default(); + let transactions = self + .transactions_by_block(number.into())? + .ok_or(ProviderError::BlockBodyIndicesNotFound(number))?; + + return Ok(Some(Block { header, body: transactions, ommers, withdrawals })) + } + } + + Ok(None) + } + + fn pending_block(&self) -> Result> { + Ok(None) + } + + fn ommers(&self, id: BlockHashOrNumber) -> Result>> { + if let Some(number) = self.convert_hash_or_number(id)? { + // TODO: this can be optimized to return empty Vec post-merge + let ommers = self.tx.get::(number)?.map(|o| o.ommers); + return Ok(ommers) + } + + Ok(None) + } + + fn block_body_indices(&self, num: u64) -> Result> { + Ok(self.tx.get::(num)?) + } +} + +impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX> { + fn transaction_id(&self, tx_hash: TxHash) -> Result> { + Ok(self.tx.get::(tx_hash)?) + } + + fn transaction_by_id(&self, id: TxNumber) -> Result> { + Ok(self.tx.get::(id)?.map(Into::into)) + } + + fn transaction_by_hash(&self, hash: TxHash) -> Result> { + if let Some(id) = self.transaction_id(hash)? { + Ok(self.transaction_by_id(id)?) + } else { + Ok(None) + } + .map(|tx| tx.map(Into::into)) + } + + fn transaction_by_hash_with_meta( + &self, + tx_hash: TxHash, + ) -> Result> { + if let Some(transaction_id) = self.transaction_id(tx_hash)? { + if let Some(transaction) = self.transaction_by_id(transaction_id)? { + let mut transaction_cursor = self.tx.cursor_read::()?; + if let Some(block_number) = + transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))? + { + if let Some(sealed_header) = self.sealed_header(block_number)? { + let (header, block_hash) = sealed_header.split(); + if let Some(block_body) = self.block_body_indices(block_number)? { + // the index of the tx in the block is the offset: + // len([start..tx_id]) + // SAFETY: `transaction_id` is always `>=` the block's first + // index + let index = transaction_id - block_body.first_tx_num(); + + let meta = TransactionMeta { + tx_hash, + index, + block_hash, + block_number, + base_fee: header.base_fee_per_gas, + }; + + return Ok(Some((transaction, meta))) + } + } + } + } + } + + Ok(None) + } + + fn transaction_block(&self, id: TxNumber) -> Result> { + let mut cursor = self.tx.cursor_read::()?; + Ok(cursor.seek(id)?.map(|(_, bn)| bn)) + } + + fn transactions_by_block( + &self, + id: BlockHashOrNumber, + ) -> Result>> { + if let Some(block_number) = self.convert_hash_or_number(id)? { + if let Some(body) = self.block_body_indices(block_number)? { + let tx_range = body.tx_num_range(); + return if tx_range.is_empty() { + Ok(Some(Vec::new())) + } else { + let mut tx_cursor = self.tx.cursor_read::()?; + let transactions = tx_cursor + .walk_range(tx_range)? + .map(|result| result.map(|(_, tx)| tx.into())) + .collect::, _>>()?; + Ok(Some(transactions)) + } + } + } + Ok(None) + } + + fn transactions_by_block_range( + &self, + range: impl RangeBounds, + ) -> Result>> { + let mut results = Vec::default(); + let mut body_cursor = self.tx.cursor_read::()?; + let mut tx_cursor = self.tx.cursor_read::()?; + for entry in body_cursor.walk_range(range)? { + let (_, body) = entry?; + let tx_num_range = body.tx_num_range(); + if tx_num_range.is_empty() { + results.push(Vec::default()); + } else { + results.push( + tx_cursor + .walk_range(tx_num_range)? + .map(|result| result.map(|(_, tx)| tx.into())) + .collect::, _>>()?, + ); + } + } + Ok(results) + } +} + +impl<'this, TX: DbTx<'this>> ReceiptProvider for DatabaseProvider<'this, TX> { + fn receipt(&self, id: TxNumber) -> Result> { + Ok(self.tx.get::(id)?) + } + + fn receipt_by_hash(&self, hash: TxHash) -> Result> { + if let Some(id) = self.transaction_id(hash)? { + self.receipt(id) + } else { + Ok(None) + } + } + + fn receipts_by_block(&self, block: BlockHashOrNumber) -> Result>> { + if let Some(number) = self.convert_hash_or_number(block)? { + if let Some(body) = self.block_body_indices(number)? { + let tx_range = body.tx_num_range(); + return if tx_range.is_empty() { + Ok(Some(Vec::new())) + } else { + let mut tx_cursor = self.tx.cursor_read::()?; + let transactions = tx_cursor + .walk_range(tx_range)? + .map(|result| result.map(|(_, tx)| tx)) + .collect::, _>>()?; + Ok(Some(transactions)) + } + } + } + Ok(None) + } +} + +impl<'this, TX: DbTx<'this>> WithdrawalsProvider for DatabaseProvider<'this, TX> { + fn withdrawals_by_block( + &self, + id: BlockHashOrNumber, + timestamp: u64, + ) -> Result>> { + if self.chain_spec.is_shanghai_activated_at_timestamp(timestamp) { + if let Some(number) = self.convert_hash_or_number(id)? { + // If we are past shanghai, then all blocks should have a withdrawal list, even if + // empty + let withdrawals = self + .tx + .get::(number) + .map(|w| w.map(|w| w.withdrawals))? + .unwrap_or_default(); + return Ok(Some(withdrawals)) + } + } + Ok(None) + } + + fn latest_withdrawal(&self) -> Result> { + let latest_block_withdrawal = self.tx.cursor_read::()?.last(); + latest_block_withdrawal + .map(|block_withdrawal_pair| { + block_withdrawal_pair + .and_then(|(_, block_withdrawal)| block_withdrawal.withdrawals.last().cloned()) + }) + .map_err(Into::into) + } +} + +impl<'this, TX: DbTx<'this>> EvmEnvProvider for DatabaseProvider<'this, TX> { + fn fill_env_at( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + at: BlockHashOrNumber, + ) -> Result<()> { + let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; + let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; + self.fill_env_with_header(cfg, block_env, &header) + } + + fn fill_env_with_header( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + header: &Header, + ) -> Result<()> { + let total_difficulty = self + .header_td_by_number(header.number)? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; + fill_cfg_and_block_env(cfg, block_env, &self.chain_spec, header, total_difficulty); + Ok(()) + } + + fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockHashOrNumber) -> Result<()> { + let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; + let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; + + self.fill_block_env_with_header(block_env, &header) + } + + fn fill_block_env_with_header(&self, block_env: &mut BlockEnv, header: &Header) -> Result<()> { + let total_difficulty = self + .header_td_by_number(header.number)? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; + let spec_id = revm_spec( + &self.chain_spec, + Head { + number: header.number, + timestamp: header.timestamp, + difficulty: header.difficulty, + total_difficulty, + // Not required + hash: Default::default(), + }, + ); + let after_merge = spec_id >= SpecId::MERGE; + fill_block_env(block_env, &self.chain_spec, header, after_merge); + Ok(()) + } + + fn fill_cfg_env_at(&self, cfg: &mut CfgEnv, at: BlockHashOrNumber) -> Result<()> { + let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; + let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; + self.fill_cfg_env_with_header(cfg, &header) + } + + fn fill_cfg_env_with_header(&self, cfg: &mut CfgEnv, header: &Header) -> Result<()> { + let total_difficulty = self + .header_td_by_number(header.number)? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; + fill_cfg_env(cfg, &self.chain_spec, header, total_difficulty); + Ok(()) + } +} + +impl<'this, TX: DbTx<'this>> StageCheckpointProvider for DatabaseProvider<'this, TX> { + fn get_stage_checkpoint(&self, id: StageId) -> Result> { + Ok(self.tx.get::(id.to_string())?) + } +} diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 0887710ea4..3fe7e02d67 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -5,7 +5,7 @@ use crate::{ ReceiptProvider, StageCheckpointProvider, StateProviderBox, StateProviderFactory, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::database::Database; +use reth_db::{database::Database, models::StoredBlockBodyIndices}; use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer}, consensus::ForkchoiceState, @@ -68,9 +68,13 @@ where /// Create a new provider using only the database and the tree, fetching the latest header from /// the database to initialize the provider. pub fn new(database: ShareableDatabase, tree: Tree) -> Result { - let best = database.chain_info()?; - match database.header_by_number(best.best_number)? { - Some(header) => Ok(Self::with_latest(database, tree, header.seal(best.best_hash))), + let provider = database.provider()?; + let best: ChainInfo = provider.chain_info()?; + match provider.header_by_number(best.best_number)? { + Some(header) => { + drop(provider); + Ok(Self::with_latest(database, tree, header.seal(best.best_hash))) + } None => Err(Error::Provider(ProviderError::HeaderNotFound(best.best_number.into()))), } } @@ -106,34 +110,34 @@ where Tree: Send + Sync, { fn header(&self, block_hash: &BlockHash) -> Result> { - self.database.header(block_hash) + self.database.provider()?.header(block_hash) } fn header_by_number(&self, num: BlockNumber) -> Result> { - self.database.header_by_number(num) + self.database.provider()?.header_by_number(num) } fn header_td(&self, hash: &BlockHash) -> Result> { - self.database.header_td(hash) + self.database.provider()?.header_td(hash) } fn header_td_by_number(&self, number: BlockNumber) -> Result> { - self.database.header_td_by_number(number) + self.database.provider()?.header_td_by_number(number) } fn headers_range(&self, range: impl RangeBounds) -> Result> { - self.database.headers_range(range) + self.database.provider()?.headers_range(range) } fn sealed_headers_range( &self, range: impl RangeBounds, ) -> Result> { - self.database.sealed_headers_range(range) + self.database.provider()?.sealed_headers_range(range) } fn sealed_header(&self, number: BlockNumber) -> Result> { - self.database.sealed_header(number) + self.database.provider()?.sealed_header(number) } } @@ -143,11 +147,11 @@ where Tree: Send + Sync, { fn block_hash(&self, number: u64) -> Result> { - self.database.block_hash(number) + self.database.provider()?.block_hash(number) } fn canonical_hashes_range(&self, start: BlockNumber, end: BlockNumber) -> Result> { - self.database.canonical_hashes_range(start, end) + self.database.provider()?.canonical_hashes_range(start, end) } } @@ -164,8 +168,12 @@ where Ok(self.chain_info.get_canonical_block_number()) } + fn last_block_number(&self) -> Result { + self.database.provider()?.last_block_number() + } + fn block_number(&self, hash: H256) -> Result> { - self.database.block_number(hash) + self.database.provider()?.block_number(hash) } } @@ -200,12 +208,12 @@ where // hash let mut block = self.tree.block_by_hash(hash).map(|block| block.unseal()); if block.is_none() { - block = self.database.block_by_hash(hash)?; + block = self.database.provider()?.block_by_hash(hash)?; } block } BlockSource::Pending => self.tree.block_by_hash(hash).map(|block| block.unseal()), - BlockSource::Database => self.database.block_by_hash(hash)?, + BlockSource::Database => self.database.provider()?.block_by_hash(hash)?, }; Ok(block) @@ -214,7 +222,7 @@ where fn block(&self, id: BlockHashOrNumber) -> Result> { match id { BlockHashOrNumber::Hash(hash) => self.find_block_by_hash(hash, BlockSource::Any), - BlockHashOrNumber::Number(num) => self.database.block_by_number(num), + BlockHashOrNumber::Number(num) => self.database.provider()?.block_by_number(num), } } @@ -223,7 +231,11 @@ where } fn ommers(&self, id: BlockHashOrNumber) -> Result>> { - self.database.ommers(id) + self.database.provider()?.ommers(id) + } + + fn block_body_indices(&self, num: u64) -> Result> { + self.database.provider()?.block_body_indices(num) } } @@ -233,40 +245,40 @@ where Tree: BlockchainTreeViewer + Send + Sync, { fn transaction_id(&self, tx_hash: TxHash) -> Result> { - self.database.transaction_id(tx_hash) + self.database.provider()?.transaction_id(tx_hash) } fn transaction_by_id(&self, id: TxNumber) -> Result> { - self.database.transaction_by_id(id) + self.database.provider()?.transaction_by_id(id) } fn transaction_by_hash(&self, hash: TxHash) -> Result> { - self.database.transaction_by_hash(hash) + self.database.provider()?.transaction_by_hash(hash) } fn transaction_by_hash_with_meta( &self, tx_hash: TxHash, ) -> Result> { - self.database.transaction_by_hash_with_meta(tx_hash) + self.database.provider()?.transaction_by_hash_with_meta(tx_hash) } fn transaction_block(&self, id: TxNumber) -> Result> { - self.database.transaction_block(id) + self.database.provider()?.transaction_block(id) } fn transactions_by_block( &self, id: BlockHashOrNumber, ) -> Result>> { - self.database.transactions_by_block(id) + self.database.provider()?.transactions_by_block(id) } fn transactions_by_block_range( &self, range: impl RangeBounds, ) -> Result>> { - self.database.transactions_by_block_range(range) + self.database.provider()?.transactions_by_block_range(range) } } @@ -276,15 +288,15 @@ where Tree: Send + Sync, { fn receipt(&self, id: TxNumber) -> Result> { - self.database.receipt(id) + self.database.provider()?.receipt(id) } fn receipt_by_hash(&self, hash: TxHash) -> Result> { - self.database.receipt_by_hash(hash) + self.database.provider()?.receipt_by_hash(hash) } fn receipts_by_block(&self, block: BlockHashOrNumber) -> Result>> { - self.database.receipts_by_block(block) + self.database.provider()?.receipts_by_block(block) } } @@ -298,11 +310,11 @@ where id: BlockHashOrNumber, timestamp: u64, ) -> Result>> { - self.database.withdrawals_by_block(id, timestamp) + self.database.provider()?.withdrawals_by_block(id, timestamp) } fn latest_withdrawal(&self) -> Result> { - self.database.latest_withdrawal() + self.database.provider()?.latest_withdrawal() } } @@ -312,7 +324,7 @@ where Tree: Send + Sync, { fn get_stage_checkpoint(&self, id: StageId) -> Result> { - self.database.get_stage_checkpoint(id) + self.database.provider()?.get_stage_checkpoint(id) } } @@ -327,7 +339,7 @@ where block_env: &mut BlockEnv, at: BlockHashOrNumber, ) -> Result<()> { - self.database.fill_env_at(cfg, block_env, at) + self.database.provider()?.fill_env_at(cfg, block_env, at) } fn fill_env_with_header( @@ -336,23 +348,23 @@ where block_env: &mut BlockEnv, header: &Header, ) -> Result<()> { - self.database.fill_env_with_header(cfg, block_env, header) + self.database.provider()?.fill_env_with_header(cfg, block_env, header) } fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockHashOrNumber) -> Result<()> { - self.database.fill_block_env_at(block_env, at) + self.database.provider()?.fill_block_env_at(block_env, at) } fn fill_block_env_with_header(&self, block_env: &mut BlockEnv, header: &Header) -> Result<()> { - self.database.fill_block_env_with_header(block_env, header) + self.database.provider()?.fill_block_env_with_header(block_env, header) } fn fill_cfg_env_at(&self, cfg: &mut CfgEnv, at: BlockHashOrNumber) -> Result<()> { - self.database.fill_cfg_env_at(cfg, at) + self.database.provider()?.fill_cfg_env_at(cfg, at) } fn fill_cfg_env_with_header(&self, cfg: &mut CfgEnv, header: &Header) -> Result<()> { - self.database.fill_cfg_env_with_header(cfg, header) + self.database.provider()?.fill_cfg_env_with_header(cfg, header) } } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 0eff583b14..b041bd74a5 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -5,6 +5,7 @@ use crate::{ StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, }; use parking_lot::Mutex; +use reth_db::models::StoredBlockBodyIndices; use reth_interfaces::{provider::ProviderError, Result}; use reth_primitives::{ keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, @@ -263,6 +264,10 @@ impl BlockNumProvider for MockEthProvider { .ok_or(ProviderError::BestBlockNotFound)?) } + fn last_block_number(&self) -> Result { + self.best_block_number() + } + fn block_number(&self, hash: H256) -> Result> { let lock = self.blocks.lock(); let num = lock.iter().find_map(|(h, b)| (*h == hash).then_some(b.number)); @@ -304,6 +309,10 @@ impl BlockProvider for MockEthProvider { fn ommers(&self, _id: BlockHashOrNumber) -> Result>> { Ok(None) } + + fn block_body_indices(&self, _num: u64) -> Result> { + Ok(None) + } } impl BlockProviderIdExt for MockEthProvider { diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 012bfb4f55..24310c620a 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -4,6 +4,7 @@ use crate::{ BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointProvider, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, }; +use reth_db::models::StoredBlockBodyIndices; use reth_interfaces::Result; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -39,6 +40,10 @@ impl BlockNumProvider for NoopProvider { Ok(0) } + fn last_block_number(&self) -> Result { + Ok(0) + } + fn block_number(&self, _hash: H256) -> Result> { Ok(None) } @@ -60,6 +65,10 @@ impl BlockProvider for NoopProvider { fn ommers(&self, _id: BlockHashOrNumber) -> Result>> { Ok(None) } + + fn block_body_indices(&self, _num: u64) -> Result> { + Ok(None) + } } impl BlockProviderIdExt for NoopProvider { diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index 68a4f65a89..d8e9ae5470 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -1,6 +1,7 @@ use crate::{ BlockIdProvider, BlockNumProvider, HeaderProvider, ReceiptProvider, TransactionsProvider, }; +use reth_db::models::StoredBlockBodyIndices; use reth_interfaces::Result; use reth_primitives::{ Block, BlockHashOrNumber, BlockId, BlockNumberOrTag, Header, SealedBlock, SealedHeader, H256, @@ -81,6 +82,11 @@ pub trait BlockProvider: fn block_by_number(&self, num: u64) -> Result> { self.block(num.into()) } + + /// Returns the block body indices with matching number from database. + /// + /// Returns `None` if block is not found. + fn block_body_indices(&self, num: u64) -> Result>; } /// Trait extension for `BlockProvider`, for types that implement `BlockId` conversion. diff --git a/crates/storage/provider/src/traits/block_id.rs b/crates/storage/provider/src/traits/block_id.rs index 9105608d0f..f83929acbc 100644 --- a/crates/storage/provider/src/traits/block_id.rs +++ b/crates/storage/provider/src/traits/block_id.rs @@ -14,12 +14,15 @@ pub trait BlockNumProvider: BlockHashProvider + Send + Sync { /// Returns the best block number in the chain. fn best_block_number(&self) -> Result; + /// Returns the last block number associated with the last canonical header in the database. + fn last_block_number(&self) -> Result; + /// Gets the `BlockNumber` for the given hash. Returns `None` if no block with this hash exists. fn block_number(&self, hash: H256) -> Result>; /// Gets the block number for the given `BlockHashOrNumber`. Returns `None` if no block with /// this hash exists. If the `BlockHashOrNumber` is a `Number`, it is returned as is. - fn convert_hash(&self, id: BlockHashOrNumber) -> Result> { + fn convert_hash_or_number(&self, id: BlockHashOrNumber) -> Result> { match id { BlockHashOrNumber::Hash(hash) => self.block_number(hash), BlockHashOrNumber::Number(num) => Ok(Some(num)),