feat: add Future AT to LaunchNode and allow customizing local attributes builder (#18556)

This commit is contained in:
Arsenii Kulikov
2025-09-19 13:34:49 +04:00
committed by GitHub
parent 4e78f956fd
commit c9a95d085d
9 changed files with 235 additions and 58 deletions

1
Cargo.lock generated
View File

@@ -9605,6 +9605,7 @@ dependencies = [
"alloy-rpc-types-engine",
"assert_matches",
"auto_impl",
"either",
"op-alloy-rpc-types-engine",
"reth-chain-state",
"reth-chainspec",

View File

@@ -1,17 +1,14 @@
use alloy_eips::eip2718::Encodable2718;
use alloy_genesis::Genesis;
use alloy_primitives::{b256, hex};
use alloy_primitives::{b256, hex, Address};
use futures::StreamExt;
use reth_chainspec::ChainSpec;
use reth_node_api::{BlockBody, FullNodeComponents, FullNodePrimitives, NodeTypes};
use reth_node_builder::{
rpc::RethRpcAddOns, DebugNodeLauncher, EngineNodeLauncher, FullNode, NodeBuilder, NodeConfig,
NodeHandle,
};
use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeBuilder, NodeConfig, NodeHandle};
use reth_node_core::args::DevArgs;
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
use reth_rpc_eth_api::helpers::EthTransactions;
use reth_rpc_eth_api::{helpers::EthTransactions, EthApiServer};
use reth_tasks::TaskManager;
use std::sync::Arc;
@@ -29,23 +26,58 @@ async fn can_run_dev_node() -> eyre::Result<()> {
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.launch_with_fn(|builder| {
let engine_launcher = EngineNodeLauncher::new(
builder.task_executor().clone(),
builder.config().datadir(),
Default::default(),
);
let launcher = DebugNodeLauncher::new(engine_launcher);
builder.launch_with(launcher)
})
.launch_with_debug_capabilities()
.await?;
assert_chain_advances(node).await;
assert_chain_advances(&node).await;
Ok(())
}
async fn assert_chain_advances<N, AddOns>(node: FullNode<N, AddOns>)
#[tokio::test]
async fn can_run_dev_node_custom_attributes() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let fee_recipient = Address::random();
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.launch_with_debug_capabilities()
.map_debug_payload_attributes(move |mut attributes| {
attributes.suggested_fee_recipient = fee_recipient;
attributes
})
.await?;
assert_chain_advances(&node).await;
assert!(
node.rpc_registry.eth_api().balance(fee_recipient, Default::default()).await.unwrap() > 0
);
assert!(
node.rpc_registry
.eth_api()
.block_by_number(Default::default(), false)
.await
.unwrap()
.unwrap()
.header
.beneficiary ==
fee_recipient
);
Ok(())
}
async fn assert_chain_advances<N, AddOns>(node: &FullNode<N, AddOns>)
where
N: FullNodeComponents<Provider: CanonStateSubscriptions>,
AddOns: RethRpcAddOns<N, EthApi: EthTransactions>,

View File

@@ -662,9 +662,9 @@ where
///
/// This is equivalent to [`WithLaunchContext::launch`], but will enable the debugging features,
/// if they are configured.
pub async fn launch_with_debug_capabilities(
pub fn launch_with_debug_capabilities(
self,
) -> eyre::Result<<DebugNodeLauncher as LaunchNode<NodeBuilderWithComponents<T, CB, AO>>>::Node>
) -> <DebugNodeLauncher as LaunchNode<NodeBuilderWithComponents<T, CB, AO>>>::Future
where
T::Types: DebugNode<NodeAdapter<T, CB::Components>>,
DebugNodeLauncher: LaunchNode<NodeBuilderWithComponents<T, CB, AO>>,
@@ -678,7 +678,7 @@ where
builder.config.datadir(),
engine_tree_config,
));
builder.launch_with(launcher).await
builder.launch_with(launcher)
}
/// Returns an [`EngineNodeLauncher`] that can be used to launch the node with engine API

View File

