feat(prune): don't prune changesets if taking an incremental hashing/trie route (#4025)

eth tests unrelated, new blob txs
This commit is contained in:
joshieDo
2023-08-02 18:06:17 +01:00
committed by GitHub
parent 94dfeb3ade
commit 2e1ef4dfa5
13 changed files with 122 additions and 145 deletions

View File

@@ -185,6 +185,12 @@ impl ImportCommand {
max_blocks: config.stages.execution.max_blocks,
max_changes: config.stages.execution.max_changes,
},
config
.stages
.merkle
.clean_threshold
.max(config.stages.account_hashing.clean_threshold)
.max(config.stages.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
)

View File

@@ -142,6 +142,11 @@ impl Command {
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds { max_blocks: None, max_changes: None },
stage_conf
.merkle
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
)

View File

@@ -14,7 +14,7 @@ use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage,
StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
},
ExecInput, PipelineError, Stage,
};
@@ -96,6 +96,7 @@ impl Command {
let mut execution_stage = ExecutionStage::new(
factory,
ExecutionStageThresholds { max_blocks: Some(1), max_changes: None },
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::all(),
);

View File

@@ -755,6 +755,11 @@ impl<Ext: RethCliExt> Command<Ext> {
max_blocks: stage_config.execution.max_blocks,
max_changes: stage_config.execution.max_changes,
},
stage_config
.merkle
.clean_threshold
.max(stage_config.account_hashing.clean_threshold)
.max(stage_config.storage_hashing.clean_threshold),
prune_config.map(|prune| prune.parts).unwrap_or_default(),
)
.with_metrics_tx(metrics_tx),
@@ -762,17 +767,12 @@ impl<Ext: RethCliExt> Command<Ext> {
.set(AccountHashingStage::new(
stage_config.account_hashing.clean_threshold,
stage_config.account_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(StorageHashingStage::new(
stage_config.storage_hashing.clean_threshold,
stage_config.storage_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(
stage_config.merkle.clean_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,

View File

@@ -7,7 +7,7 @@ use reth_provider::ProviderFactory;
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage,
StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
},
Stage, UnwindInput,
};
@@ -70,6 +70,7 @@ async fn unwind_and_copy<DB: Database>(
let mut exec_stage = ExecutionStage::new(
reth_revm::Factory::new(db_tool.chain.clone()),
ExecutionStageThresholds { max_blocks: Some(u64::MAX), max_changes: None },
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::all(),
);
@@ -86,22 +87,14 @@ async fn unwind_and_copy<DB: Database>(
// Bring hashes to TO
AccountHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();
AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
let unwind_inner_tx = provider.into_tx();
@@ -132,7 +125,6 @@ async fn dry_run<DB: Database>(
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating
* from
* scratch */
prune_modes: Default::default(),
}
.execute(
&provider,

View File

@@ -202,28 +202,19 @@ impl Command {
max_blocks: Some(batch_size),
max_changes: None,
},
config.stages.merkle.clean_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
}
StageEnum::StorageHashing => {
(Box::new(StorageHashingStage::new(1, batch_size)), None)
}
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),

View File

@@ -95,7 +95,7 @@ fn merkle(c: &mut Criterion) {
// don't need to run each stage for that many times
group.sample_size(10);
let stage = MerkleStage::Both { clean_threshold: u64::MAX, prune_modes: Default::default() };
let stage = MerkleStage::Both { clean_threshold: u64::MAX };
measure_stage(
&mut group,
setup::unwind_hashes,
@@ -104,7 +104,7 @@ fn merkle(c: &mut Criterion) {
"Merkle-incremental".to_string(),
);
let stage = MerkleStage::Both { clean_threshold: 0, prune_modes: Default::default() };
let stage = MerkleStage::Both { clean_threshold: 0 };
measure_stage(
&mut group,
setup::unwind_hashes,

View File

@@ -1,7 +1,8 @@
use crate::{
ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError, UnwindInput,
UnwindOutput,
stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, ExecInput, ExecOutput, MetricEvent,
MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput,
};
use num_traits::Zero;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
@@ -59,6 +60,11 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
executor_factory: EF,
/// The commit thresholds of the execution stage.
thresholds: ExecutionStageThresholds,
/// The highest threshold (in number of blocks) for switching between incremental
/// and full calculations across [`super::MerkleStage`], [`super::AccountHashingStage`] and
/// [`super::StorageHashingStage`]. This is required to figure out whether changesets can be
/// pruned on subsequent pipeline runs.
external_clean_threshold: u64,
/// Pruning configuration.
prune_modes: PruneModes,
}
@@ -68,16 +74,28 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
pub fn new(
executor_factory: EF,
thresholds: ExecutionStageThresholds,
external_clean_threshold: u64,
prune_modes: PruneModes,
) -> Self {
Self { metrics_tx: None, executor_factory, thresholds, prune_modes }
Self {
metrics_tx: None,
external_clean_threshold,
executor_factory,
thresholds,
prune_modes,
}
}
/// Create an execution stage with the provided executor factory.
///
/// The commit threshold will be set to 10_000.
pub fn new_with_factory(executor_factory: EF) -> Self {
Self::new(executor_factory, ExecutionStageThresholds::default(), PruneModes::default())
Self::new(
executor_factory,
ExecutionStageThresholds::default(),
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::default(),
)
}
/// Set the metric events sender.
@@ -98,6 +116,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let start_block = input.next_block();
let max_block = input.target();
let prune_modes = self.adjust_prune_modes(provider, start_block, max_block)?;
// Build executor
let mut executor =
@@ -110,7 +129,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Execute block range
let mut state = PostState::default();
state.add_prune_modes(self.prune_modes);
state.add_prune_modes(prune_modes);
for block_number in start_block..=max_block {
let td = provider
@@ -163,6 +182,35 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
done,
})
}
/// Returns the prune modes to use for this execution run, with changeset pruning switched off
/// whenever a downstream stage may still need the changesets.
///
/// [`super::MerkleStage`] and the hashing stages either rebuild from scratch or apply an
/// incremental update. The incremental route needs the changesets, so they may only be pruned
/// when a from-scratch run is expected. That is assumed when either:
///
/// * the block range is wider than `self.external_clean_threshold` (the highest clean threshold
///   across those stages), or
/// * [`tables::AccountsTrie`] has no entries, i.e. [`super::MerkleStage`] has not been executed
///   before. The range alone is not sufficient for this, because `start_block` moves forward
///   with every checkpoint.
///
/// In every other case `account_history` and `storage_history` are cleared in the returned
/// copy; `self.prune_modes` itself is left untouched.
///
/// # Errors
///
/// Propagates any error raised while counting the entries of [`tables::AccountsTrie`].
fn adjust_prune_modes<DB: Database>(
    &self,
    provider: &DatabaseProviderRW<'_, &DB>,
    start_block: u64,
    max_block: u64,
) -> Result<PruneModes, StageError> {
    // The table is only consulted when the range check alone does not already settle it.
    let exceeds_threshold = max_block - start_block > self.external_clean_threshold;
    let runs_from_scratch =
        exceeds_threshold || provider.tx_ref().entries::<tables::AccountsTrie>()?.is_zero();

    if runs_from_scratch {
        return Ok(self.prune_modes)
    }

    // An incremental hashing/trie update is possible: keep the changesets around by dropping
    // the changeset related pruning configurations.
    let mut adjusted = self.prune_modes;
    adjusted.account_history = None;
    adjusted.storage_history = None;
    Ok(adjusted)
}
}
fn execution_checkpoint<DB: Database>(
@@ -438,6 +486,7 @@ mod tests {
ExecutionStage::new(
factory,
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::none(),
)
}

View File

@@ -15,7 +15,6 @@ use reth_primitives::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint,
StageId,
},
PruneModes,
};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter};
use std::{
@@ -35,25 +34,18 @@ pub struct AccountHashingStage {
pub clean_threshold: u64,
/// The maximum number of accounts to process before committing.
pub commit_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental
/// update based on how many changesets exist.
pub prune_modes: PruneModes,
}
impl AccountHashingStage {
/// Create new instance of [AccountHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { clean_threshold, commit_threshold, prune_modes }
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
}
}
impl Default for AccountHashingStage {
fn default() -> Self {
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
prune_modes: PruneModes::default(),
}
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
}
}
@@ -151,19 +143,12 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
}
let (from_block, to_block) = input.next_block_range().into_inner();
let has_enough_changesets = self
.prune_modes
.prune_target_block_account_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block;
// if there are more blocks than threshold it is faster to go over Plain state and hash all
// accounts, otherwise take changesets, aggregate the sets and apply hashing to
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset.
if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets
{
if to_block - from_block > self.clean_threshold || from_block == 1 {
let tx = provider.tx_ref();
let stage_checkpoint = input
.checkpoint
@@ -463,7 +448,6 @@ mod tests {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl AccountHashingTestRunner {
@@ -527,7 +511,6 @@ mod tests {
tx: TestTransaction::default(),
commit_threshold: 1000,
clean_threshold: 1000,
prune_modes: PruneModes::default(),
}
}
}
@@ -543,7 +526,6 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
prune_modes: self.prune_modes,
}
}
}

