mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat: OverlayStateProvider (#18822)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -123,21 +123,21 @@ fn verify_and_repair<N: ProviderNodeTypes>(
|
||||
// Check that a pipeline sync isn't in progress.
|
||||
verify_checkpoints(provider_rw.as_ref())?;
|
||||
|
||||
// Create cursors for making modifications with
|
||||
let tx = provider_rw.tx_mut();
|
||||
tx.disable_long_read_transaction_safety();
|
||||
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
|
||||
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
|
||||
// Create the hashed cursor factory
|
||||
// Create the cursor factories. These cannot accept the `&mut` tx above because they require it
|
||||
// to be AsRef.
|
||||
let tx = provider_rw.tx_ref();
|
||||
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
|
||||
|
||||
// Create the trie cursor factory
|
||||
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
|
||||
|
||||
// Create the verifier
|
||||
let verifier = Verifier::new(trie_cursor_factory, hashed_cursor_factory)?;
|
||||
|
||||
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
|
||||
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
|
||||
let mut inconsistent_nodes = 0;
|
||||
let start_time = Instant::now();
|
||||
let mut last_progress_time = Instant::now();
|
||||
|
||||
@@ -17,6 +17,7 @@ mod state;
|
||||
pub use state::{
|
||||
historical::{HistoricalStateProvider, HistoricalStateProviderRef, LowestAvailableBlocks},
|
||||
latest::{LatestStateProvider, LatestStateProviderRef},
|
||||
overlay::OverlayStateProvider,
|
||||
};
|
||||
|
||||
mod consistent_view;
|
||||
|
||||
@@ -2,3 +2,4 @@
|
||||
pub(crate) mod historical;
|
||||
pub(crate) mod latest;
|
||||
pub(crate) mod macros;
|
||||
pub(crate) mod overlay;
|
||||
|
||||
111
crates/storage/provider/src/providers/state/overlay.rs
Normal file
111
crates/storage/provider/src/providers/state/overlay.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
use alloy_primitives::B256;
|
||||
use reth_db_api::DatabaseError;
|
||||
use reth_storage_api::DBProvider;
|
||||
use reth_trie::{
|
||||
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
|
||||
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
|
||||
updates::TrieUpdatesSorted,
|
||||
HashedPostStateSorted,
|
||||
};
|
||||
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// State provider with in-memory overlay from trie updates and hashed post state.
|
||||
///
|
||||
/// This provider uses in-memory trie updates and hashed post state as an overlay
|
||||
/// on top of a database provider, implementing [`TrieCursorFactory`] and [`HashedCursorFactory`]
|
||||
/// using the in-memory overlay factories.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OverlayStateProvider<Provider: DBProvider> {
|
||||
/// The in-memory trie cursor factory that wraps the database cursor factory.
|
||||
trie_cursor_factory:
|
||||
InMemoryTrieCursorFactory<DatabaseTrieCursorFactory<Provider::Tx>, Arc<TrieUpdatesSorted>>,
|
||||
/// The hashed cursor factory that wraps the database cursor factory.
|
||||
hashed_cursor_factory: HashedPostStateCursorFactory<
|
||||
DatabaseHashedCursorFactory<Provider::Tx>,
|
||||
Arc<HashedPostStateSorted>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<Provider> OverlayStateProvider<Provider>
|
||||
where
|
||||
Provider: DBProvider + Clone,
|
||||
{
|
||||
/// Create new overlay state provider. The `Provider` must be cloneable, which generally means
|
||||
/// it should be wrapped in an `Arc`.
|
||||
pub fn new(
|
||||
provider: Provider,
|
||||
trie_updates: Arc<TrieUpdatesSorted>,
|
||||
hashed_post_state: Arc<HashedPostStateSorted>,
|
||||
) -> Self {
|
||||
// Create the trie cursor factory
|
||||
let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(provider.clone().into_tx());
|
||||
let trie_cursor_factory =
|
||||
InMemoryTrieCursorFactory::new(db_trie_cursor_factory, trie_updates);
|
||||
|
||||
// Create the hashed cursor factory
|
||||
let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(provider.into_tx());
|
||||
let hashed_cursor_factory =
|
||||
HashedPostStateCursorFactory::new(db_hashed_cursor_factory, hashed_post_state);
|
||||
|
||||
Self { trie_cursor_factory, hashed_cursor_factory }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
|
||||
where
|
||||
Provider: DBProvider + Clone,
|
||||
InMemoryTrieCursorFactory<DatabaseTrieCursorFactory<Provider::Tx>, Arc<TrieUpdatesSorted>>:
|
||||
TrieCursorFactory,
|
||||
{
|
||||
type AccountTrieCursor = <InMemoryTrieCursorFactory<
|
||||
DatabaseTrieCursorFactory<Provider::Tx>,
|
||||
Arc<TrieUpdatesSorted>,
|
||||
> as TrieCursorFactory>::AccountTrieCursor;
|
||||
|
||||
type StorageTrieCursor = <InMemoryTrieCursorFactory<
|
||||
DatabaseTrieCursorFactory<Provider::Tx>,
|
||||
Arc<TrieUpdatesSorted>,
|
||||
> as TrieCursorFactory>::StorageTrieCursor;
|
||||
|
||||
fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor, DatabaseError> {
|
||||
self.trie_cursor_factory.account_trie_cursor()
|
||||
}
|
||||
|
||||
fn storage_trie_cursor(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
) -> Result<Self::StorageTrieCursor, DatabaseError> {
|
||||
self.trie_cursor_factory.storage_trie_cursor(hashed_address)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
|
||||
where
|
||||
Provider: DBProvider + Clone,
|
||||
HashedPostStateCursorFactory<
|
||||
DatabaseHashedCursorFactory<Provider::Tx>,
|
||||
Arc<HashedPostStateSorted>,
|
||||
>: HashedCursorFactory,
|
||||
{
|
||||
type AccountCursor = <HashedPostStateCursorFactory<
|
||||
DatabaseHashedCursorFactory<Provider::Tx>,
|
||||
Arc<HashedPostStateSorted>,
|
||||
> as HashedCursorFactory>::AccountCursor;
|
||||
|
||||
type StorageCursor = <HashedPostStateCursorFactory<
|
||||
DatabaseHashedCursorFactory<Provider::Tx>,
|
||||
Arc<HashedPostStateSorted>,
|
||||
> as HashedCursorFactory>::StorageCursor;
|
||||
|
||||
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor, DatabaseError> {
|
||||
self.hashed_cursor_factory.hashed_account_cursor()
|
||||
}
|
||||
|
||||
fn hashed_storage_cursor(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
) -> Result<Self::StorageCursor, DatabaseError> {
|
||||
self.hashed_cursor_factory.hashed_storage_cursor(hashed_address)
|
||||
}
|
||||
}
|
||||
@@ -486,6 +486,12 @@ impl HashedPostStateSorted {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Self> for HashedPostStateSorted {
|
||||
fn as_ref(&self) -> &Self {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Sorted account state optimized for iterating during state trie calculation.
|
||||
#[derive(Clone, Eq, PartialEq, Default, Debug)]
|
||||
pub struct HashedAccountsSorted {
|
||||
|
||||
@@ -449,6 +449,12 @@ impl TrieUpdatesSorted {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Self> for TrieUpdatesSorted {
|
||||
fn as_ref(&self) -> &Self {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Sorted storage trie updates reference used for serializing to file.
|
||||
#[derive(PartialEq, Eq, Clone, Default, Debug)]
|
||||
#[cfg_attr(any(test, feature = "serde"), derive(serde::Serialize))]
|
||||
|
||||
@@ -9,23 +9,17 @@ use reth_primitives_traits::Account;
|
||||
use reth_trie::hashed_cursor::{HashedCursor, HashedCursorFactory, HashedStorageCursor};
|
||||
|
||||
/// A struct wrapping database transaction that implements [`HashedCursorFactory`].
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseHashedCursorFactory<'a, TX>(&'a TX);
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseHashedCursorFactory<T>(T);
|
||||
|
||||
impl<TX> Clone for DatabaseHashedCursorFactory<'_, TX> {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, TX> DatabaseHashedCursorFactory<'a, TX> {
|
||||
impl<T> DatabaseHashedCursorFactory<T> {
|
||||
/// Create new database hashed cursor factory.
|
||||
pub const fn new(tx: &'a TX) -> Self {
|
||||
pub const fn new(tx: T) -> Self {
|
||||
Self(tx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx> HashedCursorFactory for DatabaseHashedCursorFactory<'_, TX> {
|
||||
impl<TX: DbTx> HashedCursorFactory for DatabaseHashedCursorFactory<&TX> {
|
||||
type AccountCursor = DatabaseHashedAccountCursor<<TX as DbTx>::Cursor<tables::HashedAccounts>>;
|
||||
type StorageCursor =
|
||||
DatabaseHashedStorageCursor<<TX as DbTx>::DupCursor<tables::HashedStorages>>;
|
||||
|
||||
@@ -32,7 +32,7 @@ pub trait DatabaseProof<'a, TX> {
|
||||
}
|
||||
|
||||
impl<'a, TX: DbTx> DatabaseProof<'a, TX>
|
||||
for Proof<DatabaseTrieCursorFactory<'a, TX>, DatabaseHashedCursorFactory<'a, TX>>
|
||||
for Proof<DatabaseTrieCursorFactory<&'a TX>, DatabaseHashedCursorFactory<&'a TX>>
|
||||
{
|
||||
/// Create a new [Proof] instance from database transaction.
|
||||
fn from_tx(tx: &'a TX) -> Self {
|
||||
@@ -104,7 +104,7 @@ pub trait DatabaseStorageProof<'a, TX> {
|
||||
}
|
||||
|
||||
impl<'a, TX: DbTx> DatabaseStorageProof<'a, TX>
|
||||
for StorageProof<DatabaseTrieCursorFactory<'a, TX>, DatabaseHashedCursorFactory<'a, TX>>
|
||||
for StorageProof<DatabaseTrieCursorFactory<&'a TX>, DatabaseHashedCursorFactory<&'a TX>>
|
||||
{
|
||||
fn from_tx(tx: &'a TX, address: Address) -> Self {
|
||||
Self::new(DatabaseTrieCursorFactory::new(tx), DatabaseHashedCursorFactory::new(tx), address)
|
||||
|
||||
@@ -136,7 +136,7 @@ pub trait DatabaseHashedPostState<TX>: Sized {
|
||||
}
|
||||
|
||||
impl<'a, TX: DbTx> DatabaseStateRoot<'a, TX>
|
||||
for StateRoot<DatabaseTrieCursorFactory<'a, TX>, DatabaseHashedCursorFactory<'a, TX>>
|
||||
for StateRoot<DatabaseTrieCursorFactory<&'a TX>, DatabaseHashedCursorFactory<&'a TX>>
|
||||
{
|
||||
fn from_tx(tx: &'a TX) -> Self {
|
||||
Self::new(DatabaseTrieCursorFactory::new(tx), DatabaseHashedCursorFactory::new(tx))
|
||||
|
||||
@@ -35,7 +35,7 @@ pub trait DatabaseHashedStorage<TX>: Sized {
|
||||
}
|
||||
|
||||
impl<'a, TX: DbTx> DatabaseStorageRoot<'a, TX>
|
||||
for StorageRoot<DatabaseTrieCursorFactory<'a, TX>, DatabaseHashedCursorFactory<'a, TX>>
|
||||
for StorageRoot<DatabaseTrieCursorFactory<&'a TX>, DatabaseHashedCursorFactory<&'a TX>>
|
||||
{
|
||||
fn from_tx(tx: &'a TX, address: Address) -> Self {
|
||||
Self::new(
|
||||
|
||||
@@ -12,24 +12,20 @@ use reth_trie::{
|
||||
};
|
||||
|
||||
/// Wrapper struct for database transaction implementing trie cursor factory trait.
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseTrieCursorFactory<'a, TX>(&'a TX);
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseTrieCursorFactory<T>(T);
|
||||
|
||||
impl<TX> Clone for DatabaseTrieCursorFactory<'_, TX> {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, TX> DatabaseTrieCursorFactory<'a, TX> {
|
||||
impl<T> DatabaseTrieCursorFactory<T> {
|
||||
/// Create new [`DatabaseTrieCursorFactory`].
|
||||
pub const fn new(tx: &'a TX) -> Self {
|
||||
pub const fn new(tx: T) -> Self {
|
||||
Self(tx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of the trie cursor factory for a database transaction.
|
||||
impl<TX: DbTx> TrieCursorFactory for DatabaseTrieCursorFactory<'_, TX> {
|
||||
impl<TX> TrieCursorFactory for DatabaseTrieCursorFactory<&TX>
|
||||
where
|
||||
TX: DbTx,
|
||||
{
|
||||
type AccountTrieCursor = DatabaseAccountTrieCursor<<TX as DbTx>::Cursor<tables::AccountsTrie>>;
|
||||
type StorageTrieCursor =
|
||||
DatabaseStorageTrieCursor<<TX as DbTx>::DupCursor<tables::StoragesTrie>>;
|
||||
|
||||
@@ -21,7 +21,7 @@ pub trait DatabaseTrieWitness<'a, TX> {
|
||||
}
|
||||
|
||||
impl<'a, TX: DbTx> DatabaseTrieWitness<'a, TX>
|
||||
for TrieWitness<DatabaseTrieCursorFactory<'a, TX>, DatabaseHashedCursorFactory<'a, TX>>
|
||||
for TrieWitness<DatabaseTrieCursorFactory<&'a TX>, DatabaseHashedCursorFactory<&'a TX>>
|
||||
{
|
||||
fn from_tx(tx: &'a TX) -> Self {
|
||||
Self::new(DatabaseTrieCursorFactory::new(tx), DatabaseHashedCursorFactory::new(tx))
|
||||
|
||||
@@ -214,6 +214,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Type alias for the factory tuple returned by `create_factories`
|
||||
type ProofFactories<'a, Tx> = (
|
||||
InMemoryTrieCursorFactory<DatabaseTrieCursorFactory<&'a Tx>, &'a TrieUpdatesSorted>,
|
||||
HashedPostStateCursorFactory<DatabaseHashedCursorFactory<&'a Tx>, &'a HashedPostStateSorted>,
|
||||
);
|
||||
|
||||
/// This contains all information shared between all storage proof instances.
|
||||
#[derive(Debug)]
|
||||
pub struct ProofTaskTx<Tx> {
|
||||
@@ -240,20 +246,15 @@ impl<Tx> ProofTaskTx<Tx>
|
||||
where
|
||||
Tx: DbTx,
|
||||
{
|
||||
fn create_factories(
|
||||
&self,
|
||||
) -> (
|
||||
InMemoryTrieCursorFactory<'_, DatabaseTrieCursorFactory<'_, Tx>>,
|
||||
HashedPostStateCursorFactory<'_, DatabaseHashedCursorFactory<'_, Tx>>,
|
||||
) {
|
||||
fn create_factories(&self) -> ProofFactories<'_, Tx> {
|
||||
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
|
||||
DatabaseTrieCursorFactory::new(&self.tx),
|
||||
&self.task_ctx.nodes_sorted,
|
||||
self.task_ctx.nodes_sorted.as_ref(),
|
||||
);
|
||||
|
||||
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
|
||||
DatabaseHashedCursorFactory::new(&self.tx),
|
||||
&self.task_ctx.state_sorted,
|
||||
self.task_ctx.state_sorted.as_ref(),
|
||||
);
|
||||
|
||||
(trie_cursor_factory, hashed_cursor_factory)
|
||||
|
||||
@@ -7,25 +7,29 @@ use reth_trie_common::{HashedAccountsSorted, HashedPostStateSorted, HashedStorag
|
||||
|
||||
/// The hashed cursor factory for the post state.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct HashedPostStateCursorFactory<'a, CF> {
|
||||
pub struct HashedPostStateCursorFactory<CF, T> {
|
||||
cursor_factory: CF,
|
||||
post_state: &'a HashedPostStateSorted,
|
||||
post_state: T,
|
||||
}
|
||||
|
||||
impl<'a, CF> HashedPostStateCursorFactory<'a, CF> {
|
||||
impl<CF, T> HashedPostStateCursorFactory<CF, T> {
|
||||
/// Create a new factory.
|
||||
pub const fn new(cursor_factory: CF, post_state: &'a HashedPostStateSorted) -> Self {
|
||||
pub const fn new(cursor_factory: CF, post_state: T) -> Self {
|
||||
Self { cursor_factory, post_state }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, CF: HashedCursorFactory> HashedCursorFactory for HashedPostStateCursorFactory<'a, CF> {
|
||||
impl<'a, CF, T> HashedCursorFactory for HashedPostStateCursorFactory<CF, &'a T>
|
||||
where
|
||||
CF: HashedCursorFactory,
|
||||
T: AsRef<HashedPostStateSorted>,
|
||||
{
|
||||
type AccountCursor = HashedPostStateAccountCursor<'a, CF::AccountCursor>;
|
||||
type StorageCursor = HashedPostStateStorageCursor<'a, CF::StorageCursor>;
|
||||
|
||||
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor, DatabaseError> {
|
||||
let cursor = self.cursor_factory.hashed_account_cursor()?;
|
||||
Ok(HashedPostStateAccountCursor::new(cursor, &self.post_state.accounts))
|
||||
Ok(HashedPostStateAccountCursor::new(cursor, &self.post_state.as_ref().accounts))
|
||||
}
|
||||
|
||||
fn hashed_storage_cursor(
|
||||
@@ -33,7 +37,10 @@ impl<'a, CF: HashedCursorFactory> HashedCursorFactory for HashedPostStateCursorF
|
||||
hashed_address: B256,
|
||||
) -> Result<Self::StorageCursor, DatabaseError> {
|
||||
let cursor = self.cursor_factory.hashed_storage_cursor(hashed_address)?;
|
||||
Ok(HashedPostStateStorageCursor::new(cursor, self.post_state.storages.get(&hashed_address)))
|
||||
Ok(HashedPostStateStorageCursor::new(
|
||||
cursor,
|
||||
self.post_state.as_ref().storages.get(&hashed_address),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,27 +6,31 @@ use reth_trie_common::{BranchNodeCompact, Nibbles};
|
||||
|
||||
/// The trie cursor factory for the trie updates.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InMemoryTrieCursorFactory<'a, CF> {
|
||||
pub struct InMemoryTrieCursorFactory<CF, T> {
|
||||
/// Underlying trie cursor factory.
|
||||
cursor_factory: CF,
|
||||
/// Reference to sorted trie updates.
|
||||
trie_updates: &'a TrieUpdatesSorted,
|
||||
trie_updates: T,
|
||||
}
|
||||
|
||||
impl<'a, CF> InMemoryTrieCursorFactory<'a, CF> {
|
||||
impl<CF, T> InMemoryTrieCursorFactory<CF, T> {
|
||||
/// Create a new trie cursor factory.
|
||||
pub const fn new(cursor_factory: CF, trie_updates: &'a TrieUpdatesSorted) -> Self {
|
||||
pub const fn new(cursor_factory: CF, trie_updates: T) -> Self {
|
||||
Self { cursor_factory, trie_updates }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, CF: TrieCursorFactory> TrieCursorFactory for InMemoryTrieCursorFactory<'a, CF> {
|
||||
impl<'a, CF, T> TrieCursorFactory for InMemoryTrieCursorFactory<CF, &'a T>
|
||||
where
|
||||
CF: TrieCursorFactory,
|
||||
T: AsRef<TrieUpdatesSorted>,
|
||||
{
|
||||
type AccountTrieCursor = InMemoryTrieCursor<'a, CF::AccountTrieCursor>;
|
||||
type StorageTrieCursor = InMemoryTrieCursor<'a, CF::StorageTrieCursor>;
|
||||
|
||||
fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor, DatabaseError> {
|
||||
let cursor = self.cursor_factory.account_trie_cursor()?;
|
||||
Ok(InMemoryTrieCursor::new(Some(cursor), self.trie_updates.account_nodes_ref()))
|
||||
Ok(InMemoryTrieCursor::new(Some(cursor), self.trie_updates.as_ref().account_nodes_ref()))
|
||||
}
|
||||
|
||||
fn storage_trie_cursor(
|
||||
@@ -36,7 +40,7 @@ impl<'a, CF: TrieCursorFactory> TrieCursorFactory for InMemoryTrieCursorFactory<
|
||||
// if the storage trie has no updates then we use this as the in-memory overlay.
|
||||
static EMPTY_UPDATES: Vec<(Nibbles, Option<BranchNodeCompact>)> = Vec::new();
|
||||
|
||||
let storage_trie_updates = self.trie_updates.storage_tries.get(&hashed_address);
|
||||
let storage_trie_updates = self.trie_updates.as_ref().storage_tries.get(&hashed_address);
|
||||
let (storage_nodes, cleared) = storage_trie_updates
|
||||
.map(|u| (u.storage_nodes_ref(), u.is_deleted()))
|
||||
.unwrap_or((&EMPTY_UPDATES, false));
|
||||
|
||||
Reference in New Issue
Block a user