Compare commits

..

11 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
188 changed files with 2186 additions and 5305 deletions

View File

@@ -1,5 +0,0 @@
---
reth-transaction-pool: minor
---
Added `consensus_ref` method to `PoolTransaction` trait for borrowing consensus transactions without cloning.

View File

@@ -1,6 +0,0 @@
---
reth-rpc-eth-api: minor
reth-rpc-server-types: minor
---
Added `eth_getStorageValues` RPC method for batch storage slot retrieval across multiple addresses.

View File

@@ -1,5 +0,0 @@
---
reth-storage-api: patch
---
Added `Arc` to `auto_impl` derive for storage-api traits to support automatic `Arc` wrapper implementations.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

View File

@@ -1,6 +1,6 @@
[profile.default]
retries = { backoff = "exponential", count = 2, delay = "2s", jitter = true }
slow-timeout = { period = "30s", terminate-after = 2 }
slow-timeout = { period = "30s", terminate-after = 4 }
[[profile.default.overrides]]
filter = "test(general_state_tests)"

View File

@@ -124,7 +124,7 @@ jobs:
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: "1.93" # MSRV
toolchain: "1.88" # MSRV
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:

View File

@@ -102,7 +102,7 @@ jobs:
- name: Install cross main
id: cross_main
run: |
cargo install cross --locked --git https://github.com/cross-rs/cross
cargo install cross --git https://github.com/cross-rs/cross
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -38,7 +38,7 @@ jobs:
cache-on-failure: true
- name: Build reth
run: |
cargo install --locked --path bin/reth
cargo install --path bin/reth
- name: Run headers stage
run: |
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -313,74 +313,6 @@ GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
Before adding a comment, ask: Would someone reading just the current code (no PR, no history) find this helpful?
#### Rust Style Guides
##### Type Ordering in Files
When defining structs, traits, and functions in a file, follow this ordering convention. The file's primary type (matching the file name) comes first, followed by supporting public types, then private types and helpers.
```rust
use ...;
/// The primary type of this file (matches filename).
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// Followed by public auxiliary types that support the primary type
/// Configuration for the processor.
pub struct PayloadProcessorConfig { ... }
/// Result type returned by processor operations.
pub struct ProcessorResult { ... }
// Followed by public traits related to the primary type
pub trait ProcessorExt { ... }
// Followed by private helper types
struct InternalState { ... }
// Followed by private helper functions
fn validate_input() { ... }
```
❌ **Bad**: Adding new traits and auxiliary types **above** the file's primary type (see [#22133](https://github.com/paradigmxyz/reth/pull/22133)):
```rust
use ...;
// ❌ BAD - new auxiliary struct added before the file's main type
pub struct CacheWaitDurations { ... }
// ❌ BAD - new trait added before the file's main type
pub trait WaitForCaches { ... }
// The file's primary type is buried below unrelated additions
pub struct PayloadProcessor { ... }
```
✅ **Good**: New types go **after** the primary type:
```rust
use ...;
// ✅ The file's primary type stays at the top
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// ✅ Auxiliary types follow the primary type
pub struct CacheWaitDurations { ... }
pub trait WaitForCaches { ... }
impl WaitForCaches for PayloadProcessor { ... }
```
### Example Contribution Workflow
Let's say you want to fix a bug where external IP resolution fails on startup:

654
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
[workspace.package]
version = "1.11.0"
version = "1.11.3"
edition = "2024"
rust-version = "1.93"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
homepage = "https://paradigmxyz.github.io/reth"
repository = "https://github.com/paradigmxyz/reth"
@@ -27,6 +27,7 @@ members = [
"crates/engine/invalid-block-hooks/",
"crates/engine/local",
"crates/engine/primitives/",
"crates/engine/service",
"crates/engine/tree/",
"crates/engine/util/",
"crates/era",
@@ -55,7 +56,6 @@ members = [
"crates/net/discv5/",
"crates/net/dns/",
"crates/net/downloaders/",
"crates/net/snap-sync/",
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
@@ -344,12 +344,12 @@ reth-discv4 = { path = "crates/net/discv4" }
reth-discv5 = { path = "crates/net/discv5" }
reth-dns-discovery = { path = "crates/net/dns" }
reth-downloaders = { path = "crates/net/downloaders" }
reth-snap-sync = { path = "crates/net/snap-sync" }
reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" }
reth-engine-local = { path = "crates/engine/local" }
reth-engine-primitives = { path = "crates/engine/primitives", default-features = false }
reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-service = { path = "crates/engine/service" }
reth-engine-util = { path = "crates/engine/util" }
reth-era = { path = "crates/era" }
reth-era-downloader = { path = "crates/era-downloader" }
@@ -449,14 +449,18 @@ revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = ["map-foldhash"] }
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
@@ -468,10 +472,15 @@ alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
@@ -485,7 +494,9 @@ alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
@@ -504,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -526,11 +540,14 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
quanta = "0.12"
paste = "1.0"
rand = "0.9"
rayon = "1.7"
@@ -548,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false, features = ["attributes"] }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -586,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -611,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }
@@ -666,7 +692,6 @@ cipher = "0.4.3"
comfy-table = "7.0"
concat-kdf = "0.1.0"
crossbeam-channel = "0.5.13"
crossbeam-utils = "0.8"
crossterm = "0.29.0"
csv = "1.3.0"
ctrlc = "3.4"

View File

@@ -19,11 +19,10 @@ pre-build = [
image = "ubuntu:24.04"
pre-build = [
"apt update",
"apt install --yes gcc gcc-riscv64-linux-gnu g++-riscv64-linux-gnu libclang-dev make",
"apt install --yes gcc gcc-riscv64-linux-gnu libclang-dev make",
]
env.passthrough = [
"CARGO_TARGET_RISCV64GC_UNKNOWN_LINUX_GNU_LINKER=riscv64-linux-gnu-gcc",
"CXX_riscv64gc_unknown_linux_gnu=riscv64-linux-gnu-g++",
]
[build.env]

View File

@@ -80,7 +80,7 @@ build-native-%:
#
# These commands require that:
#
# - `cross` is installed (`cargo install --locked cross`).
# - `cross` is installed (`cargo install cross`).
# - Docker is running.
# - The current user is in the `docker` group.
#
@@ -261,7 +261,7 @@ lint-typos: ensure-typos
ensure-typos:
@if ! command -v typos &> /dev/null; then \
echo "typos not found. Please install it by running the command 'cargo install --locked typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
echo "typos not found. Please install it by running the command 'cargo install typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
exit 1; \
fi

View File

@@ -93,7 +93,7 @@ When updating this, also update:
- .github/workflows/lint.yml
-->
The Minimum Supported Rust Version (MSRV) of this project is [1.93.0](https://blog.rust-lang.org/2026/01/22/Rust-1.93.0/).
The Minimum Supported Rust Version (MSRV) of this project is [1.88.0](https://blog.rust-lang.org/2025/06/26/Rust-1.88.0/).
See the docs for detailed instructions on how to [build from source](https://reth.rs/installation/source/).

View File

@@ -29,8 +29,6 @@ pub(crate) struct BenchContext {
pub(crate) next_block: u64,
/// Whether the chain is an OP rollup.
pub(crate) is_optimism: bool,
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
pub(crate) use_reth_namespace: bool,
}
impl BenchContext {
@@ -142,14 +140,6 @@ impl BenchContext {
};
let next_block = first_block.header.number + 1;
let use_reth_namespace = bench_args.reth_new_payload;
Ok(Self {
auth_provider,
block_provider,
benchmark_mode,
next_block,
is_optimism,
use_reth_namespace,
})
Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism })
}
}

View File

@@ -6,7 +6,7 @@ use crate::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth, payload_to_new_payload},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
@@ -47,14 +47,6 @@ pub struct Command {
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
}
/// Mode for determining when to stop ramping.
@@ -146,9 +138,6 @@ impl Command {
);
}
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
}
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
@@ -174,7 +163,7 @@ impl Command {
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params, execution_data) = payload_to_new_payload(
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
@@ -185,18 +174,13 @@ impl Command {
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file = GasRampPayloadFile {
version: version as u8,
block_hash,
params: params.clone(),
execution_data: Some(execution_data.clone()),
};
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
let reth_data = self.reth_new_payload.then_some(execution_data);
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
call_new_payload(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,

View File

@@ -20,7 +20,7 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload_with_reth},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
@@ -150,15 +150,10 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
use_reth_namespace,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -235,40 +230,16 @@ impl Command {
finalized_block_hash: finalized,
};
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let fcu_start = Instant::now();
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = if server_timings.is_some() {
// When using server-side latency for newPayload, derive total from the
// independently measured components to avoid mixing server-side and
// client-side (network-inclusive) timings.
np_latency + fcu_latency
} else {
start.elapsed()
};
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,

View File

@@ -8,7 +8,7 @@ use crate::{
NEW_PAYLOAD_OUTPUT_SUFFIX,
},
},
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
valid_payload::{block_to_new_payload, call_new_payload},
};
use alloy_provider::Provider;
use clap::Parser;
@@ -49,15 +49,10 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
use_reth_namespace,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -105,28 +100,12 @@ impl Command {
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload(&auth_provider, version, params).await?;
let latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),

View File

@@ -27,9 +27,6 @@ pub(crate) struct GasRampPayloadFile {
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
/// The execution data for `reth_newPayload`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
@@ -40,12 +37,6 @@ pub(crate) struct NewPayloadResult {
pub(crate) gas_used: u64,
/// The latency of the `newPayload` call.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
impl NewPayloadResult {
@@ -76,12 +67,9 @@ impl Serialize for NewPayloadResult {
{
// convert the time to microseconds
let time = self.latency.as_micros();
let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
let mut state = serializer.serialize_struct("NewPayloadResult", 2)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("latency", &time)?;
state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
state.end()
}
}
@@ -138,7 +126,7 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 10)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
@@ -148,18 +136,6 @@ impl Serialize for CombinedResult {
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
state.serialize_field("total_latency", &total_latency)?;
state.serialize_field(
"persistence_wait",
&self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
)?;
state.serialize_field(
"execution_cache_wait",
&self.new_payload_result.execution_cache_wait.as_micros(),
)?;
state.serialize_field(
"sparse_trie_wait",
&self.new_payload_result.sparse_trie_wait.as_micros(),
)?;
state.end()
}
}

View File

@@ -23,15 +23,12 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth},
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
ForkchoiceState, JwtSecret, PraguePayloadFields,
};
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reth_cli_runner::CliContext;
@@ -127,14 +124,6 @@ pub struct Command {
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
ws_rpc_url: Option<String>,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
}
/// A loaded payload ready for execution.
@@ -174,9 +163,6 @@ impl Command {
self.persistence_threshold
);
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
}
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
@@ -262,15 +248,7 @@ impl Command {
"Executing gas ramp payload (newPayload + FCU)"
);
let reth_data =
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
let _ = call_new_payload_with_reth(
&auth_provider,
payload.version,
payload.file.params.clone(),
reth_data,
)
.await?;
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
@@ -325,47 +303,20 @@ impl Command {
"Sending newPayload"
);
let params = serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?;
let status = auth_provider
.new_payload_v4(
execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
let reth_data = self.reth_new_payload.then(|| ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
),
});
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let server_timings = call_new_payload_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
params,
reth_data,
)
.await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
@@ -375,12 +326,10 @@ impl Command {
debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_start = Instant::now();
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency =
if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
@@ -403,7 +352,7 @@ impl Command {
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
debug!(target: "reth-bench", ?fcu_result, "Payload executed successfully");
debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
parent_hash = block_hash;
}

View File

@@ -6,14 +6,12 @@ use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
@@ -163,13 +161,10 @@ where
}
}
/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn block_to_new_payload(
block: AnyRpcBlock,
is_optimism: bool,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let block = block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
@@ -184,19 +179,13 @@ pub(crate) fn block_to_new_payload(
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
/// Converts an execution payload and sidecar into versioned engine API params and an
/// [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
@@ -255,7 +244,7 @@ pub(crate) fn payload_to_new_payload(
}
};
Ok((version, params, execution_data))
Ok((version, params))
}
/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
@@ -263,109 +252,32 @@ pub(crate) fn payload_to_new_payload(
///
/// # Panics
/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
#[allow(dead_code)]
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params, None).await
}
) -> TransportResult<()> {
let method = version.method_name();
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
#[serde(flatten)]
status: PayloadStatus,
latency_us: u64,
#[serde(default)]
persistence_wait_us: Option<u64>,
#[serde(default)]
execution_cache_wait_us: u64,
#[serde(default)]
sparse_trie_wait_us: u64,
}
debug!(target: "reth-bench", method, "Sending newPayload");
/// Server-side timing breakdown from `reth_newPayload` endpoint.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
/// Server-side execution latency.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
/// `reth_execution_data` is provided.
///
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
///
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
/// the standard engine namespace.
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
reth_execution_data: Option<ExecutionData>,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
if let Some(execution_data) = reth_execution_data {
let method = "reth_newPayload";
let reth_params = serde_json::to_value((execution_data.clone(),))
.expect("ExecutionData serialization cannot fail");
debug!(target: "reth-bench", method, "Sending newPayload");
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
while !resp.status.is_valid() {
if resp.status.is_invalid() {
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
)))
}
if resp.status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
format!("Invalid {method}: {status:?}"),
))))
}
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
} else {
let method = version.method_name();
debug!(target: "reth-bench", method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {status:?}")),
)))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
status = provider.client().request(method, &params).await?;
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
Ok(None)
status = provider.client().request(method, &params).await?;
}
Ok(())
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -285,6 +285,7 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
// (We can't just use `upsert` method with a dup cursor, it's not properly
// supported)
let nibbles = StoredNibblesSubKey(path);
let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
if storage_trie_cursor
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|v| v.nibbles == nibbles)
@@ -292,7 +293,6 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
{
storage_trie_cursor.delete_current()?;
}
let entry = StorageTrieEntry { nibbles, node };
storage_trie_cursor.upsert(account, &entry)?;
}
Output::Progress(path) => {

View File

@@ -384,19 +384,15 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
let mut total_size: Option<u64> = None;
let mut last_error: Option<eyre::Error> = None;
let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
Ok((final_path.clone(), size))
};
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
if let Some(total) = total_size &&
existing_size >= total
{
return finalize_download(total);
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, total));
}
if attempt > 1 {
@@ -480,7 +476,9 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
continue;
}
return finalize_download(current_total);
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, current_total));
}
Err(last_error

View File

@@ -139,7 +139,7 @@ where
total_decoded_blocks += file_client.headers_len();
total_decoded_txns += file_client.total_transactions();
let (mut pipeline, events, _runtime) = build_import_pipeline_impl(
let (mut pipeline, events) = build_import_pipeline_impl(
config,
provider_factory.clone(),
&consensus,
@@ -265,11 +265,7 @@ pub fn build_import_pipeline_impl<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
evm_config: E,
) -> eyre::Result<(
Pipeline<N>,
impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>,
reth_tasks::Runtime,
)>
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
where
N: ProviderNodeTypes,
C: FullConsensus<N::Primitives> + 'static,
@@ -285,12 +281,9 @@ where
.sealed_header(last_block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?;
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())
.expect("failed to create runtime");
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(file_client.clone(), consensus.clone())
.into_task_with(&runtime);
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
header_downloader.update_local_head(local_head);
@@ -298,7 +291,7 @@ where
let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), provider_factory.clone())
.into_task_with(&runtime);
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
body_downloader
@@ -333,5 +326,5 @@ where
let events = pipeline.events().map(Into::into);
Ok((pipeline, events, runtime))
Ok((pipeline, events))
}

View File

@@ -54,20 +54,12 @@ impl<T: PayloadTypes> PayloadTestContext<T> {
Ok(())
}
/// Wait until the best built payload is ready.
///
/// Panics if the payload builder does not produce a non-empty payload within 30 seconds.
/// Wait until the best built payload is ready
pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
let start = std::time::Instant::now();
loop {
let payload =
self.payload_builder.best_payload(payload_id).await.transpose().ok().flatten();
if payload.is_none_or(|p| p.block().body().transactions().is_empty()) {
assert!(
start.elapsed() < std::time::Duration::from_secs(30),
"timed out waiting for a non-empty payload for {payload_id} — \
check that the chain spec supports all generated tx types"
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
continue
}

View File

@@ -32,13 +32,6 @@ fn default_account_worker_count() -> usize {
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
/// The size of proof targets chunk optimized for small blocks (≤20M gas used).
/// Benchmarks: <https://gist.github.com/yongkangc/fda9c24846f0ba891376bcf81b002008>
pub const SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE: usize = 30;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
/// computation.
@@ -49,6 +42,18 @@ pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK
/// This will be deducted from the thread count of main reth global threadpool.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Returns the default maximum concurrency for prewarm task based on available parallelism.
fn default_prewarm_max_concurrency() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map_or(16, |n| n.get())
}
#[cfg(not(feature = "std"))]
{
16
}
}
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
@@ -156,6 +161,8 @@ pub struct TreeConfig {
/// where immediate payload regeneration is desired despite the head not changing or moving to
/// an ancestor.
always_process_payload_attributes_on_canonical_head: bool,
/// Maximum concurrency for the prewarm task.
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
@@ -202,6 +209,7 @@ impl Default for TreeConfig {
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: default_prewarm_max_concurrency(),
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
@@ -238,6 +246,7 @@ impl TreeConfig {
precompile_cache_disabled: bool,
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
@@ -266,6 +275,7 @@ impl TreeConfig {
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
@@ -523,6 +533,17 @@ impl TreeConfig {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Setter for prewarm max concurrency.
pub const fn with_prewarm_max_concurrency(mut self, prewarm_max_concurrency: usize) -> Self {
self.prewarm_max_concurrency = prewarm_max_concurrency;
self
}
/// Return the prewarm max concurrency.
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count

View File

@@ -15,7 +15,6 @@ use futures::{future::Either, FutureExt, TryFutureExt};
use reth_errors::RethResult;
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
use std::time::Duration;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Type alias for backwards compat
@@ -143,20 +142,6 @@ impl Future for PendingPayloadId {
}
}
/// Timing breakdown for `reth_newPayload` responses.
#[derive(Debug, Clone, Copy)]
pub struct NewPayloadTimings {
/// Server-side execution latency.
pub latency: Duration,
/// Time spent waiting for persistence to complete.
/// `None` when no persistence was in-flight.
pub persistence_wait: Option<Duration>,
/// Time spent waiting for the execution cache lock.
pub execution_cache_wait: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie_wait: Duration,
}
/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
/// consensus layer).
#[derive(Debug)]
@@ -168,16 +153,6 @@ pub enum BeaconEngineMessage<Payload: PayloadTypes> {
/// The sender for returning payload status result.
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
},
/// Message with new payload used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
RethNewPayload {
/// The execution payload received by Engine API.
payload: Payload::ExecutionData,
/// The sender for returning payload status result and timing breakdown.
tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
},
/// Message with updated forkchoice state.
ForkchoiceUpdated {
/// The updated forkchoice state.
@@ -203,15 +178,6 @@ impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
payload.block_hash()
)
}
Self::RethNewPayload { payload, .. } => {
write!(
f,
"RethNewPayload(parent: {}, number: {}, hash: {})",
payload.parent_hash(),
payload.block_number(),
payload.block_hash()
)
}
Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
// we don't want to print the entire payload attributes, because for OP this
// includes all txs
@@ -257,19 +223,6 @@ where
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a new payload message used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
pub async fn reth_new_payload(
&self,
payload: Payload::ExecutionData,
) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>

View File

@@ -0,0 +1,47 @@
[package]
name = "reth-engine-service"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-consensus.workspace = true
reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-node-types.workspace = true
reth-chainspec.workspace = true
reth-engine-primitives.workspace = true
reth-trie-db.workspace = true
# async
futures.workspace = true
pin-project.workspace = true
# misc
[dev-dependencies]
reth-engine-tree = { workspace = true, features = ["test-utils"] }
reth-ethereum-consensus.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true
reth-primitives-traits.workspace = true
reth-node-ethereum.workspace = true
reth-trie-db.workspace = true
alloy-eips.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

View File

@@ -0,0 +1,12 @@
//! Engine service implementation.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
/// Engine Service
pub mod service;

View File

@@ -0,0 +1,229 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_chainspec::EthChainSpec;
use reth_consensus::FullConsensus;
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
};
pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent,
};
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_trie_db::ChangesetCache;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// Alias for consensus engine stream.
pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
/// Alias for chain orchestrator.
type EngineServiceType<N, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<
EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
<N as NodeTypes>::Primitives,
>,
EngineMessageStream<<N as NodeTypes>::Payload>,
BasicBlockDownloader<Client, BlockTy<N>>,
>,
PipelineSync<N>,
>;
/// The type that drives the chain forward and communicates progress.
#[pin_project]
#[expect(missing_debug_implementations)]
// TODO(mattsse): remove hidden once fixed : <https://github.com/rust-lang/rust/issues/135363>
// otherwise rustdoc fails to resolve the alias
#[doc(hidden)]
pub struct EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
orchestrator: EngineServiceType<N, Client>,
}
impl<N, Client> EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
/// Constructor for `EngineService`.
#[expect(clippy::too_many_arguments)]
pub fn new<V, C>(
consensus: Arc<dyn FullConsensus<N::Primitives>>,
chain_spec: Arc<N::ChainSpec>,
client: Client,
incoming_requests: EngineMessageStream<N::Payload>,
pipeline: Pipeline<N>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self
where
V: EngineValidator<N::Payload>,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let engine_kind =
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
blockchain_db,
consensus,
payload_validator,
persistence_handle,
payload_builder,
canonical_in_memory_state,
tree_config,
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
}
/// Returns a mutable reference to the orchestrator.
pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
&mut self.orchestrator
}
}
impl<N, Client> Stream for EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
type Item = ChainEvent<ConsensusEngineEvent<N::Primitives>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;
StreamExt::poll_next_unpin(&mut orchestrator, cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook};
use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator};
use reth_ethereum_consensus::EthBeaconConsensus;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::EthEvmConfig;
use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_node_ethereum::EthereumEngineValidator;
use reth_primitives_traits::SealedHeader;
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
};
use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
#[test]
fn eth_chain_orchestrator_build() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let client = TestFullBlockClient::default();
let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
let incoming_requests = UnboundedReceiverStream::new(rx);
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let blockchain_db =
BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
.unwrap();
let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
let evm_config = EthEvmConfig::new(chain_spec.clone());
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
blockchain_db.clone(),
consensus.clone(),
evm_config.clone(),
engine_payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
let (tx, _rx) = unbounded_channel();
let _eth_service = EngineService::new(
consensus,
chain_spec,
client,
Box::pin(incoming_requests),
pipeline,
pipeline_task_spawner,
provider_factory,
blockchain_db,
pruner,
PayloadBuilderHandle::new(tx),
engine_validator,
TreeConfig::default(),
sync_metrics_tx,
evm_config,
changeset_cache,
);
}
}

View File

@@ -29,7 +29,7 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-revm = { workspace = true, features = ["optional-balance-check"] }
reth-stages-api.workspace = true
reth-tasks = { workspace = true, features = ["rayon"] }
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie.workspace = true

View File

@@ -10,7 +10,7 @@
use futures::FutureExt;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::Runtime;
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
@@ -80,7 +80,7 @@ pub enum BackfillEvent {
#[derive(Debug)]
pub struct PipelineSync<N: ProviderNodeTypes> {
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Runtime,
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<N>,
@@ -90,7 +90,7 @@ pub struct PipelineSync<N: ProviderNodeTypes> {
impl<N: ProviderNodeTypes> PipelineSync<N> {
/// Create a new instance.
pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Runtime) -> Self {
pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self {
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(Box::new(pipeline))),
@@ -140,10 +140,10 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
async move {
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
},
}),
);
self.pipeline_state = PipelineState::Running(rx);
@@ -241,7 +241,7 @@ mod tests {
use reth_provider::test_utils::MockNodeTypesWithDB;
use reth_stages::ExecOutput;
use reth_stages_api::StageCheckpoint;
use reth_tasks::Runtime;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
struct TestHarness {
@@ -267,7 +267,7 @@ mod tests {
})]))
.build(chain_spec);
let pipeline_sync = PipelineSync::new(pipeline, Runtime::test());
let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),

View File

@@ -1,110 +0,0 @@
//! Engine orchestrator launch helper.
//!
//! Provides [`build_engine_orchestrator`](crate::launch::build_engine_orchestrator) which wires
//! together all engine components and returns a
//! [`ChainOrchestrator`](crate::chain::ChainOrchestrator) ready to be polled as a `Stream`.
use crate::{
backfill::PipelineSync,
chain::ChainOrchestrator,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches},
};
use futures::Stream;
use reth_consensus::FullConsensus;
use reth_engine_primitives::BeaconEngineMessage;
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::Runtime;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
/// Builds the engine [`ChainOrchestrator`] that drives the chain forward.
///
/// This spawns and wires together the following components:
///
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
/// blocks and performing pruning outside the critical consensus path.
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
/// messages to the tree handler and manages download requests.
/// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node
/// needs to catch up over large block ranges.
///
/// The returned orchestrator implements [`Stream`] and yields
/// [`ChainEvent`]s.
///
/// [`ChainEvent`]: crate::chain::ChainEvent
#[expect(clippy::too_many_arguments, clippy::type_complexity)]
pub fn build_engine_orchestrator<N, Client, S, V, C>(
engine_kind: EngineApiKind,
consensus: Arc<dyn FullConsensus<N::Primitives>>,
client: Client,
incoming_requests: S,
pipeline: Pipeline<N>,
pipeline_task_spawner: Runtime,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
S,
BasicBlockDownloader<Client, <N::Primitives as NodePrimitives>::Block>,
>,
PipelineSync<N>,
>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
V: EngineValidator<N::Payload> + WaitForCaches,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
blockchain_db,
consensus,
payload_validator,
persistence_handle,
payload_builder,
canonical_in_memory_state,
tree_config,
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
ChainOrchestrator::new(handler, backfill_sync)
}

View File

@@ -100,8 +100,6 @@ pub mod chain;
pub mod download;
/// Engine Api chain handler support.
pub mod engine;
/// Engine orchestrator launch helper.
pub mod launch;
/// Metrics support.
pub mod metrics;
/// The background writer service, coordinating write operations on static files and the database.

View File

@@ -4,7 +4,7 @@ use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
@@ -18,6 +18,7 @@ use std::{
Arc,
},
thread::JoinHandle,
time::Instant,
};
use thiserror::Error;
use tracing::{debug, error, instrument};
@@ -118,7 +119,7 @@ where
Ok(())
}
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(new_tip_num))]
fn on_remove_blocks_above(
&self,
new_tip_num: u64,

View File

@@ -845,8 +845,10 @@ impl SavedCache {
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state.
pub(crate) fn clear(&self) {
/// Clears all caches, resetting them to empty state,
/// and updates the hash of the block this cache belongs to.
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
self.hash = hash;
self.caches.clear();
}
}

View File

@@ -3,7 +3,7 @@ use alloy_primitives::{Address, StorageKey, StorageValue, B256};
use metrics::{Gauge, Histogram};
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives_traits::{Account, Bytecode, FastInstant as Instant};
use reth_primitives_traits::{Account, Bytecode};
use reth_provider::{
AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
StateProvider, StateRootProvider, StorageRootProvider,
@@ -14,7 +14,7 @@ use reth_trie::{
};
use std::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
time::{Duration, Instant},
};
/// Nanoseconds per second

View File

@@ -8,9 +8,9 @@ use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::{constants::gas_units::MEGAGAS, FastInstant as Instant};
use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_trie::updates::TrieUpdates;
use std::time::Duration;
use std::time::{Duration, Instant};
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
@@ -34,10 +34,6 @@ pub struct EngineApiMetrics {
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
#[allow(dead_code)]
pub(crate) bal: BalMetrics,
/// Gas-bucketed execution sub-phase metrics.
pub(crate) execution_gas_buckets: ExecutionGasBucketMetrics,
/// Gas-bucketed block validation sub-phase metrics.
pub(crate) block_validation_gas_buckets: BlockValidationGasBucketMetrics,
}
impl EngineApiMetrics {
@@ -86,22 +82,6 @@ impl EngineApiMetrics {
self.executor.post_execution_histogram.record(elapsed);
}
/// Records execution duration into the gas-bucketed execution histogram.
pub fn record_block_execution_gas_bucket(&self, gas_used: u64, elapsed: Duration) {
let idx = GasBucketMetrics::bucket_index(gas_used);
self.execution_gas_buckets.buckets[idx]
.execution_gas_bucket_histogram
.record(elapsed.as_secs_f64());
}
/// Records state root duration into the gas-bucketed block validation histogram.
pub fn record_state_root_gas_bucket(&self, gas_used: u64, elapsed_secs: f64) {
let idx = GasBucketMetrics::bucket_index(gas_used);
self.block_validation_gas_buckets.buckets[idx]
.state_root_gas_bucket_histogram
.record(elapsed_secs);
}
/// Records the time spent waiting for the next transaction from the iterator.
pub fn record_transaction_wait(&self, elapsed: Duration) {
self.executor.transaction_wait_histogram.record(elapsed);
@@ -300,8 +280,7 @@ impl GasBucketMetrics {
.record(gas_used as f64 / elapsed.as_secs_f64());
}
/// Returns the bucket index for a given gas value.
pub(crate) fn bucket_index(gas_used: u64) -> usize {
fn bucket_index(gas_used: u64) -> usize {
GAS_BUCKET_THRESHOLDS
.iter()
.position(|&threshold| gas_used < threshold)
@@ -309,7 +288,7 @@ impl GasBucketMetrics {
}
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
pub(crate) fn bucket_label(index: usize) -> String {
fn bucket_label(index: usize) -> String {
if index == 0 {
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
format!("<{hi}M")
@@ -324,56 +303,6 @@ impl GasBucketMetrics {
}
}
/// Per-gas-bucket execution duration metric.
#[derive(Clone, Metrics)]
#[metrics(scope = "sync.execution")]
pub(crate) struct ExecutionGasBucketSeries {
/// Gas-bucketed EVM execution duration.
pub(crate) execution_gas_bucket_histogram: Histogram,
}
/// Holds pre-initialized [`ExecutionGasBucketSeries`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct ExecutionGasBucketMetrics {
buckets: [ExecutionGasBucketSeries; NUM_GAS_BUCKETS],
}
impl Default for ExecutionGasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = GasBucketMetrics::bucket_label(i);
ExecutionGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
/// Per-gas-bucket block validation metrics (state root).
#[derive(Clone, Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationGasBucketSeries {
/// Gas-bucketed state root computation duration.
pub(crate) state_root_gas_bucket_histogram: Histogram,
}
/// Holds pre-initialized [`BlockValidationGasBucketSeries`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct BlockValidationGasBucketMetrics {
buckets: [BlockValidationGasBucketSeries; NUM_GAS_BUCKETS],
}
impl Default for BlockValidationGasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = GasBucketMetrics::bucket_label(i);
BlockValidationGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
/// Metrics for engine newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]

View File

@@ -19,7 +19,7 @@ use reth_chain_state::{
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated,
ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::ConfigureEvm;
@@ -27,9 +27,7 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
};
use reth_primitives_traits::{
FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
@@ -42,7 +40,7 @@ use reth_tasks::spawn_os_thread;
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc, time::Duration};
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
use crossbeam_channel::{Receiver, Sender};
use tokio::sync::{
@@ -323,7 +321,7 @@ where
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T> + WaitForCaches,
V: EngineValidator<T>,
{
/// Creates a new [`EngineApiTreeHandler`].
#[expect(clippy::too_many_arguments)]
@@ -1413,7 +1411,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
@@ -1555,94 +1553,6 @@ where
// handle the event if any
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload { payload, tx } => {
// Before processing the new payload, we wait for persistence and
// cache updates to complete. We do it in parallel, spawning
// persistence and cache update wait tasks with Tokio, so that we
// can get an unbiased breakdown on how long did every step take.
//
// If we first wait for persistence, and only then for cache
// updates, we will offset the cache update waits by the duration of
// persistence, which is incorrect.
debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");
let pending_persistence = self.persistence_state.rx.take();
let persistence_rx = if let Some((rx, start_time, _action)) =
pending_persistence
{
let (persistence_tx, persistence_rx) =
std::sync::mpsc::channel();
tokio::task::spawn_blocking(move || {
let start = Instant::now();
let result =
rx.recv().expect("persistence state channel closed");
let _ = persistence_tx.send((
result,
start_time,
start.elapsed(),
));
});
Some(persistence_rx)
} else {
None
};
let cache_wait = self.payload_validator.wait_for_caches();
let persistence_wait = if let Some(persistence_rx) = persistence_rx
{
let (result, start_time, wait_duration) = persistence_rx
.recv()
.expect("persistence result channel closed");
let _ = self.on_persistence_complete(result, start_time);
Some(wait_duration)
} else {
None
};
debug!(
target: "engine::tree",
?persistence_wait,
execution_cache_wait = ?cache_wait.execution_cache,
sparse_trie_wait = ?cache_wait.sparse_trie,
"Persistence finished and caches updated for reth_newPayload"
);
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());
let timings = NewPayloadTimings {
latency,
persistence_wait,
execution_cache_wait: cache_wait.execution_cache,
sparse_trie_wait: cache_wait.sparse_trie,
};
if let Err(err) =
tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
}
}
}
@@ -2692,7 +2602,7 @@ where
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
fn insert_block_or_payload<Input, Err>(
&mut self,
block_id: BlockWithParent,
@@ -3136,23 +3046,3 @@ enum PersistTarget {
/// Persist all blocks up to and including the canonical head.
Head,
}
/// Result of waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
/// Time spent waiting for the execution cache lock.
pub execution_cache: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie: Duration,
}
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
/// Waits for cache updates to complete.
///
/// Returns the time spent waiting for each cache separately.
fn wait_for_caches(&self) -> CacheWaitDurations;
}

View File

@@ -8,7 +8,7 @@ use crate::tree::{
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
StateProviderBuilder, TreeConfig,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
@@ -20,7 +20,6 @@ use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
use reth_engine_primitives::{SMALL_BLOCK_GAS_THRESHOLD, SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE};
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
@@ -28,13 +27,13 @@ use reth_evm::{
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::{ForEachOrdered, Runtime};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -44,13 +43,14 @@ use reth_trie_sparse::{
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
};
use std::{
collections::BTreeMap,
ops::Not,
sync::{
atomic::AtomicBool,
mpsc::{self, channel},
Arc,
},
time::Duration,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, warn, Span};
@@ -97,7 +97,6 @@ pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
@@ -133,6 +132,8 @@ where
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
@@ -171,6 +172,7 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
@@ -179,46 +181,6 @@ where
}
}
impl<Evm> WaitForCaches for PayloadProcessor<Evm>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
self.executor.spawn_blocking(move || {
let _ = execution_tx.send(execution_cache.wait_for_availability());
});
self.executor.spawn_blocking(move || {
let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
});
let execution_cache_duration =
execution_rx.recv().expect("execution cache wait task failed to send result");
let sparse_trie_duration =
sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
debug!(
target: "engine::tree::payload_processor",
?execution_cache_duration,
?sparse_trie_duration,
"Execution cache and sparse trie locks acquired"
);
CacheWaitDurations {
execution_cache: execution_cache_duration,
sparse_trie: sparse_trie_duration,
}
}
}
impl<N, Evm> PayloadProcessor<Evm>
where
N: NodePrimitives,
@@ -286,30 +248,48 @@ where
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let chunk_size = Self::adaptive_chunk_size(config, env.gas_used);
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
bal,
v2_proofs_enabled,
);
// Create and spawn the storage proof task.
// Capture parent_state_root before env is moved into spawn_caching_with
let parent_state_root = env.parent_state_root;
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
// The prewarm task converts the BAL to HashedPostState and sends it on
// to_multi_proof after slot prefetching completes.
self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
Some(bal),
v2_proofs_enabled,
)
} else {
// Normal path: spawn with transaction prewarming
self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
v2_proofs_enabled,
)
};
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle =
ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers, v2_proofs_enabled);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
chunk_size,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof.clone(),
)
@@ -344,7 +324,6 @@ where
from_multi_proof,
config,
parent_state_root,
chunk_size,
);
PayloadHandle {
@@ -384,10 +363,6 @@ where
}
}
/// Transaction count threshold below which proof workers are halved, since fewer transactions
/// produce fewer state changes and most workers would be idle overhead.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
@@ -396,42 +371,22 @@ where
/// for small blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Returns the multiproof chunk size adapted to the block's gas usage.
///
/// For blocks with ≤20M gas used, a smaller chunk size (30) yields better throughput.
/// For larger blocks, the configured default chunk size is used.
const fn adaptive_chunk_size(config: &TreeConfig, gas_used: u64) -> Option<usize> {
if !config.multiproof_chunking_enabled() {
return None;
}
let size = if gas_used > 0 && gas_used <= SMALL_BLOCK_GAS_THRESHOLD {
SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE
} else {
config.multiproof_chunk_size()
};
Some(size)
}
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
/// iteration with [`ForEachOrdered`] to recover signatures in parallel while streaming
/// results to execution in the original transaction order.
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
if transaction_count == 0 {
// Empty block — nothing to do.
@@ -452,49 +407,63 @@ where
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send((idx, tx.clone()));
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);
let _ = ooo_tx.send((idx, tx));
}
});
} else {
// Parallel path — recover signatures in parallel on rayon, stream results
// to execution in order via `for_each_ordered`.
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions
.into_par_iter()
.enumerate()
.map(|(idx, tx)| {
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
tx.map(|tx| {
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
// Send to prewarming out of order with the original index.
let _ = prewarm_tx.send((idx, tx.clone()));
tx
})
})
.for_each_ordered(|tx| {
let _ = execute_tx.send(tx);
});
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = ooo_rx.recv() {
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry()
&& *entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
}
} else {
queue.insert(idx, tx);
}
}
});
(prewarm_rx, execute_rx)
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some(), %v2_proofs_enabled)
)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -526,8 +495,10 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
self.prewarm_max_concurrency,
);
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
@@ -568,7 +539,6 @@ where
/// Spawns the [`SparseTrieTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
#[expect(clippy::too_many_arguments)]
fn spawn_sparse_trie_task(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
@@ -577,7 +547,6 @@ where
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
chunk_size: Option<usize>,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
@@ -585,6 +554,8 @@ where
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
let executor = self.executor.clone();
let parent_span = Span::current();
@@ -673,7 +644,7 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
let deferred = if let Some(state_root) = computed_state_root {
let start = Instant::now();
let start = std::time::Instant::now();
let (trie, deferred) = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
@@ -955,7 +926,7 @@ impl PayloadExecutionCache {
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let cache = self.inner.read();
let mut cache = self.inner.write();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
@@ -963,7 +934,7 @@ impl PayloadExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
if let Some(c) = cache.as_ref() {
if let Some(c) = cache.as_mut() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
@@ -984,13 +955,13 @@ impl PayloadExecutionCache {
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
// Fork block: clear and update the hash on the ORIGINAL before cloning.
// This prevents the canonical chain from matching on the stale hash
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -1001,12 +972,6 @@ impl PayloadExecutionCache {
None
}
/// Clears the tracked cache
#[expect(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
}
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
@@ -1078,9 +1043,6 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
/// Total gas used by all transactions in the block.
/// Used to adaptively select multiproof chunk size for optimal throughput.
pub gas_used: u64,
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
@@ -1097,7 +1059,6 @@ where
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
gas_used: 0,
withdrawals: None,
}
}
@@ -1182,10 +1143,18 @@ mod tests {
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
// When the parent hash doesn't match (fork block), the cache is cleared,
// hash updated on the original, and clone returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
drop(cache);
// The stored cache now has the fork block's parent hash.
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
let original = execution_cache.get_cache_for(hash);
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
}
#[test]
@@ -1409,4 +1378,61 @@ mod tests {
"State root mismatch: task={root_from_task}, base={root_from_regular}"
);
}
/// Tests the full prewarm lifecycle for a fork block:
///
/// 1. Cache is at canonical block 4.
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
/// 3. Prewarm populates the shared cache with fork-specific state.
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
/// data.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
let execution_cache = PayloadExecutionCache::default();
// Canonical chain at block 4.
let block4_hash = B256::from([4u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
let fork_parent = B256::from([2u8; 32]);
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
let prewarm_cache = prewarm_cache.unwrap();
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
let fork_addr = Address::from([0xBB; 20]);
let fork_key = B256::from([0xCC; 32]);
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
let during_prewarm = execution_cache.get_cache_for(block4_hash);
assert!(
during_prewarm.is_none(),
"cache must be unavailable while prewarm holds a reference"
);
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
drop(prewarm_cache);
// Canonical block 5 arrives (parent = block 4).
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
// clears the stale fork data, and returns a cache with hash = block4_hash.
let block5_cache = execution_cache.get_cache_for(block4_hash);
assert!(
block5_cache.is_some(),
"canonical chain must get cache after fork prewarm is dropped"
);
assert_eq!(
block5_cache.as_ref().unwrap().executed_block_hash(),
block4_hash,
"cache must carry the canonical parent hash, not the fork parent"
);
}
}

View File

@@ -8,7 +8,6 @@ use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as Cros
use derive_more::derive::Deref;
use metrics::{Gauge, Histogram};
use reth_metrics::Metrics;
use reth_primitives_traits::FastInstant as Instant;
use reth_provider::AccountReader;
use reth_revm::state::EvmState;
use reth_trie::{
@@ -26,7 +25,7 @@ use reth_trie_parallel::{
targets_v2::MultiProofTargetsV2,
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
/// Source of state changes, either from EVM execution or from a Block Access List.
@@ -772,11 +771,6 @@ impl MultiProofTask {
fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
// Remove already fetched proof targets to avoid redundant work.
targets.retain_difference(&self.fetched_proof_targets);
if targets.is_empty() {
return 0;
}
extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
// For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the
@@ -895,10 +889,6 @@ impl MultiProofTask {
state_updates += 1;
}
if not_fetched_state_update.is_empty() {
return state_updates;
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
@@ -1583,7 +1573,7 @@ mod tests {
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -2066,7 +2056,7 @@ mod tests {
panic!("Expected PrefetchProofs message");
};
assert!(proofs_requested >= 1);
assert_eq!(proofs_requested, 1);
}
/// Verifies that different message types arriving mid-batch are not lost and preserve order.

View File

@@ -3,7 +3,7 @@
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use std::{sync::Arc, time::Instant};
use std::sync::Arc;
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
@@ -28,27 +28,6 @@ impl SharedPreservedSparseTrie {
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.0.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie to become available"
);
}
elapsed
}
}
/// Guard that holds the lock on the preserved trie.

View File

@@ -14,7 +14,7 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal,
bal::{self, total_slots, BALSlotIter},
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -25,13 +25,12 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, StorageKey, B256};
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
StateReader,
@@ -39,18 +38,22 @@ use reth_provider::{
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
Arc,
use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender},
Arc,
},
time::Instant,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream, each paired with its block index.
Transactions(Receiver<(usize, Tx)>),
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
@@ -83,6 +86,8 @@ where
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -103,12 +108,13 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
trace!(
target: "engine::tree::payload_processor::prewarm",
prewarming_threads = executor.prewarming_pool().current_num_threads(),
max_concurrency,
transaction_count = ctx.env.transaction_count,
"Initialized prewarm task"
);
@@ -118,6 +124,7 @@ where
executor,
execution_cache,
ctx,
max_concurrency,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
@@ -133,7 +140,7 @@ where
/// subsequent transactions in the block.
fn spawn_all<Tx>(
&self,
pending: mpsc::Receiver<(usize, Tx)>,
pending: mpsc::Receiver<Tx>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
) where
@@ -141,28 +148,30 @@ where
{
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let span = Span::current();
self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
let pool_threads = executor.prewarming_pool().current_num_threads();
// Don't spawn more workers than transactions. When transaction_count is 0
// (unknown), use all pool threads.
let workers_needed = if ctx.env.transaction_count > 0 {
ctx.env.transaction_count.min(pool_threads)
} else {
pool_threads
};
let (done_tx, done_rx) = mpsc::channel();
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let transaction_count = ctx.env.transaction_count;
let workers_needed = if transaction_count == 0 {
max_concurrency
} else {
transaction_count.min(max_concurrency)
};
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_count = 0usize;
while let Ok((tx_index, tx)) = pending.recv() {
let mut tx_index = 0usize;
while let Ok(tx) = pending.recv() {
// Stop distributing if termination was requested
if ctx.terminate_execution.load(Ordering::Relaxed) {
trace!(
@@ -179,7 +188,7 @@ where
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
tx_count += 1;
tx_index += 1;
}
// Send withdrawal prefetch targets after all transactions have been distributed
@@ -198,7 +207,7 @@ where
while done_rx.recv().is_ok() {}
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
});
}
@@ -265,8 +274,10 @@ where
}
}
/// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch
/// accounts and storage slots.
/// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
///
/// Divides the total slots across `max_concurrency` workers, each responsible for
/// prefetching a range of slots from the BAL.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
@@ -285,35 +296,59 @@ where
return;
}
if bal.is_empty() {
let total_slots = total_slots(&bal);
trace!(
target: "engine::tree::payload_processor::prewarm",
total_slots,
max_concurrency = self.max_concurrency,
"Starting BAL prewarm"
);
if total_slots == 0 {
self.send_bal_hashed_state(&bal);
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
}
trace!(
target: "engine::tree::payload_processor::prewarm",
accounts = bal.len(),
"Starting BAL prewarm"
);
let (done_tx, done_rx) = mpsc::channel();
let ctx = self.ctx.clone();
self.executor.prewarming_pool().install(|| {
bal.par_iter().for_each_init(
|| (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
|(ctx, provider), account| {
if ctx.terminate_execution.load(Ordering::Relaxed) {
return;
}
ctx.prefetch_bal_account(provider, account);
},
// Calculate number of workers needed (at most max_concurrency)
let workers_needed = total_slots.min(self.max_concurrency);
// Calculate slots per worker
let slots_per_worker = total_slots / workers_needed;
let remainder = total_slots % workers_needed;
// Spawn workers with their assigned ranges
for i in 0..workers_needed {
let start = i * slots_per_worker + i.min(remainder);
let extra = if i < remainder { 1 } else { 0 };
let end = start + slots_per_worker + extra;
self.ctx.spawn_bal_worker(
i,
&self.executor,
Arc::clone(&bal),
start..end,
done_tx.clone(),
);
});
}
// Drop our handle to done_tx so we can detect completion
drop(done_tx);
// Wait for all workers to complete
let mut completed_workers = 0;
while done_rx.recv().is_ok() {
completed_workers += 1;
}
trace!(
target: "engine::tree::payload_processor::prewarm",
"All BAL prewarm accounts completed"
completed_workers,
"All BAL prewarm workers completed"
);
// Convert BAL to HashedPostState and send to multiproof task
@@ -550,7 +585,7 @@ where
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) where
Tx: ExecutableTxFor<Evm>,
{
@@ -625,7 +660,7 @@ where
workers_needed: usize,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
@@ -633,65 +668,115 @@ where
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
// Spawn workers that all pull from the shared receiver
let executor = task_executor.clone();
let span = Span::current();
for idx in 0..workers_needed {
let ctx = self.clone();
let to_multi_proof = to_multi_proof.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: &span, "prewarm worker", idx);
task_executor.prewarming_pool().spawn(move || {
let _enter = span.entered();
ctx.transact_batch(rx, to_multi_proof, done_tx);
});
}
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for idx in 0..workers_needed {
let ctx = self.clone();
let to_multi_proof = to_multi_proof.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.transact_batch(rx, to_multi_proof, done_tx);
});
}
});
tx_sender
}
/// Prefetches a single account and all its storage slots from the BAL into the cache.
/// Spawns a worker task for BAL slot prefetching.
///
/// The `provider` is lazily initialized on first call and reused across accounts on the same
/// thread.
fn prefetch_bal_account(
/// The worker iterates over the specified range of slots in the BAL and ensures
/// each slot is loaded into the cache by accessing it through the state provider.
fn spawn_bal_worker(
&self,
provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
account: &alloy_eip7928::AccountChanges,
idx: usize,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
) {
let state_provider = match provider {
Some(p) => p,
slot @ None => {
let built = match self.provider.build() {
Ok(p) => p,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in BAL prewarm thread"
);
return;
}
};
let saved_cache =
self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
let ctx = self.clone();
let span = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"bal prewarm worker",
idx,
range_start = range.start,
range_end = range.end
);
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.prefetch_bal_slots(bal, range, done_tx);
});
}
/// Prefetches storage slots from a BAL range into the cache.
///
/// This iterates through the specified range of slots and accesses them via the state
/// provider to populate the cache.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn prefetch_bal_slots(
self,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
) {
let Self { saved_cache, provider, metrics, .. } = self;
// Build state provider
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in BAL prewarm thread"
);
let _ = done_tx.send(());
return;
}
};
// Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
let start = Instant::now();
let _ = state_provider.basic_account(&account.address);
// Track last seen address to avoid fetching the same account multiple times.
let mut last_address = None;
for slot in &account.storage_changes {
let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
}
for &slot in &account.storage_reads {
let _ = state_provider.storage(account.address, StorageKey::from(slot));
// Iterate through the assigned range of slots
for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
// Fetch the account if this is a different address than the last one
if last_address != Some(address) {
let _ = state_provider.basic_account(&address);
last_address = Some(address);
}
// Access the slot to populate the cache
let _ = state_provider.storage(address, slot);
}
self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
let elapsed = start.elapsed();
trace!(
target: "engine::tree::payload_processor::prewarm",
?range,
elapsed_ms = elapsed.as_millis(),
"BAL prewarm worker completed"
);
// Signal completion
let _ = done_tx.send(());
metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
}
}

View File

@@ -11,7 +11,7 @@ use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
@@ -32,7 +32,10 @@ use reth_trie_sparse::{
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
use std::{sync::mpsc, time::Duration};
use std::{
sync::mpsc,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, error, instrument, trace};
#[expect(clippy::large_enum_variant)]
@@ -590,24 +593,18 @@ where
self.process_leaf_updates(true)?;
for (address, mut new) in self.new_storage_updates.drain() {
match self.storage_updates.entry(address) {
Entry::Vacant(entry) => {
entry.insert(new); // insert the whole map at once, no per-slot loop
}
Entry::Occupied(mut entry) => {
let updates = entry.get_mut();
for (slot, new) in new.drain() {
match updates.entry(slot) {
Entry::Occupied(mut slot_entry) => {
if new.is_changed() {
slot_entry.insert(new);
}
}
Entry::Vacant(slot_entry) => {
slot_entry.insert(new);
}
let updates = self.storage_updates.entry(address).or_default();
for (slot, new) in new.drain() {
match updates.entry(slot) {
Entry::Occupied(mut entry) => {
// Only overwrite existing entries with new values
if new.is_changed() {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
}
}

View File

@@ -7,8 +7,8 @@ use crate::tree::{
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
@@ -31,8 +31,8 @@ use reth_payload_primitives::{
BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
};
use reth_primitives_traits::{
AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
AlloyBlockHeader, BlockBody, BlockTy, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock,
SealedHeader, SignerRecoverable,
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
@@ -49,6 +49,7 @@ use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
sync::{mpsc::RecvTimeoutError, Arc},
time::Instant,
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
@@ -396,7 +397,6 @@ where
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
};
@@ -597,8 +597,6 @@ where
};
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
self.metrics
.record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
// ensure state root matches
@@ -767,7 +765,6 @@ where
let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, result_rx))
@@ -1145,7 +1142,7 @@ where
level = "debug",
target = "engine::tree::payload_validator",
skip_all,
fields(?strategy)
fields(strategy)
)]
fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
&mut self,
@@ -1585,15 +1582,6 @@ where
}
}
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
self.payload_processor.wait_for_caches()
}
}
/// Enum representing either block or payload being validated.
#[derive(Debug)]
pub enum BlockOrPayload<T: PayloadTypes> {
@@ -1671,15 +1659,4 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
}
}
/// Returns the total gas used by the block.
pub fn gas_used(&self) -> u64
where
T::ExecutionData: ExecutionPayload,
{
match self {
Self::Payload(payload) => payload.gas_used(),
Self::Block(block) => block.gas_used(),
}
}
}

View File

@@ -23,7 +23,7 @@
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use reth_primitives_traits::FastInstant as Instant;
use std::time::Instant;
use tracing::trace;
/// The state of the persistence task.

View File

@@ -77,8 +77,7 @@ impl EngineMessageStore {
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, .. } |
BeaconEngineMessage::RethNewPayload { payload, .. } => {
BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),

View File

@@ -425,9 +425,17 @@ impl TotalDifficulty {
/// Convert to an [`Entry`]
pub fn to_entry(&self) -> Entry {
// era1 spec: `total-difficulty = { type: 0x0600, data: SSZ uint256 }` (little-endian)
let data = self.value.to_le_bytes::<32>().to_vec();
Entry::new(TOTAL_DIFFICULTY, data)
let mut data = [0u8; 32];
let be_bytes = self.value.to_be_bytes_vec();
if be_bytes.len() <= 32 {
data[32 - be_bytes.len()..].copy_from_slice(&be_bytes);
} else {
data.copy_from_slice(&be_bytes[be_bytes.len() - 32..]);
}
Entry::new(TOTAL_DIFFICULTY, data.to_vec())
}
/// Create from an [`Entry`]
@@ -446,8 +454,8 @@ impl TotalDifficulty {
)));
}
// era1 spec: `total-difficulty = { type: 0x0600, data: SSZ uint256 }` (little-endian)
let value = U256::from_le_slice(&entry.data);
// Convert 32-byte array to U256
let value = U256::from_be_slice(&entry.data);
Ok(Self { value })
}
@@ -600,19 +608,6 @@ mod tests {
assert_eq!(recovered.value, value);
}
#[test]
fn test_total_difficulty_ssz_le_encoding() {
// Verify that total-difficulty is encoded as SSZ uint256 (little-endian).
// See https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md
let value = U256::from(1u64);
let td = TotalDifficulty::new(value);
let entry = td.to_entry();
// Little-endian: least significant byte first [1, 0, 0, ..., 0]
assert_eq!(entry.data[0], 1, "First byte must be 1 (little-endian)");
assert_eq!(entry.data[31], 0, "Last byte must be 0 (little-endian)");
}
#[test]
fn test_compression_roundtrip() {
let rlp_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

View File

@@ -158,7 +158,6 @@ where
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
prewarming_threads: command.engine.prewarming_threads,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(

View File

@@ -53,7 +53,9 @@ impl<
<<Self::BuiltPayload as BuiltPayload>::Primitives as NodePrimitives>::Block,
>,
) -> Self::ExecutionData {
T::block_to_payload(block)
let (payload, sidecar) =
ExecutionPayload::from_block_unchecked(block.hash(), &block.into_block());
ExecutionData { payload, sidecar }
}
}

View File

@@ -285,7 +285,7 @@ where
Arc::new(ctx.node.consensus().clone()),
ctx.node.evm_config().clone(),
ctx.config.rpc.flashbots_config(),
ctx.node.task_executor().clone(),
Box::new(ctx.node.task_executor().clone()),
Arc::new(EthereumEngineValidator::new(ctx.config.chain.clone())),
);

View File

@@ -214,7 +214,7 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
TransactionTestContext::validate_sidecar(envelope);
// build last Prague payload
node.payload.timestamp = current_timestamp + 1;
node.payload.timestamp = current_timestamp + 11;
let prague_payload = node.new_payload().await?;
assert!(matches!(prague_payload.sidecars(), BlobSidecars::Eip4844(_)));
@@ -227,7 +227,7 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
// validate sidecar
TransactionTestContext::validate_sidecar(envelope);
tokio::time::sleep(Duration::from_secs(6)).await;
tokio::time::sleep(Duration::from_secs(11)).await;
// fetch second blob tx from rpc again
let envelope = node.rpc.envelope_by_hash(blob_tx_hash).await?;

View File

@@ -282,7 +282,6 @@ async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.prague_activated()
.build(),
),
false,

View File

