feat(stages): respect PruneModes in Index History stages (#4382)

This commit is contained in:
Alexey Shekhirin
2023-08-31 11:27:07 +01:00
committed by GitHub
parent cd71f689cd
commit edf31806d8
4 changed files with 234 additions and 38 deletions

View File

@@ -817,6 +817,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let factory = factory.with_stack_config(stack_config);
let prune_modes = prune_config.map(|prune| prune.parts).unwrap_or_default();
let header_mode =
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
let pipeline = builder
@@ -849,7 +851,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.clean_threshold
.max(stage_config.account_hashing.clean_threshold)
.max(stage_config.storage_hashing.clean_threshold),
prune_config.map(|prune| prune.parts).unwrap_or_default(),
prune_modes.clone(),
)
.with_metrics_tx(metrics_tx),
)
@@ -865,9 +867,11 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
prune_modes.clone(),
))
.set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold,
prune_modes,
)),
)
.build(db, self.chain.clone());

View File

@@ -1,33 +1,37 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PrunePart,
};
use reth_provider::{
AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader,
PruneCheckpointWriter,
};
use std::fmt::Debug;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`reth_db::tables::AccountHistory`]
///
/// Pruning: we don't need to store and act on [`reth_primitives::PruneModes`],
/// because this stage indexes the already pruned account changesets generated by
/// [`crate::stages::ExecutionStage`].
#[derive(Debug)]
pub struct IndexAccountHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_modes: PruneModes,
}
impl IndexAccountHistoryStage {
/// Create new instance of [IndexAccountHistoryStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { commit_threshold, prune_modes }
}
}
impl Default for IndexAccountHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
Self { commit_threshold: 100_000, prune_modes: PruneModes::none() }
}
}
@@ -42,8 +46,29 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
async fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) =
self.prune_modes.prune_target_block_account_history(input.target())?
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::AccountHistory)?.is_none() {
provider.save_prune_checkpoint(
PrunePart::AccountHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -98,7 +123,7 @@ mod tests {
generators,
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{hex_literal::hex, Address, BlockNumber, H160, H256, MAINNET};
use reth_primitives::{hex_literal::hex, Address, BlockNumber, PruneMode, H160, H256, MAINNET};
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
@@ -371,16 +396,76 @@ mod tests {
);
}
#[tokio::test]
async fn insert_index_with_prune_modes() {
// init
let tx = TestTransaction::default();
// setup
tx.commit(|tx| {
// we just need first and last
tx.put::<tables::BlockBodyIndices>(
0,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)
.unwrap();
tx.put::<tables::BlockBodyIndices>(
100,
StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
)
.unwrap();
// setup changeset that are going to be applied to history index
tx.put::<tables::AccountChangeSet>(20, acc()).unwrap();
tx.put::<tables::AccountChangeSet>(36, acc()).unwrap();
tx.put::<tables::AccountChangeSet>(100, acc()).unwrap();
Ok(())
})
.unwrap();
// run
let input = ExecInput { target: Some(100), ..Default::default() };
let mut stage = IndexAccountHistoryStage {
prune_modes: PruneModes {
account_history: Some(PruneMode::Before(36)),
..Default::default()
},
..Default::default()
};
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let out = stage.execute(&provider, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(100), done: true });
provider.commit().unwrap();
// verify
let table = cast(tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])]));
// unwind
unwind(&tx, 100, 0).await;
// verify initial state
let table = tx.table::<tables::AccountHistory>().unwrap();
assert!(table.is_empty());
}
stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history);
struct IndexAccountHistoryTestRunner {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
prune_modes: PruneModes,
}
impl Default for IndexAccountHistoryTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000 }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
}
}
@@ -392,7 +477,10 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold }
Self::S {
commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(),
}
}
}

View File