@@ -251,11 +251,11 @@ where
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
{
/// Launches the node with the given launcher.
pub async fn launch_with<L>(self, launcher: L) -> eyre::Result<L::Node>
pub fn launch_with<L>(self, launcher: L) -> L::Future
where
L: LaunchNode<Self>,
{
launcher.launch_node(self).await
launcher.launch_node(self)
}
/// Sets the hook that is run once the rpc server is started.

View File

@@ -1,12 +1,19 @@
use super::LaunchNode;
use crate::{rpc::RethRpcAddOns, EngineNodeLauncher, Node, NodeHandle};
use alloy_consensus::transaction::Either;
use alloy_provider::network::AnyNetwork;
use jsonrpsee::core::{DeserializeOwned, Serialize};
use reth_chainspec::EthChainSpec;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_engine_local::LocalMiner;
use reth_node_api::{BlockTy, FullNodeComponents, PayloadAttributesBuilder, PayloadTypes};
use std::sync::Arc;
use reth_node_api::{
BlockTy, FullNodeComponents, PayloadAttrTy, PayloadAttributesBuilder, PayloadTypes,
};
use std::{
future::{Future, IntoFuture},
pin::Pin,
sync::Arc,
};
use tracing::info;
/// [`Node`] extension with support for debugging utilities.
@@ -104,16 +111,54 @@ impl<L> DebugNodeLauncher<L> {
}
}
impl<L, Target, N, AddOns> LaunchNode<Target> for DebugNodeLauncher<L>
/// Future for the [`DebugNodeLauncher`].
#[expect(missing_debug_implementations, clippy::type_complexity)]
pub struct DebugNodeLauncherFuture<L, Target, N>
where
N: FullNodeComponents<Types: DebugNode<N>>,
{
inner: L,
target: Target,
local_payload_attributes_builder:
Option<Box<dyn PayloadAttributesBuilder<PayloadAttrTy<N::Types>>>>,
map_attributes:
Option<Box<dyn Fn(PayloadAttrTy<N::Types>) -> PayloadAttrTy<N::Types> + Send + Sync>>,
}
impl<L, Target, N, AddOns> DebugNodeLauncherFuture<L, Target, N>
where
N: FullNodeComponents<Types: DebugNode<N>>,
AddOns: RethRpcAddOns<N>,
L: LaunchNode<Target, Node = NodeHandle<N, AddOns>>,
{
type Node = NodeHandle<N, AddOns>;
pub fn with_payload_attributes_builder(
self,
builder: impl PayloadAttributesBuilder<PayloadAttrTy<N::Types>>,
) -> Self {
Self {
inner: self.inner,
target: self.target,
local_payload_attributes_builder: Some(Box::new(builder)),
map_attributes: None,
}
}
async fn launch_node(self, target: Target) -> eyre::Result<Self::Node> {
let handle = self.inner.launch_node(target).await?;
pub fn map_debug_payload_attributes(
self,
f: impl Fn(PayloadAttrTy<N::Types>) -> PayloadAttrTy<N::Types> + Send + Sync + 'static,
) -> Self {
Self {
inner: self.inner,
target: self.target,
local_payload_attributes_builder: None,
map_attributes: Some(Box::new(f)),
}
}
async fn launch_node(self) -> eyre::Result<NodeHandle<N, AddOns>> {
let Self { inner, target, local_payload_attributes_builder, map_attributes } = self;
let handle = inner.launch_node(target).await?;
let config = &handle.node.config;
if let Some(url) = config.debug.rpc_consensus_url.clone() {
@@ -179,11 +224,23 @@ where
let pool = handle.node.pool.clone();
let payload_builder_handle = handle.node.payload_builder_handle.clone();
let builder = if let Some(builder) = local_payload_attributes_builder {
Either::Left(builder)
} else {
let local = N::Types::local_payload_attributes_builder(&chain_spec);
let builder = if let Some(f) = map_attributes {
Either::Left(move |block_number| f(local.build(block_number)))
} else {
Either::Right(local)
};
Either::Right(builder)
};
let dev_mining_mode = handle.node.config.dev_mining_mode(pool);
handle.node.task_executor.spawn_critical("local engine", async move {
LocalMiner::new(
blockchain_db,
N::Types::local_payload_attributes_builder(&chain_spec),
builder,
beacon_engine_handle,
dev_mining_mode,
payload_builder_handle,
@@ -196,3 +253,38 @@ where
Ok(handle)
}
}
impl<L, Target, N, AddOns> IntoFuture for DebugNodeLauncherFuture<L, Target, N>
where
Target: Send + 'static,
N: FullNodeComponents<Types: DebugNode<N>>,
AddOns: RethRpcAddOns<N> + 'static,
L: LaunchNode<Target, Node = NodeHandle<N, AddOns>> + 'static,
{
type Output = eyre::Result<NodeHandle<N, AddOns>>;
type IntoFuture = Pin<Box<dyn Future<Output = eyre::Result<NodeHandle<N, AddOns>>> + Send>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.launch_node())
}
}
impl<L, Target, N, AddOns> LaunchNode<Target> for DebugNodeLauncher<L>
where
Target: Send + 'static,
N: FullNodeComponents<Types: DebugNode<N>>,
AddOns: RethRpcAddOns<N> + 'static,
L: LaunchNode<Target, Node = NodeHandle<N, AddOns>> + 'static,
{
type Node = NodeHandle<N, AddOns>;
type Future = DebugNodeLauncherFuture<L, Target, N>;
fn launch_node(self, target: Target) -> Self::Future {
DebugNodeLauncherFuture {
inner: self.inner,
target,
local_payload_attributes_builder: None,
map_attributes: None,
}
}
}

View File

@@ -11,7 +11,6 @@ use crate::{
use alloy_consensus::BlockHeader;
use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_db_api::{database_metrics::DatabaseMetrics, Database};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
@@ -37,7 +36,7 @@ use reth_provider::{
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use std::sync::Arc;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -61,27 +60,22 @@ impl EngineNodeLauncher {
) -> Self {
Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
}
}
impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
Types: NodeTypesForProvider + NodeTypes,
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
T: FullNodeTypes<
Types = Types,
DB = DB,
Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
>,
CB: NodeComponentsBuilder<T>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
+ EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
{
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
async fn launch_node(
async fn launch_node<T, CB, AO>(
self,
target: NodeBuilderWithComponents<T, CB, AO>,
) -> eyre::Result<Self::Node> {
) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
where
T: FullNodeTypes<
Types: NodeTypesForProvider,
Provider = BlockchainProvider<
NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
>,
>,
CB: NodeComponentsBuilder<T>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
+ EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
{
let Self { ctx, engine_tree_config } = self;
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
@@ -112,7 +106,7 @@ where
debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
})
.with_genesis()?
.inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
.inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
})
.with_metrics_task()
@@ -368,3 +362,24 @@ where
Ok(handle)
}
}
impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
T: FullNodeTypes<
Types: NodeTypesForProvider,
Provider = BlockchainProvider<
NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
>,
>,
CB: NodeComponentsBuilder<T> + 'static,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
+ EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
+ 'static,
{
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
Box::pin(self.launch_node(target))
}
}

