diff --git a/Cargo.lock b/Cargo.lock index dee57f8411..f05e91f71b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4496,6 +4496,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", + "reth-stages", "reth-transaction-pool", "tokio", "tokio-stream", @@ -4756,6 +4757,7 @@ dependencies = [ "hex-literal 0.3.4", "modular-bitfield", "parity-scale-codec", + "parking_lot 0.12.1", "rand 0.8.5", "reth-codecs", "reth-db", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index fb03a337d2..0b9db1d32b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -9,10 +9,9 @@ use crate::{ utils::get_single_header, }; use clap::{crate_version, Parser}; -use events::NodeEvent; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{pin_mut, stream::select as stream_select, FutureExt, Stream, StreamExt}; +use futures::{pin_mut, stream::select as stream_select, FutureExt, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; use reth_db::{ @@ -27,10 +26,11 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_executor::blockchain_tree::{ - externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, + config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, + events::NewBlockNotificationSink, p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, headers::{client::StatusUpdater, downloader::HeaderDownloader}, @@ -208,19 +208,36 @@ impl Command { } }; - let pipeline = if self.auto_mine { - let (_, client, task) = AutoSealBuilder::new( + // configure blockchain tree + let tree_externals = TreeExternals::new( + db.clone(), + Arc::clone(&consensus), + Factory::new(self.chain.clone()), + Arc::clone(&self.chain), + ); + let tree_config = BlockchainTreeConfig::default(); + // The size of the broadcast is twice the maximum reorg depth, because at maximum reorg + // depth at least N blocks must be sent at once. + let new_block_notification_sender = + NewBlockNotificationSink::new(tree_config.max_reorg_depth() as usize * 2); + let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new( + tree_externals, + new_block_notification_sender.clone(), + tree_config, + )?); + + // Configure the pipeline + let mut pipeline = if self.auto_mine { + let (_, client, mut task) = AutoSealBuilder::new( Arc::clone(&self.chain), shareable_db.clone(), transaction_pool.clone(), consensus_engine_tx.clone(), + new_block_notification_sender.clone(), ) .build(); - debug!(target: "reth::cli", "Spawning auto mine task"); - ctx.task_executor.spawn(Box::pin(task)); - - let (pipeline, events) = self + let mut pipeline = self .build_networked_pipeline( &mut config, network.clone(), @@ -231,42 +248,31 @@ impl Command { ) .await?; - ctx.task_executor.spawn_critical( - "events task", - events::handle_events(Some(network.clone()), events), - ); + let pipeline_events = pipeline.events(); + task.set_pipeline_events(pipeline_events); + debug!(target: "reth::cli", "Spawning auto mine task"); + ctx.task_executor.spawn(Box::pin(task)); pipeline } else { let client = network.fetch_client().await?; - let (pipeline, events) = self - .build_networked_pipeline( - &mut config, - network.clone(), - client, - Arc::clone(&consensus), - db.clone(), - &ctx.task_executor, - ) - .await?; - - ctx.task_executor.spawn_critical( - "events task", - events::handle_events(Some(network.clone()), events), - ); - - pipeline + self.build_networked_pipeline( + &mut config, + network.clone(), + client, + Arc::clone(&consensus), + db.clone(), + &ctx.task_executor, + ) + .await? }; - // configure blockchain tree - let tree_externals = TreeExternals::new( - db.clone(), - consensus, - Factory::new(self.chain.clone()), - Arc::clone(&self.chain), + let events = stream_select( + network.event_listener().map(Into::into), + pipeline.events().map(Into::into), ); - let blockchain_tree = - ShareableBlockchainTree::new(BlockchainTree::new(tree_externals, Default::default())?); + ctx.task_executor + .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); let beacon_consensus_engine = BeaconConsensusEngine::new( Arc::clone(&db), @@ -276,7 +282,6 @@ impl Command { consensus_engine_rx, self.debug.max_block, ); - info!(target: "reth::cli", "Consensus engine initialized"); let engine_api_handle = @@ -344,7 +349,7 @@ impl Command { consensus: Arc, db: Arc>, task_executor: &TaskExecutor, - ) -> eyre::Result<(Pipeline, NetworkHandle>, impl Stream)> + ) -> eyre::Result, NetworkHandle>> where Client: HeadersClient + BodiesClient + Clone + 'static, { @@ -365,7 +370,7 @@ impl Command { .build(client, Arc::clone(&consensus), db.clone()) .into_task_with(task_executor); - let mut pipeline = self + let pipeline = self .build_pipeline( config, header_downloader, @@ -377,11 +382,7 @@ impl Command { ) .await?; - let events = stream_select( - network.event_listener().map(Into::into), - pipeline.events().map(Into::into), - ); - Ok((pipeline, events)) + Ok(pipeline) } fn load_config(&self) -> eyre::Result { diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml index 8a6351a8c9..a7f232458c 100644 --- a/crates/consensus/auto-seal/Cargo.toml +++ b/crates/consensus/auto-seal/Cargo.toml @@ -13,6 +13,7 @@ reth-beacon-consensus = { path = "../beacon" } reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-provider = { path = "../../storage/provider" } +reth-stages = { path = "../../stages" } reth-revm = { path = "../../revm" } reth-transaction-pool = { path = "../../transaction-pool" } diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index e8c9696511..d5c18d30ad 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -14,11 +14,13 @@ //! These downloaders poll the miner, assemble the block, and return transactions that are ready to //! be mined. +use reth_beacon_consensus::BeaconEngineMessage; use reth_interfaces::consensus::{Consensus, ConsensusError}; use reth_primitives::{ BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock, SealedHeader, H256, U256, }; +use reth_transaction_pool::TransactionPool; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::trace; @@ -29,8 +31,7 @@ mod task; pub use crate::client::AutoSealClient; pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner}; -use reth_beacon_consensus::BeaconEngineMessage; -use reth_transaction_pool::TransactionPool; +use reth_interfaces::events::NewBlockNotificationSink; pub use task::MiningTask; /// A consensus implementation intended for local development and testing purposes. @@ -82,6 +83,7 @@ pub struct AutoSealBuilder { mode: MiningMode, storage: Storage, to_engine: UnboundedSender, + new_block_notification_sender: NewBlockNotificationSink, } // === impl AutoSealBuilder === @@ -93,6 +95,7 @@ impl AutoSealBuilder { client: Client, pool: Pool, to_engine: UnboundedSender, + new_block_notification_sender: NewBlockNotificationSink, ) -> Self { let mode = MiningMode::interval(std::time::Duration::from_secs(1)); Self { @@ -102,6 +105,7 @@ impl AutoSealBuilder { pool, mode, to_engine, + new_block_notification_sender, } } @@ -113,12 +117,21 @@ impl AutoSealBuilder { /// Consumes the type and returns all components pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask) { - let Self { client, consensus, pool, mode, storage, to_engine } = self; + let Self { + client, + consensus, + pool, + mode, + storage, + to_engine, + new_block_notification_sender, + } = self; let auto_client = AutoSealClient::new(storage.clone()); let task = MiningTask::new( Arc::clone(&consensus.chain_spec), mode, to_engine, + new_block_notification_sender, storage, client, pool, diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 1a7b5bb62e..cbdab03466 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -1,7 +1,7 @@ use crate::{mode::MiningMode, Storage}; -use futures_util::{future::BoxFuture, FutureExt}; +use futures_util::{future::BoxFuture, FutureExt, StreamExt}; use reth_beacon_consensus::BeaconEngineMessage; -use reth_interfaces::consensus::ForkchoiceState; +use reth_interfaces::{consensus::ForkchoiceState, events::NewBlockNotificationSink}; use reth_primitives::{ constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS}, proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, @@ -12,6 +12,7 @@ use reth_revm::{ database::{State, SubState}, executor::Executor, }; +use reth_stages::{stages::FINISH, PipelineEvent}; use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; use std::{ collections::VecDeque, @@ -22,7 +23,8 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::sync::{mpsc::UnboundedSender, oneshot}; -use tracing::{trace, warn}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, trace, warn}; /// A Future that listens for new ready transactions and puts new blocks into storage pub struct MiningTask { @@ -33,7 +35,7 @@ pub struct MiningTask { /// The active miner miner: MiningMode, /// Single active future that inserts a new block into `storage` - insert_task: Option>, + insert_task: Option>>>, /// Shared storage to insert new blocks storage: Storage, /// Pool where transactions are stored @@ -42,6 +44,10 @@ pub struct MiningTask { queued: VecDeque::Transaction>>>>, /// TODO: ideally this would just be a sender of hashes to_engine: UnboundedSender, + /// Used to notify consumers of new blocks + new_block_notification_sender: NewBlockNotificationSink, + /// The pipeline events to listen on + pipe_line_events: Option>, } // === impl MiningTask === @@ -52,6 +58,7 @@ impl MiningTask { chain_spec: Arc, miner: MiningMode, to_engine: UnboundedSender, + new_block_notification_sender: NewBlockNotificationSink, storage: Storage, client: Client, pool: Pool, @@ -64,9 +71,16 @@ impl MiningTask { storage, pool, to_engine, + new_block_notification_sender, queued: Default::default(), + pipe_line_events: None, } } + + /// Sets the pipeline events to listen on. + pub fn set_pipeline_events(&mut self, events: UnboundedReceiverStream) { + self.pipe_line_events = Some(events); + } } impl Future for MiningTask @@ -101,6 +115,11 @@ where let client = this.client.clone(); let chain_spec = Arc::clone(&this.chain_spec); let pool = this.pool.clone(); + let mut events = this.pipe_line_events.take(); + let new_block_notification_sender = this.new_block_notification_sender.clone(); + + // Create the mining future that creates a block, notifies the engine that drives + // the pipeline this.insert_task = Some(Box::pin(async move { let mut storage = storage.write().await; let mut header = Header { @@ -151,7 +170,6 @@ where let Block { mut header, body, .. } = block; // clear all transactions from pool - // TODO this should happen automatically via events pool.remove_transactions(body.iter().map(|tx| tx.hash)); header.receipts_root = if res.receipts().is_empty() { @@ -169,7 +187,7 @@ where BlockBody { transactions: body, ommers: vec![], withdrawals: None }; header.gas_used = gas_used; - storage.insert_new_block(header, body); + storage.insert_new_block(header.clone(), body); let new_hash = storage.best_hash; let state = ForkchoiceState { @@ -177,25 +195,54 @@ where finalized_block_hash: new_hash, safe_block_hash: new_hash, }; + drop(storage); - trace!(target: "consensus::auto", ?state, "sending fork choice update"); + // send the new update to the engine, this will trigger the pipeline to + // download the block, execute it and store it in the database. let (tx, _rx) = oneshot::channel(); let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs: None, tx, }); + debug!(target: "consensus::auto", ?state, "sent fork choice update"); + + // wait for the pipeline to finish + if let Some(events) = events.as_mut() { + debug!(target: "consensus::auto", "waiting for finish stage event..."); + // wait for the finish stage to + loop { + if let Some(PipelineEvent::Running { stage_id, .. }) = + events.next().await + { + if stage_id == FINISH { + debug!(target: "consensus::auto", "received finish stage event"); + break + } + } + } + } + + let header = header.seal_slow(); + debug!(target: "consensus::auto", header=?header.hash(), "sending block notification"); + + // send block notification + let _ = new_block_notification_sender.send(Arc::new(header)); } Err(err) => { warn!(target: "consensus::auto", ?err, "failed to execute block") } } + + events })); } if let Some(mut fut) = this.insert_task.take() { match fut.poll_unpin(cx) { - Poll::Ready(_) => {} + Poll::Ready(events) => { + this.pipe_line_events = events; + } Poll::Pending => { this.insert_task = Some(fut); break diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index f7b3a89971..bc8cefecbc 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -469,7 +469,9 @@ mod tests { post_state::PostState, test_utils::TestExecutorFactory, }; - use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus}; + use reth_interfaces::{ + events::NewBlockNotificationSink, sync::NoopSyncStateUpdate, test_utils::TestConsensus, + }; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::Transaction; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; @@ -550,7 +552,8 @@ mod tests { let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); let config = BlockchainTreeConfig::new(1, 2, 3); let tree = ShareableBlockchainTree::new( - BlockchainTree::new(externals, config).expect("failed to create tree"), + BlockchainTree::new(externals, NewBlockNotificationSink::new(2), config) + .expect("failed to create tree"), ); let (sync_tx, sync_rx) = unbounded_channel(); diff --git a/crates/executor/src/blockchain_tree/mod.rs b/crates/executor/src/blockchain_tree/mod.rs index 666cb7633b..4a823c9d54 100644 --- a/crates/executor/src/blockchain_tree/mod.rs +++ b/crates/executor/src/blockchain_tree/mod.rs @@ -2,11 +2,8 @@ use chain::{BlockChainId, Chain, ForkBlock}; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{ - blockchain_tree::BlockStatus, - consensus::Consensus, - events::{NewBlockNotifications, NewBlockNotificationsSender}, - executor::Error as ExecError, - Error, + blockchain_tree::BlockStatus, consensus::Consensus, events::NewBlockNotifications, + executor::Error as ExecError, Error, }; use reth_primitives::{ BlockHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, SealedHeader, U256, @@ -34,6 +31,7 @@ pub use shareable::ShareableBlockchainTree; pub mod post_state_data; pub use post_state_data::{PostStateData, PostStateDataRef}; +use reth_interfaces::events::NewBlockNotificationSink; #[cfg_attr(doc, aquamarine::aquamarine)] /// Tree of chains and its identifications. @@ -88,7 +86,7 @@ pub struct BlockchainTree { /// Tree configuration config: BlockchainTreeConfig, /// Unbounded channel for sending new block notifications. - new_block_notication_sender: NewBlockNotificationsSender, + new_block_notification_sender: NewBlockNotificationSink, } /// A container that wraps chains and block indices to allow searching for block hashes across all @@ -104,6 +102,7 @@ impl BlockchainTree /// Create a new blockchain tree. pub fn new( externals: TreeExternals, + new_block_notification_sender: NewBlockNotificationSink, config: BlockchainTreeConfig, ) -> Result { let max_reorg_depth = config.max_reorg_depth(); @@ -127,11 +126,6 @@ impl BlockchainTree last_canonical_hashes.last().cloned().unwrap_or_default() }; - // size of the broadcast is double of max reorg depth because at max reorg depth we can have - // send at least N block at the time. - let (new_block_notication_sender, _) = - tokio::sync::broadcast::channel(2 * max_reorg_depth as usize); - Ok(Self { externals, block_chain_id_generator: 0, @@ -141,7 +135,7 @@ impl BlockchainTree BTreeMap::from_iter(last_canonical_hashes.into_iter()), ), config, - new_block_notication_sender, + new_block_notification_sender, }) } @@ -590,7 +584,7 @@ impl BlockchainTree // Broadcast new canonical blocks. headers.into_iter().for_each(|header| { // ignore if receiver is dropped. - let _ = self.new_block_notication_sender.send(header); + let _ = self.new_block_notification_sender.send(header); }); Ok(()) @@ -600,7 +594,7 @@ impl BlockchainTree /// /// Note: Only canonical blocks are send. pub fn subscribe_new_blocks(&self) -> NewBlockNotifications { - self.new_block_notication_sender.subscribe() + self.new_block_notification_sender.subscribe() } /// Canonicalize the given chain and commit it to the database. @@ -783,7 +777,8 @@ mod tests { // make tree let config = BlockchainTreeConfig::new(1, 2, 3); - let mut tree = BlockchainTree::new(externals, config).expect("failed to create tree"); + let mut tree = BlockchainTree::new(externals, NewBlockNotificationSink::new(10), config) + .expect("failed to create tree"); let mut new_block_notification = tree.subscribe_new_blocks(); // genesis block 10 is already canonical diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index 6a571efba9..f496b85970 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -32,6 +32,7 @@ secp256k1 = { version = "0.26.0", default-features = false, features = [ "rand", ], optional = true } modular-bitfield = "0.11.2" +parking_lot = "0.12.1" [dev-dependencies] reth-db = { path = "../storage/db", features = ["test-utils"] } diff --git a/crates/interfaces/src/events.rs b/crates/interfaces/src/events.rs index 93e21684d2..91c8b54ed0 100644 --- a/crates/interfaces/src/events.rs +++ b/crates/interfaces/src/events.rs @@ -1,6 +1,7 @@ +use parking_lot::Mutex; use reth_primitives::SealedHeader; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast::{error::SendError, Receiver, Sender}; /// New block notification that is Arc around [SealedHeader]. pub type NewBlockNotification = Arc; @@ -16,3 +17,38 @@ pub trait ChainEventSubscriptions: Send + Sync { /// Get notified when a new block was imported. fn subscribe_new_blocks(&self) -> NewBlockNotifications; } + +/// A shareable Sender that allows to send [NewBlockNotification] to all receivers. +#[derive(Debug, Clone)] +pub struct NewBlockNotificationSink { + inner: Arc>>, +} + +// === impl NewBlockNotificationSink === + +impl NewBlockNotificationSink { + /// Creates a new NewBlockNotificationSink with the given capacity. + // // size of the broadcast is double of max reorg depth because at max reorg depth we can have + // // send at least N block at the time. + pub fn new(capacity: usize) -> Self { + let inner = tokio::sync::broadcast::channel(capacity); + Self { inner: Arc::new(Mutex::new(inner.0)) } + } + + /// Attempts to send a value to all active Receiver handles, returning it back if it could not + /// be sent. + pub fn send( + &self, + header: NewBlockNotification, + ) -> Result> { + let sender = self.inner.lock(); + sender.send(header) + } + + /// Creates a new Receiver handle that will receive notifications sent after this call to + /// subscribe. + pub fn subscribe(&self) -> Receiver { + let sender = self.inner.lock(); + sender.subscribe() + } +} diff --git a/crates/rpc/rpc-api/src/eth_pubsub.rs b/crates/rpc/rpc-api/src/eth_pubsub.rs index d27c068ec1..790c4251a4 100644 --- a/crates/rpc/rpc-api/src/eth_pubsub.rs +++ b/crates/rpc/rpc-api/src/eth_pubsub.rs @@ -6,7 +6,7 @@ use reth_rpc_types::pubsub::{Params, SubscriptionKind}; pub trait EthPubSubApi { /// Create an ethereum subscription for the given params #[subscription( - name = "eth_subscribe", + name = "eth_subscribe" => "eth_subscription", unsubscribe = "eth_unsubscribe", item = reth_rpc_types::pubsub::SubscriptionResult )]