From 017c9cea9c1685baaab925923d443c3e97769dde Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Sat, 17 Jun 2023 12:06:25 +0100 Subject: [PATCH] chore: move stage methods to `StageCheckpointProvider` and add `StageCheckpointWriter` (#3195) Co-authored-by: Georgios Konstantopoulos --- bin/reth/src/debug_cmd/execution.rs | 2 +- bin/reth/src/debug_cmd/merkle.rs | 2 +- bin/reth/src/node/mod.rs | 2 +- bin/reth/src/stage/run.rs | 2 +- crates/consensus/beacon/src/engine/mod.rs | 12 ++--- crates/stages/src/pipeline/mod.rs | 2 +- crates/stages/src/stages/merkle.rs | 4 +- crates/storage/provider/src/lib.rs | 4 +- .../provider/src/providers/database/mod.rs | 8 ++- .../src/providers/database/provider.rs | 49 ++++++++----------- crates/storage/provider/src/providers/mod.rs | 8 ++- .../storage/provider/src/test_utils/noop.rs | 8 ++- crates/storage/provider/src/traits/mod.rs | 2 +- .../provider/src/traits/stage_checkpoint.rs | 15 +++++- 14 files changed, 67 insertions(+), 53 deletions(-) diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 3aeab6911c..0865f5abbb 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -26,7 +26,7 @@ use reth_interfaces::{ use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256}; -use reth_provider::{ProviderFactory, StageCheckpointProvider}; +use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_staged_sync::utils::init::{init_db, init_genesis}; use reth_stages::{ sets::DefaultStages, diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 1009fe6d86..4e7ee12d7b 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -9,7 +9,7 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, ChainSpec, }; -use reth_provider::{ProviderFactory, StageCheckpointProvider}; +use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b7b6b72156..3fa6c84784 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -42,7 +42,7 @@ use reth_network_api::NetworkInfo; use reth_primitives::{stage::StageId, BlockHashOrNumber, ChainSpec, Head, SealedHeader, H256}; use reth_provider::{ BlockHashProvider, BlockProvider, CanonStateSubscriptions, HeaderProvider, ProviderFactory, - StageCheckpointProvider, + StageCheckpointReader, }; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 4af0b00179..c7be9961f7 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_primitives::ChainSpec; -use reth_provider::{ProviderFactory, StageCheckpointProvider}; +use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 5082a608f6..a43729dabb 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -21,7 +21,7 @@ use reth_primitives::{ SealedHeader, H256, U256, }; use reth_provider::{ - BlockProvider, BlockSource, CanonChainTracker, ProviderError, StageCheckpointProvider, + BlockProvider, BlockSource, CanonChainTracker, ProviderError, StageCheckpointReader, }; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, @@ -210,7 +210,7 @@ pub struct BeaconConsensusEngine where DB: Database, Client: HeadersClient + BodiesClient, - BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointProvider, + BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointReader, { /// Controls syncing triggered by engine updates. sync: EngineSyncController, @@ -238,11 +238,7 @@ where impl BeaconConsensusEngine where DB: Database + Unpin + 'static, - BT: BlockchainTreeEngine - + BlockProvider - + CanonChainTracker - + StageCheckpointProvider - + 'static, + BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointReader + 'static, Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { /// Create a new instance of the [BeaconConsensusEngine]. @@ -1235,7 +1231,7 @@ where BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker - + StageCheckpointProvider + + StageCheckpointReader + Unpin + 'static, { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 4032b1bedf..14c78aac1c 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -6,7 +6,7 @@ use reth_primitives::{ constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId, BlockNumber, ChainSpec, H256, }; -use reth_provider::{ProviderFactory, StageCheckpointProvider}; +use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter}; use std::{pin::Pin, sync::Arc}; use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index bfb0344e43..af4802619d 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -12,7 +12,9 @@ use reth_primitives::{ trie::StoredSubNode, BlockNumber, SealedHeader, H256, }; -use reth_provider::{DatabaseProviderRW, HeaderProvider, ProviderError}; +use reth_provider::{ + DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, +}; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; use std::fmt::Debug; use tracing::*; diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 868f668453..00b4f022ab 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -16,8 +16,8 @@ pub use traits::{ BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, - StageCheckpointProvider, StateProvider, StateProviderBox, StateProviderFactory, - StateRootProvider, TransactionsProvider, WithdrawalsProvider, + StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox, + StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, }; /// Provider trait implementations. diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 4ba37607b7..f04fd9c683 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -2,7 +2,7 @@ use crate::{ providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, traits::{BlockSource, ReceiptProvider}, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, HeaderProvider, - ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider, + ProviderError, StageCheckpointReader, StateProviderBox, TransactionsProvider, WithdrawalsProvider, }; use reth_db::{database::Database, models::StoredBlockBodyIndices}; @@ -280,10 +280,14 @@ impl WithdrawalsProvider for ProviderFactory { } } -impl StageCheckpointProvider for ProviderFactory { +impl StageCheckpointReader for ProviderFactory { fn get_stage_checkpoint(&self, id: StageId) -> Result> { self.provider()?.get_stage_checkpoint(id) } + + fn get_stage_checkpoint_progress(&self, id: StageId) -> Result>> { + self.provider()?.get_stage_checkpoint_progress(id) + } } impl EvmEnvProvider for ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 3916969d76..e24d467042 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,9 +1,9 @@ use crate::{ insert_canonical_block, post_state::StorageChangeset, - traits::{AccountExtProvider, BlockSource, ReceiptProvider}, + traits::{AccountExtProvider, BlockSource, ReceiptProvider, StageCheckpointWriter}, AccountProvider, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, - HeaderProvider, PostState, ProviderError, StageCheckpointProvider, TransactionError, + HeaderProvider, PostState, ProviderError, StageCheckpointReader, TransactionError, TransactionsProvider, WithdrawalsProvider, }; use itertools::{izip, Itertools}; @@ -1072,32 +1072,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Save stage checkpoint. - pub fn save_stage_checkpoint( - &self, - id: StageId, - checkpoint: StageCheckpoint, - ) -> std::result::Result<(), DatabaseError> { - self.tx.put::(id.to_string(), checkpoint) - } - - /// Get stage checkpoint progress. - pub fn get_stage_checkpoint_progress( - &self, - id: StageId, - ) -> std::result::Result>, DatabaseError> { - self.tx.get::(id.to_string()) - } - - /// Save stage checkpoint progress. - pub fn save_stage_checkpoint_progress( - &self, - id: StageId, - checkpoint: Vec, - ) -> std::result::Result<(), DatabaseError> { - self.tx.put::(id.to_string(), checkpoint) - } - /// Query the block body by number. pub fn block_body_indices( &self, @@ -1865,8 +1839,25 @@ impl<'this, TX: DbTx<'this>> EvmEnvProvider for DatabaseProvider<'this, TX> { } } -impl<'this, TX: DbTx<'this>> StageCheckpointProvider for DatabaseProvider<'this, TX> { +impl<'this, TX: DbTx<'this>> StageCheckpointReader for DatabaseProvider<'this, TX> { fn get_stage_checkpoint(&self, id: StageId) -> Result> { Ok(self.tx.get::(id.to_string())?) } + + /// Get stage checkpoint progress. + fn get_stage_checkpoint_progress(&self, id: StageId) -> Result>> { + Ok(self.tx.get::(id.to_string())?) + } +} + +impl<'this, TX: DbTxMut<'this>> StageCheckpointWriter for DatabaseProvider<'this, TX> { + /// Save stage checkpoint progress. + fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec) -> Result<()> { + Ok(self.tx.put::(id.to_string(), checkpoint)?) + } + + /// Save stage checkpoint. + fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint) -> Result<()> { + Ok(self.tx.put::(id.to_string(), checkpoint)?) + } } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 6502cacb31..94267675ac 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -2,7 +2,7 @@ use crate::{ BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError, - ReceiptProvider, StageCheckpointProvider, StateProviderBox, StateProviderFactory, + ReceiptProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionsProvider, WithdrawalsProvider, }; use reth_db::{database::Database, models::StoredBlockBodyIndices}; @@ -344,7 +344,7 @@ where } } -impl StageCheckpointProvider for BlockchainProvider +impl StageCheckpointReader for BlockchainProvider where DB: Database, Tree: Send + Sync, @@ -352,6 +352,10 @@ where fn get_stage_checkpoint(&self, id: StageId) -> Result> { self.database.provider()?.get_stage_checkpoint(id) } + + fn get_stage_checkpoint_progress(&self, id: StageId) -> Result>> { + self.database.provider()?.get_stage_checkpoint_progress(id) + } } impl EvmEnvProvider for BlockchainProvider diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index bd0ac1fea7..cf58499130 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -1,7 +1,7 @@ use crate::{ traits::{BlockSource, ReceiptProvider}, AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, - BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointProvider, + BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, }; @@ -311,10 +311,14 @@ impl StateProviderFactory for NoopProvider { } } -impl StageCheckpointProvider for NoopProvider { +impl StageCheckpointReader for NoopProvider { fn get_stage_checkpoint(&self, _id: StageId) -> Result> { Ok(None) } + + fn get_stage_checkpoint_progress(&self, _id: StageId) -> Result>> { + Ok(None) + } } impl WithdrawalsProvider for NoopProvider { diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 1c7c568071..d6564a87f2 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -46,4 +46,4 @@ pub use chain::{ }; mod stage_checkpoint; -pub use stage_checkpoint::StageCheckpointProvider; +pub use stage_checkpoint::{StageCheckpointReader, StageCheckpointWriter}; diff --git a/crates/storage/provider/src/traits/stage_checkpoint.rs b/crates/storage/provider/src/traits/stage_checkpoint.rs index a0b735f3ec..4610b26436 100644 --- a/crates/storage/provider/src/traits/stage_checkpoint.rs +++ b/crates/storage/provider/src/traits/stage_checkpoint.rs @@ -3,7 +3,20 @@ use reth_primitives::stage::{StageCheckpoint, StageId}; /// The trait for fetching stage checkpoint related data. #[auto_impl::auto_impl(&, Arc)] -pub trait StageCheckpointProvider: Send + Sync { +pub trait StageCheckpointReader: Send + Sync { /// Fetch the checkpoint for the given stage. fn get_stage_checkpoint(&self, id: StageId) -> Result>; + + /// Get stage checkpoint progress. + fn get_stage_checkpoint_progress(&self, id: StageId) -> Result>>; +} + +/// The trait for updating stage checkpoint related data. +#[auto_impl::auto_impl(&, Arc)] +pub trait StageCheckpointWriter: Send + Sync { + /// Save stage checkpoint. + fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint) -> Result<()>; + + /// Save stage checkpoint progress. + fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec) -> Result<()>; }