feat: integrate ress (#14959)

This commit is contained in:
Roman Krasiuk
2025-03-12 10:34:49 +01:00
committed by GitHub
parent aac3a6eb47
commit b968fa04ad
26 changed files with 720 additions and 104 deletions

View File

@@ -63,8 +63,11 @@ reth-node-events.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-prune.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# crypto
# alloy
alloy-eips = { workspace = true, features = ["kzg"] }
alloy-rlp.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }

View File

@@ -182,6 +182,9 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};

View File

@@ -4,8 +4,9 @@
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
use clap::Parser;
use reth::cli::Cli;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
use tracing::info;
@@ -17,11 +18,27 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
handle.node_exit_future.await
}) {
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.block_executor,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
eprintln!("Error: {err:?}");
std::process::exit(1);
}

66
bin/reth/src/ress.rs Normal file
View File

@@ -0,0 +1,66 @@
use reth_evm::execute::BlockExecutorProvider;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::BeaconConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_primitives::EthPrimitives;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
block_executor: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<BeaconConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
block_executor,
Box::new(task_executor.clone()),
pending_state,
args.witness_max_parallel,
args.witness_cache_size,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}