diff --git a/Cargo.lock b/Cargo.lock index 27c918a742..fa170f8721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4854,12 +4854,17 @@ dependencies = [ name = "reth-miner" version = "0.1.0" dependencies = [ + "futures-core", + "futures-util", "parking_lot 0.12.1", "reth-primitives", "reth-rlp", "reth-rpc-types", "sha2 0.10.6", "thiserror", + "tokio", + "tokio-stream", + "tracing", ] [[package]] diff --git a/crates/miner/Cargo.toml b/crates/miner/Cargo.toml index d2dd9593e7..82b6e56a23 100644 --- a/crates/miner/Cargo.toml +++ b/crates/miner/Cargo.toml @@ -13,7 +13,14 @@ reth-primitives = { path = "../primitives" } reth-rpc-types = { path = "../rpc/rpc-types" } reth-rlp = { path = "../rlp" } +## async +tokio = { version = "1", features = ["sync"] } +tokio-stream = "0.1" +futures-util = "0.3" +futures-core = "0.3" + ## misc thiserror = "1.0" -sha2 = { version = "0.10.6", default-features = false } -parking_lot = "0.12.1" +sha2 = { version = "0.10", default-features = false } +parking_lot = "0.12" +tracing = "0.1.37" diff --git a/crates/miner/src/error.rs b/crates/miner/src/error.rs index f7184d1cb3..ef23377383 100644 --- a/crates/miner/src/error.rs +++ b/crates/miner/src/error.rs @@ -1,6 +1,17 @@ //! Error types emitted by types or implementations of this crate. +use tokio::sync::oneshot; + /// Possible error variants during payload building. #[derive(Debug, thiserror::Error)] -#[error("Payload builder error")] -pub struct PayloadBuilderError; +pub enum PayloadBuilderError { + /// A oneshot channels has been closed. + #[error("Sender has been dropped")] + ChannelClosed, +} + +impl From for PayloadBuilderError { + fn from(_: oneshot::error::RecvError) -> Self { + PayloadBuilderError::ChannelClosed + } +} diff --git a/crates/miner/src/lib.rs b/crates/miner/src/lib.rs index 581a82bd3b..56023888c9 100644 --- a/crates/miner/src/lib.rs +++ b/crates/miner/src/lib.rs @@ -10,19 +10,28 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -//! reth miner implementation +//! This trait implements the [PayloadBuilderService] responsible for managing payload jobs. +//! +//! It Defines the abstractions to create and update payloads: +//! - [PayloadJobGenerator]: a type that knows how to create new jobs for creating payloads based +//! on [PayloadAttributes]. +//! - [PayloadJob]: a type that can yields (better) payloads over time. +pub mod error; mod payload; +mod service; +mod traits; +pub use payload::{BuiltPayload, PayloadBuilderAttributes}; +pub use reth_rpc_types::engine::PayloadId; +pub use service::{PayloadBuilderHandle, PayloadBuilderService, PayloadStore as PayloadStore2}; +pub use traits::{PayloadJob, PayloadJobGenerator}; use crate::error::PayloadBuilderError; use parking_lot::Mutex; -pub use payload::{BuiltPayload, PayloadBuilderAttributes}; -use reth_primitives::H256; -use reth_rpc_types::engine::{ExecutionPayloadEnvelope, PayloadAttributes, PayloadId}; +use reth_primitives::{H256, U256}; +use reth_rpc_types::engine::{ExecutionPayloadEnvelope, PayloadAttributes}; use std::{collections::HashMap, sync::Arc}; -pub mod error; - /// A type that has access to all locally built payloads and can create new ones. /// This type is intended to by used by the engine API. pub trait PayloadStore: Send + Sync { @@ -68,7 +77,9 @@ impl PayloadStore for TestPayloadStore { ) -> Result { let attr = PayloadBuilderAttributes::new(parent, attributes); let payload_id = attr.payload_id(); - self.payloads.lock().insert(payload_id, BuiltPayload::new(payload_id, Default::default())); + self.payloads + .lock() + .insert(payload_id, BuiltPayload::new(payload_id, Default::default(), U256::ZERO)); Ok(payload_id) } } diff --git a/crates/miner/src/payload.rs b/crates/miner/src/payload.rs index ebb0bd1fc8..d5130d7bc9 100644 --- a/crates/miner/src/payload.rs +++ b/crates/miner/src/payload.rs @@ -1,6 +1,6 @@ //! Contains types required for building a payload. -use reth_primitives::{Address, Block, SealedBlock, Withdrawal, H256}; +use reth_primitives::{Address, Block, SealedBlock, Withdrawal, H256, U256}; use reth_rlp::Encodable; use reth_rpc_types::engine::{PayloadAttributes, PayloadId}; @@ -13,27 +13,40 @@ use reth_rpc_types::engine::{PayloadAttributes, PayloadId}; pub struct BuiltPayload { /// Identifier of the payload pub(crate) id: PayloadId, - /// The initially empty block. - _initial_empty_block: SealedBlock, + /// The built block + pub(crate) block: SealedBlock, + /// The fees of the block + pub(crate) fees: U256, } // === impl BuiltPayload === impl BuiltPayload { /// Initializes the payload with the given initial block. - pub(crate) fn new(id: PayloadId, initial: Block) -> Self { - Self { id, _initial_empty_block: initial.seal_slow() } + pub(crate) fn new(id: PayloadId, block: Block, fees: U256) -> Self { + Self { id, block: block.seal_slow(), fees } } /// Returns the identifier of the payload. pub fn id(&self) -> PayloadId { self.id } + + /// Returns the identifier of the payload. + pub fn block(&self) -> &SealedBlock { + &self.block + } + + /// Fees of the block + pub fn fees(&self) -> U256 { + self.fees + } } /// Container type for all components required to build a payload. #[derive(Debug, Clone, PartialEq, Eq)] pub struct PayloadBuilderAttributes { + // TODO include id here /// Parent block to build the payload on top pub(crate) parent: H256, /// Timestamp for the generated payload @@ -63,7 +76,7 @@ impl PayloadBuilderAttributes { /// Generates the payload id for the configured payload /// /// Returns an 8-byte identifier by hashing the payload components. - pub fn payload_id(&self) -> PayloadId { + pub(crate) fn payload_id(&self) -> PayloadId { use sha2::Digest; let mut hasher = sha2::Sha256::new(); hasher.update(self.parent.as_bytes()); diff --git a/crates/miner/src/service.rs b/crates/miner/src/service.rs new file mode 100644 index 0000000000..57d365de82 --- /dev/null +++ b/crates/miner/src/service.rs @@ -0,0 +1,201 @@ +//! Support for building payloads. +//! +//! The payload builder is responsible for building payloads. +//! Once a new payload is created, it is continuously updated. + +use crate::{traits::PayloadJobGenerator, BuiltPayload, PayloadBuilderAttributes, PayloadJob}; +use futures_util::stream::{StreamExt, TryStreamExt}; +use reth_rpc_types::engine::PayloadId; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{trace, warn}; + +/// A communication channel to the [PayloadBuilderService] that can retrieve payloads. +#[derive(Debug, Clone)] +pub struct PayloadStore { + inner: PayloadBuilderHandle, +} + +// === impl PayloadStore === + +impl PayloadStore { + /// Returns the best payload for the given identifier. + pub async fn get_payload(&self, id: PayloadId) -> Option> { + self.inner.get_payload(id).await + } +} + +/// A communication channel to the [PayloadBuilderService]. +/// +/// This is the API used to create new payloads and to get the current state of existing ones. +#[derive(Debug, Clone)] +pub struct PayloadBuilderHandle { + /// Sender half of the message channel to the [PayloadBuilderService]. + to_service: mpsc::UnboundedSender, +} + +// === impl PayloadBuilderHandle === + +impl PayloadBuilderHandle { + /// Returns the best payload for the given identifier. + pub async fn get_payload(&self, id: PayloadId) -> Option> { + let (tx, rx) = oneshot::channel(); + self.to_service.send(PayloadServiceCommand::GetPayload(id, tx)).ok()?; + rx.await.ok()? + } + + /// Starts building a new payload for the given payload attributes. + /// + /// Returns the identifier of the payload. + /// + /// Note: if there's already payload in progress with same identifier, it will be returned. + pub async fn new_payload( + &self, + attr: PayloadBuilderAttributes, + ) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); + rx.await + } +} + +/// A service that manages payload building tasks. +/// +/// This type is an endless future that manages the building of payloads. +/// +/// It tracks active payloads and their build jobs that run in the worker pool. +/// +/// By design, this type relies entirely on the [PayloadJobGenerator] to create new payloads and +/// does know nothing about how to build them, itt just drives the payload jobs. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PayloadBuilderService +where + Gen: PayloadJobGenerator, +{ + /// The type that knows how to create new payloads. + generator: Gen, + /// All active payload jobs. + payload_jobs: Vec<(Gen::Job, PayloadId)>, + /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand. + _service_tx: mpsc::UnboundedSender, + /// Receiver half of the command channel. + command_rx: UnboundedReceiverStream, +} + +// === impl PayloadBuilderService === + +impl PayloadBuilderService +where + Gen: PayloadJobGenerator, +{ + /// Creates a new payload builder service. + pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) { + let (service_tx, command_rx) = mpsc::unbounded_channel(); + let service = Self { + generator, + payload_jobs: Vec::new(), + _service_tx: service_tx.clone(), + command_rx: UnboundedReceiverStream::new(command_rx), + }; + let handle = PayloadBuilderHandle { to_service: service_tx }; + (service, handle) + } + + /// Returns true if the given payload is currently being built. + fn contains_payload(&self, id: PayloadId) -> bool { + self.payload_jobs.iter().any(|(_, job_id)| *job_id == id) + } + + /// Returns the best payload for the given identifier. + fn get_payload(&self, id: PayloadId) -> Option> { + self.payload_jobs.iter().find(|(_, job_id)| *job_id == id).map(|(j, _)| j.best_payload()) + } +} + +impl Future for PayloadBuilderService +where + Gen: PayloadJobGenerator + Unpin + 'static, + ::Job: Unpin + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + // we poll all jobs first, so we always have the latest payload that we can report if + // requests + // we don't care about the order of the jobs, so we can just swap_remove them + 'jobs: for idx in (0..this.payload_jobs.len()).rev() { + let (mut job, id) = this.payload_jobs.swap_remove(idx); + + // drain better payloads from the job + loop { + match job.try_poll_next_unpin(cx) { + Poll::Ready(Some(Ok(payload))) => { + trace!(?payload, %id, "new payload"); + } + Poll::Ready(Some(Err(err))) => { + warn!(?err, %id, "payload job failed; resolving payload"); + continue 'jobs + } + Poll::Ready(None) => { + // job is done + trace!(?id, "payload job finished"); + continue 'jobs + } + Poll::Pending => { + // still pending, put it back + this.payload_jobs.push((job, id)); + continue 'jobs + } + } + } + } + + // marker for exit condition + // TODO(mattsse): this could be optmized so we only poll new jobs + let mut new_job = false; + + // drain all requests + while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { + match cmd { + PayloadServiceCommand::BuildNewPayload(attr, tx) => { + let id = attr.payload_id(); + if !this.contains_payload(id) { + // no job for this payload yet, create one + new_job = true; + let job = this.generator.new_payload_job(attr); + this.payload_jobs.push((job, id)); + } + + // return the id of the payload + let _ = tx.send(id); + } + PayloadServiceCommand::GetPayload(id, tx) => { + let _ = tx.send(this.get_payload(id)); + } + } + } + + if !new_job { + return Poll::Pending + } + } + } +} + +/// Message type for the [PayloadBuilderService]. +#[derive(Debug)] +enum PayloadServiceCommand { + /// Start building a new payload. + BuildNewPayload(PayloadBuilderAttributes, oneshot::Sender), + /// Get the current payload. + GetPayload(PayloadId, oneshot::Sender>>), +} diff --git a/crates/miner/src/traits.rs b/crates/miner/src/traits.rs new file mode 100644 index 0000000000..c01ab39664 --- /dev/null +++ b/crates/miner/src/traits.rs @@ -0,0 +1,37 @@ +//! Trait abstractions used by the payload crate. + +use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; +use futures_core::TryStream; + +use std::sync::Arc; + +/// A type that can build a payload. +/// +/// This type is a Stream that yields better payloads. +/// +/// Note: PaylodJob need to be cancel safe. +/// +/// TODO convert this into a future? +pub trait PayloadJob: + TryStream, Error = PayloadBuilderError> + Send + Sync +{ + /// Returns the best payload that has been built so far. + /// + /// Note: this is expected to be an empty block without transaction if nothing has been built + /// yet. + fn best_payload(&self) -> Arc; +} + +/// A type that knows how to create new jobs for creating payloads. +pub trait PayloadJobGenerator: Send + Sync { + /// The type that manages the lifecycle of a payload. + /// + /// This type is a Stream that yields better payloads payload. + type Job: PayloadJob; + + /// Creates the initial payload and a new [PayloadJob] that yields better payloads. + /// + /// Note: this is expected to build a new (empty) payload without transactions, so it can be + /// returned directly. when asked for + fn new_payload_job(&self, attr: PayloadBuilderAttributes) -> Self::Job; +} diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs index c31c6c63a5..3c952f14a7 100644 --- a/crates/rpc/rpc-types/src/eth/engine/payload.rs +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -20,6 +20,12 @@ impl PayloadId { } } +impl std::fmt::Display for PayloadId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + /// This structure maps for the return value of `engine_getPayloadV2` of the beacon chain spec. /// /// See also: