diff --git a/.github/assets/check_wasm.sh b/.github/assets/check_wasm.sh index 00cd726d81..32a7dad865 100755 --- a/.github/assets/check_wasm.sh +++ b/.github/assets/check_wasm.sh @@ -32,7 +32,6 @@ exclude_crates=( reth-exex-test-utils reth-ipc reth-net-nat - reth-network-ress reth-network reth-node-api reth-node-builder @@ -56,6 +55,8 @@ exclude_crates=( reth-rpc-layer reth-stages reth-engine-local + reth-ress-protocol + reth-ress-provider # The following are not supposed to be working reth # all of the crates below reth-invalid-block-hooks # reth-provider diff --git a/Cargo.lock b/Cargo.lock index e70cb5dbdd..fc9a6bd5f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6756,6 +6756,8 @@ dependencies = [ "reth-primitives-traits", "reth-provider", "reth-prune", + "reth-ress-protocol", + "reth-ress-provider", "reth-revm", "reth-rpc", "reth-rpc-api", @@ -6766,6 +6768,7 @@ dependencies = [ "reth-stages", "reth-static-file", "reth-tasks", + "reth-tokio-util", "reth-tracing", "reth-transaction-pool", "reth-trie", @@ -8215,32 +8218,6 @@ dependencies = [ "url", ] -[[package]] -name = "reth-network-ress" -version = "1.2.2" -dependencies = [ - "alloy-consensus", - "alloy-primitives", - "alloy-rlp", - "arbitrary", - "futures", - "proptest", - "proptest-arbitrary-interop", - "reth-eth-wire", - "reth-ethereum-primitives", - "reth-network", - "reth-network-api", - "reth-network-ress", - "reth-provider", - "reth-storage-errors", - "reth-tracing", - "strum 0.27.1", - "strum_macros 0.27.1", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "reth-network-types" version = "1.2.2" @@ -9168,6 +9145,60 @@ dependencies = [ "toml", ] +[[package]] +name = "reth-ress-protocol" +version = "1.2.2" +dependencies = [ + "alloy-consensus", + "alloy-primitives", + "alloy-rlp", + "arbitrary", + "futures", + "proptest", + "proptest-arbitrary-interop", + "reth-eth-wire", + "reth-ethereum-primitives", + "reth-network", + "reth-network-api", + "reth-provider", + "reth-ress-protocol", + "reth-storage-errors", + "reth-tracing", + "strum 0.27.1", + "strum_macros 0.27.1", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "reth-ress-provider" +version = "1.2.2" +dependencies = [ + "alloy-consensus", + "alloy-primitives", + "eyre", + "futures", + "parking_lot", + "reth-chain-state", + "reth-engine-primitives", + "reth-evm", + "reth-network", + "reth-network-api", + "reth-node-api", + "reth-primitives", + "reth-primitives-traits", + "reth-provider", + "reth-ress-protocol", + "reth-revm", + "reth-tasks", + "reth-tokio-util", + "reth-trie", + "schnellru", + "tokio", + "tracing", +] + [[package]] name = "reth-revm" version = "1.2.2" diff --git a/Cargo.toml b/Cargo.toml index bf4388fa5a..400fb1cbf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,6 @@ members = [ "crates/net/network/", "crates/net/p2p/", "crates/net/peers/", - "crates/net/ress/", "crates/node/api/", "crates/node/builder/", "crates/node/core/", @@ -91,6 +90,8 @@ members = [ "crates/primitives/", "crates/prune/prune", "crates/prune/types", + "crates/ress/protocol", + "crates/ress/provider", "crates/revm/", "crates/rpc/ipc/", "crates/rpc/rpc-api/", @@ -428,6 +429,8 @@ reth-trie-db = { path = "crates/trie/db" } reth-trie-parallel = { path = "crates/trie/parallel" } reth-trie-sparse = { path = "crates/trie/sparse" } reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false } +reth-ress-protocol = { path = "crates/ress/protocol" } +reth-ress-provider = { path = "crates/ress/provider" } # revm revm = { version = "20.0.0-alpha.4", default-features = false } diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 39e7579d41..58376f6368 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -63,8 +63,11 @@ reth-node-events.workspace = true reth-node-metrics.workspace = true reth-consensus.workspace = true reth-prune.workspace = true +reth-tokio-util.workspace = true +reth-ress-protocol.workspace = true +reth-ress-provider.workspace = true -# crypto +# alloy alloy-eips = { workspace = true, features = ["kzg"] } alloy-rlp.workspace = true alloy-rpc-types = { workspace = true, features = ["engine"] } diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 627d4c475a..cea426aa3a 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -182,6 +182,9 @@ pub mod rpc { } } +/// Ress subprotocol installation. +pub mod ress; + // re-export for convenience #[doc(inline)] pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner}; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 8dc8f20fab..45f56ce94c 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -4,8 +4,9 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); use clap::Parser; -use reth::cli::Cli; +use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol}; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; +use reth_node_builder::NodeHandle; use reth_node_ethereum::EthereumNode; use tracing::info; @@ -17,11 +18,27 @@ fn main() { unsafe { std::env::set_var("RUST_BACKTRACE", "1") }; } - if let Err(err) = Cli::::parse().run(async move |builder, _| { - info!(target: "reth::cli", "Launching node"); - let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?; - handle.node_exit_future.await - }) { + if let Err(err) = + Cli::::parse().run(async move |builder, ress_args| { + info!(target: "reth::cli", "Launching node"); + let NodeHandle { node, node_exit_future } = + builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?; + + // Install ress subprotocol. + if ress_args.enabled { + install_ress_subprotocol( + ress_args, + node.provider, + node.block_executor, + node.network, + node.task_executor, + node.add_ons_handle.engine_events.new_listener(), + )?; + } + + node_exit_future.await + }) + { eprintln!("Error: {err:?}"); std::process::exit(1); } diff --git a/bin/reth/src/ress.rs b/bin/reth/src/ress.rs new file mode 100644 index 0000000000..2daf185b6e --- /dev/null +++ b/bin/reth/src/ress.rs @@ -0,0 +1,66 @@ +use reth_evm::execute::BlockExecutorProvider; +use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; +use reth_network_api::FullNetwork; +use reth_node_api::BeaconConsensusEngineEvent; +use reth_node_core::args::RessArgs; +use reth_primitives::EthPrimitives; +use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes}; +use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler}; +use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider}; +use reth_tasks::TaskExecutor; +use reth_tokio_util::EventStream; +use tokio::sync::mpsc; +use tracing::*; + +/// Install `ress` subprotocol if it's enabled. +pub fn install_ress_subprotocol( + args: RessArgs, + provider: BlockchainProvider

, + block_executor: E, + network: N, + task_executor: TaskExecutor, + engine_events: EventStream>, +) -> eyre::Result<()> +where + P: ProviderNodeTypes, + E: BlockExecutorProvider + Clone, + N: FullNetwork + NetworkProtocols, +{ + info!(target: "reth::cli", "Installing ress subprotocol"); + let pending_state = PendingState::default(); + + // Spawn maintenance task for pending state. + task_executor.spawn(maintain_pending_state( + engine_events, + provider.clone(), + pending_state.clone(), + )); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let provider = RethRessProtocolProvider::new( + provider, + block_executor, + Box::new(task_executor.clone()), + pending_state, + args.witness_max_parallel, + args.witness_cache_size, + )?; + network.add_rlpx_sub_protocol( + RessProtocolHandler { + provider, + node_type: NodeType::Stateful, + peers_handle: network.peers_handle().clone(), + max_active_connections: args.max_active_connections, + state: ProtocolState::new(tx), + } + .into_rlpx_sub_protocol(), + ); + info!(target: "reth::cli", "Ress subprotocol support enabled"); + + task_executor.spawn(async move { + while let Some(event) = rx.recv().await { + trace!(target: "reth::ress", ?event, "Received ress event"); + } + }); + Ok(()) +} diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index 8ea72e85eb..407b375139 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -721,6 +721,25 @@ Engine: --engine.state-root-task-compare-updates Enable comparing trie updates from the state root task to the trie updates from the regular state root calculation +Ress: + --ress.enable + Enable support for `ress` subprotocol + + --ress.max-active-connections + The maximum number of active connections for `ress` subprotocol + + [default: 5] + + --ress.witness-max-parallel + The maximum number of witnesses to generate in parallel + + [default: 5] + + --ress.witness-cache-size + Witness cache size + + [default: 10] + Logging: --log.stdout.format The format to use for logs written to stdout diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 6163c87300..d6392ebf12 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -19,7 +19,9 @@ pub mod error; pub mod events; /// Implementation of network traits for that does nothing. pub mod noop; + pub mod test_utils; +use test_utils::PeersHandleProvider; pub use alloy_rpc_types_admin::EthProtocolInfo; use reth_network_p2p::sync::NetworkSyncUpdater; @@ -49,6 +51,7 @@ pub trait FullNetwork: + NetworkEventListenerProvider + PeersInfo + Peers + + PeersHandleProvider + Clone + 'static { @@ -61,6 +64,7 @@ impl FullNetwork for T where + NetworkEventListenerProvider + PeersInfo + Peers + + PeersHandleProvider + Clone + 'static { diff --git a/crates/node/core/src/args/mod.rs b/crates/node/core/src/args/mod.rs index 1649b8b56b..3a5e55ce29 100644 --- a/crates/node/core/src/args/mod.rs +++ b/crates/node/core/src/args/mod.rs @@ -60,5 +60,9 @@ pub use benchmark_args::BenchmarkArgs; mod engine; pub use engine::EngineArgs; +/// `RessArgs` for configuring ress subprotocol. +mod ress_args; +pub use ress_args::RessArgs; + mod error; pub mod types; diff --git a/crates/node/core/src/args/ress_args.rs b/crates/node/core/src/args/ress_args.rs new file mode 100644 index 0000000000..4b9cfbb631 --- /dev/null +++ b/crates/node/core/src/args/ress_args.rs @@ -0,0 +1,42 @@ +use clap::Args; + +/// The default number of maximum active connections. +const MAX_ACTIVE_CONNECTIONS_DEFAULT: u64 = 5; + +/// The default maximum number of witnesses to generate in parallel. +const WITNESS_MAX_PARALLEL_DEFAULT: usize = 5; + +/// The default witness cache size. +const WITNESS_CACHE_SIZE_DEFAULT: u32 = 10; + +/// Parameters for configuring the `ress` subprotocol. +#[derive(Debug, Clone, Args, PartialEq, Eq)] +#[command(next_help_heading = "Ress")] +pub struct RessArgs { + /// Enable support for `ress` subprotocol. + #[arg(long = "ress.enable", default_value_t = false)] + pub enabled: bool, + + /// The maximum number of active connections for `ress` subprotocol. + #[arg(long = "ress.max-active-connections", default_value_t = MAX_ACTIVE_CONNECTIONS_DEFAULT)] + pub max_active_connections: u64, + + /// The maximum number of witnesses to generate in parallel. + #[arg(long = "ress.witness-max-parallel", default_value_t = WITNESS_MAX_PARALLEL_DEFAULT)] + pub witness_max_parallel: usize, + + /// Witness cache size. + #[arg(long = "ress.witness-cache-size", default_value_t = WITNESS_CACHE_SIZE_DEFAULT)] + pub witness_cache_size: u32, +} + +impl Default for RessArgs { + fn default() -> Self { + Self { + enabled: false, + max_active_connections: MAX_ACTIVE_CONNECTIONS_DEFAULT, + witness_max_parallel: WITNESS_MAX_PARALLEL_DEFAULT, + witness_cache_size: WITNESS_CACHE_SIZE_DEFAULT, + } + } +} diff --git a/crates/net/ress/Cargo.toml b/crates/ress/protocol/Cargo.toml similarity index 90% rename from crates/net/ress/Cargo.toml rename to crates/ress/protocol/Cargo.toml index d061b007df..3a36c1071c 100644 --- a/crates/net/ress/Cargo.toml +++ b/crates/ress/protocol/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "reth-network-ress" +name = "reth-ress-protocol" version.workspace = true edition.workspace = true rust-version.workspace = true @@ -39,7 +39,7 @@ reth-provider = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true # enable `test-utils` feature on this crate -reth-network-ress = { path = ".", features = ["test-utils"] } +reth-ress-protocol = { path = ".", features = ["test-utils"] } tokio.workspace = true strum.workspace = true @@ -51,7 +51,7 @@ proptest-arbitrary-interop.workspace = true [features] test-utils = [ "reth-network/test-utils", - "reth-network-ress/test-utils", + "reth-ress-protocol/test-utils", "reth-ethereum-primitives/test-utils", "reth-provider/test-utils", ] @@ -59,7 +59,7 @@ arbitrary = [ "dep:arbitrary", "reth-eth-wire/arbitrary", "alloy-primitives/arbitrary", - "reth-network-ress/arbitrary", + "reth-ress-protocol/arbitrary", "reth-ethereum-primitives/arbitrary", "alloy-consensus/arbitrary", ] diff --git a/crates/net/ress/README.md b/crates/ress/protocol/README.md similarity index 100% rename from crates/net/ress/README.md rename to crates/ress/protocol/README.md diff --git a/crates/net/ress/src/connection.rs b/crates/ress/protocol/src/connection.rs similarity index 95% rename from crates/net/ress/src/connection.rs rename to crates/ress/protocol/src/connection.rs index 9dd868ae7b..8661629888 100644 --- a/crates/net/ress/src/connection.rs +++ b/crates/ress/protocol/src/connection.rs @@ -1,6 +1,4 @@ -use crate::{ - GetHeaders, NodeType, RessMessage, RessProtocolMessage, RessProtocolProvider, StateWitnessNet, -}; +use crate::{GetHeaders, NodeType, RessMessage, RessProtocolMessage, RessProtocolProvider}; use alloy_consensus::Header; use alloy_primitives::{bytes::BytesMut, BlockHash, Bytes, B256}; use futures::{stream::FuturesUnordered, Stream, StreamExt}; @@ -152,19 +150,15 @@ where fn on_witness_response( &self, request: RequestPair, - witness_result: ProviderResult>, + witness_result: ProviderResult>, ) -> RessProtocolMessage { let peer_id = self.peer_id; let block_hash = request.message; let witness = match witness_result { - Ok(Some(witness)) => { - trace!(target: "ress::net::connection", %peer_id, %block_hash, "witness found"); + Ok(witness) => { + trace!(target: "ress::net::connection", %peer_id, %block_hash, len = witness.len(), "witness found"); witness } - Ok(None) => { - trace!(target: "ress::net::connection", %peer_id, %block_hash, "witness not found"); - Default::default() - } Err(error) => { trace!(target: "ress::net::connection", %peer_id, %block_hash, %error, "error retrieving witness"); Default::default() @@ -326,9 +320,8 @@ where } } -type WitnessFut = Pin< - Box, ProviderResult>)> + Send>, ->; +type WitnessFut = + Pin, ProviderResult>)> + Send>>; /// Ress peer request. #[derive(Debug)] @@ -359,7 +352,7 @@ pub enum RessPeerRequest { /// Target block hash that we want to get witness for. block_hash: BlockHash, /// The sender for the response. - tx: oneshot::Sender, + tx: oneshot::Sender>, }, } diff --git a/crates/net/ress/src/handlers.rs b/crates/ress/protocol/src/handlers.rs similarity index 100% rename from crates/net/ress/src/handlers.rs rename to crates/ress/protocol/src/handlers.rs diff --git a/crates/net/ress/src/lib.rs b/crates/ress/protocol/src/lib.rs similarity index 100% rename from crates/net/ress/src/lib.rs rename to crates/ress/protocol/src/lib.rs diff --git a/crates/net/ress/src/message.rs b/crates/ress/protocol/src/message.rs similarity index 98% rename from crates/net/ress/src/message.rs rename to crates/ress/protocol/src/message.rs index ee602df9a6..5398f56e3a 100644 --- a/crates/net/ress/src/message.rs +++ b/crates/ress/protocol/src/message.rs @@ -3,7 +3,7 @@ //! //! Examples include creating, encoding, and decoding protocol messages. -use crate::{NodeType, StateWitnessNet}; +use crate::NodeType; use alloy_consensus::Header; use alloy_primitives::{ bytes::{Buf, BufMut}, @@ -87,7 +87,7 @@ impl RessProtocolMessage { } /// Execution witness response. - pub fn witness(request_id: u64, witness: StateWitnessNet) -> Self { + pub fn witness(request_id: u64, witness: Vec) -> Self { RessMessage::Witness(RequestPair { request_id, message: witness }).into_protocol_message() } @@ -211,7 +211,7 @@ pub enum RessMessage { /// Represents a witness request message. GetWitness(RequestPair), /// Represents a witness response message. - Witness(RequestPair), + Witness(RequestPair>), } impl RessMessage { diff --git a/crates/net/ress/src/provider.rs b/crates/ress/protocol/src/provider.rs similarity index 91% rename from crates/net/ress/src/provider.rs rename to crates/ress/protocol/src/provider.rs index bb565784b1..bac8b1f5f0 100644 --- a/crates/net/ress/src/provider.rs +++ b/crates/ress/protocol/src/provider.rs @@ -1,4 +1,4 @@ -use crate::{GetHeaders, StateWitnessNet}; +use crate::GetHeaders; use alloy_consensus::Header; use alloy_primitives::{Bytes, B256}; use alloy_rlp::Encodable; @@ -56,8 +56,5 @@ pub trait RessProtocolProvider: Send + Sync { fn bytecode(&self, code_hash: B256) -> ProviderResult>; /// Return witness by block hash. - fn witness( - &self, - block_hash: B256, - ) -> impl Future>> + Send; + fn witness(&self, block_hash: B256) -> impl Future>> + Send; } diff --git a/crates/net/ress/src/test_utils.rs b/crates/ress/protocol/src/test_utils.rs similarity index 92% rename from crates/net/ress/src/test_utils.rs rename to crates/ress/protocol/src/test_utils.rs index c0e27ddcc5..7201eabae9 100644 --- a/crates/net/ress/src/test_utils.rs +++ b/crates/ress/protocol/src/test_utils.rs @@ -1,6 +1,6 @@ //! Miscellaneous test utilities. -use crate::{RessProtocolProvider, StateWitnessNet}; +use crate::RessProtocolProvider; use alloy_consensus::Header; use alloy_primitives::{map::B256HashMap, Bytes, B256}; use reth_ethereum_primitives::BlockBody; @@ -27,8 +27,8 @@ impl RessProtocolProvider for NoopRessProtocolProvider { Ok(None) } - async fn witness(&self, _block_hash: B256) -> ProviderResult> { - Ok(None) + async fn witness(&self, _block_hash: B256) -> ProviderResult> { + Ok(Vec::new()) } } @@ -38,7 +38,7 @@ pub struct MockRessProtocolProvider { headers: Arc>>, block_bodies: Arc>>, bytecodes: Arc>>, - witnesses: Arc>>, + witnesses: Arc>>>, witness_delay: Option, } @@ -80,12 +80,12 @@ impl MockRessProtocolProvider { } /// Insert witness. - pub fn add_witness(&self, block_hash: B256, witness: StateWitnessNet) { + pub fn add_witness(&self, block_hash: B256, witness: Vec) { self.witnesses.lock().unwrap().insert(block_hash, witness); } /// Extend witnesses from iterator. - pub fn extend_witnesses(&self, witnesses: impl IntoIterator) { + pub fn extend_witnesses(&self, witnesses: impl IntoIterator)>) { self.witnesses.lock().unwrap().extend(witnesses); } } @@ -103,10 +103,10 @@ impl RessProtocolProvider for MockRessProtocolProvider { Ok(self.bytecodes.lock().unwrap().get(&code_hash).cloned()) } - async fn witness(&self, block_hash: B256) -> ProviderResult> { + async fn witness(&self, block_hash: B256) -> ProviderResult> { if let Some(delay) = self.witness_delay { tokio::time::sleep(delay).await; } - Ok(self.witnesses.lock().unwrap().get(&block_hash).cloned()) + Ok(self.witnesses.lock().unwrap().get(&block_hash).cloned().unwrap_or_default()) } } diff --git a/crates/net/ress/src/types.rs b/crates/ress/protocol/src/types.rs similarity index 56% rename from crates/net/ress/src/types.rs rename to crates/ress/protocol/src/types.rs index 6fe61312be..f9f2825fb7 100644 --- a/crates/net/ress/src/types.rs +++ b/crates/ress/protocol/src/types.rs @@ -1,10 +1,5 @@ -use alloy_primitives::{ - bytes::{Buf, BufMut}, - Bytes, B256, -}; -use alloy_rlp::{ - Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper, -}; +use alloy_primitives::bytes::{Buf, BufMut}; +use alloy_rlp::{Decodable, Encodable}; /// Node type variant. #[repr(u8)] @@ -57,27 +52,3 @@ impl NodeType { !self.is_stateful() || !other.is_stateful() } } - -/// State witness entry in the format used for networking. -#[derive(PartialEq, Eq, Clone, Debug, RlpEncodable, RlpDecodable)] -#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] -pub struct StateWitnessEntry { - /// Trie node hash. - pub hash: B256, - /// RLP-encoded trie node. - pub bytes: Bytes, -} - -/// State witness in the format used for networking. -#[derive(PartialEq, Eq, Clone, Default, Debug, RlpEncodableWrapper, RlpDecodableWrapper)] -#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] -pub struct StateWitnessNet( - /// State witness entries. - pub Vec, -); - -impl FromIterator<(B256, Bytes)> for StateWitnessNet { - fn from_iter>(iter: T) -> Self { - Self(iter.into_iter().map(|(hash, bytes)| StateWitnessEntry { hash, bytes }).collect()) - } -} diff --git a/crates/net/ress/tests/it/e2e.rs b/crates/ress/protocol/tests/it/e2e.rs similarity index 98% rename from crates/net/ress/tests/it/e2e.rs rename to crates/ress/protocol/tests/it/e2e.rs index 0c0b2191cb..5259e763a5 100644 --- a/crates/net/ress/tests/it/e2e.rs +++ b/crates/ress/protocol/tests/it/e2e.rs @@ -5,11 +5,11 @@ use reth_network_api::{ events::{NetworkEvent, PeerEvent}, test_utils::PeersHandleProvider, }; -use reth_network_ress::{ +use reth_provider::test_utils::MockEthProvider; +use reth_ress_protocol::{ test_utils::{MockRessProtocolProvider, NoopRessProtocolProvider}, GetHeaders, NodeType, ProtocolEvent, ProtocolState, RessPeerRequest, RessProtocolHandler, }; -use reth_provider::test_utils::MockEthProvider; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, oneshot}; @@ -150,7 +150,7 @@ async fn message_exchange() { // send get witness message from peer0 to peer1 let (tx, rx) = oneshot::channel(); peer0_conn.send(RessPeerRequest::GetWitness { block_hash: B256::ZERO, tx }).unwrap(); - assert_eq!(rx.await.unwrap(), Default::default()); + assert_eq!(rx.await.unwrap(), Vec::::new()); // send get bytecode message from peer0 to peer1 let (tx, rx) = oneshot::channel(); @@ -226,7 +226,7 @@ async fn witness_fetching_does_not_block() { assert!(bytecode_requested_at.elapsed() < witness_delay); // await for witness response - assert_eq!(witness_rx.await.unwrap(), Default::default()); + assert_eq!(witness_rx.await.unwrap(), Vec::::new()); assert!(witness_requested_at.elapsed() >= witness_delay); } diff --git a/crates/net/ress/tests/it/main.rs b/crates/ress/protocol/tests/it/main.rs similarity index 100% rename from crates/net/ress/tests/it/main.rs rename to crates/ress/protocol/tests/it/main.rs diff --git a/crates/ress/provider/Cargo.toml b/crates/ress/provider/Cargo.toml new file mode 100644 index 0000000000..bb83dfe655 --- /dev/null +++ b/crates/ress/provider/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "reth-ress-provider" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +reth-ress-protocol.workspace = true +reth-primitives-traits.workspace = true +reth-primitives.workspace = true +reth-provider.workspace = true +reth-network.workspace = true +reth-network-api.workspace = true +reth-evm.workspace = true +reth-revm = { workspace = true, features = ["witness"] } +reth-chain-state.workspace = true +reth-trie.workspace = true +reth-engine-primitives.workspace = true +reth-tasks.workspace = true +reth-tokio-util.workspace = true +reth-node-api.workspace = true + +# alloy +alloy-primitives.workspace = true +alloy-consensus.workspace = true + +# misc +futures.workspace = true +tokio.workspace = true +parking_lot.workspace = true +schnellru.workspace = true +eyre.workspace = true +tracing.workspace = true diff --git a/crates/ress/provider/src/lib.rs b/crates/ress/provider/src/lib.rs new file mode 100644 index 0000000000..c29bf710db --- /dev/null +++ b/crates/ress/provider/src/lib.rs @@ -0,0 +1,239 @@ +//! Reth implementation of [`reth_ress_protocol::RessProtocolProvider`]. + +use alloy_consensus::BlockHeader as _; +use alloy_primitives::{Bytes, B256}; +use parking_lot::Mutex; +use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, MemoryOverlayStateProvider}; +use reth_evm::execute::{BlockExecutorProvider, Executor}; +use reth_primitives::{Block, BlockBody, EthPrimitives, Header, RecoveredBlock}; +use reth_primitives_traits::Block as _; +use reth_provider::{ + BlockReader, BlockSource, ProviderError, ProviderResult, StateProvider, StateProviderFactory, +}; +use reth_ress_protocol::RessProtocolProvider; +use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord}; +use reth_tasks::TaskSpawner; +use reth_trie::{MultiProofTargets, Nibbles, TrieInput}; +use schnellru::{ByLength, LruMap}; +use std::{sync::Arc, time::Instant}; +use tokio::sync::{oneshot, Semaphore}; +use tracing::*; + +mod recorder; +use recorder::StateWitnessRecorderDatabase; + +mod pending_state; +pub use pending_state::*; + +/// Reth provider implementing [`RessProtocolProvider`]. +#[allow(missing_debug_implementations)] +pub struct RethRessProtocolProvider { + provider: P, + block_executor: E, + task_spawner: Box, + pending_state: PendingState, + witness_semaphore: Arc, + witness_cache: Arc>>>>, +} + +impl Clone for RethRessProtocolProvider { + fn clone(&self) -> Self { + Self { + provider: self.provider.clone(), + block_executor: self.block_executor.clone(), + task_spawner: self.task_spawner.clone(), + pending_state: self.pending_state.clone(), + witness_semaphore: self.witness_semaphore.clone(), + witness_cache: self.witness_cache.clone(), + } + } +} + +impl RethRessProtocolProvider +where + P: BlockReader + StateProviderFactory, + E: BlockExecutorProvider + Clone, +{ + /// Create new ress protocol provider. + pub fn new( + provider: P, + block_executor: E, + task_spawner: Box, + pending_state: PendingState, + witness_max_parallel: usize, + cache_size: u32, + ) -> eyre::Result { + Ok(Self { + provider, + block_executor, + task_spawner, + pending_state, + witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)), + witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))), + }) + } + + /// Retrieve a valid or invalid block by block hash. + pub fn block_by_hash( + &self, + block_hash: B256, + ) -> ProviderResult>>> { + // NOTE: we keep track of the pending state locally because reth does not provider a way + // to access non-canonical or invalid blocks via the provider. + let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) { + Some(block) + } else if let Some(block) = + self.provider.find_block_by_hash(block_hash, BlockSource::Any)? + { + let signers = block.recover_signers()?; + Some(Arc::new(block.into_recovered_with_signers(signers))) + } else { + // we attempt to look up invalid block last + self.pending_state.invalid_recovered_block(&block_hash) + }; + Ok(maybe_block) + } + + /// Generate state witness + pub fn generate_witness(&self, block_hash: B256) -> ProviderResult> { + if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() { + return Ok(witness.as_ref().clone()) + } + + let block = + self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?; + + let mut executed_ancestors = Vec::new(); + let mut ancestor_hash = block.parent_hash(); + let historical = 'sp: loop { + match self.provider.state_by_block_hash(ancestor_hash) { + Ok(state_provider) => break 'sp state_provider, + Err(_) => { + // Attempt to retrieve a valid executed block first. + let mut executed = self.pending_state.executed_block(&ancestor_hash); + + // If it's not present, attempt to lookup invalid block. + if executed.is_none() { + if let Some(invalid) = + self.pending_state.invalid_recovered_block(&ancestor_hash) + { + trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction"); + executed = Some(ExecutedBlockWithTrieUpdates { + block: ExecutedBlock { + recovered_block: invalid, + ..Default::default() + }, + ..Default::default() + }); + } + } + + let Some(executed) = executed else { + return Err(ProviderError::StateForHashNotFound(ancestor_hash)) + }; + ancestor_hash = executed.sealed_block().parent_hash(); + executed_ancestors.push(executed); + } + }; + }; + + // Execute all gathered blocks to gather accesses state. + let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new( + MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()), + )); + let mut record = ExecutionWitnessRecord::default(); + + // We allow block execution to fail, since we still want to record all accessed state by + // invalid blocks. + if let Err(error) = self.block_executor.executor(&mut db).execute_with_state_closure( + &block, + |state: &State<_>| { + record.record_executed_state(state); + }, + ) { + debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block"); + } + + // NOTE: there might be a race condition where target ancestor hash gets evicted from the + // database. + let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?; + let mut trie_input = TrieInput::default(); + for block in executed_ancestors.into_iter().rev() { + trie_input.append_cached_ref(&block.trie, &block.hashed_state); + } + let mut hashed_state = db.into_state(); + hashed_state.extend(record.hashed_state); + + // Gather the state witness. + let witness = if hashed_state.is_empty() { + // If no state was accessed, at least the root node must be present. + let multiproof = witness_state_provider.multiproof( + trie_input, + MultiProofTargets::from_iter([(B256::ZERO, Default::default())]), + )?; + let mut witness = Vec::new(); + if let Some(root_node) = + multiproof.account_subtree.into_inner().remove(&Nibbles::default()) + { + witness.push(root_node); + } + witness + } else { + witness_state_provider.witness(trie_input, hashed_state)? + }; + + // Insert witness into the cache. + self.witness_cache.lock().insert(block_hash, Arc::new(witness.clone())); + + Ok(witness) + } +} + +impl RessProtocolProvider for RethRessProtocolProvider +where + P: BlockReader + StateProviderFactory + Clone + 'static, + E: BlockExecutorProvider + Clone, +{ + fn header(&self, block_hash: B256) -> ProviderResult> { + trace!(target: "reth::ress_provider", %block_hash, "Serving header"); + Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone())) + } + + fn block_body(&self, block_hash: B256) -> ProviderResult> { + trace!(target: "reth::ress_provider", %block_hash, "Serving block body"); + Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone())) + } + + fn bytecode(&self, code_hash: B256) -> ProviderResult> { + trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode"); + let maybe_bytecode = 'bytecode: { + if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) { + break 'bytecode Some(bytecode); + } + + self.provider.latest()?.bytecode_by_hash(&code_hash)? + }; + + Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes())) + } + + async fn witness(&self, block_hash: B256) -> ProviderResult> { + trace!(target: "reth::ress_provider", %block_hash, "Serving witness"); + let started_at = Instant::now(); + let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?; + let this = self.clone(); + let (tx, rx) = oneshot::channel(); + self.task_spawner.spawn_blocking(Box::pin(async move { + let result = this.generate_witness(block_hash); + let _ = tx.send(result); + })); + match rx.await { + Ok(Ok(witness)) => { + trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness"); + Ok(witness) + } + Ok(Err(error)) => Err(error), + Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())), + } + } +} diff --git a/crates/ress/provider/src/pending_state.rs b/crates/ress/provider/src/pending_state.rs new file mode 100644 index 0000000000..0546edd6cc --- /dev/null +++ b/crates/ress/provider/src/pending_state.rs @@ -0,0 +1,128 @@ +use alloy_consensus::BlockHeader as _; +use alloy_primitives::{ + map::{B256HashSet, B256Map}, + BlockNumber, B256, +}; +use futures::StreamExt; +use parking_lot::RwLock; +use reth_chain_state::ExecutedBlockWithTrieUpdates; +use reth_node_api::{BeaconConsensusEngineEvent, NodePrimitives}; +use reth_primitives::{Bytecode, EthPrimitives, RecoveredBlock}; +use reth_provider::BlockNumReader; +use reth_tokio_util::EventStream; +use std::{collections::BTreeMap, sync::Arc}; +use tracing::*; + +/// Pending state for [`crate::RethRessProtocolProvider`]. +#[derive(Clone, Default, Debug)] +pub struct PendingState(Arc>>); + +#[derive(Default, Debug)] +struct PendingStateInner { + blocks_by_hash: B256Map>, + invalid_blocks_by_hash: B256Map>>, + block_hashes_by_number: BTreeMap, +} + +impl PendingState { + /// Insert executed block with trie updates. + pub fn insert_block(&self, block: ExecutedBlockWithTrieUpdates) { + let mut this = self.0.write(); + let block_hash = block.recovered_block.hash(); + this.block_hashes_by_number + .entry(block.recovered_block.number()) + .or_default() + .insert(block_hash); + this.blocks_by_hash.insert(block_hash, block); + } + + /// Insert invalid block. + pub fn insert_invalid_block(&self, block: Arc>) { + let mut this = self.0.write(); + let block_hash = block.hash(); + this.block_hashes_by_number.entry(block.number()).or_default().insert(block_hash); + this.invalid_blocks_by_hash.insert(block_hash, block); + } + + /// Returns only valid executed blocks by hash. + pub fn executed_block(&self, hash: &B256) -> Option> { + self.0.read().blocks_by_hash.get(hash).cloned() + } + + /// Returns valid recovered block. + pub fn recovered_block(&self, hash: &B256) -> Option>> { + self.executed_block(hash).map(|b| b.recovered_block.clone()) + } + + /// Returns invalid recovered block. + pub fn invalid_recovered_block(&self, hash: &B256) -> Option>> { + self.0.read().invalid_blocks_by_hash.get(hash).cloned() + } + + /// Find bytecode in executed blocks state. + pub fn find_bytecode(&self, code_hash: B256) -> Option { + let this = self.0.read(); + for block in this.blocks_by_hash.values() { + if let Some(contract) = block.execution_output.bytecode(&code_hash) { + return Some(contract); + } + } + None + } + + /// Remove all blocks before the specified block number. + pub fn remove_before(&self, block_number: BlockNumber) -> u64 { + let mut removed = 0; + let mut this = self.0.write(); + while this + .block_hashes_by_number + .first_key_value() + .is_some_and(|(number, _)| number <= &block_number) + { + let (_, block_hashes) = this.block_hashes_by_number.pop_first().unwrap(); + for block_hash in block_hashes { + removed += 1; + this.blocks_by_hash.remove(&block_hash); + this.invalid_blocks_by_hash.remove(&block_hash); + } + } + removed + } +} + +/// A task to maintain pending state based on consensus engine events. +pub async fn maintain_pending_state

( + mut events: EventStream>, + provider: P, + pending_state: PendingState, +) where + P: BlockNumReader, +{ + while let Some(event) = events.next().await { + match event { + BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _) | + BeaconConsensusEngineEvent::ForkBlockAdded(block, _) => { + trace!(target: "reth::ress_provider", block = ? block.recovered_block().num_hash(), "Insert block into pending state"); + pending_state.insert_block(block); + } + BeaconConsensusEngineEvent::InvalidBlock(block) => { + if let Ok(block) = block.try_recover() { + trace!(target: "reth::ress_provider", block = ?block.num_hash(), "Insert invalid block into pending state"); + pending_state.insert_invalid_block(Arc::new(block)); + } + } + BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => { + if status.is_valid() { + let target = state.finalized_block_hash; + if let Ok(Some(block_number)) = provider.block_number(target) { + let count = pending_state.remove_before(block_number); + trace!(target: "reth::ress_provider", block_number, count, "Removing blocks before finalized"); + } + } + } + // ignore + BeaconConsensusEngineEvent::CanonicalChainCommitted(_, _) | + BeaconConsensusEngineEvent::LiveSyncProgress(_) => (), + } + } +} diff --git a/crates/ress/provider/src/recorder.rs b/crates/ress/provider/src/recorder.rs new file mode 100644 index 0000000000..b692dd9a4d --- /dev/null +++ b/crates/ress/provider/src/recorder.rs @@ -0,0 +1,56 @@ +use alloy_primitives::{keccak256, Address, B256, U256}; +use reth_revm::{ + state::{AccountInfo, Bytecode}, + Database, +}; +use reth_trie::{HashedPostState, HashedStorage}; + +/// The state witness recorder that records all state accesses during execution. +/// It does so by implementing the [`reth_revm::Database`] and recording accesses of accounts and +/// slots. +pub(crate) struct StateWitnessRecorderDatabase { + database: D, + state: HashedPostState, +} + +impl StateWitnessRecorderDatabase { + pub(crate) fn new(database: D) -> Self { + Self { database, state: Default::default() } + } + + pub(crate) fn into_state(self) -> HashedPostState { + self.state + } +} + +impl Database for StateWitnessRecorderDatabase { + type Error = D::Error; + + fn basic(&mut self, address: Address) -> Result, Self::Error> { + let maybe_account = self.database.basic(address)?; + let hashed_address = keccak256(address); + self.state.accounts.insert(hashed_address, maybe_account.as_ref().map(|acc| acc.into())); + Ok(maybe_account) + } + + fn storage(&mut self, address: Address, index: U256) -> Result { + let value = self.database.storage(address, index)?; + let hashed_address = keccak256(address); + let hashed_slot = keccak256(B256::from(index)); + self.state + .storages + .entry(hashed_address) + .or_insert_with(|| HashedStorage::new(false)) + .storage + .insert(hashed_slot, value); + Ok(value) + } + + fn block_hash(&mut self, number: u64) -> Result { + self.database.block_hash(number) + } + + fn code_by_hash(&mut self, code_hash: B256) -> Result { + self.database.code_by_hash(code_hash) + } +}