diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs
index 51c67f7eef..f2858d9417 100644
--- a/crates/consensus/beacon/src/engine/forkchoice.rs
+++ b/crates/consensus/beacon/src/engine/forkchoice.rs
@@ -51,6 +51,18 @@ impl ForkchoiceStateTracker {
         self.latest_status().map(|s| s.is_valid()).unwrap_or(false)
     }
 
+    /// Returns whether the latest received FCU is syncing: [ForkchoiceStatus::Syncing]
+    #[allow(unused)]
+    pub(crate) fn is_latest_syncing(&self) -> bool {
+        self.latest_status().map(|s| s.is_syncing()).unwrap_or(false)
+    }
+
+    /// Returns whether the latest received FCU is invalid: [ForkchoiceStatus::Invalid]
+    #[allow(unused)]
+    pub(crate) fn is_latest_invalid(&self) -> bool {
+        self.latest_status().map(|s| s.is_invalid()).unwrap_or(false)
+    }
+
     /// Returns the last valid head hash.
     #[allow(unused)]
     pub(crate) fn last_valid_head(&self) -> Option<H256> {
@@ -98,6 +110,10 @@ impl ForkchoiceStatus {
         matches!(self, ForkchoiceStatus::Valid)
     }
 
+    pub(crate) fn is_invalid(&self) -> bool {
+        matches!(self, ForkchoiceStatus::Invalid)
+    }
+
     pub(crate) fn is_syncing(&self) -> bool {
         matches!(self, ForkchoiceStatus::Syncing)
     }
diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs
index f1c7c12b05..cd64817792 100644
--- a/crates/consensus/beacon/src/engine/mod.rs
+++ b/crates/consensus/beacon/src/engine/mod.rs
@@ -1715,12 +1715,13 @@ where
 
         // Poll prune controller if all conditions are met:
         // 1. Pipeline is idle
-        // 2. Pruning is running and we need to prioritize checking its events OR no engine and
-        //    sync messages are pending and we may start pruning
-        // 3. Latest FCU status is VALID
+        // 2. Either of the following is true:
+        //    1. Pruning is running and we need to prioritize checking its events
+        //    2. Both engine and sync messages are pending AND the latest FCU status is not
+        //       INVALID, so we may start pruning
         if this.sync.is_pipeline_idle() &&
-            (this.is_prune_active() || is_pending) &&
-            this.forkchoice_state_tracker.is_latest_valid()
+            (this.is_prune_active() ||
+                is_pending && !this.forkchoice_state_tracker.is_latest_invalid())
         {
             if let Some(ref mut prune) = this.prune {
                 match prune.poll(cx, this.blockchain.canonical_tip().number) {
diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs
index 5f19431c9e..8c1ab001af 100644
--- a/crates/prune/src/pruner.rs
+++ b/crates/prune/src/pruner.rs
@@ -6,12 +6,13 @@ use reth_db::{
     abstraction::cursor::{DbCursorRO, DbCursorRW},
     database::Database,
     models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
+    table::Table,
     tables,
     transaction::DbTxMut,
     BlockNumberList,
 };
 use reth_primitives::{
-    Address, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
+    BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
 };
 use reth_provider::{
     BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
@@ -81,6 +82,11 @@ impl<DB: Database> Pruner<DB> {
     /// Run the pruner
     pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
+        trace!(
+            target: "pruner",
+            %tip_block_number,
+            "Pruner started"
+        );
         let start = Instant::now();
 
         let provider = self.provider_factory.provider_rw()?;
 
@@ -143,8 +149,15 @@ impl<DB: Database> Pruner<DB> {
         provider.commit()?;
         self.last_pruned_block_number = Some(tip_block_number);
 
-        self.metrics.pruner.duration_seconds.record(start.elapsed());
+        let elapsed = start.elapsed();
+        self.metrics.pruner.duration_seconds.record(elapsed);
 
+        trace!(
+            target: "pruner",
+            %tip_block_number,
+            ?elapsed,
+            "Pruner finished"
+        );
         Ok(())
     }
 
@@ -228,11 +241,11 @@ impl<DB: Database> Pruner<DB> {
         provider.prune_table_with_iterator_in_batches::<tables::Receipts>(
             range,
             self.batch_sizes.receipts,
-            |entries| {
-                processed += entries;
+            |rows| {
+                processed += rows;
                 trace!(
                     target: "pruner",
-                    %entries,
+                    %rows,
                     progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
                     "Pruned receipts"
                 );
@@ -293,11 +306,11 @@ impl<DB: Database> Pruner<DB> {
             // Pre-sort hashes to prune them in order
             hashes.sort_unstable();
 
-            let entries = provider.prune_table_with_iterator::<tables::TxHashNumber>(hashes)?;
-            processed += entries;
+            let rows = provider.prune_table_with_iterator::<tables::TxHashNumber>(hashes)?;
+            processed += rows;
             trace!(
                 target: "pruner",
-                %entries,
+                %rows,
                 progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
                 "Pruned transaction lookup"
             );
@@ -336,11 +349,11 @@ impl<DB: Database> Pruner<DB> {
         provider.prune_table_with_range_in_batches::<tables::TxSenders>(
             range,
             self.batch_sizes.transaction_senders,
-            |entries| {
-                processed += entries;
+            |_, rows| {
+                processed = rows;
                 trace!(
                     target: "pruner",
-                    %entries,
+                    %rows,
                     progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
                     "Pruned transaction senders"
                 );
@@ -370,92 +383,34 @@ impl<DB: Database> Pruner<DB> {
         let range = from_block..=to_block;
         let total = range.clone().count();
 
-        let mut processed = 0;
         provider.prune_table_with_range_in_batches::<tables::AccountChangeSet>(
             range,
             self.batch_sizes.account_history,
-            |entries| {
-                processed += entries;
+            |keys, rows| {
                 trace!(
                     target: "pruner",
-                    %entries,
-                    progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
+                    %keys,
+                    %rows,
+                    progress = format!("{:.1}%", 100.0 * keys as f64 / total as f64),
                     "Pruned account history (changesets)"
                 );
             },
         )?;
 
-        let mut cursor = provider.tx_ref().cursor_write::<tables::AccountHistory>()?;
-
-        // Prune `AccountHistory` table:
-        // 1. If the shard has `highest_block_number` less than or equal to the target block number
-        //    for pruning, delete the shard completely.
-        // 2. If the shard has `highest_block_number` greater than the target block number for
-        //    pruning, filter block numbers inside the shard which are less than the target
-        //    block number for pruning.
-        while let Some(result) = cursor.next()? {
-            let (key, blocks): (ShardedKey<Address>, BlockNumberList) = result;
-
-            if key.highest_block_number <= to_block {
-                // If shard consists only of block numbers less than the target one, delete shard
-                // completely.
-                cursor.delete_current()?;
-                if key.highest_block_number == to_block {
-                    // Shard contains only block numbers up to the target one, so we can skip to the
-                    // next address. It is guaranteed that further shards for this address will not
-                    // contain the target block number, as it's in this shard.
-                    cursor.seek_exact(ShardedKey::last(key.key))?;
-                }
-            } else {
-                // Shard contains block numbers that are higher than the target one, so we need to
-                // filter it. It is guaranteed that further shards for this address will not contain
-                // the target block number, as it's in this shard.
-                let blocks = blocks
-                    .iter(0)
-                    .skip_while(|block| *block <= to_block as usize)
-                    .collect::<Vec<_>>();
-                if blocks.is_empty() {
-                    // If there are no more blocks in this shard, we need to remove it, as empty
-                    // shards are not allowed.
-                    if key.highest_block_number == u64::MAX {
-                        // If current shard is the last shard for this address, replace it with the
-                        // previous shard.
-                        if let Some((prev_key, prev_value)) = cursor.prev()? {
-                            if prev_key.key == key.key {
-                                cursor.delete_current()?;
-                                // Upsert will replace the last shard for this address with the
-                                // previous value
-                                cursor.upsert(key.clone(), prev_value)?;
-                            }
-                        }
-                    } else {
-                        // If current shard is not the last shard for this address, just delete it.
-                        cursor.delete_current()?;
-                    }
-                } else {
-                    cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
-                }
-
-                // Jump to the next address
-                cursor.seek_exact(ShardedKey::last(key.key))?;
-            }
-
-            processed += 1;
-            if processed % self.batch_sizes.account_history == 0 {
+        self.prune_history_indices::<tables::AccountHistory, _>(
+            provider,
+            to_block,
+            |a, b| a.key == b.key,
+            |key| ShardedKey::last(key.key),
+            self.batch_sizes.account_history,
+            |rows| {
                 trace!(
                     target: "pruner",
-                    entries = self.batch_sizes.account_history,
+                    rows,
                     "Pruned account history (indices)"
                 );
-            }
-        }
-
-        if processed % self.batch_sizes.account_history != 0 {
-            trace!(
-                target: "pruner",
-                entries = processed % self.batch_sizes.account_history,
-                "Pruned account history (indices)"
-            );
-        }
+            },
+        )?;
 
         provider.save_prune_checkpoint(
             PrunePart::AccountHistory,
@@ -478,104 +433,136 @@ impl<DB: Database> Pruner<DB> {
             .map(|checkpoint| checkpoint.block_number + 1)
             .unwrap_or_default();
         let block_range = from_block..=to_block;
-        let total = block_range.clone().count();
         let range = BlockNumberAddress::range(block_range);
 
-        let mut processed = 0;
         provider.prune_table_with_range_in_batches::<tables::StorageChangeSet>(
             range,
             self.batch_sizes.storage_history,
-            |entries| {
-                processed += entries;
+            |keys, rows| {
                 trace!(
                     target: "pruner",
-                    %entries,
-                    progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
+                    %keys,
+                    %rows,
                     "Pruned storage history (changesets)"
                 );
             },
         )?;
 
-        let mut cursor = provider.tx_ref().cursor_write::<tables::StorageHistory>()?;
-
-        // Prune `StorageHistory` table:
+        self.prune_history_indices::<tables::StorageHistory, _>(
+            provider,
+            to_block,
+            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
+            |key| StorageShardedKey::last(key.address, key.sharded_key.key),
+            self.batch_sizes.storage_history,
+            |rows| {
+                trace!(
+                    target: "pruner",
+                    rows,
+                    "Pruned storage history (indices)"
+                );
+            },
+        )?;
+
+        provider.save_prune_checkpoint(
+            PrunePart::StorageHistory,
+            PruneCheckpoint { block_number: to_block, prune_mode },
+        )?;
+
+        Ok(())
+    }
+
+    /// Prune history indices up to the provided block, inclusive.
+    fn prune_history_indices<T, SK>(
+        &self,
+        provider: &DatabaseProviderRW<'_, DB>,
+        to_block: BlockNumber,
+        key_matches: impl Fn(&T::Key, &T::Key) -> bool,
+        last_key: impl Fn(&T::Key) -> T::Key,
+        batch_size: usize,
+        batch_callback: impl Fn(usize),
+    ) -> PrunerResult
+    where
+        T: Table<Value = BlockNumberList>,
+        T::Key: AsRef<ShardedKey<SK>>,
+    {
+        let mut processed = 0;
+        let mut cursor = provider.tx_ref().cursor_write::<T>()?;
+
+        // Prune history table:
         // 1. If the shard has `highest_block_number` less than or equal to the target block number
         //    for pruning, delete the shard completely.
         // 2. If the shard has `highest_block_number` greater than the target block number for
         //    pruning, filter block numbers inside the shard which are less than the target
         //    block number for pruning.
         while let Some(result) = cursor.next()? {
-            let (key, blocks): (StorageShardedKey, BlockNumberList) = result;
+            let (key, blocks): (T::Key, BlockNumberList) = result;
 
-            if key.sharded_key.highest_block_number <= to_block {
+            if key.as_ref().highest_block_number <= to_block {
                 // If shard consists only of block numbers less than the target one, delete shard
                 // completely.
                 cursor.delete_current()?;
-                if key.sharded_key.highest_block_number == to_block {
+                if key.as_ref().highest_block_number == to_block {
                     // Shard contains only block numbers up to the target one, so we can skip to the
-                    // next storage slot for this address. It is guaranteed that further shards for
-                    // this address and storage slot will not contain the target block number, as
-                    // it's in this shard.
-                    cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+                    // next sharded key. It is guaranteed that further shards for this sharded key
+                    // will not contain the target block number, as it's in this shard.
+                    cursor.seek_exact(last_key(&key))?;
                 }
             } else {
                 // Shard contains block numbers that are higher than the target one, so we need to
-                // filter it. It is guaranteed that further shards for this address and storage slot
-                // will not contain the target block number, as it's in this shard.
-                let blocks = blocks
+                // filter it. It is guaranteed that further shards for this sharded key will not
+                // contain the target block number, as it's in this shard.
+                let new_blocks = blocks
                     .iter(0)
                     .skip_while(|block| *block <= to_block as usize)
                     .collect::<Vec<_>>();
-                if blocks.is_empty() {
-                    // If there are no more blocks in this shard, we need to remove it, as empty
-                    // shards are not allowed.
-                    if key.sharded_key.highest_block_number == u64::MAX {
-                        // If current shard is the last shard for this address and storage slot,
-                        // replace it with the previous shard.
-                        if let Some((prev_key, prev_value)) = cursor.prev()? {
-                            if prev_key.address == key.address &&
-                                prev_key.sharded_key.key == key.sharded_key.key
+
+                if blocks.len() != new_blocks.len() {
+                    // If there were blocks less than or equal to the target one
+                    // (so the shard has changed), update the shard.
+                    if new_blocks.is_empty() {
+                        // If there are no more blocks in this shard, we need to remove it, as empty
+                        // shards are not allowed.
+                        if key.as_ref().highest_block_number == u64::MAX {
+                            // If current shard is the last shard for this sharded key, replace it
+                            // with the previous shard.
+                            if let Some(prev_value) = cursor
+                                .prev()?
+                                .filter(|(prev_key, _)| key_matches(prev_key, &key))
+                                .map(|(_, prev_value)| prev_value)
                             {
                                 cursor.delete_current()?;
-                                // Upsert will replace the last shard for this address and storage
-                                // slot with the previous value
+                                // Upsert will replace the last shard for this sharded key with the
+                                // previous value
                                 cursor.upsert(key.clone(), prev_value)?;
+                            } else {
+                                // If there's no previous shard for this sharded key,
+                                // just delete the last shard completely.
+                                cursor.delete_current()?;
                             }
+                        } else {
+                            // If current shard is not the last shard for this sharded key,
+                            // just delete it.
+                            cursor.delete_current()?;
                         }
                     } else {
-                        // If current shard is not the last shard for this address, just delete it.
-                        cursor.delete_current()?;
+                        cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?;
                     }
-                } else {
-                    cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
                 }
 
                 // Jump to the next address
-                cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+                cursor.seek_exact(last_key(&key))?;
             }
 
             processed += 1;
-            if processed % self.batch_sizes.storage_history == 0 {
-                trace!(
-                    target: "pruner",
-                    entries = self.batch_sizes.storage_history,
-                    "Pruned storage history (indices)"
-                );
+
+            if processed % batch_size == 0 {
+                batch_callback(batch_size);
             }
         }
 
-        if processed % self.batch_sizes.storage_history != 0 {
-            trace!(
-                target: "pruner",
-                entries = processed % self.batch_sizes.storage_history,
-                "Pruned storage history (indices)"
-            );
+        if processed % batch_size != 0 {
+            batch_callback(processed % batch_size);
         }
 
-        provider.save_prune_checkpoint(
-            PrunePart::StorageHistory,
-            PruneCheckpoint { block_number: to_block, prune_mode },
-        )?;
-
         Ok(())
     }
 }
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index 19bab6e507..7a82cac2a3 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -660,33 +660,38 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
     }
 
-    /// Prune the table for the specified key range, calling `chunk_callback` after every
-    /// `batch_size` pruned rows.
-    ///
-    /// Returns number of rows pruned.
+    /// Prune the table for the specified key range, calling `batch_callback` after every
+    /// `batch_size` pruned rows with the number of total unique keys and total rows pruned. For
+    /// dupsort tables, these numbers will be different as one key can correspond to multiple rows.
     pub fn prune_table_with_range_in_batches<T: Table>(
         &self,
         keys: impl RangeBounds<T::Key>,
         batch_size: usize,
-        mut batch_callback: impl FnMut(usize),
-    ) -> std::result::Result<usize, DatabaseError> {
+        mut batch_callback: impl FnMut(usize, usize),
+    ) -> std::result::Result<(), DatabaseError> {
         let mut cursor = self.tx.cursor_write::<T>()?;
         let mut walker = cursor.walk_range(keys)?;
-        let mut deleted = 0;
+        let mut deleted_keys = 0;
+        let mut deleted_rows = 0;
+        let mut previous_key = None;
 
-        while walker.next().transpose()?.is_some() {
+        while let Some((key, _)) = walker.next().transpose()? {
             walker.delete_current()?;
-            deleted += 1;
+            deleted_rows += 1;
+            if previous_key.as_ref().map(|previous_key| previous_key != &key).unwrap_or(true) {
+                deleted_keys += 1;
+                previous_key = Some(key);
+            }
 
-            if deleted % batch_size == 0 {
-                batch_callback(batch_size);
+            if deleted_rows % batch_size == 0 {
+                batch_callback(deleted_keys, deleted_rows);
             }
         }
 
-        if deleted % batch_size != 0 {
-            batch_callback(deleted % batch_size);
+        if deleted_rows % batch_size != 0 {
+            batch_callback(deleted_keys, deleted_rows);
         }
 
-        Ok(deleted)
+        Ok(())
     }
 
     /// Load shard and remove it. If list is empty, last shard was full or
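
As a companion to the `prune_history_indices` refactor above, the sketch below replays its three shard cases on a plain `Vec` instead of an MDBX cursor. `Shard`, `prune_shards`, and the sample data are hypothetical illustrations, not reth types; in particular, where the real code replaces an emptied `u64::MAX` shard with its predecessor (or deletes it) through cursor operations, the sketch simply drops it.

// Illustrative sketch only; not part of the patch above.

/// One history shard for a single sharded key (hypothetical type): the block
/// numbers it holds, keyed by `highest_block_number`, where `u64::MAX` marks
/// the last shard for that key.
#[derive(Debug, PartialEq)]
struct Shard {
    highest_block_number: u64,
    blocks: Vec<u64>,
}

/// Prune all blocks `<= to_block`, mirroring the cases in the generic loop:
/// 1. A shard entirely at or below the target is deleted.
/// 2. A shard straddling the target keeps only the blocks above it.
/// 3. A shard left empty is removed (empty shards are not allowed); the real
///    code additionally backfills the `u64::MAX` slot from the previous shard.
fn prune_shards(shards: &mut Vec<Shard>, to_block: u64) {
    shards.retain_mut(|shard| {
        if shard.highest_block_number <= to_block {
            // Case 1: every block in this shard is prunable.
            return false;
        }
        // Case 2: filter out the prunable prefix.
        shard.blocks.retain(|block| *block > to_block);
        // Case 3: drop the shard if nothing survived.
        !shard.blocks.is_empty()
    });
}

fn main() {
    let mut shards = vec![
        Shard { highest_block_number: 10, blocks: vec![1, 5, 10] },
        Shard { highest_block_number: 20, blocks: vec![12, 15, 20] },
        Shard { highest_block_number: u64::MAX, blocks: vec![25, 30] },
    ];
    prune_shards(&mut shards, 15);
    // The first shard is gone, the second kept only block 20, and the last
    // shard is untouched.
    assert_eq!(
        shards,
        vec![
            Shard { highest_block_number: 20, blocks: vec![20] },
            Shard { highest_block_number: u64::MAX, blocks: vec![25, 30] },
        ]
    );
}

The `u64::MAX` sentinel is also what lets the real loop jump to the last shard of a key with a single `seek_exact(last_key(&key))` after each rewrite.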