mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(provider): read changesets from static files during unwind (#21528)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -3,7 +3,7 @@ use itertools::Itertools;
|
||||
use reth_config::config::{EtlConfig, HashingConfig};
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbDupCursorRW},
|
||||
models::{BlockNumberAddress, CompactU256},
|
||||
models::CompactU256,
|
||||
table::Decompress,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
@@ -179,7 +179,7 @@ where
|
||||
let (range, unwind_progress, _) =
|
||||
input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||
|
||||
provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;
|
||||
provider.unwind_storage_hashing_range(range)?;
|
||||
|
||||
let mut stage_checkpoint =
|
||||
input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
|
||||
@@ -227,7 +227,7 @@ mod tests {
|
||||
use rand::Rng;
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRW, DbDupCursorRO},
|
||||
models::StoredBlockBodyIndices,
|
||||
models::{BlockNumberAddress, StoredBlockBodyIndices},
|
||||
};
|
||||
use reth_ethereum_primitives::Block;
|
||||
use reth_primitives_traits::SealedBlock;
|
||||
|
||||
@@ -166,7 +166,7 @@ where
|
||||
let (range, unwind_progress, _) =
|
||||
input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||
|
||||
provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
|
||||
provider.unwind_storage_history_indices_range(range)?;
|
||||
|
||||
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
|
||||
}
|
||||
|
||||
@@ -728,7 +728,7 @@ impl<N: ProviderNodeTypes> StorageChangeSetReader for BlockchainProvider<N> {
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
|
||||
self.consistent_provider()?.storage_changesets_range(range)
|
||||
}
|
||||
|
||||
@@ -1397,7 +1397,7 @@ impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
|
||||
let range = to_range(range);
|
||||
let mut changesets = Vec::new();
|
||||
|
||||
@@ -40,7 +40,8 @@ use reth_db_api::{
|
||||
database::Database,
|
||||
models::{
|
||||
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
|
||||
ShardedKey, StorageBeforeTx, StorageSettings, StoredBlockBodyIndices,
|
||||
BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
|
||||
StoredBlockBodyIndices,
|
||||
},
|
||||
table::Table,
|
||||
tables,
|
||||
@@ -1384,14 +1385,14 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
|
||||
if self.cached_storage_settings().storage_changesets_in_static_files {
|
||||
self.static_file_provider.storage_changesets_range(range)
|
||||
} else {
|
||||
self.tx
|
||||
.cursor_dup_read::<tables::StorageChangeSets>()?
|
||||
.walk_range(BlockNumberAddress::range(range))?
|
||||
.walk_range(BlockNumberAddressRange::from(range))?
|
||||
.map(|r| r.map_err(Into::into))
|
||||
.collect()
|
||||
}
|
||||
@@ -2834,11 +2835,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
|
||||
let changesets = self
|
||||
.tx
|
||||
.cursor_read::<tables::AccountChangeSets>()?
|
||||
.walk_range(range)?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let changesets = self.account_changesets_range(range)?;
|
||||
self.unwind_account_hashing(changesets.iter())
|
||||
}
|
||||
|
||||
@@ -2896,13 +2893,9 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
|
||||
|
||||
fn unwind_storage_hashing_range(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumberAddress>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
|
||||
let changesets = self
|
||||
.tx
|
||||
.cursor_read::<tables::StorageChangeSets>()?
|
||||
.walk_range(range)?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let changesets = self.storage_changesets_range(range)?;
|
||||
self.unwind_storage_hashing(changesets.into_iter())
|
||||
}
|
||||
|
||||
@@ -2997,11 +2990,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<usize> {
|
||||
let changesets = self
|
||||
.tx
|
||||
.cursor_read::<tables::AccountChangeSets>()?
|
||||
.walk_range(range)?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let changesets = self.account_changesets_range(range)?;
|
||||
self.unwind_account_history_indices(changesets.iter())
|
||||
}
|
||||
|
||||
@@ -3063,13 +3052,9 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
|
||||
|
||||
fn unwind_storage_history_indices_range(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumberAddress>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<usize> {
|
||||
let changesets = self
|
||||
.tx
|
||||
.cursor_read::<tables::StorageChangeSets>()?
|
||||
.walk_range(range)?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let changesets = self.storage_changesets_range(range)?;
|
||||
self.unwind_storage_history_indices(changesets.into_iter())
|
||||
}
|
||||
|
||||
|
||||
@@ -2499,7 +2499,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
|
||||
let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
|
||||
self.walk_storage_changeset_range(range).collect()
|
||||
|
||||
@@ -1025,7 +1025,7 @@ impl<T: NodePrimitives, ChainSpec: Send + Sync> StorageChangeSetReader
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
_range: RangeInclusive<BlockNumber>,
|
||||
_range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(reth_db_api::models::BlockNumberAddress, StorageEntry)>> {
|
||||
Ok(Vec::default())
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ pub trait HashingWriter: Send {
|
||||
/// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
|
||||
fn unwind_storage_hashing_range(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumberAddress>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;
|
||||
|
||||
/// Iterates over storages and inserts them to hashing table.
|
||||
|
||||
@@ -44,7 +44,7 @@ pub trait HistoryWriter: Send {
|
||||
/// Returns number of changesets walked.
|
||||
fn unwind_storage_history_indices_range(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumberAddress>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<usize>;
|
||||
|
||||
/// Insert storage change index to database. Used inside `StorageHistoryIndex` stage
|
||||
|
||||
@@ -430,7 +430,7 @@ impl<C: Send + Sync, N: NodePrimitives> StorageChangeSetReader for NoopProvider<
|
||||
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
_range: RangeInclusive<BlockNumber>,
|
||||
_range: impl core::ops::RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<
|
||||
Vec<(reth_db_api::models::BlockNumberAddress, reth_primitives_traits::StorageEntry)>,
|
||||
> {
|
||||
|
||||
@@ -3,7 +3,7 @@ use alloc::{
|
||||
vec::Vec,
|
||||
};
|
||||
use alloy_primitives::{Address, BlockNumber, B256};
|
||||
use core::ops::RangeInclusive;
|
||||
use core::ops::{RangeBounds, RangeInclusive};
|
||||
use reth_primitives_traits::StorageEntry;
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
|
||||
@@ -53,11 +53,9 @@ pub trait StorageChangeSetReader: Send {
|
||||
) -> ProviderResult<Option<StorageEntry>>;
|
||||
|
||||
/// Get all storage changesets in a range of blocks.
|
||||
///
|
||||
/// NOTE: Get inclusive range of blocks.
|
||||
fn storage_changesets_range(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(reth_db_api::models::BlockNumberAddress, StorageEntry)>>;
|
||||
|
||||
/// Get the total count of all storage changes.
|
||||
|
||||
Reference in New Issue
Block a user