View File

@@ -14,7 +14,7 @@ use reth_primitives::{
CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId,
StorageHashingCheckpoint,
},
PruneModes, StorageEntry,
StorageEntry,
};
use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader};
use std::{collections::BTreeMap, fmt::Debug};
@@ -29,25 +29,18 @@ pub struct StorageHashingStage {
pub clean_threshold: u64,
/// The maximum number of slots to process before committing.
pub commit_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental
/// update based on how many changesets exist.
pub prune_modes: PruneModes,
}
impl StorageHashingStage {
/// Create new instance of [StorageHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { clean_threshold, commit_threshold, prune_modes }
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
}
}
impl Default for StorageHashingStage {
fn default() -> Self {
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
prune_modes: PruneModes::default(),
}
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
}
}
@@ -70,19 +63,12 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
}
let (from_block, to_block) = input.next_block_range().into_inner();
let has_enough_changesets = self
.prune_modes
.prune_target_block_storage_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block;
// if there are more blocks than threshold it is faster to go over Plain state and hash all
// accounts, otherwise take changesets, aggregate the sets and apply hashing to
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset, along with their storages.
if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets
{
if to_block - from_block > self.clean_threshold || from_block == 1 {
let stage_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint());
@@ -471,17 +457,11 @@ mod tests {
tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl Default for StorageHashingTestRunner {
fn default() -> Self {
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
clean_threshold: 1000,
prune_modes: PruneModes::default(),
}
Self { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000 }
}
}
@@ -496,7 +476,6 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
prune_modes: self.prune_modes,
}
}
}

