From 32e642d6b0af5a7a10ad485685373508968a04bb Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 15 Jun 2023 18:06:45 +0100 Subject: [PATCH] fix(stages): disable index history stages checkpoints (#3178) --- .../src/stages/index_account_history.rs | 213 +--------------- .../src/stages/index_storage_history.rs | 228 +----------------- 2 files changed, 14 insertions(+), 427 deletions(-) diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 2261da83a8..f20b9c5bea 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,13 +1,8 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError}; -use reth_primitives::{ - stage::{ - CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId, - }, - BlockNumber, -}; +use reth_db::database::Database; +use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_provider::DatabaseProviderRW; -use std::{fmt::Debug, ops::RangeInclusive}; +use std::fmt::Debug; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information @@ -44,27 +39,11 @@ impl Stage for IndexAccountHistoryStage { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - let mut stage_checkpoint = stage_checkpoint( - provider, - input.checkpoint(), - // It is important to provide the full block range into the checkpoint, - // not the one accounting for commit threshold, to get the correct range end. - &input.next_block_range(), - )?; - let indices = provider.get_account_transition_ids_from_changeset(range.clone())?; - let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::(); - // Insert changeset to history index provider.insert_account_history_index(indices)?; - stage_checkpoint.progress.processed += changesets; - - Ok(ExecOutput { - checkpoint: StageCheckpoint::new(*range.end()) - .with_index_history_stage_checkpoint(stage_checkpoint), - done: is_final_range, - }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) } /// Unwind the stage. @@ -76,70 +55,15 @@ impl Stage for IndexAccountHistoryStage { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - let changesets = provider.unwind_account_history_indices(range)?; - - let checkpoint = - if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() { - stage_checkpoint.progress.processed -= changesets as u64; - StageCheckpoint::new(unwind_progress) - .with_index_history_stage_checkpoint(stage_checkpoint) - } else { - StageCheckpoint::new(unwind_progress) - }; + provider.unwind_account_history_indices(range)?; // from HistoryIndex higher than that number. - Ok(UnwindOutput { checkpoint }) + Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } } -/// The function proceeds as follows: -/// 1. It first checks if the checkpoint has an [IndexHistoryCheckpoint] that matches the given -/// block range. If it does, the function returns that checkpoint. -/// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates -/// a new [IndexHistoryCheckpoint] with the given block range and updates the progress with the -/// current progress. -/// 3. If none of the above conditions are met, it creates a new [IndexHistoryCheckpoint] with the -/// given block range and calculates the progress by counting the number of processed entries in the -/// [tables::AccountChangeSet] table within the given block range. -fn stage_checkpoint( - provider: &DatabaseProviderRW<'_, &DB>, - checkpoint: StageCheckpoint, - range: &RangeInclusive, -) -> Result { - Ok(match checkpoint.index_history_stage_checkpoint() { - Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. }) - if block_range == CheckpointBlockRange::from(range) => - { - stage_checkpoint - } - Some(IndexHistoryCheckpoint { block_range, progress }) - if block_range.to == checkpoint.block_number => - { - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange::from(range), - progress: EntitiesCheckpoint { - processed: progress.processed, - total: provider.tx_ref().entries::()? as u64, - }, - } - } - _ => IndexHistoryCheckpoint { - block_range: CheckpointBlockRange::from(range), - progress: EntitiesCheckpoint { - processed: provider - .tx_ref() - .cursor_read::()? - .walk_range(0..=checkpoint.block_number)? - .count() as u64, - total: provider.tx_ref().entries::()? as u64, - }, - }, - }) -} - #[cfg(test)] mod tests { - use assert_matches::assert_matches; use reth_provider::ProviderFactory; use std::collections::BTreeMap; @@ -213,18 +137,7 @@ mod tests { let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let mut provider = factory.provider_rw().unwrap(); let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, - progress: EntitiesCheckpoint { processed: 2, total: 2 } - } - ), - done: true - } - ); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); provider.commit().unwrap(); } @@ -437,116 +350,4 @@ mod tests { ]) ); } - - #[tokio::test] - async fn stage_checkpoint_range() { - // init - let test_tx = TestTransaction::default(); - - // setup - partial_setup(&test_tx); - - // run - { - let mut stage = IndexAccountHistoryStage { commit_threshold: 4 }; // Two runs required - let factory = ProviderFactory::new(&test_tx.tx, MAINNET.clone()); - let mut provider = factory.provider_rw().unwrap(); - - let mut input = ExecInput { target: Some(5), ..Default::default() }; - let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 1, to: 5 }, - progress: EntitiesCheckpoint { processed: 1, total: 2 } - } - ), - done: false - } - ); - input.checkpoint = Some(out.checkpoint); - - let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 5, to: 5 }, - progress: EntitiesCheckpoint { processed: 2, total: 2 } - } - ), - done: true - } - ); - - provider.commit().unwrap(); - } - - // verify - let table = cast(test_tx.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])])); - - // unwind - unwind(&test_tx, 5, 0).await; - - // verify initial state - let table = test_tx.table::().unwrap(); - assert!(table.is_empty()); - } - - #[test] - fn stage_checkpoint_recalculation() { - let tx = TestTransaction::default(); - - tx.commit(|tx| { - tx.put::( - 1, - AccountBeforeTx { - address: H160(hex!("0000000000000000000000000000000000000001")), - info: None, - }, - ) - .unwrap(); - tx.put::( - 1, - AccountBeforeTx { - address: H160(hex!("0000000000000000000000000000000000000002")), - info: None, - }, - ) - .unwrap(); - tx.put::( - 2, - AccountBeforeTx { - address: H160(hex!("0000000000000000000000000000000000000001")), - info: None, - }, - ) - .unwrap(); - tx.put::( - 2, - AccountBeforeTx { - address: H160(hex!("0000000000000000000000000000000000000002")), - info: None, - }, - ) - .unwrap(); - Ok(()) - }) - .unwrap(); - - let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); - let provider = factory.provider_rw().unwrap(); - - assert_matches!( - stage_checkpoint(&provider, StageCheckpoint::new(1), &(1..=2)).unwrap(), - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 1, to: 2 }, - progress: EntitiesCheckpoint { processed: 2, total: 4 } - } - ); - } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index e54d080b66..2566861de9 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,16 +1,8 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::{ - cursor::DbCursorRO, database::Database, models::BlockNumberAddress, tables, transaction::DbTx, - DatabaseError, -}; -use reth_primitives::{ - stage::{ - CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId, - }, - BlockNumber, -}; +use reth_db::{database::Database, models::BlockNumberAddress}; +use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_provider::DatabaseProviderRW; -use std::{fmt::Debug, ops::RangeInclusive}; +use std::fmt::Debug; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information @@ -47,26 +39,10 @@ impl Stage for IndexStorageHistoryStage { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - let mut stage_checkpoint = stage_checkpoint( - provider, - input.checkpoint(), - // It is important to provide the full block range into the checkpoint, - // not the one accounting for commit threshold, to get the correct range end. - &input.next_block_range(), - )?; - let indices = provider.get_storage_transition_ids_from_changeset(range.clone())?; - let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::(); - provider.insert_storage_history_index(indices)?; - stage_checkpoint.progress.processed += changesets; - - Ok(ExecOutput { - checkpoint: StageCheckpoint::new(*range.end()) - .with_index_history_stage_checkpoint(stage_checkpoint), - done: is_final_range, - }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) } /// Unwind the stage. @@ -78,71 +54,14 @@ impl Stage for IndexStorageHistoryStage { let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - let changesets = - provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?; + provider.unwind_storage_history_indices(BlockNumberAddress::range(range))?; - let checkpoint = - if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() { - stage_checkpoint.progress.processed -= changesets as u64; - StageCheckpoint::new(unwind_progress) - .with_index_history_stage_checkpoint(stage_checkpoint) - } else { - StageCheckpoint::new(unwind_progress) - }; - - Ok(UnwindOutput { checkpoint }) + Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } } -/// The function proceeds as follows: -/// 1. It first checks if the checkpoint has an [IndexHistoryCheckpoint] that matches the given -/// block range. If it does, the function returns that checkpoint. -/// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates -/// a new [IndexHistoryCheckpoint] with the given block range and updates the progress with the -/// current progress. -/// 3. If none of the above conditions are met, it creates a new [IndexHistoryCheckpoint] with the -/// given block range and calculates the progress by counting the number of processed entries in the -/// [tables::StorageChangeSet] table within the given block range. -fn stage_checkpoint( - provider: &DatabaseProviderRW<'_, &DB>, - checkpoint: StageCheckpoint, - range: &RangeInclusive, -) -> Result { - Ok(match checkpoint.index_history_stage_checkpoint() { - Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. }) - if block_range == CheckpointBlockRange::from(range) => - { - stage_checkpoint - } - Some(IndexHistoryCheckpoint { block_range, progress }) - if block_range.to == checkpoint.block_number => - { - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange::from(range), - progress: EntitiesCheckpoint { - processed: progress.processed, - total: provider.tx_ref().entries::()? as u64, - }, - } - } - _ => IndexHistoryCheckpoint { - block_range: CheckpointBlockRange::from(range), - progress: EntitiesCheckpoint { - processed: provider - .tx_ref() - .cursor_read::()? - .walk_range(BlockNumberAddress::range(0..=checkpoint.block_number))? - .count() as u64, - total: provider.tx_ref().entries::()? as u64, - }, - }, - }) -} - #[cfg(test)] mod tests { - - use assert_matches::assert_matches; use reth_provider::ProviderFactory; use std::collections::BTreeMap; @@ -226,18 +145,7 @@ mod tests { let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let mut provider = factory.provider_rw().unwrap(); let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, - progress: EntitiesCheckpoint { processed: 2, total: 2 } - } - ), - done: true - } - ); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); provider.commit().unwrap(); } @@ -453,126 +361,4 @@ mod tests { ]) ); } - - #[tokio::test] - async fn stage_checkpoint_range() { - // init - let test_tx = TestTransaction::default(); - - // setup - partial_setup(&test_tx); - - // run - { - let mut stage = IndexStorageHistoryStage { commit_threshold: 4 }; // Two runs required - let factory = ProviderFactory::new(&test_tx.tx, MAINNET.clone()); - let mut provider = factory.provider_rw().unwrap(); - - let mut input = ExecInput { target: Some(5), ..Default::default() }; - let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 1, to: 5 }, - progress: EntitiesCheckpoint { processed: 1, total: 2 } - } - ), - done: false - } - ); - input.checkpoint = Some(out.checkpoint); - - let out = stage.execute(&mut provider, input).await.unwrap(); - assert_eq!( - out, - ExecOutput { - checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 5, to: 5 }, - progress: EntitiesCheckpoint { processed: 2, total: 2 } - } - ), - done: true - } - ); - - provider.commit().unwrap(); - } - - // verify - let table = cast(test_tx.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])])); - - // unwind - unwind(&test_tx, 5, 0).await; - - // verify initial state - let table = test_tx.table::().unwrap(); - assert!(table.is_empty()); - } - - #[test] - fn stage_checkpoint_recalculation() { - let tx = TestTransaction::default(); - - tx.commit(|tx| { - tx.put::( - BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000001" - ))), - ) - .unwrap(); - tx.put::( - BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000002" - ))), - ) - .unwrap(); - tx.put::( - BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000002")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000001" - ))), - ) - .unwrap(); - tx.put::( - BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000001" - ))), - ) - .unwrap(); - tx.put::( - BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000002" - ))), - ) - .unwrap(); - tx.put::( - BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000002")))), - storage(H256(hex!( - "0000000000000000000000000000000000000000000000000000000000000001" - ))), - ) - .unwrap(); - Ok(()) - }) - .unwrap(); - - let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); - let provider = factory.provider_rw().unwrap(); - - assert_matches!( - stage_checkpoint(&provider, StageCheckpoint::new(1), &(1..=2)).unwrap(), - IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { from: 1, to: 2 }, - progress: EntitiesCheckpoint { processed: 3, total: 6 } - } - ); - } }