diff --git a/crates/executor/src/blockchain_tree/mod.rs b/crates/executor/src/blockchain_tree/mod.rs index fe25987601..f8388aee3c 100644 --- a/crates/executor/src/blockchain_tree/mod.rs +++ b/crates/executor/src/blockchain_tree/mod.rs @@ -2,13 +2,21 @@ use chain::{BlockChainId, Chain, ForkBlock}; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{ - blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error, + blockchain_tree::BlockStatus, + consensus::Consensus, + events::{NewBlockNotifications, NewBlockNotificationsSender}, + executor::Error as ExecError, + Error, }; use reth_primitives::{ - BlockHash, BlockNumHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, U256, + BlockHash, BlockNumHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, + SealedHeader, U256, }; use reth_provider::{post_state::PostState, ExecutorFactory, HeaderProvider, Transaction}; -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; pub mod block_indices; use block_indices::BlockIndices; @@ -80,6 +88,8 @@ pub struct BlockchainTree { externals: TreeExternals, /// Tree configuration config: BlockchainTreeConfig, + /// Unbounded channel for sending new block notifications. + new_block_notication_sender: NewBlockNotificationsSender, } /// A container that wraps chains and block indices to allow searching for block hashes across all @@ -118,6 +128,11 @@ impl BlockchainTree last_canonical_hashes.last().cloned().unwrap_or_default() }; + // size of the broadcast is double of max reorg depth because at max reorg depth we can have + // send at least N block at the time. + let (new_block_notication_sender, _) = + tokio::sync::broadcast::channel(2 * max_reorg_depth as usize); + Ok(Self { externals, block_chain_id_generator: 0, @@ -127,6 +142,7 @@ impl BlockchainTree BTreeMap::from_iter(last_canonical_hashes.into_iter()), ), config, + new_block_notication_sender, }) } @@ -549,6 +565,8 @@ impl BlockchainTree // update canonical index self.block_indices.canonicalize_blocks(new_canon_chain.blocks()); + let headers: Vec> = + new_canon_chain.blocks().iter().map(|(_, b)| Arc::new(b.header.clone())).collect(); // if joins to the tip if new_canon_chain.fork_block_hash() == old_tip.hash { // append to database @@ -565,13 +583,27 @@ impl BlockchainTree let old_canon_chain = self.revert_canonical(canon_fork.number)?; // commit new canonical chain. self.commit_canonical(new_canon_chain)?; + // insert old canon chain self.insert_chain(old_canon_chain); } + // Broadcast new canonical blocks. + headers.into_iter().for_each(|header| { + // ignore if receiver is dropped. + let _ = self.new_block_notication_sender.send(header); + }); + Ok(()) } + /// Subscribe to new blocks events. + /// + /// Note: Only canonical blocks are send. + pub fn subscribe_new_blocks(&self) -> NewBlockNotifications { + self.new_block_notication_sender.subscribe() + } + /// Canonicalize the given chain and commit it to the database. fn commit_canonical(&mut self, chain: Chain) -> Result<(), Error> { let mut tx = Transaction::new(&self.externals.db)?; @@ -726,8 +758,8 @@ mod tests { } } - #[test] - fn sanity_path() { + #[tokio::test] + async fn sanity_path() { let data = BlockChainTestData::default(); let (mut block1, exec1) = data.blocks[0].clone(); block1.number = 11; @@ -743,6 +775,7 @@ mod tests { // make tree let config = BlockchainTreeConfig::new(1, 2, 3); let mut tree = BlockchainTree::new(externals, config).expect("failed to create tree"); + let mut new_block_notification = tree.subscribe_new_blocks(); // genesis block 10 is already canonical assert_eq!(tree.make_canonical(&H256::zero()), Ok(())); @@ -789,8 +822,13 @@ mod tests { // make block1 canonical assert_eq!(tree.make_canonical(&block1.hash()), Ok(())); + // check notification + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone()))); + // make block2 canonical assert_eq!(tree.make_canonical(&block2.hash()), Ok(())); + // check notification. + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); // Trie state: // b2 (canonical block) @@ -847,6 +885,9 @@ mod tests { // make b2a canonical assert_eq!(tree.make_canonical(&block2a_hash), Ok(())); + // check notification. + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2a.header.clone()))); + // Trie state: // b2a b2 (side chain) // | / @@ -866,6 +907,9 @@ mod tests { .assert(&tree); assert_eq!(tree.make_canonical(&block1a_hash), Ok(())); + // check notification. + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1a.header.clone()))); + // Trie state: // b2a b2 (side chain) // | / @@ -890,6 +934,11 @@ mod tests { // make b2 canonical assert_eq!(tree.make_canonical(&block2.hash()), Ok(())); + + // check notification. + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone()))); + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); + // Trie state: // b2 b2a (side chain) // | / @@ -947,6 +996,9 @@ mod tests { // commit b2a assert_eq!(tree.make_canonical(&block2.hash), Ok(())); + // check notification. + assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone()))); + // Trie state: // b2 b2a (side chain) // | / diff --git a/crates/executor/src/blockchain_tree/shareable.rs b/crates/executor/src/blockchain_tree/shareable.rs index 0f46a9cd9d..7989df13cb 100644 --- a/crates/executor/src/blockchain_tree/shareable.rs +++ b/crates/executor/src/blockchain_tree/shareable.rs @@ -4,6 +4,7 @@ use reth_db::database::Database; use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer}, consensus::Consensus, + events::{ChainEventSubscriptions, NewBlockNotifications}, Error, }; use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders}; @@ -82,3 +83,11 @@ impl BlockchainTreePendingState Ok(Box::new(post_state)) } } + +impl ChainEventSubscriptions + for ShareableBlockchainTree +{ + fn subscribe_new_blocks(&self) -> NewBlockNotifications { + self.tree.read().subscribe_new_blocks() + } +} diff --git a/crates/interfaces/src/events.rs b/crates/interfaces/src/events.rs index 02a46ace62..93e21684d2 100644 --- a/crates/interfaces/src/events.rs +++ b/crates/interfaces/src/events.rs @@ -1,21 +1,18 @@ -use reth_primitives::{Header, H256}; +use reth_primitives::SealedHeader; use std::sync::Arc; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::broadcast::{Receiver, Sender}; + +/// New block notification that is Arc around [SealedHeader]. +pub type NewBlockNotification = Arc; /// Type alias for a receiver that receives [NewBlockNotification] -pub type NewBlockNotifications = UnboundedReceiver; +pub type NewBlockNotifications = Receiver; + +/// Type alias for a sender that sends [NewBlockNotification] +pub type NewBlockNotificationsSender = Sender; /// A type that allows to register chain related event subscriptions. pub trait ChainEventSubscriptions: Send + Sync { /// Get notified when a new block was imported. fn subscribe_new_blocks(&self) -> NewBlockNotifications; } - -/// A notification that's emitted when a new block was imported. -#[derive(Clone, Debug)] -pub struct NewBlockNotification { - /// Hash of the block that was imported - pub hash: H256, - /// The block header of the new block - pub header: Arc
, -} diff --git a/crates/interfaces/src/test_utils/events.rs b/crates/interfaces/src/test_utils/events.rs index 427737dad2..d62626ebaf 100644 --- a/crates/interfaces/src/test_utils/events.rs +++ b/crates/interfaces/src/test_utils/events.rs @@ -1,31 +1,27 @@ use crate::events::{ChainEventSubscriptions, NewBlockNotification, NewBlockNotifications}; use async_trait::async_trait; -use reth_primitives::{Header, H256}; +use reth_primitives::{Header, SealedHeader, H256}; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::broadcast::{self, Receiver, Sender}; /// A test ChainEventSubscriptions #[derive(Clone, Default)] pub struct TestChainEventSubscriptions { - new_blocks_txs: Arc>>>, + new_blocks_txs: Arc>>>, } impl TestChainEventSubscriptions { /// Adds new block to the queue that can be consumed with /// [`TestChainEventSubscriptions::subscribe_new_blocks`] - pub fn add_new_block(&mut self, hash: H256, header: Header) { + pub fn add_new_block(&mut self, header: SealedHeader) { let header = Arc::new(header); - self.new_blocks_txs - .lock() - .as_mut() - .unwrap() - .retain(|tx| tx.send(NewBlockNotification { hash, header: header.clone() }).is_ok()) + self.new_blocks_txs.lock().as_mut().unwrap().retain(|tx| tx.send(header.clone()).is_ok()) } } impl ChainEventSubscriptions for TestChainEventSubscriptions { fn subscribe_new_blocks(&self) -> NewBlockNotifications { - let (new_blocks_tx, new_blocks_rx) = unbounded_channel(); + let (new_blocks_tx, new_blocks_rx) = broadcast::channel(100); self.new_blocks_txs.lock().as_mut().unwrap().push(new_blocks_tx); new_blocks_rx diff --git a/crates/rpc/rpc-types/src/eth/block.rs b/crates/rpc/rpc-types/src/eth/block.rs index db886905ca..552da56a93 100644 --- a/crates/rpc/rpc-types/src/eth/block.rs +++ b/crates/rpc/rpc-types/src/eth/block.rs @@ -1,8 +1,8 @@ //! Contains types that represent ethereum types in [reth_primitives] when used in RPC use crate::Transaction; use reth_primitives::{ - Address, Block as PrimitiveBlock, Bloom, Bytes, Header as PrimitiveHeader, Withdrawal, H256, - H64, U256, + Address, Block as PrimitiveBlock, Bloom, Bytes, Header as PrimitiveHeader, SealedHeader, + Withdrawal, H256, H64, U256, }; use reth_rlp::Encodable; use serde::{ser::Error, Deserialize, Serialize, Serializer}; @@ -166,7 +166,7 @@ impl Block { let uncles = block.ommers.into_iter().map(|h| h.hash_slow()).collect(); let base_fee_per_gas = block.header.base_fee_per_gas; - let header = Header::from_primitive_with_hash(block.header, block_hash); + let header = Header::from_primitive_with_hash(block.header.seal(block_hash)); Self { header, @@ -183,7 +183,7 @@ impl Block { /// an Uncle from its header. pub fn uncle_block_from_header(header: PrimitiveHeader) -> Self { let hash = header.hash_slow(); - let rpc_header = Header::from_primitive_with_hash(header.clone(), hash); + let rpc_header = Header::from_primitive_with_hash(header.clone().seal(hash)); let uncle_block = PrimitiveBlock { header, ..Default::default() }; let size = Some(U256::from(uncle_block.length())); Self { @@ -246,29 +246,33 @@ impl Header { /// Converts the primitive header type to this RPC type /// /// CAUTION: this takes the header's hash as is and does _not_ calculate the hash. - pub fn from_primitive_with_hash(primitive_header: PrimitiveHeader, block_hash: H256) -> Self { - let PrimitiveHeader { - parent_hash, - ommers_hash, - beneficiary, - state_root, - transactions_root, - receipts_root, - logs_bloom, - difficulty, - number, - gas_limit, - gas_used, - timestamp, - mix_hash, - nonce, - base_fee_per_gas: _, - extra_data, - withdrawals_root, + pub fn from_primitive_with_hash(primitive_header: SealedHeader) -> Self { + let SealedHeader { + header: + PrimitiveHeader { + parent_hash, + ommers_hash, + beneficiary, + state_root, + transactions_root, + receipts_root, + logs_bloom, + difficulty, + number, + gas_limit, + gas_used, + timestamp, + mix_hash, + nonce, + base_fee_per_gas: _, + extra_data, + withdrawals_root, + }, + hash, } = primitive_header; Header { - hash: Some(block_hash), + hash: Some(hash), parent_hash, uncles_hash: ommers_hash, miner: beneficiary, diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 20dd300438..c98d0a5424 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -5,7 +5,7 @@ use futures::StreamExt; use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; use reth_interfaces::events::ChainEventSubscriptions; use reth_network_api::NetworkInfo; -use reth_primitives::{filter::FilteredParams, BlockId, TxHash}; +use reth_primitives::{filter::FilteredParams, TxHash}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::{ @@ -18,7 +18,7 @@ use reth_rpc_types::{ use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; use tokio_stream::{ - wrappers::{ReceiverStream, UnboundedReceiverStream}, + wrappers::{BroadcastStream, ReceiverStream}, Stream, }; @@ -121,7 +121,7 @@ async fn handle_accepted( subscription_task_spawner.spawn(Box::pin(async move { // get new block subscription let mut new_blocks = - UnboundedReceiverStream::new(pubsub.chain_events.subscribe_new_blocks()); + BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks()); // get current sync status let mut initial_sync_status = pubsub.network.is_syncing(); let current_sub_res = pubsub.sync_status(initial_sync_status).await; @@ -206,16 +206,18 @@ where { /// Returns a stream that yields all new RPC blocks. fn into_new_headers_stream(self) -> impl Stream { - UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| { - Header::from_primitive_with_hash(new_block.header.as_ref().clone(), new_block.hash) + BroadcastStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| { + let new_block = new_block.expect("new block subscription never ends; qed"); + Header::from_primitive_with_hash(new_block.as_ref().clone()) }) } /// Returns a stream that yields all logs that match the given filter. fn into_log_stream(self, filter: FilteredParams) -> impl Stream { - UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks()) + BroadcastStream::new(self.chain_events.subscribe_new_blocks()) .filter_map(move |new_block| { - let block_id: BlockId = new_block.hash.into(); + let Some(new_block) = new_block.ok() else { return futures::future::ready(None); }; + let block_id = new_block.hash.into(); let txs = self.client.transactions_by_block(block_id).ok().flatten(); let receipts = self.client.receipts_by_block(block_id).ok().flatten(); match (txs, receipts) {