@@ -1,33 +1,36 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PrunePart,
};
use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader,
};
use std::fmt::Debug;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`reth_db::tables::StorageHistory`].
///
/// Pruning: we don't need to store and act on [`reth_primitives::PruneModes`],
/// because this stage indexes the already pruned storage changesets generated by
/// [`crate::stages::ExecutionStage`].
#[derive(Debug)]
pub struct IndexStorageHistoryStage {
/// Number of blocks after which the control
/// flow will be returned to the pipeline for commit.
pub commit_threshold: u64,
/// Pruning configuration.
pub prune_modes: PruneModes,
}
impl IndexStorageHistoryStage {
/// Create new instance of [IndexStorageHistoryStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self {
Self { commit_threshold, prune_modes }
}
}
impl Default for IndexStorageHistoryStage {
fn default() -> Self {
Self { commit_threshold: 100_000 }
Self { commit_threshold: 100_000, prune_modes: PruneModes::none() }
}
}
@@ -42,8 +45,29 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
async fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) =
self.prune_modes.prune_target_block_storage_history(input.target())?
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::StorageHistory)?.is_none() {
provider.save_prune_checkpoint(
PrunePart::StorageHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -98,7 +122,7 @@ mod tests {
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{
hex_literal::hex, Address, BlockNumber, StorageEntry, H160, H256, MAINNET, U256,
hex_literal::hex, Address, BlockNumber, PruneMode, StorageEntry, H160, H256, MAINNET, U256,
};
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
@@ -385,16 +409,76 @@ mod tests {
);
}
#[tokio::test]
async fn insert_index_with_prune_modes() {
// init
let tx = TestTransaction::default();
// setup
tx.commit(|tx| {
// we just need first and last
tx.put::<tables::BlockBodyIndices>(
0,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)
.unwrap();
tx.put::<tables::BlockBodyIndices>(
100,
StoredBlockBodyIndices { tx_count: 5, ..Default::default() },
)
.unwrap();
// setup changeset that are going to be applied to history index
tx.put::<tables::StorageChangeSet>(trns(20), storage(STORAGE_KEY)).unwrap();
tx.put::<tables::StorageChangeSet>(trns(36), storage(STORAGE_KEY)).unwrap();
tx.put::<tables::StorageChangeSet>(trns(100), storage(STORAGE_KEY)).unwrap();
Ok(())
})
.unwrap();
// run
let input = ExecInput { target: Some(100), ..Default::default() };
let mut stage = IndexStorageHistoryStage {
prune_modes: PruneModes {
storage_history: Some(PruneMode::Before(36)),
..Default::default()
},
..Default::default()
};
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let out = stage.execute(&provider, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(100), done: true });
provider.commit().unwrap();
// verify
let table = cast(tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100]),]));
// unwind
unwind(&tx, 100, 0).await;
// verify initial state
let table = tx.table::<tables::StorageHistory>().unwrap();
assert!(table.is_empty());
}
stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history);
struct IndexStorageHistoryTestRunner {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
prune_modes: PruneModes,
}
impl Default for IndexStorageHistoryTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000 }
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
}
}
@@ -406,7 +490,10 @@ mod tests {
}
fn stage(&self) -> Self::S {
Self::S { commit_threshold: self.commit_threshold }
Self::S {
commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(),
}
}
}

View File

@@ -135,7 +135,7 @@ mod tests {
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())),
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
prune_modes,
prune_modes.clone(),
);
execution_stage.execute(&provider, input).await.unwrap();
@@ -155,19 +155,36 @@ mod tests {
);
// Check AccountHistory
let mut acc_indexing_stage = IndexAccountHistoryStage::default();
acc_indexing_stage.execute(&provider, input).await.unwrap();
let mut account_history: Cursor<'_, RW, AccountHistory> =
provider.tx_ref().cursor_read::<tables::AccountHistory>().unwrap();
assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
let mut acc_indexing_stage =
IndexAccountHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() };
if let Some(PruneMode::Full) = prune_modes.account_history {
// Full is not supported
assert!(acc_indexing_stage.execute(&provider, input).await.is_err());
} else {
acc_indexing_stage.execute(&provider, input).await.unwrap();
let mut account_history: Cursor<'_, RW, AccountHistory> =
provider.tx_ref().cursor_read::<tables::AccountHistory>().unwrap();
assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
}
// Check StorageHistory
let mut storage_indexing_stage = IndexStorageHistoryStage::default();
storage_indexing_stage.execute(&provider, input).await.unwrap();
let mut storage_indexing_stage =
IndexStorageHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() };
let mut storage_history =
provider.tx_ref().cursor_read::<tables::StorageHistory>().unwrap();
assert_eq!(storage_history.walk(None).unwrap().count(), expect_num_storage_changesets);
if let Some(PruneMode::Full) = prune_modes.storage_history {
// Full is not supported
assert!(acc_indexing_stage.execute(&provider, input).await.is_err());
} else {
storage_indexing_stage.execute(&provider, input).await.unwrap();
let mut storage_history =
provider.tx_ref().cursor_read::<tables::StorageHistory>().unwrap();
assert_eq!(
storage_history.walk(None).unwrap().count(),
expect_num_storage_changesets
);
}
};
// In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed