From 88f83fca3939826d7ad3c2612dab179e7e6d64d7 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 2 Aug 2023 15:12:53 +0100 Subject: [PATCH] feat(pruner): account history (#4000) --- .../interfaces/src/test_utils/generators.rs | 45 ++-- crates/prune/src/pruner.rs | 240 +++++++++++++++++- crates/stages/benches/setup/mod.rs | 12 +- .../src/stages/index_account_history.rs | 6 +- .../src/stages/index_storage_history.rs | 6 +- crates/stages/src/stages/merkle.rs | 7 +- crates/stages/src/test_utils/test_db.rs | 56 +++- .../src/providers/database/provider.rs | 54 ++-- 8 files changed, 353 insertions(+), 73 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index fbbdf42a21..6b33d9973d 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -191,21 +191,22 @@ pub fn random_block_range( blocks } -type Transition = Vec<(Address, Account, Vec)>; +/// Collection of account and storage entry changes +pub type ChangeSet = Vec<(Address, Account, Vec)>; type AccountState = (Account, Vec); -/// Generate a range of transitions for given blocks and accounts. +/// Generate a range of changesets for given blocks and accounts. /// Assumes all accounts start with an empty storage. /// -/// Returns a Vec of account and storage changes for each transition, +/// Returns a Vec of account and storage changes for each block, /// along with the final state of all accounts and storages. -pub fn random_transition_range<'a, R: Rng, IBlk, IAcc>( +pub fn random_changeset_range<'a, R: Rng, IBlk, IAcc>( rng: &mut R, blocks: IBlk, accounts: IAcc, - n_changes: std::ops::Range, + n_storage_changes: std::ops::Range, key_range: std::ops::Range, -) -> (Vec, BTreeMap) +) -> (Vec, BTreeMap) where IBlk: IntoIterator, IAcc: IntoIterator))>, @@ -217,16 +218,20 @@ where let valid_addresses = state.keys().copied().collect(); - let mut transitions = Vec::new(); + let mut changesets = Vec::new(); blocks.into_iter().for_each(|block| { - let mut transition = Vec::new(); - let (from, to, mut transfer, new_entries) = - random_account_change(rng, &valid_addresses, n_changes.clone(), key_range.clone()); + let mut changeset = Vec::new(); + let (from, to, mut transfer, new_entries) = random_account_change( + rng, + &valid_addresses, + n_storage_changes.clone(), + key_range.clone(), + ); // extract from sending account let (prev_from, _) = state.get_mut(&from).unwrap(); - transition.push((from, *prev_from, Vec::new())); + changeset.push((from, *prev_from, Vec::new())); transfer = max(min(transfer, prev_from.balance), U256::from(1)); prev_from.balance = prev_from.balance.wrapping_sub(transfer); @@ -250,11 +255,11 @@ where }) .collect(); - transition.push((to, *prev_to, old_entries)); + changeset.push((to, *prev_to, old_entries)); prev_to.balance = prev_to.balance.wrapping_add(transfer); - transitions.push(transition); + changesets.push(changeset); }); let final_state = state @@ -263,7 +268,7 @@ where (addr, (acc, storage.into_iter().map(|v| v.into()).collect())) }) .collect(); - (transitions, final_state) + (changesets, final_state) } /// Generate a random account change. @@ -272,7 +277,7 @@ where pub fn random_account_change( rng: &mut R, valid_addresses: &Vec
, - n_changes: std::ops::Range, + n_storage_changes: std::ops::Range, key_range: std::ops::Range, ) -> (Address, Address, U256, Vec) { let mut addresses = valid_addresses.choose_multiple(rng, 2).cloned(); @@ -282,9 +287,13 @@ pub fn random_account_change( let balance_change = U256::from(rng.gen::()); - let storage_changes = (0..n_changes.sample_single(rng)) - .map(|_| random_storage_entry(rng, key_range.clone())) - .collect(); + let storage_changes = if n_storage_changes.is_empty() { + Vec::new() + } else { + (0..n_storage_changes.sample_single(rng)) + .map(|_| random_storage_entry(rng, key_range.clone())) + .collect() + }; (addr_from, addr_to, balance_change, storage_changes) } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 3b8270bde8..57775390e7 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -2,9 +2,16 @@ use crate::{Metrics, PrunerError}; use rayon::prelude::*; -use reth_db::{database::Database, tables}; +use reth_db::{ + abstraction::cursor::{DbCursorRO, DbCursorRW}, + database::Database, + models::ShardedKey, + tables, + transaction::DbTxMut, + BlockNumberList, +}; use reth_primitives::{ - BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, + Address, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, }; use reth_provider::{ BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, @@ -23,11 +30,17 @@ pub struct BatchSizes { receipts: usize, transaction_lookup: usize, transaction_senders: usize, + account_history: usize, } impl Default for BatchSizes { fn default() -> Self { - Self { receipts: 10000, transaction_lookup: 10000, transaction_senders: 10000 } + Self { + receipts: 10000, + transaction_lookup: 10000, + transaction_senders: 10000, + account_history: 10000, + } } } @@ -103,6 +116,12 @@ impl Pruner { .record(part_start.elapsed()) } + if let Some((to_block, prune_mode)) = + self.modes.prune_target_block_account_history(tip_block_number)? + { + self.prune_account_history(&provider, to_block, prune_mode)?; + } + provider.commit()?; self.last_pruned_block_number = Some(tip_block_number); @@ -188,7 +207,7 @@ impl Pruner { let total = range.clone().count(); let mut processed = 0; - provider.prune_table_in_batches::( + provider.prune_table_with_iterator_in_batches::( range, self.batch_sizes.receipts, |entries| { @@ -256,7 +275,7 @@ impl Pruner { // Pre-sort hashes to prune them in order hashes.sort_unstable(); - let entries = provider.prune_table::(hashes)?; + let entries = provider.prune_table_with_iterator::(hashes)?; processed += entries; trace!( target: "pruner", @@ -296,7 +315,7 @@ impl Pruner { let total = range.clone().count(); let mut processed = 0; - provider.prune_table_in_batches::( + provider.prune_table_with_range_in_batches::( range, self.batch_sizes.transaction_senders, |entries| { @@ -317,22 +336,135 @@ impl Pruner { Ok(()) } + + /// Prune account history up to the provided block, inclusive. + #[instrument(level = "trace", skip(self, provider), target = "pruner")] + fn prune_account_history( + &self, + provider: &DatabaseProviderRW<'_, DB>, + to_block: BlockNumber, + prune_mode: PruneMode, + ) -> PrunerResult { + let from_block = provider + .get_prune_checkpoint(PrunePart::AccountHistory)? + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + let range = from_block..=to_block; + let total = range.clone().count(); + + let mut processed = 0; + provider.prune_table_with_range_in_batches::( + range, + self.batch_sizes.account_history, + |entries| { + processed += entries; + trace!( + target: "pruner", + %entries, + progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), + "Pruned account history (changesets)" + ); + }, + )?; + + let mut cursor = provider.tx_ref().cursor_write::()?; + // Prune `AccountHistory` table: + // 1. If the shard has `highest_block_number` less than or equal to the target block number + // for pruning, delete the shard completely. + // 2. If the shard has `highest_block_number` greater than the target block number for + // pruning, filter block numbers inside the shard which are less than the target + // block number for pruning. + while let Some(result) = cursor.next()? { + let (key, blocks): (ShardedKey
, BlockNumberList) = result; + + if key.highest_block_number <= to_block { + // If shard consists only of block numbers less than the target one, delete shard + // completely. + cursor.delete_current()?; + if key.highest_block_number == to_block { + // Shard contains only block numbers up to the target one, so we can skip to the + // next address. It is guaranteed that further shards for this address will not + // contain the target block number, as it's in this shard. + cursor.seek_exact(ShardedKey::last(key.key))?; + } + } else { + // Shard contains block numbers that are higher than the target one, so we need to + // filter it. It is guaranteed that further shards for this address will not contain + // the target block number, as it's in this shard. + let blocks = blocks + .iter(0) + .skip_while(|block| *block <= to_block as usize) + .collect::>(); + if blocks.is_empty() { + // If there are no more blocks in this shard, we need to remove it, as empty + // shards are not allowed. + if key.highest_block_number == u64::MAX { + // If current shard is the last shard for this address, replace it with the + // previous shard. + if let Some((prev_key, prev_value)) = cursor.prev()? { + if prev_key.key == key.key { + cursor.delete_current()?; + // Upsert will replace the last shard for this address with the + // previous value + cursor.upsert(key.clone(), prev_value)?; + } + } + } else { + // If current shard is not the last shard for this address, just delete it. + cursor.delete_current()?; + } + } else { + cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?; + } + + // Jump to the next address + cursor.seek_exact(ShardedKey::last(key.key))?; + } + + processed += 1; + if processed % self.batch_sizes.account_history == 0 { + trace!( + target: "pruner", + entries = self.batch_sizes.account_history, + "Pruned account history (indices)" + ); + } + } + + if processed % self.batch_sizes.account_history != 0 { + trace!( + target: "pruner", + entries = processed % self.batch_sizes.account_history, + "Pruned account history (indices)" + ); + } + + provider.save_prune_checkpoint( + PrunePart::AccountHistory, + PruneCheckpoint { block_number: to_block, prune_mode }, + )?; + + Ok(()) + } } #[cfg(test)] mod tests { use crate::{pruner::BatchSizes, Pruner}; use assert_matches::assert_matches; - use reth_db::{tables, test_utils::create_test_rw_db}; + use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList}; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_receipt}, + generators::{ + random_block_range, random_changeset_range, random_eoa_account_range, random_receipt, + }, }; use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET, + Address, BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET, }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; + use std::{collections::BTreeMap, ops::AddAssign}; #[test] fn is_pruning_needed() { @@ -542,4 +674,94 @@ mod tests { // ended last time test_prune(20); } + + #[test] + fn prune_account_history() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let block_num = 7000; + let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let accounts = + random_eoa_account_range(&mut rng, 0..3).into_iter().collect::>(); + + let (changesets, _) = random_changeset_range( + &mut rng, + blocks.iter(), + accounts.clone().into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..0, + 0..0, + ); + tx.insert_changesets(changesets.clone(), None).expect("insert changesets"); + tx.insert_history(changesets.clone(), None).expect("insert history"); + + let account_occurrences = tx.table::().unwrap().into_iter().fold( + BTreeMap::::new(), + |mut map, (key, _)| { + map.entry(key.key).or_default().add_assign(1); + map + }, + ); + assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1)); + + assert_eq!( + tx.table::().unwrap().len(), + changesets.iter().flatten().count() + ); + + let original_shards = tx.table::().unwrap(); + + let test_prune = |to_block: BlockNumber| { + let prune_mode = PruneMode::Before(to_block); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, + PruneModes { account_history: Some(prune_mode), ..Default::default() }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + account_history: 10, + ..Default::default() + }, + ); + + let provider = tx.inner_rw(); + assert_matches!(pruner.prune_account_history(&provider, to_block, prune_mode), Ok(())); + provider.commit().expect("commit"); + + assert_eq!( + tx.table::().unwrap().len(), + changesets[to_block as usize + 1..].iter().flatten().count() + ); + + let actual_shards = tx.table::().unwrap(); + + let expected_shards = original_shards + .iter() + .filter(|(key, _)| key.highest_block_number > to_block) + .map(|(key, blocks)| { + let new_blocks = blocks + .iter(0) + .skip_while(|block| *block <= to_block as usize) + .collect::>(); + (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) + }) + .collect::>(); + + assert_eq!(actual_shards, expected_shards); + + assert_eq!( + tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(), + Some(PruneCheckpoint { block_number: to_block, prune_mode }) + ); + }; + + // Prune first time: no previous checkpoint is present + test_prune(3000); + // Prune second time: previous checkpoint is present, should continue pruning from where + // ended last time + test_prune(4500); + } } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 0c2a6dc191..f61f7e273c 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -8,8 +8,8 @@ use reth_db::{ use reth_interfaces::test_utils::{ generators, generators::{ - random_block_range, random_contract_account_range, random_eoa_account_range, - random_transition_range, + random_block_range, random_changeset_range, random_contract_account_range, + random_eoa_account_range, }, }; use reth_primitives::{Account, Address, SealedBlock, H256, MAINNET}; @@ -119,7 +119,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { let mut blocks = random_block_range(&mut rng, 0..=num_blocks, H256::zero(), txs_range); - let (transitions, start_state) = random_transition_range( + let (transitions, start_state) = random_changeset_range( &mut rng, blocks.iter().take(2), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), @@ -139,10 +139,10 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { let offset = transitions.len() as u64; - tx.insert_transitions(transitions, None).unwrap(); + tx.insert_changesets(transitions, None).unwrap(); tx.commit(|tx| updates.flush(tx)).unwrap(); - let (transitions, final_state) = random_transition_range( + let (transitions, final_state) = random_changeset_range( &mut rng, blocks.iter().skip(2), start_state, @@ -150,7 +150,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { key_range, ); - tx.insert_transitions(transitions, Some(offset)).unwrap(); + tx.insert_changesets(transitions, Some(offset)).unwrap(); tx.insert_accounts_and_storages(final_state).unwrap(); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index fe0b6d3b40..14943d38a2 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -92,7 +92,7 @@ mod tests { }; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_contract_account_range, random_transition_range}, + generators::{random_block_range, random_changeset_range, random_contract_account_range}, }; use reth_primitives::{hex_literal::hex, Address, BlockNumber, H160, H256, MAINNET}; @@ -408,7 +408,7 @@ mod tests { let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3); - let (transitions, _) = random_transition_range( + let (transitions, _) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), @@ -417,7 +417,7 @@ mod tests { ); // add block changeset from block 1. - self.tx.insert_transitions(transitions, Some(start))?; + self.tx.insert_changesets(transitions, Some(start))?; Ok(()) } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index a17c5f14e7..4d746817e2 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -91,7 +91,7 @@ mod tests { }; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_contract_account_range, random_transition_range}, + generators::{random_block_range, random_changeset_range, random_contract_account_range}, }; use reth_primitives::{ hex_literal::hex, Address, BlockNumber, StorageEntry, H160, H256, MAINNET, U256, @@ -422,7 +422,7 @@ mod tests { let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3); - let (transitions, _) = random_transition_range( + let (transitions, _) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), @@ -431,7 +431,7 @@ mod tests { ); // add block changeset from block 1. - self.tx.insert_transitions(transitions, Some(start))?; + self.tx.insert_changesets(transitions, Some(start))?; Ok(()) } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index ffcc427b58..64d27426c5 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -376,8 +376,7 @@ mod tests { use reth_interfaces::test_utils::{ generators, generators::{ - random_block, random_block_range, random_contract_account_range, - random_transition_range, + random_block, random_block_range, random_changeset_range, random_contract_account_range, }, }; use reth_primitives::{ @@ -533,7 +532,7 @@ mod tests { blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3)); self.tx.insert_blocks(blocks.iter(), None)?; - let (transitions, final_state) = random_transition_range( + let (transitions, final_state) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), @@ -541,7 +540,7 @@ mod tests { 0..256, ); // add block changeset from block 1. - self.tx.insert_transitions(transitions, Some(start))?; + self.tx.insert_changesets(transitions, Some(start))?; self.tx.insert_accounts_and_storages(final_state)?; // Calculate state root diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index a4df2b207f..630ba97f7c 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -9,11 +9,12 @@ use reth_db::{ transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, DatabaseEnv, DatabaseError as DbError, }; +use reth_interfaces::test_utils::generators::ChangeSet; use reth_primitives::{ keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry, TxHash, TxNumber, H256, MAINNET, U256, }; -use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory}; +use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, HistoryWriter, ProviderFactory}; use std::{ borrow::Borrow, collections::BTreeMap, @@ -347,35 +348,62 @@ impl TestTransaction { }) } - /// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into - /// corresponding tables. - pub fn insert_transitions( + /// Insert collection of [ChangeSet] into corresponding tables. + pub fn insert_changesets( &self, - transitions: I, - transition_offset: Option, + changesets: I, + block_offset: Option, ) -> Result<(), DbError> where - I: IntoIterator)>>, + I: IntoIterator, { - let offset = transition_offset.unwrap_or_default(); + let offset = block_offset.unwrap_or_default(); self.commit(|tx| { - transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| { - changes.into_iter().try_for_each(|(address, old_account, old_storage)| { - let tid = offset + transition_id as u64; + changesets.into_iter().enumerate().try_for_each(|(block, changeset)| { + changeset.into_iter().try_for_each(|(address, old_account, old_storage)| { + let block = offset + block as u64; // Insert into account changeset. tx.put::( - tid, + block, AccountBeforeTx { address, info: Some(old_account) }, )?; - let tid_address = (tid, address).into(); + let block_address = (block, address).into(); // Insert into storage changeset. old_storage.into_iter().try_for_each(|entry| { - tx.put::(tid_address, entry) + tx.put::(block_address, entry) }) }) }) }) } + + pub fn insert_history( + &self, + changesets: I, + block_offset: Option, + ) -> reth_interfaces::Result<()> + where + I: IntoIterator, + { + let mut accounts = BTreeMap::>::new(); + let mut storages = BTreeMap::<(Address, H256), Vec>::new(); + + for (block, changeset) in changesets.into_iter().enumerate() { + for (address, _, storage_entries) in changeset { + accounts.entry(address).or_default().push(block as u64); + for storage_entry in storage_entries { + storages.entry((address, storage_entry.key)).or_default().push(block as u64); + } + } + } + + let provider = self.factory.provider_rw()?; + provider.insert_account_history_index(accounts)?; + provider.insert_storage_history_index(storages)?; + provider.commit()?; + + Ok(()) + } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 446065457c..19bab6e507 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -17,7 +17,7 @@ use reth_db::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, }, - table::{Key, Table}, + table::Table, tables, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, @@ -621,31 +621,23 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { /// Prune the table for the specified pre-sorted key iterator. /// Returns number of rows pruned. - pub fn prune_table( + pub fn prune_table_with_iterator( &self, - keys: impl IntoIterator, - ) -> std::result::Result - where - T: Table, - K: Key, - { - self.prune_table_in_batches::(keys, usize::MAX, |_| {}) + keys: impl IntoIterator, + ) -> std::result::Result { + self.prune_table_with_iterator_in_batches::(keys, usize::MAX, |_| {}) } /// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after /// every `batch_size` pruned rows. /// /// Returns number of rows pruned. - pub fn prune_table_in_batches( + pub fn prune_table_with_iterator_in_batches( &self, - keys: impl IntoIterator, + keys: impl IntoIterator, batch_size: usize, mut batch_callback: impl FnMut(usize), - ) -> std::result::Result - where - T: Table, - K: Key, - { + ) -> std::result::Result { let mut cursor = self.tx.cursor_write::()?; let mut deleted = 0; @@ -667,6 +659,36 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(deleted) } + /// Prune the table for the specified key range, calling `chunk_callback` after every + /// `batch_size` pruned rows. + /// + /// Returns number of rows pruned. + pub fn prune_table_with_range_in_batches( + &self, + keys: impl RangeBounds, + batch_size: usize, + mut batch_callback: impl FnMut(usize), + ) -> std::result::Result { + let mut cursor = self.tx.cursor_write::()?; + let mut walker = cursor.walk_range(keys)?; + let mut deleted = 0; + + while walker.next().transpose()?.is_some() { + walker.delete_current()?; + deleted += 1; + + if deleted % batch_size == 0 { + batch_callback(batch_size); + } + } + + if deleted % batch_size != 0 { + batch_callback(deleted % batch_size); + } + + Ok(deleted) + } + /// Load shard and remove it. If list is empty, last shard was full or /// there are no shards at all. fn take_shard(&self, key: T::Key) -> Result>