chore: remove ress crates from workspace (#22057)

Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Georgios Konstantopoulos
2026-02-11 08:39:56 -05:00
committed by GitHub
parent a5ced84098
commit 8a25d7d3cf
25 changed files with 198 additions and 2390 deletions

View File

@@ -0,0 +1,7 @@
---
reth: patch
reth-cli-commands: patch
reth-node-core: patch
---
Removed experimental ress protocol support for stateless Ethereum nodes.

298
Cargo.lock generated
View File

@@ -2719,12 +2719,12 @@ dependencies = [
[[package]]
name = "ctrlc"
version = "3.5.1"
version = "3.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
dependencies = [
"dispatch2",
"nix 0.30.1",
"nix 0.31.1",
"windows-sys 0.61.2",
]
@@ -2944,9 +2944,9 @@ dependencies = [
[[package]]
name = "deranged"
version = "0.5.5"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587"
checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4"
dependencies = [
"powerfmt",
"serde_core",
@@ -4277,6 +4277,19 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasip2",
"wasip3",
]
[[package]]
name = "ghash"
version = "0.5.1"
@@ -4846,6 +4859,12 @@ dependencies = [
"zerovec",
]
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]]
name = "ident_case"
version = "1.0.1"
@@ -5135,9 +5154,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "jemalloc_pprof"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74ff642505c7ce8d31c0d43ec0e235c6fd4585d9b8172d8f9dd04d36590200b5"
checksum = "8a0d44c349cfe2654897fadcb9de4f0bfbf48288ec344f700b2bd59f152dd209"
dependencies = [
"anyhow",
"libc",
@@ -5450,6 +5469,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "leb128fmt"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.180"
@@ -5702,9 +5727,9 @@ dependencies = [
[[package]]
name = "mappings"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db4d277bb50d4508057e7bddd7fcd19ef4a4cc38051b6a5a36868d75ae2cbeb9"
checksum = "8bab1e61a4b76757edb59cd81fcaa7f3ba9018d43b527d9abfad877b4c6c60f2"
dependencies = [
"anyhow",
"libc",
@@ -6038,9 +6063,9 @@ dependencies = [
[[package]]
name = "ntapi"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c70f219e21142367c70c0b30c6a9e3a14d55b4d12a204d897fbec83a0363f081"
checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae"
dependencies = [
"winapi",
]
@@ -6434,7 +6459,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost 0.14.3",
"prost",
"reqwest",
"thiserror 2.0.18",
"tokio",
@@ -6450,7 +6475,7 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost 0.14.3",
"prost",
"tonic",
"tonic-prost",
]
@@ -6793,16 +6818,16 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof_util"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4429d44e5e2c8a69399fc0070379201eed018e3df61e04eb7432811df073c224"
checksum = "eea0cc524de808a6d98d192a3d99fe95617031ad4a52ec0a0f987ef4432e8fe1"
dependencies = [
"anyhow",
"backtrace",
"flate2",
"num",
"paste",
"prost 0.13.5",
"prost",
]
[[package]]
@@ -6969,16 +6994,6 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive 0.13.5",
]
[[package]]
name = "prost"
version = "0.14.3"
@@ -6986,20 +7001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
dependencies = [
"bytes",
"prost-derive 0.14.3",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.114",
"prost-derive",
]
[[package]]
@@ -7244,9 +7246,9 @@ dependencies = [
[[package]]
name = "rapidhash"
version = "4.2.2"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71ec30b38a417407efe7676bad0ca6b78f995f810185ece9af3bd5dc561185a9"
checksum = "84816e4c99c467e92cf984ee6328caa976dfecd33a673544489d79ca2caaefe5"
dependencies = [
"rand 0.9.2",
"rustversion",
@@ -7512,7 +7514,6 @@ dependencies = [
"aquamarine",
"backon",
"clap",
"eyre",
"reth-chainspec",
"reth-cli-runner",
"reth-cli-util",
@@ -7521,8 +7522,6 @@ dependencies = [
"reth-db",
"reth-ethereum-cli",
"reth-ethereum-payload-builder",
"reth-ethereum-primitives",
"reth-evm",
"reth-network",
"reth-network-api",
"reth-node-api",
@@ -7534,8 +7533,6 @@ dependencies = [
"reth-payload-primitives",
"reth-primitives",
"reth-provider",
"reth-ress-protocol",
"reth-ress-provider",
"reth-revm",
"reth-rpc",
"reth-rpc-api",
@@ -7544,10 +7541,8 @@ dependencies = [
"reth-rpc-eth-types",
"reth-rpc-server-types",
"reth-tasks",
"reth-tokio-util",
"reth-transaction-pool",
"tempfile",
"tokio",
"tracing",
]
@@ -9727,58 +9722,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "reth-ress-protocol"
version = "1.10.2"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"alloy-rlp",
"arbitrary",
"futures",
"proptest",
"proptest-arbitrary-interop",
"reth-eth-wire",
"reth-ethereum-primitives",
"reth-network",
"reth-network-api",
"reth-provider",
"reth-ress-protocol",
"reth-storage-errors",
"reth-tracing",
"strum",
"strum_macros",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "reth-ress-provider"
version = "1.10.2"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"eyre",
"futures",
"parking_lot",
"reth-chain-state",
"reth-errors",
"reth-ethereum-primitives",
"reth-evm",
"reth-node-api",
"reth-primitives-traits",
"reth-ress-protocol",
"reth-revm",
"reth-storage-api",
"reth-tasks",
"reth-tokio-util",
"reth-trie",
"schnellru",
"tokio",
"tracing",
]
[[package]]
name = "reth-revm"
version = "1.10.2"
@@ -11264,9 +11207,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.22"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "ryu-js"
@@ -12007,12 +11950,12 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.24.0"
version = "3.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c"
checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1"
dependencies = [
"fastrand 2.3.0",
"getrandom 0.3.4",
"getrandom 0.4.1",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -12363,9 +12306,9 @@ dependencies = [
[[package]]
name = "toml"
version = "0.9.11+spec-1.1.0"
version = "0.9.12+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46"
checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863"
dependencies = [
"indexmap 2.13.0",
"serde_core",
@@ -12399,9 +12342,9 @@ dependencies = [
[[package]]
name = "toml_parser"
version = "1.0.6+spec-1.1.0"
version = "1.0.7+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
checksum = "247eaa3197818b831697600aadf81514e577e0cba5eab10f7e064e78ae154df1"
dependencies = [
"winnow",
]
@@ -12445,7 +12388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6c55a2d6a14174563de34409c9f92ff981d006f56da9c6ecd40d9d4a31500b0"
dependencies = [
"bytes",
"prost 0.14.3",
"prost",
"tonic",
]
@@ -12849,9 +12792,9 @@ checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-ident"
version = "1.0.22"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e"
[[package]]
name = "unicode-segmentation"
@@ -13089,6 +13032,15 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasip3"
version = "0.4.0+wasi-0.3.0-rc-2026-01-06"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5"
dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.108"
@@ -13148,6 +13100,28 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "wasm-encoder"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319"
dependencies = [
"leb128fmt",
"wasmparser",
]
[[package]]
name = "wasm-metadata"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap 2.13.0",
"wasm-encoder",
"wasmparser",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
@@ -13161,6 +13135,18 @@ dependencies = [
"web-sys",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags 2.10.0",
"hashbrown 0.15.5",
"indexmap 2.13.0",
"semver 1.0.27",
]
[[package]]
name = "wasmtimer"
version = "0.4.3"
@@ -13699,6 +13685,88 @@ name = "wit-bindgen"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
dependencies = [
"wit-bindgen-rust-macro",
]
[[package]]
name = "wit-bindgen-core"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
dependencies = [
"anyhow",
"heck",
"wit-parser",
]
[[package]]
name = "wit-bindgen-rust"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap 2.13.0",
"prettyplease",
"syn 2.0.114",
"wasm-metadata",
"wit-bindgen-core",
"wit-component",
]
[[package]]
name = "wit-bindgen-rust-macro"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a"
dependencies = [
"anyhow",
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.114",
"wit-bindgen-core",
"wit-bindgen-rust",
]
[[package]]
name = "wit-component"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags 2.10.0",
"indexmap 2.13.0",
"log",
"serde",
"serde_derive",
"serde_json",
"wasm-encoder",
"wasm-metadata",
"wasmparser",
"wit-parser",
]
[[package]]
name = "wit-parser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap 2.13.0",
"log",
"semver 1.0.27",
"serde",
"serde_derive",
"serde_json",
"unicode-xid",
"wasmparser",
]
[[package]]
name = "write16"
@@ -13882,9 +13950,9 @@ dependencies = [
[[package]]
name = "zmij"
version = "1.0.19"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445"
checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7"
[[package]]
name = "zstd"

View File

@@ -83,8 +83,6 @@ members = [
"crates/prune/db",
"crates/prune/prune",
"crates/prune/types",
"crates/ress/protocol",
"crates/ress/provider",
"crates/revm/",
"crates/rpc/ipc/",
"crates/rpc/rpc-api/",
@@ -434,8 +432,6 @@ reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "34.0.0", default-features = false }

View File

@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-transaction-pool.workspace = true
reth-cli-runner.workspace = true
@@ -53,14 +52,10 @@ reth-payload-primitives.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# alloy
alloy-rpc-types = { workspace = true, features = ["engine"] }
@@ -68,13 +63,9 @@ alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
tracing.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# misc
aquamarine.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
[dev-dependencies]
backon.workspace = true

View File

@@ -205,9 +205,6 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{CliContext, CliRunner};
@@ -218,3 +215,4 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use tracing as _;

View File

@@ -8,7 +8,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
@@ -22,27 +22,13 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node_exit_future, .. } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.evm_config,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
node_exit_future.await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@@ -1,67 +0,0 @@
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::ConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
evm_config: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn_task(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
evm_config,
Box::new(task_executor.clone()),
args.max_witness_window,
args.witness_max_parallel,
args.witness_cache_size,
pending_state,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn_task(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}

View File

@@ -68,10 +68,6 @@ pub use benchmark_args::BenchmarkArgs;
mod engine;
pub use engine::{DefaultEngineValues, EngineArgs};
/// `RessArgs` for configuring ress subprotocol.
mod ress_args;
pub use ress_args::RessArgs;
/// `EraArgs` for configuring ERA files import.
mod era;
pub use era::{DefaultEraHost, EraArgs, EraSourceArgs};

View File

@@ -1,50 +0,0 @@
use clap::Args;
/// The default number of maximum active connections.
const MAX_ACTIVE_CONNECTIONS_DEFAULT: u64 = 5;
/// The default maximum witness lookback window.
const MAX_WITNESS_WINDOW_DEFAULT: u64 = 1024;
/// The default maximum number of witnesses to generate in parallel.
const WITNESS_MAX_PARALLEL_DEFAULT: usize = 5;
/// The default witness cache size.
const WITNESS_CACHE_SIZE_DEFAULT: u32 = 10;
/// Parameters for configuring the `ress` subprotocol.
#[derive(Debug, Clone, Args, PartialEq, Eq)]
#[command(next_help_heading = "Ress")]
pub struct RessArgs {
/// Enable support for `ress` subprotocol.
#[arg(long = "ress.enable", default_value_t = false)]
pub enabled: bool,
/// The maximum number of active connections for `ress` subprotocol.
#[arg(long = "ress.max-active-connections", default_value_t = MAX_ACTIVE_CONNECTIONS_DEFAULT)]
pub max_active_connections: u64,
/// The maximum witness lookback window.
#[arg(long = "ress.max-witness-window", default_value_t = MAX_WITNESS_WINDOW_DEFAULT)]
pub max_witness_window: u64,
/// The maximum number of witnesses to generate in parallel.
#[arg(long = "ress.witness-max-parallel", default_value_t = WITNESS_MAX_PARALLEL_DEFAULT)]
pub witness_max_parallel: usize,
/// Witness cache size.
#[arg(long = "ress.witness-cache-size", default_value_t = WITNESS_CACHE_SIZE_DEFAULT)]
pub witness_cache_size: u32,
}
impl Default for RessArgs {
fn default() -> Self {
Self {
enabled: false,
max_active_connections: MAX_ACTIVE_CONNECTIONS_DEFAULT,
max_witness_window: MAX_WITNESS_WINDOW_DEFAULT,
witness_max_parallel: WITNESS_MAX_PARALLEL_DEFAULT,
witness_cache_size: WITNESS_CACHE_SIZE_DEFAULT,
}
}
}

View File

@@ -1,65 +0,0 @@
[package]
name = "reth-ress-protocol"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-eth-wire.workspace = true
reth-network-api.workspace = true
reth-network.workspace = true
reth-storage-errors.workspace = true
reth-ethereum-primitives.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-consensus.workspace = true
# misc
futures.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
tracing.workspace = true
# feature `arbitrary`
arbitrary = { workspace = true, features = ["derive"], optional = true }
[dev-dependencies]
reth-eth-wire = { workspace = true, features = ["arbitrary"] }
reth-network = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
# enable `test-utils` feature on this crate
reth-ress-protocol = { path = ".", features = ["test-utils"] }
tokio.workspace = true
strum.workspace = true
strum_macros.workspace = true
arbitrary = { workspace = true, features = ["derive"] }
proptest.workspace = true
proptest-arbitrary-interop.workspace = true
[features]
test-utils = [
"reth-network/test-utils",
"reth-ress-protocol/test-utils",
"reth-ethereum-primitives/test-utils",
"reth-provider/test-utils",
]
arbitrary = [
"dep:arbitrary",
"reth-eth-wire/arbitrary",
"alloy-primitives/arbitrary",
"reth-ress-protocol/arbitrary",
"reth-ethereum-primitives/arbitrary",
"alloy-consensus/arbitrary",
]

View File

@@ -1,96 +0,0 @@
# Ress Protocol (RESS)
The `ress` protocol runs on top of [RLPx], allowing a stateless nodes to fetch necessary state data (witness, block, bytecode) from a stateful node. The protocol is an optional extension for peers that support (or are interested in) stateless Ethereum full nodes.
**Note**: In this context, “stateless nodes” does not imply holding zero state on disk. Rather, such nodes still maintain minimal or partial state (e.g., essential bytecodes), which is relatively small. For simplicity, we continue to use the term “stateless” throughout these documents.
The current version is `ress/0`.
## Overview
The `ress` protocol is designed to provide support for stateless nodes. Its goal is to enable the exchange of necessary block execution state from stateful nodes to a stateless nodes so that the latter can store in disk only the minimal required state (such as bytecode) and lazily fetch other state data. It supports retrieving a state witness for a target new payload from a stateful peer, as well as contract bytecode and full block data (headers and bodies). The `ress` protocol is intended to be run alongside other protocols (e.g., `eth`), rather than as a standalone protocol.
## Basic operation
Once a connection is established, a [NodeType] message must be sent. After the peer's node type is validated according to the connection rules, any other protocol messages may be sent. The `ress` session will be terminated if the peer combination is invalid (e.g., stateful-to-stateful connections are not needed).
Within a session, four types of messages can be exchanged: headers, bodies, bytecode, and witness.
During the startup phase, a stateless node downloads the necessary ancestor blocks (headers and bodies) via the header and body messages. When the stateless node receives a new payload through the engine API, it requests a witness—a compressed multi Merkle proof of state—using the witness message. From this witness, the stateless node can determine if any contract bytecode is missing by comparing it with its disk. It then requests any missing bytecode. All requests are sent to the connected stateful peer and occur synchronously.
## Protocol Messages
In most messages, the first element of the message data list is the request-id. For requests, this is a 64-bit integer value chosen by the requesting peer. The responding peer must mirror the value in the request-id element of the response message.
### NodeType (0x00)
`[nodetype]`
Informs a peer of its node type. This message should be sent immediately after the connection is established and before any other RESS protocol messages.
There are two types of nodes in the network:
| ID | Node Type |
| --- | --------- |
| 0 | Stateless |
| 1 | Stateful |
The following table shows which connections between node types are valid:
| | stateless | stateful |
| --------- | --------- | -------- |
| stateless | true | true |
| stateful | true | false |
### GetHeaders (0x01)
`[request-id: P, [blockhash: B_32, limit: P]]`
Require the peer to return a Headers message. The response must contain up to limit block headers, beginning at blockhash in the canonical chain and traversing towards the genesis block (descending order).
### Headers (0x02)
`[request-id: P, [header₁, header₂, ...]]`
This is the response to GetHeaders, containing the requested headers. The header list may be empty if none of the requested block headers were found. The number of headers that can be requested in a single message may be subject to implementation-defined limits.
### GetBlockBodies (0x03)
`[request-id: P, [blockhash₁: B_32, blockhash₂: B_32, ...]]`
This message requests block body data by hash. The number of blocks that can be requested in a single message may be subject to implementation-defined limits.
### BlockBodies (0x04)
`[request-id: P, [block-body₁, block-body₂, ...]]`
This is the response to GetBlockBodies. The items in the list contain the body data of the requested blocks. The list may be empty if none of the requested blocks were available.
### GetBytecode (0x05)
`[request-id: P, [codehash: B_32]]`
Require peer to return a bytecode message containing the bytecode of the given code hash.
### Bytecode (0x06)
`[request-id: P, [bytes]]`
This is the response to GetBytecode, providing the requested bytecode. Response corresponds to a code hash of the GetBytecode request.
### GetWitness (0x07)
`[request-id: P, [blockhash: B_32]]`
Require peer to return a state witness message containing the state witness of the given block hash.
### Witness (0x08)
`[request-id: P, [node₁: bytes, node₂: bytes, ...]]`
This is the response to GetWitness, providing the requested state witness. Response corresponds to a block hash of the GetWitness request.
[NodeType]: #NodeType-0x00
[RLPx]: https://github.com/ethereum/devp2p/blob/master/rlpx.md

View File

@@ -1,365 +0,0 @@
use crate::{GetHeaders, NodeType, RessMessage, RessProtocolMessage, RessProtocolProvider};
use alloy_consensus::Header;
use alloy_primitives::{bytes::BytesMut, BlockHash, Bytes, B256};
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use reth_eth_wire::{message::RequestPair, multiplex::ProtocolConnection};
use reth_ethereum_primitives::BlockBody;
use reth_network_api::{test_utils::PeersHandle, PeerId, ReputationChangeKind};
use reth_storage_errors::ProviderResult;
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
/// The connection handler for the custom `RLPx` protocol.
#[derive(Debug)]
pub struct RessProtocolConnection<P> {
/// Provider.
provider: P,
/// The type of this node..
node_type: NodeType,
/// Peers handle.
peers_handle: PeersHandle,
/// Peer ID.
peer_id: PeerId,
/// Protocol connection.
conn: ProtocolConnection,
/// Stream of incoming commands.
commands: UnboundedReceiverStream<RessPeerRequest>,
/// The total number of active connections.
active_connections: Arc<AtomicU64>,
/// Flag indicating whether the node type was sent to the peer.
node_type_sent: bool,
/// Flag indicating whether this stream has previously been terminated.
terminated: bool,
/// Incremental counter for request ids.
next_id: u64,
/// Collection of inflight requests.
inflight_requests: HashMap<u64, RessPeerRequest>,
/// Pending witness responses.
pending_witnesses: FuturesUnordered<WitnessFut>,
}
impl<P> RessProtocolConnection<P> {
/// Create new connection.
pub fn new(
provider: P,
node_type: NodeType,
peers_handle: PeersHandle,
peer_id: PeerId,
conn: ProtocolConnection,
commands: UnboundedReceiverStream<RessPeerRequest>,
active_connections: Arc<AtomicU64>,
) -> Self {
Self {
provider,
node_type,
peers_handle,
peer_id,
conn,
commands,
active_connections,
node_type_sent: false,
terminated: false,
next_id: 0,
inflight_requests: HashMap::default(),
pending_witnesses: FuturesUnordered::new(),
}
}
/// Returns the next request id
const fn next_id(&mut self) -> u64 {
let id = self.next_id;
self.next_id += 1;
id
}
/// Report bad message from current peer.
fn report_bad_message(&self) {
self.peers_handle.reputation_change(self.peer_id, ReputationChangeKind::BadMessage);
}
fn on_command(&mut self, command: RessPeerRequest) -> RessProtocolMessage {
let next_id = self.next_id();
let message = match &command {
RessPeerRequest::GetHeaders { request, .. } => {
RessProtocolMessage::get_headers(next_id, *request)
}
RessPeerRequest::GetBlockBodies { request, .. } => {
RessProtocolMessage::get_block_bodies(next_id, request.clone())
}
RessPeerRequest::GetWitness { block_hash, .. } => {
RessProtocolMessage::get_witness(next_id, *block_hash)
}
RessPeerRequest::GetBytecode { code_hash, .. } => {
RessProtocolMessage::get_bytecode(next_id, *code_hash)
}
};
self.inflight_requests.insert(next_id, command);
message
}
}
impl<P> RessProtocolConnection<P>
where
    P: RessProtocolProvider + Clone + 'static,
{
    /// Serve a peer's headers request via the provider.
    ///
    /// Provider errors are logged at trace level and degraded to an empty
    /// response rather than propagated, so a faulty provider never kills the
    /// connection.
    fn on_headers_request(&self, request: GetHeaders) -> Vec<Header> {
        match self.provider.headers(request) {
            Ok(headers) => headers,
            Err(error) => {
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, ?request, %error, "error retrieving headers");
                Default::default()
            }
        }
    }
    /// Serve a peer's block bodies request via the provider.
    ///
    /// Errors degrade to an empty response, mirroring [`Self::on_headers_request`].
    fn on_block_bodies_request(&self, request: Vec<B256>) -> Vec<BlockBody> {
        match self.provider.block_bodies(request.clone()) {
            Ok(bodies) => bodies,
            Err(error) => {
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, ?request, %error, "error retrieving block bodies");
                Default::default()
            }
        }
    }
    /// Serve a peer's bytecode request via the provider.
    ///
    /// Both "not found" and provider errors produce empty bytes.
    fn on_bytecode_request(&self, code_hash: B256) -> Bytes {
        match self.provider.bytecode(code_hash) {
            Ok(Some(bytecode)) => bytecode,
            Ok(None) => {
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, %code_hash, "bytecode not found");
                Default::default()
            }
            Err(error) => {
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, %code_hash, %error, "error retrieving bytecode");
                Default::default()
            }
        }
    }
    /// Build the witness response message for a completed witness future.
    ///
    /// Errors degrade to an empty witness; the response always echoes the
    /// request ID from the original request pair.
    fn on_witness_response(
        &self,
        request: RequestPair<B256>,
        witness_result: ProviderResult<Vec<Bytes>>,
    ) -> RessProtocolMessage {
        let peer_id = self.peer_id;
        let block_hash = request.message;
        let witness = match witness_result {
            Ok(witness) => {
                trace!(target: "ress::net::connection", %peer_id, %block_hash, len = witness.len(), "witness found");
                witness
            }
            Err(error) => {
                trace!(target: "ress::net::connection", %peer_id, %block_hash, %error, "error retrieving witness");
                Default::default()
            }
        };
        RessProtocolMessage::witness(request.request_id, witness)
    }
    /// Handle a single decoded inbound message and decide the follow-up action.
    ///
    /// Requests are served inline (witness requests are queued as async futures
    /// instead, since witness generation may be slow); responses are routed to
    /// the matching in-flight request's oneshot sender. A response with an
    /// unknown request ID is reported as a bad message.
    fn on_ress_message(&mut self, msg: RessProtocolMessage) -> OnRessMessageOutcome {
        match msg.message {
            RessMessage::NodeType(node_type) => {
                if !self.node_type.is_valid_connection(&node_type) {
                    // Node types are not compatible, terminate the connection.
                    return OnRessMessageOutcome::Terminate;
                }
            }
            RessMessage::GetHeaders(req) => {
                let request = req.message;
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, ?request, "serving headers");
                let header = self.on_headers_request(request);
                let response = RessProtocolMessage::headers(req.request_id, header);
                return OnRessMessageOutcome::Response(response.encoded());
            }
            RessMessage::GetBlockBodies(req) => {
                let request = req.message;
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, ?request, "serving block bodies");
                let bodies = self.on_block_bodies_request(request);
                let response = RessProtocolMessage::block_bodies(req.request_id, bodies);
                return OnRessMessageOutcome::Response(response.encoded());
            }
            RessMessage::GetBytecode(req) => {
                let code_hash = req.message;
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, %code_hash, "serving bytecode");
                let bytecode = self.on_bytecode_request(code_hash);
                let response = RessProtocolMessage::bytecode(req.request_id, bytecode);
                return OnRessMessageOutcome::Response(response.encoded());
            }
            RessMessage::GetWitness(req) => {
                let block_hash = req.message;
                trace!(target: "ress::net::connection", peer_id = %self.peer_id, %block_hash, "serving witness");
                // Queue the potentially slow witness lookup; the response is
                // emitted from `poll_next` once the future resolves.
                let provider = self.provider.clone();
                self.pending_witnesses.push(Box::pin(async move {
                    let result = provider.witness(block_hash).await;
                    (req, result)
                }));
            }
            RessMessage::Headers(res) => {
                if let Some(RessPeerRequest::GetHeaders { tx, .. }) =
                    self.inflight_requests.remove(&res.request_id)
                {
                    // The local requester may have gone away; ignore send failure.
                    let _ = tx.send(res.message);
                } else {
                    self.report_bad_message();
                }
            }
            RessMessage::BlockBodies(res) => {
                if let Some(RessPeerRequest::GetBlockBodies { tx, .. }) =
                    self.inflight_requests.remove(&res.request_id)
                {
                    let _ = tx.send(res.message);
                } else {
                    self.report_bad_message();
                }
            }
            RessMessage::Bytecode(res) => {
                if let Some(RessPeerRequest::GetBytecode { tx, .. }) =
                    self.inflight_requests.remove(&res.request_id)
                {
                    let _ = tx.send(res.message);
                } else {
                    self.report_bad_message();
                }
            }
            RessMessage::Witness(res) => {
                if let Some(RessPeerRequest::GetWitness { tx, .. }) =
                    self.inflight_requests.remove(&res.request_id)
                {
                    let _ = tx.send(res.message);
                } else {
                    self.report_bad_message();
                }
            }
        };
        OnRessMessageOutcome::None
    }
}
impl<P> Drop for RessProtocolConnection<P> {
    /// Removes this session from the shared active-connection counter.
    fn drop(&mut self) {
        // Saturating decrement so the counter can never underflow even if the
        // bookkeeping is off by one.
        self.active_connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
                Some(count.saturating_sub(1))
            })
            .ok();
    }
}
impl<P> Stream for RessProtocolConnection<P>
where
    P: RessProtocolProvider + Clone + Unpin + 'static,
{
    type Item = BytesMut;
    /// Drives the connection: sends the `NodeType` handshake exactly once, then
    /// multiplexes local commands, completed witness futures, and inbound peer
    /// messages. Yields `None` once the connection is terminated.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.terminated {
            return Poll::Ready(None)
        }
        // The handshake message goes out before anything else.
        if !this.node_type_sent {
            this.node_type_sent = true;
            return Poll::Ready(Some(RessProtocolMessage::node_type(this.node_type).encoded()))
        }
        'conn: loop {
            // Priority 1: outgoing requests from local consumers.
            if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
                let message = this.on_command(cmd);
                let encoded = message.encoded();
                trace!(target: "ress::net::connection", peer_id = %this.peer_id, ?message, encoded = alloy_primitives::hex::encode(&encoded), "Sending peer command");
                return Poll::Ready(Some(encoded));
            }
            // Priority 2: witness responses whose async generation finished.
            if let Poll::Ready(Some((request, witness_result))) =
                this.pending_witnesses.poll_next_unpin(cx)
            {
                let response = this.on_witness_response(request, witness_result);
                return Poll::Ready(Some(response.encoded()));
            }
            // Priority 3: inbound messages from the peer.
            if let Poll::Ready(maybe_msg) = this.conn.poll_next_unpin(cx) {
                // `None` means the underlying connection closed.
                let Some(next) = maybe_msg else { break 'conn };
                let msg = match RessProtocolMessage::decode_message(&mut &next[..]) {
                    Ok(msg) => {
                        trace!(target: "ress::net::connection", peer_id = %this.peer_id, message = ?msg.message_type, "Processing message");
                        msg
                    }
                    Err(error) => {
                        // Undecodable input penalizes the peer but keeps the
                        // connection alive.
                        trace!(target: "ress::net::connection", peer_id = %this.peer_id, %error, "Error decoding peer message");
                        this.report_bad_message();
                        continue;
                    }
                };
                match this.on_ress_message(msg) {
                    OnRessMessageOutcome::Response(bytes) => return Poll::Ready(Some(bytes)),
                    OnRessMessageOutcome::Terminate => break 'conn,
                    OnRessMessageOutcome::None => {}
                };
                continue;
            }
            return Poll::Pending;
        }
        // Terminating the connection.
        this.terminated = true;
        Poll::Ready(None)
    }
}
/// Future resolving to the witness response for a peer's `GetWitness` request,
/// paired with the original request so the response can echo its request ID.
type WitnessFut =
    Pin<Box<dyn Future<Output = (RequestPair<B256>, ProviderResult<Vec<Bytes>>)> + Send>>;
