feat(pruning): prune ChangeSets & History during pipeline (#3728)

Author: joshieDo
Date: 2023-07-31 15:36:03 +01:00
Committed by: GitHub
parent 1ac2f15f1d
commit 134fe81efb
19 changed files with 434 additions and 164 deletions

View File

@@ -760,12 +760,17 @@ impl Command {
.set(AccountHashingStage::new(
stage_config.account_hashing.clean_threshold,
stage_config.account_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(StorageHashingStage::new(
stage_config.storage_hashing.clean_threshold,
stage_config.storage_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(
stage_config.merkle.clean_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,

View File

@@ -86,14 +86,22 @@ async fn unwind_and_copy<DB: Database>(
// Bring hashes to TO
AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
AccountHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();
let unwind_inner_tx = provider.into_tx();
@@ -124,6 +132,7 @@ async fn dry_run<DB: Database>(
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from scratch */
prune_modes: Default::default(),
}
.execute(
&provider,

View File

@@ -208,12 +208,22 @@ impl Command {
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
}
StageEnum::StorageHashing => {
(Box::new(StorageHashingStage::new(1, batch_size)), None)
}
StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),

View File

@@ -79,7 +79,7 @@ pub use net::{
SEPOLIA_BOOTNODES,
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef};
pub use revm_primitives::JumpMap;
pub use serde_helper::JsonU256;

View File

@@ -5,5 +5,5 @@ mod target;
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::PrunePart;
pub use part::{PrunePart, PrunePartError};
pub use target::PruneModes;

View File

@@ -1,8 +1,10 @@
use derive_more::Display;
use reth_codecs::{main_codec, Compact};
use thiserror::Error;
/// Part of the data that can be pruned.
#[main_codec]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Debug, Display, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum PrunePart {
/// Prune part responsible for the `TxSenders` table.
SenderRecovery,
@@ -16,6 +18,14 @@ pub enum PrunePart {
StorageHistory,
}
/// PrunePart error type.
#[derive(Debug, Error)]
pub enum PrunePartError {
/// Invalid configuration of a prune part.
#[error("The configuration provided for {0} is invalid.")]
Configuration(PrunePart),
}
#[cfg(test)]
impl Default for PrunePart {
fn default() -> Self {

View File

@@ -1,4 +1,7 @@
use crate::{serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber, PruneMode};
use crate::{
prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber,
PruneMode, PrunePart,
};
use paste::paste;
use serde::{Deserialize, Serialize};
@@ -19,10 +22,16 @@ pub struct PruneModes {
)]
pub receipts: Option<PruneMode>,
/// Account History pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub account_history: Option<PruneMode>,
/// Storage History pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub storage_history: Option<PruneMode>,
}
@@ -51,12 +60,15 @@ macro_rules! impl_prune_parts {
$human_part,
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Option<(BlockNumber, PruneMode)> {
self.$part.as_ref().and_then(|mode| {
self.prune_target_block(mode, tip, $min_blocks).map(|block| {
(block, *mode)
})
})
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
match &self.$part {
    Some(mode) => match self.prune_target_block(mode, tip, $min_blocks) {
        Some(block) => Ok(Some((block, *mode))),
        None => Err(PrunePartError::Configuration(PrunePart::[<$human_part>])),
    },
    None => Ok(None),
}
}
}
)+
@@ -107,17 +119,17 @@ impl PruneModes {
Some(tip.saturating_sub(*distance))
}
PruneMode::Before(n) if tip.saturating_sub(*n) >= min_blocks.unwrap_or_default() => {
Some(*n)
Some(n.saturating_sub(1))
}
_ => None,
}
}
impl_prune_parts!(
(sender_recovery, "Sender Recovery", None),
(transaction_lookup, "Transaction Lookup", None),
(sender_recovery, "SenderRecovery", None),
(transaction_lookup, "TransactionLookup", None),
(receipts, "Receipts", Some(64)),
(account_history, "Account History", None),
(storage_history, "Storage History", None)
(account_history, "AccountHistory", Some(64)),
(storage_history, "StorageHistory", Some(64))
);
}
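The hunk above changes the generated prune_target_block_* methods to return Result<Option<(BlockNumber, PruneMode)>, PrunePartError>: a mode that keeps less history than the part's minimum (e.g. 64 blocks for receipts and history) now surfaces as PrunePartError::Configuration(..) instead of a silent None, and PruneMode::Before(n) resolves to n - 1 as the inclusive prune target. A minimal standalone sketch of the resolution rule follows; only the Before arm is visible in the hunk, so the Full and Distance guards are assumptions.

// Standalone sketch, not the crate's API: restates the block-resolution rule.
enum PruneMode {
    Full,
    Distance(u64),
    Before(u64),
}

// Resolve the last block (inclusive) that may be pruned for a given tip, or
// None when the mode keeps less history than `min_blocks` requires.
fn prune_target_block(mode: PruneMode, tip: u64, min_blocks: Option<u64>) -> Option<u64> {
    let min_blocks = min_blocks.unwrap_or_default();
    match mode {
        // Assumed: full pruning is only valid when no minimum history is required.
        PruneMode::Full if min_blocks == 0 => Some(tip),
        // Assumed guard: keep `distance` blocks behind the tip.
        PruneMode::Distance(distance) if distance <= tip && distance >= min_blocks => {
            Some(tip - distance)
        }
        // Mirrors the hunk: Before(n) prunes everything before n, so the
        // inclusive target is n - 1 (the Some(n.saturating_sub(1)) change).
        PruneMode::Before(n) if tip.saturating_sub(n) >= min_blocks => Some(n.saturating_sub(1)),
        _ => None,
    }
}

fn main() {
    let tip = 100;
    assert_eq!(prune_target_block(PruneMode::Full, tip, None), Some(100));
    assert_eq!(prune_target_block(PruneMode::Distance(64), tip, Some(64)), Some(36));
    assert_eq!(prune_target_block(PruneMode::Before(2), tip, Some(64)), Some(1));
    // Keeping fewer than the required 64 blocks is now an invalid configuration;
    // the macro wraps this None into PrunePartError::Configuration(..).
    assert_eq!(prune_target_block(PruneMode::Distance(10), tip, Some(64)), None);
}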

View File

@@ -4,6 +4,9 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum PrunerError {
#[error(transparent)]
PrunePart(#[from] reth_primitives::PrunePartError),
#[error("Inconsistent data: {0}")]
InconsistentData(&'static str),

View File

@@ -72,13 +72,13 @@ impl<DB: Database> Pruner<DB> {
let provider = self.provider_factory.provider_rw()?;
if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_receipts(tip_block_number)
self.modes.prune_target_block_receipts(tip_block_number)?
{
self.prune_receipts(&provider, to_block, prune_mode)?;
}
if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_transaction_lookup(tip_block_number)
self.modes.prune_target_block_transaction_lookup(tip_block_number)?
{
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
}

View File

@@ -95,7 +95,7 @@ fn merkle(c: &mut Criterion) {
// don't need to run each stage for that many times
group.sample_size(10);
let stage = MerkleStage::Both { clean_threshold: u64::MAX };
let stage = MerkleStage::Both { clean_threshold: u64::MAX, prune_modes: Default::default() };
measure_stage(
&mut group,
setup::unwind_hashes,
@@ -104,7 +104,7 @@ fn merkle(c: &mut Criterion) {
"Merkle-incremental".to_string(),
);
let stage = MerkleStage::Both { clean_threshold: 0 };
let stage = MerkleStage::Both { clean_threshold: 0, prune_modes: Default::default() };
measure_stage(
&mut group,
setup::unwind_hashes,

View File

@@ -49,6 +49,9 @@ pub enum StageError {
#[source]
error: executor::BlockExecutionError,
},
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_primitives::PrunePartError),
/// Invalid checkpoint passed to the stage
#[error("Invalid stage checkpoint: {0}")]
StageCheckpoint(u64),

View File

@@ -60,7 +60,7 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
/// The commit thresholds of the execution stage.
thresholds: ExecutionStageThresholds,
/// Pruning configuration.
prune_targets: PruneModes,
prune_modes: PruneModes,
}
impl<EF: ExecutorFactory> ExecutionStage<EF> {
@@ -68,9 +68,9 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
pub fn new(
executor_factory: EF,
thresholds: ExecutionStageThresholds,
prune_targets: PruneModes,
prune_modes: PruneModes,
) -> Self {
Self { metrics_tx: None, executor_factory, thresholds, prune_targets }
Self { metrics_tx: None, executor_factory, thresholds, prune_modes }
}
/// Create an execution stage with the provided executor factory.
@@ -110,7 +110,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Execute block range
let mut state = PostState::default();
state.add_prune_targets(self.prune_targets);
state.add_prune_modes(self.prune_modes);
for block_number in start_block..=max_block {
let td = provider
@@ -425,8 +425,7 @@ mod tests {
use reth_db::{models::AccountBeforeTx, test_utils::create_test_rw_db};
use reth_primitives::{
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, PruneMode, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET,
U256,
ChainSpecBuilder, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET, U256,
};
use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider};
use reth_revm::Factory;
@@ -894,86 +893,4 @@ mod tests {
]
);
}
#[tokio::test]
async fn test_prune() {
let test_tx = TestTransaction::default();
let factory = Arc::new(ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone()));
let provider = factory.provider_rw().unwrap();
let input = ExecInput {
target: Some(1),
/// The progress of this stage the last time it was executed.
checkpoint: None,
};
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
// insert pre state
let provider = factory.provider_rw().unwrap();
let code = hex!("5a465a905090036002900360015500");
let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("1000000000000000000000000000000000000000")),
Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
)
.unwrap();
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")),
Account {
nonce: 0,
balance: U256::from(0x3635c9adc5dea00000u128),
bytecode_hash: None,
},
)
.unwrap();
provider
.tx_ref()
.put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
.unwrap();
provider.commit().unwrap();
let check_pruning = |factory: Arc<ProviderFactory<_>>,
prune_targets: PruneModes,
expect_num_receipts: usize| async move {
let provider = factory.provider_rw().unwrap();
let mut execution_stage = ExecutionStage::new(
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())),
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
prune_targets,
);
execution_stage.execute(&provider, input).await.unwrap();
assert_eq!(
provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
expect_num_receipts
);
};
let mut prune = PruneModes::none();
check_pruning(factory.clone(), prune, 1).await;
prune.receipts = Some(PruneMode::Full);
check_pruning(factory.clone(), prune, 0).await;
prune.receipts = Some(PruneMode::Before(1));
check_pruning(factory.clone(), prune, 1).await;
prune.receipts = Some(PruneMode::Before(2));
check_pruning(factory.clone(), prune, 0).await;
prune.receipts = Some(PruneMode::Distance(0));
check_pruning(factory.clone(), prune, 1).await;
}
}

View File

@@ -15,6 +15,7 @@ use reth_primitives::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint,
StageId,
},
PruneModes,
};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter};
use std::{
@@ -34,18 +35,25 @@ pub struct AccountHashingStage {
pub clean_threshold: u64,
/// The maximum number of accounts to process before committing.
pub commit_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental
/// update based on how many changesets exist.
pub prune_modes: PruneModes,
}
impl AccountHashingStage {
/// Create new instance of [AccountHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { clean_threshold, commit_threshold, prune_modes }
}
}
impl Default for AccountHashingStage {
fn default() -> Self {
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
prune_modes: PruneModes::default(),
}
}
}
@@ -143,12 +151,19 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
}
let (from_block, to_block) = input.next_block_range().into_inner();
let has_enough_changesets = self
.prune_modes
.prune_target_block_account_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block;
// If there are more blocks than the threshold, it is faster to go over the plain state and
// hash all accounts; otherwise, take the changesets, aggregate them, and apply hashing to
// the AccountHashing table. Also, if we start from genesis, we need to hash from scratch,
// as genesis accounts are not in the changesets.
if to_block - from_block > self.clean_threshold || from_block == 1 {
if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets
{
let tx = provider.tx_ref();
let stage_checkpoint = input
.checkpoint
@@ -448,6 +463,7 @@ mod tests {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl AccountHashingTestRunner {
@@ -511,6 +527,7 @@ mod tests {
tx: TestTransaction::default(),
commit_threshold: 1000,
clean_threshold: 1000,
prune_modes: PruneModes::default(),
}
}
}
@@ -526,6 +543,7 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
prune_modes: self.prune_modes,
}
}
}

View File

@@ -14,7 +14,7 @@ use reth_primitives::{
CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId,
StorageHashingCheckpoint,
},
StorageEntry,
PruneModes, StorageEntry,
};
use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader};
use std::{collections::BTreeMap, fmt::Debug};
@@ -29,18 +29,25 @@ pub struct StorageHashingStage {
pub clean_threshold: u64,
/// The maximum number of slots to process before committing.
pub commit_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental
/// update based on how many changesets exist.
pub prune_modes: PruneModes,
}
impl StorageHashingStage {
/// Create new instance of [StorageHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { clean_threshold, commit_threshold, prune_modes }
}
}
impl Default for StorageHashingStage {
fn default() -> Self {
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
prune_modes: PruneModes::default(),
}
}
}
@@ -63,12 +70,19 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
}
let (from_block, to_block) = input.next_block_range().into_inner();
let has_enough_changesets = self
.prune_modes
.prune_target_block_storage_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block;
// If there are more blocks than the threshold, it is faster to go over the plain state and
// hash all storage slots; otherwise, take the changesets, aggregate them, and apply hashing
// to the StorageHashing table. Also, if we start from genesis, we need to hash from scratch,
// as genesis accounts, along with their storages, are not in the changesets.
if to_block - from_block > self.clean_threshold || from_block == 1 {
if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets
{
let stage_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint());
@@ -457,11 +471,17 @@ mod tests {
tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl Default for StorageHashingTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000 }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
clean_threshold: 1000,
prune_modes: PruneModes::default(),
}
}
}
@@ -476,6 +496,7 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
prune_modes: self.prune_modes,
}
}
}
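Both hashing stages gate the incremental path on the same condition: if the highest prunable history block reaches into the range being hashed, the changesets needed for an incremental update may already be gone, so the stage falls back to hashing from the plain state. A minimal sketch of that condition; the helper name incremental_update_possible is hypothetical.

// Hypothetical helper restating the has_enough_changesets condition above.
// `prune_target_history_block` stands in for the value returned by the
// generated prune_target_block_{account,storage}_history methods.
fn incremental_update_possible(prune_target_history_block: Option<u64>, from_block: u64) -> bool {
    // If pruning may remove changesets at or beyond `from_block`, an
    // incremental update cannot be trusted.
    prune_target_history_block.unwrap_or_default() < from_block
}

fn main() {
    // No pruning configured: all changesets for the range exist.
    assert!(incremental_update_possible(None, 10));
    // History prunable up to block 5, range starts at block 10: still fine.
    assert!(incremental_update_possible(Some(5), 10));
    // History prunable up to block 20, range starts at block 10: fall back to
    // the clean path and hash from the plain state.
    assert!(!incremental_update_possible(Some(20), 10));
}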

View File

@@ -1,6 +1,9 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneModes,
};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter};
use std::fmt::Debug;
@@ -12,18 +15,20 @@ pub struct IndexAccountHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_modes: PruneModes,
}
impl IndexAccountHistoryStage {
/// Create new instance of [IndexAccountHistoryStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
Self { commit_threshold, prune_modes: PruneModes::default() }
}
}
impl Default for IndexAccountHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
Self { commit_threshold: 100_000, prune_modes: PruneModes::default() }
}
}
@@ -38,8 +43,16 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
async fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, _)) =
self.prune_modes.prune_target_block_account_history(input.target())?
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -372,11 +385,16 @@ mod tests {
struct IndexAccountHistoryTestRunner {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
prune_modes: PruneModes,
}
impl Default for IndexAccountHistoryTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000 }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::default(),
}
}
}
@@ -388,7 +406,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold }
Self::S { commit_threshold: self.commit_threshold, prune_modes: self.prune_modes }
}
}
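The history index stages now fast-forward their checkpoint to the highest prunable block, so a pipeline run never tries to index changesets that pruning has already removed. A minimal sketch of the checkpoint adjustment; effective_checkpoint is a hypothetical helper.

// Hypothetical helper mirroring the checkpoint fast-forward in the hunk above.
fn effective_checkpoint(stored_checkpoint: u64, target_prunable_block: Option<u64>) -> u64 {
    match target_prunable_block {
        // Pruning covers blocks beyond the stored checkpoint: skip ahead.
        Some(prunable) if prunable > stored_checkpoint => prunable,
        // Otherwise keep the stored checkpoint.
        _ => stored_checkpoint,
    }
}

fn main() {
    assert_eq!(effective_checkpoint(100, None), 100);
    assert_eq!(effective_checkpoint(100, Some(150)), 150);
    assert_eq!(effective_checkpoint(100, Some(50)), 100);
}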

View File

@@ -1,6 +1,9 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneModes,
};
use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader};
use std::fmt::Debug;
@@ -12,18 +15,20 @@ pub struct IndexStorageHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_modes: PruneModes,
}
impl IndexStorageHistoryStage {
/// Create new instance of [IndexStorageHistoryStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
Self { commit_threshold, prune_modes: PruneModes::default() }
}
}
impl Default for IndexStorageHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
Self { commit_threshold: 100_000, prune_modes: PruneModes::default() }
}
}
@@ -38,8 +43,16 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
async fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, _)) =
self.prune_modes.prune_target_block_storage_history(input.target())?
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -386,11 +399,16 @@ mod tests {
struct IndexStorageHistoryTestRunner {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
prune_modes: PruneModes,
}
impl Default for IndexStorageHistoryTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000 }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::default(),
}
}
}
@@ -402,7 +420,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold }
Self::S { commit_threshold: self.commit_threshold, prune_modes: self.prune_modes }
}
}

View File

@@ -10,7 +10,7 @@ use reth_primitives::{
hex,
stage::{EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, StageId},
trie::StoredSubNode,
BlockNumber, SealedHeader, H256,
BlockNumber, PruneModes, SealedHeader, H256,
};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
@@ -47,6 +47,9 @@ pub enum MerkleStage {
/// The threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
clean_threshold: u64,
/// Prune mode configuration. Required to know if we can actually make an incremental root
/// update based on how many changesets exist.
prune_modes: PruneModes,
},
/// The unwind portion of the merkle stage.
Unwind,
@@ -54,13 +57,13 @@ pub enum MerkleStage {
/// Able to execute and unwind. Used for tests
#[cfg(any(test, feature = "test-utils"))]
#[allow(missing_docs)]
Both { clean_threshold: u64 },
Both { clean_threshold: u64, prune_modes: PruneModes },
}
impl MerkleStage {
/// Stage default for the [MerkleStage::Execution].
pub fn default_execution() -> Self {
Self::Execution { clean_threshold: 50_000 }
Self::Execution { clean_threshold: 50_000, prune_modes: PruneModes::default() }
}
/// Stage default for the [MerkleStage::Unwind].
@@ -69,8 +72,8 @@ impl MerkleStage {
}
/// Create new instance of [MerkleStage::Execution].
pub fn new_execution(clean_threshold: u64) -> Self {
Self::Execution { clean_threshold }
pub fn new_execution(clean_threshold: u64, prune_modes: PruneModes) -> Self {
Self::Execution { clean_threshold, prune_modes }
}
/// Check that the computed state root matches the root in the expected header.
@@ -128,6 +131,26 @@ impl MerkleStage {
}
Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
}
/// When pruning is enabled for account and storage history, we might not have all changesets
/// required for an incremental state root update on a pipeline re-run.
pub fn has_enough_changesets(
&self,
prune_modes: PruneModes,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<bool, StageError> {
Ok(prune_modes
.prune_target_block_account_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block &&
prune_modes
.prune_target_block_storage_history(to_block)?
.map(|(block_number, _)| block_number)
.unwrap_or_default() <
from_block)
}
}
#[async_trait::async_trait]
@@ -148,14 +171,16 @@ impl<DB: Database> Stage<DB> for MerkleStage {
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let threshold = match self {
let (threshold, prune_modes) = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
MerkleStage::Execution { clean_threshold, prune_modes } => {
(*clean_threshold, *prune_modes)
}
#[cfg(any(test, feature = "test-utils"))]
MerkleStage::Both { clean_threshold } => *clean_threshold,
MerkleStage::Both { clean_threshold, prune_modes } => (*clean_threshold, *prune_modes),
};
let range = input.next_block_range();
@@ -168,10 +193,12 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let target_block_root = target_block.state_root;
let mut checkpoint = self.get_execution_checkpoint(provider)?;
let (trie_root, entities_checkpoint) = if range.is_empty() {
(target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
} else if to_block - from_block > threshold || from_block == 1 {
} else if to_block - from_block > threshold ||
from_block == 1 ||
!self.has_enough_changesets(prune_modes, from_block, to_block)?
{
// if there are more blocks than threshold it is faster to rebuild the trie
let mut entities_checkpoint = if let Some(checkpoint) =
checkpoint.as_ref().filter(|c| c.target_block == to_block)
@@ -445,11 +472,16 @@ mod tests {
struct MerkleTestRunner {
tx: TestTransaction,
clean_threshold: u64,
prune_modes: PruneModes,
}
impl Default for MerkleTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), clean_threshold: 10000 }
Self {
tx: TestTransaction::default(),
clean_threshold: 10000,
prune_modes: PruneModes::default(),
}
}
}
@@ -461,7 +493,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S::Both { clean_threshold: self.clean_threshold }
Self::S::Both { clean_threshold: self.clean_threshold, prune_modes: self.prune_modes }
}
}

View File

@@ -35,3 +35,188 @@ pub use merkle::*;
pub use sender_recovery::*;
pub use total_difficulty::*;
pub use tx_lookup::*;
#[cfg(test)]
mod tests {
use super::*;
use crate::{
stage::Stage,
stages::{ExecutionStage, IndexAccountHistoryStage, IndexStorageHistoryStage},
test_utils::TestTransaction,
ExecInput,
};
use reth_db::{
cursor::DbCursorRO,
mdbx::{cursor::Cursor, RW},
tables,
transaction::{DbTx, DbTxMut},
AccountHistory, DatabaseEnv,
};
use reth_interfaces::test_utils::generators::{self, random_block};
use reth_primitives::{
hex_literal::hex, keccak256, Account, Bytecode, ChainSpecBuilder, PruneMode, PruneModes,
SealedBlock, H160, MAINNET, U256,
};
use reth_provider::{
AccountExtReader, BlockWriter, DatabaseProviderRW, ProviderFactory, ReceiptProvider,
StorageReader,
};
use reth_revm::Factory;
use reth_rlp::Decodable;
use std::sync::Arc;
#[tokio::test]
async fn test_prune() {
let test_tx = TestTransaction::default();
let factory = Arc::new(ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone()));
let provider = factory.provider_rw().unwrap();
let tip = 66;
let input = ExecInput {
target: Some(tip),
// The progress of this stage the last time it was executed.
checkpoint: None,
};
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
// Fill with bogus blocks to respect PruneMode distance.
let mut head = block.hash;
let mut rng = generators::rng();
for block_number in 2..=tip {
let nblock = random_block(&mut rng, block_number, Some(head), Some(0), Some(0));
head = nblock.hash;
provider.insert_block(nblock, None).unwrap();
}
provider.commit().unwrap();
// insert pre state
let provider = factory.provider_rw().unwrap();
let code = hex!("5a465a905090036002900360015500");
let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("1000000000000000000000000000000000000000")),
Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
)
.unwrap();
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")),
Account {
nonce: 0,
balance: U256::from(0x3635c9adc5dea00000u128),
bytecode_hash: None,
},
)
.unwrap();
provider
.tx_ref()
.put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
.unwrap();
provider.commit().unwrap();
let check_pruning = |factory: Arc<ProviderFactory<_>>,
prune_modes: PruneModes,
expect_num_receipts: usize,
expect_num_acc_changesets: usize,
expect_num_storage_changesets: usize| async move {
let provider: DatabaseProviderRW<'_, &DatabaseEnv> = factory.provider_rw().unwrap();
// Check that execution creates receipts and changesets according to the pruning
// configuration
let mut execution_stage = ExecutionStage::new(
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())),
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
prune_modes,
);
execution_stage.execute(&provider, input).await.unwrap();
assert_eq!(
provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
expect_num_receipts
);
assert_eq!(
provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
expect_num_storage_changesets
);
assert_eq!(
provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
expect_num_acc_changesets
);
// Check AccountHistory
let mut acc_indexing_stage =
IndexAccountHistoryStage { prune_modes, ..Default::default() };
if let Some(PruneMode::Full) = prune_modes.account_history {
// Full is not supported
assert!(acc_indexing_stage.execute(&provider, input).await.is_err());
} else {
acc_indexing_stage.execute(&provider, input).await.unwrap();
let mut account_history: Cursor<'_, RW, AccountHistory> =
provider.tx_ref().cursor_read::<tables::AccountHistory>().unwrap();
assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
}
// Check StorageHistory
let mut storage_indexing_stage =
IndexStorageHistoryStage { prune_modes, ..Default::default() };
if let Some(PruneMode::Full) = prune_modes.storage_history {
// Full is not supported
assert!(storage_indexing_stage.execute(&provider, input).await.is_err());
} else {
storage_indexing_stage.execute(&provider, input).await.unwrap();
let mut storage_history =
provider.tx_ref().cursor_read::<tables::StorageHistory>().unwrap();
assert_eq!(
storage_history.walk(None).unwrap().count(),
expect_num_storage_changesets
);
}
};
// The unpruned configuration produces 1 receipt, 3 changed accounts and 1 changed
// storage entry.
let mut prune = PruneModes::none();
check_pruning(factory.clone(), prune, 1, 3, 1).await;
prune.receipts = Some(PruneMode::Full);
prune.account_history = Some(PruneMode::Full);
prune.storage_history = Some(PruneMode::Full);
// This will result in an error for account_history and storage_history, which is caught.
check_pruning(factory.clone(), prune, 0, 0, 0).await;
prune.receipts = Some(PruneMode::Before(1));
prune.account_history = Some(PruneMode::Before(1));
prune.storage_history = Some(PruneMode::Before(1));
check_pruning(factory.clone(), prune, 1, 3, 1).await;
prune.receipts = Some(PruneMode::Before(2));
prune.account_history = Some(PruneMode::Before(2));
prune.storage_history = Some(PruneMode::Before(2));
// The one account is the miner
check_pruning(factory.clone(), prune, 0, 1, 0).await;
prune.receipts = Some(PruneMode::Distance(66));
prune.account_history = Some(PruneMode::Distance(66));
prune.storage_history = Some(PruneMode::Distance(66));
check_pruning(factory.clone(), prune, 1, 3, 1).await;
prune.receipts = Some(PruneMode::Distance(64));
prune.account_history = Some(PruneMode::Distance(64));
prune.storage_history = Some(PruneMode::Distance(64));
// The one account is the miner
check_pruning(factory.clone(), prune, 0, 1, 0).await;
}
}

View File

@@ -79,7 +79,7 @@ pub struct PostState {
/// The receipt(s) of the executed transaction(s).
receipts: BTreeMap<BlockNumber, Vec<Receipt>>,
/// Pruning configuration.
prune_targets: PruneModes,
prune_modes: PruneModes,
}
impl PostState {
@@ -94,8 +94,8 @@ impl PostState {
}
/// Add a pruning configuration.
pub fn add_prune_targets(&mut self, prune_targets: PruneModes) {
self.prune_targets = prune_targets;
pub fn add_prune_modes(&mut self, prune_modes: PruneModes) {
self.prune_modes = prune_modes;
}
/// Return the current size of the poststate.
@@ -518,6 +518,7 @@ impl PostState {
pub fn write_history_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(
&mut self,
tx: &TX,
tip: BlockNumber,
) -> Result<(), DbError> {
// Write storage changes
tracing::trace!(target: "provider::post_state", "Writing storage changes");
@@ -560,6 +561,10 @@ impl PostState {
}
}
if self.prune_modes.should_prune_storage_history(block_number, tip) {
continue
}
for (slot, old_value) in storage.storage {
tracing::trace!(target: "provider::post_state", ?storage_id, ?slot, ?old_value, "Storage changed");
storage_changeset_cursor.append_dup(
@@ -576,6 +581,10 @@ impl PostState {
for (block_number, account_changes) in
std::mem::take(&mut self.account_changes).inner.into_iter()
{
if self.prune_modes.should_prune_account_history(block_number, tip) {
continue
}
for (address, info) in account_changes.into_iter() {
tracing::trace!(target: "provider::post_state", block_number, ?address, old = ?info, "Account changed");
account_changeset_cursor
@@ -592,7 +601,7 @@ impl PostState {
tx: &TX,
tip: BlockNumber,
) -> Result<(), DbError> {
self.write_history_to_db(tx)?;
self.write_history_to_db(tx, tip)?;
// Write new storage state
tracing::trace!(target: "provider::post_state", len = self.storage.len(), "Writing new storage state");
@@ -644,12 +653,12 @@ impl PostState {
// Write the receipts of the transactions if not pruned
tracing::trace!(target: "provider::post_state", len = self.receipts.len(), "Writing receipts");
if !self.receipts.is_empty() && self.prune_targets.receipts != Some(PruneMode::Full) {
if !self.receipts.is_empty() && self.prune_modes.receipts != Some(PruneMode::Full) {
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut receipts_cursor = tx.cursor_write::<tables::Receipts>()?;
for (block, receipts) in self.receipts {
if self.prune_targets.should_prune_receipts(block, tip) {
if self.prune_modes.should_prune_receipts(block, tip) {
continue
}