mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 08:37:59 -05:00
feat(storage): accept any RangeBounds to walk_range (#1309)
This commit is contained in:
@@ -92,7 +92,7 @@ impl ExecutionStage {
|
||||
|
||||
// Get block headers and bodies
|
||||
let block_batch = headers_cursor
|
||||
.walk_range(start_block..end_block + 1)?
|
||||
.walk_range(start_block..=end_block)?
|
||||
.map(|entry| -> Result<(Header, U256, StoredBlockBody, Vec<Header>), StageError> {
|
||||
let (number, header) = entry?;
|
||||
let (_, td) = td_cursor
|
||||
@@ -350,8 +350,8 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
|
||||
// get all batches for storage change
|
||||
let storage_changeset_batch = storage_changeset
|
||||
.walk_range(
|
||||
(from_transition_rev, Address::zero()).into()..
|
||||
(to_transition_rev, Address::zero()).into(),
|
||||
TransitionIdAddress((from_transition_rev, Address::zero()))..
|
||||
TransitionIdAddress((to_transition_rev, Address::zero())),
|
||||
)?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
|
||||
@@ -103,8 +103,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
||||
// changed.
|
||||
tx.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk_range(
|
||||
(from_transition, Address::zero()).into()..
|
||||
(to_transition, Address::zero()).into(),
|
||||
TransitionIdAddress((from_transition, Address::zero()))..
|
||||
TransitionIdAddress((to_transition, Address::zero())),
|
||||
)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
@@ -173,8 +173,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
||||
// Aggregate all transition changesets and make list of accounts that have been changed.
|
||||
tx.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk_range(
|
||||
(from_transition_rev, Address::zero()).into()..
|
||||
(to_transition_rev, Address::zero()).into(),
|
||||
TransitionIdAddress((from_transition_rev, Address::zero()))..
|
||||
TransitionIdAddress((to_transition_rev, Address::zero())),
|
||||
)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
|
||||
@@ -75,7 +75,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
// Acquire the cursor over the transactions
|
||||
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
|
||||
// Walk the transactions from start to end index (inclusive)
|
||||
let entries = tx_cursor.walk_range(start_tx_index..end_tx_index + 1)?;
|
||||
let entries = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;
|
||||
|
||||
// Iterate over transactions in chunks
|
||||
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
|
||||
|
||||
@@ -341,8 +341,8 @@ impl DBTrieLoader {
|
||||
|
||||
let mut storage_cursor = tx.cursor_dup_read::<tables::StorageChangeSet>()?;
|
||||
|
||||
let start = (tid_range.start, Address::zero()).into();
|
||||
let end = (tid_range.end, Address::zero()).into();
|
||||
let start = TransitionIdAddress((tid_range.start, Address::zero()));
|
||||
let end = TransitionIdAddress((tid_range.end, Address::zero()));
|
||||
let mut walker = storage_cursor.walk_range(start..end)?;
|
||||
|
||||
while let Some((TransitionIdAddress((_, address)), StorageEntry { key, .. })) =
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::{marker::PhantomData, ops::Range};
|
||||
use std::{
|
||||
marker::PhantomData,
|
||||
ops::{Bound, RangeBounds},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
common::{IterPairResult, PairResult, ValueOnlyResult},
|
||||
@@ -40,11 +43,10 @@ pub trait DbCursorRO<'tx, T: Table> {
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Returns an iterator starting at a key greater or equal than `start_key` and ending at a key
|
||||
/// less than `end_key`
|
||||
/// Returns an iterator for the keys in the specified range.
|
||||
fn walk_range<'cursor>(
|
||||
&'cursor mut self,
|
||||
range: Range<T::Key>,
|
||||
range: impl RangeBounds<T::Key>,
|
||||
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
|
||||
where
|
||||
Self: Sized;
|
||||
@@ -203,8 +205,10 @@ pub struct RangeWalker<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> {
|
||||
cursor: &'cursor mut CURSOR,
|
||||
/// `(key, value)` where to start the walk.
|
||||
start: IterPairResult<T>,
|
||||
/// exclusive `key` where to stop the walk.
|
||||
end_key: T::Key,
|
||||
/// `key` where to start the walk.
|
||||
start_key: Bound<T::Key>,
|
||||
/// `key` where to stop the walk.
|
||||
end_key: Bound<T::Key>,
|
||||
/// flag whether is ended
|
||||
is_done: bool,
|
||||
/// Phantom data for 'tx. As it is only used for `DbCursorRO`.
|
||||
@@ -221,28 +225,46 @@ impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> std::iter::Iterator
|
||||
}
|
||||
|
||||
let start = self.start.take();
|
||||
if start.is_some() {
|
||||
if start.is_some() && matches!(self.start_key, Bound::Included(_) | Bound::Unbounded) {
|
||||
return start
|
||||
}
|
||||
|
||||
let res = self.cursor.next().transpose()?;
|
||||
if let Ok((key, value)) = res {
|
||||
if key < self.end_key {
|
||||
Some(Ok((key, value)))
|
||||
} else {
|
||||
match self.cursor.next().transpose() {
|
||||
Some(Ok((key, value))) => match &self.end_key {
|
||||
Bound::Included(end_key) if &key <= end_key => Some(Ok((key, value))),
|
||||
Bound::Excluded(end_key) if &key < end_key => Some(Ok((key, value))),
|
||||
Bound::Unbounded => Some(Ok((key, value))),
|
||||
_ => {
|
||||
self.is_done = true;
|
||||
None
|
||||
}
|
||||
},
|
||||
Some(res @ Err(_)) => Some(res),
|
||||
None if matches!(self.end_key, Bound::Unbounded) => {
|
||||
self.is_done = true;
|
||||
None
|
||||
}
|
||||
} else {
|
||||
Some(res)
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> RangeWalker<'cursor, 'tx, T, CURSOR> {
|
||||
/// construct RangeWalker
|
||||
pub fn new(cursor: &'cursor mut CURSOR, start: IterPairResult<T>, end_key: T::Key) -> Self {
|
||||
Self { cursor, start, end_key, is_done: false, _tx_phantom: std::marker::PhantomData }
|
||||
pub fn new(
|
||||
cursor: &'cursor mut CURSOR,
|
||||
start: IterPairResult<T>,
|
||||
start_key: Bound<T::Key>,
|
||||
end_key: Bound<T::Key>,
|
||||
) -> Self {
|
||||
Self {
|
||||
cursor,
|
||||
start,
|
||||
start_key,
|
||||
end_key,
|
||||
is_done: false,
|
||||
_tx_phantom: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! Mock database
|
||||
use std::{collections::BTreeMap, ops::Range};
|
||||
use std::{collections::BTreeMap, ops::RangeBounds};
|
||||
|
||||
use crate::{
|
||||
common::{PairResult, ValueOnlyResult},
|
||||
@@ -142,7 +142,7 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock {
|
||||
|
||||
fn walk_range<'cursor>(
|
||||
&'cursor mut self,
|
||||
_range: Range<T::Key>,
|
||||
_range: impl RangeBounds<T::Key>,
|
||||
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
|
||||
where
|
||||
Self: Sized,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Cursor wrapper for libmdbx-sys.
|
||||
|
||||
use std::{borrow::Cow, marker::PhantomData, ops::Range};
|
||||
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
|
||||
|
||||
use crate::{
|
||||
cursor::{
|
||||
@@ -94,18 +94,26 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
|
||||
|
||||
fn walk_range<'cursor>(
|
||||
&'cursor mut self,
|
||||
range: Range<T::Key>,
|
||||
range: impl RangeBounds<T::Key>,
|
||||
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let start = self
|
||||
.inner
|
||||
.set_range(range.start.encode().as_ref())
|
||||
.map_err(|e| Error::Read(e.into()))?
|
||||
.map(decoder::<T>);
|
||||
let start = match range.start_bound().cloned() {
|
||||
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
|
||||
Bound::Excluded(key) => {
|
||||
self.inner
|
||||
.set_range(key.encode().as_ref())
|
||||
.map_err(|e| Error::Read(e.into()))?
|
||||
.map(decoder::<T>);
|
||||
self.inner.next()
|
||||
}
|
||||
Bound::Unbounded => self.inner.first(),
|
||||
}
|
||||
.map_err(|e| Error::Read(e.into()))?
|
||||
.map(decoder::<T>);
|
||||
|
||||
Ok(RangeWalker::new(self, start, range.end))
|
||||
Ok(RangeWalker::new(self, start, range.start_bound().cloned(), range.end_bound().cloned()))
|
||||
}
|
||||
|
||||
fn walk_back<'cursor>(
|
||||
|
||||
@@ -239,6 +239,21 @@ mod tests {
|
||||
// next() returns None after walker is done
|
||||
assert_eq!(walker.next(), None);
|
||||
|
||||
// [1, 2]
|
||||
let mut walker = cursor.walk_range(1..=2).unwrap();
|
||||
assert_eq!(walker.next(), Some(Ok((1, H256::zero()))));
|
||||
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
|
||||
// next() returns None after walker is done
|
||||
assert_eq!(walker.next(), None);
|
||||
|
||||
// [1, ∞)
|
||||
let mut walker = cursor.walk_range(1..).unwrap();
|
||||
assert_eq!(walker.next(), Some(Ok((1, H256::zero()))));
|
||||
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
|
||||
assert_eq!(walker.next(), Some(Ok((3, H256::zero()))));
|
||||
// next() returns None after walker is done
|
||||
assert_eq!(walker.next(), None);
|
||||
|
||||
// [2, 4)
|
||||
let mut walker = cursor.walk_range(2..4).unwrap();
|
||||
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
|
||||
@@ -246,6 +261,14 @@ mod tests {
|
||||
assert_eq!(walker.next(), None);
|
||||
// next() returns None after walker is done
|
||||
assert_eq!(walker.next(), None);
|
||||
|
||||
// (∞, 3)
|
||||
let mut walker = cursor.walk_range(..3).unwrap();
|
||||
assert_eq!(walker.next(), Some(Ok((0, H256::zero()))));
|
||||
assert_eq!(walker.next(), Some(Ok((1, H256::zero()))));
|
||||
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
|
||||
// next() returns None after walker is done
|
||||
assert_eq!(walker.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user