mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-10 07:48:19 -05:00
perf(provider): cache storage table cursors
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -9984,6 +9984,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"metrics",
|
||||
"notify",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"rand 0.9.2",
|
||||
"rayon",
|
||||
|
||||
@@ -54,6 +54,7 @@ parking_lot.workspace = true
|
||||
dashmap = { workspace = true, features = ["inline"] }
|
||||
strum.workspace = true
|
||||
eyre.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
# test-utils
|
||||
reth-ethereum-engine-primitives = { workspace = true, optional = true }
|
||||
|
||||
@@ -44,6 +44,7 @@ use tracing::trace;
|
||||
|
||||
mod provider;
|
||||
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
|
||||
pub(crate) use provider::{PlainStorageCursor, StorageChangesetCursor};
|
||||
|
||||
use super::ProviderNodeTypes;
|
||||
use reth_trie::KeccakKeyHasher;
|
||||
@@ -671,7 +672,7 @@ mod tests {
|
||||
#[test]
|
||||
fn common_history_provider() {
|
||||
let factory = create_test_provider_factory();
|
||||
let _ = factory.latest();
|
||||
let _ = factory.latest().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{
|
||||
database::{chain::ChainStorage, metrics},
|
||||
rocksdb::RocksDBProvider,
|
||||
static_file::StaticFileWriter,
|
||||
NodeTypesForProvider, StaticFileProvider,
|
||||
LowestAvailableBlocks, NodeTypesForProvider, StaticFileProvider,
|
||||
},
|
||||
to_range,
|
||||
traits::{
|
||||
@@ -33,7 +33,8 @@ use alloy_primitives::{
|
||||
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rayon::slice::ParallelSliceMut;
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
|
||||
@@ -91,6 +92,13 @@ use tracing::{debug, trace};
|
||||
/// A [`DatabaseProvider`] that holds a read-only database transaction.
|
||||
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
|
||||
|
||||
/// Cached cursor for plain storage state.
|
||||
pub(crate) type PlainStorageCursor<Tx> =
|
||||
Arc<OnceCell<Mutex<<Tx as DbTx>::DupCursor<tables::PlainStorageState>>>>;
|
||||
/// Cached cursor for storage changeset.
|
||||
pub(crate) type StorageChangesetCursor<Tx> =
|
||||
Arc<OnceCell<Mutex<<Tx as DbTx>::DupCursor<tables::StorageChangeSets>>>>;
|
||||
|
||||
/// A [`DatabaseProvider`] that holds a read-write database transaction.
|
||||
///
|
||||
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
|
||||
@@ -151,8 +159,7 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
|
||||
|
||||
/// A provider struct that fetches data from the database.
|
||||
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
pub struct DatabaseProvider<TX: DbTx, N: NodeTypes> {
|
||||
/// Database transaction.
|
||||
tx: TX,
|
||||
/// Chain spec
|
||||
@@ -169,9 +176,26 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
plain_storage_cursor: PlainStorageCursor<TX>,
|
||||
storage_changeset_cursor: StorageChangesetCursor<TX>,
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx, N: NodeTypes> std::fmt::Debug for DatabaseProvider<TX, N> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DatabaseProvider")
|
||||
.field("tx", &self.tx)
|
||||
.field("chain_spec", &self.chain_spec)
|
||||
.field("static_file_provider", &self.static_file_provider)
|
||||
.field("prune_modes", &self.prune_modes)
|
||||
.field("storage", &self.storage)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("rocksdb_provider", &self.rocksdb_provider)
|
||||
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Returns reference to prune modes.
|
||||
pub const fn prune_modes_ref(&self) -> &PruneModes {
|
||||
&self.prune_modes
|
||||
@@ -182,7 +206,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// State provider for latest state
|
||||
pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
|
||||
trace!(target: "providers::db", "Returning latest state provider");
|
||||
Box::new(LatestStateProviderRef::new(self))
|
||||
Box::new(LatestStateProviderRef::new_with_cursor(self, self.plain_storage_cursor.clone()))
|
||||
}
|
||||
|
||||
/// Storage provider for state at that given block hash
|
||||
@@ -195,7 +219,10 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
if block_number == self.best_block_number().unwrap_or_default() &&
|
||||
block_number == self.last_block_number().unwrap_or_default()
|
||||
{
|
||||
return Ok(Box::new(LatestStateProviderRef::new(self)))
|
||||
return Ok(Box::new(LatestStateProviderRef::new_with_cursor(
|
||||
self,
|
||||
self.plain_storage_cursor.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
// +1 as the changeset that we want is the one that was applied after this block.
|
||||
@@ -206,7 +233,13 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
let storage_history_prune_checkpoint =
|
||||
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
|
||||
|
||||
let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
|
||||
let mut state_provider = HistoricalStateProviderRef::new_with_cursors(
|
||||
self,
|
||||
block_number,
|
||||
LowestAvailableBlocks::default(),
|
||||
self.plain_storage_cursor.clone(),
|
||||
self.storage_changeset_cursor.clone(),
|
||||
);
|
||||
|
||||
// If we pruned account or storage history, we can't return state on every historical block.
|
||||
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
|
||||
@@ -235,11 +268,11 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
|
||||
type Primitives = N::Primitives;
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
|
||||
/// Returns a static file provider
|
||||
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
|
||||
self.static_file_provider.clone()
|
||||
@@ -254,15 +287,15 @@ impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
|
||||
/// Returns the `RocksDB` provider.
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
|
||||
for DatabaseProvider<TX, N>
|
||||
impl<TX: DbTx + Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>>
|
||||
ChainSpecProvider for DatabaseProvider<TX, N>
|
||||
{
|
||||
type ChainSpec = N::ChainSpec;
|
||||
|
||||
@@ -271,9 +304,9 @@ impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> C
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Creates a provider with an inner read-write transaction.
|
||||
pub const fn new_rw(
|
||||
pub fn new_rw(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
@@ -291,11 +324,13 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
plain_storage_cursor: PlainStorageCursor::<TX>::default(),
|
||||
storage_changeset_cursor: StorageChangesetCursor::<TX>::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
|
||||
fn as_ref(&self) -> &Self {
|
||||
self
|
||||
}
|
||||
@@ -440,7 +475,10 @@ impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for Databa
|
||||
let storage_history_prune_checkpoint =
|
||||
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
|
||||
|
||||
let mut state_provider = HistoricalStateProvider::new(self, block_number);
|
||||
let plain_storage_cursor = self.plain_storage_cursor.clone();
|
||||
let storage_cursor = self.storage_changeset_cursor.clone();
|
||||
let mut state_provider =
|
||||
HistoricalStateProvider::new(self, block_number, plain_storage_cursor, storage_cursor);
|
||||
|
||||
// If we pruned account or storage history, we can't return state on every historical block.
|
||||
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
|
||||
@@ -528,7 +566,7 @@ where
|
||||
|
||||
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
/// Creates a provider with an inner read-only transaction.
|
||||
pub const fn new(
|
||||
pub fn new(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
@@ -546,6 +584,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
plain_storage_cursor: PlainStorageCursor::<TX>::default(),
|
||||
storage_changeset_cursor: StorageChangesetCursor::<TX>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1512,7 +1552,7 @@ impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
|
||||
/// Save stage checkpoint.
|
||||
fn save_stage_checkpoint(
|
||||
&self,
|
||||
@@ -3098,7 +3138,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvide
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
|
||||
fn save_prune_checkpoint(
|
||||
&self,
|
||||
segment: PruneSegment,
|
||||
@@ -3147,7 +3187,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvide
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
|
||||
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
|
||||
Ok(self
|
||||
.tx
|
||||
@@ -3202,13 +3242,13 @@ impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
|
||||
fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
|
||||
self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: Send + Sync, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
|
||||
impl<TX: DbTx + Send + Sync, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
|
||||
fn cached_storage_settings(&self) -> StorageSettings {
|
||||
*self.storage_settings.read()
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
use crate::{
|
||||
providers::state::macros::delegate_provider_impls, AccountReader, BlockHashReader,
|
||||
ChangeSetReader, HashedPostStateProvider, ProviderError, StateProvider, StateRootProvider,
|
||||
providers::{
|
||||
state::macros::delegate_provider_impls, PlainStorageCursor, StorageChangesetCursor,
|
||||
},
|
||||
AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError,
|
||||
StateProvider, StateRootProvider,
|
||||
};
|
||||
use alloy_eips::merge::EPOCH_SLOTS;
|
||||
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbDupCursorRO},
|
||||
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
|
||||
@@ -43,14 +47,27 @@ use std::fmt::Debug;
|
||||
/// - [`tables::StoragesHistory`]
|
||||
/// - [`tables::AccountChangeSets`]
|
||||
/// - [`tables::StorageChangeSets`]
|
||||
#[derive(Debug)]
|
||||
pub struct HistoricalStateProviderRef<'b, Provider> {
|
||||
pub struct HistoricalStateProviderRef<'b, Provider: DBProvider> {
|
||||
/// Database provider
|
||||
provider: &'b Provider,
|
||||
/// Block number is main index for the history state of accounts and storages.
|
||||
block_number: BlockNumber,
|
||||
/// Lowest blocks at which different parts of the state are available.
|
||||
lowest_available_blocks: LowestAvailableBlocks,
|
||||
/// Cached plain storage state cursor.
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
/// Cached storage changeset cursor.
|
||||
storage_changeset_cursor: StorageChangesetCursor<Provider::Tx>,
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider + Debug> Debug for HistoricalStateProviderRef<'_, Provider> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("HistoricalStateProviderRef")
|
||||
.field("provider", &self.provider)
|
||||
.field("block_number", &self.block_number)
|
||||
.field("lowest_available_blocks", &self.lowest_available_blocks)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
@@ -64,17 +81,62 @@ pub enum HistoryInfo {
|
||||
impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, Provider> {
|
||||
/// Create new `StateProvider` for historical block number
|
||||
pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self {
|
||||
Self { provider, block_number, lowest_available_blocks: Default::default() }
|
||||
Self {
|
||||
provider,
|
||||
block_number,
|
||||
lowest_available_blocks: LowestAvailableBlocks::default(),
|
||||
storage_changeset_cursor: StorageChangesetCursor::<Provider::Tx>::default(),
|
||||
plain_storage_cursor: PlainStorageCursor::<Provider::Tx>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create new `StateProvider` for historical block number and lowest block numbers at which
|
||||
/// account & storage histories are available.
|
||||
pub const fn new_with_lowest_available_blocks(
|
||||
pub fn new_with_lowest_available_blocks(
|
||||
provider: &'b Provider,
|
||||
block_number: BlockNumber,
|
||||
lowest_available_blocks: LowestAvailableBlocks,
|
||||
) -> Self {
|
||||
Self { provider, block_number, lowest_available_blocks }
|
||||
Self {
|
||||
provider,
|
||||
block_number,
|
||||
lowest_available_blocks,
|
||||
storage_changeset_cursor: StorageChangesetCursor::<Provider::Tx>::default(),
|
||||
plain_storage_cursor: PlainStorageCursor::<Provider::Tx>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create new `StateProvider` with shared cursors.
|
||||
pub const fn new_with_cursors(
|
||||
provider: &'b Provider,
|
||||
block_number: BlockNumber,
|
||||
lowest_available_blocks: LowestAvailableBlocks,
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
storage_changeset_cursor: StorageChangesetCursor<Provider::Tx>,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
block_number,
|
||||
lowest_available_blocks,
|
||||
plain_storage_cursor,
|
||||
storage_changeset_cursor,
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_changeset_cursor(
|
||||
&self,
|
||||
) -> ProviderResult<&Mutex<<Provider::Tx as DbTx>::DupCursor<tables::StorageChangeSets>>> {
|
||||
self.storage_changeset_cursor
|
||||
.get_or_try_init(|| self.provider.tx_ref().cursor_dup_read().map(Mutex::new))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn plain_storage_cursor(
|
||||
&self,
|
||||
) -> ProviderResult<&Mutex<<Provider::Tx as DbTx>::DupCursor<tables::PlainStorageState>>> {
|
||||
self.plain_storage_cursor
|
||||
.get_or_try_init(|| self.provider.tx_ref().cursor_dup_read().map(Mutex::new))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Lookup an account in the `AccountsHistory` table
|
||||
@@ -397,7 +459,9 @@ impl<Provider: DBProvider + BlockNumReader> StateProofProvider
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider: Sync> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provider> {
|
||||
impl<Provider: DBProvider + Sync> HashedPostStateProvider
|
||||
for HistoricalStateProviderRef<'_, Provider>
|
||||
{
|
||||
fn hashed_post_state(&self, bundle_state: &revm_database::BundleState) -> HashedPostState {
|
||||
HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
|
||||
}
|
||||
@@ -414,25 +478,30 @@ impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader>
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
match self.storage_history_lookup(address, storage_key)? {
|
||||
HistoryInfo::NotYetWritten => Ok(None),
|
||||
HistoryInfo::InChangeset(changeset_block_number) => Ok(Some(
|
||||
self.tx()
|
||||
.cursor_dup_read::<tables::StorageChangeSets>()?
|
||||
HistoryInfo::InChangeset(changeset_block_number) => {
|
||||
let entry = self
|
||||
.storage_changeset_cursor()?
|
||||
.lock()
|
||||
.seek_by_key_subkey((changeset_block_number, address).into(), storage_key)?
|
||||
.filter(|entry| entry.key == storage_key)
|
||||
.ok_or_else(|| ProviderError::StorageChangesetNotFound {
|
||||
block_number: changeset_block_number,
|
||||
address,
|
||||
storage_key: Box::new(storage_key),
|
||||
})?
|
||||
.value,
|
||||
)),
|
||||
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => Ok(self
|
||||
.tx()
|
||||
.cursor_dup_read::<tables::PlainStorageState>()?
|
||||
.seek_by_key_subkey(address, storage_key)?
|
||||
.filter(|entry| entry.key == storage_key)
|
||||
.map(|entry| entry.value)
|
||||
.or(Some(StorageValue::ZERO))),
|
||||
.filter(|entry| entry.key == storage_key);
|
||||
Ok(Some(
|
||||
entry
|
||||
.ok_or_else(|| ProviderError::StorageChangesetNotFound {
|
||||
block_number: changeset_block_number,
|
||||
address,
|
||||
storage_key: Box::new(storage_key),
|
||||
})?
|
||||
.value,
|
||||
))
|
||||
}
|
||||
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => {
|
||||
let entry = self
|
||||
.plain_storage_cursor()?
|
||||
.lock()
|
||||
.seek_by_key_subkey(address, storage_key)?
|
||||
.filter(|entry| entry.key == storage_key);
|
||||
Ok(entry.map(|e| e.value).or(Some(StorageValue::ZERO)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -448,20 +517,56 @@ impl<Provider: DBProvider + BlockNumReader> BytecodeReader
|
||||
|
||||
/// State provider for a given block number.
|
||||
/// For more detailed description, see [`HistoricalStateProviderRef`].
|
||||
#[derive(Debug)]
|
||||
pub struct HistoricalStateProvider<Provider> {
|
||||
pub struct HistoricalStateProvider<Provider: DBProvider> {
|
||||
/// Database provider.
|
||||
provider: Provider,
|
||||
/// State at the block number is the main indexer of the state.
|
||||
block_number: BlockNumber,
|
||||
/// Lowest blocks at which different parts of the state are available.
|
||||
lowest_available_blocks: LowestAvailableBlocks,
|
||||
/// Cached plain storage state cursor.
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
/// Cached storage changeset cursor.
|
||||
storage_changeset_cursor: StorageChangesetCursor<Provider::Tx>,
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider + Debug> Debug for HistoricalStateProvider<Provider> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("HistoricalStateProvider")
|
||||
.field("provider", &self.provider)
|
||||
.field("block_number", &self.block_number)
|
||||
.field("lowest_available_blocks", &self.lowest_available_blocks)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider + BlockNumReader> HistoricalStateProvider<Provider> {
|
||||
/// Create new `StateProvider` for historical block number
|
||||
pub fn new(provider: Provider, block_number: BlockNumber) -> Self {
|
||||
Self { provider, block_number, lowest_available_blocks: Default::default() }
|
||||
pub fn new(
|
||||
provider: Provider,
|
||||
block_number: BlockNumber,
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
storage_changeset_cursor: StorageChangesetCursor<Provider::Tx>,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
block_number,
|
||||
lowest_available_blocks: LowestAvailableBlocks::default(),
|
||||
plain_storage_cursor,
|
||||
storage_changeset_cursor,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`HistoricalStateProviderRef`] that shares the cached cursors with this provider.
|
||||
#[inline(always)]
|
||||
pub fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
|
||||
HistoricalStateProviderRef::new_with_cursors(
|
||||
&self.provider,
|
||||
self.block_number,
|
||||
self.lowest_available_blocks,
|
||||
self.plain_storage_cursor.clone(),
|
||||
self.storage_changeset_cursor.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Set the lowest block number at which the account history is available.
|
||||
@@ -481,16 +586,6 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProvider<Provider> {
|
||||
self.lowest_available_blocks.storage_history_block_number = Some(block_number);
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns a new provider that takes the `TX` as reference
|
||||
#[inline(always)]
|
||||
const fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
|
||||
HistoricalStateProviderRef::new_with_lowest_available_blocks(
|
||||
&self.provider,
|
||||
self.block_number,
|
||||
self.lowest_available_blocks,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Delegates all provider impls to [HistoricalStateProviderRef]
|
||||
@@ -527,7 +622,7 @@ impl LowestAvailableBlocks {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
providers::state::historical::{HistoryInfo, LowestAvailableBlocks},
|
||||
providers::{state::historical::HistoryInfo, LowestAvailableBlocks},
|
||||
test_utils::create_test_provider_factory,
|
||||
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
|
||||
};
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use crate::{
|
||||
providers::state::macros::delegate_provider_impls, AccountReader, BlockHashReader,
|
||||
HashedPostStateProvider, StateProvider, StateRootProvider,
|
||||
providers::{state::macros::delegate_provider_impls, PlainStorageCursor},
|
||||
AccountReader, BlockHashReader, HashedPostStateProvider, StateProvider, StateRootProvider,
|
||||
};
|
||||
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx};
|
||||
use reth_primitives_traits::{Account, Bytecode};
|
||||
use reth_storage_api::{BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider};
|
||||
@@ -22,17 +23,45 @@ use reth_trie_db::{
|
||||
/// State provider over latest state that takes tx reference.
|
||||
///
|
||||
/// Wraps a [`DBProvider`] to get access to database.
|
||||
#[derive(Debug)]
|
||||
pub struct LatestStateProviderRef<'b, Provider>(&'b Provider);
|
||||
pub struct LatestStateProviderRef<'b, Provider: DBProvider> {
|
||||
/// Database provider.
|
||||
provider: &'b Provider,
|
||||
/// Optional cached plain storage state cursor (shared with [`LatestStateProvider`]).
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider + std::fmt::Debug> std::fmt::Debug
|
||||
for LatestStateProviderRef<'_, Provider>
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LatestStateProviderRef").field("provider", &self.provider).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'b, Provider: DBProvider> LatestStateProviderRef<'b, Provider> {
|
||||
/// Create new state provider
|
||||
pub const fn new(provider: &'b Provider) -> Self {
|
||||
Self(provider)
|
||||
pub fn new(provider: &'b Provider) -> Self {
|
||||
Self { provider, plain_storage_cursor: PlainStorageCursor::<Provider::Tx>::default() }
|
||||
}
|
||||
|
||||
/// Create new state provider with a shared cursor.
|
||||
pub const fn new_with_cursor(
|
||||
provider: &'b Provider,
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
) -> Self {
|
||||
Self { provider, plain_storage_cursor }
|
||||
}
|
||||
|
||||
fn plain_storage_cursor(
|
||||
&self,
|
||||
) -> ProviderResult<&Mutex<<Provider::Tx as DbTx>::DupCursor<tables::PlainStorageState>>> {
|
||||
self.plain_storage_cursor
|
||||
.get_or_try_init(|| self.provider.tx_ref().cursor_dup_read().map(Mutex::new))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn tx(&self) -> &Provider::Tx {
|
||||
self.0.tx_ref()
|
||||
self.provider.tx_ref()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,10 +72,12 @@ impl<Provider: DBProvider> AccountReader for LatestStateProviderRef<'_, Provider
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider: BlockHashReader> BlockHashReader for LatestStateProviderRef<'_, Provider> {
|
||||
impl<Provider: DBProvider + BlockHashReader> BlockHashReader
|
||||
for LatestStateProviderRef<'_, Provider>
|
||||
{
|
||||
/// Get block hash by number.
|
||||
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
|
||||
self.0.block_hash(number)
|
||||
self.provider.block_hash(number)
|
||||
}
|
||||
|
||||
fn canonical_hashes_range(
|
||||
@@ -54,7 +85,7 @@ impl<Provider: BlockHashReader> BlockHashReader for LatestStateProviderRef<'_, P
|
||||
start: BlockNumber,
|
||||
end: BlockNumber,
|
||||
) -> ProviderResult<Vec<B256>> {
|
||||
self.0.canonical_hashes_range(start, end)
|
||||
self.provider.canonical_hashes_range(start, end)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,8 +193,9 @@ impl<Provider: DBProvider + BlockHashReader> StateProvider
|
||||
account: Address,
|
||||
storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
let mut cursor = self.tx().cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
if let Some(entry) = cursor.seek_by_key_subkey(account, storage_key)? &&
|
||||
// Use the shared cursor
|
||||
if let Some(entry) =
|
||||
self.plain_storage_cursor()?.lock().seek_by_key_subkey(account, storage_key)? &&
|
||||
entry.key == storage_key
|
||||
{
|
||||
return Ok(Some(entry.value))
|
||||
@@ -182,19 +214,29 @@ impl<Provider: DBProvider + BlockHashReader> BytecodeReader
|
||||
}
|
||||
|
||||
/// State provider for the latest state.
|
||||
#[derive(Debug)]
|
||||
pub struct LatestStateProvider<Provider>(Provider);
|
||||
pub struct LatestStateProvider<Provider: DBProvider> {
|
||||
/// Database provider.
|
||||
provider: Provider,
|
||||
/// Cached plain storage state cursor (shared via Arc for ref types).
|
||||
plain_storage_cursor: PlainStorageCursor<Provider::Tx>,
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider + std::fmt::Debug> std::fmt::Debug for LatestStateProvider<Provider> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LatestStateProvider").field("provider", &self.provider).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider: DBProvider> LatestStateProvider<Provider> {
|
||||
/// Create new state provider
|
||||
pub const fn new(db: Provider) -> Self {
|
||||
Self(db)
|
||||
pub fn new(provider: Provider) -> Self {
|
||||
Self { provider, plain_storage_cursor: PlainStorageCursor::<Provider::Tx>::default() }
|
||||
}
|
||||
|
||||
/// Returns a new provider that takes the `TX` as reference
|
||||
#[inline(always)]
|
||||
const fn as_ref(&self) -> LatestStateProviderRef<'_, Provider> {
|
||||
LatestStateProviderRef::new(&self.0)
|
||||
fn as_ref(&self) -> LatestStateProviderRef<'_, Provider> {
|
||||
LatestStateProviderRef::new_with_cursor(&self.provider, self.plain_storage_cursor.clone())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user