refactor: use reverse walker (#938)

Co-authored-by: lambdaclass-user <github@lambdaclass.com>
This commit is contained in:
Mariano A. Nicolini
2023-01-20 14:38:26 -03:00
committed by GitHub
parent d86c9890bd
commit 25e9b399f3
9 changed files with 70 additions and 31 deletions

View File

@@ -5,7 +5,7 @@ use std::{
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
cursor::DbCursorRO,
database::{Database, DatabaseGAT},
models::{BlockNumHash, StoredBlockBody},
table::Table,
@@ -196,13 +196,13 @@ where
F: FnMut(T::Key) -> BlockNumber,
{
let mut cursor = self.cursor_write::<T>()?;
let mut entry = cursor.last()?;
while let Some((key, _)) = entry {
if selector(key) <= block {
let mut reverse_walker = cursor.walk_back(None)?;
while let Some(Ok((key, _))) = reverse_walker.next() {
if selector(key.clone()) <= block {
break
}
cursor.delete_current()?;
entry = cursor.prev()?;
self.delete::<T>(key, None)?;
}
Ok(())
}

View File

@@ -193,8 +193,8 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut block_transition_cursor = tx.cursor_write::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
let mut entry = body_cursor.last()?;
while let Some((key, body)) = entry {
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((key, body)) = rev_walker.next().transpose()? {
if key.number() <= input.unwind_to {
break
}
@@ -225,9 +225,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
}
// Delete the current body value
body_cursor.delete_current()?;
// Move the cursor to the previous value
entry = body_cursor.prev()?;
tx.delete::<tables::BlockBodies>(key, None)?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })

View File

@@ -3,7 +3,7 @@ use crate::{
Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
cursor::DbCursorRO,
database::Database,
models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
tables,
@@ -348,22 +348,20 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
}
// Discard unwinded changesets
let mut entry = account_changeset.last()?;
while let Some((transition_id, _)) = entry {
let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?;
while let Some((transition_id, _)) = rev_acc_changeset_walker.next().transpose()? {
if transition_id < from_transition_rev {
break
}
account_changeset.delete_current()?;
entry = account_changeset.prev()?;
tx.delete::<tables::AccountChangeSet>(transition_id, None)?;
}
let mut entry = storage_changeset.last()?;
while let Some((key, _)) = entry {
let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?;
while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? {
if key.transition_id() < from_transition_rev {
break
}
storage_changeset.delete_current()?;
entry = storage_changeset.prev()?;
tx.delete::<tables::StorageChangeSet>(key, None)?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })

View File

@@ -451,9 +451,10 @@ mod tests {
self.tx.commit(|tx| {
let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSet>()?;
let mut row = changeset_cursor.last()?;
while let Some((tid_address, entry)) = row {
let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;
while let Some((tid_address, entry)) = rev_changeset_walker.next().transpose()? {
if tid_address.transition_id() <= target_transition {
break
}
@@ -464,8 +465,6 @@ mod tests {
if entry.value != U256::ZERO {
storage_cursor.append_dup(tid_address.address(), entry)?;
}
row = changeset_cursor.prev()?;
}
Ok(())
})?;

View File

@@ -155,10 +155,9 @@ impl TestTransaction {
{
self.query(|tx| {
let mut cursor = tx.cursor_read::<T>()?;
let mut entry = cursor.last()?;
while let Some((_, value)) = entry {
let mut rev_walker = cursor.walk_back(None)?;
while let Some((_, value)) = rev_walker.next().transpose()? {
assert!(selector(value) <= num);
entry = cursor.prev()?;
}
Ok(())
})

View File

@@ -34,6 +34,16 @@ pub trait DbCursorRO<'tx, T: Table> {
) -> Result<Walker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized;
/// Returns an iterator that walks backwards through the table. If `start_key`
/// is None, starts from the last entry of the table. If it not, starts at a key
/// greater or equal than the key value wrapped inside Some().
fn walk_back<'cursor>(
&'cursor mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized;
}
/// Read only cursor over DupSort table.

View File

@@ -3,7 +3,9 @@ use std::collections::BTreeMap;
use crate::{
common::{PairResult, ValueOnlyResult},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, Walker},
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker,
},
database::{Database, DatabaseGAT},
table::{DupSort, Table},
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
@@ -132,6 +134,16 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock {
{
todo!()
}
fn walk_back<'cursor>(
&'cursor mut self,
_start_key: Option<T::Key>,
) -> Result<ReverseWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
{
todo!()
}
}
impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock {

View File

@@ -37,9 +37,9 @@ pub trait Decode: Send + Sync + Sized + Debug {
}
/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].
pub trait Key: Encode + Decode + Ord {}
pub trait Key: Encode + Decode + Ord + Clone {}
impl<T> Key for T where T: Encode + Decode + Ord {}
impl<T> Key for T where T: Encode + Decode + Ord + Clone {}
/// Generic trait that enforces the database value to implement [`Compress`] and [`Decompress`].
pub trait Value: Compress + Decompress + Serialize {}

View File

@@ -3,7 +3,9 @@
use std::{borrow::Cow, marker::PhantomData};
use crate::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, Walker},
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker,
},
table::{Compress, DupSort, Encode, Table},
tables::utils::*,
Error,
@@ -81,6 +83,27 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
Ok(Walker::new(self, start))
}
fn walk_back<'cursor>(
&'cursor mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
{
if let Some(start_key) = start_key {
let start = self
.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>);
return Ok(ReverseWalker::new(self, start))
}
let start = self.last().transpose();
Ok(ReverseWalker::new(self, start))
}
}
impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx, K, T> {