chore(stages): Helper for walking over table range (#1025)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Kim, JinSan
2023-01-31 12:52:10 +09:00
committed by GitHub
parent be70f810e9
commit d865db49d5
9 changed files with 142 additions and 37 deletions

View File

@@ -717,9 +717,7 @@ mod tests {
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
let walker = canonical_cursor.walk(range.start)?.take_while(|entry| {
entry.as_ref().map(|(num, _)| *num < range.end).unwrap_or_default()
});
let walker = canonical_cursor.walk_range(range.start..range.end)?;
let mut headers = Vec::default();
for entry in walker {

View File

@@ -104,8 +104,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// get canonical blocks (num,hash)
let canonical_batch = canonicals
.walk(start_block)?
.take_while(|entry| entry.as_ref().map(|e| e.0 <= end_block).unwrap_or_default())
.walk_range(start_block..end_block + 1)?
.map(|i| i.map(BlockNumHash))
.collect::<Result<Vec<_>, _>>()?;
@@ -348,10 +347,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// get all batches for account change
// Check if walk and walk_dup would do the same thing
let account_changeset_batch = account_changeset
.walk(from_transition_rev)?
.take_while(|item| {
item.as_ref().map(|(num, _)| *num < to_transition_rev).unwrap_or_default()
})
.walk_range(from_transition_rev..to_transition_rev)?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainState
@@ -365,12 +361,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// get all batches for storage change
let storage_changeset_batch = storage_changeset
.walk((from_transition_rev, Address::zero()).into())?
.take_while(|item| {
item.as_ref()
.map(|(key, _)| key.transition_id() < to_transition_rev)
.unwrap_or_default()
})
.walk_range(
(from_transition_rev, Address::zero()).into()..
(to_transition_rev, Address::zero()).into(),
)?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainStorage

View File

@@ -93,8 +93,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// Aggregate all transition changesets and and make list of account that have been
// changed.
tx.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default())
.walk_range(from_transition..to_transition)?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// fold all account to one set of changed accounts
@@ -137,8 +136,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// Aggregate all transition changesets and and make list of account that have been changed.
tx.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition_rev)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default())
.walk_range(from_transition_rev..to_transition_rev)?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()

View File

@@ -99,10 +99,10 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// Aggregate all transition changesets and and make list of storages that have been
// changed.
tx.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition, H160::zero()).into())?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.0 .0 < to_transition).unwrap_or_default()
})
.walk_range(
(from_transition, Address::zero()).into()..
(to_transition, Address::zero()).into(),
)?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// fold all storages and save its old state so we can remove it from HashedStorage
@@ -173,12 +173,10 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// Aggregate all transition changesets and make list of accounts that have been changed.
tx.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition_rev, H160::zero()).into())?
.take_while(|res| {
res.as_ref()
.map(|(TransitionIdAddress((k, _)), _)| *k < to_transition_rev)
.unwrap_or_default()
})
.walk_range(
(from_transition_rev, Address::zero()).into()..
(to_transition_rev, Address::zero()).into(),
)?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()

View File

@@ -87,9 +87,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Walk the transactions from start to end index (inclusive)
let entries = tx_cursor
.walk(start_tx_index)?
.take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default());
let entries = tx_cursor.walk_range(start_tx_index..end_tx_index + 1)?;
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");

View File

