perf: random changes (#5795)

This commit is contained in:
DaniPopes
2023-12-16 20:11:26 +02:00
committed by GitHub
parent bfe12ce885
commit 27caf7b4ce
5 changed files with 183 additions and 231 deletions

View File

@@ -1809,7 +1809,7 @@ mod tests {
.with_pending_blocks((block2.number + 1, HashSet::new()))
.assert(&tree);
assert!(tree.make_canonical(&block1a_hash).is_ok());
assert_matches!(tree.make_canonical(&block1a_hash), Ok(_));
// Trie state:
// b2a b2 (side chain)
// | /

View File

@@ -120,21 +120,18 @@ impl Receipts {
/// Retrieves gas spent by transactions as a vector of tuples (transaction index, gas used).
pub fn gas_spent_by_tx(&self) -> Result<Vec<(u64, u64)>, PruneSegmentError> {
self.last()
.map(|block_r| {
block_r
.iter()
.enumerate()
.map(|(id, tx_r)| {
if let Some(receipt) = tx_r.as_ref() {
Ok((id as u64, receipt.cumulative_gas_used))
} else {
Err(PruneSegmentError::ReceiptsPruned)
}
})
.collect::<Result<Vec<_>, PruneSegmentError>>()
})
.unwrap_or(Ok(vec![]))
let Some(block_r) = self.last() else {
return Ok(vec![]);
};
let mut out = Vec::with_capacity(block_r.len());
for (id, tx_r) in block_r.iter().enumerate() {
if let Some(receipt) = tx_r.as_ref() {
out.push((id as u64, receipt.cumulative_gas_used));
} else {
return Err(PruneSegmentError::ReceiptsPruned);
}
}
Ok(out)
}
}

View File

@@ -153,7 +153,7 @@ where
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> std::iter::Iterator for Walker<'cursor, T, CURSOR> {
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for Walker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
@@ -227,9 +227,7 @@ impl<'cursor, T: Table, CURSOR: DbCursorRW<T> + DbCursorRO<T>> ReverseWalker<'cu
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> std::iter::Iterator
for ReverseWalker<'cursor, T, CURSOR>
{
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for ReverseWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
@@ -270,10 +268,9 @@ where
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> std::iter::Iterator
for RangeWalker<'cursor, T, CURSOR>
{
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for RangeWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
return None
@@ -292,11 +289,10 @@ impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> std::iter::Iterator
}
},
Some(res @ Err(_)) => Some(res),
None if matches!(self.end_key, Bound::Unbounded) => {
self.is_done = true;
None => {
self.is_done = matches!(self.end_key, Bound::Unbounded);
None
}
_ => None,
}
}
}
@@ -361,9 +357,7 @@ impl<'cursor, T: DupSort, CURSOR: DbCursorRW<T> + DbDupCursorRO<T>> DupWalker<'c
}
}
impl<'cursor, T: DupSort, CURSOR: DbDupCursorRO<T>> std::iter::Iterator
for DupWalker<'cursor, T, CURSOR>
{
impl<'cursor, T: DupSort, CURSOR: DbDupCursorRO<T>> Iterator for DupWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();

View File

@@ -1,8 +1,5 @@
//! Cursor wrapper for libmdbx-sys.
use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
use crate::{
common::{PairResult, ValueOnlyResult},
cursor::{
@@ -10,11 +7,13 @@ use crate::{
ReverseWalker, Walker,
},
metrics::{Operation, OperationMetrics},
table::{Compress, DupSort, Encode, Table},
table::{Compress, Decode, Decompress, DupSort, Encode, Table},
tables::utils::*,
DatabaseError,
};
use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation};
use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
/// Read only Cursor.
pub type CursorRO<T> = Cursor<RO, T>;
@@ -56,12 +55,17 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
}
}
/// Takes `(key, value)` from the database and decodes it appropriately.
#[macro_export]
macro_rules! decode {
($v:expr) => {
$v.map_err(|e| $crate::DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
};
/// Decodes a `(key, value)` pair from the database.
#[allow(clippy::type_complexity)]
pub fn decode<T>(
res: Result<Option<(Cow<'_, [u8]>, Cow<'_, [u8]>)>, impl Into<i32>>,
) -> PairResult<T>
where
T: Table,
T::Key: Decode,
T::Value: Decompress,
{
res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
}
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
@@ -80,39 +84,36 @@ macro_rules! compress_to_buf_or_ref {
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
fn first(&mut self) -> PairResult<T> {
decode!(self.inner.first())
decode::<T>(self.inner.first())
}
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode!(self.inner.set_key(key.encode().as_ref()))
decode::<T>(self.inner.set_key(key.encode().as_ref()))
}
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode!(self.inner.set_range(key.encode().as_ref()))
decode::<T>(self.inner.set_range(key.encode().as_ref()))
}
fn next(&mut self) -> PairResult<T> {
decode!(self.inner.next())
decode::<T>(self.inner.next())
}
fn prev(&mut self) -> PairResult<T> {
decode!(self.inner.prev())
decode::<T>(self.inner.prev())
}
fn last(&mut self) -> PairResult<T> {
decode!(self.inner.last())
decode::<T>(self.inner.last())
}
fn current(&mut self) -> PairResult<T> {
decode!(self.inner.get_current())
decode::<T>(self.inner.get_current())
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
self.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decoder::<T>)
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
} else {
self.first().transpose()
};
@@ -130,10 +131,8 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
}
Bound::Unbounded => self.inner.first(),
}
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decoder::<T>);
};
let start = decode::<T>(start).transpose();
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
}
@@ -142,7 +141,7 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode!(self.inner.set_range(start_key.encode().as_ref()))
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
} else {
self.last()
}
@@ -155,12 +154,12 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the next `(key, value)` pair of a DUPSORT table.
fn next_dup(&mut self) -> PairResult<T> {
decode!(self.inner.next_dup())
decode::<T>(self.inner.next_dup())
}
/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T> {
decode!(self.inner.next_nodup())
decode::<T>(self.inner.next_nodup())
}
/// Returns the next `value` of a duplicate `key`.