/// Ress peer request.
///
/// Sent from local consumers to a connection; each variant carries a oneshot
/// sender on which the peer's response is delivered.
#[derive(Debug)]
pub enum RessPeerRequest {
    /// Get block headers.
    GetHeaders {
        /// The request for block headers.
        request: GetHeaders,
        /// The sender for the response.
        tx: oneshot::Sender<Vec<Header>>,
    },
    /// Get block bodies.
    GetBlockBodies {
        /// The request for block bodies.
        request: Vec<BlockHash>,
        /// The sender for the response.
        tx: oneshot::Sender<Vec<BlockBody>>,
    },
    /// Get bytecode for specific code hash
    GetBytecode {
        /// Target code hash that we want to get bytecode for.
        code_hash: B256,
        /// The sender for the response.
        tx: oneshot::Sender<Bytes>,
    },
    /// Get witness for specific block.
    GetWitness {
        /// Target block hash that we want to get witness for.
        block_hash: BlockHash,
        /// The sender for the response.
        tx: oneshot::Sender<Vec<Bytes>>,
    },
}
/// Outcome of handling a single inbound `ress` protocol message.
enum OnRessMessageOutcome {
    /// Response to send to the peer.
    Response(BytesMut),
    /// Terminate the connection.
    Terminate,
    /// No action.
    None,
}