@@ -1,4 +1,4 @@
use std::marker::PhantomData;
use std::{marker::PhantomData, ops::Range};
use crate::{
common::{IterPairResult, PairResult, ValueOnlyResult},
@@ -38,6 +38,15 @@ pub trait DbCursorRO<'tx, T: Table> {
where
Self: Sized;
/// Returns an iterator starting at a key greater or equal than `start_key` and ending at a key
/// less than `end_key`
fn walk_range<'cursor>(
&'cursor mut self,
range: Range<T::Key>,
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized;
/// Returns an iterator that walks backwards through the table. If `start_key`
/// is None, starts from the last entry of the table. If it not, starts at a key
/// greater or equal than the key value wrapped inside Some().
@@ -185,6 +194,56 @@ impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> std::iter::Iterator
}
}
/// Provides a range iterator to `Cursor` when handling `Table`.
/// Also check [`Walker`]
pub struct RangeWalker<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> {
/// Cursor to be used to walk through the table.
cursor: &'cursor mut CURSOR,
/// `(key, value)` where to start the walk.
start: IterPairResult<T>,
/// exclusive `key` where to stop the walk.
end_key: T::Key,
/// flag whether is ended
is_done: bool,
/// Phantom data for 'tx. As it is only used for `DbCursorRO`.
_tx_phantom: PhantomData<&'tx T>,
}
impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> std::iter::Iterator
for RangeWalker<'cursor, 'tx, T, CURSOR>
{
type Item = Result<(T::Key, T::Value), Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
return None
}
let start = self.start.take();
if start.is_some() {
return start
}
let res = self.cursor.next().transpose()?;
if let Ok((key, value)) = res {
if key < self.end_key {
Some(Ok((key, value)))
} else {
self.is_done = true;
None
}
} else {
Some(res)
}
}
}
impl<'cursor, 'tx, T: Table, CURSOR: DbCursorRO<'tx, T>> RangeWalker<'cursor, 'tx, T, CURSOR> {
/// construct RangeWalker
pub fn new(cursor: &'cursor mut CURSOR, start: IterPairResult<T>, end_key: T::Key) -> Self {
Self { cursor, start, end_key, is_done: false, _tx_phantom: std::marker::PhantomData }
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
///
/// Reason why we have two lifetimes is to distinguish between `'cursor` lifetime

View File

@@ -1,10 +1,11 @@
//! Mock database
use std::collections::BTreeMap;
use std::{collections::BTreeMap, ops::Range};
use crate::{
common::{PairResult, ValueOnlyResult},
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker,
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
ReverseWalker, Walker,
},
database::{Database, DatabaseGAT},
table::{DupSort, Table},
@@ -139,6 +140,16 @@ impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock {
todo!()
}
fn walk_range<'cursor>(
&'cursor mut self,
_range: Range<T::Key>,
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
{
todo!()
}
fn walk_back<'cursor>(
&'cursor mut self,
_start_key: Option<T::Key>,

View File

@@ -1,10 +1,11 @@
//! Cursor wrapper for libmdbx-sys.
use std::{borrow::Cow, marker::PhantomData};
use std::{borrow::Cow, marker::PhantomData, ops::Range};
use crate::{
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, ReverseWalker, Walker,
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
ReverseWalker, Walker,
},
table::{Compress, DupSort, Encode, Table},
tables::utils::*,
@@ -88,6 +89,22 @@ impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T>
Ok(Walker::new(self, start))
}
fn walk_range<'cursor>(
&'cursor mut self,
range: Range<T::Key>,
) -> Result<RangeWalker<'cursor, 'tx, T, Self>, Error>
where
Self: Sized,
{
let start = self
.inner
.set_range(range.start.encode().as_ref())
.map_err(|e| Error::Read(e.into()))?
.map(decoder::<T>);
Ok(RangeWalker::new(self, start, range.end))
}
fn walk_back<'cursor>(
&'cursor mut self,
start_key: Option<T::Key>,

View File

@@ -216,6 +216,38 @@ mod tests {
assert_eq!(first.1, value, "First next should be put value");
}
#[test]
fn db_cursor_walk_range() {
let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
// PUT (0, 0), (1, 0), (2, 0), (3, 0)
let tx = db.tx_mut().expect(ERROR_INIT_TX);
vec![0, 1, 2, 3]
.into_iter()
.try_for_each(|key| tx.put::<CanonicalHeaders>(key, H256::zero()))
.expect(ERROR_PUT);
tx.commit().expect(ERROR_COMMIT);
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
// [1, 3)
let mut walker = cursor.walk_range(1..3).unwrap();
assert_eq!(walker.next(), Some(Ok((1, H256::zero()))));
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
assert_eq!(walker.next(), None);
// next() returns None after walker is done
assert_eq!(walker.next(), None);
// [2, 4)
let mut walker = cursor.walk_range(2..4).unwrap();
assert_eq!(walker.next(), Some(Ok((2, H256::zero()))));
assert_eq!(walker.next(), Some(Ok((3, H256::zero()))));
assert_eq!(walker.next(), None);
// next() returns None after walker is done
assert_eq!(walker.next(), None);
}
#[test]
fn db_walker() {
let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);