diff --git a/Cargo.lock b/Cargo.lock index 303deff97f..0c2652425a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6475,7 +6475,10 @@ dependencies = [ name = "reth-chain-state" version = "1.0.3" dependencies = [ + "auto_impl", + "derive_more", "parking_lot 0.12.3", + "pin-project", "rand 0.8.5", "reth-chainspec", "reth-execution-types", @@ -6483,6 +6486,8 @@ dependencies = [ "reth-trie", "revm", "tokio", + "tokio-stream", + "tracing", ] [[package]] @@ -8090,11 +8095,9 @@ dependencies = [ "assert_matches", "auto_impl", "dashmap 6.0.1", - "derive_more", "itertools 0.13.0", "metrics", "parking_lot 0.12.3", - "pin-project", "rand 0.8.5", "rayon", "reth-blockchain-tree-api", @@ -8122,7 +8125,6 @@ dependencies = [ "strum", "tempfile", "tokio", - "tokio-stream", "tracing", ] diff --git a/crates/chain-state/Cargo.toml b/crates/chain-state/Cargo.toml index 0940a779dc..dd19778608 100644 --- a/crates/chain-state/Cargo.toml +++ b/crates/chain-state/Cargo.toml @@ -22,9 +22,16 @@ revm = { workspace = true, optional = true} # async tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } +tokio-stream = { workspace = true, features = ["sync"] } + +# tracing +tracing.workspace = true # misc +auto_impl.workspace = true +derive_more.workspace = true parking_lot.workspace = true +pin-project.workspace = true rand = { workspace = true, optional = true } [dev-dependencies] diff --git a/crates/chain-state/src/lib.rs b/crates/chain-state/src/lib.rs index e43e4bd57e..3eb4cfbe81 100644 --- a/crates/chain-state/src/lib.rs +++ b/crates/chain-state/src/lib.rs @@ -14,6 +14,13 @@ pub use in_memory::*; mod chain_info; pub use chain_info::ChainInfoTracker; +mod notifications; +pub use notifications::{ + CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, + CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream, + ForkChoiceSubscriptions, +}; + #[cfg(any(test, feature = "test-utils"))] /// Common test helpers pub mod test_utils; diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/chain-state/src/notifications.rs similarity index 98% rename from crates/storage/provider/src/traits/chain.rs rename to crates/chain-state/src/notifications.rs index 878e67a9f2..d0279b5bc8 100644 --- a/crates/storage/provider/src/traits/chain.rs +++ b/crates/chain-state/src/notifications.rs @@ -1,8 +1,8 @@ //! Canonical chain state notification trait and types. -use crate::{BlockReceipts, Chain}; use auto_impl::auto_impl; use derive_more::{Deref, DerefMut}; +use reth_execution_types::{BlockReceipts, Chain}; use reth_primitives::{SealedBlockWithSenders, SealedHeader}; use std::{ pin::Pin, @@ -61,7 +61,7 @@ impl Stream for CanonStateNotificationStream { } /// Chain action that is triggered when a new block is imported or old block is reverted. -/// and will return all [`crate::ExecutionOutcome`] and +/// and will return all `ExecutionOutcome` and /// [`reth_primitives::SealedBlockWithSenders`] of both reverted and committed blocks. #[derive(Clone, Debug)] pub enum CanonStateNotification { diff --git a/crates/chain-state/src/test_utils.rs b/crates/chain-state/src/test_utils.rs index f4a865692e..23c2bf71f5 100644 --- a/crates/chain-state/src/test_utils.rs +++ b/crates/chain-state/src/test_utils.rs @@ -1,12 +1,19 @@ -use crate::in_memory::ExecutedBlock; +use crate::{ + in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications, + CanonStateSubscriptions, +}; use rand::Rng; -use reth_execution_types::ExecutionOutcome; +use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{ Address, Block, BlockNumber, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, }; use reth_trie::{updates::TrieUpdates, HashedPostState}; use revm::db::BundleState; -use std::{ops::Range, sync::Arc}; +use std::{ + ops::Range, + sync::{Arc, Mutex}, +}; +use tokio::sync::broadcast::{self, Sender}; fn get_executed_block(block_number: BlockNumber, receipts: Receipts) -> ExecutedBlock { let mut block = Block::default(); @@ -50,3 +57,34 @@ pub fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBloc pub fn get_executed_blocks(range: Range) -> impl Iterator { range.map(get_executed_block_with_number) } + +/// A test `ChainEventSubscriptions` +#[derive(Clone, Debug, Default)] +pub struct TestCanonStateSubscriptions { + canon_notif_tx: Arc>>>, +} + +impl TestCanonStateSubscriptions { + /// Adds new block commit to the queue that can be consumed with + /// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`] + pub fn add_next_commit(&self, new: Arc) { + let event = CanonStateNotification::Commit { new }; + self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) + } + + /// Adds reorg to the queue that can be consumed with + /// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`] + pub fn add_next_reorg(&self, old: Arc, new: Arc) { + let event = CanonStateNotification::Reorg { old, new }; + self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) + } +} + +impl CanonStateSubscriptions for TestCanonStateSubscriptions { + fn subscribe_to_canonical_state(&self) -> CanonStateNotifications { + let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100); + self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx); + + canon_notif_rx + } +} diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 9ebe7f4cfb..c7e1721616 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -39,7 +39,6 @@ revm.workspace = true # async tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } -tokio-stream = { workspace = true, features = ["sync"] } # tracing tracing.workspace = true @@ -50,9 +49,7 @@ metrics.workspace = true # misc auto_impl.workspace = true -derive_more.workspace = true itertools.workspace = true -pin-project.workspace = true parking_lot.workspace = true dashmap = { workspace = true, features = ["inline"] } strum.workspace = true @@ -78,4 +75,9 @@ rand.workspace = true [features] optimism = ["reth-primitives/optimism", "reth-execution-types/optimism"] serde = ["reth-execution-types/serde"] -test-utils = ["alloy-rlp", "reth-db/test-utils", "reth-nippy-jar/test-utils"] +test-utils = [ + "alloy-rlp", + "reth-db/test-utils", + "reth-nippy-jar/test-utils", + "reth-chain-state/test-utils" +] diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index a578fa09d5..41b8c5438a 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -39,6 +39,11 @@ pub use bundle_state::{OriginalValuesKnown, StateChanges, StateReverts}; /// Writer standalone type. pub mod writer; +pub use reth_chain_state::{ + CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, + CanonStateNotifications, CanonStateSubscriptions, +}; + pub(crate) fn to_range>(bounds: R) -> std::ops::Range { let start = match bounds.start_bound() { std::ops::Bound::Included(&v) => v, diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 3a0bc1f59b..d7aa5b1517 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -1,18 +1,18 @@ use crate::{ AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, - EvmEnvProvider, FullExecutionDataProvider, HeaderProvider, ProviderError, - PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, - StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory, - TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider, + BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, ChainSpecProvider, + ChangeSetReader, DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider, + HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, + RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, + StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TreeViewer, + WithdrawalsProvider, }; use reth_blockchain_tree_api::{ error::{CanonicalError, InsertBlockError}, BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome, InsertPayloadOk, }; -use reth_chain_state::ChainInfoTracker; +use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ChainInfoTracker}; use reth_chainspec::{ChainInfo, ChainSpec}; use reth_db_api::{ database::Database, diff --git a/crates/storage/provider/src/test_utils/events.rs b/crates/storage/provider/src/test_utils/events.rs deleted file mode 100644 index 39e53772ca..0000000000 --- a/crates/storage/provider/src/test_utils/events.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast::{self, Sender}; - -use crate::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, Chain}; - -/// A test `ChainEventSubscriptions` -#[derive(Clone, Debug, Default)] -pub struct TestCanonStateSubscriptions { - canon_notif_tx: Arc>>>, -} - -impl TestCanonStateSubscriptions { - /// Adds new block commit to the queue that can be consumed with - /// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`] - pub fn add_next_commit(&self, new: Arc) { - let event = CanonStateNotification::Commit { new }; - self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) - } - - /// Adds reorg to the queue that can be consumed with - /// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`] - pub fn add_next_reorg(&self, old: Arc, new: Arc) { - let event = CanonStateNotification::Reorg { old, new }; - self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) - } -} - -impl CanonStateSubscriptions for TestCanonStateSubscriptions { - fn subscribe_to_canonical_state(&self) -> CanonStateNotifications { - let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100); - self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx); - - canon_notif_rx - } -} diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index 4d40ad54e9..2e43212cde 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -7,13 +7,12 @@ use reth_db::{ use std::sync::Arc; pub mod blocks; -mod events; mod mock; mod noop; -pub use events::TestCanonStateSubscriptions; pub use mock::{ExtendedAccount, MockEthProvider}; pub use noop::NoopProvider; +pub use reth_chain_state::test_utils::TestCanonStateSubscriptions; /// Creates test provider factory with mainnet chain spec. pub fn create_test_provider_factory() -> ProviderFactory>> { diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 8032a09392..1f0dfb4d16 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -3,6 +3,7 @@ use std::{ sync::Arc, }; +use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions}; use reth_chainspec::{ChainInfo, ChainSpec, MAINNET}; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; @@ -24,11 +25,10 @@ use crate::{ providers::StaticFileProvider, traits::{BlockSource, ReceiptProvider}, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - CanonStateNotifications, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - EvmEnvProvider, HeaderProvider, PruneCheckpointReader, ReceiptProviderIdExt, RequestsProvider, - StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, - StateRootProvider, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, - WithdrawalsProvider, + ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, PruneCheckpointReader, + ReceiptProviderIdExt, RequestsProvider, StageCheckpointReader, StateProvider, StateProviderBox, + StateProviderFactory, StateRootProvider, StaticFileProviderFactory, TransactionVariant, + TransactionsProvider, WithdrawalsProvider, }; /// Supports various api interfaces for testing purposes. diff --git a/crates/storage/provider/src/traits/full.rs b/crates/storage/provider/src/traits/full.rs index c53150560d..f47bd3efd2 100644 --- a/crates/storage/provider/src/traits/full.rs +++ b/crates/storage/provider/src/traits/full.rs @@ -1,10 +1,11 @@ //! Helper provider traits to encapsulate all provider traits for simplicity. use crate::{ - AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, - DatabaseProviderFactory, EvmEnvProvider, HeaderProvider, StageCheckpointReader, - StateProviderFactory, StaticFileProviderFactory, TransactionsProvider, + AccountReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, + EvmEnvProvider, HeaderProvider, StageCheckpointReader, StateProviderFactory, + StaticFileProviderFactory, TransactionsProvider, }; +use reth_chain_state::CanonStateSubscriptions; use reth_db_api::database::Database; /// Helper trait to unify all provider traits for simplicity. diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 466a9e2908..bd7507875c 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -18,13 +18,6 @@ pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider}; mod state; pub use state::StateWriter; -mod chain; -pub use chain::{ - CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, - CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream, - ForkChoiceSubscriptions, -}; - mod spec; pub use spec::ChainSpecProvider; diff --git a/crates/storage/provider/src/traits/tree_viewer.rs b/crates/storage/provider/src/traits/tree_viewer.rs index a8eea44a69..f75dbae24d 100644 --- a/crates/storage/provider/src/traits/tree_viewer.rs +++ b/crates/storage/provider/src/traits/tree_viewer.rs @@ -1,5 +1,6 @@ -use crate::{BlockchainTreePendingStateProvider, CanonStateSubscriptions}; +use crate::BlockchainTreePendingStateProvider; use reth_blockchain_tree_api::{BlockchainTreeEngine, BlockchainTreeViewer}; +use reth_chain_state::CanonStateSubscriptions; /// Helper trait to combine all the traits we need for the `BlockchainProvider` ///