View File

@@ -1,185 +0,0 @@
use crate::{
connection::{RessPeerRequest, RessProtocolConnection},
NodeType, RessProtocolMessage, RessProtocolProvider,
};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler};
use reth_network_api::{test_utils::PeersHandle, Direction, PeerId};
use std::{
fmt,
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
/// The events that can be emitted by our custom protocol.
#[derive(Debug)]
pub enum ProtocolEvent {
    /// Connection established.
    Established {
        /// Connection direction.
        direction: Direction,
        /// Peer ID.
        peer_id: PeerId,
        /// Sender part for forwarding commands.
        to_connection: mpsc::UnboundedSender<RessPeerRequest>,
    },
    /// Number of max active connections exceeded. New connection was rejected.
    MaxActiveConnectionsExceeded {
        /// The current number of active connections at the time of rejection.
        num_active: u64,
    },
}
/// Protocol state is a helper struct to store the protocol events.
///
/// Cloning is cheap: the connection counter is shared behind an [`Arc`].
#[derive(Clone, Debug)]
pub struct ProtocolState {
    /// Protocol event sender.
    pub events_sender: mpsc::UnboundedSender<ProtocolEvent>,
    /// The number of active connections.
    pub active_connections: Arc<AtomicU64>,
}
impl ProtocolState {
/// Create new protocol state.
pub fn new(events_sender: mpsc::UnboundedSender<ProtocolEvent>) -> Self {
Self { events_sender, active_connections: Arc::default() }
}
/// Returns the current number of active connections.
pub fn active_connections(&self) -> u64 {
self.active_connections.load(Ordering::Relaxed)
}
}
/// The protocol handler takes care of incoming and outgoing connections.
#[derive(Clone)]
pub struct RessProtocolHandler<P> {
    /// Provider.
    pub provider: P,
    /// Node type.
    pub node_type: NodeType,
    /// Peers handle.
    pub peers_handle: PeersHandle,
    /// The maximum number of active connections.
    ///
    /// Connections beyond this limit are rejected in `on_incoming`/`on_outgoing`.
    pub max_active_connections: u64,
    /// Current state of the protocol.
    pub state: ProtocolState,
}
// Manual impl because the provider type `P` is not required to be `Debug`.
impl<P> fmt::Debug for RessProtocolHandler<P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("RessProtocolHandler");
        dbg.field("node_type", &self.node_type);
        dbg.field("peers_handle", &self.peers_handle);
        dbg.field("max_active_connections", &self.max_active_connections);
        dbg.field("state", &self.state);
        dbg.finish_non_exhaustive()
    }
}
impl<P> ProtocolHandler for RessProtocolHandler<P>
where
    P: RessProtocolProvider + Clone + Unpin + 'static,
{
    type ConnectionHandler = Self;
    /// Accepts an incoming connection unless the active-connection limit is
    /// reached, in which case the rejection is logged and broadcast as a
    /// [`ProtocolEvent::MaxActiveConnectionsExceeded`] event.
    fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
        let num_active = self.state.active_connections();
        if num_active < self.max_active_connections {
            return Some(self.clone());
        }
        // At capacity: log, notify listeners, and reject the connection.
        trace!(
            target: "ress::net",
            num_active, max_connections = self.max_active_connections, %socket_addr,
            "ignoring incoming connection, max active reached"
        );
        self.state
            .events_sender
            .send(ProtocolEvent::MaxActiveConnectionsExceeded { num_active })
            .ok();
        None
    }
    /// Accepts an outgoing connection under the same capacity rule as
    /// [`Self::on_incoming`].
    fn on_outgoing(
        &self,
        socket_addr: SocketAddr,
        peer_id: PeerId,
    ) -> Option<Self::ConnectionHandler> {
        let num_active = self.state.active_connections();
        if num_active < self.max_active_connections {
            return Some(self.clone());
        }
        trace!(
            target: "ress::net",
            num_active, max_connections = self.max_active_connections, %socket_addr, %peer_id,
            "ignoring outgoing connection, max active reached"
        );
        self.state
            .events_sender
            .send(ProtocolEvent::MaxActiveConnectionsExceeded { num_active })
            .ok();
        None
    }
}
impl<P> ConnectionHandler for RessProtocolHandler<P>
where
P: RessProtocolProvider + Clone + Unpin + 'static,
{
type Connection = RessProtocolConnection<P>;
fn protocol(&self) -> Protocol {
RessProtocolMessage::protocol()
}
fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: Direction,
_peer_id: PeerId,
) -> OnNotSupported {
if self.node_type.is_stateful() {
OnNotSupported::KeepAlive
} else {
OnNotSupported::Disconnect
}
}
fn into_connection(
self,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
let (tx, rx) = mpsc::unbounded_channel();
// Emit connection established event.
self.state
.events_sender
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
.ok();
// Increment the number of active sessions.
self.state.active_connections.fetch_add(1, Ordering::Relaxed);
RessProtocolConnection::new(
self.provider.clone(),
self.node_type,
self.peers_handle,
peer_id,
conn,
UnboundedReceiverStream::from(rx),
self.state.active_connections,
)
}
}

