diff --git a/crates/stages/src/db.rs b/crates/stages/src/db.rs index d02606b4ca..54333fff83 100644 --- a/crates/stages/src/db.rs +++ b/crates/stages/src/db.rs @@ -5,7 +5,7 @@ use std::{ }; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::DbCursorRO, database::{Database, DatabaseGAT}, models::{BlockNumHash, StoredBlockBody}, table::Table, @@ -196,13 +196,13 @@ where F: FnMut(T::Key) -> BlockNumber, { let mut cursor = self.cursor_write::()?; - let mut entry = cursor.last()?; - while let Some((key, _)) = entry { - if selector(key) <= block { + let mut reverse_walker = cursor.walk_back(None)?; + + while let Some(Ok((key, _))) = reverse_walker.next() { + if selector(key.clone()) <= block { break } - cursor.delete_current()?; - entry = cursor.prev()?; + self.delete::(key, None)?; } Ok(()) } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 304cebf152..3166983a82 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -193,8 +193,8 @@ impl Stage for BodyStage()?; let mut tx_transition_cursor = tx.cursor_write::()?; - let mut entry = body_cursor.last()?; - while let Some((key, body)) = entry { + let mut rev_walker = body_cursor.walk_back(None)?; + while let Some((key, body)) = rev_walker.next().transpose()? { if key.number() <= input.unwind_to { break } @@ -225,9 +225,7 @@ impl Stage for BodyStage(key, None)?; } Ok(UnwindOutput { stage_progress: input.unwind_to }) diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 43d4aaf2c9..c05aaaf636 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -3,7 +3,7 @@ use crate::{ Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::DbCursorRO, database::Database, models::{BlockNumHash, StoredBlockBody, TransitionIdAddress}, tables, @@ -348,22 +348,20 @@ impl Stage for ExecutionStage { } // Discard unwinded changesets - let mut entry = account_changeset.last()?; - while let Some((transition_id, _)) = entry { + let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?; + while let Some((transition_id, _)) = rev_acc_changeset_walker.next().transpose()? { if transition_id < from_transition_rev { break } - account_changeset.delete_current()?; - entry = account_changeset.prev()?; + tx.delete::(transition_id, None)?; } - let mut entry = storage_changeset.last()?; - while let Some((key, _)) = entry { + let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?; + while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? { if key.transition_id() < from_transition_rev { break } - storage_changeset.delete_current()?; - entry = storage_changeset.prev()?; + tx.delete::(key, None)?; } Ok(UnwindOutput { stage_progress: input.unwind_to }) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 2b84f37069..22bcf5f6f5 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -451,9 +451,10 @@ mod tests { self.tx.commit(|tx| { let mut storage_cursor = tx.cursor_dup_write::()?; let mut changeset_cursor = tx.cursor_dup_read::()?; - let mut row = changeset_cursor.last()?; - while let Some((tid_address, entry)) = row { + let mut rev_changeset_walker = changeset_cursor.walk_back(None)?; + + while let Some((tid_address, entry)) = rev_changeset_walker.next().transpose()? { if tid_address.transition_id() <= target_transition { break } @@ -464,8 +465,6 @@ mod tests { if entry.value != U256::ZERO { storage_cursor.append_dup(tid_address.address(), entry)?; } - - row = changeset_cursor.prev()?; } Ok(()) })?; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 4dc0863bf8..e6019fcbd6 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -155,10 +155,9 @@ impl TestTransaction { { self.query(|tx| { let mut cursor = tx.cursor_read::()?; - let mut entry = cursor.last()?; - while let Some((_, value)) = entry { + let mut rev_walker = cursor.walk_back(None)?; + while let Some((_, value)) = rev_walker.next().transpose()? { assert!(selector(value) <= num); - entry = cursor.prev()?; } Ok(()) }) diff --git a/crates/storage/db/src/abstraction/cursor.rs b/crates/storage/db/src/abstraction/cursor.rs index 8a3414c8c4..4e4c9d058b 100644 --- a/crates/storage/db/src/abstraction/cursor.rs +++ b/crates/storage/db/src/abstraction/cursor.rs @@ -34,6 +34,16 @@ pub trait DbCursorRO<'tx, T: Table> { ) -> Result, Error> where Self: Sized; + + /// Returns an iterator that walks backwards through the table. If `start_key` + /// is None, starts from the last entry of the table. If it not, starts at a key + /// greater or equal than the key value wrapped inside Some(). + fn walk_back<'cursor>( + &'cursor mut self, + start_key: Option, + ) -> Result, Error> + where + Self: Sized; } /// Read only cursor over DupSort table. diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index 39bc11713e..fd30354063 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -3,7 +3,9 @@ use std::collections::BTreeMap; use crate::{ common::{PairResult, ValueOnlyResult}, - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, Walker}, + cursor::{ + DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker, + }, database::{Database, DatabaseGAT}, table::{DupSort, Table}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, @@ -132,6 +134,16 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock { { todo!() } + + fn walk_back<'cursor>( + &'cursor mut self, + _start_key: Option, + ) -> Result, Error> + where + Self: Sized, + { + todo!() + } } impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock { diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs index 35e3dc6c6b..851d4e42b3 100644 --- a/crates/storage/db/src/abstraction/table.rs +++ b/crates/storage/db/src/abstraction/table.rs @@ -37,9 +37,9 @@ pub trait Decode: Send + Sync + Sized + Debug { } /// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`]. -pub trait Key: Encode + Decode + Ord {} +pub trait Key: Encode + Decode + Ord + Clone {} -impl Key for T where T: Encode + Decode + Ord {} +impl Key for T where T: Encode + Decode + Ord + Clone {} /// Generic trait that enforces the database value to implement [`Compress`] and [`Decompress`]. pub trait Value: Compress + Decompress + Serialize {} diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 61709bdcf3..9225ecc713 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -3,7 +3,9 @@ use std::{borrow::Cow, marker::PhantomData}; use crate::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, Walker}, + cursor::{ + DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker, + }, table::{Compress, DupSort, Encode, Table}, tables::utils::*, Error, @@ -81,6 +83,27 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> Ok(Walker::new(self, start)) } + + fn walk_back<'cursor>( + &'cursor mut self, + start_key: Option, + ) -> Result, Error> + where + Self: Sized, + { + if let Some(start_key) = start_key { + let start = self + .inner + .set_range(start_key.encode().as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(decoder::); + + return Ok(ReverseWalker::new(self, start)) + } + + let start = self.last().transpose(); + Ok(ReverseWalker::new(self, start)) + } } impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx, K, T> {