View File

@@ -10,7 +10,7 @@ pub(crate) mod engine;
pub use common::LaunchContext;
pub use exex::ExExLauncher;
use std::future::Future;
use std::future::IntoFuture;
/// A general purpose trait that launches a new node of any kind.
///
@@ -21,22 +21,26 @@ use std::future::Future;
///
/// See also [`EngineNodeLauncher`](crate::EngineNodeLauncher) and
/// [`NodeBuilderWithComponents::launch_with`](crate::NodeBuilderWithComponents)
pub trait LaunchNode<Target> {
pub trait LaunchNode<Target>: Send {
/// The node type that is created.
type Node;
/// The future type that is returned.
type Future: IntoFuture<Output = eyre::Result<Self::Node>, IntoFuture: Send>;
/// Create and return a new node asynchronously.
fn launch_node(self, target: Target) -> impl Future<Output = eyre::Result<Self::Node>>;
fn launch_node(self, target: Target) -> Self::Future;
}
impl<F, Target, Fut, Node> LaunchNode<Target> for F
where
F: FnOnce(Target) -> Fut + Send,
Fut: Future<Output = eyre::Result<Node>> + Send,
Fut: IntoFuture<Output = eyre::Result<Node>, IntoFuture: Send> + Send,
{
type Node = Node;
type Future = Fut;
fn launch_node(self, target: Target) -> impl Future<Output = eyre::Result<Self::Node>> {
fn launch_node(self, target: Target) -> Self::Future {
self(target)
}
}

View File

@@ -26,6 +26,7 @@ op-alloy-rpc-types-engine = { workspace = true, optional = true }
# misc
auto_impl.workspace = true
either.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = ["sync"] }
@@ -44,6 +45,7 @@ std = [
"serde/std",
"thiserror/std",
"reth-primitives-traits/std",
"either/std",
]
op = [
"dep:op-alloy-rpc-types-engine",

View File

@@ -1,6 +1,7 @@
//! Core traits for working with execution payloads.
use alloc::vec::Vec;
use crate::PayloadBuilderError;
use alloc::{boxed::Box, vec::Vec};
use alloy_eips::{
eip4895::{Withdrawal, Withdrawals},
eip7685::Requests,
@@ -11,8 +12,6 @@ use core::fmt;
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_primitives_traits::{NodePrimitives, SealedBlock, SealedHeader};
use crate::PayloadBuilderError;
/// Represents a successfully built execution payload (block).
///
/// Provides access to the underlying block data, execution results, and associated metadata
@@ -147,6 +146,38 @@ pub trait PayloadAttributesBuilder<Attributes>: Send + Sync + 'static {
fn build(&self, timestamp: u64) -> Attributes;
}
impl<Attributes, F> PayloadAttributesBuilder<Attributes> for F
where
F: Fn(u64) -> Attributes + Send + Sync + 'static,
{
fn build(&self, timestamp: u64) -> Attributes {
self(timestamp)
}
}
impl<Attributes, L, R> PayloadAttributesBuilder<Attributes> for either::Either<L, R>
where
L: PayloadAttributesBuilder<Attributes>,
R: PayloadAttributesBuilder<Attributes>,
{
fn build(&self, timestamp: u64) -> Attributes {
match self {
Self::Left(l) => l.build(timestamp),
Self::Right(r) => r.build(timestamp),
}
}
}
impl<Attributes> PayloadAttributesBuilder<Attributes>
for Box<dyn PayloadAttributesBuilder<Attributes>>
where
Attributes: 'static,
{
fn build(&self, timestamp: u64) -> Attributes {
self.as_ref().build(timestamp)
}
}
/// Trait to build the EVM environment for the next block from the given payload attributes.
///
/// Accepts payload attributes from CL, parent header and additional payload builder context.