View File

@@ -1,53 +0,0 @@
//! RESS protocol for stateless Ethereum nodes.
//!
//! Enables stateless nodes to fetch execution witnesses, bytecode, and block data from
//! stateful peers for minimal on-disk state with full execution capability.
//!
//! ## Node Types
//!
//! - **Stateless**: Minimal state, requests data on-demand
//! - **Stateful**: Full Ethereum nodes providing state data
//!
//! Valid connections: Stateless ↔ Stateless ✅, Stateless ↔ Stateful ✅, Stateful ↔ Stateful ❌
//!
//! ## Messages
//!
//! - `NodeType (0x00)`: Handshake
//! - `GetHeaders/Headers (0x01/0x02)`: Block headers
//! - `GetBlockBodies/BlockBodies (0x03/0x04)`: Block bodies
//! - `GetBytecode/Bytecode (0x05/0x06)`: Contract bytecode
//! - `GetWitness/Witness (0x07/0x08)`: Execution witnesses
//!
//! ## Flow
//!
//! 1. Exchange `NodeType` for compatibility
//! 2. Download ancestor blocks via headers/bodies
//! 3. For new payloads: request witness → get missing bytecode → execute
//!
//! Protocol version: `ress/1`
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
mod types;
pub use types::*;
mod message;
pub use message::*;
mod provider;
pub use provider::*;
mod handlers;
pub use handlers::*;
mod connection;
pub use connection::{RessPeerRequest, RessProtocolConnection};
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@@ -1,324 +0,0 @@
//! Implements Ress protocol
//! Defines structs/enums for messages, request-response pairs.
//!
//! Examples include creating, encoding, and decoding protocol messages.
use crate::NodeType;
use alloy_consensus::Header;
use alloy_primitives::{
bytes::{Buf, BufMut},
BlockHash, Bytes, B256,
};
use alloy_rlp::{BytesMut, Decodable, Encodable, RlpDecodable, RlpEncodable};
use reth_eth_wire::{message::RequestPair, protocol::Protocol, Capability};
use reth_ethereum_primitives::BlockBody;
/// An Ress protocol message, containing a message ID and payload.
///
/// On the wire the message ID byte is written first, followed by the
/// RLP-encoded payload (see the `Encodable` impl).
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct RessProtocolMessage {
    /// The unique identifier representing the type of the Ress message.
    pub message_type: RessMessageID,
    /// The content of the message, including specific data based on the message type.
    pub message: RessMessage,
}
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for RessProtocolMessage {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Draw a random payload first and derive the matching ID from it so
        // `message_type` and `message` are always consistent.
        let message = u.arbitrary::<RessMessage>()?;
        let message_type = message.message_id();
        Ok(Self { message_type, message })
    }
}
impl RessProtocolMessage {
    /// Returns the capability for the `ress` protocol.
    pub const fn capability() -> Capability {
        Capability::new_static("ress", 1)
    }
    /// Returns the protocol for the `ress` protocol.
    ///
    /// The message count (9) must match the number of [`RessMessageID`]
    /// variants; a unit test enforces this.
    pub const fn protocol() -> Protocol {
        Protocol::new(Self::capability(), 9)
    }
    /// Create node type message.
    pub const fn node_type(node_type: NodeType) -> Self {
        RessMessage::NodeType(node_type).into_protocol_message()
    }
    /// Headers request.
    pub const fn get_headers(request_id: u64, request: GetHeaders) -> Self {
        RessMessage::GetHeaders(RequestPair { request_id, message: request })
            .into_protocol_message()
    }
    /// Headers response.
    pub const fn headers(request_id: u64, headers: Vec<Header>) -> Self {
        RessMessage::Headers(RequestPair { request_id, message: headers }).into_protocol_message()
    }
    /// Block bodies request.
    pub const fn get_block_bodies(request_id: u64, block_hashes: Vec<B256>) -> Self {
        RessMessage::GetBlockBodies(RequestPair { request_id, message: block_hashes })
            .into_protocol_message()
    }
    /// Block bodies response.
    pub const fn block_bodies(request_id: u64, bodies: Vec<BlockBody>) -> Self {
        RessMessage::BlockBodies(RequestPair { request_id, message: bodies })
            .into_protocol_message()
    }
    /// Bytecode request.
    pub const fn get_bytecode(request_id: u64, code_hash: B256) -> Self {
        RessMessage::GetBytecode(RequestPair { request_id, message: code_hash })
            .into_protocol_message()
    }
    /// Bytecode response.
    pub const fn bytecode(request_id: u64, bytecode: Bytes) -> Self {
        RessMessage::Bytecode(RequestPair { request_id, message: bytecode }).into_protocol_message()
    }
    /// Execution witness request.
    pub const fn get_witness(request_id: u64, block_hash: BlockHash) -> Self {
        RessMessage::GetWitness(RequestPair { request_id, message: block_hash })
            .into_protocol_message()
    }
    /// Execution witness response.
    pub const fn witness(request_id: u64, witness: Vec<Bytes>) -> Self {
        RessMessage::Witness(RequestPair { request_id, message: witness }).into_protocol_message()
    }
    /// Return RLP encoded message.
    pub fn encoded(&self) -> BytesMut {
        // Preallocate the exact encoded size to avoid buffer regrowth.
        let mut buf = BytesMut::with_capacity(self.length());
        self.encode(&mut buf);
        buf
    }
    /// Decodes a `RessProtocolMessage` from the given message buffer.
    ///
    /// Expects the single message-ID byte first, then the payload — the
    /// mirror of the `Encodable` impl.
    pub fn decode_message(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
        let message_type = RessMessageID::decode(buf)?;
        let message = match message_type {
            RessMessageID::NodeType => RessMessage::NodeType(NodeType::decode(buf)?),
            RessMessageID::GetHeaders => RessMessage::GetHeaders(RequestPair::decode(buf)?),
            RessMessageID::Headers => RessMessage::Headers(RequestPair::decode(buf)?),
            RessMessageID::GetBlockBodies => RessMessage::GetBlockBodies(RequestPair::decode(buf)?),
            RessMessageID::BlockBodies => RessMessage::BlockBodies(RequestPair::decode(buf)?),
            RessMessageID::GetBytecode => RessMessage::GetBytecode(RequestPair::decode(buf)?),
            RessMessageID::Bytecode => RessMessage::Bytecode(RequestPair::decode(buf)?),
            RessMessageID::GetWitness => RessMessage::GetWitness(RequestPair::decode(buf)?),
            RessMessageID::Witness => RessMessage::Witness(RequestPair::decode(buf)?),
        };
        Ok(Self { message_type, message })
    }
}
impl Encodable for RessProtocolMessage {
    /// Wire format: single message-ID byte followed by the RLP-encoded payload.
    fn encode(&self, out: &mut dyn BufMut) {
        self.message_type.encode(out);
        self.message.encode(out);
    }
    fn length(&self) -> usize {
        self.message_type.length() + self.message.length()
    }
}
/// Represents message IDs for `ress` protocol messages.
///
/// Each ID is encoded on the wire as its single raw discriminant byte.
#[repr(u8)]
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[cfg_attr(test, derive(strum_macros::EnumCount))]
pub enum RessMessageID {
    /// Node type message.
    NodeType = 0x00,
    /// Headers request message.
    GetHeaders = 0x01,
    /// Headers response message.
    Headers = 0x02,
    /// Block bodies request message.
    GetBlockBodies = 0x03,
    /// Block bodies response message.
    BlockBodies = 0x04,
    /// Bytecode request message.
    GetBytecode = 0x05,
    /// Bytecode response message.
    Bytecode = 0x06,
    /// Witness request message.
    GetWitness = 0x07,
    /// Witness response message.
    Witness = 0x08,
}
impl Encodable for RessMessageID {
    /// Message IDs are written as a single raw byte (not RLP-wrapped).
    fn encode(&self, out: &mut dyn BufMut) {
        out.put_u8(*self as u8);
    }
    fn length(&self) -> usize {
        1
    }
}
impl Decodable for RessMessageID {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
0x00 => Self::NodeType,
0x01 => Self::GetHeaders,
0x02 => Self::Headers,
0x03 => Self::GetBlockBodies,
0x04 => Self::BlockBodies,
0x05 => Self::GetBytecode,
0x06 => Self::Bytecode,
0x07 => Self::GetWitness,
0x08 => Self::Witness,
_ => return Err(alloy_rlp::Error::Custom("Invalid message type")),
};
buf.advance(1);
Ok(id)
}
}
/// Represents a message in the ress protocol.
///
/// All request/response variants wrap their payload in a [`RequestPair`] so
/// responses can be matched to requests by ID.
#[derive(PartialEq, Eq, Clone, Debug)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub enum RessMessage {
    /// Represents a node type message required for handshake.
    NodeType(NodeType),
    /// Represents a headers request message.
    GetHeaders(RequestPair<GetHeaders>),
    /// Represents a headers response message.
    Headers(RequestPair<Vec<Header>>),
    /// Represents a block bodies request message.
    GetBlockBodies(RequestPair<Vec<B256>>),
    /// Represents a block bodies response message.
    BlockBodies(RequestPair<Vec<BlockBody>>),
    /// Represents a bytecode request message.
    GetBytecode(RequestPair<B256>),
    /// Represents a bytecode response message.
    Bytecode(RequestPair<Bytes>),
    /// Represents a witness request message.
    GetWitness(RequestPair<BlockHash>),
    /// Represents a witness response message.
    Witness(RequestPair<Vec<Bytes>>),
}
impl RessMessage {
    /// Return [`RessMessageID`] that corresponds to the given message.
    pub const fn message_id(&self) -> RessMessageID {
        match self {
            Self::NodeType(_) => RessMessageID::NodeType,
            Self::GetHeaders(_) => RessMessageID::GetHeaders,
            Self::Headers(_) => RessMessageID::Headers,
            Self::GetBlockBodies(_) => RessMessageID::GetBlockBodies,
            Self::BlockBodies(_) => RessMessageID::BlockBodies,
            Self::GetBytecode(_) => RessMessageID::GetBytecode,
            Self::Bytecode(_) => RessMessageID::Bytecode,
            Self::GetWitness(_) => RessMessageID::GetWitness,
            Self::Witness(_) => RessMessageID::Witness,
        }
    }
    /// Convert message into [`RessProtocolMessage`].
    ///
    /// The message ID is derived from the payload, so the pair is always
    /// consistent.
    pub const fn into_protocol_message(self) -> RessProtocolMessage {
        let message_type = self.message_id();
        RessProtocolMessage { message_type, message: self }
    }
}
impl From<RessMessage> for RessProtocolMessage {
    fn from(value: RessMessage) -> Self {
        // Delegates to `into_protocol_message`, which derives the matching ID.
        value.into_protocol_message()
    }
}
impl Encodable for RessMessage {
    /// Encodes only the payload; the message-ID byte is written separately by
    /// [`RessProtocolMessage`]'s `Encodable` impl.
    fn encode(&self, out: &mut dyn BufMut) {
        match self {
            Self::NodeType(payload) => payload.encode(out),
            Self::GetHeaders(payload) => payload.encode(out),
            Self::Headers(payload) => payload.encode(out),
            Self::GetBlockBodies(payload) => payload.encode(out),
            Self::BlockBodies(payload) => payload.encode(out),
            Self::GetBytecode(payload) => payload.encode(out),
            Self::Bytecode(payload) => payload.encode(out),
            Self::GetWitness(payload) => payload.encode(out),
            Self::Witness(payload) => payload.encode(out),
        }
    }
    fn length(&self) -> usize {
        match self {
            Self::NodeType(payload) => payload.length(),
            Self::GetHeaders(payload) => payload.length(),
            Self::Headers(payload) => payload.length(),
            Self::GetBlockBodies(payload) => payload.length(),
            Self::BlockBodies(payload) => payload.length(),
            Self::GetBytecode(payload) => payload.length(),
            Self::Bytecode(payload) => payload.length(),
            Self::GetWitness(payload) => payload.length(),
            Self::Witness(payload) => payload.length(),
        }
    }
}
/// A request for a peer to return block headers starting at the requested block.
/// The peer must return at most [`limit`](#structfield.limit) headers.
/// The headers will be returned starting at [`start_hash`](#structfield.start_hash), traversing
/// towards the genesis block (via each header's parent hash).
#[derive(PartialEq, Eq, Clone, Copy, Debug, RlpEncodable, RlpDecodable)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct GetHeaders {
    /// The block hash that the peer should start returning headers from.
    pub start_hash: BlockHash,
    /// The maximum number of headers to return.
    pub limit: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use proptest_arbitrary_interop::arb;
    use std::fmt;
    use strum::EnumCount;
    /// Asserts that encoding followed by decoding reproduces the value.
    fn rlp_roundtrip<V>(value: V)
    where
        V: Encodable + Decodable + PartialEq + fmt::Debug,
    {
        let encoded = alloy_rlp::encode(&value);
        let decoded = V::decode(&mut &encoded[..]);
        assert_eq!(Ok(value), decoded);
    }
    /// The message count advertised in `protocol()` must match the number of
    /// `RessMessageID` variants.
    #[test]
    fn protocol_message_count() {
        let protocol = RessProtocolMessage::protocol();
        assert_eq!(protocol.messages(), RessMessageID::COUNT as u8);
    }
    proptest! {
        #[test]
        fn message_type_roundtrip(message_type in arb::<RessMessageID>()) {
            rlp_roundtrip(message_type);
        }
        // Full messages use `decode_message` (ID byte + payload) rather than
        // plain `Decodable`, so they are round-tripped explicitly.
        #[test]
        fn message_roundtrip(message in arb::<RessProtocolMessage>()) {
            let encoded = alloy_rlp::encode(&message);
            let decoded = RessProtocolMessage::decode_message(&mut &encoded[..]);
            assert_eq!(Ok(message), decoded);
        }
    }
}

View File

@@ -1,63 +0,0 @@
use crate::GetHeaders;
use alloy_consensus::Header;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::Encodable;
use reth_ethereum_primitives::BlockBody;
use reth_network::eth_requests::{MAX_BODIES_SERVE, MAX_HEADERS_SERVE, SOFT_RESPONSE_LIMIT};
use reth_storage_errors::provider::ProviderResult;
use std::future::Future;
/// A provider trait for ress protocol.
///
/// Implementors supply per-item lookups (`header`, `block_body`, `bytecode`,
/// `witness`); the batched `headers`/`block_bodies` default methods layer
/// request limits and response-size caps on top.
pub trait RessProtocolProvider: Send + Sync {
    /// Return block header by hash.
    fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>>;
    /// Return block headers.
    ///
    /// Walks parent hashes starting from `start_hash` (i.e. toward genesis),
    /// stopping at the request limit, [`MAX_HEADERS_SERVE`], or once the
    /// accumulated encoded size exceeds [`SOFT_RESPONSE_LIMIT`]. The limits
    /// are checked after pushing, so the soft byte limit may be exceeded by
    /// the final header.
    fn headers(&self, request: GetHeaders) -> ProviderResult<Vec<Header>> {
        if request.limit == 0 {
            return Ok(Vec::new());
        }
        let mut total_bytes = 0;
        let mut block_hash = request.start_hash;
        let mut headers = Vec::new();
        while let Some(header) = self.header(block_hash)? {
            block_hash = header.parent_hash;
            total_bytes += header.length();
            headers.push(header);
            if headers.len() >= request.limit as usize ||
                headers.len() >= MAX_HEADERS_SERVE ||
                total_bytes > SOFT_RESPONSE_LIMIT
            {
                break
            }
        }
        Ok(headers)
    }
    /// Return block body by hash.
    fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>>;
    /// Return block bodies.
    ///
    /// Serves the requested hashes in order, stopping at the first missing
    /// body, at [`MAX_BODIES_SERVE`], or once the accumulated encoded size
    /// exceeds [`SOFT_RESPONSE_LIMIT`].
    fn block_bodies(&self, block_hashes: Vec<B256>) -> ProviderResult<Vec<BlockBody>> {
        let mut total_bytes = 0;
        let mut bodies = Vec::new();
        for block_hash in block_hashes {
            if let Some(body) = self.block_body(block_hash)? {
                total_bytes += body.length();
                bodies.push(body);
                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
                    break
                }
            } else {
                break
            }
        }
        Ok(bodies)
    }
    /// Return bytecode by code hash.
    fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>>;
    /// Return witness by block hash.
    fn witness(&self, block_hash: B256) -> impl Future<Output = ProviderResult<Vec<Bytes>>> + Send;
}

View File