View File

@@ -10,7 +10,7 @@ use reth_primitives::{
hex,
stage::{EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, StageId},
trie::StoredSubNode,
BlockNumber, PruneModes, SealedHeader, H256,
BlockNumber, SealedHeader, H256,
};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
@@ -19,6 +19,10 @@ use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress};
use std::fmt::Debug;
use tracing::*;
/// The default threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
///
/// This is the `clean_threshold` used by [`MerkleStage::default_execution`]. It is also passed
/// to the execution stage as its external clean threshold by callers that have no stage
/// configuration at hand.
pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 50_000;
/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
@@ -47,9 +51,6 @@ pub enum MerkleStage {
/// The threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
clean_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental root
/// update based on how many changesets exist.
prune_modes: PruneModes,
},
/// The unwind portion of the merkle stage.
Unwind,
@@ -57,13 +58,13 @@ pub enum MerkleStage {
/// Able to execute and unwind. Used for tests
#[cfg(any(test, feature = "test-utils"))]
#[allow(missing_docs)]
Both { clean_threshold: u64, prune_modes: PruneModes },
Both { clean_threshold: u64 },
}
impl MerkleStage {
/// Stage default for the [MerkleStage::Execution].
pub fn default_execution() -> Self {
Self::Execution { clean_threshold: 50_000, prune_modes: PruneModes::default() }
Self::Execution { clean_threshold: MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD }
}
/// Stage default for the [MerkleStage::Unwind].
@@ -72,8 +73,8 @@ impl MerkleStage {
}
/// Create new instance of [MerkleStage::Execution].
pub fn new_execution(clean_threshold: u64, prune_modes: PruneModes) -> Self {
Self::Execution { clean_threshold, prune_modes }
pub fn new_execution(clean_threshold: u64) -> Self {
Self::Execution { clean_threshold }
}
/// Check that the computed state root matches the root in the expected header.
@@ -131,26 +132,6 @@ impl MerkleStage {
}
Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
}
/// When pruning is enabled for account and storage history, we might not have all changesets
/// required for an incremental state root update on a pipeline re-run.
pub fn has_enough_changesets(
&self,
prune_modes: PruneModes,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<bool, StageError> {
Ok(prune_modes
.prune_target_block_account_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block &&
prune_modes
.prune_target_block_storage_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block)
}
}
#[async_trait::async_trait]
@@ -171,16 +152,14 @@ impl<DB: Database> Stage<DB> for MerkleStage {
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let (threshold, prune_modes) = match self {
let threshold = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
}
MerkleStage::Execution { clean_threshold, prune_modes } => {
(*clean_threshold, *prune_modes)
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(any(test, feature = "test-utils"))]
MerkleStage::Both { clean_threshold, prune_modes } => (*clean_threshold, *prune_modes),
MerkleStage::Both { clean_threshold } => *clean_threshold,
};
let range = input.next_block_range();
@@ -195,10 +174,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let mut checkpoint = self.get_execution_checkpoint(provider)?;
let (trie_root, entities_checkpoint) = if range.is_empty() {
(target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
} else if to_block - from_block > threshold ||
from_block == 1 ||
!self.has_enough_changesets(prune_modes, from_block, to_block)?
{
} else if to_block - from_block > threshold || from_block == 1 {
// if there are more blocks than threshold it is faster to rebuild the trie
let mut entities_checkpoint = if let Some(checkpoint) =
checkpoint.as_ref().filter(|c| c.target_block == to_block)
@@ -471,16 +447,11 @@ mod tests {
struct MerkleTestRunner {
tx: TestTransaction,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl Default for MerkleTestRunner {
fn default() -> Self {
Self {
tx: TestTransaction::default(),
clean_threshold: 10000,
prune_modes: PruneModes::default(),
}
Self { tx: TestTransaction::default(), clean_threshold: 10000 }
}
}
@@ -492,7 +463,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S::Both { clean_threshold: self.clean_threshold, prune_modes: self.prune_modes }
Self::S::Both { clean_threshold: self.clean_threshold }
}
}

View File

@@ -134,6 +134,7 @@ mod tests {
let mut execution_stage = ExecutionStage::new(
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())),
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
prune_modes,
);

View File

@@ -528,6 +528,10 @@ impl PostState {
std::mem::take(&mut self.storage_changes).inner.into_iter()
{
for (address, mut storage) in storage_changes.into_iter() {
if self.prune_modes.should_prune_storage_history(block_number, tip) {
continue
}
let storage_id = BlockNumberAddress((block_number, address));
// If the account was created and wiped at the same block, skip all storage changes
@@ -561,10 +565,6 @@ impl PostState {
}
}
if self.prune_modes.should_prune_storage_history(block_number, tip) {
continue
}
for (slot, old_value) in storage.storage {
tracing::trace!(target: "provider::post_state", ?storage_id, ?slot, ?old_value, "Storage changed");
storage_changeset_cursor.append_dup(