From cf029304cc138b92477a09beba7673691473eff3 Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Thu, 4 Aug 2022 04:13:31 +0300 Subject: [PATCH] Fix index unwind, add tests --- src/stages/history_index.rs | 191 +++++++++++++++++++++++++++++++++--- src/stages/stage_util.rs | 1 + 2 files changed, 176 insertions(+), 16 deletions(-) diff --git a/src/stages/history_index.rs b/src/stages/history_index.rs index 8458146..3ed2a61 100644 --- a/src/stages/history_index.rs +++ b/src/stages/history_index.rs @@ -30,6 +30,46 @@ pub struct AccountHistoryIndex { pub flush_interval: u64, } +impl AccountHistoryIndex { + fn execute<'db, 'tx, E>( + &mut self, + tx: &'tx mut MdbxTransaction<'db, RW, E>, + input: StageInput, + ) -> Result + where + 'db: 'tx, + E: EnvironmentKind, + { + Ok(execute_index( + tx, + input, + &self.temp_dir, + self.flush_interval, + tables::AccountChangeSet, + tables::AccountHistory, + |block_number, AccountChange { address, .. }| (block_number, address), + )?) + } + + fn unwind<'db, 'tx, E>( + &mut self, + tx: &'tx mut MdbxTransaction<'db, RW, E>, + input: UnwindInput, + ) -> anyhow::Result + where + 'db: 'tx, + E: EnvironmentKind, + { + unwind_index( + tx, + input, + tables::AccountChangeSet, + tables::AccountHistory, + |_, AccountChange { address, .. }| address, + ) + } +} + #[async_trait] impl<'db, E> Stage<'db, E> for AccountHistoryIndex where @@ -47,15 +87,7 @@ where where 'db: 'tx, { - Ok(execute_index( - tx, - input, - &self.temp_dir, - self.flush_interval, - tables::AccountChangeSet, - tables::AccountHistory, - |block_number, AccountChange { address, .. }| (block_number, address), - )?) + Self::execute(self, tx, input) } async fn unwind<'tx>( @@ -66,13 +98,7 @@ where where 'db: 'tx, { - unwind_index( - tx, - input, - tables::AccountChangeSet, - tables::AccountHistory, - |_, AccountChange { address, .. }| address, - ) + Self::unwind(self, tx, input) } async fn prune<'tx>( @@ -318,3 +344,136 @@ where Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::kv::new_mem_chaindata; + + fn collect_bitmap_and_check<'db, K: TransactionKind, E: EnvironmentKind>( + tx: &MdbxTransaction<'db, K, E>, + changed_blocks: &BTreeSet, + address: Address, + limit: u64, + ) { + let walker = tx + .cursor(tables::AccountHistory) + .unwrap() + .walk(Some(BitmapKey { + inner: address, + block_number: BlockNumber(0), + })); + + pin!(walker); + + let mut indexed_changed_blocks = BTreeSet::new(); + while let Some((key, bitmap)) = walker.next().transpose().unwrap() { + if key.inner != address { + break; + } + + for value in bitmap.iter() { + if let Some(&last_indexed) = indexed_changed_blocks.last() { + assert!(last_indexed < value); + } + indexed_changed_blocks.insert(value); + } + + if key.block_number.0 != u64::MAX { + assert_eq!(*indexed_changed_blocks.last().unwrap(), key.block_number.0); + } + } + + assert_eq!( + changed_blocks + .iter() + .copied() + .take_while(|v| v <= &limit) + .collect::>(), + indexed_changed_blocks + ); + } + + #[test] + fn execute_account_index() { + let chaindata = new_mem_chaindata().unwrap(); + + let mut tx = chaindata.begin_mutable().unwrap(); + + const LIMIT: u64 = 2_000_000; + + let address = Address::from_low_u64_be(0x42); + let mut account = Account { + nonce: 0, + balance: U256::ZERO, + code_hash: EMPTY_HASH, + }; + let mut changed_blocks = BTreeSet::new(); + for block in 0..LIMIT { + if block % 2 == 0 { + changed_blocks.insert(block); + account.nonce = block * 2; + tx.set( + tables::AccountChangeSet, + BlockNumber(block), + AccountChange { + address, + account: Some(account), + }, + ) + .unwrap(); + } + } + + let mut stage = AccountHistoryIndex { + temp_dir: Arc::new(TempDir::new().unwrap()), + flush_interval: LIMIT, + }; + + let mut stage_progress = None; + for limit in [LIMIT / 2, LIMIT] { + stage.flush_interval = limit / 3; + let executed = stage + .execute( + &mut tx, + StageInput { + restarted: false, + first_started_at: (Instant::now(), None), + previous_stage: Some((EXECUTION, BlockNumber(limit))), + stage_progress, + }, + ) + .unwrap(); + + assert_eq!( + executed, + ExecOutput::Progress { + stage_progress: BlockNumber(limit), + done: true, + reached_tip: true, + }, + ); + stage_progress = Some(BlockNumber(limit)); + + collect_bitmap_and_check(&tx, &changed_blocks, address, limit); + } + + assert_eq!( + stage + .unwind( + &mut tx, + UnwindInput { + stage_progress: stage_progress.unwrap(), + unwind_to: BlockNumber(LIMIT / 2), + bad_block: None, + }, + ) + .unwrap(), + UnwindOutput { + stage_progress: BlockNumber(LIMIT / 2) + } + ); + + collect_bitmap_and_check(&tx, &changed_blocks, address, LIMIT / 2); + } +} diff --git a/src/stages/stage_util.rs b/src/stages/stage_util.rs index 4e20d2b..3be67cf 100644 --- a/src/stages/stage_util.rs +++ b/src/stages/stage_util.rs @@ -135,6 +135,7 @@ where }, new_bm, )?; + break; } bm = cursor