@@ -1,112 +0,0 @@
//! Miscellaneous test utilities.
use crate::RessProtocolProvider;
use alloy_consensus::Header;
use alloy_primitives::{map::B256Map, Bytes, B256};
use reth_ethereum_primitives::BlockBody;
use reth_storage_errors::provider::ProviderResult;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
/// Noop implementation of [`RessProtocolProvider`].
///
/// Every lookup returns an empty result; useful as a stand-in for tests.
#[derive(Clone, Copy, Default, Debug)]
pub struct NoopRessProtocolProvider;
impl RessProtocolProvider for NoopRessProtocolProvider {
    // All lookups succeed with empty results; errors are never produced.
    fn header(&self, _block_hash: B256) -> ProviderResult<Option<Header>> {
        Ok(None)
    }
    fn block_body(&self, _block_hash: B256) -> ProviderResult<Option<BlockBody>> {
        Ok(None)
    }
    fn bytecode(&self, _code_hash: B256) -> ProviderResult<Option<Bytes>> {
        Ok(None)
    }
    async fn witness(&self, _block_hash: B256) -> ProviderResult<Vec<Bytes>> {
        Ok(Vec::new())
    }
}
/// Mock implementation of [`RessProtocolProvider`].
///
/// Serves data from in-memory maps; cloning shares the underlying storage.
#[derive(Clone, Default, Debug)]
pub struct MockRessProtocolProvider {
    /// Headers keyed by block hash.
    headers: Arc<Mutex<B256Map<Header>>>,
    /// Block bodies keyed by block hash.
    block_bodies: Arc<Mutex<B256Map<BlockBody>>>,
    /// Bytecodes keyed by code hash.
    bytecodes: Arc<Mutex<B256Map<Bytes>>>,
    /// Witnesses keyed by block hash.
    witnesses: Arc<Mutex<B256Map<Vec<Bytes>>>>,
    /// Optional artificial delay applied to every witness response.
    witness_delay: Option<Duration>,
}
impl MockRessProtocolProvider {
    /// Configure witness response delay.
    pub const fn with_witness_delay(mut self, delay: Duration) -> Self {
        self.witness_delay = Some(delay);
        self
    }
    /// Insert header.
    pub fn add_header(&self, block_hash: B256, header: Header) {
        let mut guard = self.headers.lock().unwrap();
        guard.insert(block_hash, header);
    }
    /// Extend headers from iterator.
    pub fn extend_headers(&self, headers: impl IntoIterator<Item = (B256, Header)>) {
        let mut guard = self.headers.lock().unwrap();
        guard.extend(headers);
    }
    /// Insert block body.
    pub fn add_block_body(&self, block_hash: B256, body: BlockBody) {
        let mut guard = self.block_bodies.lock().unwrap();
        guard.insert(block_hash, body);
    }
    /// Extend block bodies from iterator.
    pub fn extend_block_bodies(&self, bodies: impl IntoIterator<Item = (B256, BlockBody)>) {
        let mut guard = self.block_bodies.lock().unwrap();
        guard.extend(bodies);
    }
    /// Insert bytecode.
    pub fn add_bytecode(&self, code_hash: B256, bytecode: Bytes) {
        let mut guard = self.bytecodes.lock().unwrap();
        guard.insert(code_hash, bytecode);
    }
    /// Extend bytecodes from iterator.
    pub fn extend_bytecodes(&self, bytecodes: impl IntoIterator<Item = (B256, Bytes)>) {
        let mut guard = self.bytecodes.lock().unwrap();
        guard.extend(bytecodes);
    }
    /// Insert witness.
    pub fn add_witness(&self, block_hash: B256, witness: Vec<Bytes>) {
        let mut guard = self.witnesses.lock().unwrap();
        guard.insert(block_hash, witness);
    }
    /// Extend witnesses from iterator.
    pub fn extend_witnesses(&self, witnesses: impl IntoIterator<Item = (B256, Vec<Bytes>)>) {
        let mut guard = self.witnesses.lock().unwrap();
        guard.extend(witnesses);
    }
}
impl RessProtocolProvider for MockRessProtocolProvider {
    fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
        let header = self.headers.lock().unwrap().get(&block_hash).cloned();
        Ok(header)
    }
    fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
        let body = self.block_bodies.lock().unwrap().get(&block_hash).cloned();
        Ok(body)
    }
    fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
        let bytecode = self.bytecodes.lock().unwrap().get(&code_hash).cloned();
        Ok(bytecode)
    }
    async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
        // Simulate slow witness generation when a delay is configured.
        if let Some(delay) = self.witness_delay {
            tokio::time::sleep(delay).await;
        }
        let witness = self.witnesses.lock().unwrap().get(&block_hash).cloned();
        Ok(witness.unwrap_or_default())
    }
}

View File

@@ -1,63 +0,0 @@
use alloy_primitives::bytes::{Buf, BufMut};
use alloy_rlp::{Decodable, Encodable};
/// Represents the type of node in the RESS protocol.
///
/// This enum is used during the handshake phase to identify whether a peer is a stateless
/// or stateful node. The node type determines which connections are valid:
/// - Stateless ↔ Stateless: valid
/// - Stateless ↔ Stateful: valid
/// - Stateful ↔ Stateful: invalid
///
/// Use [`is_valid_connection`](Self::is_valid_connection) to check if a connection between
/// two node types is allowed.
#[repr(u8)]
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub enum NodeType {
    /// Stateless ress node.
    Stateless = 0x00,
    /// Stateful reth node.
    Stateful, // = 0x01, wire encoding is the discriminant byte
}
impl Encodable for NodeType {
    /// Node types are written as a single raw discriminant byte.
    fn encode(&self, out: &mut dyn BufMut) {
        out.put_u8(*self as u8);
    }
    fn length(&self) -> usize {
        1
    }
}
impl Decodable for NodeType {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
0x00 => Self::Stateless,
0x01 => Self::Stateful,
_ => return Err(alloy_rlp::Error::Custom("Invalid message type")),
};
buf.advance(1);
Ok(id)
}
}
impl NodeType {
    /// Return `true` if node type is stateful.
    pub const fn is_stateful(&self) -> bool {
        match self {
            Self::Stateful => true,
            Self::Stateless => false,
        }
    }
    /// Return `true` if the connection between this and other node types
    /// can be considered valid.
    ///
    /// Validity:
    /// |           | stateless | stateful |
    /// |-----------|-----------|----------|
    /// | stateless |     +     |     +    |
    /// | stateful  |     +     |     -    |
    pub const fn is_valid_connection(&self, other: &Self) -> bool {
        // A connection is valid unless BOTH ends are stateful.
        !(self.is_stateful() && other.is_stateful())
    }
}

View File