View File

@@ -50,7 +50,7 @@ use revm::primitives::{BlockEnv, CfgEnv, SpecId};
use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
ops::{Bound, Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::{mpsc, Arc},
time::{Duration, Instant},
};
@@ -112,6 +112,54 @@ impl<TX: DbTxMut> DatabaseProvider<TX> {
}
}
impl<TX: DbTx> DatabaseProvider<TX> {
fn cursor_read_collect<T: Table<Key = u64>, R>(
&self,
range: impl RangeBounds<T::Key>,
mut f: impl FnMut(T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
self.cursor_read_collect_with_key::<T, R>(range, |_, v| f(v))
}
fn cursor_read_collect_with_key<T: Table<Key = u64>, R>(
&self,
range: impl RangeBounds<T::Key>,
f: impl FnMut(T::Key, T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
let capacity = match range_size_hint(&range) {
Some(0) | None => return Ok(Vec::new()),
Some(capacity) => capacity,
};
let mut cursor = self.tx.cursor_read::<T>()?;
self.cursor_collect_with_capacity(&mut cursor, range, capacity, f)
}
fn cursor_collect<T: Table<Key = u64>, R>(
&self,
cursor: &mut impl DbCursorRO<T>,
range: impl RangeBounds<T::Key>,
mut f: impl FnMut(T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
let capacity = range_size_hint(&range).unwrap_or(0);
self.cursor_collect_with_capacity(cursor, range, capacity, |_, v| f(v))
}
fn cursor_collect_with_capacity<T: Table<Key = u64>, R>(
&self,
cursor: &mut impl DbCursorRO<T>,
range: impl RangeBounds<T::Key>,
capacity: usize,
mut f: impl FnMut(T::Key, T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
let mut items = Vec::with_capacity(capacity);
for entry in cursor.walk_range(range)? {
let (key, value) = entry?;
items.push(f(key, value)?);
}
Ok(items)
}
}
/// For a given key, unwind all history shards that are below the given block number.
///
/// S - Sharded key subtype.
@@ -1179,14 +1227,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
// we skip the block.
if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(num)? {
let tx_range = block_body_indices.tx_num_range();
let body = if tx_range.is_empty() {
Vec::new()
} else {
tx_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<Result<Vec<_>, _>>()?
};
let body = self.cursor_collect(&mut tx_cursor, tx_range, |tx| Ok(tx.into()))?;
// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
@@ -1355,19 +1396,14 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
&self,
id: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<TransactionSigned>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
if let Some(block_number) = self.convert_hash_or_number(id)? {
if let Some(body) = self.block_body_indices(block_number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
let transactions = tx_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(transactions))
};
return self
.cursor_read_collect::<tables::Transactions, _>(body.tx_num_range(), |tx| {
Ok(tx.into())
})
.map(Some)
.map_err(Into::into);
}
}
Ok(None)
@@ -1377,48 +1413,25 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<TransactionSigned>>> {
let mut results = Vec::new();
let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
for entry in body_cursor.walk_range(range)? {
let (_, body) = entry?;
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
results.push(Vec::new());
} else {
results.push(
tx_cursor
.walk_range(tx_num_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<Result<Vec<_>, _>>()?,
);
}
}
Ok(results)
self.cursor_read_collect::<tables::BlockBodyIndices, _>(range, |body| {
self.cursor_collect(&mut tx_cursor, body.tx_num_range(), |tx| Ok(tx.into()))
})
.map_err(Into::into)
}
fn transactions_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
Ok(self
.tx
.cursor_read::<tables::Transactions>()?
.walk_range(range)?
.map(|entry| entry.map(|tx| tx.1))
.collect::<Result<Vec<_>, _>>()?)
self.cursor_read_collect::<tables::Transactions, _>(range, Ok).map_err(Into::into)
}
fn senders_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
Ok(self
.tx
.cursor_read::<tables::TxSenders>()?
.walk_range(range)?
.map(|entry| entry.map(|sender| sender.1))
.collect::<Result<Vec<_>, _>>()?)
self.cursor_read_collect::<tables::TxSenders, _>(range, Ok).map_err(Into::into)
}
fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
@@ -1442,17 +1455,10 @@ impl<TX: DbTx> ReceiptProvider for DatabaseProvider<TX> {
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
if let Some(number) = self.convert_hash_or_number(block)? {
if let Some(body) = self.block_body_indices(number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
let mut receipts_cursor = self.tx.cursor_read::<tables::Receipts>()?;
let receipts = receipts_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, receipt)| receipt))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(receipts))
};
return self
.cursor_read_collect::<tables::Receipts, _>(body.tx_num_range(), Ok)
.map(Some)
.map_err(Into::into);
}
}
Ok(None)
@@ -1682,41 +1688,28 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all block changesets and make a list of accounts that have been changed.
// Note that collecting and then reversing the order is necessary to ensure that the
// changes are applied in the correct order.
let hashed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.map(|entry| entry.map(|(_, e)| (keccak256(e.address), e.info)))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>();
hashed_accounts
.iter()
// Apply values to HashedState (if Account is None remove it);
.try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
Ok(())
})?;
// Apply values to HashedState, and remove the account if it's None.
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}
Ok(hashed_accounts)
}
@@ -1726,24 +1719,15 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;
let hashed_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<B256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);
hashed_accounts.iter().try_for_each(|(hashed_address, account)| -> ProviderResult<()> {
let hashed_accounts =
accounts.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
Ok(())
})?;
}
Ok(hashed_accounts)
}
@@ -1751,54 +1735,36 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
&self,
range: Range<BlockNumberAddress>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Aggregate all block changesets and make list of accounts that have been changed.
let hashed_storages = self
.tx
.cursor_read::<tables::StorageChangeSet>()?
let mut changesets = self.tx.cursor_read::<tables::StorageChangeSet>()?;
let mut hashed_storages = changesets
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, B256), U256>,
(BlockNumberAddress((_, address)), storage_entry)| {
accounts.insert((address, storage_entry.key), storage_entry.value);
accounts
},
)
.into_iter()
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>();
.map(|entry| {
entry.map(|(BlockNumberAddress((_, address)), storage_entry)| {
(keccak256(address), keccak256(storage_entry.key), storage_entry.value)
})
})
.collect::<Result<Vec<_>, _>>()?;
hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> = HashMap::default();
for (hashed_address, hashed_slot) in hashed_storages.keys() {
hashed_storage_keys.entry(*hashed_address).or_default().insert(*hashed_slot);
// Apply values to HashedState, and remove the account if it's None.
let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> = HashMap::new();
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
hashed_storage_keys.entry(hashed_address).or_default().insert(key);
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
}
hashed_storages
.into_iter()
// Apply values to HashedStorage (if Value is zero just remove it);
.try_for_each(|((hashed_address, key), value)| -> ProviderResult<()> {
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})?;
Ok(hashed_storage_keys)
}
@@ -1958,29 +1924,18 @@ impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
&self,
range: Range<BlockNumberAddress>,
) -> ProviderResult<usize> {
let storage_changesets = self
let mut storage_changesets = self
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.map(|entry| {
entry.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
})
.collect::<Result<Vec<_>, _>>()?;
let changesets = storage_changesets.len();
let last_indices = storage_changesets
.into_iter()
// reverse so we can get lowest block number where we need to unwind account.
.rev()
// fold all storages and get last block number
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, B256), u64>, (index, storage)| {
// we just need address and lowest block number.
accounts.insert((index.address(), storage.key), index.block_number());
accounts
},
);
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
let mut cursor = self.tx.cursor_write::<tables::StorageHistory>()?;
for ((address, storage_key), rem_index) in last_indices {
for &(address, storage_key, rem_index) in &storage_changesets {
let partial_shard = unwind_history_shards::<_, tables::StorageHistory, _>(
&mut cursor,
StorageShardedKey::last(address, storage_key),
@@ -2001,6 +1956,7 @@ impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
}
}
let changesets = storage_changesets.len();
Ok(changesets)
}
@@ -2008,27 +1964,17 @@ impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<usize> {
let account_changeset = self
let mut last_indices = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.map(|entry| entry.map(|(index, account)| (account.address, index)))
.collect::<Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
// reverse so we can get lowest block number where we need to unwind account.
.rev()
// fold all account and get last block number
.fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
// we just need address and lowest block number.
accounts.insert(account.address, index);
accounts
});
last_indices.sort_by_key(|(a, _)| *a);
// Unwind the account history index.
let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
for &(address, rem_index) in &last_indices {
let partial_shard = unwind_history_shards::<_, tables::AccountHistory, _>(
&mut cursor,
ShardedKey::last(address),
@@ -2046,6 +1992,7 @@ impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
}
}
let changesets = last_indices.len();
Ok(changesets)
}
}
@@ -2125,6 +2072,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
}
trie_updates.flush(&self.tx)?;
}
// get blocks
let blocks = self.get_take_block_range::<TAKE>(chain_spec, range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
@@ -2348,3 +2296,17 @@ impl<TX: DbTxMut> PruneCheckpointWriter for DatabaseProvider<TX> {
Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
}
}
fn range_size_hint(range: &impl RangeBounds<TxNumber>) -> Option<usize> {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start.checked_add(1)?,
Bound::Unbounded => 0,
};
let end = match range.end_bound().cloned() {
Bound::Included(end) => end.saturating_add(1),
Bound::Excluded(end) => end,
Bound::Unbounded => return None,
};
end.checked_sub(start).map(|x| x as _)
}