diff --git a/Cargo.lock b/Cargo.lock index 665a8f008f..d5b7479793 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,6 +4449,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-util", "proptest", + "reth-auto-seal-consensus", "reth-beacon-consensus", "reth-db", "reth-discv4", @@ -4479,6 +4480,23 @@ dependencies = [ "tui", ] +[[package]] +name = "reth-auto-seal-consensus" +version = "0.1.0" +dependencies = [ + "futures-util", + "reth-beacon-consensus", + "reth-executor", + "reth-interfaces", + "reth-primitives", + "reth-provider", + "reth-revm", + "reth-transaction-pool", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-beacon-consensus" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0da5e418d6..e2bd810c18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "bin/reth", + "crates/consensus/auto-seal", "crates/consensus/beacon", "crates/consensus/common", "crates/executor", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 421b2477c3..f5d388032e 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -18,6 +18,7 @@ reth-stages = { path = "../../crates/stages"} reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"] } reth-transaction-pool = { path = "../../crates/transaction-pool" } reth-beacon-consensus = { path = "../../crates/consensus/beacon" } +reth-auto-seal-consensus = { path = "../../crates/consensus/auto-seal" } reth-executor = { path = "../../crates/executor" } reth-rpc-engine-api = { path = "../../crates/rpc/rpc-engine-api" } reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 10a134879f..145dc65d0f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -13,6 +13,7 @@ use events::NodeEvent; use eyre::Context; use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, FutureExt, Stream, StreamExt}; +use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; use reth_db::{ database::Database, @@ -33,10 +34,7 @@ use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, - headers::{ - client::{HeadersClient, StatusUpdater}, - downloader::HeaderDownloader, - }, + headers::{client::StatusUpdater, downloader::HeaderDownloader}, }, sync::SyncStateUpdater, }; @@ -71,6 +69,9 @@ use tokio::sync::{ }; use tracing::*; +use reth_interfaces::p2p::headers::client::HeadersClient; +use reth_stages::stages::{MERKLE_EXECUTION, MERKLE_UNWIND}; + pub mod events; /// Start the node @@ -121,6 +122,10 @@ pub struct Command { #[clap(flatten)] debug: DebugArgs, + + /// Automatically mine blocks for new transactions + #[arg(long)] + auto_mine: bool, } impl Command { @@ -146,7 +151,12 @@ impl Command { init_genesis(db.clone(), self.chain.clone())?; - let consensus: Arc = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))); + let consensus: Arc = if self.auto_mine { + debug!(target: "reth::cli", "Using auto seal"); + Arc::new(AutoSealConsensus::new(Arc::clone(&self.chain))) + } else { + Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) + }; self.init_trusted_nodes(&mut config); @@ -198,20 +208,55 @@ impl Command { } }; - let client = network.fetch_client().await?; - let (pipeline, events) = self - .build_networked_pipeline( - &mut config, - network.clone(), - client, - Arc::clone(&consensus), - db.clone(), - &ctx.task_executor, + let pipeline = if self.auto_mine { + let (_, client, task) = AutoSealBuilder::new( + Arc::clone(&self.chain), + shareable_db.clone(), + transaction_pool.clone(), + consensus_engine_tx.clone(), ) - .await?; + .build(); - ctx.task_executor - .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); + debug!(target: "reth::cli", "Spawning auto mine task"); + ctx.task_executor.spawn(Box::pin(task)); + + let (pipeline, events) = self + .build_networked_pipeline( + &mut config, + network.clone(), + client, + Arc::clone(&consensus), + db.clone(), + &ctx.task_executor, + ) + .await?; + + ctx.task_executor.spawn_critical( + "events task", + events::handle_events(Some(network.clone()), events), + ); + + pipeline + } else { + let client = network.fetch_client().await?; + let (pipeline, events) = self + .build_networked_pipeline( + &mut config, + network.clone(), + client, + Arc::clone(&consensus), + db.clone(), + &ctx.task_executor, + ) + .await?; + + ctx.task_executor.spawn_critical( + "events task", + events::handle_events(Some(network.clone()), events), + ); + + pipeline + }; // configure blockchain tree let tree_externals = TreeExternals::new( @@ -311,7 +356,6 @@ impl Command { None }; - // TODO: remove Arc requirement from downloader builders. // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers) .build(client.clone(), Arc::clone(&consensus)) @@ -581,7 +625,9 @@ impl Command { .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, }) - .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)), + .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)) + .disable_if(MERKLE_UNWIND, || self.auto_mine) + .disable_if(MERKLE_EXECUTION, || self.auto_mine), ) .build(); diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml new file mode 100644 index 0000000000..de504574e4 --- /dev/null +++ b/crates/consensus/auto-seal/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "reth-auto-seal-consensus" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/paradigmxyz/reth" +readme = "README.md" +description = "A consensus impl for local testing purposes" + +[dependencies] +# reth +reth-beacon-consensus = { path = "../beacon" } +reth-primitives = { path = "../../primitives" } +reth-interfaces = { path = "../../interfaces" } +reth-provider = { path = "../../storage/provider" } +reth-revm = { path = "../../revm" } +reth-executor = { path = "../../executor" } +reth-transaction-pool = { path = "../../transaction-pool" } + +# async +futures-util = "0.3" +tokio = { version = "1", features = ["sync", "time"] } +tokio-stream = "0.1" +tracing = "0.1" + +[dev-dependencies] +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } diff --git a/crates/consensus/auto-seal/src/client.rs b/crates/consensus/auto-seal/src/client.rs new file mode 100644 index 0000000000..0658cc969a --- /dev/null +++ b/crates/consensus/auto-seal/src/client.rs @@ -0,0 +1,130 @@ +//! This includes download client implementations for auto sealing miners. +use crate::Storage; +use reth_interfaces::p2p::{ + bodies::client::{BodiesClient, BodiesFut}, + download::DownloadClient, + headers::client::{HeadersClient, HeadersFut, HeadersRequest}, + priority::Priority, +}; +use reth_primitives::{ + BlockBody, BlockHashOrNumber, Header, HeadersDirection, PeerId, WithPeerId, H256, +}; +use std::fmt::Debug; +use tracing::{trace, warn}; + +/// A download client that polls the miner for transactions and assembles blocks to be returned in +/// the download process. +/// +/// When polled, the miner will assemble blocks when miners produce ready transactions and store the +/// blocks in memory. +#[derive(Debug, Clone)] +pub struct AutoSealClient { + storage: Storage, +} + +impl AutoSealClient { + pub(crate) fn new(storage: Storage) -> Self { + Self { storage } + } + + async fn fetch_headers(&self, request: HeadersRequest) -> Vec
{ + trace!(target: "consensus::auto", ?request, "received headers request"); + + let storage = self.storage.read().await; + let HeadersRequest { start, limit, direction } = request; + let mut headers = Vec::new(); + + let mut block: BlockHashOrNumber = match start { + BlockHashOrNumber::Hash(start) => start.into(), + BlockHashOrNumber::Number(num) => { + if let Some(hash) = storage.block_hash(num) { + hash.into() + } else { + warn!(target: "consensus::auto", num, "no matching block found"); + return headers + } + } + }; + + for _ in 0..limit { + // fetch from storage + if let Some(header) = storage.header_by_hash_or_number(block) { + match direction { + HeadersDirection::Falling => block = header.parent_hash.into(), + HeadersDirection::Rising => { + let next = header.number + 1; + block = next.into() + } + } + headers.push(header); + } else { + break + } + } + + trace!(target: "consensus::auto", ?headers, "returning headers"); + + headers + } + + async fn fetch_bodies(&self, hashes: Vec) -> Vec { + trace!(target: "consensus::auto", ?hashes, "received bodies request"); + let storage = self.storage.read().await; + let mut bodies = Vec::new(); + for hash in hashes { + if let Some(body) = storage.bodies.get(&hash).cloned() { + bodies.push(body); + } else { + break + } + } + + trace!(target: "consensus::auto", ?bodies, "returning bodies"); + + bodies + } +} + +impl HeadersClient for AutoSealClient { + type Output = HeadersFut; + + fn get_headers_with_priority( + &self, + request: HeadersRequest, + _priority: Priority, + ) -> Self::Output { + let this = self.clone(); + Box::pin(async move { + let headers = this.fetch_headers(request).await; + Ok(WithPeerId::new(PeerId::random(), headers)) + }) + } +} + +impl BodiesClient for AutoSealClient { + type Output = BodiesFut; + + fn get_block_bodies_with_priority( + &self, + hashes: Vec, + _priority: Priority, + ) -> Self::Output { + let this = self.clone(); + Box::pin(async move { + let bodies = this.fetch_bodies(hashes).await; + Ok(WithPeerId::new(PeerId::random(), bodies)) + }) + } +} + +impl DownloadClient for AutoSealClient { + fn report_bad_message(&self, _peer_id: PeerId) { + warn!("Reported a bad message on a miner, we should never produce bad blocks"); + // noop + } + + fn num_connected_peers(&self) -> usize { + // no such thing as connected peers when we are mining ourselves + 1 + } +} diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs new file mode 100644 index 0000000000..e8c9696511 --- /dev/null +++ b/crates/consensus/auto-seal/src/lib.rs @@ -0,0 +1,203 @@ +#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! A [Consensus] implementation for local testing purposes +//! that automatically seals blocks. +//! +//! The Mining task polls a [MiningMode], and will return a list of transactions that are ready to +//! be mined. +//! +//! These downloaders poll the miner, assemble the block, and return transactions that are ready to +//! be mined. + +use reth_interfaces::consensus::{Consensus, ConsensusError}; +use reth_primitives::{ + BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock, + SealedHeader, H256, U256, +}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tracing::trace; + +mod client; +mod mode; +mod task; + +pub use crate::client::AutoSealClient; +pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner}; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_transaction_pool::TransactionPool; +pub use task::MiningTask; + +/// A consensus implementation intended for local development and testing purposes. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct AutoSealConsensus { + /// Configuration + chain_spec: Arc, +} + +impl AutoSealConsensus { + /// Create a new instance of [AutoSealConsensus] + pub fn new(chain_spec: Arc) -> Self { + Self { chain_spec } + } +} + +impl Consensus for AutoSealConsensus { + fn pre_validate_header( + &self, + _header: &SealedHeader, + _parent: &SealedHeader, + ) -> Result<(), ConsensusError> { + Ok(()) + } + + fn validate_header( + &self, + _header: &SealedHeader, + _total_difficulty: U256, + ) -> Result<(), ConsensusError> { + Ok(()) + } + + fn pre_validate_block(&self, _block: &SealedBlock) -> Result<(), ConsensusError> { + Ok(()) + } + + fn has_block_reward(&self, _total_difficulty: U256, _difficulty: U256) -> bool { + false + } +} + +/// Builder type for configuring the setup +pub struct AutoSealBuilder { + client: Client, + consensus: AutoSealConsensus, + pool: Pool, + mode: MiningMode, + storage: Storage, + to_engine: UnboundedSender, +} + +// === impl AutoSealBuilder === + +impl AutoSealBuilder { + /// Creates a new builder instance to configure all parts. + pub fn new( + chain_spec: Arc, + client: Client, + pool: Pool, + to_engine: UnboundedSender, + ) -> Self { + let mode = MiningMode::interval(std::time::Duration::from_secs(1)); + Self { + storage: Storage::new(&chain_spec), + client, + consensus: AutoSealConsensus::new(chain_spec), + pool, + mode, + to_engine, + } + } + + /// Sets the [MiningMode] it operates in, default is [MiningMode::Auto] + pub fn mode(mut self, mode: MiningMode) -> Self { + self.mode = mode; + self + } + + /// Consumes the type and returns all components + pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask) { + let Self { client, consensus, pool, mode, storage, to_engine } = self; + let auto_client = AutoSealClient::new(storage.clone()); + let task = MiningTask::new( + Arc::clone(&consensus.chain_spec), + mode, + to_engine, + storage, + client, + pool, + ); + (consensus, auto_client, task) + } +} + +/// In memory storage +#[derive(Debug, Clone, Default)] +pub(crate) struct Storage { + inner: Arc>, +} + +// == impl Storage === + +impl Storage { + fn new(chain_spec: &ChainSpec) -> Self { + let header = chain_spec.genesis_header(); + let best_hash = header.hash_slow(); + Self { inner: Arc::new(RwLock::new(StorageInner { best_hash, ..Default::default() })) } + } + + /// Returns the write lock of the storage + pub(crate) async fn write(&self) -> RwLockWriteGuard<'_, StorageInner> { + self.inner.write().await + } + + /// Returns the read lock of the storage + pub(crate) async fn read(&self) -> RwLockReadGuard<'_, StorageInner> { + self.inner.read().await + } +} + +#[derive(Default, Debug)] +pub(crate) struct StorageInner { + /// Headers buffered for download. + pub(crate) headers: HashMap, + /// A mapping between block hash and number. + pub(crate) hash_to_number: HashMap, + /// Bodies buffered for download. + pub(crate) bodies: HashMap, + /// Tracks best block + pub(crate) best_block: u64, + /// Tracks hash of best block + pub(crate) best_hash: H256, +} + +// === impl StorageInner === + +impl StorageInner { + /// Returns the block hash for the given block number if it exists. + pub(crate) fn block_hash(&self, num: u64) -> Option { + self.hash_to_number.iter().find_map(|(k, v)| num.eq(v).then_some(*k)) + } + + /// Returns the matching header if it exists. + pub(crate) fn header_by_hash_or_number( + &self, + hash_or_num: BlockHashOrNumber, + ) -> Option
{ + let num = match hash_or_num { + BlockHashOrNumber::Hash(hash) => self.hash_to_number.get(&hash).copied()?, + BlockHashOrNumber::Number(num) => num, + }; + self.headers.get(&num).cloned() + } + + /// Inserts a new header+body pair + pub(crate) fn insert_new_block(&mut self, mut header: Header, body: BlockBody) { + header.number = self.best_block + 1; + header.parent_hash = self.best_hash; + + self.best_hash = header.hash_slow(); + self.best_block = header.number; + + trace!(target: "consensus::auto", num=self.best_block, hash=?self.best_hash, "inserting new block"); + self.headers.insert(header.number, header); + self.bodies.insert(self.best_hash, body); + self.hash_to_number.insert(self.best_hash, self.best_block); + } +} diff --git a/crates/consensus/auto-seal/src/mode.rs b/crates/consensus/auto-seal/src/mode.rs new file mode 100644 index 0000000000..809455311e --- /dev/null +++ b/crates/consensus/auto-seal/src/mode.rs @@ -0,0 +1,155 @@ +//! The mode the auto seal miner is operating in. + +use futures_util::{stream::Fuse, StreamExt}; +use reth_primitives::TxHash; +use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; +use std::{ + fmt, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{sync::mpsc::Receiver, time::Interval}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; + +/// Mode of operations for the `Miner` +#[derive(Debug)] +pub enum MiningMode { + /// A miner that does nothing + None, + /// A miner that listens for new transactions that are ready. + /// + /// Either one transaction will be mined per block, or any number of transactions will be + /// allowed + Auto(ReadyTransactionMiner), + /// A miner that constructs a new block every `interval` tick + FixedBlockTime(FixedBlockTimeMiner), +} + +// === impl MiningMode === + +impl MiningMode { + /// Creates a new instant mining mode that listens for new transactions and tries to build + /// non-empty blocks as soon as transactions arrive. + pub fn instant(max_transactions: usize, listener: Receiver) -> Self { + MiningMode::Auto(ReadyTransactionMiner { + max_transactions, + has_pending_txs: None, + rx: ReceiverStream::new(listener).fuse(), + }) + } + + /// Creates a new interval miner that builds a block ever `duration`. + pub fn interval(duration: Duration) -> Self { + MiningMode::FixedBlockTime(FixedBlockTimeMiner::new(duration)) + } + + /// polls the Pool and returns those transactions that should be put in a block, if any. + pub(crate) fn poll( + &mut self, + pool: &Pool, + cx: &mut Context<'_>, + ) -> Poll::Transaction>>>> + where + Pool: TransactionPool, + { + match self { + MiningMode::None => Poll::Pending, + MiningMode::Auto(miner) => miner.poll(pool, cx), + MiningMode::FixedBlockTime(miner) => miner.poll(pool, cx), + } + } +} + +/// A miner that's supposed to create a new block every `interval`, mining all transactions that are +/// ready at that time. +/// +/// The default blocktime is set to 6 seconds +#[derive(Debug)] +pub struct FixedBlockTimeMiner { + /// The interval this fixed block time miner operates with + interval: Interval, +} + +// === impl FixedBlockTimeMiner === + +impl FixedBlockTimeMiner { + /// Creates a new instance with an interval of `duration` + pub(crate) fn new(duration: Duration) -> Self { + let start = tokio::time::Instant::now() + duration; + Self { interval: tokio::time::interval_at(start, duration) } + } + + fn poll( + &mut self, + pool: &Pool, + cx: &mut Context<'_>, + ) -> Poll::Transaction>>>> + where + Pool: TransactionPool, + { + if self.interval.poll_tick(cx).is_ready() { + // drain the pool + return Poll::Ready(pool.best_transactions().collect()) + } + Poll::Pending + } +} + +impl Default for FixedBlockTimeMiner { + fn default() -> Self { + Self::new(Duration::from_secs(6)) + } +} + +/// A miner that Listens for new ready transactions +pub struct ReadyTransactionMiner { + /// how many transactions to mine per block + max_transactions: usize, + /// stores whether there are pending transactions (if known) + has_pending_txs: Option, + /// Receives hashes of transactions that are ready + rx: Fuse>, +} + +// === impl ReadyTransactionMiner === + +impl ReadyTransactionMiner { + fn poll( + &mut self, + pool: &Pool, + cx: &mut Context<'_>, + ) -> Poll::Transaction>>>> + where + Pool: TransactionPool, + { + // drain the notification stream + while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) { + self.has_pending_txs = Some(true); + } + + if self.has_pending_txs == Some(false) { + return Poll::Pending + } + + let transactions = pool.best_transactions().take(self.max_transactions).collect::>(); + + // there are pending transactions if we didn't drain the pool + self.has_pending_txs = Some(transactions.len() >= self.max_transactions); + + if transactions.is_empty() { + return Poll::Pending + } + + Poll::Ready(transactions) + } +} + +impl fmt::Debug for ReadyTransactionMiner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadyTransactionMiner") + .field("max_transactions", &self.max_transactions) + .finish_non_exhaustive() + } +} diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs new file mode 100644 index 0000000000..766c84c5ce --- /dev/null +++ b/crates/consensus/auto-seal/src/task.rs @@ -0,0 +1,213 @@ +use crate::{mode::MiningMode, Storage}; +use futures_util::{future::BoxFuture, FutureExt}; +use reth_beacon_consensus::BeaconEngineMessage; +use reth_executor::executor::Executor; +use reth_interfaces::consensus::ForkchoiceState; +use reth_primitives::{ + constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS}, + proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, + EMPTY_OMMER_ROOT, U256, +}; +use reth_provider::StateProviderFactory; +use reth_revm::database::{State, SubState}; +use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; +use std::{ + collections::VecDeque, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tracing::{trace, warn}; + +/// A Future that listens for new ready transactions and puts new blocks into storage +pub struct MiningTask { + /// The configured chain spec + chain_spec: Arc, + /// The client used to interact with the state + client: Client, + /// The active miner + miner: MiningMode, + /// Single active future that inserts a new block into `storage` + insert_task: Option>, + /// Shared storage to insert new blocks + storage: Storage, + /// Pool where transactions are stored + pool: Pool, + /// backlog of sets of transactions ready to be mined + queued: VecDeque::Transaction>>>>, + /// TODO: ideally this would just be a sender of hashes + to_engine: UnboundedSender, +} + +// === impl MiningTask === + +impl MiningTask { + /// Creates a new instance of the task + pub(crate) fn new( + chain_spec: Arc, + miner: MiningMode, + to_engine: UnboundedSender, + storage: Storage, + client: Client, + pool: Pool, + ) -> Self { + Self { + chain_spec, + client, + miner, + insert_task: None, + storage, + pool, + to_engine, + queued: Default::default(), + } + } +} + +impl Future for MiningTask +where + Client: StateProviderFactory + Clone + Unpin + 'static, + Pool: TransactionPool + Unpin + 'static, + ::Transaction: IntoRecoveredTransaction, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // this drives block production and + loop { + if let Poll::Ready(transactions) = this.miner.poll(&this.pool, cx) { + // miner returned a set of transaction that we feed to the producer + this.queued.push_back(transactions); + } + + if this.insert_task.is_none() { + if this.queued.is_empty() { + // nothing to insert + break + } + + // ready to queue in new insert task + let storage = this.storage.clone(); + let transactions = this.queued.pop_front().expect("not empty"); + + let to_engine = this.to_engine.clone(); + let client = this.client.clone(); + let chain_spec = Arc::clone(&this.chain_spec); + let pool = this.pool.clone(); + this.insert_task = Some(Box::pin(async move { + let mut storage = storage.write().await; + let mut header = Header { + parent_hash: storage.best_hash, + ommers_hash: EMPTY_OMMER_ROOT, + beneficiary: Default::default(), + state_root: Default::default(), + transactions_root: Default::default(), + receipts_root: Default::default(), + withdrawals_root: None, + logs_bloom: Default::default(), + difficulty: Default::default(), + number: storage.best_block + 1, + gas_limit: 30_000_000, + gas_used: 0, + timestamp: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + mix_hash: Default::default(), + nonce: 0, + base_fee_per_gas: None, + extra_data: Default::default(), + }; + + let transactions = transactions + .into_iter() + .map(|tx| tx.to_recovered_transaction().into_signed()) + .collect::>(); + + header.transactions_root = if transactions.is_empty() { + EMPTY_TRANSACTIONS + } else { + proofs::calculate_transaction_root(transactions.iter()) + }; + + let block = + Block { header, body: transactions, ommers: vec![], withdrawals: None }; + + // execute the new block + let substate = SubState::new(State::new(client.latest().unwrap())); + let mut executor = Executor::new(chain_spec, substate); + + trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions"); + + match executor.execute_transactions(&block, U256::ZERO, None) { + Ok((res, gas_used)) => { + let Block { mut header, body, .. } = block; + + // clear all transactions from pool + // TODO this should happen automatically via events + pool.remove_transactions(body.iter().map(|tx| tx.hash)); + + header.receipts_root = if res.receipts().is_empty() { + EMPTY_RECEIPTS + } else { + let receipts_with_bloom = res + .receipts() + .iter() + .map(|r| r.clone().into()) + .collect::>(); + proofs::calculate_receipt_root(receipts_with_bloom.iter()) + }; + + let body = + BlockBody { transactions: body, ommers: vec![], withdrawals: None }; + header.gas_used = gas_used; + + storage.insert_new_block(header, body); + + let new_hash = storage.best_hash; + let state = ForkchoiceState { + head_block_hash: new_hash, + finalized_block_hash: new_hash, + safe_block_hash: new_hash, + }; + + trace!(target: "consensus::auto", ?state, "sending fork choice update"); + let (tx, _rx) = oneshot::channel(); + let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs: None, + tx, + }); + } + Err(err) => { + warn!(target: "consensus::auto", ?err, "failed to execute block") + } + } + })); + } + + if let Some(mut fut) = this.insert_task.take() { + match fut.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + this.insert_task = Some(fut); + break + } + } + } + } + + Poll::Pending + } +} + +impl std::fmt::Debug for MiningTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MiningTask").finish_non_exhaustive() + } +} diff --git a/crates/primitives/src/constants.rs b/crates/primitives/src/constants.rs index 3004b8084b..258c93a5ea 100644 --- a/crates/primitives/src/constants.rs +++ b/crates/primitives/src/constants.rs @@ -38,6 +38,15 @@ pub const KECCAK_EMPTY: H256 = pub const EMPTY_OMMER_ROOT: H256 = H256(hex!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347")); -/// Withdrawals root of empty withdrawals set. -pub const EMPTY_WITHDRAWALS: H256 = +/// hash of an empty set `keccak256(rlp([]))` +const EMPTY_SET_HASH: H256 = H256(hex!("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")); + +/// Transactions root of empty receipts set. +pub const EMPTY_RECEIPTS: H256 = EMPTY_SET_HASH; + +/// Transactions root of empty transactions set. +pub const EMPTY_TRANSACTIONS: H256 = EMPTY_SET_HASH; + +/// Withdrawals root of empty withdrawals set. +pub const EMPTY_WITHDRAWALS: H256 = EMPTY_SET_HASH; diff --git a/crates/stages/src/pipeline/set.rs b/crates/stages/src/pipeline/set.rs index edeac9c3b1..902b3bfa9d 100644 --- a/crates/stages/src/pipeline/set.rs +++ b/crates/stages/src/pipeline/set.rs @@ -186,6 +186,19 @@ where self } + /// Disables the given stage if the given closure returns true. + /// + /// See [Self::disable] + pub fn disable_if(self, stage_id: StageId, f: F) -> Self + where + F: FnOnce() -> bool, + { + if f() { + return self.disable(stage_id) + } + self + } + /// Consumes the builder and returns the contained [`Stage`]s in the order specified. pub fn build(mut self) -> Vec>> { let mut stages = Vec::new();