feat(engine): add trigger-based MiningMode variant (#22250)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Georgios Konstantopoulos
2026-02-16 16:28:25 -08:00
committed by GitHub
parent 2eec519bf9
commit 5b8808e5fd
3 changed files with 59 additions and 4 deletions

View File

@@ -3,7 +3,7 @@
use alloy_primitives::{TxHash, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use futures_util::{stream::Fuse, Stream, StreamExt};
use reth_engine_primitives::ConsensusEngineHandle;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
@@ -14,6 +14,7 @@ use reth_storage_api::BlockReader;
use reth_transaction_pool::TransactionPool;
use std::{
collections::VecDeque,
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
@@ -24,7 +25,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
/// A mining mode for the local dev engine.
#[derive(Debug)]
pub enum MiningMode<Pool: TransactionPool + Unpin> {
/// In this mode a block is built as soon as
/// a valid transaction reaches the pool.
@@ -43,6 +43,25 @@ pub enum MiningMode<Pool: TransactionPool + Unpin> {
},
/// In this mode a block is built at a fixed interval.
Interval(Interval),
/// In this mode a block is built when the trigger stream yields a value.
///
/// This is a general-purpose trigger that can be fired on demand, for example via a channel
/// or any other [`Stream`] implementation.
Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
}
impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Instant { max_transactions, accumulated, .. } => f
.debug_struct("Instant")
.field("max_transactions", max_transactions)
.field("accumulated", accumulated)
.finish(),
Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
}
}
}
impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
@@ -57,6 +76,14 @@ impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
let start = tokio::time::Instant::now() + duration;
Self::Interval(tokio::time::interval_at(start, duration))
}
/// Constructor for a [`MiningMode::Trigger`]
///
/// Accepts any stream that yields `()` values, each of which triggers a new block to be
/// mined. This can be backed by a channel, a custom stream, or any other async source.
pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
Self::Trigger(Box::pin(trigger))
}
}
impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
@@ -91,6 +118,12 @@ impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
}
Poll::Pending
}
Self::Trigger(trigger) => {
if trigger.poll_next_unpin(cx).is_ready() {
return Poll::Ready(())
}
Poll::Pending
}
}
}
}