diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 8ebe166d08..6ba054cd69 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -717,9 +717,7 @@ mod tests { let mut header_cursor = tx.cursor_read::()?; let mut canonical_cursor = tx.cursor_read::()?; - let walker = canonical_cursor.walk(range.start)?.take_while(|entry| { - entry.as_ref().map(|(num, _)| *num < range.end).unwrap_or_default() - }); + let walker = canonical_cursor.walk_range(range.start..range.end)?; let mut headers = Vec::default(); for entry in walker { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 94d5c31fb9..a64ee9b174 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -104,8 +104,7 @@ impl Stage for ExecutionStage { // get canonical blocks (num,hash) let canonical_batch = canonicals - .walk(start_block)? - .take_while(|entry| entry.as_ref().map(|e| e.0 <= end_block).unwrap_or_default()) + .walk_range(start_block..end_block + 1)? .map(|i| i.map(BlockNumHash)) .collect::, _>>()?; @@ -348,10 +347,7 @@ impl Stage for ExecutionStage { // get all batches for account change // Check if walk and walk_dup would do the same thing let account_changeset_batch = account_changeset - .walk(from_transition_rev)? - .take_while(|item| { - item.as_ref().map(|(num, _)| *num < to_transition_rev).unwrap_or_default() - }) + .walk_range(from_transition_rev..to_transition_rev)? .collect::, _>>()?; // revert all changes to PlainState @@ -365,12 +361,10 @@ impl Stage for ExecutionStage { // get all batches for storage change let storage_changeset_batch = storage_changeset - .walk((from_transition_rev, Address::zero()).into())? - .take_while(|item| { - item.as_ref() - .map(|(key, _)| key.transition_id() < to_transition_rev) - .unwrap_or_default() - }) + .walk_range( + (from_transition_rev, Address::zero()).into().. + (to_transition_rev, Address::zero()).into(), + )? .collect::, _>>()?; // revert all changes to PlainStorage diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 7a06e86ef2..5fb5610b30 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -93,8 +93,7 @@ impl Stage for AccountHashingStage { // Aggregate all transition changesets and and make list of account that have been // changed. tx.cursor_read::()? - .walk(from_transition)? - .take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default()) + .walk_range(from_transition..to_transition)? .collect::, _>>()? .into_iter() // fold all account to one set of changed accounts @@ -137,8 +136,7 @@ impl Stage for AccountHashingStage { // Aggregate all transition changesets and and make list of account that have been changed. tx.cursor_read::()? - .walk(from_transition_rev)? - .take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default()) + .walk_range(from_transition_rev..to_transition_rev)? .collect::, _>>()? .into_iter() .rev() diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index ecaf9118ff..023947ce3b 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -99,10 +99,10 @@ impl Stage for StorageHashingStage { // Aggregate all transition changesets and and make list of storages that have been // changed. tx.cursor_read::()? - .walk((from_transition, H160::zero()).into())? - .take_while(|res| { - res.as_ref().map(|(k, _)| k.0 .0 < to_transition).unwrap_or_default() - }) + .walk_range( + (from_transition, Address::zero()).into().. + (to_transition, Address::zero()).into(), + )? .collect::, _>>()? .into_iter() // fold all storages and save its old state so we can remove it from HashedStorage @@ -173,12 +173,10 @@ impl Stage for StorageHashingStage { // Aggregate all transition changesets and make list of accounts that have been changed. tx.cursor_read::()? - .walk((from_transition_rev, H160::zero()).into())? - .take_while(|res| { - res.as_ref() - .map(|(TransitionIdAddress((k, _)), _)| *k < to_transition_rev) - .unwrap_or_default() - }) + .walk_range( + (from_transition_rev, Address::zero()).into().. + (to_transition_rev, Address::zero()).into(), + )? .collect::, _>>()? .into_iter() .rev() diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index c49bc853d9..6f18da67ed 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -87,9 +87,7 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor over the transactions let mut tx_cursor = tx.cursor_read::()?; // Walk the transactions from start to end index (inclusive) - let entries = tx_cursor - .walk(start_tx_index)? - .take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default()); + let entries = tx_cursor.walk_range(start_tx_index..end_tx_index + 1)?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders"); diff --git a/crates/storage/db/src/abstraction/cursor.rs b/crates/storage/db/src/abstraction/cursor.rs index 71fb15ff34..e581afbaa5 100644 --- a/crates/storage/db/src/abstraction/cursor.rs +++ b/crates/storage/db/src/abstraction/cursor.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::{marker::PhantomData, ops::Range}; use crate::{ common::{IterPairResult, PairResult, ValueOnlyResult}, @@ -38,6 +38,15 @@ pub trait DbCursorRO<'tx, T: Table> { where Self: Sized; + /// Returns an iterator starting at a key greater or equal than `start_key` and ending at a key + /// less than `end_key` + fn walk_range<'cursor>( + &'cursor mut self, + range: Range, + ) -> Result, Error> + where + Self: Sized; + /// Returns an iterator that walks backwards through the table. If `start_key` /// is None, starts from the last entry of the table. If it not, starts at a key /// greater or equal than the key value wrapped inside Some(). @@ -185,6 +194,56 @@ impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> std::iter::Iterator } } +/// Provides a range iterator to `Cursor` when handling `Table`. +/// Also check [`Walker`] +pub struct RangeWalker<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> { + /// Cursor to be used to walk through the table. + cursor: &'cursor mut CURSOR, + /// `(key, value)` where to start the walk. + start: IterPairResult, + /// exclusive `key` where to stop the walk. + end_key: T::Key, + /// flag whether is ended + is_done: bool, + /// Phantom data for 'tx. As it is only used for `DbCursorRO`. + _tx_phantom: PhantomData<&'tx T>, +} + +impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> std::iter::Iterator + for RangeWalker<'cursor, 'tx, T, CURSOR> +{ + type Item = Result<(T::Key, T::Value), Error>; + fn next(&mut self) -> Option { + if self.is_done { + return None + } + + let start = self.start.take(); + if start.is_some() { + return start + } + + let res = self.cursor.next().transpose()?; + if let Ok((key, value)) = res { + if key < self.end_key { + Some(Ok((key, value))) + } else { + self.is_done = true; + None + } + } else { + Some(res) + } + } +} + +impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> RangeWalker<'cursor, 'tx, T, CURSOR> { + /// construct RangeWalker + pub fn new(cursor: &'cursor mut CURSOR, start: IterPairResult, end_key: T::Key) -> Self { + Self { cursor, start, end_key, is_done: false, _tx_phantom: std::marker::PhantomData } + } +} + /// Provides an iterator to `Cursor` when handling a `DupSort` table. /// /// Reason why we have two lifetimes is to distinguish between `'cursor` lifetime diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index f6db059d89..23ab62f664 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -1,10 +1,11 @@ //! Mock database -use std::collections::BTreeMap; +use std::{collections::BTreeMap, ops::Range}; use crate::{ common::{PairResult, ValueOnlyResult}, cursor::{ - DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker, + DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker, + ReverseWalker, Walker, }, database::{Database, DatabaseGAT}, table::{DupSort, Table}, @@ -139,6 +140,16 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock { todo!() } + fn walk_range<'cursor>( + &'cursor mut self, + _range: Range, + ) -> Result, Error> + where + Self: Sized, + { + todo!() + } + fn walk_back<'cursor>( &'cursor mut self, _start_key: Option, diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index f14a922700..95d1aa0f85 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -1,10 +1,11 @@ //! Cursor wrapper for libmdbx-sys. -use std::{borrow::Cow, marker::PhantomData}; +use std::{borrow::Cow, marker::PhantomData, ops::Range}; use crate::{ cursor::{ - DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker, + DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker, + ReverseWalker, Walker, }, table::{Compress, DupSort, Encode, Table}, tables::utils::*, @@ -88,6 +89,22 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> Ok(Walker::new(self, start)) } + fn walk_range<'cursor>( + &'cursor mut self, + range: Range, + ) -> Result, Error> + where + Self: Sized, + { + let start = self + .inner + .set_range(range.start.encode().as_ref()) + .map_err(|e| Error::Read(e.into()))? + .map(decoder::); + + Ok(RangeWalker::new(self, start, range.end)) + } + fn walk_back<'cursor>( &'cursor mut self, start_key: Option, diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 89a36e688f..3f324f9b4c 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -216,6 +216,38 @@ mod tests { assert_eq!(first.1, value, "First next should be put value"); } + #[test] + fn db_cursor_walk_range() { + let db: Arc> = test_utils::create_test_db(EnvKind::RW); + + // PUT (0, 0), (1, 0), (2, 0), (3, 0) + let tx = db.tx_mut().expect(ERROR_INIT_TX); + vec![0, 1, 2, 3] + .into_iter() + .try_for_each(|key| tx.put::(key, H256::zero())) + .expect(ERROR_PUT); + tx.commit().expect(ERROR_COMMIT); + + let tx = db.tx().expect(ERROR_INIT_TX); + let mut cursor = tx.cursor_read::().unwrap(); + + // [1, 3) + let mut walker = cursor.walk_range(1..3).unwrap(); + assert_eq!(walker.next(), Some(Ok((1, H256::zero())))); + assert_eq!(walker.next(), Some(Ok((2, H256::zero())))); + assert_eq!(walker.next(), None); + // next() returns None after walker is done + assert_eq!(walker.next(), None); + + // [2, 4) + let mut walker = cursor.walk_range(2..4).unwrap(); + assert_eq!(walker.next(), Some(Ok((2, H256::zero())))); + assert_eq!(walker.next(), Some(Ok((3, H256::zero())))); + assert_eq!(walker.next(), None); + // next() returns None after walker is done + assert_eq!(walker.next(), None); + } + #[test] fn db_walker() { let db: Arc> = test_utils::create_test_db(EnvKind::RW);