From 087d0a63170b944dc70435c691ec54069ccfeea2 Mon Sep 17 00:00:00 2001 From: rakita Date: Mon, 10 Apr 2023 13:11:15 +0200 Subject: [PATCH] feat: CanonStateNotification for commit and reorg (#2156) --- Cargo.lock | 4 +- bin/reth/src/args/rpc_server_args.rs | 7 +- bin/reth/src/node/mod.rs | 9 +- crates/consensus/auto-seal/src/lib.rs | 21 +- crates/consensus/auto-seal/src/task.rs | 50 ++- crates/consensus/beacon/src/engine/error.rs | 9 +- crates/consensus/beacon/src/engine/mod.rs | 13 +- crates/executor/Cargo.toml | 1 + .../src/blockchain_tree/block_indices.rs | 3 +- crates/executor/src/blockchain_tree/chain.rs | 379 ++---------------- crates/executor/src/blockchain_tree/mod.rs | 117 +++--- .../src/blockchain_tree/post_state_data.rs | 3 +- .../executor/src/blockchain_tree/shareable.rs | 12 +- crates/interfaces/src/events.rs | 54 --- crates/interfaces/src/lib.rs | 3 - crates/interfaces/src/test_utils/events.rs | 29 -- crates/interfaces/src/test_utils/mod.rs | 2 - crates/miner/src/payload.rs | 2 +- crates/primitives/src/block.rs | 14 + crates/primitives/src/lib.rs | 4 +- crates/revm/revm-primitives/src/lib.rs | 3 + crates/rpc/rpc-builder/src/lib.rs | 18 +- crates/rpc/rpc-builder/tests/it/utils.rs | 7 +- crates/rpc/rpc/Cargo.toml | 1 + crates/rpc/rpc/src/eth/filter.rs | 8 +- crates/rpc/rpc/src/eth/logs_utils.rs | 30 +- crates/rpc/rpc/src/eth/pubsub.rs | 69 ++-- crates/storage/provider/Cargo.toml | 8 +- crates/storage/provider/src/chain.rs | 375 +++++++++++++++++ crates/storage/provider/src/lib.rs | 10 +- crates/storage/provider/src/post_state.rs | 12 +- crates/storage/provider/src/providers/mod.rs | 2 +- .../storage/provider/src/test_utils/events.rs | 35 ++ .../storage/provider/src/test_utils/mock.rs | 2 +- crates/storage/provider/src/test_utils/mod.rs | 2 + .../storage/provider/src/test_utils/noop.rs | 2 +- crates/storage/provider/src/traits/chain.rs | 86 ++++ crates/storage/provider/src/traits/evm_env.rs | 4 +- crates/storage/provider/src/traits/mod.rs | 6 + crates/storage/provider/src/transaction.rs | 2 +- 40 files changed, 801 insertions(+), 617 deletions(-) delete mode 100644 crates/interfaces/src/events.rs delete mode 100644 crates/interfaces/src/test_utils/events.rs create mode 100644 crates/storage/provider/src/chain.rs create mode 100644 crates/storage/provider/src/test_utils/events.rs create mode 100644 crates/storage/provider/src/traits/chain.rs diff --git a/Cargo.lock b/Cargo.lock index ec36e81210..c9dce1f6fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4734,6 +4734,7 @@ name = "reth-executor" version = "0.1.0" dependencies = [ "aquamarine", + "assert_matches", "auto_impl", "hash-db", "parking_lot 0.12.1", @@ -5005,8 +5006,8 @@ dependencies = [ "reth-revm-primitives", "reth-rlp", "reth-tracing", - "revm-primitives", "thiserror", + "tokio", "tracing", "triehash", ] @@ -5109,6 +5110,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util 0.7.7", "tower", "tracing", ] diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 78eca92d69..6b1e123994 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -3,10 +3,11 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; use jsonrpsee::server::ServerHandle; -use reth_interfaces::events::ChainEventSubscriptions; use reth_network_api::{NetworkInfo, Peers}; use reth_primitives::ChainSpec; -use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; +use reth_provider::{ + BlockProvider, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, StateProviderFactory, +}; use reth_rpc::{JwtError, JwtSecret}; use reth_rpc_builder::{ constants, error::RpcError, IpcServerBuilder, RethRpcModule, RpcModuleSelection, @@ -134,7 +135,7 @@ impl RpcServerArgs { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { reth_rpc_builder::launch( client, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index abdea40fcd..25fd4cd78b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -30,7 +30,6 @@ use reth_executor::blockchain_tree::{ }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, - events::NewBlockNotificationSink, p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, headers::{client::StatusUpdater, downloader::HeaderDownloader}, @@ -231,11 +230,11 @@ impl Command { let tree_config = BlockchainTreeConfig::default(); // The size of the broadcast is twice the maximum reorg depth, because at maximum reorg // depth at least N blocks must be sent at once. - let new_block_notification_sender = - NewBlockNotificationSink::new(tree_config.max_reorg_depth() as usize * 2); + let (canon_state_notification_sender, _receiver) = + tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2); let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new( tree_externals, - new_block_notification_sender.clone(), + canon_state_notification_sender.clone(), tree_config, )?); @@ -246,7 +245,7 @@ impl Command { shareable_db.clone(), transaction_pool.clone(), consensus_engine_tx.clone(), - new_block_notification_sender.clone(), + canon_state_notification_sender, ) .build(); diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index 5a1ca7a68f..d0418df51b 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -20,6 +20,7 @@ use reth_primitives::{ BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock, SealedHeader, H256, U256, }; +use reth_provider::CanonStateNotificationSender; use reth_transaction_pool::TransactionPool; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -31,7 +32,6 @@ mod task; pub use crate::client::AutoSealClient; pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner}; -use reth_interfaces::events::NewBlockNotificationSink; pub use task::MiningTask; /// A consensus implementation intended for local development and testing purposes. @@ -83,7 +83,7 @@ pub struct AutoSealBuilder { mode: MiningMode, storage: Storage, to_engine: UnboundedSender, - new_block_notification_sender: NewBlockNotificationSink, + canon_state_notification: CanonStateNotificationSender, } // === impl AutoSealBuilder === @@ -95,7 +95,7 @@ impl AutoSealBuilder { client: Client, pool: Pool, to_engine: UnboundedSender, - new_block_notification_sender: NewBlockNotificationSink, + canon_state_notification: CanonStateNotificationSender, ) -> Self { let mode = MiningMode::interval(std::time::Duration::from_secs(1)); Self { @@ -105,7 +105,7 @@ impl AutoSealBuilder { pool, mode, to_engine, - new_block_notification_sender, + canon_state_notification, } } @@ -117,21 +117,14 @@ impl AutoSealBuilder { /// Consumes the type and returns all components pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask) { - let Self { - client, - consensus, - pool, - mode, - storage, - to_engine, - new_block_notification_sender, - } = self; + let Self { client, consensus, pool, mode, storage, to_engine, canon_state_notification } = + self; let auto_client = AutoSealClient::new(storage.clone()); let task = MiningTask::new( Arc::clone(&consensus.chain_spec), mode, to_engine, - new_block_notification_sender, + canon_state_notification, storage, client, pool, diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 5577c22ee4..73da2c9702 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -1,13 +1,13 @@ use crate::{mode::MiningMode, Storage}; use futures_util::{future::BoxFuture, FutureExt, StreamExt}; use reth_beacon_consensus::BeaconEngineMessage; -use reth_interfaces::{consensus::ForkchoiceState, events::NewBlockNotificationSink}; +use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{ constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS}, proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, - EMPTY_OMMER_ROOT, U256, + SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256, }; -use reth_provider::StateProviderFactory; +use reth_provider::{CanonStateNotificationSender, Chain, StateProviderFactory}; use reth_revm::{ database::{State, SubState}, executor::Executor, @@ -45,7 +45,7 @@ pub struct MiningTask { /// TODO: ideally this would just be a sender of hashes to_engine: UnboundedSender, /// Used to notify consumers of new blocks - new_block_notification_sender: NewBlockNotificationSink, + canon_state_notification: CanonStateNotificationSender, /// The pipeline events to listen on pipe_line_events: Option>, } @@ -58,7 +58,7 @@ impl MiningTask { chain_spec: Arc, miner: MiningMode, to_engine: UnboundedSender, - new_block_notification_sender: NewBlockNotificationSink, + canon_state_notification: CanonStateNotificationSender, storage: Storage, client: Client, pool: Pool, @@ -71,7 +71,7 @@ impl MiningTask { storage, pool, to_engine, - new_block_notification_sender, + canon_state_notification, queued: Default::default(), pipe_line_events: None, } @@ -116,7 +116,7 @@ where let chain_spec = Arc::clone(&this.chain_spec); let pool = this.pool.clone(); let mut events = this.pipe_line_events.take(); - let new_block_notification_sender = this.new_block_notification_sender.clone(); + let canon_state_notification = this.canon_state_notification.clone(); // Create the mining future that creates a block, notifies the engine that drives // the pipeline @@ -165,24 +165,30 @@ where trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions"); - match executor.execute_transactions(&block, U256::ZERO, None) { - Ok((res, gas_used)) => { + let senders = block + .body + .iter() + .map(|tx| tx.recover_signer()) + .collect::>>()?; + + match executor.execute_transactions(&block, U256::ZERO, Some(senders.clone())) { + Ok((post_state, gas_used)) => { let Block { mut header, body, .. } = block; // clear all transactions from pool pool.remove_transactions(body.iter().map(|tx| tx.hash)); - header.receipts_root = if res.receipts().is_empty() { + header.receipts_root = if post_state.receipts().is_empty() { EMPTY_RECEIPTS } else { - let receipts_with_bloom = res + let receipts_with_bloom = post_state .receipts() .iter() .map(|r| r.clone().into()) .collect::>(); proofs::calculate_receipt_root(receipts_with_bloom.iter()) }; - + let transactions = body.clone(); let body = BlockBody { transactions: body, ommers: vec![], withdrawals: None }; header.gas_used = gas_used; @@ -223,11 +229,25 @@ where } } - let header = header.seal_slow(); - debug!(target: "consensus::auto", header=?header.hash(), "sending block notification"); + // seal the block + let block = Block { + header, + body: transactions, + ommers: vec![], + withdrawals: None, + }; + let sealed_block = block.seal_slow(); + let sealed_block_with_senders = + SealedBlockWithSenders::new(sealed_block, senders) + .expect("senders are valid"); + debug!(target: "consensus::auto", header=?sealed_block_with_senders.hash(), "sending block notification"); + + let chain = + Arc::new(Chain::new(vec![(sealed_block_with_senders, post_state)])); // send block notification - let _ = new_block_notification_sender.send(Arc::new(header)); + let _ = canon_state_notification + .send(reth_provider::CanonStateNotification::Commit { new: chain }); } Err(err) => { warn!(target: "consensus::auto", ?err, "failed to execute block") diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index aa6cea12f1..0ee864a757 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -31,12 +31,19 @@ pub enum BeaconEngineError { PayloadBuilderError(#[from] PayloadBuilderError), /// Pipeline error. #[error(transparent)] - Pipeline(#[from] PipelineError), + Pipeline(#[from] Box), /// Common error. Wrapper around [reth_interfaces::Error]. #[error(transparent)] Common(#[from] reth_interfaces::Error), } +// box the pipeline error as it is a large enum. +impl From for BeaconEngineError { + fn from(e: PipelineError) -> Self { + Self::Pipeline(Box::new(e)) + } +} + // for convenience in the beacon engine impl From for BeaconEngineError { fn from(e: reth_interfaces::db::Error) -> Self { diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 177f9244bc..bc433aa7de 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -568,9 +568,7 @@ mod tests { post_state::PostState, test_utils::TestExecutorFactory, }; - use reth_interfaces::{ - events::NewBlockNotificationSink, sync::NoopSyncStateUpdate, test_utils::TestConsensus, - }; + use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus}; use reth_miner::TestPayloadStore; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::Transaction; @@ -653,8 +651,9 @@ mod tests { // Setup blockchain tree let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); let config = BlockchainTreeConfig::new(1, 2, 3); + let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); let tree = ShareableBlockchainTree::new( - BlockchainTree::new(externals, NewBlockNotificationSink::new(2), config) + BlockchainTree::new(externals, canon_state_notification_sender, config) .expect("failed to create tree"), ); @@ -709,7 +708,7 @@ mod tests { .await; assert_matches!( rx.await, - Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -747,7 +746,7 @@ mod tests { .await; assert_matches!( rx.await, - Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -782,7 +781,7 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed)))) + Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 7f3f04524b..8a15ccd99e 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -46,6 +46,7 @@ reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-primitives = { path = "../primitives", features = ["test-utils"] } reth-provider = { path = "../storage/provider", features = ["test-utils"] } parking_lot = "0.12" +assert_matches = "1.5" [features] test-utils = [] diff --git a/crates/executor/src/blockchain_tree/block_indices.rs b/crates/executor/src/blockchain_tree/block_indices.rs index 888f32b4c4..fb98a1d4ab 100644 --- a/crates/executor/src/blockchain_tree/block_indices.rs +++ b/crates/executor/src/blockchain_tree/block_indices.rs @@ -1,7 +1,8 @@ //! Implementation of [`BlockIndices`] related to [`super::BlockchainTree`] -use super::chain::{BlockChainId, Chain}; +use super::chain::BlockChainId; use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders}; +use reth_provider::Chain; use std::collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}; /// Internal indices of the blocks and chains. diff --git a/crates/executor/src/blockchain_tree/chain.rs b/crates/executor/src/blockchain_tree/chain.rs index 5aec1fbbd5..e205defd81 100644 --- a/crates/executor/src/blockchain_tree/chain.rs +++ b/crates/executor/src/blockchain_tree/chain.rs @@ -6,122 +6,52 @@ use crate::{blockchain_tree::PostStateDataRef, post_state::PostState}; use reth_db::database::Database; use reth_interfaces::{consensus::Consensus, executor::Error as ExecError, Error}; use reth_primitives::{ - BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders, SealedHeader, U256, + BlockHash, BlockNumber, ForkBlock, SealedBlockWithSenders, SealedHeader, U256, }; use reth_provider::{ - providers::PostStateProvider, BlockExecutor, ExecutorFactory, PostStateDataProvider, + providers::PostStateProvider, BlockExecutor, Chain, ExecutorFactory, PostStateDataProvider, StateProviderFactory, }; -use std::collections::BTreeMap; +use std::{ + collections::BTreeMap, + ops::{Deref, DerefMut}, +}; use super::externals::TreeExternals; /// The ID of a sidechain internally in a [`BlockchainTree`][super::BlockchainTree]. pub(crate) type BlockChainId = u64; -/// A side chain. -/// -/// The sidechain contains the state of accounts after execution of its blocks, -/// changesets for those blocks (and their transactions), as well as the blocks themselves. -/// -/// Each chain in the tree are identified using a unique ID. +/// A chain if the blockchain tree, that has functionality to execute blocks and append them to the +/// it self. #[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct Chain { - /// The state of accounts after execution of the blocks in this chain. - /// - /// This state also contains the individual changes that lead to the current state. - state: PostState, - /// The blocks in this chain. - blocks: BTreeMap, - /// A mapping of each block number in the chain to the highest transition ID in the chain's - /// state after execution of the block. - /// - /// This is used to revert changes in the state until a certain block number when the chain is - /// split. - block_transitions: BTreeMap, +pub struct AppendableChain { + chain: Chain, } -/// Block number and hash of the forked block. -pub type ForkBlock = BlockNumHash; +impl Deref for AppendableChain { + type Target = Chain; -impl Chain { - /// Get the blocks in this chain. - pub fn blocks(&self) -> &BTreeMap { - &self.blocks + fn deref(&self) -> &Self::Target { + &self.chain + } +} + +impl DerefMut for AppendableChain { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.chain + } +} + +impl AppendableChain { + /// Crate a new appendable chain from a given chain. + pub fn new(chain: Chain) -> Self { + Self { chain } } - /// Get post state of this chain - pub fn state(&self) -> &PostState { - &self.state - } - - /// Return block number of the block hash. - pub fn block_number(&self, block_hash: BlockHash) -> Option { - self.blocks.iter().find_map(|(num, block)| (block.hash() == block_hash).then_some(*num)) - } - - /// Return post state of the block at the `block_number` or None if block is not known - pub fn state_at_block(&self, block_number: BlockNumber) -> Option { - let mut state = self.state.clone(); - if self.tip().number == block_number { - return Some(state) - } - - if let Some(&transition_id) = self.block_transitions.get(&block_number) { - state.revert_to(transition_id); - return Some(state) - } - - None - } - - /// Destructure the chain into its inner components, the blocks and the state. - pub fn into_inner(self) -> (BTreeMap, PostState) { - (self.blocks, self.state) - } - - /// Get the block at which this chain forked. - pub fn fork_block(&self) -> ForkBlock { - let tip = self.first(); - ForkBlock { number: tip.number.saturating_sub(1), hash: tip.parent_hash } - } - - /// Get the block number at which this chain forked. - pub fn fork_block_number(&self) -> BlockNumber { - self.first().number.saturating_sub(1) - } - - /// Get the block hash at which this chain forked. - pub fn fork_block_hash(&self) -> BlockHash { - self.first().parent_hash - } - - /// Get the first block in this chain. - pub fn first(&self) -> &SealedBlockWithSenders { - self.blocks.first_key_value().expect("Chain has at least one block for first").1 - } - - /// Get the tip of the chain. - /// - /// # Note - /// - /// Chains always have at least one block. - pub fn tip(&self) -> &SealedBlockWithSenders { - self.blocks.last_key_value().expect("Chain should have at least one block").1 - } - - /// Create new chain with given blocks and post state. - pub fn new(blocks: Vec<(SealedBlockWithSenders, PostState)>) -> Self { - let mut state = PostState::default(); - let mut block_transitions = BTreeMap::new(); - let mut block_num_hash = BTreeMap::new(); - for (block, block_state) in blocks.into_iter() { - state.extend(block_state); - block_transitions.insert(block.number, state.transitions_count()); - block_num_hash.insert(block.number, block); - } - - Self { state, block_transitions, blocks: block_num_hash } + /// Get the chain. + pub fn into_inner(self) -> Chain { + self.chain } /// Create a new chain that forks off of the canonical chain. @@ -155,7 +85,7 @@ impl Chain { externals, )?; - Ok(Self::new(vec![(block.clone(), changeset)])) + Ok(Self { chain: Chain::new(vec![(block.clone(), changeset)]) }) } /// Create a new chain that forks off of an existing sidechain. @@ -174,15 +104,15 @@ impl Chain { { let parent_number = block.number - 1; let parent = self - .blocks + .blocks() .get(&parent_number) .ok_or(ExecError::BlockNumberNotFoundInChain { block_number: parent_number })?; let revert_to_transition_id = self - .block_transitions + .block_transitions() .get(&parent.number) .expect("Should have the transition ID for the parent block"); - let mut state = self.state.clone(); + let mut state = self.chain.state().clone(); // Revert state to the state after execution of the parent block state.revert_to(*revert_to_transition_id); @@ -204,9 +134,11 @@ impl Chain { state.extend(block_state); let chain = Self { - block_transitions: BTreeMap::from([(block.number, state.transitions_count())]), - state, - blocks: BTreeMap::from([(block.number, block)]), + chain: Chain { + block_transitions: BTreeMap::from([(block.number, state.transitions_count())]), + state, + blocks: BTreeMap::from([(block.number, block)]), + }, }; // If all is okay, return new chain back. Present chain is not modified. @@ -277,238 +209,9 @@ impl Chain { externals, )?; self.state.extend(block_state); - self.block_transitions.insert(block.number, self.state.transitions_count()); + let transition_count = self.state.transitions_count(); + self.block_transitions.insert(block.number, transition_count); self.blocks.insert(block.number, block); Ok(()) } - - /// Merge two chains by appending the given chain into the current one. - /// - /// The state of accounts for this chain is set to the state of the newest chain. - pub fn append_chain(&mut self, chain: Chain) -> Result<(), Error> { - let chain_tip = self.tip(); - if chain_tip.hash != chain.fork_block_hash() { - return Err(ExecError::AppendChainDoesntConnect { - chain_tip: chain_tip.num_hash(), - other_chain_fork: chain.fork_block().into_components(), - } - .into()) - } - - // Insert blocks from other chain - self.blocks.extend(chain.blocks.into_iter()); - let current_transition_count = self.state.transitions_count(); - self.state.extend(chain.state); - - // Update the block transition mapping, shifting the transition ID by the current number of - // transitions in *this* chain - for (block_number, transition_id) in chain.block_transitions.iter() { - self.block_transitions.insert(*block_number, transition_id + current_transition_count); - } - Ok(()) - } - - /// Split this chain at the given block. - /// - /// The given block will be the first block in the first returned chain. - /// - /// If the given block is not found, [`ChainSplit::NoSplitPending`] is returned. - /// Split chain at the number or hash, block with given number will be included at first chain. - /// If any chain is empty (Does not have blocks) None will be returned. - /// - /// # Note - /// - /// The block number to transition ID mapping is only found in the second chain, making it - /// impossible to perform any state reverts on the first chain. - /// - /// The second chain only contains the changes that were reverted on the first chain; however, - /// it retains the up to date state as if the chains were one, i.e. the second chain is an - /// extension of the first. - pub fn split(mut self, split_at: SplitAt) -> ChainSplit { - let chain_tip = *self.blocks.last_entry().expect("chain is never empty").key(); - let block_number = match split_at { - SplitAt::Hash(block_hash) => { - let Some(block_number) = self.block_number(block_hash) else { return ChainSplit::NoSplitPending(self)}; - // If block number is same as tip whole chain is becoming canonical. - if block_number == chain_tip { - return ChainSplit::NoSplitCanonical(self) - } - block_number - } - SplitAt::Number(block_number) => { - if block_number >= chain_tip { - return ChainSplit::NoSplitCanonical(self) - } - if block_number < *self.blocks.first_entry().expect("chain is never empty").key() { - return ChainSplit::NoSplitPending(self) - } - block_number - } - }; - - let higher_number_blocks = self.blocks.split_off(&(block_number + 1)); - - let mut canonical_state = std::mem::take(&mut self.state); - let new_state = canonical_state.split_at( - *self.block_transitions.get(&block_number).expect("Unknown block transition ID"), - ); - self.state = new_state; - - ChainSplit::Split { - canonical: Chain { - state: canonical_state, - block_transitions: BTreeMap::new(), - blocks: self.blocks, - }, - pending: Chain { - state: self.state, - block_transitions: self.block_transitions, - blocks: higher_number_blocks, - }, - } - } -} - -/// Used in spliting the chain. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum SplitAt { - /// Split at block number. - Number(BlockNumber), - /// Split at block hash. - Hash(BlockHash), -} - -/// Result of spliting chain. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum ChainSplit { - /// Chain is not splited. Pending chain is returned. - /// Given block split is higher than last block. - /// Or in case of split by hash when hash is unknown. - NoSplitPending(Chain), - /// Chain is not splited. Canonical chain is returned. - /// Given block split is lower than first block. - NoSplitCanonical(Chain), - /// Chain is splited in two. - /// Given block split is contained in first chain. - Split { - /// Left contains lower block number that get canonicalized. - /// And substate is empty and not usable. - canonical: Chain, - /// Right contains higher block number, that is still pending. - /// And substate from original chain is moved here. - pending: Chain, - }, -} - -#[cfg(test)] -mod tests { - use super::*; - use reth_primitives::{Account, H160, H256}; - - #[test] - fn chain_append() { - let block = SealedBlockWithSenders::default(); - let block1_hash = H256([0x01; 32]); - let block2_hash = H256([0x02; 32]); - let block3_hash = H256([0x03; 32]); - let block4_hash = H256([0x04; 32]); - - let mut block1 = block.clone(); - let mut block2 = block.clone(); - let mut block3 = block.clone(); - let mut block4 = block; - - block1.block.header.hash = block1_hash; - block2.block.header.hash = block2_hash; - block3.block.header.hash = block3_hash; - block4.block.header.hash = block4_hash; - - block3.block.header.header.parent_hash = block2_hash; - - let mut chain1 = - Chain { blocks: BTreeMap::from([(1, block1), (2, block2)]), ..Default::default() }; - - let chain2 = - Chain { blocks: BTreeMap::from([(3, block3), (4, block4)]), ..Default::default() }; - - assert_eq!(chain1.append_chain(chain2.clone()), Ok(())); - - // chain1 got changed so this will fail - assert!(chain1.append_chain(chain2).is_err()); - } - - #[test] - fn test_number_split() { - let mut base_state = PostState::default(); - let account = Account { nonce: 10, ..Default::default() }; - base_state.create_account(H160([1; 20]), account); - base_state.finish_transition(); - - let mut block_state1 = PostState::default(); - block_state1.create_account(H160([2; 20]), Account::default()); - block_state1.finish_transition(); - - let mut block_state2 = PostState::default(); - block_state2.create_account(H160([3; 20]), Account::default()); - block_state2.finish_transition(); - - let mut block1 = SealedBlockWithSenders::default(); - let block1_hash = H256([15; 32]); - block1.number = 1; - block1.hash = block1_hash; - block1.senders.push(H160([4; 20])); - - let mut block2 = SealedBlockWithSenders::default(); - let block2_hash = H256([16; 32]); - block2.number = 2; - block2.hash = block2_hash; - block2.senders.push(H160([4; 20])); - - let chain = Chain::new(vec![ - (block1.clone(), block_state1.clone()), - (block2.clone(), block_state2.clone()), - ]); - - let mut split1_state = chain.state.clone(); - let split2_state = split1_state.split_at(*chain.block_transitions.get(&1).unwrap()); - - let chain_split1 = Chain { - state: split1_state, - block_transitions: BTreeMap::new(), - blocks: BTreeMap::from([(1, block1.clone())]), - }; - - let chain_split2 = Chain { - state: split2_state, - block_transitions: chain.block_transitions.clone(), - blocks: BTreeMap::from([(2, block2.clone())]), - }; - - // return tip state - assert_eq!(chain.state_at_block(block2.number), Some(chain.state.clone())); - assert_eq!(chain.state_at_block(block1.number), Some(chain_split1.state.clone())); - // state at unknown block - assert_eq!(chain.state_at_block(100), None); - - // split in two - assert_eq!( - chain.clone().split(SplitAt::Hash(block1_hash)), - ChainSplit::Split { canonical: chain_split1, pending: chain_split2 } - ); - - // split at unknown block hash - assert_eq!( - chain.clone().split(SplitAt::Hash(H256([100; 32]))), - ChainSplit::NoSplitPending(chain.clone()) - ); - - // split at higher number - assert_eq!( - chain.clone().split(SplitAt::Number(10)), - ChainSplit::NoSplitCanonical(chain.clone()) - ); - - // split at lower number - assert_eq!(chain.clone().split(SplitAt::Number(0)), ChainSplit::NoSplitPending(chain)); - } } diff --git a/crates/executor/src/blockchain_tree/mod.rs b/crates/executor/src/blockchain_tree/mod.rs index 323771bc6e..ad7c651bf8 100644 --- a/crates/executor/src/blockchain_tree/mod.rs +++ b/crates/executor/src/blockchain_tree/mod.rs @@ -1,14 +1,18 @@ //! Implementation of [`BlockchainTree`] -use chain::{BlockChainId, Chain, ForkBlock}; +use chain::BlockChainId; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{ - blockchain_tree::BlockStatus, consensus::Consensus, events::NewBlockNotifications, - executor::Error as ExecError, Error, + blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error, }; use reth_primitives::{ - BlockHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, SealedHeader, U256, + BlockHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders, U256, +}; +use reth_provider::{ + chain::{ChainSplit, SplitAt}, + post_state::PostState, + CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain, + ExecutorFactory, HeaderProvider, Transaction, }; -use reth_provider::{post_state::PostState, ExecutorFactory, HeaderProvider, Transaction}; use std::{ collections::{BTreeMap, HashMap}, sync::Arc, @@ -18,7 +22,7 @@ pub mod block_indices; use block_indices::BlockIndices; pub mod chain; -use chain::{ChainSplit, SplitAt}; +pub use chain::AppendableChain; pub mod config; use config::BlockchainTreeConfig; @@ -31,7 +35,6 @@ pub use shareable::ShareableBlockchainTree; pub mod post_state_data; pub use post_state_data::{PostStateData, PostStateDataRef}; -use reth_interfaces::events::NewBlockNotificationSink; #[cfg_attr(doc, aquamarine::aquamarine)] /// Tree of chains and its identifications. @@ -76,7 +79,7 @@ use reth_interfaces::events::NewBlockNotificationSink; #[derive(Debug)] pub struct BlockchainTree { /// The tracked chains and their current data. - chains: HashMap, + chains: HashMap, /// Static blockchain ID generator block_chain_id_generator: u64, /// Indices to block and their connection to the canonical chain. @@ -85,15 +88,15 @@ pub struct BlockchainTree { externals: TreeExternals, /// Tree configuration config: BlockchainTreeConfig, - /// Unbounded channel for sending new block notifications. - new_block_notification_sender: NewBlockNotificationSink, + /// Broadcast channel for canon state changes notifications. + canon_state_notification_sender: CanonStateNotificationSender, } /// A container that wraps chains and block indices to allow searching for block hashes across all /// sidechains. pub struct BlockHashes<'a> { /// The current tracked chains. - pub chains: &'a mut HashMap, + pub chains: &'a mut HashMap, /// The block indices for all chains. pub indices: &'a BlockIndices, } @@ -102,7 +105,7 @@ impl BlockchainTree /// Create a new blockchain tree. pub fn new( externals: TreeExternals, - new_block_notification_sender: NewBlockNotificationSink, + canon_state_notification_sender: CanonStateNotificationSender, config: BlockchainTreeConfig, ) -> Result { let max_reorg_depth = config.max_reorg_depth(); @@ -135,7 +138,7 @@ impl BlockchainTree BTreeMap::from_iter(last_canonical_hashes.into_iter()), ), config, - new_block_notification_sender, + canon_state_notification_sender, }) } @@ -279,7 +282,7 @@ impl BlockchainTree .header(&block.parent_hash)? .ok_or(ExecError::CanonicalChain { block_hash: block.parent_hash })? .seal(block.parent_hash); - let chain = Chain::new_canonical_fork( + let chain = AppendableChain::new_canonical_fork( &block, &parent_header, canonical_block_hashes, @@ -348,7 +351,7 @@ impl BlockchainTree /// Insert a chain into the tree. /// /// Inserts a chain into the tree and builds the block indices. - fn insert_chain(&mut self, chain: Chain) -> BlockChainId { + fn insert_chain(&mut self, chain: AppendableChain) -> BlockChainId { let chain_id = self.block_chain_id_generator; self.block_chain_id_generator += 1; self.block_indices.insert_chain(chain_id, &chain); @@ -488,12 +491,18 @@ impl BlockchainTree /// Split a sidechain at the given point, and return the canonical part of it. /// /// The pending part of the chain is reinserted into the tree with the same `chain_id`. - fn split_chain(&mut self, chain_id: BlockChainId, chain: Chain, split_at: SplitAt) -> Chain { + fn split_chain( + &mut self, + chain_id: BlockChainId, + chain: AppendableChain, + split_at: SplitAt, + ) -> Chain { + let chain = chain.into_inner(); match chain.split(split_at) { ChainSplit::Split { canonical, pending } => { // rest of splited chain is inserted back with same chain_id. self.block_indices.insert_chain(chain_id, &pending); - self.chains.insert(chain_id, pending); + self.chains.insert(chain_id, AppendableChain::new(pending)); canonical } ChainSplit::NoSplitCanonical(canonical) => canonical, @@ -558,10 +567,12 @@ impl BlockchainTree // update canonical index self.block_indices.canonicalize_blocks(new_canon_chain.blocks()); - let headers: Vec> = - new_canon_chain.blocks().iter().map(|(_, b)| Arc::new(b.header.clone())).collect(); - // if joins to the tip + let chain_action; + + // if joins to the tip; if new_canon_chain.fork_block_hash() == old_tip.hash { + chain_action = + CanonStateNotification::Commit { new: Arc::new(new_canon_chain.clone()) }; // append to database self.commit_canonical(new_canon_chain)?; } else { @@ -574,18 +585,22 @@ impl BlockchainTree } let old_canon_chain = self.revert_canonical(canon_fork.number)?; + + // state action + chain_action = CanonStateNotification::Reorg { + old: Arc::new(old_canon_chain.clone()), + new: Arc::new(new_canon_chain.clone()), + }; + // commit new canonical chain. self.commit_canonical(new_canon_chain)?; // insert old canon chain - self.insert_chain(old_canon_chain); + self.insert_chain(AppendableChain::new(old_canon_chain)); } - // Broadcast new canonical blocks. - headers.into_iter().for_each(|header| { - // ignore if receiver is dropped. - let _ = self.new_block_notification_sender.send(header); - }); + // send notification + let _ = self.canon_state_notification_sender.send(chain_action); Ok(()) } @@ -593,8 +608,8 @@ impl BlockchainTree /// Subscribe to new blocks events. /// /// Note: Only canonical blocks are send. - pub fn subscribe_new_blocks(&self) -> NewBlockNotifications { - self.new_block_notification_sender.subscribe() + pub fn subscribe_canon_state(&self) -> CanonStateNotifications { + self.canon_state_notification_sender.subscribe() } /// Canonicalize the given chain and commit it to the database. @@ -626,7 +641,7 @@ impl BlockchainTree } self.block_indices.unwind_canonical_chain(unwind_to); // insert old canonical chain to BlockchainTree. - self.insert_chain(old_canon_chain); + self.insert_chain(AppendableChain::new(old_canon_chain)); Ok(()) } @@ -657,6 +672,7 @@ impl BlockchainTree mod tests { use super::*; use crate::test_utils::TestExecutorFactory; + use assert_matches::assert_matches; use reth_db::{ mdbx::{test_utils::create_test_rw_db, Env, WriteMap}, transaction::DbTxMut, @@ -777,9 +793,9 @@ mod tests { // make tree let config = BlockchainTreeConfig::new(1, 2, 3); - let mut tree = BlockchainTree::new(externals, NewBlockNotificationSink::new(10), config) - .expect("failed to create tree"); - let mut new_block_notification = tree.subscribe_new_blocks(); + let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10); + let mut tree = + BlockchainTree::new(externals, sender, config).expect("failed to create tree"); // genesis block 10 is already canonical assert_eq!(tree.make_canonical(&H256::zero()), Ok(())); @@ -828,12 +844,12 @@ mod tests { // make block1 canonical assert_eq!(tree.make_canonical(&block1.hash()), Ok(())); // check notification - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone()))); + assert_matches!(canon_notif.try_recv(), Ok(CanonStateNotification::Commit{ new}) if *new.blocks() == BTreeMap::from([(block1.number,block1.clone())])); - // make block2 canonical + // make block2 canonicals assert_eq!(tree.make_canonical(&block2.hash()), Ok(())); // check notification. - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); + assert_matches!(canon_notif.try_recv(), Ok(CanonStateNotification::Commit{ new}) if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())])); // Trie state: // b2 (canonical block) @@ -893,7 +909,10 @@ mod tests { // make b2a canonical assert_eq!(tree.make_canonical(&block2a_hash), Ok(())); // check notification. - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2a.header.clone()))); + assert_matches!(canon_notif.try_recv(), + Ok(CanonStateNotification::Reorg{ old, new}) + if *old.blocks() == BTreeMap::from([(block2.number,block2.clone())]) + && *new.blocks() == BTreeMap::from([(block2a.number,block2a.clone())])); // Trie state: // b2a b2 (side chain) @@ -915,9 +934,6 @@ mod tests { .assert(&tree); assert_eq!(tree.make_canonical(&block1a_hash), Ok(())); - // check notification. - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1a.header.clone()))); - // Trie state: // b2a b2 (side chain) // | / @@ -941,13 +957,15 @@ mod tests { .with_pending_blocks((block1a.number + 1, HashSet::new())) .assert(&tree); + // check notification. + assert_matches!(canon_notif.try_recv(), + Ok(CanonStateNotification::Reorg{ old, new}) + if *old.blocks() == BTreeMap::from([(block1.number,block1.clone()),(block2a.number,block2a.clone())]) + && *new.blocks() == BTreeMap::from([(block1a.number,block1a.clone())])); + // make b2 canonical assert_eq!(tree.make_canonical(&block2.hash()), Ok(())); - // check notification. - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone()))); - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); - // Trie state: // b2 b2a (side chain) // | / @@ -967,6 +985,12 @@ mod tests { .with_pending_blocks((block2.number + 1, HashSet::new())) .assert(&tree); + // check notification. + assert_matches!(canon_notif.try_recv(), + Ok(CanonStateNotification::Reorg{ old, new}) + if *old.blocks() == BTreeMap::from([(block1a.number,block1a.clone())]) + && *new.blocks() == BTreeMap::from([(block1.number,block1.clone()),(block2.number,block2.clone())])); + // finalize b1 that would make b1a removed from tree tree.finalize_block(11); // Trie state: @@ -1008,8 +1032,6 @@ mod tests { // commit b2a assert_eq!(tree.make_canonical(&block2.hash), Ok(())); - // check notification. - assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); // Trie state: // b2 b2a (side chain) @@ -1026,6 +1048,11 @@ mod tests { .with_pending_blocks((block2.number + 1, HashSet::new())) .assert(&tree); + // check notification. + assert_matches!(canon_notif.try_recv(), + Ok(CanonStateNotification::Commit{ new}) + if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())])); + // update canonical block to b2, this would make b2a be removed assert_eq!(tree.restore_canonical_hashes(12), Ok(())); // Trie state: diff --git a/crates/executor/src/blockchain_tree/post_state_data.rs b/crates/executor/src/blockchain_tree/post_state_data.rs index a14bef5b47..324400281a 100644 --- a/crates/executor/src/blockchain_tree/post_state_data.rs +++ b/crates/executor/src/blockchain_tree/post_state_data.rs @@ -1,7 +1,6 @@ //! Substate for blockchain trees -use crate::blockchain_tree::chain::ForkBlock; -use reth_primitives::{BlockHash, BlockNumber}; +use reth_primitives::{BlockHash, BlockNumber, ForkBlock}; use reth_provider::{post_state::PostState, PostStateDataProvider}; use std::collections::BTreeMap; diff --git a/crates/executor/src/blockchain_tree/shareable.rs b/crates/executor/src/blockchain_tree/shareable.rs index adbf7ed9b1..4e661027a5 100644 --- a/crates/executor/src/blockchain_tree/shareable.rs +++ b/crates/executor/src/blockchain_tree/shareable.rs @@ -4,12 +4,14 @@ use reth_db::database::Database; use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer}, consensus::Consensus, - events::{ChainEventSubscriptions, NewBlockNotifications}, provider::ProviderError, Error, }; use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders}; -use reth_provider::{BlockchainTreePendingStateProvider, ExecutorFactory, PostStateDataProvider}; +use reth_provider::{ + BlockchainTreePendingStateProvider, CanonStateSubscriptions, ExecutorFactory, + PostStateDataProvider, +}; use std::{ collections::{BTreeMap, HashSet}, sync::Arc, @@ -100,10 +102,10 @@ impl BlockchainTreePendingState } } -impl ChainEventSubscriptions +impl CanonStateSubscriptions for ShareableBlockchainTree { - fn subscribe_new_blocks(&self) -> NewBlockNotifications { - self.tree.read().subscribe_new_blocks() + fn subscribe_canon_state(&self) -> reth_provider::CanonStateNotifications { + self.tree.read().subscribe_canon_state() } } diff --git a/crates/interfaces/src/events.rs b/crates/interfaces/src/events.rs deleted file mode 100644 index 91c8b54ed0..0000000000 --- a/crates/interfaces/src/events.rs +++ /dev/null @@ -1,54 +0,0 @@ -use parking_lot::Mutex; -use reth_primitives::SealedHeader; -use std::sync::Arc; -use tokio::sync::broadcast::{error::SendError, Receiver, Sender}; - -/// New block notification that is Arc around [SealedHeader]. -pub type NewBlockNotification = Arc; - -/// Type alias for a receiver that receives [NewBlockNotification] -pub type NewBlockNotifications = Receiver; - -/// Type alias for a sender that sends [NewBlockNotification] -pub type NewBlockNotificationsSender = Sender; - -/// A type that allows to register chain related event subscriptions. -pub trait ChainEventSubscriptions: Send + Sync { - /// Get notified when a new block was imported. - fn subscribe_new_blocks(&self) -> NewBlockNotifications; -} - -/// A shareable Sender that allows to send [NewBlockNotification] to all receivers. -#[derive(Debug, Clone)] -pub struct NewBlockNotificationSink { - inner: Arc>>, -} - -// === impl NewBlockNotificationSink === - -impl NewBlockNotificationSink { - /// Creates a new NewBlockNotificationSink with the given capacity. - // // size of the broadcast is double of max reorg depth because at max reorg depth we can have - // // send at least N block at the time. - pub fn new(capacity: usize) -> Self { - let inner = tokio::sync::broadcast::channel(capacity); - Self { inner: Arc::new(Mutex::new(inner.0)) } - } - - /// Attempts to send a value to all active Receiver handles, returning it back if it could not - /// be sent. - pub fn send( - &self, - header: NewBlockNotification, - ) -> Result> { - let sender = self.inner.lock(); - sender.send(header) - } - - /// Creates a new Receiver handle that will receive notifications sent after this call to - /// subscribe. - pub fn subscribe(&self) -> Receiver { - let sender = self.inner.lock(); - sender.subscribe() - } -} diff --git a/crates/interfaces/src/lib.rs b/crates/interfaces/src/lib.rs index f512f7ff76..41b0fdee47 100644 --- a/crates/interfaces/src/lib.rs +++ b/crates/interfaces/src/lib.rs @@ -20,9 +20,6 @@ pub mod executor; mod error; pub use error::{Error, Result}; -/// Traits for subscribing to events. -pub mod events; - /// P2P traits. pub mod p2p; diff --git a/crates/interfaces/src/test_utils/events.rs b/crates/interfaces/src/test_utils/events.rs deleted file mode 100644 index d62626ebaf..0000000000 --- a/crates/interfaces/src/test_utils/events.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::events::{ChainEventSubscriptions, NewBlockNotification, NewBlockNotifications}; -use async_trait::async_trait; -use reth_primitives::{Header, SealedHeader, H256}; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast::{self, Receiver, Sender}; - -/// A test ChainEventSubscriptions -#[derive(Clone, Default)] -pub struct TestChainEventSubscriptions { - new_blocks_txs: Arc>>>, -} - -impl TestChainEventSubscriptions { - /// Adds new block to the queue that can be consumed with - /// [`TestChainEventSubscriptions::subscribe_new_blocks`] - pub fn add_new_block(&mut self, header: SealedHeader) { - let header = Arc::new(header); - self.new_blocks_txs.lock().as_mut().unwrap().retain(|tx| tx.send(header.clone()).is_ok()) - } -} - -impl ChainEventSubscriptions for TestChainEventSubscriptions { - fn subscribe_new_blocks(&self) -> NewBlockNotifications { - let (new_blocks_tx, new_blocks_rx) = broadcast::channel(100); - self.new_blocks_txs.lock().as_mut().unwrap().push(new_blocks_tx); - - new_blocks_rx - } -} diff --git a/crates/interfaces/src/test_utils/mod.rs b/crates/interfaces/src/test_utils/mod.rs index 5fdab195c7..b64acb592a 100644 --- a/crates/interfaces/src/test_utils/mod.rs +++ b/crates/interfaces/src/test_utils/mod.rs @@ -1,12 +1,10 @@ #![allow(unused)] mod bodies; -mod events; mod headers; /// Generators for different data structures like block headers, block bodies and ranges of those. pub mod generators; pub use bodies::*; -pub use events::*; pub use headers::*; diff --git a/crates/miner/src/payload.rs b/crates/miner/src/payload.rs index d23ab68ffa..ebb0bd1fc8 100644 --- a/crates/miner/src/payload.rs +++ b/crates/miner/src/payload.rs @@ -32,7 +32,7 @@ impl BuiltPayload { } /// Container type for all components required to build a payload. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PayloadBuilderAttributes { /// Parent block to build the payload on top pub(crate) parent: H256, diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 8489871fe8..c0a2ae7110 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -652,6 +652,9 @@ pub struct BlockNumHash { pub hash: BlockHash, } +/// Block number and hash of the forked block. +pub type ForkBlock = BlockNumHash; + impl std::fmt::Debug for BlockNumHash { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("").field(&self.number).field(&self.hash).finish() @@ -659,6 +662,11 @@ impl std::fmt::Debug for BlockNumHash { } impl BlockNumHash { + /// Creates a new `BlockNumHash` from a block number and hash. + pub fn new(number: BlockNumber, hash: BlockHash) -> Self { + Self { number, hash } + } + /// Consumes `Self` and returns [`BlockNumber`], [`BlockHash`] pub fn into_components(self) -> (BlockNumber, BlockHash) { (self.number, self.hash) @@ -671,6 +679,12 @@ impl From<(BlockNumber, BlockHash)> for BlockNumHash { } } +impl From<(BlockHash, BlockNumber)> for BlockNumHash { + fn from(val: (BlockHash, BlockNumber)) -> Self { + BlockNumHash { hash: val.0, number: val.1 } + } +} + /// A response to `GetBlockBodies`, containing bodies if any bodies were found. /// /// Withdrawals can be optionally included at the end of the RLP encoded message. diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 7101910171..6d477ed85a 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -40,7 +40,7 @@ pub use account::{Account, Bytecode}; pub use bits::H512; pub use block::{ Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders, - SealedBlock, SealedBlockWithSenders, + ForkBlock, SealedBlock, SealedBlockWithSenders, }; pub use bloom::Bloom; pub use chain::{ @@ -86,6 +86,8 @@ pub type Address = H160; pub type TxHash = H256; /// The sequence number of all existing transactions. pub type TxNumber = u64; +/// The index of transaction in a block. +pub type TxIndex = u64; /// Chain identifier type (introduced in EIP-155). pub type ChainId = u64; /// An account storage key. diff --git a/crates/revm/revm-primitives/src/lib.rs b/crates/revm/revm-primitives/src/lib.rs index 8053704946..b329c9b7c0 100644 --- a/crates/revm/revm-primitives/src/lib.rs +++ b/crates/revm/revm-primitives/src/lib.rs @@ -14,3 +14,6 @@ pub mod env; /// Helpers for type compatibility between reth and revm types mod compat; pub use compat::*; + +/// Re-exports revm types; +pub use revm::*; diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 2a508b4921..a878019750 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -24,9 +24,8 @@ //! Configure only a http server with a selection of [RethRpcModule]s //! //! ``` -//! use reth_interfaces::events::ChainEventSubscriptions; //! use reth_network_api::{NetworkInfo, Peers}; -//! use reth_provider::{BlockProvider, StateProviderFactory, EvmEnvProvider}; +//! use reth_provider::{BlockProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider}; //! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig}; //! use reth_tasks::TokioTaskExecutor; //! use reth_transaction_pool::TransactionPool; @@ -35,7 +34,7 @@ //! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, //! Pool: TransactionPool + Clone + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, -//! Events: ChainEventSubscriptions + Clone + 'static, +//! Events: CanonStateSubscriptions + Clone + 'static, //! { //! // configure the rpc module per transport //! let transports = TransportRpcModuleConfig::default().with_http(vec![ @@ -60,10 +59,9 @@ use jsonrpsee::{ server::{IdProvider, Server, ServerHandle}, RpcModule, }; -use reth_interfaces::events::ChainEventSubscriptions; use reth_ipc::server::IpcServer; use reth_network_api::{NetworkInfo, Peers}; -use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; +use reth_provider::{BlockProvider, CanonStateSubscriptions, EvmEnvProvider, StateProviderFactory}; use reth_rpc::{ eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api, @@ -119,7 +117,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let module_config = module_config.into(); let server_config = server_config.into(); @@ -199,7 +197,7 @@ impl RpcModuleBuilder(self, events: E) -> RpcModuleBuilder where - E: ChainEventSubscriptions + 'static, + E: CanonStateSubscriptions + 'static, { let Self { client, pool, executor, network, .. } = self; RpcModuleBuilder { client, network, pool, executor, events } @@ -212,7 +210,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be /// used to start the transport server(s). @@ -364,7 +362,7 @@ impl RpcModuleSelection { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let mut registry = RethModuleRegistry::new(client, pool, network, executor, events, config); registry.module_for(self) @@ -529,7 +527,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { /// Register Eth Namespace pub fn register_eth(&mut self) -> &mut Self { diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 2c1d6cd842..e2efd31d54 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -1,6 +1,5 @@ -use reth_interfaces::test_utils::TestChainEventSubscriptions; use reth_network_api::test_utils::NoopNetwork; -use reth_provider::test_utils::NoopProvider; +use reth_provider::test_utils::{NoopProvider, TestCanonStateSubscriptions}; use reth_rpc_builder::{ RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle, TransportRpcModuleConfig, @@ -57,12 +56,12 @@ pub fn test_rpc_builder() -> RpcModuleBuilder< TestPool, NoopNetwork, TokioTaskExecutor, - TestChainEventSubscriptions, + TestCanonStateSubscriptions, > { RpcModuleBuilder::default() .with_client(NoopProvider::default()) .with_pool(testing_pool()) .with_network(NoopNetwork::default()) .with_executor(TokioTaskExecutor::default()) - .with_events(TestChainEventSubscriptions::default()) + .with_events(TestCanonStateSubscriptions::default()) } diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 20a95332fa..02c000e645 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -42,6 +42,7 @@ async-trait = "0.1" tokio = { version = "1", features = ["sync"] } tower = "0.4" tokio-stream = { version = "0.1", features = ["sync"] } +tokio-util = "0.7" pin-project = "1.0" bytes = "1.4" diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index a110157c1b..b32ec5dcfd 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -194,9 +194,9 @@ where logs_utils::append_matching_block_logs( &mut all_logs, &filter, - block_hash, - block.number, + (block_hash, block.number).into(), block.body.into_iter().map(|tx| tx.hash).zip(receipts), + false, ); } } @@ -268,9 +268,9 @@ where logs_utils::append_matching_block_logs( &mut all_logs, &filter_params, - block_hash, - block_number, + (block_number, block_hash).into(), block.body.into_iter().map(|tx| tx.hash).zip(receipts), + false, ); // size check diff --git a/crates/rpc/rpc/src/eth/logs_utils.rs b/crates/rpc/rpc/src/eth/logs_utils.rs index f4be24bef8..66bffa766a 100644 --- a/crates/rpc/rpc/src/eth/logs_utils.rs +++ b/crates/rpc/rpc/src/eth/logs_utils.rs @@ -1,19 +1,20 @@ -use reth_primitives::{filter::FilteredParams, BlockNumberOrTag, ChainInfo, Receipt, TxHash, U256}; +use reth_primitives::{ + filter::FilteredParams, BlockNumHash, BlockNumberOrTag, ChainInfo, Receipt, TxHash, U256, +}; use reth_rpc_types::Log; -use revm::primitives::B256 as H256; /// Returns all matching logs of a block's receipts grouped with the hash of their transaction. pub(crate) fn matching_block_logs( filter: &FilteredParams, - block_hash: H256, - block_number: u64, + block: BlockNumHash, tx_and_receipts: I, + removed: bool, ) -> Vec where I: IntoIterator, { let mut all_logs = Vec::new(); - append_matching_block_logs(&mut all_logs, filter, block_hash, block_number, tx_and_receipts); + append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed); all_logs } @@ -21,30 +22,30 @@ where pub(crate) fn append_matching_block_logs( all_logs: &mut Vec, filter: &FilteredParams, - block_hash: H256, - block_number: u64, + block: BlockNumHash, tx_and_receipts: I, + removed: bool, ) where I: IntoIterator, { - let block_number_u256 = U256::from(block_number); + let block_number_u256 = U256::from(block.number); // tracks the index of a log in the entire block let mut log_index: u32 = 0; for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() { let logs = receipt.logs; for (transaction_log_idx, log) in logs.into_iter().enumerate() { - if log_matches_filter(block_hash, block_number, &log, filter) { + if log_matches_filter(block, &log, filter) { let log = Log { address: log.address, topics: log.topics, data: log.data, - block_hash: Some(block_hash), + block_hash: Some(block.hash), block_number: Some(block_number_u256), transaction_hash: Some(transaction_hash), transaction_index: Some(U256::from(transaction_idx)), log_index: Some(U256::from(log_index)), transaction_log_index: Some(U256::from(transaction_log_idx)), - removed: false, + removed, }; all_logs.push(log); } @@ -55,14 +56,13 @@ pub(crate) fn append_matching_block_logs( /// Returns true if the log matches the filter and should be included pub(crate) fn log_matches_filter( - block_hash: H256, - block_number: u64, + block: BlockNumHash, log: &reth_primitives::Log, params: &FilteredParams, ) -> bool { if params.filter.is_some() && - (!params.filter_block_range(block_number) || - !params.filter_block_hash(block_hash) || + (!params.filter_block_range(block.number) || + !params.filter_block_hash(block.hash) || !params.filter_address(log) || !params.filter_topics(log)) { diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index a5a26d9a8d..261f688b4f 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,12 +1,10 @@ //! `eth_` PubSub RPC handler implementation - use crate::eth::{cache::EthStateCache, logs_utils}; use futures::StreamExt; use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; -use reth_interfaces::events::{ChainEventSubscriptions, NewBlockNotification}; use reth_network_api::NetworkInfo; -use reth_primitives::{filter::FilteredParams, Receipt, TransactionSigned, TxHash}; -use reth_provider::{BlockProvider, EvmEnvProvider}; +use reth_primitives::{filter::FilteredParams, TxHash}; +use reth_provider::{BlockProvider, CanonStateSubscriptions, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::{ pubsub::{ @@ -74,7 +72,7 @@ impl EthPubSubApiServer for EthPubSub( ) where Client: BlockProvider + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, - Events: ChainEventSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, Network: NetworkInfo + Clone + 'static, { match kind { @@ -132,7 +130,7 @@ async fn handle_accepted( } SubscriptionKind::Syncing => { // get new block subscription - let mut new_blocks = BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks()); + let mut canon_state = BroadcastStream::new(pubsub.chain_events.subscribe_canon_state()); // get current sync status let mut initial_sync_status = pubsub.network.is_syncing(); let current_sub_res = pubsub.sync_status(initial_sync_status).await; @@ -140,7 +138,7 @@ async fn handle_accepted( // send the current status immediately let _ = accepted_sink.send(¤t_sub_res); - while (new_blocks.next().await).is_some() { + while (canon_state.next().await).is_some() { let current_syncing = pubsub.network.is_syncing(); // Only send a new response if the sync status has changed if current_syncing != initial_sync_status { @@ -213,50 +211,45 @@ where impl EthPubSubInner where Client: BlockProvider + EvmEnvProvider + 'static, - Events: ChainEventSubscriptions + 'static, + Events: CanonStateSubscriptions + 'static, Network: NetworkInfo + 'static, Pool: 'static, { /// Returns a stream that yields all new RPC blocks. fn into_new_headers_stream(self) -> impl Stream { - BroadcastStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| { - let new_block = new_block.expect("new block subscription never ends; qed"); - Header::from_primitive_with_hash(new_block.as_ref().clone()) - }) + BroadcastStream::new(self.chain_events.subscribe_canon_state()) + .map(|new_block| { + let new_chain = new_block.expect("new block subscription never ends; qed"); + new_chain + .commited() + .map(|c| { + c.blocks() + .iter() + .map(|(_, block)| { + Header::from_primitive_with_hash(block.header.clone()) + }) + .collect::>() + }) + .unwrap_or_default() + }) + .flat_map(futures::stream::iter) } /// Returns a stream that yields all logs that match the given filter. fn into_log_stream(self, filter: FilteredParams) -> impl Stream { - BroadcastStream::new(self.chain_events.subscribe_new_blocks()) - .filter_map(move |new_block| { - Box::pin(get_block_receipts(self.eth_cache.clone(), new_block.ok())) + BroadcastStream::new(self.chain_events.subscribe_canon_state()) + .map(move |canon_state| { + canon_state.expect("new block subscription never ends; qed").block_receipts() }) - .flat_map(move |(new_block, transactions, receipts)| { - let block_hash = new_block.hash; - let block_number = new_block.header.number; + .flat_map(futures::stream::iter) + .flat_map(move |(block_receipts, removed)| { let all_logs = logs_utils::matching_block_logs( &filter, - block_hash, - block_number, - transactions.into_iter().map(|tx| tx.hash).zip(receipts), + block_receipts.block, + block_receipts.tx_receipts.into_iter(), + removed, ); futures::stream::iter(all_logs) }) } } - -/// Helper function for getting block receipts and transactions -async fn get_block_receipts( - eth_cache: EthStateCache, - new_block: Option, -) -> Option<(NewBlockNotification, Vec, Vec)> { - let Some(new_block) = new_block else { return None; }; - let (txs, receipts) = futures::join!( - eth_cache.get_block_transactions(new_block.hash), - eth_cache.get_receipts(new_block.hash) - ); - match (txs.ok().flatten(), receipts.ok().flatten()) { - (Some(txs), Some(receipts)) => Some((new_block, txs, receipts)), - _ => None, - } -} diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index d8969f199c..b2e23764a0 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -14,10 +14,11 @@ reth-interfaces = { path = "../../interfaces" } reth-revm-primitives = { path = "../../revm/revm-primitives" } reth-db = { path = "../db" } reth-codecs = { path = "../codecs" } -reth-tracing = {path = "../../tracing"} -reth-rlp = {path = "../../rlp"} +reth-tracing = { path = "../../tracing" } +reth-rlp = { path = "../../rlp" } -revm-primitives = "1.1.0" +# async +tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } # trie cita_trie = "4.0.0" @@ -30,7 +31,6 @@ tracing = "0.1" thiserror = "1.0.37" auto_impl = "1.0" itertools = "0.10" - parking_lot = "0.12" [dev-dependencies] diff --git a/crates/storage/provider/src/chain.rs b/crates/storage/provider/src/chain.rs new file mode 100644 index 0000000000..1614f60331 --- /dev/null +++ b/crates/storage/provider/src/chain.rs @@ -0,0 +1,375 @@ +//! Contains [Chain], a chain of blocks and their final state. + +use crate::PostState; +use reth_interfaces::{executor::Error as ExecError, Error}; +use reth_primitives::{ + BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlockWithSenders, TransitionId, + TxHash, +}; +use std::collections::BTreeMap; + +/// A chain of blocks and their final state. +/// +/// The chain contains the state of accounts after execution of its blocks, +/// changesets for those blocks (and their transactions), as well as the blocks themselves. +/// +/// Used inside the BlockchainTree. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct Chain { + /// The state of accounts after execution of the blocks in this chain. + /// + /// This state also contains the individual changes that lead to the current state. + pub state: PostState, + /// The blocks in this chain. + pub blocks: BTreeMap, + /// A mapping of each block number in the chain to the highest transition ID in the chain's + /// state after execution of the block. + /// + /// This is used to revert changes in the state until a certain block number when the chain is + /// split. + pub block_transitions: BTreeMap, +} + +impl Chain { + /// Get the blocks in this chain. + pub fn blocks(&self) -> &BTreeMap { + &self.blocks + } + + /// Get post state of this chain + pub fn state(&self) -> &PostState { + &self.state + } + + /// Return block number of the block hash. + pub fn block_number(&self, block_hash: BlockHash) -> Option { + self.blocks.iter().find_map(|(num, block)| (block.hash() == block_hash).then_some(*num)) + } + + /// Return block inner transition ids. + pub fn block_transitions(&self) -> &BTreeMap { + &self.block_transitions + } + + /// Return post state of the block at the `block_number` or None if block is not known + pub fn state_at_block(&self, block_number: BlockNumber) -> Option { + let mut state = self.state.clone(); + if self.tip().number == block_number { + return Some(state) + } + + if let Some(&transition_id) = self.block_transitions.get(&block_number) { + state.revert_to(transition_id); + return Some(state) + } + + None + } + + /// Destructure the chain into its inner components, the blocks and the state. + pub fn into_inner(self) -> (BTreeMap, PostState) { + (self.blocks, self.state) + } + + /// Get the block at which this chain forked. + pub fn fork_block(&self) -> ForkBlock { + let tip = self.first(); + ForkBlock { number: tip.number.saturating_sub(1), hash: tip.parent_hash } + } + + /// Get the block number at which this chain forked. + pub fn fork_block_number(&self) -> BlockNumber { + self.first().number.saturating_sub(1) + } + + /// Get the block hash at which this chain forked. + pub fn fork_block_hash(&self) -> BlockHash { + self.first().parent_hash + } + + /// Get the first block in this chain. + pub fn first(&self) -> &SealedBlockWithSenders { + self.blocks.first_key_value().expect("Chain has at least one block for first").1 + } + + /// Get the tip of the chain. + /// + /// # Note + /// + /// Chains always have at least one block. + pub fn tip(&self) -> &SealedBlockWithSenders { + self.blocks.last_key_value().expect("Chain should have at least one block").1 + } + + /// Create new chain with given blocks and post state. + pub fn new(blocks: Vec<(SealedBlockWithSenders, PostState)>) -> Self { + let mut state = PostState::default(); + let mut block_transitions = BTreeMap::new(); + let mut block_num_hash = BTreeMap::new(); + for (block, block_state) in blocks.into_iter() { + state.extend(block_state); + block_transitions.insert(block.number, state.transitions_count()); + block_num_hash.insert(block.number, block); + } + + Self { state, block_transitions, blocks: block_num_hash } + } + + /// Merge two chains by appending the given chain into the current one. + /// + /// The state of accounts for this chain is set to the state of the newest chain. + pub fn append_chain(&mut self, chain: Chain) -> Result<(), Error> { + let chain_tip = self.tip(); + if chain_tip.hash != chain.fork_block_hash() { + return Err(ExecError::AppendChainDoesntConnect { + chain_tip: chain_tip.num_hash(), + other_chain_fork: chain.fork_block().into_components(), + } + .into()) + } + + // Insert blocks from other chain + self.blocks.extend(chain.blocks.into_iter()); + let current_transition_count = self.state.transitions_count(); + self.state.extend(chain.state); + + // Update the block transition mapping, shifting the transition ID by the current number of + // transitions in *this* chain + for (block_number, transition_id) in chain.block_transitions.iter() { + self.block_transitions.insert(*block_number, transition_id + current_transition_count); + } + Ok(()) + } + + /// Get all receipts with attachment. + /// + /// Attachment includes block number, block hash, transaction hash and transaction index. + pub fn receipts_with_attachment(&self) -> Vec { + let mut receipt_attch = Vec::new(); + let mut receipts = self.state().receipts().iter(); + for (block_num, block) in self.blocks().iter() { + let block_num_hash = BlockNumHash::new(*block_num, block.hash()); + let mut tx_receipts = Vec::new(); + for tx in block.body.iter() { + if let Some(receipt) = receipts.next() { + tx_receipts.push((tx.hash(), receipt.clone())); + } + } + receipt_attch.push(BlockReceipts { block: block_num_hash, tx_receipts }); + } + receipt_attch + } + + /// Split this chain at the given block. + /// + /// The given block will be the first block in the first returned chain. + /// + /// If the given block is not found, [`ChainSplit::NoSplitPending`] is returned. + /// Split chain at the number or hash, block with given number will be included at first chain. + /// If any chain is empty (Does not have blocks) None will be returned. + /// + /// # Note + /// + /// The block number to transition ID mapping is only found in the second chain, making it + /// impossible to perform any state reverts on the first chain. + /// + /// The second chain only contains the changes that were reverted on the first chain; however, + /// it retains the up to date state as if the chains were one, i.e. the second chain is an + /// extension of the first. + pub fn split(mut self, split_at: SplitAt) -> ChainSplit { + let chain_tip = *self.blocks.last_entry().expect("chain is never empty").key(); + let block_number = match split_at { + SplitAt::Hash(block_hash) => { + let Some(block_number) = self.block_number(block_hash) else { return ChainSplit::NoSplitPending(self)}; + // If block number is same as tip whole chain is becoming canonical. + if block_number == chain_tip { + return ChainSplit::NoSplitCanonical(self) + } + block_number + } + SplitAt::Number(block_number) => { + if block_number >= chain_tip { + return ChainSplit::NoSplitCanonical(self) + } + if block_number < *self.blocks.first_entry().expect("chain is never empty").key() { + return ChainSplit::NoSplitPending(self) + } + block_number + } + }; + + let higher_number_blocks = self.blocks.split_off(&(block_number + 1)); + + let mut canonical_state = std::mem::take(&mut self.state); + let new_state = canonical_state.split_at( + *self.block_transitions.get(&block_number).expect("Unknown block transition ID"), + ); + self.state = new_state; + + ChainSplit::Split { + canonical: Chain { + state: canonical_state, + block_transitions: BTreeMap::new(), + blocks: self.blocks, + }, + pending: Chain { + state: self.state, + block_transitions: self.block_transitions, + blocks: higher_number_blocks, + }, + } + } +} + +/// Used to hold receipts and their attachment. +#[derive(Default, Clone, Debug)] +pub struct BlockReceipts { + /// Block identifier + pub block: BlockNumHash, + /// Transaction identifier and receipt. + pub tx_receipts: Vec<(TxHash, Receipt)>, +} + +/// Used in spliting the chain. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SplitAt { + /// Split at block number. + Number(BlockNumber), + /// Split at block hash. + Hash(BlockHash), +} + +/// Result of spliting chain. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ChainSplit { + /// Chain is not splited. Pending chain is returned. + /// Given block split is higher than last block. + /// Or in case of split by hash when hash is unknown. + NoSplitPending(Chain), + /// Chain is not splited. Canonical chain is returned. + /// Given block split is lower than first block. + NoSplitCanonical(Chain), + /// Chain is splited in two. + /// Given block split is contained in first chain. + Split { + /// Left contains lower block number that get canonicalized. + /// And substate is empty and not usable. + canonical: Chain, + /// Right contains higher block number, that is still pending. + /// And substate from original chain is moved here. + pending: Chain, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_primitives::{Account, H160, H256}; + + #[test] + fn chain_append() { + let block = SealedBlockWithSenders::default(); + let block1_hash = H256([0x01; 32]); + let block2_hash = H256([0x02; 32]); + let block3_hash = H256([0x03; 32]); + let block4_hash = H256([0x04; 32]); + + let mut block1 = block.clone(); + let mut block2 = block.clone(); + let mut block3 = block.clone(); + let mut block4 = block; + + block1.block.header.hash = block1_hash; + block2.block.header.hash = block2_hash; + block3.block.header.hash = block3_hash; + block4.block.header.hash = block4_hash; + + block3.block.header.header.parent_hash = block2_hash; + + let mut chain1 = + Chain { blocks: BTreeMap::from([(1, block1), (2, block2)]), ..Default::default() }; + + let chain2 = + Chain { blocks: BTreeMap::from([(3, block3), (4, block4)]), ..Default::default() }; + + assert_eq!(chain1.append_chain(chain2.clone()), Ok(())); + + // chain1 got changed so this will fail + assert!(chain1.append_chain(chain2).is_err()); + } + + #[test] + fn test_number_split() { + let mut base_state = PostState::default(); + let account = Account { nonce: 10, ..Default::default() }; + base_state.create_account(H160([1; 20]), account); + base_state.finish_transition(); + + let mut block_state1 = PostState::default(); + block_state1.create_account(H160([2; 20]), Account::default()); + block_state1.finish_transition(); + + let mut block_state2 = PostState::default(); + block_state2.create_account(H160([3; 20]), Account::default()); + block_state2.finish_transition(); + + let mut block1 = SealedBlockWithSenders::default(); + let block1_hash = H256([15; 32]); + block1.number = 1; + block1.hash = block1_hash; + block1.senders.push(H160([4; 20])); + + let mut block2 = SealedBlockWithSenders::default(); + let block2_hash = H256([16; 32]); + block2.number = 2; + block2.hash = block2_hash; + block2.senders.push(H160([4; 20])); + + let chain = Chain::new(vec![ + (block1.clone(), block_state1.clone()), + (block2.clone(), block_state2.clone()), + ]); + + let mut split1_state = chain.state.clone(); + let split2_state = split1_state.split_at(*chain.block_transitions.get(&1).unwrap()); + + let chain_split1 = Chain { + state: split1_state, + block_transitions: BTreeMap::new(), + blocks: BTreeMap::from([(1, block1.clone())]), + }; + + let chain_split2 = Chain { + state: split2_state, + block_transitions: chain.block_transitions.clone(), + blocks: BTreeMap::from([(2, block2.clone())]), + }; + + // return tip state + assert_eq!(chain.state_at_block(block2.number), Some(chain.state.clone())); + assert_eq!(chain.state_at_block(block1.number), Some(chain_split1.state.clone())); + // state at unknown block + assert_eq!(chain.state_at_block(100), None); + + // split in two + assert_eq!( + chain.clone().split(SplitAt::Hash(block1_hash)), + ChainSplit::Split { canonical: chain_split1, pending: chain_split2 } + ); + + // split at unknown block hash + assert_eq!( + chain.clone().split(SplitAt::Hash(H256([100; 32]))), + ChainSplit::NoSplitPending(chain.clone()) + ); + + // split at higher number + assert_eq!( + chain.clone().split(SplitAt::Number(10)), + ChainSplit::NoSplitCanonical(chain.clone()) + ); + + // split at lower number + assert_eq!(chain.clone().split(SplitAt::Number(0)), ChainSplit::NoSplitPending(chain)); + } +} diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 05123fb3fe..e8fe0ba1b3 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -12,9 +12,10 @@ mod traits; pub use traits::{ AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, BlockProvider, - BlockchainTreePendingStateProvider, EvmEnvProvider, ExecutorFactory, HeaderProvider, - PostStateDataProvider, ReceiptProvider, StateProvider, StateProviderBox, StateProviderFactory, - TransactionsProvider, WithdrawalsProvider, + BlockchainTreePendingStateProvider, CanonStateNotification, CanonStateNotificationSender, + CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory, + HeaderProvider, PostStateDataProvider, ReceiptProvider, StateProvider, StateProviderBox, + StateProviderFactory, TransactionsProvider, WithdrawalsProvider, }; /// Provider trait implementations. @@ -45,3 +46,6 @@ pub mod test_utils; /// Re-export provider error. pub use reth_interfaces::provider::ProviderError; + +pub mod chain; +pub use chain::Chain; diff --git a/crates/storage/provider/src/post_state.rs b/crates/storage/provider/src/post_state.rs index 95ddb82576..deb4cdfe43 100644 --- a/crates/storage/provider/src/post_state.rs +++ b/crates/storage/provider/src/post_state.rs @@ -263,8 +263,8 @@ impl PostState { } /// Get the number of transitions causing this [PostState] - pub fn transitions_count(&self) -> usize { - self.current_transition_id as usize + pub fn transitions_count(&self) -> TransitionId { + self.current_transition_id } /// Extend this [PostState] with the changes in another [PostState]. @@ -291,10 +291,10 @@ impl PostState { /// The reverted changes are removed from this post-state, and their effects are reverted. /// /// The reverted changes are returned. - pub fn revert_to(&mut self, transition_id: usize) -> Vec { + pub fn revert_to(&mut self, transition_id: TransitionId) -> Vec { let mut changes_to_revert = Vec::new(); self.changes.retain(|change| { - if change.transition_id() >= transition_id as u64 { + if change.transition_id() >= transition_id { changes_to_revert.push(change.clone()); false } else { @@ -322,7 +322,7 @@ impl PostState { /// 1. This post-state has the changes reverted /// 2. The returned post-state does *not* have the changes reverted, but only contains the /// descriptions of the changes that were reverted in the first post-state. - pub fn split_at(&mut self, transition_id: usize) -> Self { + pub fn split_at(&mut self, transition_id: TransitionId) -> Self { // Clone ourselves let mut non_reverted_state = self.clone(); @@ -913,7 +913,7 @@ mod tests { assert_eq!(state.transitions_count(), 2); assert_eq!(state.accounts().len(), 2); - let reverted_changes = state.revert_to(revert_to as usize); + let reverted_changes = state.revert_to(revert_to); assert_eq!(state.accounts().len(), 1); assert_eq!(state.transitions_count(), 1); assert_eq!(reverted_changes.len(), 1); diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index ec115a26a9..473a17933f 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -12,8 +12,8 @@ use reth_primitives::{ use reth_revm_primitives::{ config::revm_spec, env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env}, + primitives::{BlockEnv, CfgEnv, SpecId}, }; -use revm_primitives::{BlockEnv, CfgEnv, SpecId}; use std::{ops::RangeBounds, sync::Arc}; mod state; diff --git a/crates/storage/provider/src/test_utils/events.rs b/crates/storage/provider/src/test_utils/events.rs new file mode 100644 index 0000000000..230abeb674 --- /dev/null +++ b/crates/storage/provider/src/test_utils/events.rs @@ -0,0 +1,35 @@ +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast::{self, Sender}; + +use crate::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, Chain}; + +/// A test ChainEventSubscriptions +#[derive(Clone, Default)] +pub struct TestCanonStateSubscriptions { + canon_notif_tx: Arc>>>, +} + +impl TestCanonStateSubscriptions { + /// Adds new block commit to the queue that can be consumed with + /// [`TestCanonStateSubscriptions::subscribe_canon_state`] + pub fn add_next_commit(&mut self, new: Arc) { + let event = CanonStateNotification::Commit { new }; + self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) + } + + /// Adds reorg to the queue that can be consumed with + /// [`TestCanonStateSubscriptions::subscribe_canon_state`] + pub fn add_next_reorg(&mut self, old: Arc, new: Arc) { + let event = CanonStateNotification::Reorg { old, new }; + self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok()) + } +} + +impl CanonStateSubscriptions for TestCanonStateSubscriptions { + fn subscribe_canon_state(&self) -> CanonStateNotifications { + let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100); + self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx); + + canon_notif_rx + } +} diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 6ab1c3275b..1bd6347eea 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -10,7 +10,7 @@ use reth_primitives::{ Bytecode, Bytes, ChainInfo, Header, Receipt, StorageKey, StorageValue, TransactionMeta, TransactionSigned, TxHash, TxNumber, H256, U256, }; -use revm_primitives::{BlockEnv, CfgEnv}; +use reth_revm_primitives::primitives::{BlockEnv, CfgEnv}; use std::{ collections::{BTreeMap, HashMap}, ops::RangeBounds, diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index b3097bb5ff..8a72aa01a4 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -1,6 +1,8 @@ pub mod blocks; +mod events; mod mock; mod noop; +pub use events::TestCanonStateSubscriptions; pub use mock::{ExtendedAccount, MockEthProvider}; pub use noop::NoopProvider; diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 11c754d048..3defedb669 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -9,7 +9,7 @@ use reth_primitives::{ Receipt, StorageKey, StorageValue, TransactionMeta, TransactionSigned, TxHash, TxNumber, H256, KECCAK_EMPTY, U256, }; -use revm_primitives::{BlockEnv, CfgEnv}; +use reth_revm_primitives::primitives::{BlockEnv, CfgEnv}; use std::ops::RangeBounds; /// Supports various api interfaces for testing purposes. diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/storage/provider/src/traits/chain.rs new file mode 100644 index 0000000000..545836c0db --- /dev/null +++ b/crates/storage/provider/src/traits/chain.rs @@ -0,0 +1,86 @@ +///! Canonical chain state notification trait and types. +use crate::{chain::BlockReceipts, Chain}; +use auto_impl::auto_impl; +use std::sync::Arc; +use tokio::sync::broadcast::{Receiver, Sender}; + +/// Type alias for a receiver that receives [CanonStateNotification] +pub type CanonStateNotifications = Receiver; + +/// Type alias for a sender that sends [CanonStateNotification] +pub type CanonStateNotificationSender = Sender; + +/// A type that allows to register chain related event subscriptions. +#[auto_impl(&, Arc)] +pub trait CanonStateSubscriptions: Send + Sync { + /// Get notified when a new block was imported. + fn subscribe_canon_state(&self) -> CanonStateNotifications; +} + +/// Chain action that is triggered when a new block is imported or old block is reverted. +/// and will return all [`crate::PostState`] and [`reth_primitives::SealedBlockWithSenders`] of both +/// reverted and commited blocks. +#[derive(Clone, Debug)] +#[allow(missing_docs)] +pub enum CanonStateNotification { + /// Chain reorgs and both old and new chain are returned. + Reorg { old: Arc, new: Arc }, + /// Chain got reverted without reorg and only old chain is returned. + Revert { old: Arc }, + /// Chain got extended without reorg and only new chain is returned. + Commit { new: Arc }, +} + +// For one reason or another, the compiler can't derive PartialEq for CanonStateNotification. +// so we are forced to implement it manually. +impl PartialEq for CanonStateNotification { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Reorg { old: old1, new: new1 }, Self::Reorg { old: old2, new: new2 }) => { + old1 == old2 && new1 == new2 + } + (Self::Revert { old: old1 }, Self::Revert { old: old2 }) => old1 == old2, + (Self::Commit { new: new1 }, Self::Commit { new: new2 }) => new1 == new2, + _ => false, + } + } +} + +impl CanonStateNotification { + /// Get old chain if any. + pub fn reverted(&self) -> Option> { + match self { + Self::Reorg { old, .. } => Some(old.clone()), + Self::Revert { old } => Some(old.clone()), + Self::Commit { .. } => None, + } + } + + /// Get new chain if any. + pub fn commited(&self) -> Option> { + match self { + Self::Reorg { new, .. } => Some(new.clone()), + Self::Revert { .. } => None, + Self::Commit { new } => Some(new.clone()), + } + } + + /// Return receipt with its block number and transaction hash. + /// + /// Last boolean is true if receipt is from reverted block. + pub fn block_receipts(&self) -> Vec<(BlockReceipts, bool)> { + let mut receipts = Vec::new(); + + // get old receipts + if let Some(old) = self.reverted() { + receipts + .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true))); + } + // get new receipts + if let Some(new) = self.commited() { + receipts + .extend(new.receipts_with_attachment().into_iter().map(|receipt| (receipt, false))); + } + receipts + } +} diff --git a/crates/storage/provider/src/traits/evm_env.rs b/crates/storage/provider/src/traits/evm_env.rs index 9afe1ce471..a125b89943 100644 --- a/crates/storage/provider/src/traits/evm_env.rs +++ b/crates/storage/provider/src/traits/evm_env.rs @@ -1,9 +1,9 @@ use reth_interfaces::Result; use reth_primitives::{BlockId, Header}; -use revm_primitives::{BlockEnv, CfgEnv}; +use reth_revm_primitives::primitives::{BlockEnv, CfgEnv}; /// A provider type that knows chain specific information required to configure an -/// [Env](revm_primitives::Env) +/// [Env](reth_revm_primitives::primitives::Env) /// /// This type is mainly used to provide required data to configure the EVM environment. #[auto_impl::auto_impl(&, Arc)] diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 7f10ae4acf..f88ba4f899 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -35,3 +35,9 @@ pub use withdrawals::WithdrawalsProvider; mod executor; pub use executor::{BlockExecutor, ExecutorFactory}; + +mod chain; +pub use chain::{ + CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, + CanonStateSubscriptions, +}; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 75c22cb17b..16167f94bc 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -543,7 +543,7 @@ where self.insert_hashes( fork_block_number, first_transition_id, - first_transition_id + num_transitions as u64, + first_transition_id + num_transitions, new_tip_number, new_tip_hash, expected_state_root,