@@ -90,8 +90,8 @@ async fn test_fee_history() -> eyre::Result<()> {
assert_eq!(block.header.gas_used, receipt.gas_used,);
assert_eq!(block.header.base_fee_per_gas.unwrap(), expected_first_base_fee as u64);
for _ in 0..20 {
let _ = GasWaster::deploy_builder(&provider, U256::from(rng.random_range(0..100)))
for _ in 0..100 {
let _ = GasWaster::deploy_builder(&provider, U256::from(rng.random_range(0..1000)))
.send()
.await?;
@@ -100,7 +100,7 @@ async fn test_fee_history() -> eyre::Result<()> {
let latest_block = provider.get_block_number().await?;
for _ in 0..20 {
for _ in 0..100 {
let latest_block = rng.random_range(0..=latest_block);
let block_count = rng.random_range(1..=(latest_block + 1));

View File

@@ -2,7 +2,8 @@
use alloy_consensus::BlockHeader;
use metrics::{Counter, Gauge, Histogram};
use reth_metrics::Metrics;
use reth_primitives_traits::{Block, FastInstant as Instant, RecoveredBlock};
use reth_primitives_traits::{Block, RecoveredBlock};
use std::time::Instant;
/// Executor metrics.
#[derive(Metrics, Clone)]

View File

@@ -1,6 +1,6 @@
use alloy_primitives::{Address, B256, U256};
use reth_primitives_traits::{Account, Bytecode};
use revm::database::{states::BundleState, BundleAccount};
use revm::database::BundleState;
pub use alloy_evm::block::BlockExecutionResult;
@@ -37,11 +37,6 @@ impl<T> BlockExecutionOutput<T> {
self.state.account(address).map(|a| a.info.as_ref().map(Into::into))
}
/// Returns the state [`BundleAccount`] for the given address.
pub fn account_state(&self, address: &Address) -> Option<&BundleAccount> {
self.state.account(address)
}
/// Get storage if value is known.
///
/// This means that depending on status we can potentially return `U256::ZERO`.

View File

@@ -10,7 +10,6 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
fmt::Debug,
pin::Pin,
sync::Arc,
@@ -287,9 +286,6 @@ where
backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
/// Custom thresholds for the backfill job, if set.
backfill_thresholds: Option<ExecutionStageThresholds>,
/// Notifications that arrived during backfill and need to be delivered after it completes.
/// These are notifications for blocks beyond the backfill range that we must not drop.
pending_notifications: VecDeque<ExExNotification<E::Primitives>>,
}
impl<P, E> ExExNotificationsWithHead<P, E>
@@ -316,7 +312,6 @@ where
pending_check_backfill: true,
backfill_job: None,
backfill_thresholds: None,
pending_notifications: VecDeque::new(),
}
}
@@ -453,34 +448,6 @@ where
// 3. If backfill is in progress yield new notifications
if let Some(backfill_job) = &mut this.backfill_job {
debug!(target: "exex::notifications", "Polling backfill job");
// Drain the notification channel to prevent backpressure from stalling the
// ExExManager. During backfill, the ExEx is not consuming from the channel,
// so the capacity-1 channel fills up, which blocks the manager's PollSender,
// which fills the manager's 1024-entry buffer, which blocks all upstream
// senders. Notifications for blocks covered by the backfill range are
// discarded (they'll be re-delivered by the backfill job), while
// notifications beyond the backfill range are buffered for delivery after the
// backfill completes.
while let Poll::Ready(Some(notification)) = this.notifications.poll_recv(cx) {
// Always buffer revert-containing notifications (ChainReverted,
// ChainReorged) because the backfill job only re-delivers
// ChainCommitted from the database. Discarding a reorg here would
// leave the ExEx unaware of the fork switch.
if notification.reverted_chain().is_some() {
this.pending_notifications.push_back(notification);
continue;
}
if let Some(committed) = notification.committed_chain() &&
committed.tip().number() <= this.initial_local_head.number
{
// Covered by backfill range, safe to discard
continue;
}
// Beyond the backfill range — buffer for delivery after backfill
this.pending_notifications.push_back(notification);
}
if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
@@ -492,18 +459,13 @@ where
this.backfill_job = None;
}
// 4. Deliver any notifications that were buffered during backfill
if let Some(notification) = this.pending_notifications.pop_front() {
return Poll::Ready(Some(Ok(notification)))
}
// 5. Otherwise advance the regular event stream
// 4. Otherwise advance the regular event stream
loop {
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
// 6. In case the exex is ahead of the new tip, we must skip it
// 5. In case the exex is ahead of the new tip, we must skip it
if let Some(committed) = notification.committed_chain() {
// inclusive check because we should start with `exex.head + 1`
if this.initial_exex_head.block.number >= committed.tip().number() {
@@ -827,135 +789,4 @@ mod tests {
Ok(())
}
/// Regression test for <https://github.com/paradigmxyz/reth/issues/19665>.
///
/// During backfill, `poll_next` must drain the notification channel so that
/// the upstream `ExExManager` is never blocked by a full channel. Without
/// the drain loop the capacity-1 channel stays full for the entire backfill
/// duration, which stalls the manager's `PollSender` and eventually blocks
/// all upstream senders once the 1024-entry buffer fills up.
///
/// The key assertion is the `try_send` after the first `poll_next`: it
/// proves the channel was drained during the backfill poll. Without the
/// fix this `try_send` fails because the notification is still sitting in
/// the channel.
#[tokio::test]
async fn exex_notifications_backfill_drains_channel() -> eyre::Result<()> {
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider::new(provider_factory.clone())?;
// Insert block 1 into the DB so there's something to backfill
let node_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
)
.try_recover()?;
let node_head = node_head_block.num_hash();
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(&node_head_block)?;
provider_rw.commit()?;
// ExEx head is at genesis — backfill will run for block 1
let exex_head =
ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
// Notification for a block AFTER the backfill range (block 2).
let post_backfill_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
// Another notification (block 3) used to probe channel capacity.
let probe_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 2,
BlockParams { parent: None, ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
// Fill the capacity-1 channel.
notifications_tx.send(post_backfill_notification.clone()).await?;
// Confirm the channel is full — this is the precondition that causes the
// stall in production: the ExExManager's PollSender would block here.
assert!(
notifications_tx.try_send(probe_notification.clone()).is_err(),
"channel should be full before backfill poll"
);
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthEvmConfig::mainnet(),
notifications_rx,
wal.handle(),
)
.with_head(exex_head);
// Poll once — this returns the backfill result for block 1. Crucially,
// the drain loop in poll_next runs in this same call, consuming the
// notification from the channel and buffering it.
let backfill_result = notifications.next().await.transpose()?;
assert_eq!(
backfill_result,
Some(ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(
notifications.evm_config.clone(),
notifications.provider.clone()
)
.backfill(1..=1)
.next()
.ok_or_eyre("failed to backfill")??
)
})
);
// KEY ASSERTION: the channel was drained during the backfill poll above.
// Without the drain loop this try_send fails because the original
// notification is still occupying the capacity-1 channel.
assert!(
notifications_tx.try_send(probe_notification.clone()).is_ok(),
"channel should have been drained during backfill poll"
);
// The first buffered notification (block 2) was drained from the channel
// during backfill and is delivered now.
let buffered = notifications.next().await.transpose()?;
assert_eq!(buffered, Some(post_backfill_notification));
// The probe notification (block 3) that we just sent is delivered next.
let probe = notifications.next().await.transpose()?;
assert_eq!(probe, Some(probe_notification));
Ok(())
}
}

View File

@@ -16,7 +16,7 @@ use reth_network_p2p::{
};
use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader};
use reth_storage_api::HeaderProvider;
use reth_tasks::Runtime;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
@@ -285,9 +285,17 @@ where
C: BodiesClient<Body = B::Body> + 'static,
Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given [`Runtime`].
pub fn into_task_with(self, runtime: &Runtime) -> TaskDownloader<B> {
TaskDownloader::spawn_with(self, runtime)
/// Spawns the downloader task via [`tokio::task::spawn`]
pub fn into_task(self) -> TaskDownloader<B> {
self.into_task_with(&TokioTaskExecutor::default())
}
/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner.
pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<B>
where
S: TaskSpawner,
{
TaskDownloader::spawn_with(self, spawner)
}
}

View File

@@ -1,13 +1,13 @@
use alloy_primitives::BlockNumber;
use futures::Stream;
use futures_util::StreamExt;
use futures_util::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_network_p2p::{
bodies::downloader::{BodyDownloader, BodyDownloaderResult},
error::DownloadResult,
};
use reth_primitives_traits::Block;
use reth_tasks::Runtime;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
@@ -32,11 +32,50 @@ pub struct TaskDownloader<B: Block> {
}
impl<B: Block + 'static> TaskDownloader<B> {
/// Spawns the given `downloader` via the given [`Runtime`] and returns a [`TaskDownloader`]
/// that's connected to that task.
pub fn spawn_with<T>(downloader: T, runtime: &Runtime) -> Self
/// Spawns the given `downloader` via [`tokio::task::spawn`] returns a [`TaskDownloader`] that's
/// connected to that task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime
///
/// # Example
///
/// ```
/// use reth_consensus::Consensus;
/// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader};
/// use reth_network_p2p::bodies::client::BodiesClient;
/// use reth_primitives_traits::{Block, InMemorySize};
/// use reth_storage_api::HeaderProvider;
/// use std::{fmt::Debug, sync::Arc};
///
/// fn t<
/// B: Block + 'static,
/// C: BodiesClient<Body = B::Body> + 'static,
/// Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
/// >(
/// client: Arc<C>,
/// consensus: Arc<dyn Consensus<B>>,
/// provider: Provider,
/// ) {
/// let downloader =
/// BodiesDownloaderBuilder::default().build::<B, _, _>(client, consensus, provider);
/// let downloader = TaskDownloader::spawn(downloader);
/// }
/// ```
pub fn spawn<T>(downloader: T) -> Self
where
T: BodyDownloader<Block = B> + 'static,
{
Self::spawn_with(downloader, &TokioTaskExecutor::default())
}
/// Spawns the given `downloader` via the given [`TaskSpawner`] returns a [`TaskDownloader`]
/// that's connected to that task.
pub fn spawn_with<T, S>(downloader: T, spawner: &S) -> Self
where
T: BodyDownloader<Block = B> + 'static,
S: TaskSpawner,
{
let (bodies_tx, bodies_rx) = mpsc::channel(BODIES_TASK_BUFFER_SIZE);
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
@@ -47,7 +86,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
downloader,
};
runtime.spawn_task(downloader);
spawner.spawn_task(downloader.boxed());
Self { from_downloader: ReceiverStream::new(bodies_rx), to_downloader }
}
@@ -162,8 +201,7 @@ mod tests {
Arc::new(TestConsensus::default()),
factory,
);
let runtime = Runtime::test();
let mut downloader = TaskDownloader::spawn_with(downloader, &runtime);
let mut downloader = TaskDownloader::spawn(downloader);
downloader.set_download_range(0..=19).expect("failed to set download range");
@@ -186,8 +224,7 @@ mod tests {
Arc::new(TestConsensus::default()),
factory,
);
let runtime = Runtime::test();
let mut downloader = TaskDownloader::spawn_with(downloader, &runtime);
let mut downloader = TaskDownloader::spawn(downloader);
downloader.set_download_range(1..=0).expect("failed to set download range");
assert_matches!(downloader.next().await, Some(Err(DownloadError::InvalidBodyRange { .. })));

View File

@@ -21,7 +21,7 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_tasks::Runtime;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
@@ -660,12 +660,20 @@ where
H: HeadersClient,
Self: HeaderDownloader + 'static,
{
/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given [`Runtime`].
pub fn into_task_with(
/// Spawns the downloader task via [`tokio::task::spawn`]
pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
self.into_task_with(&TokioTaskExecutor::default())
}
/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`.
pub fn into_task_with<S>(
self,
runtime: &Runtime,
) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
TaskDownloader::spawn_with(self, runtime)
spawner: &S,
) -> TaskDownloader<<Self as HeaderDownloader>::Header>
where
S: TaskSpawner,
{
TaskDownloader::spawn_with(self, spawner)
}
}

View File

@@ -1,5 +1,5 @@
use alloy_primitives::Sealable;
use futures::Stream;
use futures::{FutureExt, Stream};
use futures_util::StreamExt;
use pin_project::pin_project;
use reth_network_p2p::headers::{
@@ -7,7 +7,7 @@ use reth_network_p2p::headers::{
error::HeadersDownloaderResult,
};
use reth_primitives_traits::SealedHeader;
use reth_tasks::Runtime;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
@@ -33,11 +33,42 @@ pub struct TaskDownloader<H: Sealable> {
// === impl TaskDownloader ===
impl<H: Sealable + Send + Sync + Unpin + 'static> TaskDownloader<H> {
/// Spawns the given `downloader` via the given [`Runtime`] and returns a [`TaskDownloader`]
/// Spawns the given `downloader` via [`tokio::task::spawn`] and returns a [`TaskDownloader`]
/// that's connected to that task.
pub fn spawn_with<T>(downloader: T, runtime: &Runtime) -> Self
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime
///
/// # Example
///
/// ```
/// # use std::sync::Arc;
/// # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloader;
/// # use reth_downloaders::headers::task::TaskDownloader;
/// # use reth_consensus::HeaderValidator;
/// # use reth_network_p2p::headers::client::HeadersClient;
/// # use reth_primitives_traits::BlockHeader;
/// # fn t<H: HeadersClient<Header: BlockHeader> + 'static>(consensus:Arc<dyn HeaderValidator<H::Header>>, client: Arc<H>) {
/// let downloader = ReverseHeadersDownloader::<H>::builder().build(
/// client,
/// consensus
/// );
/// let downloader = TaskDownloader::spawn(downloader);
/// # }
pub fn spawn<T>(downloader: T) -> Self
where
T: HeaderDownloader<Header = H> + 'static,
{
Self::spawn_with(downloader, &TokioTaskExecutor::default())
}
/// Spawns the given `downloader` via the given [`TaskSpawner`] returns a [`TaskDownloader`]
/// that's connected to that task.
pub fn spawn_with<T, S>(downloader: T, spawner: &S) -> Self
where
T: HeaderDownloader<Header = H> + 'static,
S: TaskSpawner,
{
let (headers_tx, headers_rx) = mpsc::channel(HEADERS_TASK_BUFFER_SIZE);
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
@@ -47,7 +78,7 @@ impl<H: Sealable + Send + Sync + Unpin + 'static> TaskDownloader<H> {
updates: UnboundedReceiverStream::new(updates_rx),
downloader,
};
runtime.spawn_task(downloader);
spawner.spawn_task(downloader.boxed());
Self { from_downloader: ReceiverStream::new(headers_rx), to_downloader }
}
@@ -178,8 +209,7 @@ mod tests {
.request_limit(1)
.build(Arc::clone(&client), Arc::new(TestConsensus::default()));
let runtime = Runtime::test();
let mut downloader = TaskDownloader::spawn_with(downloader, &runtime);
let mut downloader = TaskDownloader::spawn(downloader);
downloader.update_local_head(p3.clone());
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

View File

@@ -1,27 +0,0 @@
//! Implements the `GetBlockAccessLists` and `BlockAccessLists` message types.
use alloc::vec::Vec;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use reth_codecs_derive::add_arbitrary_tests;
/// A request for block access lists from the given block hashes.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct GetBlockAccessLists(
/// The block hashes to request block access lists for.
pub Vec<B256>,
);
/// Response for [`GetBlockAccessLists`] containing one BAL per requested block hash.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct BlockAccessLists(
/// The requested block access lists as opaque bytes. Unavailable entries are represented by
/// empty byte slices.
pub Vec<Bytes>,
);

View File

@@ -169,10 +169,7 @@ impl NewPooledTransactionHashes {
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
}
Self::Eth68(_) => {
matches!(
version,
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
)
matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70)
}
}
}

View File

@@ -110,11 +110,6 @@ impl Capability {
Self::eth(EthVersion::Eth70)
}
/// Returns the [`EthVersion::Eth71`] capability.
pub const fn eth_71() -> Self {
Self::eth(EthVersion::Eth71)
}
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
@@ -145,12 +140,6 @@ impl Capability {
self.name == "eth" && self.version == 70
}
/// Whether this is eth v71.
#[inline]
pub fn is_eth_v71(&self) -> bool {
self.name == "eth" && self.version == 71
}
/// Whether this is any eth version.
#[inline]
pub fn is_eth(&self) -> bool {
@@ -158,8 +147,7 @@ impl Capability {
self.is_eth_v67() ||
self.is_eth_v68() ||
self.is_eth_v69() ||
self.is_eth_v70() ||
self.is_eth_v71()
self.is_eth_v70()
}
}
@@ -179,7 +167,7 @@ impl From<EthVersion> for Capability {
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for Capability {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let version = u.int_in_range(66..=71)?; // Valid eth protocol versions are 66-71
let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70
// Only generate valid eth protocol name for now since it's the only supported protocol
Ok(Self::new_static("eth", version))
}
@@ -195,7 +183,6 @@ pub struct Capabilities {
eth_68: bool,
eth_69: bool,
eth_70: bool,
eth_71: bool,
}
impl Capabilities {
@@ -207,7 +194,6 @@ impl Capabilities {
eth_68: value.iter().any(Capability::is_eth_v68),
eth_69: value.iter().any(Capability::is_eth_v69),
eth_70: value.iter().any(Capability::is_eth_v70),
eth_71: value.iter().any(Capability::is_eth_v71),
inner: value,
}
}
@@ -226,7 +212,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub const fn supports_eth(&self) -> bool {
self.eth_71 || self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
@@ -258,12 +244,6 @@ impl Capabilities {
pub const fn supports_eth_v70(&self) -> bool {
self.eth_70
}
/// Whether this peer supports eth v71 protocol.
#[inline]
pub const fn supports_eth_v71(&self) -> bool {
self.eth_71
}
}
impl From<Vec<Capability>> for Capabilities {
@@ -288,7 +268,6 @@ impl Decodable for Capabilities {
eth_68: inner.iter().any(Capability::is_eth_v68),
eth_69: inner.iter().any(Capability::is_eth_v69),
eth_70: inner.iter().any(Capability::is_eth_v70),
eth_71: inner.iter().any(Capability::is_eth_v71),
inner,
})
}

View File

@@ -38,9 +38,6 @@ pub use state::*;
pub mod receipts;
pub use receipts::*;
pub mod block_access_lists;
pub use block_access_lists::*;
pub mod disconnect_reason;
pub use disconnect_reason::*;

View File

@@ -1,4 +1,4 @@
//! Implements Ethereum wire protocol for versions 66 through 71.
//! Implements Ethereum wire protocol for versions 66 through 70.
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
//! Handles compatibility with [`EthVersion`].
//!
@@ -7,10 +7,10 @@
//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
use super::{
broadcast::NewBlockHashes, BlockAccessLists, BlockBodies, BlockHeaders, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData,
PooledTransactions, Receipts, Status, StatusEth69, Transactions,
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
Transactions,
};
use crate::{
status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
@@ -168,32 +168,6 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
}
EthMessageID::GetBlockAccessLists => {
if version < EthVersion::Eth71 {
// Beyond the max ID for this version — treat as raw capability message
// (e.g. a snap protocol message in the multiplexed ID space).
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
EthMessage::Other(RawCapabilityMessage::new(
message_type.to_u8() as usize,
raw_payload.into(),
))
} else {
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
}
}
EthMessageID::BlockAccessLists => {
if version < EthVersion::Eth71 {
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
EthMessage::Other(RawCapabilityMessage::new(
message_type.to_u8() as usize,
raw_payload.into(),
))
} else {
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
}
}
EthMessageID::Other(_) => {
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
@@ -276,8 +250,6 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
///
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
/// requests/responses.
///
/// The `eth/71` draft extends eth/70 with block access list request/response messages.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
@@ -338,8 +310,6 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
/// a [`RequestPair`], but with a custom inline encoding.
GetReceipts70(RequestPair<GetReceipts70>),
/// Represents a `GetBlockAccessLists` request-response pair for eth/71.
GetBlockAccessLists(RequestPair<GetBlockAccessLists>),
/// Represents a Receipts request-response pair.
#[cfg_attr(
feature = "serde",
@@ -362,8 +332,6 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// request id. The type still wraps a [`RequestPair`], but with a custom
/// inline encoding.
Receipts70(RequestPair<Receipts70<N::Receipt>>),
/// Represents a `BlockAccessLists` request-response pair for eth/71.
BlockAccessLists(RequestPair<BlockAccessLists>),
/// Represents a `BlockRangeUpdate` message broadcast to the network.
#[cfg_attr(
feature = "serde",
@@ -396,8 +364,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
Self::GetBlockAccessLists(_) => EthMessageID::GetBlockAccessLists,
Self::BlockAccessLists(_) => EthMessageID::BlockAccessLists,
Self::Other(msg) => EthMessageID::Other(msg.id as u8),
}
}
@@ -410,7 +376,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetBlockHeaders(_) |
Self::GetReceipts(_) |
Self::GetReceipts70(_) |
Self::GetBlockAccessLists(_) |
Self::GetPooledTransactions(_) |
Self::GetNodeData(_)
)
@@ -424,7 +389,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::Receipts(_) |
Self::Receipts69(_) |
Self::Receipts70(_) |
Self::BlockAccessLists(_) |
Self::BlockHeaders(_) |
Self::BlockBodies(_) |
Self::NodeData(_)
@@ -479,11 +443,9 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.encode(out),
Self::GetReceipts(request) => request.encode(out),
Self::GetReceipts70(request) => request.encode(out),
Self::GetBlockAccessLists(request) => request.encode(out),
Self::Receipts(receipts) => receipts.encode(out),
Self::Receipts69(receipt69) => receipt69.encode(out),
Self::Receipts70(receipt70) => receipt70.encode(out),
Self::BlockAccessLists(block_access_lists) => block_access_lists.encode(out),
Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
Self::Other(unknown) => out.put_slice(&unknown.payload),
}
@@ -506,11 +468,9 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.length(),
Self::GetReceipts(request) => request.length(),
Self::GetReceipts70(request) => request.length(),
Self::GetBlockAccessLists(request) => request.length(),
Self::Receipts(receipts) => receipts.length(),
Self::Receipts69(receipt69) => receipt69.length(),
Self::Receipts70(receipt70) => receipt70.length(),
Self::BlockAccessLists(block_access_lists) => block_access_lists.length(),
Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
Self::Other(unknown) => unknown.length(),
}
@@ -599,14 +559,6 @@ pub enum EthMessageID {
///
/// Introduced in Eth69
BlockRangeUpdate = 0x11,
/// Requests block access lists.
///
/// Introduced in Eth71
GetBlockAccessLists = 0x12,
/// Represents block access lists.
///
/// Introduced in Eth71
BlockAccessLists = 0x13,
/// Represents unknown message types.
Other(u8),
}
@@ -631,17 +583,13 @@ impl EthMessageID {
Self::GetReceipts => 0x0f,
Self::Receipts => 0x10,
Self::BlockRangeUpdate => 0x11,
Self::GetBlockAccessLists => 0x12,
Self::BlockAccessLists => 0x13,
Self::Other(value) => *value, // Return the stored `u8`
}
}
/// Returns the max value for the given version.
pub const fn max(version: EthVersion) -> u8 {
if version.is_eth71() {
Self::BlockAccessLists.to_u8()
} else if version.is_eth69_or_newer() {
if version as u8 >= EthVersion::Eth69 as u8 {
Self::BlockRangeUpdate.to_u8()
} else {
Self::Receipts.to_u8()
@@ -686,8 +634,6 @@ impl Decodable for EthMessageID {
0x0f => Self::GetReceipts,
0x10 => Self::Receipts,
0x11 => Self::BlockRangeUpdate,
0x12 => Self::GetBlockAccessLists,
0x13 => Self::BlockAccessLists,
unknown => Self::Other(*unknown),
};
buf.advance(1);
@@ -716,8 +662,6 @@ impl TryFrom<usize> for EthMessageID {
0x0f => Ok(Self::GetReceipts),
0x10 => Ok(Self::Receipts),
0x11 => Ok(Self::BlockRangeUpdate),
0x12 => Ok(Self::GetBlockAccessLists),
0x13 => Ok(Self::BlockAccessLists),
_ => Err("Invalid message ID"),
}
}
@@ -798,9 +742,8 @@ where
mod tests {
use super::MessageError;
use crate::{
message::RequestPair, BlockAccessLists, EthMessage, EthMessageID, EthNetworkPrimitives,
EthVersion, GetBlockAccessLists, GetNodeData, NodeData, ProtocolMessage,
RawCapabilityMessage,
message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
};
use alloy_primitives::hex;
use alloy_rlp::{Decodable, Encodable, Error};
@@ -841,57 +784,6 @@ mod tests {
assert!(matches!(msg, Err(MessageError::Invalid(..))));
}
#[test]
fn test_bal_message_version_gating() {
// On versions < Eth71, GetBlockAccessLists and BlockAccessLists IDs are treated as
// raw capability messages (Other) since they fall beyond the eth range and may
// belong to another sub-protocol (e.g. snap).
let get_block_access_lists =
EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1337,
message: GetBlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::GetBlockAccessLists,
message: get_block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
let block_access_lists =
EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
request_id: 1337,
message: BlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::BlockAccessLists,
message: block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
}
#[test]
fn test_bal_message_eth71_roundtrip() {
let msg = ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(
RequestPair { request_id: 42, message: GetBlockAccessLists(vec![]) },
));
let encoded = encode(msg.clone());
let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth71,
&mut &encoded[..],
)
.unwrap();
assert_eq!(decoded, msg);
}
#[test]
fn request_pair_encode() {
let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

View File

@@ -29,8 +29,6 @@ pub enum EthVersion {
Eth69 = 69,
/// The `eth` protocol version 70.
Eth70 = 70,
/// The `eth` protocol version 71.
Eth71 = 71,
}
impl EthVersion {
@@ -64,19 +62,9 @@ impl EthVersion {
pub const fn is_eth70(&self) -> bool {
matches!(self, Self::Eth70)
}
/// Returns true if the version is eth/71
pub const fn is_eth71(&self) -> bool {
matches!(self, Self::Eth71)
}
/// Returns true if the version is eth/69 or newer.
pub const fn is_eth69_or_newer(&self) -> bool {
matches!(self, Self::Eth69 | Self::Eth70 | Self::Eth71)
}
}
/// RLP encodes `EthVersion` as a single byte (66-71).
/// RLP encodes `EthVersion` as a single byte (66-69).
impl Encodable for EthVersion {
fn encode(&self, out: &mut dyn BufMut) {
(*self as u8).encode(out)
@@ -88,7 +76,7 @@ impl Encodable for EthVersion {
}
/// RLP decodes a single byte into `EthVersion`.
/// Returns error if byte is not a valid version (66-71).
/// Returns error if byte is not a valid version (66-69).
impl Decodable for EthVersion {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let version = u8::decode(buf)?;
@@ -116,7 +104,6 @@ impl TryFrom<&str> for EthVersion {
"68" => Ok(Self::Eth68),
"69" => Ok(Self::Eth69),
"70" => Ok(Self::Eth70),
"71" => Ok(Self::Eth71),
_ => Err(ParseVersionError(s.to_string())),
}
}
@@ -142,7 +129,6 @@ impl TryFrom<u8> for EthVersion {
68 => Ok(Self::Eth68),
69 => Ok(Self::Eth69),
70 => Ok(Self::Eth70),
71 => Ok(Self::Eth71),
_ => Err(ParseVersionError(u.to_string())),
}
}
@@ -173,7 +159,6 @@ impl From<EthVersion> for &'static str {
EthVersion::Eth68 => "68",
EthVersion::Eth69 => "69",
EthVersion::Eth70 => "70",
EthVersion::Eth71 => "71",
}
}
}
@@ -231,7 +216,6 @@ mod tests {
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap());
assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap());
assert_eq!(EthVersion::Eth71, EthVersion::try_from("71").unwrap());
}
#[test]
@@ -241,7 +225,6 @@ mod tests {
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
assert_eq!(EthVersion::Eth69, "69".parse().unwrap());
assert_eq!(EthVersion::Eth70, "70".parse().unwrap());
assert_eq!(EthVersion::Eth71, "71".parse().unwrap());
}
#[test]
@@ -252,7 +235,6 @@ mod tests {
EthVersion::Eth68,
EthVersion::Eth69,
EthVersion::Eth70,
EthVersion::Eth71,
];
for version in versions {
@@ -271,7 +253,6 @@ mod tests {
(68_u8, Ok(EthVersion::Eth68)),
(69_u8, Ok(EthVersion::Eth69)),
(70_u8, Ok(EthVersion::Eth70)),
(71_u8, Ok(EthVersion::Eth71)),
(65_u8, Err(RlpError::Custom("invalid eth version"))),
];

View File

@@ -294,8 +294,7 @@ mod tests {
use alloy_primitives::B256;
use alloy_rlp::Encodable;
use reth_eth_wire_types::{
message::RequestPair, GetAccountRangeMessage, GetBlockAccessLists, GetBlockHeaders,
HeadersDirection,
message::RequestPair, GetAccountRangeMessage, GetBlockHeaders, HeadersDirection,
};
// Helper to create eth message and its bytes
@@ -420,40 +419,4 @@ mod tests {
let snap_boundary_result = inner.decode_message(snap_boundary_bytes);
assert!(snap_boundary_result.is_err());
}
#[test]
fn test_eth70_message_id_0x12_is_snap() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth70);
let snap_msg = SnapProtocolMessage::GetAccountRange(GetAccountRangeMessage {
request_id: 1,
root_hash: B256::default(),
starting_hash: B256::default(),
limit_hash: B256::default(),
response_bytes: 1000,
});
let encoded = inner.encode_snap_message(snap_msg);
assert_eq!(encoded[0], EthMessageID::message_count(EthVersion::Eth70));
let decoded = inner.decode_message(BytesMut::from(&encoded[..])).unwrap();
assert!(matches!(decoded, EthSnapMessage::Snap(_)));
}
#[test]
fn test_eth71_message_id_0x12_is_eth() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth71);
let eth_msg = EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1,
message: GetBlockAccessLists(vec![B256::ZERO]),
});
let protocol_msg = ProtocolMessage::from(eth_msg.clone());
let mut buf = Vec::new();
protocol_msg.encode(&mut buf);
let decoded = inner.decode_message(BytesMut::from(&buf[..])).unwrap();
let EthSnapMessage::Eth(decoded_eth) = decoded else {
panic!("expected eth message");
};
assert_eq!(decoded_eth, eth_msg);
}
}

View File

@@ -205,10 +205,7 @@ impl HelloMessageBuilder {
protocol_version: protocol_version.unwrap_or_default(),
client_version: client_version.unwrap_or_else(|| RETH_CLIENT_VERSION.to_string()),
protocols: protocols.unwrap_or_else(|| {
let mut protos: Vec<Protocol> =
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect();
protos.push(Protocol::snap_1());
protos
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect()
}),
port: port.unwrap_or(DEFAULT_TCP_PORT),
id,

View File

@@ -610,20 +610,12 @@ where
let _ = this.primary.to_primary.send(msg);
} else {
// delegate to installed satellite if any
let mut handled = false;
for proto in &this.inner.protocols {
if proto.shared_cap == *cap {
proto.send_raw(msg.clone());
handled = true;
proto.send_raw(msg);
break
}
}
if !handled {
// No satellite handler for this capability (e.g. snap/1
// handled inline). Route to primary so the session can
// handle it via RawCapabilityMessage.
let _ = this.primary.to_primary.send(msg);
}
}
} else {
return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(

View File

@@ -45,13 +45,6 @@ impl Protocol {
Self::eth(EthVersion::Eth68)
}
/// Returns the `snap/1` capability.
///
/// The snap protocol defines 8 message types (0x00..0x07).
pub const fn snap_1() -> Self {
Self::new(Capability::new_static("snap", 1), 8)
}
/// Consumes the type and returns a tuple of the [Capability] and number of messages.
#[inline]
pub(crate) fn split(self) -> (Capability, u8) {
@@ -91,7 +84,5 @@ mod tests {
assert_eq!(Protocol::eth(EthVersion::Eth67).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth68).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth69).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth70).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth71).messages(), 20);
}
}

View File

@@ -1,20 +1,13 @@
//! API related to listening for network events.
use reth_eth_wire_types::{
message::RequestPair,
snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
},
BlockAccessLists, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::{
error::{RequestError, RequestResult},
snap::client::SnapResponse,
};
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind};
use reth_tokio_util::EventStream;
@@ -259,51 +252,6 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Requests block access lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The request for block access lists.
request: GetBlockAccessLists,
/// The channel to send the response for block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
/// Requests an account range from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetAccountRange {
/// The request for an account range.
request: GetAccountRangeMessage,
/// The channel to send the response for the account range.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests storage ranges from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetStorageRanges {
/// The request for storage ranges.
request: GetStorageRangesMessage,
/// The channel to send the response for storage ranges.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests bytecodes from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetByteCodes {
/// The request for bytecodes.
request: GetByteCodesMessage,
/// The channel to send the response for bytecodes.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests trie nodes from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetTrieNodes {
/// The request for trie nodes.
request: GetTrieNodesMessage,
/// The channel to send the response for trie nodes.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
}
// === impl PeerRequest ===
@@ -324,41 +272,10 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
Self::GetAccountRange { response, .. } |
Self::GetStorageRanges { response, .. } |
Self::GetByteCodes { response, .. } |
Self::GetTrieNodes { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns true if this request is supported for the negotiated eth protocol version.
#[inline]
pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
match self {
Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
_ => true,
}
}
/// Returns `true` if this is a snap protocol request.
pub const fn is_snap_request(&self) -> bool {
matches!(
self,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. }
)
}
/// Returns the [`EthMessage`] for this type.
///
/// # Panics
///
/// Panics if called on a snap protocol request variant. Use [`Self::is_snap_request`] to
/// check before calling this method. Snap requests are handled separately in the session
/// layer.
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
match self {
Self::GetBlockHeaders { request, .. } => {
@@ -382,18 +299,6 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts70 { request, .. } => {
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
}
Self::GetBlockAccessLists { request, .. } => {
EthMessage::GetBlockAccessLists(RequestPair {
request_id,
message: request.clone(),
})
}
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => {
panic!("snap protocol requests cannot be converted to EthMessage, handle them separately via is_snap_request()")
}
}
}
@@ -444,18 +349,3 @@ impl<R> fmt::Debug for PeerRequestSender<R> {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_block_access_lists_version_support() {
let (tx, _rx) = oneshot::channel();
let req: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!req.is_supported_by_eth_version(EthVersion::Eth70));
assert!(req.is_supported_by_eth_version(EthVersion::Eth71));
}
}

View File

@@ -38,7 +38,7 @@ use reth_eth_wire_types::{
capability::Capabilities, Capability, DisconnectReason, EthVersion, NetworkPrimitives,
UnifiedStatus,
};
use reth_network_p2p::{snap::client::SnapClient, sync::NetworkSyncUpdater};
use reth_network_p2p::sync::NetworkSyncUpdater;
use reth_network_peers::NodeRecord;
use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant};
@@ -48,7 +48,7 @@ pub type PeerId = alloy_primitives::B512;
/// Helper trait that unifies network API needed to launch node.
pub trait FullNetwork:
BlockDownloaderProvider<
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block> + SnapClient,
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
> + NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider
@@ -62,8 +62,7 @@ pub trait FullNetwork:
impl<T> FullNetwork for T where
T: BlockDownloaderProvider<
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>
+ SnapClient,
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
> + NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider

View File

@@ -20,7 +20,7 @@ use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_storage_api::{noop::NoopProvider, BlockNumReader, BlockReader, HeaderProvider};
use reth_tasks::Runtime;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::SECP256K1;
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
@@ -76,7 +76,7 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// The default mode of the network.
pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
pub executor: Runtime,
pub executor: Box<dyn TaskSpawner>,
/// The `Status` message to send to peers at the beginning.
pub status: UnifiedStatus,
/// Sets the hello message for the p2p handshake in `RLPx`
@@ -206,7 +206,7 @@ pub struct NetworkConfigBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The default mode of the network.
network_mode: NetworkMode,
/// The executor to use for spawning tasks.
executor: Option<Runtime>,
executor: Option<Box<dyn TaskSpawner>>,
/// Sets the hello message for the p2p handshake in `RLPx`
hello_message: Option<HelloMessageWithProtocols>,
/// The executor to use for spawning tasks.
@@ -342,7 +342,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
/// Sets the executor to use for spawning tasks.
///
/// If `None`, then [`tokio::spawn`] is used for spawning tasks.
pub fn with_task_executor(mut self, executor: Runtime) -> Self {
pub fn with_task_executor(mut self, executor: Box<dyn TaskSpawner>) -> Self {
self.executor = Some(executor);
self
}
@@ -691,11 +691,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
chain_id,
block_import: block_import.unwrap_or_else(|| Box::<ProofOfStakeBlockImport>::default()),
network_mode,
executor: executor.unwrap_or_else(|| match tokio::runtime::Handle::try_current() {
Ok(handle) => Runtime::with_existing_handle(handle)
.expect("failed to create runtime with existing handle"),
Err(_) => Runtime::test(),
}),
executor: executor.unwrap_or_else(|| Box::<TokioTaskExecutor>::default()),
status,
hello_message,
extra_protocols,

View File

@@ -6,13 +6,12 @@ use crate::{
};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::Bytes;
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection,
NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@@ -282,19 +281,6 @@ where
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
/// Handles [`GetBlockAccessLists`] queries.
///
/// For now this returns one empty BAL per requested hash.
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}
#[inline]
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
where
@@ -366,9 +352,6 @@ where
IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
this.on_receipts70_request(peer_id, request, response)
}
IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
this.on_block_access_lists_request(peer_id, request, response)
}
}
},
);
@@ -454,15 +437,4 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel sender for the response containing Receipts70.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Request Block Access Lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The ID of the peer to request block access lists from.
peer_id: PeerId,
/// The requested block hashes.
request: GetBlockAccessLists,
/// The channel sender for the response containing block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
}

View File

@@ -4,9 +4,6 @@ use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
use alloy_primitives::B256;
use futures::{future, future::Either};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
@@ -14,7 +11,6 @@ use reth_network_p2p::{
error::{PeerRequestResult, RequestError},
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
snap::client::{SnapClient, SnapResponse},
BlockClient,
};
use reth_network_peers::PeerId;
@@ -109,92 +105,3 @@ impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
type Block = N::Block;
}
type SnapClientFuture = Either<
FlattenedResponse<PeerRequestResult<SnapResponse>>,
future::Ready<PeerRequestResult<SnapResponse>>,
>;
impl<N: NetworkPrimitives> SnapClient for FetchClient<N> {
type Output = SnapClientFuture;
fn get_account_range_with_priority(
&self,
request: GetAccountRangeMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetAccountRange { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
self.get_storage_ranges_with_priority(request, Priority::Normal)
}
fn get_storage_ranges_with_priority(
&self,
request: GetStorageRangesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetStorageRanges { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
self.get_byte_codes_with_priority(request, Priority::Normal)
}
fn get_byte_codes_with_priority(
&self,
request: GetByteCodesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetByteCodes { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
self.get_trie_nodes_with_priority(request, Priority::Normal)
}
fn get_trie_nodes_with_priority(
&self,
request: GetTrieNodesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetTrieNodes { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
}

View File

@@ -10,22 +10,17 @@ use futures::StreamExt;
use reth_eth_wire::{
Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
};
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
};
use reth_network_api::{test_utils::PeersHandle, PeerRequest};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest,
priority::Priority,
snap::client::SnapResponse,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::{
collections::{HashMap, VecDeque},
ops::RangeInclusive,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
@@ -38,17 +33,6 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
/// Tracks an inflight snap request, bridging the session's response back to the download caller.
#[derive(Debug)]
struct InflightSnapRequest {
/// The peer that's handling this request
peer_id: PeerId,
/// The channel to send the final response (with peer id) to the download caller
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
/// The receiver for the session's response
rx: oneshot::Receiver<RequestResult<SnapResponse>>,
}
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
@@ -61,8 +45,6 @@ pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
/// Currently active [`GetBlockBodies`] requests
inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
/// Currently active snap protocol requests
inflight_snap_requests: Vec<InflightSnapRequest>,
/// The list of _available_ peers for requests.
peers: HashMap<PeerId, Peer>,
/// The handle to the peers manager
@@ -85,7 +67,6 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
inflight_snap_requests: Vec::new(),
peers: Default::default(),
peers_handle,
num_active_peers,
@@ -133,16 +114,6 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
if let Some(req) = self.inflight_bodies_requests.remove(peer) {
let _ = req.response.send(Err(RequestError::ConnectionDropped));
}
// Cancel inflight snap requests for this peer
let mut i = 0;
while i < self.inflight_snap_requests.len() {
if &self.inflight_snap_requests[i].peer_id == peer {
let req = self.inflight_snap_requests.swap_remove(i);
let _ = req.response.send(Err(RequestError::ConnectionDropped));
} else {
i += 1;
}
}
}
/// Updates the block information for the peer.
@@ -200,7 +171,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
/// Returns the next action to return
fn poll_action(&mut self) -> PollAction<N> {
fn poll_action(&mut self) -> PollAction {
// we only check and not pop here since we don't know yet whether a peer is available.
if self.queued_requests.is_empty() {
return PollAction::NoRequests
@@ -213,39 +184,13 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
return PollAction::NoPeersAvailable
};
// Snap requests bypass the block request path and are dispatched directly
if request.is_snap_request() {
let snap_request = self.prepare_snap_request(peer_id, request);
return PollAction::Ready(FetchAction::SnapRequest { peer_id, request: snap_request })
}
let request = self.prepare_block_request(peer_id, request);
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
}
/// Advance the state the syncer
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction<N>> {
// Poll inflight snap requests and forward responses
let mut i = 0;
while i < self.inflight_snap_requests.len() {
match Pin::new(&mut self.inflight_snap_requests[i].rx).poll(cx) {
Poll::Ready(result) => {
let req = self.inflight_snap_requests.swap_remove(i);
let resp = match result {
Ok(Ok(snap_resp)) => Ok((req.peer_id, snap_resp).into()),
Ok(Err(err)) => Err(err),
Err(_) => Err(RequestError::ChannelClosed),
};
let _ = req.response.send(resp);
self.on_snap_response(req.peer_id);
}
Poll::Pending => {
i += 1;
}
}
}
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
// drain buffered actions first
loop {
let no_peers_available = match self.poll_action() {
@@ -311,58 +256,6 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
self.inflight_bodies_requests.insert(peer_id, inflight);
BlockRequest::GetBlockBodies(GetBlockBodies(request))
}
DownloadRequest::GetAccountRange { .. } |
DownloadRequest::GetStorageRanges { .. } |
DownloadRequest::GetByteCodes { .. } |
DownloadRequest::GetTrieNodes { .. } => {
unreachable!("snap requests are handled via prepare_snap_request")
}
}
}
/// Handles a new snap request to a peer.
///
/// Converts the download request into a [`PeerRequest`] for dispatch to the peer session.
/// The `DownloadRequest`'s response channel is stored as an inflight snap request so the
/// response can be forwarded back (with the peer id) once the session replies.
///
/// Caution: this assumes the peer exists and is idle
fn prepare_snap_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> PeerRequest<N> {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state = req.peer_state();
}
match req {
DownloadRequest::GetAccountRange { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetAccountRange { request, response: tx }
}
DownloadRequest::GetStorageRanges { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetStorageRanges { request, response: tx }
}
DownloadRequest::GetByteCodes { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetByteCodes { request, response: tx }
}
DownloadRequest::GetTrieNodes { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetTrieNodes { request, response: tx }
}
_ => unreachable!("only snap requests should be passed to prepare_snap_request"),
}
}
/// Called when a snap response is received.
///
/// Marks the peer as idle so it can accept new requests.
fn on_snap_response(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state.on_request_finished();
}
}
@@ -448,8 +341,8 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction<N: NetworkPrimitives = EthNetworkPrimitives> {
Ready(FetchAction<N>),
enum PollAction {
Ready(FetchAction),
NoRequests,
NoPeersAvailable,
}
@@ -560,8 +453,6 @@ enum PeerState {
GetBlockHeaders,
/// Peer is handling a `GetBlockBodies` request.
GetBlockBodies,
/// Peer is handling a snap protocol request.
SnapRequest,
/// Peer session is about to close
Closing,
}
@@ -600,7 +491,6 @@ struct Request<Req, Resp> {
/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
/// Download the requested headers and send response through channel
GetBlockHeaders {
@@ -615,30 +505,6 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
},
/// Request an account range via snap protocol
GetAccountRange {
request: GetAccountRangeMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request storage ranges via snap protocol
GetStorageRanges {
request: GetStorageRangesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request bytecodes via snap protocol
GetByteCodes {
request: GetByteCodesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request trie nodes via snap protocol
GetTrieNodes {
request: GetTrieNodesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
}
// === impl DownloadRequest ===
@@ -649,22 +515,15 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
match self {
Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => PeerState::SnapRequest,
}
}
/// Returns the requested priority of this request
const fn get_priority(&self) -> &Priority {
match self {
Self::GetBlockHeaders { priority, .. } |
Self::GetBlockBodies { priority, .. } |
Self::GetAccountRange { priority, .. } |
Self::GetStorageRanges { priority, .. } |
Self::GetByteCodes { priority, .. } |
Self::GetTrieNodes { priority, .. } => priority,
Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
priority
}
}
}
@@ -673,25 +532,10 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
self.get_priority().is_normal()
}
/// Returns `true` if this is a snap protocol request.
const fn is_snap_request(&self) -> bool {
matches!(
self,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. }
)
}
/// Returns the best peer requirements for this request.
fn best_peer_requirements(&self) -> BestPeerRequirements {
match self {
Self::GetBlockHeaders { .. } |
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => BestPeerRequirements::None,
Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
Self::GetBlockBodies { range_hint, .. } => {
if let Some(range) = range_hint {
BestPeerRequirements::FullBlockRange(range.clone())
@@ -704,7 +548,7 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
}
/// An action the syncer can emit.
pub(crate) enum FetchAction<N: NetworkPrimitives = EthNetworkPrimitives> {
pub(crate) enum FetchAction {
/// Dispatch an eth request to the given peer.
BlockRequest {
/// The targeted recipient for the request
@@ -712,13 +556,6 @@ pub(crate) enum FetchAction<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The request to send
request: BlockRequest,
},
/// Dispatch a snap protocol request to the given peer.
SnapRequest {
/// The targeted recipient for the request
peer_id: PeerId,
/// The snap request to send
request: PeerRequest<N>,
},
}
/// Outcome of a processed response.

View File

@@ -551,13 +551,6 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
})
}
PeerRequest::GetBlockAccessLists { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
peer_id,
request,
response,
})
}
PeerRequest::GetPooledTransactions { request, response } => {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,
@@ -565,13 +558,6 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
});
}
PeerRequest::GetAccountRange { .. } |
PeerRequest::GetStorageRanges { .. } |
PeerRequest::GetByteCodes { .. } |
PeerRequest::GetTrieNodes { .. } => {
// Snap protocol requests from peers are not handled here.
// They are handled in the session layer directly.
}
}
}

View File

@@ -3,7 +3,7 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use crate::types::{BlockAccessLists, Receipts69, Receipts70};
use crate::types::{Receipts69, Receipts70};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
@@ -121,11 +121,6 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
},
/// Represents a response to a request for block access lists.
BlockAccessLists {
/// The receiver channel for the response to a block access lists request.
response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
},
}
// === impl PeerResponse ===
@@ -165,10 +160,6 @@ impl<N: NetworkPrimitives> PeerResponse<N> {
Ok(res) => PeerResponseResult::Receipts70(res),
Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
},
Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
Ok(res) => PeerResponseResult::BlockAccessLists(res),
Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
},
};
Poll::Ready(res)
}
@@ -191,8 +182,6 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
/// Represents a result containing receipts or an error for eth/70.
Receipts70(RequestResult<Receipts70<N::Receipt>>),
/// Represents a result containing block access lists or an error.
BlockAccessLists(RequestResult<BlockAccessLists>),
}
// === impl PeerResponseResult ===
@@ -237,13 +226,6 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
}
Err(err) => Err(err),
},
Self::BlockAccessLists(resp) => match resp {
Ok(res) => {
let request = RequestPair { request_id: id, message: res };
Ok(EthMessage::BlockAccessLists(request))
}
Err(err) => Err(err),
},
}
}
@@ -257,7 +239,6 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::Receipts(res) => res.as_ref().err(),
Self::Receipts69(res) => res.as_ref().err(),
Self::Receipts70(res) => res.as_ref().err(),
Self::BlockAccessLists(res) => res.as_ref().err(),
}
}

View File

@@ -282,12 +282,6 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
EthMessage::Receipts70(resp) => {
on_response!(resp, GetReceipts70)
}
EthMessage::GetBlockAccessLists(req) => {
on_request!(req, BlockAccessLists, GetBlockAccessLists)
}
EthMessage::BlockAccessLists(resp) => {
on_response!(resp, GetBlockAccessLists)
}
EthMessage::BlockRangeUpdate(msg) => {
// Validate that earliest <= latest according to the spec
if msg.earliest > msg.latest {
@@ -316,186 +310,23 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
OnIncomingMessageOutcome::Ok
}
EthMessage::Other(raw) => {
if let Some(outcome) = self.try_handle_snap_response(&raw) {
outcome
} else {
self.try_emit_broadcast(PeerMessage::Other(raw)).into()
}
}
EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
}
}
/// Attempts to decode a raw capability message as a snap protocol response and resolve the
/// matching inflight request.
///
/// Returns `Some` if the message ID falls in the snap range (even if decoding or matching
/// fails), `None` if the message is not a snap message.
fn try_handle_snap_response(
&mut self,
raw: &RawCapabilityMessage,
) -> Option<OnIncomingMessageOutcome<N>> {
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
use reth_network_p2p::snap::client::SnapResponse;
let eth_offset = EthMessageID::message_count(self.conn.version()) as usize;
let snap_count = 8; // snap/1 has 8 message types (0x00..0x07)
// Check if the raw message ID falls in the snap range
if raw.id < eth_offset || raw.id >= eth_offset + snap_count {
return None;
}
let snap_id = (raw.id - eth_offset) as u8;
// Only handle response messages (odd IDs: AccountRange=1, StorageRanges=3, ByteCodes=5,
// TrieNodes=7)
if snap_id.is_multiple_of(2) {
// This is a snap *request* from the remote peer, not a response.
// For now, we don't handle incoming snap requests — let it pass through.
return None;
}
let mut buf = raw.payload.as_ref();
let snap_msg = match SnapProtocolMessage::decode(snap_id, &mut buf) {
Ok(msg) => msg,
Err(err) => {
debug!(target: "net::session", %err, ?snap_id, remote_peer_id=?self.remote_peer_id, "failed to decode snap response");
self.on_bad_message();
return Some(OnIncomingMessageOutcome::Ok);
}
};
// Extract request_id and build the SnapResponse
let (request_id, expected_variant, snap_response) = match snap_msg {
SnapProtocolMessage::AccountRange(msg) => {
(msg.request_id, "GetAccountRange", SnapResponse::AccountRange(msg))
}
SnapProtocolMessage::StorageRanges(msg) => {
(msg.request_id, "GetStorageRanges", SnapResponse::StorageRanges(msg))
}
SnapProtocolMessage::ByteCodes(msg) => {
(msg.request_id, "GetByteCodes", SnapResponse::ByteCodes(msg))
}
SnapProtocolMessage::TrieNodes(msg) => {
(msg.request_id, "GetTrieNodes", SnapResponse::TrieNodes(msg))
}
_ => {
// Not a response message — shouldn't happen given the odd-ID check above
return None;
}
};
if let Some(req) = self.inflight_requests.remove(&request_id) {
match req.request {
RequestState::Waiting(
PeerRequest::GetAccountRange { response, .. } |
PeerRequest::GetStorageRanges { response, .. } |
PeerRequest::GetByteCodes { response, .. } |
PeerRequest::GetTrieNodes { response, .. },
) => {
trace!(peer_id=?self.remote_peer_id, ?request_id, %expected_variant, "received snap response from peer");
let _ = response.send(Ok(snap_response));
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(request) => {
request.send_bad_response();
}
RequestState::TimedOut => {
self.update_request_timeout(req.timestamp, Instant::now());
}
}
} else {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received snap response to unknown request");
self.on_bad_message();
}
Some(OnIncomingMessageOutcome::Ok)
}
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let version = self.conn.version();
if !Self::is_request_supported_for_version(&request, version) {
debug!(
target: "net",
?request,
peer_id=?self.remote_peer_id,
?version,
"Request not supported for negotiated eth version",
);
request.send_err_response(RequestError::UnsupportedCapability);
return;
}
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let msg = request.create_request_message(request_id).map_versioned(self.conn.version());
if request.is_snap_request() {
// Snap requests are encoded as raw capability messages with adjusted message IDs.
// The snap message ID is offset by the eth message count for multiplexing.
if let Some(raw_msg) = self.encode_snap_request(&request, request_id) {
self.queued_outgoing.push_back(OutgoingMessage::Raw(raw_msg));
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
self.inflight_requests.insert(request_id, req);
} else {
request.send_err_response(RequestError::UnsupportedCapability);
}
} else {
let msg = request.create_request_message(request_id).map_versioned(version);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
self.inflight_requests.insert(request_id, req);
}
}
/// Encodes a snap protocol request as a [`RawCapabilityMessage`].
fn encode_snap_request(
&self,
request: &PeerRequest<N>,
_request_id: u64,
) -> Option<RawCapabilityMessage> {
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
let snap_msg = match request {
PeerRequest::GetAccountRange { request, .. } => {
SnapProtocolMessage::GetAccountRange(request.clone())
}
PeerRequest::GetStorageRanges { request, .. } => {
SnapProtocolMessage::GetStorageRanges(request.clone())
}
PeerRequest::GetByteCodes { request, .. } => {
SnapProtocolMessage::GetByteCodes(request.clone())
}
PeerRequest::GetTrieNodes { request, .. } => {
SnapProtocolMessage::GetTrieNodes(request.clone())
}
_ => return None,
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
let encoded = snap_msg.encode();
// The first byte is the snap message ID, which needs to be offset
// by the eth protocol message count for proper multiplexing.
let snap_id = encoded[0];
let adjusted_id = snap_id + EthMessageID::message_count(self.conn.version());
let mut payload = Vec::with_capacity(encoded.len() - 1);
payload.extend_from_slice(&encoded[1..]);
Some(RawCapabilityMessage::new(adjusted_id as usize, payload.into()))
}
#[inline]
fn is_request_supported_for_version(request: &PeerRequest<N>, version: EthVersion) -> bool {
request.is_supported_by_eth_version(version)
self.inflight_requests.insert(request_id, req);
}
/// Handle a message received from the internal network
@@ -1107,9 +938,9 @@ mod tests {
use reth_chainspec::MAINNET;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockAccessLists,
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockBodies,
HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
UnifiedStatus,
};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
@@ -1409,22 +1240,6 @@ mod tests {
}
}
#[test]
fn test_reject_bal_request_for_eth70() {
let (tx, _rx) = oneshot::channel();
let request: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth70
));
assert!(ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth71
));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_keep_alive() {
let mut builder = SessionBuilder::default();

View File

@@ -28,7 +28,7 @@ use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_api::{PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use reth_network_types::SessionsConfig;
use reth_tasks::Runtime;
use reth_tasks::TaskSpawner;
use rustc_hash::FxHashMap;
use secp256k1::SecretKey;
use std::{
@@ -87,7 +87,7 @@ pub struct SessionManager<N: NetworkPrimitives> {
/// Size of the command buffer per session.
session_command_buffer: usize,
/// The executor for spawned tasks.
executor: Runtime,
executor: Box<dyn TaskSpawner>,
/// All pending session that are currently handshaking, exchanging `Hello`s.
///
/// Events produced during the authentication phase are reported to this manager. Once the
@@ -130,7 +130,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
pub fn new(
secret_key: SecretKey,
config: SessionsConfig,
executor: Runtime,
executor: Box<dyn TaskSpawner>,
status: UnifiedStatus,
hello_message: HelloMessageWithProtocols,
fork_filter: ForkFilter,
@@ -229,7 +229,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
where
F: Future<Output = ()> + Send + 'static,
{
self.executor.spawn_task(f);
self.executor.spawn_task(f.boxed());
}
/// Invoked on a received status update.
@@ -908,7 +908,7 @@ pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
}
/// Starts the authentication process for a connection initiated by a remote peer.
#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id))]
#[expect(clippy::too_many_arguments)]
async fn start_pending_outbound_session<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,

View File

@@ -403,16 +403,6 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}
}
/// Sends a snap request directly to the peer's session.
///
/// Unlike block requests, snap requests don't need response tracking here because
/// the [`StateFetcher`] bridges the response back to the caller internally.
fn handle_snap_request(&mut self, peer: PeerId, request: PeerRequest<N>) {
if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
let _ = peer.request_tx.to_session_tx.try_send(request);
}
}
/// Handle the outcome of processed response, for example directly queue another request.
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
match outcome {
@@ -463,9 +453,6 @@ impl<N: NetworkPrimitives> NetworkState<N> {
FetchAction::BlockRequest { peer_id, request } => {
self.handle_block_request(peer_id, request)
}
FetchAction::SnapRequest { peer_id, request } => {
self.handle_snap_request(peer_id, request)
}
}
}

View File

@@ -10,7 +10,7 @@ use crate::{
policy::NetworkPolicies,
TransactionsHandle, TransactionsManager, TransactionsManagerConfig,
},
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, PeersConfig,
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
@@ -29,7 +29,7 @@ use reth_network_peers::PeerId;
use reth_storage_api::{
noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
};
use reth_tasks::Runtime;
use reth_tasks::TokioTaskExecutor;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore,
@@ -198,7 +198,7 @@ where
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
Runtime::test(),
TokioTaskExecutor::default(),
);
peer.map_transactions_manager(EthTransactionPool::eth_pool(
pool,
@@ -228,7 +228,7 @@ where
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
Runtime::test(),
TokioTaskExecutor::default(),
);
peer.map_transactions_manager_with(
@@ -718,7 +718,6 @@ where
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.disable_dns_discovery()
.disable_discv4_discovery()
.peer_config(PeersConfig::test())
}
}

View File

@@ -1949,7 +1949,7 @@ impl PooledTransactionsHashesBuilder {
fn new(version: EthVersion) -> Self {
match version {
EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
Self::Eth68(Default::default())
}
}

View File

@@ -12,13 +12,14 @@ use reth_network::{
};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
NetworkInfo, PeerKind, Peers, PeersInfo,
NetworkInfo, Peers, PeersInfo,
};
use reth_network_p2p::{
headers::client::{HeadersClient, HeadersRequest},
sync::{NetworkSyncUpdater, SyncState},
};
use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
use reth_network_types::peers::config::PeerBackoffDurations;
use reth_provider::test_utils::MockEthProvider;
use reth_storage_api::noop::NoopProvider;
use reth_tracing::init_test_tracing;
@@ -379,7 +380,10 @@ async fn test_trusted_peer_only() {
let _handle = net.spawn();
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let peers_config = PeersConfig::test().with_trusted_nodes_only(true);
let peers_config = PeersConfig::default()
.with_backoff_durations(PeerBackoffDurations::test())
.with_ban_duration(Duration::from_millis(200))
.with_trusted_nodes_only(true);
let config = NetworkConfigBuilder::eth(secret_key)
.listener_port(0)
@@ -401,8 +405,8 @@ async fn test_trusted_peer_only() {
// connect to an untrusted peer should fail.
handle.add_peer(*handle0.peer_id(), handle0.local_addr());
// wait 500ms, the number of connection is still 0.
tokio::time::sleep(Duration::from_millis(500)).await;
// wait 1 second, the number of connection is still 0.
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(handle.num_connected_peers(), 0);
// add to trusted peer.
@@ -415,22 +419,17 @@ async fn test_trusted_peer_only() {
// only receive connections from trusted peers.
handle1.add_peer(*handle.peer_id(), handle.local_addr());
// wait 500ms, the number of connections is still 1, because peer1 is untrusted.
tokio::time::sleep(Duration::from_millis(500)).await;
// wait 1 second, the number of connections is still 1, because peer1 is untrusted.
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(handle.num_connected_peers(), 1);
// remove handle from handle1's peer list to prevent a competing outgoing connection attempt
// from handle1 racing with handle's outgoing connection below, which can cause duplicate
// session resolution to drop a connection
handle1.remove_peer(*handle.peer_id(), PeerKind::Basic);
handle.add_trusted_peer(*handle1.peer_id(), handle1.local_addr());
// wait for the next session established event to check the handle1 incoming connection
let outgoing_peer_id1 = event_stream.next_session_established().await.unwrap();
assert_eq!(outgoing_peer_id1, *handle1.peer_id());
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 2);
// check that handle0 and handle1 both have peers.
@@ -442,7 +441,8 @@ async fn test_trusted_peer_only() {
async fn test_network_state_change() {
let net = Testnet::create(1).await;
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let peers_config = PeersConfig::test();
let peers_config =
PeersConfig::default().with_refill_slots_interval(Duration::from_millis(500));
let config = NetworkConfigBuilder::eth(secret_key)
.listener_port(0)
@@ -466,16 +466,16 @@ async fn test_network_state_change() {
handle.add_peer(*handle0.peer_id(), handle0.local_addr());
// wait 500ms, the number of connections is still 0, because network is Hibernate.
tokio::time::sleep(Duration::from_millis(500)).await;
// wait 2 seconds, the number of connections is still 0, because network is Hibernate.
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 0);
// Set network state to Active.
handle.set_network_active();
// wait 500ms, the number of connections should be 1, because network is Active and outbound
// wait 2 seconds, the number of connections should be 1, because network is Active and outbound
// slot should be filled.
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 1);
}
@@ -483,7 +483,7 @@ async fn test_network_state_change() {
async fn test_exceed_outgoing_connections() {
let net = Testnet::create(2).await;
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let peers_config = PeersConfig::test().with_max_outbound(1);
let peers_config = PeersConfig::default().with_max_outbound(1);
let config = NetworkConfigBuilder::eth(secret_key)
.listener_port(0)
@@ -514,9 +514,9 @@ async fn test_exceed_outgoing_connections() {
handle.add_peer(*handle1.peer_id(), handle1.local_addr());
// wait 500ms, the number of connections is still 1, indicating that the max outbound is in
// wait 2 seconds, the number of connections is still 1, indicating that the max outbound is in
// effect.
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 1);
}
@@ -524,7 +524,7 @@ async fn test_exceed_outgoing_connections() {
async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
let net = Testnet::create(1).await;
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let peers_config = PeersConfig::test().with_max_inbound(0);
let peers_config = PeersConfig::default().with_max_inbound(0);
let config = NetworkConfigBuilder::eth(secret_key)
.listener_port(0)
@@ -543,7 +543,7 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
tokio::task::spawn(network);
let net_handle = net.spawn();
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(handle.num_connected_peers(), 0);
@@ -623,15 +623,15 @@ async fn test_rejected_by_already_connect() {
// incoming connection from the same peer should be rejected by already connected
// and num_inbount should still be 1
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
// incoming connection from other_peer2 should succeed
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());
// wait 500ms and check that other_peer2 is not rejected by TooManyPeers
tokio::time::sleep(Duration::from_millis(500)).await;
// wait 2 seconds and check that other_peer2 is not rejected by TooManyPeers
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 2);
}
@@ -641,7 +641,7 @@ async fn new_random_peer(
) -> NetworkManager<EthNetworkPrimitives> {
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let peers_config =
PeersConfig::test().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);
let config = NetworkConfigBuilder::new(secret_key)
.listener_port(0)
@@ -775,7 +775,7 @@ async fn test_reconnect_trusted() {
// Await that handle1 (trusted peer) reconnects automatically
let reconnect_result =
tokio::time::timeout(Duration::from_secs(10), listener0.next_session_established()).await;
tokio::time::timeout(Duration::from_secs(60), listener0.next_session_established()).await;
match reconnect_result {
Ok(Some(peer)) => {

View File

@@ -5,20 +5,13 @@ use crate::{
error::PeerRequestResult,
headers::client::{HeadersClient, SingleHeaderRequest},
priority::Priority,
snap::client::{SnapClient, SnapResponse},
BlockClient,
};
use alloy_consensus::BlockHeader;
use alloy_primitives::{Sealable, B256};
use core::marker::PhantomData;
use reth_consensus::Consensus;
use reth_eth_wire_types::{
snap::{
AccountRangeMessage, ByteCodesMessage, GetAccountRangeMessage, GetByteCodesMessage,
GetStorageRangesMessage, GetTrieNodesMessage, StorageRangesMessage, TrieNodesMessage,
},
EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
};
use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
use reth_network_peers::{PeerId, WithPeerId};
use reth_primitives_traits::{SealedBlock, SealedHeader};
use std::{
@@ -747,83 +740,6 @@ where
type Block = Net::Block;
}
impl<Net> SnapClient for NoopFullBlockClient<Net>
where
Net: NetworkPrimitives,
{
type Output = futures::future::Ready<PeerRequestResult<SnapResponse>>;
fn get_account_range_with_priority(
&self,
request: GetAccountRangeMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::AccountRange(AccountRangeMessage {
request_id: request.request_id,
accounts: vec![],
proof: vec![],
}),
)))
}
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
self.get_storage_ranges_with_priority(request, Priority::Normal)
}
fn get_storage_ranges_with_priority(
&self,
request: GetStorageRangesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::StorageRanges(StorageRangesMessage {
request_id: request.request_id,
slots: vec![],
proof: vec![],
}),
)))
}
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
self.get_byte_codes_with_priority(request, Priority::Normal)
}
fn get_byte_codes_with_priority(
&self,
request: GetByteCodesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::ByteCodes(ByteCodesMessage {
request_id: request.request_id,
codes: vec![],
}),
)))
}
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
self.get_trie_nodes_with_priority(request, Priority::Normal)
}
fn get_trie_nodes_with_priority(
&self,
request: GetTrieNodesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::TrieNodes(TrieNodesMessage {
request_id: request.request_id,
nodes: vec![],
}),
)))
}
}
impl<Net> Default for NoopFullBlockClient<Net> {
fn default() -> Self {
Self(PhantomData::<Net>)

View File

@@ -1,56 +0,0 @@
[package]
name = "reth-snap-sync"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Snap sync protocol implementation for reth"
[lints]
workspace = true
[dependencies]
# reth
reth-eth-wire-types.workspace = true
reth-network-p2p.workspace = true
reth-network-peers.workspace = true
reth-primitives-traits.workspace = true
reth-storage-api.workspace = true
reth-storage-errors.workspace = true
reth-db-api.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-metrics.workspace = true
reth-trie.workspace = true
reth-trie-db.workspace = true
reth-execution-errors.workspace = true
# misc (non-workspace)
rand = "0.9"
# ethereum
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-consensus.workspace = true
alloy-trie.workspace = true
# async
futures.workspace = true
tokio = { workspace = true, features = ["sync", "time", "macros"] }
tokio-stream.workspace = true
# misc
tracing.workspace = true
thiserror.workspace = true
metrics.workspace = true
derive_more.workspace = true
[dev-dependencies]
reth-provider = { workspace = true, features = ["test-utils"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
[features]
default = []

View File

@@ -1,8 +0,0 @@
//! Configuration for snap sync.
/// Configuration for snap sync.
#[derive(Debug, Clone, Default)]
pub struct SnapSyncConfig {
/// Whether snap sync is enabled.
pub enabled: bool,
}

View File

@@ -1,647 +0,0 @@
//! Snap sync downloader.
//!
//! Orchestrates the multi-phase snap sync process:
//! 1. Download account ranges via `GetAccountRange`
//! 2. Download storage slots via `GetStorageRanges`
//! 3. Download bytecodes via `GetByteCodes`
//! 4. Verify state root against pivot block
use crate::{
error::SnapSyncError,
metrics::SnapSyncMetrics,
progress::{SnapPhase, SnapProgress},
};
use alloy_consensus::constants::KECCAK_EMPTY;
use alloy_primitives::{keccak256, Bytes, B256, U256};
use alloy_rlp::Decodable;
use alloy_trie::TrieAccount;
use reth_db_api::{
cursor::DbCursorRO,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage,
};
use reth_network_p2p::snap::client::{SnapClient, SnapResponse};
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use reth_trie::StateRoot;
use reth_trie_db::DatabaseStateRoot;
use std::collections::HashSet;
use tokio::sync::watch;
use tracing::{debug, info, trace, warn};
/// Maximum response size in bytes for snap requests (512 KB).
const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
/// Number of accounts to accumulate before flushing to DB.
const ACCOUNT_WRITE_BATCH_SIZE: usize = 10_000;
/// Number of storage slots to accumulate before flushing to DB.
const STORAGE_WRITE_BATCH_SIZE: usize = 50_000;
/// Number of bytecodes to request in a single batch.
const BYTECODE_BATCH_SIZE: usize = 64;
/// Hash representing the maximum key (all 0xFF).
const HASH_MAX: B256 = B256::repeat_byte(0xFF);
/// The snap sync downloader that orchestrates state download from peers.
#[derive(Debug)]
pub struct SnapSyncDownloader<C, F> {
/// The snap-capable network client.
client: C,
/// Database provider factory for writing state.
provider_factory: F,
/// Current sync progress.
progress: SnapProgress,
/// Metrics.
metrics: SnapSyncMetrics,
/// Cancellation signal.
cancel_rx: watch::Receiver<bool>,
}
impl<C, F> SnapSyncDownloader<C, F>
where
C: SnapClient + Clone + Send + Sync + 'static,
F: DatabaseProviderFactory + Send + Sync + 'static,
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
{
/// Creates a new snap sync downloader.
pub fn new(
client: C,
provider_factory: F,
pivot_hash: B256,
pivot_number: u64,
state_root: B256,
cancel_rx: watch::Receiver<bool>,
) -> Self {
Self {
client,
provider_factory,
progress: SnapProgress::new(pivot_hash, pivot_number, state_root),
metrics: SnapSyncMetrics::default(),
cancel_rx,
}
}
/// Runs the snap sync to completion.
///
/// Returns `Ok(())` if state was successfully downloaded and verified,
/// or an error if sync failed.
pub async fn run(&mut self) -> Result<(), SnapSyncError> {
info!(
target: "snap_sync",
pivot_hash = %self.progress.pivot_hash,
pivot_number = self.progress.pivot_number,
state_root = %self.progress.state_root,
"starting snap sync"
);
// Phase 1: Download accounts
self.progress.phase = SnapPhase::Accounts;
self.metrics.phase.set(1.0);
let storage_accounts = self.download_accounts().await?;
info!(
target: "snap_sync",
accounts = self.progress.accounts_downloaded,
storage_accounts = storage_accounts.len(),
"account download complete"
);
// Phase 2: Download storage slots
self.progress.phase = SnapPhase::Storages;
self.metrics.phase.set(2.0);
let code_hashes = self.download_storages(&storage_accounts).await?;
info!(
target: "snap_sync",
slots = self.progress.storage_slots_downloaded,
"storage download complete"
);
// Phase 3: Download bytecodes
self.progress.phase = SnapPhase::Bytecodes;
self.metrics.phase.set(3.0);
self.download_bytecodes(code_hashes).await?;
info!(
target: "snap_sync",
bytecodes = self.progress.bytecodes_downloaded,
"bytecode download complete"
);
// Phase 4: Verification (hashing + merkle root)
self.progress.phase = SnapPhase::Verification;
self.metrics.phase.set(4.0);
info!(target: "snap_sync", "verifying state root against pivot block");
self.verify_state_root()?;
self.progress.phase = SnapPhase::Done;
self.metrics.phase.set(5.0);
Ok(())
}
/// Returns the current progress.
pub const fn progress(&self) -> &SnapProgress {
&self.progress
}
/// Checks if cancellation was requested.
fn is_cancelled(&self) -> bool {
*self.cancel_rx.borrow()
}
// ========================================================================
// Phase 4: Verification
// ========================================================================
/// Computes the state root from the downloaded state and verifies it against
/// the pivot block's expected state root.
fn verify_state_root(&self) -> Result<(), SnapSyncError> {
info!(target: "snap_sync", "computing state root from downloaded state");
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let computed_root = StateRoot::from_tx(provider.tx_ref())
.root()
.map_err(|e| SnapSyncError::StateRootVerification(e.to_string()))?;
if computed_root != self.progress.state_root {
return Err(SnapSyncError::StateRootMismatch {
expected: self.progress.state_root,
got: computed_root,
});
}
info!(target: "snap_sync", %computed_root, "state root verified successfully");
Ok(())
}
// ========================================================================
// Phase 1: Account download
// ========================================================================
/// Downloads all accounts from the state trie via `GetAccountRange` requests.
///
/// Returns a list of `(address_hash, storage_root)` for accounts that have
/// non-empty storage (`storage_root` != `EMPTY_TRIE_HASH`).
async fn download_accounts(&mut self) -> Result<Vec<(B256, B256)>, SnapSyncError> {
let state_root = self.progress.state_root;
let mut cursor = self.progress.account_cursor;
let mut storage_accounts: Vec<(B256, B256)> = Vec::new();
// Batch buffer for DB writes
let mut account_batch: Vec<(B256, Account, B256)> = Vec::new();
loop {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let response = retry_snap_request(|| {
let req = GetAccountRangeMessage {
request_id: rand_request_id(),
root_hash: state_root,
starting_hash: cursor,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_account_range(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::AccountRange(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidAccountRange(
"unexpected response type".into(),
));
}
};
if msg.accounts.is_empty() {
// Empty response means we've reached the end or peer doesn't serve this root
debug!(target: "snap_sync", %cursor, "received empty account range, finishing");
break;
}
// Process each account
for account_data in &msg.accounts {
let trie_account = decode_slim_account(&account_data.body)?;
let account = Account {
nonce: trie_account.nonce,
balance: trie_account.balance,
bytecode_hash: if trie_account.code_hash == KECCAK_EMPTY {
None
} else {
Some(trie_account.code_hash)
},
};
// Track accounts with storage
let empty_root = alloy_trie::EMPTY_ROOT_HASH;
if trie_account.storage_root != empty_root {
storage_accounts.push((account_data.hash, trie_account.storage_root));
}
account_batch.push((account_data.hash, account, trie_account.storage_root));
self.progress.accounts_downloaded += 1;
}
// Update cursor to continue after the last account
cursor = increment_hash(msg.accounts.last().unwrap().hash);
// Flush batch if large enough
if account_batch.len() >= ACCOUNT_WRITE_BATCH_SIZE {
self.write_accounts(&account_batch)?;
account_batch.clear();
}
self.metrics.accounts_downloaded.set(self.progress.accounts_downloaded as f64);
self.progress.account_cursor = cursor;
trace!(
target: "snap_sync",
accounts = self.progress.accounts_downloaded,
%cursor,
"account download progress"
);
// If cursor wrapped around to zero, we've covered the full range
if cursor == B256::ZERO {
break;
}
}
// Flush remaining
if !account_batch.is_empty() {
self.write_accounts(&account_batch)?;
}
Ok(storage_accounts)
}
/// Writes a batch of accounts to the database.
///
/// Account hashes are used as keys since snap protocol returns hashed addresses.
/// The address→hash mapping will be resolved during the hashing stage.
fn write_accounts(&self, batch: &[(B256, Account, B256)]) -> Result<(), SnapSyncError> {
// For snap sync, we write to HashedAccounts table since snap returns hashed keys.
// The pipeline's hashing stage normally computes this from PlainAccountState,
// but for snap we go directly to the hashed form.
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (hash, account, _storage_root) in batch {
tx.put::<tables::HashedAccounts>(*hash, *account)?;
}
provider.commit()?;
Ok(())
}
// ========================================================================
// Phase 2: Storage download
// ========================================================================
/// Downloads storage slots for all accounts with non-empty storage roots.
///
/// Returns the set of code hashes encountered during account processing.
async fn download_storages(
&mut self,
storage_accounts: &[(B256, B256)],
) -> Result<HashSet<B256>, SnapSyncError> {
let state_root = self.progress.state_root;
let code_hashes: HashSet<B256> = HashSet::new();
// Process storage accounts in chunks
let mut slot_batch: Vec<(B256, B256, U256)> = Vec::new();
for (account_hash, _storage_root) in storage_accounts {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let mut slot_cursor = B256::ZERO;
loop {
let account_hash_val = *account_hash;
let response = retry_snap_request(|| {
let req = GetStorageRangesMessage {
request_id: rand_request_id(),
root_hash: state_root,
account_hashes: vec![account_hash_val],
starting_hash: slot_cursor,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_storage_ranges(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::StorageRanges(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidStorageRange(
"unexpected response type".into(),
));
}
};
if msg.slots.is_empty() || msg.slots[0].is_empty() {
break;
}
let slots = &msg.slots[0];
for slot in slots {
let value = U256::from_be_slice(&slot.data);
slot_batch.push((*account_hash, slot.hash, value));
self.progress.storage_slots_downloaded += 1;
}
// Update cursor
slot_cursor = increment_hash(slots.last().unwrap().hash);
// Flush if needed
if slot_batch.len() >= STORAGE_WRITE_BATCH_SIZE {
self.write_storage_slots(&slot_batch)?;
slot_batch.clear();
}
self.metrics
.storage_slots_downloaded
.set(self.progress.storage_slots_downloaded as f64);
// If no proof or we've reached the end of this account's storage
if msg.proof.is_empty() || slot_cursor == B256::ZERO {
break;
}
}
}
// Flush remaining
if !slot_batch.is_empty() {
self.write_storage_slots(&slot_batch)?;
}
Ok(code_hashes)
}
/// Writes a batch of storage slots to the database.
fn write_storage_slots(&self, batch: &[(B256, B256, U256)]) -> Result<(), SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (account_hash, slot_hash, value) in batch {
tx.put::<tables::HashedStorages>(
*account_hash,
StorageEntry { key: *slot_hash, value: *value },
)?;
}
provider.commit()?;
Ok(())
}
// ========================================================================
// Phase 3: Bytecode download
// ========================================================================
/// Downloads contract bytecodes by their code hashes.
async fn download_bytecodes(
&mut self,
code_hashes: HashSet<B256>,
) -> Result<(), SnapSyncError> {
// Collect code hashes from accounts we've already written
let mut all_code_hashes: Vec<B256> = self.collect_code_hashes()?;
all_code_hashes.extend(code_hashes);
all_code_hashes.sort();
all_code_hashes.dedup();
// Remove KECCAK_EMPTY since that means no code
all_code_hashes.retain(|h| *h != KECCAK_EMPTY);
self.progress.bytecodes_total = all_code_hashes.len() as u64;
info!(
target: "snap_sync",
total = all_code_hashes.len(),
"starting bytecode download"
);
// Download in batches
for chunk in all_code_hashes.chunks(BYTECODE_BATCH_SIZE) {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let hashes = chunk.to_vec();
let response = retry_snap_request(|| {
let req = GetByteCodesMessage {
request_id: rand_request_id(),
hashes: hashes.clone(),
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_byte_codes(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::ByteCodes(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidBytecode("unexpected response type".into()));
}
};
self.write_bytecodes(chunk, &msg.codes)?;
self.progress.bytecodes_downloaded += msg.codes.len() as u64;
self.metrics.bytecodes_downloaded.set(self.progress.bytecodes_downloaded as f64);
}
Ok(())
}
/// Collects code hashes from accounts already written to DB.
fn collect_code_hashes(&self) -> Result<Vec<B256>, SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
let mut cursor = tx.cursor_read::<tables::HashedAccounts>()?;
let mut code_hashes = Vec::new();
let mut entry = cursor.first()?;
while let Some((_, account)) = entry {
if let Some(hash) = account.bytecode_hash &&
hash != KECCAK_EMPTY
{
code_hashes.push(hash);
}
entry = cursor.next()?;
}
Ok(code_hashes)
}
/// Writes bytecodes to the database, verifying hash integrity.
fn write_bytecodes(
&self,
expected_hashes: &[B256],
codes: &[Bytes],
) -> Result<(), SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (i, code) in codes.iter().enumerate() {
let hash = keccak256(code);
if i < expected_hashes.len() && hash != expected_hashes[i] {
warn!(
target: "snap_sync",
expected = %expected_hashes[i],
got = %hash,
"bytecode hash mismatch, skipping"
);
continue;
}
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
tx.put::<tables::Bytecodes>(hash, bytecode)?;
}
provider.commit()?;
Ok(())
}
}
/// Decodes a "slim" account body from the snap protocol into a `TrieAccount`.
///
/// Slim format is `RLP([nonce, balance, storage_root, code_hash])` but with
/// empty `storage_root` and `code_hash` omitted.
fn decode_slim_account(data: &Bytes) -> Result<TrieAccount, SnapSyncError> {
// The snap protocol uses "slim" encoding where empty values are omitted.
// We need to decode the RLP and fill in defaults for missing fields.
let account = TrieAccount::decode(&mut data.as_ref())?;
Ok(account)
}
/// Increments a hash by 1. Returns `B256::ZERO` on overflow (wraps around).
fn increment_hash(hash: B256) -> B256 {
let mut bytes = hash.0;
for i in (0..32).rev() {
if bytes[i] < 0xFF {
bytes[i] += 1;
return B256::from(bytes);
}
bytes[i] = 0;
}
B256::ZERO
}
/// Generates a random request ID.
fn rand_request_id() -> u64 {
rand::random()
}
/// Maximum number of retries for a snap request before giving up.
const MAX_SNAP_RETRIES: u32 = 10;
/// Retries a snap request with exponential backoff on failure.
async fn retry_snap_request<F, Fut, T>(
mut make_request: F,
metrics: &SnapSyncMetrics,
cancel_rx: &watch::Receiver<bool>,
) -> Result<T, SnapSyncError>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, reth_network_p2p::error::RequestError>>,
{
let mut attempts = 0u32;
loop {
if *cancel_rx.borrow() {
return Err(SnapSyncError::Cancelled);
}
match make_request().await {
Ok(resp) => return Ok(resp),
Err(err) => {
attempts += 1;
metrics.request_failures.increment(1);
if attempts >= MAX_SNAP_RETRIES {
return Err(SnapSyncError::Request(err));
}
let delay = std::time::Duration::from_secs(1 << attempts.min(5));
warn!(
target: "snap_sync",
%err,
attempts,
"snap request failed, retrying in {:?}",
delay
);
tokio::time::sleep(delay).await;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_rlp::Encodable;
#[test]
fn test_increment_hash() {
let zero = B256::ZERO;
let one = increment_hash(zero);
assert_eq!(one.0[31], 1);
let max = B256::repeat_byte(0xFF);
let wrapped = increment_hash(max);
assert_eq!(wrapped, B256::ZERO);
let mut mid = B256::ZERO;
mid.0[31] = 0xFF;
let next = increment_hash(mid);
assert_eq!(next.0[30], 1);
assert_eq!(next.0[31], 0);
}
#[test]
fn test_decode_slim_account() {
let trie_account = TrieAccount {
nonce: 42,
balance: U256::from(1000),
storage_root: alloy_trie::EMPTY_ROOT_HASH,
code_hash: KECCAK_EMPTY,
};
let mut buf = Vec::new();
trie_account.encode(&mut buf);
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
assert_eq!(decoded.nonce, 42);
assert_eq!(decoded.balance, U256::from(1000));
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
}
#[test]
fn test_decode_slim_account_empty() {
let trie_account = TrieAccount {
nonce: 0,
balance: U256::ZERO,
storage_root: alloy_trie::EMPTY_ROOT_HASH,
code_hash: KECCAK_EMPTY,
};
let mut buf = Vec::new();
trie_account.encode(&mut buf);
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
assert_eq!(decoded.nonce, 0);
assert_eq!(decoded.balance, U256::ZERO);
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
}
}

View File

@@ -1,52 +0,0 @@
//! Snap sync error types.
use alloy_primitives::B256;
use reth_db_api::DatabaseError;
use reth_network_p2p::error::RequestError;
use reth_storage_errors::provider::ProviderError;
/// Errors that can occur during snap sync.
#[derive(Debug, thiserror::Error)]
pub enum SnapSyncError {
/// The computed state root does not match the pivot block's state root.
#[error("state root mismatch: expected {expected}, got {got}")]
StateRootMismatch {
/// Expected state root from pivot header.
expected: B256,
/// Computed state root after snap sync.
got: B256,
},
/// A peer returned an invalid or inconsistent account range.
#[error("invalid account range response: {0}")]
InvalidAccountRange(String),
/// A peer returned an invalid or inconsistent storage range.
#[error("invalid storage range response: {0}")]
InvalidStorageRange(String),
/// A peer returned invalid bytecodes.
#[error("invalid bytecode response: {0}")]
InvalidBytecode(String),
/// No peers available that support the snap protocol.
#[error("no snap-capable peers available")]
NoPeers,
/// Network request failed.
#[error("network request failed: {0}")]
Request(#[from] RequestError),
/// Database/provider error.
#[error("provider error: {0}")]
Provider(#[from] ProviderError),
/// Pivot block not found.
#[error("pivot block {0} not found")]
PivotNotFound(B256),
/// RLP decoding error.
#[error("rlp decode error: {0}")]
RlpDecode(#[from] alloy_rlp::Error),
/// Database error.
#[error("database error: {0}")]
Database(#[from] DatabaseError),
/// State root verification failed.
#[error("state root verification error: {0}")]
StateRootVerification(String),
/// Snap sync was cancelled.
#[error("snap sync cancelled")]
Cancelled,
}

View File

@@ -1,26 +0,0 @@
//! Snap sync protocol implementation for reth.
//!
//! Downloads state from peers via the [snap protocol](https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
//! instead of executing all historical blocks. The sync proceeds in phases:
//!
//! 1. **Account download**: Fetch all account leaves via `GetAccountRange`
//! 2. **Storage download**: Fetch storage slots for accounts with non-empty storage roots
//! 3. **Bytecode download**: Fetch contract bytecodes by code hash
//! 4. **State root verification**: Build hashed state + merkle trie, verify against pivot block
//!
//! After snap sync completes, normal execution resumes from the pivot block onward.
pub mod config;
pub mod downloader;
pub mod error;
pub mod metrics;
pub mod progress;
pub mod server;
pub mod task;
pub use config::SnapSyncConfig;
pub use downloader::SnapSyncDownloader;
pub use error::SnapSyncError;
pub use progress::{SnapPhase, SnapProgress};
pub use server::{IncomingSnapRequest, SnapRequestHandler};
pub use task::run_snap_sync;

View File

@@ -1,21 +0,0 @@
//! Snap sync metrics.
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
#[derive(Metrics)]
#[metrics(scope = "snap_sync")]
pub(crate) struct SnapSyncMetrics {
/// Number of accounts downloaded.
pub(crate) accounts_downloaded: Gauge,
/// Number of storage slots downloaded.
pub(crate) storage_slots_downloaded: Gauge,
/// Number of bytecodes downloaded.
pub(crate) bytecodes_downloaded: Gauge,
/// Current phase (0=idle, 1=accounts, 2=storages, 3=bytecodes, 4=verify, 5=done).
pub(crate) phase: Gauge,
/// Total peer request failures.
pub(crate) request_failures: Counter,
}

View File

@@ -1,87 +0,0 @@
//! Snap sync progress tracking.
//!
//! Tracks the current phase and cursor positions to support resumability.
use alloy_primitives::{Address, B256};
/// Current phase of snap sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SnapPhase {
/// Not started yet.
#[default]
Idle,
/// Downloading account ranges.
Accounts,
/// Downloading storage slots for accounts with non-empty storage roots.
Storages,
/// Downloading contract bytecodes.
Bytecodes,
/// Building hashed state and verifying merkle root.
Verification,
/// Snap sync completed successfully.
Done,
}
/// Tracks snap sync progress for resumability.
#[derive(Debug, Clone, Default)]
pub struct SnapProgress {
/// The pivot block hash.
pub pivot_hash: B256,
/// The pivot block number.
pub pivot_number: u64,
/// The pivot block's state root.
pub state_root: B256,
/// Current sync phase.
pub phase: SnapPhase,
/// Account download cursor: next account hash to fetch.
pub account_cursor: B256,
/// Number of accounts downloaded so far.
pub accounts_downloaded: u64,
/// Storage download cursor: current account address being fetched.
pub storage_account_cursor: Option<Address>,
/// Storage slot cursor within the current account.
pub storage_slot_cursor: B256,
/// Number of storage slots downloaded so far.
pub storage_slots_downloaded: u64,
/// Number of bytecodes downloaded so far.
pub bytecodes_downloaded: u64,
/// Total number of bytecodes to download.
pub bytecodes_total: u64,
}
impl SnapProgress {
/// Creates a new progress tracker for the given pivot.
pub fn new(pivot_hash: B256, pivot_number: u64, state_root: B256) -> Self {
Self {
pivot_hash,
pivot_number,
state_root,
phase: SnapPhase::Accounts,
..Default::default()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_progress_new() {
let hash = B256::repeat_byte(0x01);
let state_root = B256::repeat_byte(0x02);
let progress = SnapProgress::new(hash, 100, state_root);
assert_eq!(progress.pivot_hash, hash);
assert_eq!(progress.pivot_number, 100);
assert_eq!(progress.state_root, state_root);
assert_eq!(progress.phase, SnapPhase::Accounts);
assert_eq!(progress.accounts_downloaded, 0);
assert_eq!(progress.storage_slots_downloaded, 0);
assert_eq!(progress.bytecodes_downloaded, 0);
}
#[test]
fn test_phase_default() {
assert_eq!(SnapPhase::default(), SnapPhase::Idle);
}
}

View File

@@ -1,480 +0,0 @@
//! Snap sync request handler (server-side).
//!
//! Handles incoming snap protocol requests from peers, serving account ranges,
//! storage ranges, bytecodes, and trie nodes from the local database.
//!
//! Modeled after [`EthRequestHandler`](reth_network::eth_requests::EthRequestHandler).
use alloy_consensus::constants::KECCAK_EMPTY;
use alloy_primitives::Bytes;
use alloy_rlp::Encodable;
use alloy_trie::EMPTY_ROOT_HASH;
use futures::StreamExt;
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
tables,
transaction::DbTx,
};
use reth_eth_wire_types::snap::*;
use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_primitives_traits::Account;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tracing::{debug, trace};
/// Maximum number of accounts to serve per request.
const MAX_ACCOUNTS_SERVE: usize = 1024;
/// Maximum number of storage slots to serve per account per request.
const MAX_STORAGE_SERVE: usize = 1024;
/// Maximum number of bytecodes to serve per request.
const MAX_BYTECODES_SERVE: usize = 1024;
/// Maximum response size (2MB, matching eth limit).
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
/// Handles incoming snap protocol requests from peers.
///
/// This is spawned as a background service and polled to process requests.
#[derive(Debug)]
#[must_use = "Handler does nothing unless polled."]
pub struct SnapRequestHandler<F> {
/// Provider factory for DB access.
provider_factory: F,
/// Incoming snap requests.
incoming_requests: tokio_stream::wrappers::ReceiverStream<IncomingSnapRequest>,
}
impl<F> SnapRequestHandler<F> {
/// Create a new instance.
pub fn new(
provider_factory: F,
incoming: tokio::sync::mpsc::Receiver<IncomingSnapRequest>,
) -> Self {
Self {
provider_factory,
incoming_requests: tokio_stream::wrappers::ReceiverStream::new(incoming),
}
}
}
impl<F> SnapRequestHandler<F>
where
F: DatabaseProviderFactory,
{
/// Handle a `GetAccountRange` request.
fn on_account_range_request(
&self,
peer_id: PeerId,
request: GetAccountRangeMessage,
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
) {
trace!(target: "net::snap", ?peer_id, ?request.starting_hash, ?request.limit_hash, "Received GetAccountRange");
let mut accounts = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
proof: vec![],
}));
return;
};
let tx = provider.tx_ref();
let Ok(mut cursor) = tx.cursor_read::<tables::HashedAccounts>() else {
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
proof: vec![],
}));
return;
};
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
if let Ok(walker) = cursor.walk(Some(request.starting_hash)) {
for entry in walker {
let Ok((hash, account)) = entry else { break };
if hash > request.limit_hash {
break;
}
let slim = encode_account(&account);
total_bytes += 32 + slim.len();
accounts.push(AccountData { hash, body: slim });
if accounts.len() >= MAX_ACCOUNTS_SERVE || total_bytes >= limit {
break;
}
}
}
trace!(target: "net::snap", ?peer_id, num_accounts = accounts.len(), total_bytes, "Serving GetAccountRange");
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
// TODO: add merkle proofs
proof: vec![],
}));
}
/// Handle a `GetStorageRanges` request.
fn on_storage_ranges_request(
&self,
peer_id: PeerId,
request: GetStorageRangesMessage,
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
) {
trace!(target: "net::snap", ?peer_id, num_accounts = request.account_hashes.len(), "Received GetStorageRanges");
let mut all_slots = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
proof: vec![],
}));
return;
};
let tx = provider.tx_ref();
let Ok(mut cursor) = tx.cursor_dup_read::<tables::HashedStorages>() else {
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
proof: vec![],
}));
return;
};
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
for (i, account_hash) in request.account_hashes.iter().enumerate() {
let mut account_slots = Vec::new();
// For the first account, use the request's starting_hash.
// For subsequent accounts, start from the beginning.
let start = if i == 0 { request.starting_hash } else { Default::default() };
if let Ok(walker) = cursor.walk_dup(Some(*account_hash), Some(start)) {
for entry in walker {
let Ok((_, storage_entry)) = entry else { break };
if storage_entry.key > request.limit_hash {
break;
}
let mut value_buf = Vec::new();
storage_entry.value.encode(&mut value_buf);
total_bytes += 32 + value_buf.len();
account_slots.push(StorageData {
hash: storage_entry.key,
data: Bytes::from(value_buf),
});
if account_slots.len() >= MAX_STORAGE_SERVE || total_bytes >= limit {
break;
}
}
}
all_slots.push(account_slots);
if total_bytes >= limit {
break;
}
}
trace!(target: "net::snap", ?peer_id, num_accounts = all_slots.len(), total_bytes, "Serving GetStorageRanges");
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
// TODO: add boundary proofs for partial ranges
proof: vec![],
}));
}
/// Handle a `GetByteCodes` request.
fn on_byte_codes_request(
&self,
peer_id: PeerId,
request: GetByteCodesMessage,
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
) {
trace!(target: "net::snap", ?peer_id, num_hashes = request.hashes.len(), "Received GetByteCodes");
let mut codes = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
return;
};
let tx = provider.tx_ref();
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
for hash in &request.hashes {
if *hash == KECCAK_EMPTY {
continue;
}
let Ok(Some(bytecode)) = tx.get::<tables::Bytecodes>(*hash) else {
continue;
};
let raw = bytecode.original_bytes();
total_bytes += raw.len();
codes.push(raw);
if codes.len() >= MAX_BYTECODES_SERVE || total_bytes >= limit {
break;
}
}
trace!(target: "net::snap", ?peer_id, num_codes = codes.len(), total_bytes, "Serving GetByteCodes");
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
}
/// Handle a `GetTrieNodes` request.
fn on_trie_nodes_request(
&self,
peer_id: PeerId,
request: GetTrieNodesMessage,
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
) {
debug!(target: "net::snap", ?peer_id, num_paths = request.paths.len(), "Received GetTrieNodes (stub)");
// TODO: implement trie node lookups from AccountsTrie / StoragesTrie tables
let _ =
response.send(Ok(TrieNodesMessage { request_id: request.request_id, nodes: vec![] }));
}
}
/// Encode an [`Account`] into slim RLP format (as `TrieAccount`).
///
/// Accounts in the snap protocol are exchanged as RLP-encoded `TrieAccount`.
/// Since we don't know the true storage root when reading from `HashedAccounts`,
/// we use `EMPTY_ROOT_HASH` as a placeholder.
fn encode_account(account: &Account) -> Bytes {
let trie_account = account.into_trie_account(EMPTY_ROOT_HASH);
let mut buf = Vec::new();
trie_account.encode(&mut buf);
Bytes::from(buf)
}
/// Incoming snap request variants delegated by the network.
#[derive(Debug)]
pub enum IncomingSnapRequest {
/// Request for an account range.
GetAccountRange {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetAccountRangeMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
},
/// Request for storage slot ranges.
GetStorageRanges {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetStorageRangesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
},
/// Request for contract bytecodes.
GetByteCodes {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetByteCodesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
},
/// Request for trie nodes.
GetTrieNodes {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetTrieNodesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
},
}
impl<F> Future for SnapRequestHandler<F>
where
F: DatabaseProviderFactory + Unpin,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match this.incoming_requests.poll_next_unpin(cx) {
Poll::Ready(Some(incoming)) => match incoming {
IncomingSnapRequest::GetAccountRange { peer_id, request, response } => {
this.on_account_range_request(peer_id, request, response);
}
IncomingSnapRequest::GetStorageRanges { peer_id, request, response } => {
this.on_storage_ranges_request(peer_id, request, response);
}
IncomingSnapRequest::GetByteCodes { peer_id, request, response } => {
this.on_byte_codes_request(peer_id, request, response);
}
IncomingSnapRequest::GetTrieNodes { peer_id, request, response } => {
this.on_trie_nodes_request(peer_id, request, response);
}
},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{keccak256, B256, U256};
use alloy_rlp::Decodable;
use reth_db_api::transaction::DbTxMut;
use reth_provider::test_utils::create_test_provider_factory;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use tokio::sync::mpsc;
#[test]
fn test_encode_account_roundtrip() {
let account = Account {
nonce: 10,
balance: U256::from(500),
bytecode_hash: Some(B256::repeat_byte(0xAB)),
};
let encoded = encode_account(&account);
assert!(!encoded.is_empty());
let decoded = alloy_trie::TrieAccount::decode(&mut encoded.as_ref()).unwrap();
assert_eq!(decoded.nonce, 10);
assert_eq!(decoded.balance, U256::from(500));
assert_eq!(decoded.code_hash, B256::repeat_byte(0xAB));
}
#[tokio::test]
async fn test_account_range_empty_db() {
let factory = create_test_provider_factory();
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetAccountRange {
peer_id: PeerId::random(),
request: GetAccountRangeMessage {
request_id: 1,
root_hash: B256::ZERO,
starting_hash: B256::ZERO,
limit_hash: B256::repeat_byte(0xFF),
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
assert!(result.accounts.is_empty());
drop(tx);
handle.await.unwrap();
}
#[tokio::test]
async fn test_byte_codes_request() {
let factory = create_test_provider_factory();
// Write bytecodes into the DB
let code = Bytes::from(vec![0x60, 0x00, 0x60, 0x00, 0xFD]); // PUSH0 PUSH0 REVERT
let code_hash = keccak256(&code);
{
let provider = factory.database_provider_rw().unwrap();
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
provider.tx_ref().put::<tables::Bytecodes>(code_hash, bytecode).unwrap();
provider.commit().unwrap();
}
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetByteCodes {
peer_id: PeerId::random(),
request: GetByteCodesMessage {
request_id: 2,
hashes: vec![code_hash],
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
assert_eq!(result.codes.len(), 1);
assert_eq!(result.codes[0], code);
drop(tx);
handle.await.unwrap();
}
#[tokio::test]
async fn test_storage_ranges_empty() {
let factory = create_test_provider_factory();
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetStorageRanges {
peer_id: PeerId::random(),
request: GetStorageRangesMessage {
request_id: 3,
root_hash: B256::ZERO,
account_hashes: vec![B256::repeat_byte(0x01)],
starting_hash: B256::ZERO,
limit_hash: B256::repeat_byte(0xFF),
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
// One entry per requested account, but the inner vec is empty since no storage exists
assert_eq!(result.slots.len(), 1);
assert!(result.slots[0].is_empty());
drop(tx);
handle.await.unwrap();
}
}

View File

@@ -1,46 +0,0 @@
//! Snap sync task for integration with the node builder.
//!
//! Provides a standalone async function that runs snap sync to completion,
//! suitable for spawning as a background task before the pipeline starts.
use crate::{downloader::SnapSyncDownloader, error::SnapSyncError};
use alloy_primitives::B256;
use reth_db_api::transaction::{DbTx, DbTxMut};
use reth_network_p2p::snap::client::SnapClient;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use tracing::info;
/// Runs snap sync to completion, returning the pivot block number on success.
///
/// This is the main entry point for integrating snap sync with the node builder.
/// It creates a [`SnapSyncDownloader`], runs all phases (account download, storage
/// download, bytecode download, state root verification), and returns the pivot
/// block number that the pipeline should resume from.
pub async fn run_snap_sync<C, F>(
client: C,
provider_factory: F,
pivot_hash: B256,
pivot_number: u64,
state_root: B256,
) -> Result<u64, SnapSyncError>
where
C: SnapClient + Clone + Send + Sync + 'static,
F: DatabaseProviderFactory + Send + Sync + 'static,
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
{
let (_, cancel_rx) = tokio::sync::watch::channel(false);
let mut downloader = SnapSyncDownloader::new(
client,
provider_factory,
pivot_hash,
pivot_number,
state_root,
cancel_rx,
);
info!(target: "snap::sync", pivot_number, %pivot_hash, "Starting snap sync");
downloader.run().await?;
info!(target: "snap::sync", pivot_number, "Snap sync completed successfully");
Ok(pivot_number)
}

View File

@@ -98,7 +98,7 @@ pub trait FullNodeComponents: FullNodeTypes + Clone + 'static {
/// Returns an executor handle to spawn tasks.
///
/// This can be used to spawn critical, blocking tasks or register tasks that should be
/// terminated gracefully.
/// terminated gracefully. See also [`TaskSpawner`](reth_tasks::TaskSpawner).
fn task_executor(&self) -> &TaskExecutor;
}

View File

@@ -24,6 +24,7 @@ reth-db-common.workspace = true
reth-downloaders.workspace = true
reth-engine-local.workspace = true
reth-engine-primitives.workspace = true
reth-engine-service.workspace = true
reth-engine-tree.workspace = true
reth-engine-util.workspace = true
reth-evm.workspace = true
@@ -54,7 +55,6 @@ reth-tokio-util.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-snap-sync.workspace = true
reth-basic-payload-builder.workspace = true
reth-node-ethstats.workspace = true

View File

@@ -986,7 +986,7 @@ impl<Node: FullNodeTypes<Types: NodeTypes<ChainSpec: Hardforks>>> BuilderContext
secret_key,
default_peers_path,
)
.with_task_executor(self.executor.clone())
.with_task_executor(Box::new(self.executor.clone()))
.set_head(self.head);
Ok(builder)

Some files were not shown because too many files have changed in this diff Show More