@@ -1,299 +0,0 @@
use alloy_primitives::{Bytes, B256};
use futures::StreamExt;
use reth_network::{test_utils::Testnet, NetworkEventListenerProvider, Peers};
use reth_network_api::{
events::{NetworkEvent, PeerEvent},
test_utils::PeersHandleProvider,
};
use reth_provider::test_utils::MockEthProvider;
use reth_ress_protocol::{
test_utils::{MockRessProtocolProvider, NoopRessProtocolProvider},
GetHeaders, NodeType, ProtocolEvent, ProtocolState, RessPeerRequest, RessProtocolHandler,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
/// Two stateful peers are an invalid `ress` pairing: after the handshake
/// exchanges `NodeType::Stateful` on both ends, both sessions must close.
#[tokio::test(flavor = "multi_thread")]
async fn disconnect_on_stateful_pair() {
    reth_tracing::init_test_tracing();
    let mut net = Testnet::create_with(2, MockEthProvider::default()).await;
    let protocol_provider = NoopRessProtocolProvider;
    let (tx, mut from_peer0) = mpsc::unbounded_channel();
    let peer0 = &mut net.peers_mut()[0];
    peer0.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateful,
        peers_handle: peer0.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    let (tx, mut from_peer1) = mpsc::unbounded_channel();
    let peer1 = &mut net.peers_mut()[1];
    peer1.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateful,
        peers_handle: peer1.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    // spawn and connect all the peers
    let handle = net.spawn();
    handle.connect_peers().await;
    // Both sides initially report the connection as established...
    match from_peer0.recv().await.unwrap() {
        ProtocolEvent::Established { peer_id, .. } => {
            assert_eq!(peer_id, *handle.peers()[1].peer_id());
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    match from_peer1.recv().await.unwrap() {
        ProtocolEvent::Established { peer_id, .. } => {
            assert_eq!(peer_id, *handle.peers()[0].peer_id());
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    // ...and then both sessions must close once the incompatible node types
    // have been exchanged.
    let mut peer0_event_listener = handle.peers()[0].network().event_listener();
    loop {
        if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) =
            peer0_event_listener.next().await.unwrap()
        {
            assert_eq!(peer_id, *handle.peers()[1].peer_id());
            break
        }
    }
    let mut peer1_event_listener = handle.peers()[1].network().event_listener();
    loop {
        if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) =
            peer1_event_listener.next().await.unwrap()
        {
            assert_eq!(peer_id, *handle.peers()[0].peer_id());
            break
        }
    }
}
/// Exercises every request type between two stateless peers backed by the
/// noop provider, expecting empty responses for all of them.
#[tokio::test(flavor = "multi_thread")]
async fn message_exchange() {
    reth_tracing::init_test_tracing();
    let mut net = Testnet::create_with(2, MockEthProvider::default()).await;
    let protocol_provider = NoopRessProtocolProvider;
    let (tx, mut from_peer0) = mpsc::unbounded_channel();
    let peer0 = &mut net.peers_mut()[0];
    peer0.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateless,
        peers_handle: peer0.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    let (tx, mut from_peer1) = mpsc::unbounded_channel();
    let peer1 = &mut net.peers_mut()[1];
    peer1.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateless,
        peers_handle: peer1.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    // spawn and connect all the peers
    let handle = net.spawn();
    handle.connect_peers().await;
    // Grab the command sender for peer0's connection to peer1.
    let peer0_to_peer1 = from_peer0.recv().await.unwrap();
    let peer0_conn = match peer0_to_peer1 {
        ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
            assert_eq!(peer_id, *handle.peers()[1].peer_id());
            to_connection
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    let peer1_to_peer0 = from_peer1.recv().await.unwrap();
    match peer1_to_peer0 {
        ProtocolEvent::Established { peer_id, .. } => {
            assert_eq!(peer_id, *handle.peers()[0].peer_id());
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    // send get headers message from peer0 to peer1
    let (tx, rx) = oneshot::channel();
    peer0_conn
        .send(RessPeerRequest::GetHeaders {
            request: GetHeaders { start_hash: B256::ZERO, limit: 1 },
            tx,
        })
        .unwrap();
    assert_eq!(rx.await.unwrap(), Vec::new());
    // send get bodies message from peer0 to peer1
    let (tx, rx) = oneshot::channel();
    peer0_conn.send(RessPeerRequest::GetBlockBodies { request: Vec::new(), tx }).unwrap();
    assert_eq!(rx.await.unwrap(), Vec::new());
    // send get witness message from peer0 to peer1
    let (tx, rx) = oneshot::channel();
    peer0_conn.send(RessPeerRequest::GetWitness { block_hash: B256::ZERO, tx }).unwrap();
    assert_eq!(rx.await.unwrap(), Vec::<Bytes>::new());
    // send get bytecode message from peer0 to peer1
    let (tx, rx) = oneshot::channel();
    peer0_conn.send(RessPeerRequest::GetBytecode { code_hash: B256::ZERO, tx }).unwrap();
    assert_eq!(rx.await.unwrap(), Bytes::default());
}
// Ensures witness generation (artificially delayed here) is handled
// asynchronously: a bytecode request issued after the witness request is
// answered before the delayed witness response arrives.
#[tokio::test(flavor = "multi_thread")]
async fn witness_fetching_does_not_block() {
    reth_tracing::init_test_tracing();
    let mut net = Testnet::create_with(2, MockEthProvider::default()).await;
    // Provider that delays every witness response by `witness_delay`.
    let witness_delay = Duration::from_millis(100);
    let protocol_provider = MockRessProtocolProvider::default().with_witness_delay(witness_delay);
    let (tx, mut from_peer0) = mpsc::unbounded_channel();
    let peer0 = &mut net.peers_mut()[0];
    peer0.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider.clone(),
        node_type: NodeType::Stateless,
        peers_handle: peer0.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    let (tx, mut from_peer1) = mpsc::unbounded_channel();
    let peer1 = &mut net.peers_mut()[1];
    peer1.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateless,
        peers_handle: peer1.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(tx),
    });
    // spawn and connect all the peers
    let handle = net.spawn();
    handle.connect_peers().await;
    let peer0_to_peer1 = from_peer0.recv().await.unwrap();
    // Keep the connection handle so requests can be sent to peer1.
    let peer0_conn = match peer0_to_peer1 {
        ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
            assert_eq!(peer_id, *handle.peers()[1].peer_id());
            to_connection
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    let peer1_to_peer0 = from_peer1.recv().await.unwrap();
    match peer1_to_peer0 {
        ProtocolEvent::Established { peer_id, .. } => {
            assert_eq!(peer_id, *handle.peers()[0].peer_id());
        }
        ev => {
            panic!("unexpected event: {ev:?}");
        }
    };
    // send get witness message from peer0 to peer1
    let witness_requested_at = Instant::now();
    let (witness_tx, witness_rx) = oneshot::channel();
    peer0_conn
        .send(RessPeerRequest::GetWitness { block_hash: B256::ZERO, tx: witness_tx })
        .unwrap();
    // send get bytecode message from peer0 to peer1
    let bytecode_requested_at = Instant::now();
    let (tx, rx) = oneshot::channel();
    peer0_conn.send(RessPeerRequest::GetBytecode { code_hash: B256::ZERO, tx }).unwrap();
    assert_eq!(rx.await.unwrap(), Bytes::default());
    // The bytecode answer must arrive while the witness is still being delayed.
    // NOTE(review): timing-based assertion; a heavily loaded machine could make this flaky.
    assert!(bytecode_requested_at.elapsed() < witness_delay);
    // await for witness response
    assert_eq!(witness_rx.await.unwrap(), Vec::<Bytes>::new());
    assert!(witness_requested_at.elapsed() >= witness_delay);
}
/// Ensures a ress connection attempt beyond `max_active_connections` is rejected.
#[tokio::test(flavor = "multi_thread")]
async fn max_active_connections() {
    reth_tracing::init_test_tracing();
    let mut net = Testnet::create_with(3, MockEthProvider::default()).await;
    let protocol_provider = NoopRessProtocolProvider;

    // Peer 0: stateful node that only tolerates a single active ress connection.
    let (peer0_tx, mut from_peer0) = mpsc::unbounded_channel();
    {
        let peer0 = &mut net.peers_mut()[0];
        let handler = RessProtocolHandler {
            provider: protocol_provider,
            node_type: NodeType::Stateful,
            peers_handle: peer0.handle().peers_handle().clone(),
            max_active_connections: 1,
            state: ProtocolState::new(peer0_tx),
        };
        peer0.add_rlpx_sub_protocol(handler);
    }

    // Peer 1: stateless node with a generous connection budget.
    let (peer1_tx, _from_peer1) = mpsc::unbounded_channel();
    let peer1 = &mut net.peers_mut()[1];
    let peer1_id = peer1.peer_id();
    let peer1_addr = peer1.local_addr();
    peer1.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateless,
        peers_handle: peer1.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(peer1_tx),
    });

    // Peer 2: identical stateless configuration.
    let (peer2_tx, _from_peer2) = mpsc::unbounded_channel();
    let peer2 = &mut net.peers_mut()[2];
    let peer2_id = peer2.peer_id();
    let peer2_addr = peer2.local_addr();
    peer2.add_rlpx_sub_protocol(RessProtocolHandler {
        provider: protocol_provider,
        node_type: NodeType::Stateless,
        peers_handle: peer2.handle().peers_handle().clone(),
        max_active_connections: 100,
        state: ProtocolState::new(peer2_tx),
    });

    let handle = net.spawn();

    // First connection (peer 0 <-> peer 1) fits within the limit.
    let peer0_handle = &handle.peers()[0];
    peer0_handle.network().add_peer(peer1_id, peer1_addr);
    let _peer0_to_peer1 = match from_peer0.recv().await.unwrap() {
        ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
            assert_eq!(peer_id, *peer1_id);
            to_connection
        }
        ev => panic!("unexpected event: {ev:?}"),
    };

    // Second connection attempt (peer 0 <-> peer 2) exceeds the limit and is refused.
    peer0_handle.network().add_peer(peer2_id, peer2_addr);
    match from_peer0.recv().await.unwrap() {
        ProtocolEvent::MaxActiveConnectionsExceeded { num_active } => {
            assert_eq!(num_active, 1)
        }
        ev => panic!("unexpected event: {ev:?}"),
    }
}

View File

@@ -1,5 +0,0 @@
#![allow(missing_docs)]
// Test-binary entrypoint: all tests live in the `e2e` module.
mod e2e;
// No-op `main`; this target only exists so the `e2e` tests compile and run.
const fn main() {}

View File

@@ -1,37 +0,0 @@
[package]
name = "reth-ress-provider"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
reth-ress-protocol.workspace = true
reth-primitives-traits.workspace = true
reth-storage-api.workspace = true
reth-errors.workspace = true
reth-evm.workspace = true
reth-revm = { workspace = true, features = ["witness"] }
reth-chain-state.workspace = true
reth-trie.workspace = true
reth-ethereum-primitives.workspace = true
reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-node-api.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-consensus.workspace = true
# misc
futures.workspace = true
tokio.workspace = true
parking_lot.workspace = true
schnellru.workspace = true
eyre.workspace = true
tracing.workspace = true

View File

@@ -1,240 +0,0 @@
//! Reth implementation of [`reth_ress_protocol::RessProtocolProvider`].
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
use alloy_consensus::BlockHeader as _;
use alloy_primitives::{Bytes, B256};
use parking_lot::Mutex;
use reth_chain_state::{ExecutedBlock, MemoryOverlayStateProvider};
use reth_errors::{ProviderError, ProviderResult};
use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
use reth_ress_protocol::RessProtocolProvider;
use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
use reth_tasks::TaskSpawner;
use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
use schnellru::{ByLength, LruMap};
use std::{sync::Arc, time::Instant};
use tokio::sync::{oneshot, Semaphore};
use tracing::*;
mod recorder;
use recorder::StateWitnessRecorderDatabase;
mod pending_state;
pub use pending_state::*;
use reth_storage_api::{BlockReader, BlockSource, StateProviderFactory};
/// Reth provider implementing [`RessProtocolProvider`].
#[expect(missing_debug_implementations)]
#[derive(Clone)]
pub struct RethRessProtocolProvider<P, E> {
    /// Block and state provider used for database lookups.
    provider: P,
    /// EVM configuration used to re-execute blocks during witness generation.
    evm_config: E,
    /// Spawner for the blocking witness-generation tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Maximum allowed distance from the best block when serving a witness.
    max_witness_window: u64,
    /// Bounds the number of witnesses generated in parallel.
    witness_semaphore: Arc<Semaphore>,
    /// LRU cache of computed witnesses keyed by block hash.
    witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
    /// Locally tracked non-canonical (pending and invalid) blocks.
    pending_state: PendingState<EthPrimitives>,
}
impl<P, E> RethRessProtocolProvider<P, E>
where
    P: BlockReader<Block = Block> + StateProviderFactory,
    E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
{
    /// Create new ress protocol provider.
    ///
    /// * `max_witness_window` - maximum distance from the best block for which a witness
    ///   will still be generated.
    /// * `witness_max_parallel` - number of semaphore permits for concurrent witness generation.
    /// * `cache_size` - capacity (in entries) of the LRU witness cache.
    pub fn new(
        provider: P,
        evm_config: E,
        task_spawner: Box<dyn TaskSpawner>,
        max_witness_window: u64,
        witness_max_parallel: usize,
        cache_size: u32,
        pending_state: PendingState<EthPrimitives>,
    ) -> eyre::Result<Self> {
        Ok(Self {
            provider,
            evm_config,
            task_spawner,
            max_witness_window,
            witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
            witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
            pending_state,
        })
    }
    /// Retrieve a valid or invalid block by block hash.
    ///
    /// Lookup order: pending (non-canonical) state, then the database provider,
    /// and finally the invalid-block cache.
    pub fn block_by_hash(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
        // NOTE: we keep track of the pending state locally because reth does not provide a way
        // to access non-canonical or invalid blocks via the provider.
        let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
            Some(block)
        } else if let Some(block) =
            self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
        {
            let signers = block.recover_signers()?;
            Some(Arc::new(block.into_recovered_with_signers(signers)))
        } else {
            // we attempt to look up invalid block last
            self.pending_state.invalid_recovered_block(&block_hash)
        };
        Ok(maybe_block)
    }
    /// Generate state witness
    ///
    /// Checks the LRU cache first; on a miss, re-executes the target block on top of its
    /// in-memory ancestors to record accessed state, then asks the historical state
    /// provider for the witness and caches the result.
    pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
        // Fast path: serve a previously computed witness from the cache.
        if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
            return Ok(witness.as_ref().clone())
        }
        let block =
            self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        // Refuse blocks too far behind the best block (bounded by `max_witness_window`).
        let best_block_number = self.provider.best_block_number()?;
        if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
            return Err(ProviderError::TrieWitnessError(
                "witness target block exceeds maximum witness window".to_owned(),
            ))
        }
        // Walk ancestors until one has a database state provider, collecting in-memory
        // (pending or invalid) ancestors along the way.
        let mut executed_ancestors = Vec::new();
        let mut ancestor_hash = block.parent_hash();
        let historical = 'sp: loop {
            match self.provider.state_by_block_hash(ancestor_hash) {
                Ok(state_provider) => break 'sp state_provider,
                Err(_) => {
                    // Attempt to retrieve a valid executed block first.
                    let mut executed = self.pending_state.executed_block(&ancestor_hash);
                    // If it's not present, attempt to lookup invalid block.
                    if executed.is_none() &&
                        let Some(invalid) =
                            self.pending_state.invalid_recovered_block(&ancestor_hash)
                    {
                        trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
                        executed =
                            Some(ExecutedBlock { recovered_block: invalid, ..Default::default() });
                    }
                    let Some(executed) = executed else {
                        return Err(ProviderError::StateForHashNotFound(ancestor_hash))
                    };
                    ancestor_hash = executed.sealed_block().parent_hash();
                    executed_ancestors.push(executed);
                }
            };
        };
        // Execute all gathered blocks to gather accesses state.
        let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
            MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
        ));
        let mut record = ExecutionWitnessRecord::default();
        // We allow block execution to fail, since we still want to record all accessed state by
        // invalid blocks.
        if let Err(error) = self.evm_config.batch_executor(&mut db).execute_with_state_closure(
            &block,
            |state: &State<_>| {
                record.record_executed_state(state);
            },
        ) {
            debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
        }
        // NOTE: there might be a race condition where target ancestor hash gets evicted from the
        // database.
        let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
        let bundles: Vec<_> =
            executed_ancestors.iter().rev().map(|block| block.trie_data()).collect();
        let trie_input = TrieInput::from_blocks_sorted(
            bundles.iter().map(|data| (data.hashed_state.as_ref(), data.trie_updates.as_ref())),
        );
        // Combine state observed via direct database reads with state recorded by execution.
        let mut hashed_state = db.into_state();
        hashed_state.extend(record.hashed_state);
        // Gather the state witness.
        let witness = if hashed_state.is_empty() {
            // If no state was accessed, at least the root node must be present.
            let multiproof = witness_state_provider.multiproof(
                trie_input,
                MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
            )?;
            let mut witness = Vec::new();
            if let Some(root_node) =
                multiproof.account_subtree.into_inner().remove(&Nibbles::default())
            {
                witness.push(root_node);
            }
            witness
        } else {
            witness_state_provider.witness(trie_input, hashed_state)?
        };
        // Insert witness into the cache.
        let cached_witness = Arc::new(witness.clone());
        self.witness_cache.lock().insert(block_hash, cached_witness);
        Ok(witness)
    }
}
impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
where
    P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
    E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
{
    /// Serve a block header by hash, including pending and invalid blocks.
    fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
        trace!(target: "reth::ress_provider", %block_hash, "Serving header");
        Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
    }
    /// Serve a block body by hash, including pending and invalid blocks.
    fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
        trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
        Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
    }
    /// Serve contract bytecode by code hash.
    fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
        trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
        // Check the state of pending blocks first, then fall back to the latest database state.
        let maybe_bytecode = 'bytecode: {
            if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
                break 'bytecode Some(bytecode);
            }
            self.provider.latest()?.bytecode_by_hash(&code_hash)?
        };
        Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
    }
    /// Serve an execution witness for the given block hash.
    ///
    /// Concurrency is bounded by the witness semaphore; the (potentially expensive)
    /// computation runs on a blocking task so the async runtime is not stalled.
    async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
        trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
        let started_at = Instant::now();
        // The permit is held until this function returns, bounding parallel generation.
        let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
        let this = self.clone();
        let (tx, rx) = oneshot::channel();
        self.task_spawner.spawn_blocking_task(Box::pin(async move {
            let result = this.generate_witness(block_hash);
            let _ = tx.send(result);
        }));
        match rx.await {
            Ok(Ok(witness)) => {
                trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
                Ok(witness)
            }
            Ok(Err(error)) => Err(error),
            // The blocking task dropped the sender without responding.
            Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
        }
    }
}

View File

@@ -1,129 +0,0 @@
use alloy_consensus::BlockHeader as _;
use alloy_primitives::{
map::{B256Map, B256Set},
BlockNumber, B256,
};
use futures::StreamExt;
use parking_lot::RwLock;
use reth_chain_state::ExecutedBlock;
use reth_ethereum_primitives::EthPrimitives;
use reth_node_api::{ConsensusEngineEvent, NodePrimitives};
use reth_primitives_traits::{Bytecode, RecoveredBlock};
use reth_storage_api::BlockNumReader;
use reth_tokio_util::EventStream;
use std::{collections::BTreeMap, sync::Arc};
use tracing::*;
/// Pending state for [`crate::RethRessProtocolProvider`].
///
/// Thread-safe store of executed-but-not-finalized blocks (valid and invalid);
/// cloning shares the same inner state via `Arc<RwLock<_>>`.
#[derive(Clone, Default, Debug)]
pub struct PendingState<N: NodePrimitives>(Arc<RwLock<PendingStateInner<N>>>);
#[derive(Default, Debug)]
struct PendingStateInner<N: NodePrimitives> {
    /// Valid executed blocks keyed by block hash.
    blocks_by_hash: B256Map<ExecutedBlock<N>>,
    /// Invalid recovered blocks keyed by block hash.
    invalid_blocks_by_hash: B256Map<Arc<RecoveredBlock<N::Block>>>,
    /// Index of block hashes (valid and invalid) by block number, used for pruning.
    block_hashes_by_number: BTreeMap<BlockNumber, B256Set>,
}
impl<N: NodePrimitives> PendingState<N> {
    /// Insert executed block with trie updates.
    pub fn insert_block(&self, block: ExecutedBlock<N>) {
        let mut this = self.0.write();
        let block_hash = block.recovered_block.hash();
        // Index the hash by number first so pruning by number can find it later.
        this.block_hashes_by_number
            .entry(block.recovered_block.number())
            .or_default()
            .insert(block_hash);
        this.blocks_by_hash.insert(block_hash, block);
    }
    /// Insert invalid block.
    pub fn insert_invalid_block(&self, block: Arc<RecoveredBlock<N::Block>>) {
        let mut this = self.0.write();
        let block_hash = block.hash();
        this.block_hashes_by_number.entry(block.number()).or_default().insert(block_hash);
        this.invalid_blocks_by_hash.insert(block_hash, block);
    }
    /// Returns only valid executed blocks by hash.
    pub fn executed_block(&self, hash: &B256) -> Option<ExecutedBlock<N>> {
        self.0.read().blocks_by_hash.get(hash).cloned()
    }
    /// Returns valid recovered block.
    pub fn recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
        self.executed_block(hash).map(|b| b.recovered_block)
    }
    /// Returns invalid recovered block.
    pub fn invalid_recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
        self.0.read().invalid_blocks_by_hash.get(hash).cloned()
    }
    /// Find bytecode in executed blocks state.
    ///
    /// Linear scan over all pending blocks; acceptable while the pending set stays small.
    pub fn find_bytecode(&self, code_hash: B256) -> Option<Bytecode> {
        let this = self.0.read();
        for block in this.blocks_by_hash.values() {
            if let Some(contract) = block.execution_output.bytecode(&code_hash) {
                return Some(contract);
            }
        }
        None
    }
    /// Remove all blocks at or before the specified block number (the comparison is
    /// inclusive, so the block at `block_number` itself is removed too).
    /// Returns the number of removed block hashes.
    pub fn remove_before(&self, block_number: BlockNumber) -> u64 {
        let mut removed = 0;
        let mut this = self.0.write();
        while this
            .block_hashes_by_number
            .first_key_value()
            .is_some_and(|(number, _)| number <= &block_number)
        {
            // The loop condition guarantees a first entry exists, so `unwrap` cannot panic.
            let (_, block_hashes) = this.block_hashes_by_number.pop_first().unwrap();
            for block_hash in block_hashes {
                removed += 1;
                this.blocks_by_hash.remove(&block_hash);
                this.invalid_blocks_by_hash.remove(&block_hash);
            }
        }
        removed
    }
}
/// A task to maintain pending state based on consensus engine events.
///
/// Inserts canonical/fork blocks and recoverable invalid blocks into `pending_state`,
/// and prunes everything at or below the finalized block on valid forkchoice updates.
/// Runs until the event stream terminates.
pub async fn maintain_pending_state<P>(
    mut events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
    provider: P,
    pending_state: PendingState<EthPrimitives>,
) where
    P: BlockNumReader,
{
    while let Some(event) = events.next().await {
        match event {
            ConsensusEngineEvent::CanonicalBlockAdded(block, _) |
            ConsensusEngineEvent::ForkBlockAdded(block, _) => {
                trace!(target: "reth::ress_provider", block = ? block.recovered_block().num_hash(), "Insert block into pending state");
                pending_state.insert_block(block);
            }
            ConsensusEngineEvent::InvalidBlock(block) => {
                // Only track invalid blocks whose signers can be recovered.
                if let Ok(block) = block.try_recover() {
                    trace!(target: "reth::ress_provider", block = ?block.num_hash(), "Insert invalid block into pending state");
                    pending_state.insert_invalid_block(Arc::new(block));
                }
            }
            ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
                if status.is_valid() {
                    // Prune pending state up to the finalized block, if it is known locally.
                    let target = state.finalized_block_hash;
                    if let Ok(Some(block_number)) = provider.block_number(target) {
                        let count = pending_state.remove_before(block_number);
                        trace!(target: "reth::ress_provider", block_number, count, "Removing blocks before finalized");
                    }
                }
            }
            // ignore
            ConsensusEngineEvent::CanonicalChainCommitted(_, _) |
            ConsensusEngineEvent::BlockReceived(_) => (),
        }
    }
}

View File

@@ -1,57 +0,0 @@
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_revm::{
state::{AccountInfo, Bytecode},
Database,
};
use reth_trie::{HashedPostState, HashedStorage};
/// The state witness recorder that records all state accesses during execution.
/// It does so by implementing the [`reth_revm::Database`] and recording accesses of accounts and
/// slots.
#[derive(Debug)]
pub(crate) struct StateWitnessRecorderDatabase<D> {
    /// Underlying database that actually serves account and storage reads.
    database: D,
    /// Accumulated record of accessed accounts/slots, keyed by hashed address/slot.
    state: HashedPostState,
}
impl<D> StateWitnessRecorderDatabase<D> {
pub(crate) fn new(database: D) -> Self {
Self { database, state: Default::default() }
}
pub(crate) fn into_state(self) -> HashedPostState {
self.state
}
}
impl<D: Database> Database for StateWitnessRecorderDatabase<D> {
    type Error = D::Error;
    fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        let maybe_account = self.database.basic(address)?;
        // Record the access (even when the account is absent) under its hashed address.
        let hashed_address = keccak256(address);
        self.state.accounts.insert(hashed_address, maybe_account.as_ref().map(|acc| acc.into()));
        Ok(maybe_account)
    }
    fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
        let value = self.database.storage(address, index)?;
        // Record the slot read under hashed (address, slot) keys.
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(B256::from(index));
        self.state
            .storages
            .entry(hashed_address)
            .or_insert_with(|| HashedStorage::new(false))
            .storage
            .insert(hashed_slot, value);
        Ok(value)
    }
    // Block hashes are passed through without recording: they are not part of the
    // hashed post state this recorder collects.
    fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
        self.database.block_hash(number)
    }
    // Bytecode reads are passed through without recording as well.
    fn code_by_hash(&mut self, code_hash: B256) -> Result<Bytecode, Self::Error> {
        self.database.code_by_hash(code_hash)
    }
}

View File

@@ -1117,30 +1117,6 @@ Storage:
WARNING: Changing this setting on an existing database requires a full resync.
Ress:
--ress.enable
Enable support for `ress` subprotocol
--ress.max-active-connections <MAX_ACTIVE_CONNECTIONS>
The maximum number of active connections for `ress` subprotocol
[default: 5]
--ress.max-witness-window <MAX_WITNESS_WINDOW>
The maximum witness lookback window
[default: 1024]
--ress.witness-max-parallel <WITNESS_MAX_PARALLEL>
The maximum number of witnesses to generate in parallel
[default: 5]
--ress.witness-cache-size <WITNESS_CACHE_SIZE>
Witness cache size
[default: 10]
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout