mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
77 Commits
dan/storag
...
staging
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f09a92acc | ||
|
|
df8ba50cb7 | ||
|
|
61341a1342 | ||
|
|
4a62d38af2 | ||
|
|
dc4f249f09 | ||
|
|
c915841a45 | ||
|
|
217a337d8c | ||
|
|
74d57008b6 | ||
|
|
f8767bc678 | ||
|
|
81c83bba68 | ||
|
|
cd8ec58703 | ||
|
|
931b17c3fd | ||
|
|
807d328cf0 | ||
|
|
8a6bbd29fe | ||
|
|
8bedaaee71 | ||
|
|
09cd105671 | ||
|
|
a0b60b7e64 | ||
|
|
90e15d096d | ||
|
|
a161ca294f | ||
|
|
3a5c41e3da | ||
|
|
968d3c9534 | ||
|
|
fc6666f6a7 | ||
|
|
ff3a854326 | ||
|
|
04543ed16b | ||
|
|
ae3f0d4d1a | ||
|
|
5bccdc4a5d | ||
|
|
0b7cd60668 | ||
|
|
aa983b49af | ||
|
|
2aff617767 | ||
|
|
2c5d00ffb5 | ||
|
|
e2a3527414 | ||
|
|
e4cb3d3aed | ||
|
|
079b7b9d57 | ||
|
|
8a25d7d3cf | ||
|
|
a5ced84098 | ||
|
|
59760a2fe3 | ||
|
|
b9d21f293e | ||
|
|
dec1cad318 | ||
|
|
165b94c3fa | ||
|
|
69e4c06ae7 | ||
|
|
1406a984a7 | ||
|
|
93d6b9782c | ||
|
|
68e4ff1f7d | ||
|
|
33467ea6dd | ||
|
|
3bf9280b3c | ||
|
|
5c93986e6d | ||
|
|
779e0eb8bb | ||
|
|
3d02124e50 | ||
|
|
953a1b3399 | ||
|
|
a364989c61 | ||
|
|
f4075f5926 | ||
|
|
211d3d2924 | ||
|
|
dbc5313a2c | ||
|
|
345fc9cfd2 | ||
|
|
33d61c30cb | ||
|
|
beb0c5e527 | ||
|
|
73c39279b1 | ||
|
|
2fd0a703e2 | ||
|
|
7dcd77de95 | ||
|
|
da6e6afe78 | ||
|
|
04d9a33c68 | ||
|
|
be1f657b3c | ||
|
|
cab4cbf0ea | ||
|
|
29f51abb22 | ||
|
|
dd0ee0709c | ||
|
|
dcd9e50663 | ||
|
|
59d11378b9 | ||
|
|
70ceb755ad | ||
|
|
a49991c766 | ||
|
|
0a77c2aae1 | ||
|
|
93adbf82a0 | ||
|
|
ce08b6f265 | ||
|
|
b03a704a1e | ||
|
|
a51e03fce6 | ||
|
|
57d7c98f66 | ||
|
|
5d9a43f2d4 | ||
|
|
defd0e8e5c |
5
.changelog/dry-ducks-write.md
Normal file
5
.changelog/dry-ducks-write.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.
|
||||
5
.changelog/easy-clouds-meow.md
Normal file
5
.changelog/easy-clouds-meow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
ef-tests: patch
|
||||
---
|
||||
|
||||
Removed reth-stateless crate and stateless validation from ef-tests.
|
||||
4
.changelog/fast-fish-cry.md
Normal file
4
.changelog/fast-fish-cry.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Added WebSocket subscription integration tests for eth_subscribe.
|
||||
7
.changelog/icy-lions-slide.md
Normal file
7
.changelog/icy-lions-slide.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
reth: patch
|
||||
reth-cli-commands: patch
|
||||
reth-node-core: patch
|
||||
---
|
||||
|
||||
Removed experimental ress protocol support for stateless Ethereum nodes.
|
||||
5
.changelog/lazy-lakes-shout.md
Normal file
5
.changelog/lazy-lakes-shout.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-node-builder: patch
|
||||
---
|
||||
|
||||
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.
|
||||
5
.changelog/nice-waves-bow.md
Normal file
5
.changelog/nice-waves-bow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-primitives: patch
|
||||
---
|
||||
|
||||
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.
|
||||
5
.changelog/quiet-frogs-whisper.md
Normal file
5
.changelog/quiet-frogs-whisper.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-provider: patch
|
||||
---
|
||||
|
||||
Removed unused staging types from ProviderFactoryBuilder.
|
||||
4
.changelog/vast-waves-fold.md
Normal file
4
.changelog/vast-waves-fold.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.
|
||||
5
.changelog/warm-foxes-glow.md
Normal file
5
.changelog/warm-foxes-glow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).
|
||||
1
.github/scripts/check_rv32imac.sh
vendored
1
.github/scripts/check_rv32imac.sh
vendored
@@ -27,7 +27,6 @@ crates_to_check=(
|
||||
reth-ethereum-forks
|
||||
reth-ethereum-primitives
|
||||
reth-ethereum-consensus
|
||||
reth-stateless
|
||||
)
|
||||
|
||||
any_failed=0
|
||||
|
||||
1
.github/scripts/check_wasm.sh
vendored
1
.github/scripts/check_wasm.sh
vendored
@@ -63,6 +63,7 @@ exclude_crates=(
|
||||
reth-provider # tokio
|
||||
reth-prune # tokio
|
||||
reth-prune-static-files # reth-provider
|
||||
reth-tasks # tokio rt-multi-thread
|
||||
reth-stages-api # reth-provider, reth-prune
|
||||
reth-static-file # tokio
|
||||
reth-transaction-pool # c-kzg
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -35,6 +35,7 @@ jobs:
|
||||
- name: Run e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak" \
|
||||
--workspace \
|
||||
--exclude 'example-*' \
|
||||
@@ -61,6 +62,7 @@ jobs:
|
||||
- name: Run RocksDB e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
3
.github/workflows/integration.yml
vendored
3
.github/workflows/integration.yml
vendored
@@ -46,6 +46,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
@@ -76,4 +77,4 @@ jobs:
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: run era1 files integration tests
|
||||
run: cargo nextest run --release --package reth-era --test it -- --ignored
|
||||
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored
|
||||
|
||||
3
.github/workflows/unit.yml
vendored
3
.github/workflows/unit.yml
vendored
@@ -49,6 +49,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
|
||||
${{ matrix.exclude_args }} --workspace \
|
||||
--exclude ef-tests --no-tests=warn \
|
||||
@@ -87,7 +88,7 @@ jobs:
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
|
||||
672
Cargo.lock
generated
672
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
43
Cargo.toml
43
Cargo.toml
@@ -83,8 +83,6 @@ members = [
|
||||
"crates/prune/db",
|
||||
"crates/prune/prune",
|
||||
"crates/prune/types",
|
||||
"crates/ress/protocol",
|
||||
"crates/ress/provider",
|
||||
"crates/revm/",
|
||||
"crates/rpc/ipc/",
|
||||
"crates/rpc/rpc-api/",
|
||||
@@ -101,7 +99,6 @@ members = [
|
||||
"crates/stages/api/",
|
||||
"crates/stages/stages/",
|
||||
"crates/stages/types/",
|
||||
"crates/stateless",
|
||||
"crates/static-file/static-file",
|
||||
"crates/static-file/types/",
|
||||
"crates/storage/codecs/",
|
||||
@@ -309,6 +306,11 @@ inherits = "release"
|
||||
lto = "fat"
|
||||
codegen-units = 1
|
||||
|
||||
[profile.maxperf-symbols]
|
||||
inherits = "maxperf"
|
||||
debug = "full"
|
||||
strip = "none"
|
||||
|
||||
[profile.reproducible]
|
||||
inherits = "release"
|
||||
panic = "abort"
|
||||
@@ -417,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
|
||||
reth-stages = { path = "crates/stages/stages" }
|
||||
reth-stages-api = { path = "crates/stages/api" }
|
||||
reth-stages-types = { path = "crates/stages/types", default-features = false }
|
||||
reth-stateless = { path = "crates/stateless", default-features = false }
|
||||
reth-static-file = { path = "crates/static-file/static-file" }
|
||||
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
|
||||
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
|
||||
@@ -434,8 +435,6 @@ reth-trie-db = { path = "crates/trie/db" }
|
||||
reth-trie-parallel = { path = "crates/trie/parallel" }
|
||||
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
|
||||
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
|
||||
reth-ress-protocol = { path = "crates/ress/protocol" }
|
||||
reth-ress-provider = { path = "crates/ress/provider" }
|
||||
|
||||
# revm
|
||||
revm = { version = "34.0.0", default-features = false }
|
||||
@@ -449,15 +448,15 @@ op-revm = { version = "15.0.0", default-features = false }
|
||||
revm-inspectors = "0.34.2"
|
||||
|
||||
# eth
|
||||
alloy-dyn-abi = "1.5.4"
|
||||
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-sol-types = { version = "1.5.4", default-features = false }
|
||||
alloy-dyn-abi = "1.5.6"
|
||||
alloy-primitives = { version = "1.5.6", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-sol-types = { version = "1.5.6", default-features = false }
|
||||
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
@@ -548,7 +547,7 @@ strum_macros = "0.27"
|
||||
syn = "2.0"
|
||||
thiserror = { version = "2.0.0", default-features = false }
|
||||
tar = "0.4.44"
|
||||
tracing = { version = "0.1.0", default-features = false }
|
||||
tracing = { version = "0.1.0", default-features = false, features = ["attributes"] }
|
||||
tracing-appender = "0.2"
|
||||
url = { version = "2.3", default-features = false }
|
||||
zstd = "0.13"
|
||||
@@ -712,6 +711,28 @@ vergen-git2 = "9.1.0"
|
||||
ipnet = "2.11"
|
||||
|
||||
[patch.crates-io]
|
||||
# revm staging patches
|
||||
revm = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
op-revm = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-bytecode = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-context = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-context-interface = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-database = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-database-interface = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-handler = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-inspector = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-interpreter = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-precompile = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-primitives = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
revm-state = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
|
||||
|
||||
# revm-inspectors staging patch
|
||||
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "e80e2eab72dfa18011e6a99abd37027290a46e83" }
|
||||
|
||||
# alloy-evm staging patches
|
||||
alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "742dc14749ea0279c03ca27b1c26f26ac19fbefb" }
|
||||
alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "742dc14749ea0279c03ca27b1c26f26ac19fbefb" }
|
||||
|
||||
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
|
||||
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
|
||||
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
|
||||
|
||||
@@ -51,7 +51,8 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
|
||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache sccache --start-server && \
|
||||
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
|
||||
sccache --start-server && \
|
||||
if [ -n "$RUSTFLAGS" ]; then \
|
||||
export RUSTFLAGS="$RUSTFLAGS"; \
|
||||
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
|
||||
|
||||
4
Makefile
4
Makefile
@@ -153,7 +153,7 @@ COV_FILE := lcov.info
|
||||
.PHONY: test-unit
|
||||
test-unit: ## Run unit tests.
|
||||
cargo install cargo-nextest --locked
|
||||
cargo nextest run $(UNIT_TEST_ARGS)
|
||||
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
|
||||
|
||||
|
||||
.PHONY: cov-unit
|
||||
@@ -186,7 +186,7 @@ $(EEST_TESTS_DIR):
|
||||
|
||||
.PHONY: ef-tests
|
||||
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
|
||||
cargo nextest run -p ef-tests --release --features ef-tests
|
||||
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
|
||||
|
||||
##@ reth-bench
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
|
||||
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
|
||||
|
||||
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
|
||||
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
|
||||
|
||||
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
|
||||
use crate::valid_payload::call_forkchoice_updated;
|
||||
use eyre::Result;
|
||||
use std::io::{BufReader, Read};
|
||||
use std::{
|
||||
io::{BufReader, Read},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Read input from either a file path or stdin.
|
||||
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
|
||||
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
|
||||
let base: u64 = num_str.trim().parse()?;
|
||||
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
|
||||
}
|
||||
|
||||
/// Parses a duration string, treating bare integers as milliseconds.
|
||||
///
|
||||
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
|
||||
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
|
||||
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
|
||||
match humantime::parse_duration(s) {
|
||||
Ok(d) => Ok(d),
|
||||
Err(_) => {
|
||||
let millis: u64 =
|
||||
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
|
||||
Ok(Duration::from_millis(millis))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use alloy_consensus::Header;
|
||||
use alloy_eips::eip4844::kzg_to_versioned_hash;
|
||||
use alloy_primitives::{Address, B256};
|
||||
@@ -270,4 +289,24 @@ mod tests {
|
||||
assert!(parse_gas_limit("G").is_err());
|
||||
assert!(parse_gas_limit("-1G").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_with_unit() {
|
||||
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
|
||||
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
|
||||
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_bare_millis() {
|
||||
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
|
||||
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
|
||||
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_errors() {
|
||||
assert!(parse_duration("abc").is_err());
|
||||
assert!(parse_duration("").is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
use crate::{
|
||||
bench::{
|
||||
context::BenchContext,
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
@@ -25,7 +26,6 @@ use alloy_provider::Provider;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
@@ -40,6 +40,9 @@ pub struct Command {
|
||||
rpc_url: String,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -117,7 +120,7 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
@@ -131,7 +134,7 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
|
||||
@@ -154,12 +154,18 @@ impl PersistenceSubscription {
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
///
|
||||
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
|
||||
/// connection is not dropped during long MDBX commits that block the server from responding
|
||||
/// to pings.
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
persistence_timeout: Duration,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let ws_connect =
|
||||
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::{
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
@@ -30,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
@@ -78,6 +78,9 @@ pub struct Command {
|
||||
output: Option<PathBuf>,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -166,7 +169,7 @@ impl Command {
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
@@ -177,7 +180,7 @@ impl Command {
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
|
||||
@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
|
||||
reth-primitives.workspace = true
|
||||
reth-db = { workspace = true, features = ["mdbx"] }
|
||||
reth-provider.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-cli-runner.workspace = true
|
||||
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-node-core.workspace = true
|
||||
reth-ethereum-payload-builder.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-node-ethereum.workspace = true
|
||||
reth-node-builder.workspace = true
|
||||
reth-node-metrics.workspace = true
|
||||
reth-consensus.workspace = true
|
||||
reth-tokio-util.workspace = true
|
||||
reth-ress-protocol.workspace = true
|
||||
reth-ress-provider.workspace = true
|
||||
|
||||
# alloy
|
||||
alloy-primitives.workspace = true
|
||||
alloy-rpc-types = { workspace = true, features = ["engine"] }
|
||||
|
||||
# tracing
|
||||
tracing.workspace = true
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
|
||||
|
||||
# misc
|
||||
aquamarine.workspace = true
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
eyre.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
alloy-node-bindings = "1.6.3"
|
||||
alloy-provider = { workspace = true, features = ["reqwest"] }
|
||||
alloy-rpc-types-eth.workspace = true
|
||||
backon.workspace = true
|
||||
serde_json.workspace = true
|
||||
tempfile.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
toml.workspace = true
|
||||
|
||||
[features]
|
||||
default = [
|
||||
@@ -115,10 +113,12 @@ asm-keccak = [
|
||||
"reth-primitives/asm-keccak",
|
||||
"reth-ethereum-cli/asm-keccak",
|
||||
"reth-node-ethereum/asm-keccak",
|
||||
"alloy-primitives/asm-keccak",
|
||||
]
|
||||
keccak-cache-global = [
|
||||
"reth-node-core/keccak-cache-global",
|
||||
"reth-node-ethereum/keccak-cache-global",
|
||||
"alloy-primitives/keccak-cache-global",
|
||||
]
|
||||
jemalloc = [
|
||||
"reth-cli-util/jemalloc",
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
//! - `asm-keccak`: Replaces the default, pure-Rust implementation of Keccak256 with one implemented
|
||||
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
|
||||
//! details and supported targets.
|
||||
//! - `min-debug-logs`: Disables all logs below `debug` level.
|
||||
//!
|
||||
//! ### Allocator Features
|
||||
//!
|
||||
@@ -52,6 +51,9 @@
|
||||
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
|
||||
use alloy_primitives as _;
|
||||
|
||||
pub mod cli;
|
||||
|
||||
/// Re-exported utils.
|
||||
@@ -206,12 +208,9 @@ pub mod rpc {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ress subprotocol installation.
|
||||
pub mod ress;
|
||||
|
||||
// re-export for convenience
|
||||
#[doc(inline)]
|
||||
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
|
||||
pub use reth_cli_runner::{CliContext, CliRunner};
|
||||
|
||||
// for rendering diagrams
|
||||
use aquamarine as _;
|
||||
@@ -219,3 +218,4 @@ use aquamarine as _;
|
||||
// used in main
|
||||
use clap as _;
|
||||
use reth_cli_util as _;
|
||||
use tracing as _;
|
||||
|
||||
@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
|
||||
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
|
||||
|
||||
use clap::Parser;
|
||||
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
|
||||
use reth::cli::Cli;
|
||||
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
|
||||
use reth_node_builder::NodeHandle;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use tracing::info;
|
||||
|
||||
@@ -22,27 +21,12 @@ fn main() {
|
||||
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
|
||||
}
|
||||
|
||||
if let Err(err) =
|
||||
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let NodeHandle { node, node_exit_future } =
|
||||
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
|
||||
// Install ress subprotocol.
|
||||
if ress_args.enabled {
|
||||
install_ress_subprotocol(
|
||||
ress_args,
|
||||
node.provider,
|
||||
node.evm_config,
|
||||
node.network,
|
||||
node.task_executor,
|
||||
node.add_ons_handle.engine_events.new_listener(),
|
||||
)?;
|
||||
}
|
||||
|
||||
node_exit_future.await
|
||||
})
|
||||
{
|
||||
handle.wait_for_node_exit().await
|
||||
}) {
|
||||
eprintln!("Error: {err:?}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
|
||||
use reth_network_api::FullNetwork;
|
||||
use reth_node_api::ConsensusEngineEvent;
|
||||
use reth_node_core::args::RessArgs;
|
||||
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
|
||||
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
|
||||
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tokio_util::EventStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::*;
|
||||
|
||||
/// Install `ress` subprotocol if it's enabled.
|
||||
pub fn install_ress_subprotocol<P, E, N>(
|
||||
args: RessArgs,
|
||||
provider: BlockchainProvider<P>,
|
||||
evm_config: E,
|
||||
network: N,
|
||||
task_executor: TaskExecutor,
|
||||
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
P: ProviderNodeTypes<Primitives = EthPrimitives>,
|
||||
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
|
||||
N: FullNetwork + NetworkProtocols,
|
||||
{
|
||||
info!(target: "reth::cli", "Installing ress subprotocol");
|
||||
let pending_state = PendingState::default();
|
||||
|
||||
// Spawn maintenance task for pending state.
|
||||
task_executor.spawn(maintain_pending_state(
|
||||
engine_events,
|
||||
provider.clone(),
|
||||
pending_state.clone(),
|
||||
));
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
let provider = RethRessProtocolProvider::new(
|
||||
provider,
|
||||
evm_config,
|
||||
Box::new(task_executor.clone()),
|
||||
args.max_witness_window,
|
||||
args.witness_max_parallel,
|
||||
args.witness_cache_size,
|
||||
pending_state,
|
||||
)?;
|
||||
network.add_rlpx_sub_protocol(
|
||||
RessProtocolHandler {
|
||||
provider,
|
||||
node_type: NodeType::Stateful,
|
||||
peers_handle: network.peers_handle().clone(),
|
||||
max_active_connections: args.max_active_connections,
|
||||
state: ProtocolState::new(tx),
|
||||
}
|
||||
.into_rlpx_sub_protocol(),
|
||||
);
|
||||
info!(target: "reth::cli", "Ress subprotocol support enabled");
|
||||
|
||||
task_executor.spawn(async move {
|
||||
while let Some(event) = rx.recv().await {
|
||||
trace!(target: "reth::ress", ?event, "Received ress event");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
255
bin/reth/tests/it/main.rs
Normal file
255
bin/reth/tests/it/main.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use std::process::Command;
|
||||
|
||||
const RETH: &str = env!("CARGO_BIN_EXE_reth");
|
||||
|
||||
// ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
|
||||
///
|
||||
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
|
||||
/// binary startup don't pollute stdout-based assertions.
|
||||
#[track_caller]
|
||||
fn reth_ok(args: &[&str]) -> String {
|
||||
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
|
||||
stdout.into_owned()
|
||||
}
|
||||
|
||||
/// Spawns an isolated dev-mode reth node.
|
||||
///
|
||||
/// Discovery is disabled and peer limits are zeroed so the node is fully
|
||||
/// isolated. Each call gets a unique temporary data directory so that
|
||||
/// concurrent test runs never collide on the default `reth/dev/` path.
|
||||
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
|
||||
use alloy_node_bindings::Reth;
|
||||
|
||||
let datadir = tempfile::tempdir().expect("failed to create temp dir");
|
||||
|
||||
let instance = Reth::at(RETH)
|
||||
.dev()
|
||||
.disable_discovery()
|
||||
.data_dir(datadir.path())
|
||||
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
|
||||
.spawn();
|
||||
|
||||
// Return the TempDir alongside the instance so it lives as long as the node.
|
||||
(instance, datadir)
|
||||
}
|
||||
|
||||
// ── Original tests (from PR #22069) ──────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn help() {
|
||||
let stdout = reth_ok(&["--help"]);
|
||||
assert!(stdout.contains("Usage"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("node"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version() {
|
||||
let stdout = reth_ok(&["--version"]);
|
||||
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_help() {
|
||||
let stdout = reth_ok(&["node", "--help"]);
|
||||
assert!(stdout.contains("--dev"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("--http"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_subcommand() {
|
||||
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
|
||||
assert!(!output.status.success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_flag() {
|
||||
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(!output.status.success());
|
||||
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_eth_syncing() {
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let _syncing = provider.syncing().await.expect("eth_syncing failed");
|
||||
}
|
||||
|
||||
// ── Subcommand --help coverage ───────────────────────────────────────────────
|
||||
//
|
||||
// Every registered subcommand must produce valid --help output. This catches
|
||||
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
|
||||
// broken `help_message()` call) that would otherwise only surface when a user
|
||||
// runs the command.
|
||||
|
||||
#[test]
|
||||
fn init_help() {
|
||||
let stdout = reth_ok(&["init", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn init_state_help() {
|
||||
let stdout = reth_ok(&["init-state", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_help() {
|
||||
let stdout = reth_ok(&["import", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_era_help() {
|
||||
let stdout = reth_ok(&["import-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn export_era_help() {
|
||||
let stdout = reth_ok(&["export-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_help() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn db_help() {
|
||||
let stdout = reth_ok(&["db", "--help"]);
|
||||
assert!(stdout.contains("stats"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_help() {
|
||||
let stdout = reth_ok(&["stage", "--help"]);
|
||||
assert!(stdout.contains("run"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn p2p_help() {
|
||||
let stdout = reth_ok(&["p2p", "--help"]);
|
||||
assert!(stdout.contains("header"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_help() {
|
||||
let stdout = reth_ok(&["config", "--help"]);
|
||||
assert!(stdout.contains("--default"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prune_help() {
|
||||
let stdout = reth_ok(&["prune", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_help() {
|
||||
let stdout = reth_ok(&["download", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn re_execute_help() {
|
||||
let stdout = reth_ok(&["re-execute", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
// ── `config --default` outputs valid TOML ────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn config_default_valid_toml() {
|
||||
let stdout = reth_ok(&["config", "--default"]);
|
||||
|
||||
let parsed: toml::Value =
|
||||
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
|
||||
|
||||
// The default config must contain the [stages] table — this is the heart of
|
||||
// the pipeline configuration and its absence would indicate a serialization
|
||||
// regression.
|
||||
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
|
||||
}
|
||||
|
||||
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_mainnet_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis"]);
|
||||
|
||||
let genesis: serde_json::Value =
|
||||
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_sepolia_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
|
||||
|
||||
let genesis: serde_json::Value = serde_json::from_str(&stdout)
|
||||
.expect("dump-genesis --chain sepolia did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
|
||||
}
|
||||
|
||||
// ── Dev node: send transaction round-trip ────────────────────────────────────
|
||||
//
|
||||
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
|
||||
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
|
||||
// is required.
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_send_tx_and_mine() {
|
||||
use alloy_primitives::{Address, U256};
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
use alloy_rpc_types_eth::TransactionRequest;
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
// Dev mode pre-funds the first dev account.
|
||||
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
|
||||
assert!(!accounts.is_empty(), "dev node should expose at least one account");
|
||||
|
||||
let sender = accounts[0];
|
||||
let recipient = Address::with_last_byte(0x42);
|
||||
|
||||
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
|
||||
|
||||
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
|
||||
|
||||
// In dev/instant-mine mode the node seals a block for each transaction, so
|
||||
// the receipt becomes available almost immediately.
|
||||
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
|
||||
|
||||
assert!(receipt.status(), "transaction should have succeeded");
|
||||
assert_eq!(receipt.to, Some(recipient));
|
||||
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
|
||||
|
||||
// Verify the transfer actually mutated state.
|
||||
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
|
||||
assert_eq!(balance, U256::from(1_000_000));
|
||||
}
|
||||
|
||||
const fn main() {}
|
||||
@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
|
||||
F: FnOnce(Self, CliRunner) -> R,
|
||||
{
|
||||
let cli = Self::parse_args()?;
|
||||
let runner = CliRunner::try_default_runtime()?;
|
||||
let runner = CliRunner::try_default_runtime()
|
||||
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
|
||||
Ok(cli.with_runner(f, runner))
|
||||
}
|
||||
|
||||
|
||||
@@ -127,10 +127,14 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
|
||||
/// Initializes environment according to [`AccessRights`] and returns an instance of
|
||||
/// [`Environment`].
|
||||
///
|
||||
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
|
||||
/// parallel storage I/O.
|
||||
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
{
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let sf_path = data_dir.static_files();
|
||||
@@ -186,7 +190,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
.build()?;
|
||||
|
||||
let provider_factory =
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
|
||||
if access.is_read_write() {
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
|
||||
@@ -207,6 +211,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
access: AccessRights,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
@@ -217,6 +222,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
self.chain.clone(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
runtime,
|
||||
)?
|
||||
.with_prune_modes(prune_modes.clone());
|
||||
|
||||
|
||||
@@ -70,23 +70,23 @@ pub enum Subcommands {
|
||||
State(state::Command),
|
||||
}
|
||||
|
||||
/// Initializes a provider factory with specified access rights, and then execute with the provided
|
||||
/// command
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
|
||||
/// Execute `db` command
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
|
||||
self,
|
||||
ctx: CliContext,
|
||||
) -> eyre::Result<()> {
|
||||
/// Initializes a provider factory with specified access rights, and then executes the
|
||||
/// provided command.
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let static_files_path = data_dir.static_files();
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Command {
|
||||
let executor = task_executor.clone();
|
||||
let pprof_dump_dir = data_dir.pprof_dumps();
|
||||
|
||||
let handle = task_executor.spawn_critical("metrics server", async move {
|
||||
let handle = task_executor.spawn_critical_task("metrics server", async move {
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
|
||||
@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
// Set up cancellation token for graceful shutdown on Ctrl+C
|
||||
let cancellation = CancellationToken::new();
|
||||
let cancellation_clone = cancellation.clone();
|
||||
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
|
||||
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
|
||||
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
|
||||
cancellation_clone.cancel();
|
||||
});
|
||||
|
||||
@@ -9,7 +9,10 @@ use reth_db_api::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_db_common::{
|
||||
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
|
||||
init::{
|
||||
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
|
||||
insert_genesis_storage_history,
|
||||
},
|
||||
DbTool,
|
||||
};
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
@@ -171,7 +174,7 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
None,
|
||||
)?;
|
||||
}
|
||||
StageEnum::AccountHistory | StageEnum::StorageHistory => {
|
||||
StageEnum::AccountHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
@@ -181,16 +184,29 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
|
||||
insert_genesis_account_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::StorageHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.storages_history_in_rocksdb {
|
||||
rocksdb.clear::<tables::StoragesHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
}
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
|
||||
|
||||
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
|
||||
insert_genesis_storage_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::TxLookup => {
|
||||
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
|
||||
@@ -37,12 +37,14 @@ where
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -57,12 +57,14 @@ where
|
||||
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -10,9 +10,10 @@
|
||||
|
||||
//! Entrypoint for running commands.
|
||||
|
||||
use reth_tasks::{TaskExecutor, TaskManager};
|
||||
use reth_tasks::{PanickedTaskError, TaskExecutor};
|
||||
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
|
||||
use tracing::{debug, error, trace};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
/// Executes CLI commands.
|
||||
///
|
||||
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
|
||||
#[derive(Debug)]
|
||||
pub struct CliRunner {
|
||||
config: CliRunnerConfig,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
runtime: reth_tasks::Runtime,
|
||||
}
|
||||
|
||||
impl CliRunner {
|
||||
/// Attempts to create a new [`CliRunner`] using the default tokio
|
||||
/// [`Runtime`](tokio::runtime::Runtime).
|
||||
/// Attempts to create a new [`CliRunner`] using the default
|
||||
/// [`Runtime`](reth_tasks::Runtime).
|
||||
///
|
||||
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
|
||||
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
|
||||
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
|
||||
}
|
||||
|
||||
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
|
||||
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
Self { config: CliRunnerConfig::new(), tokio_runtime }
|
||||
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
|
||||
pub fn try_with_runtime_config(
|
||||
config: reth_tasks::RuntimeConfig,
|
||||
) -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
|
||||
Ok(Self { config: CliRunnerConfig::default(), runtime })
|
||||
}
|
||||
|
||||
/// Sets the [`CliRunnerConfig`] for this runner.
|
||||
@@ -48,7 +52,7 @@ impl CliRunner {
|
||||
where
|
||||
F: Future<Output = T>,
|
||||
{
|
||||
self.tokio_runtime.block_on(fut)
|
||||
self.runtime.handle().block_on(fut)
|
||||
}
|
||||
|
||||
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
|
||||
@@ -64,12 +68,11 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Executes the command until it finished or ctrl-c was fired
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(command(context)),
|
||||
));
|
||||
|
||||
@@ -77,13 +80,13 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
// after the command has finished or exit signal was received we shutdown the task
|
||||
// manager which fires the shutdown signal to all tasks spawned via the task
|
||||
// after the command has finished or exit signal was received we shutdown the
|
||||
// runtime which fires the shutdown signal to all tasks spawned via the task
|
||||
// executor and awaiting on tasks spawned with graceful shutdown
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -99,17 +102,16 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Spawn the command on the blocking thread pool
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let command_handle =
|
||||
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
|
||||
|
||||
// Wait for the command to complete or ctrl-c
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(
|
||||
async move { command_handle.await.expect("Failed to join blocking task") },
|
||||
),
|
||||
@@ -119,10 +121,10 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -133,48 +135,40 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
|
||||
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes a regular future as a spawned blocking task until completion or until external
|
||||
/// signal received.
|
||||
///
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
|
||||
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
let tokio_runtime = self.tokio_runtime;
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
|
||||
tokio_runtime
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
|
||||
self.runtime
|
||||
.handle()
|
||||
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
|
||||
|
||||
tokio_shutdown(tokio_runtime, false);
|
||||
runtime_shutdown(self.runtime, false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// [`CliRunner`] configuration when executing commands asynchronously
|
||||
struct AsyncCliRunner {
|
||||
context: CliContext,
|
||||
task_manager: TaskManager,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
// === impl AsyncCliRunner ===
|
||||
|
||||
impl AsyncCliRunner {
|
||||
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
|
||||
/// execute commands asynchronously.
|
||||
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
|
||||
let task_executor = task_manager.executor();
|
||||
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
|
||||
}
|
||||
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
|
||||
fn cli_context(
|
||||
runtime: &reth_tasks::Runtime,
|
||||
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
|
||||
let handle =
|
||||
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
|
||||
let context = CliContext { task_executor: runtime.clone() };
|
||||
(context, handle)
|
||||
}
|
||||
|
||||
/// Additional context provided by the [`CliRunner`] when executing commands
|
||||
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
|
||||
/// enabled
|
||||
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
|
||||
// This prevents the costly process of spawning new threads on every
|
||||
// new block, and instead reuses the existing threads.
|
||||
.thread_keep_alive(Duration::from_secs(15))
|
||||
.thread_name("tokio-rt")
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Runs the given future to completion or until a critical task panicked.
|
||||
///
|
||||
/// Returns the error if a task panicked, or the given future returned an error.
|
||||
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
|
||||
async fn run_to_completion_or_panic<F, E>(
|
||||
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
|
||||
fut: F,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
{
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = tasks => {
|
||||
if let Err(panicked_error) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = task_manager_handle => {
|
||||
if let Ok(Err(panicked_error)) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -271,10 +253,10 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
_ = sigterm => {
|
||||
trace!(target: "reth::cli", "Received SIGTERM");
|
||||
info!(target: "reth::cli", "Received SIGTERM");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -287,7 +269,7 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -296,17 +278,17 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
|
||||
/// Default timeout for waiting on the tokio runtime to shut down.
|
||||
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
|
||||
///
|
||||
/// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
/// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
/// complete.
|
||||
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
// Shutdown the runtime on a separate thread
|
||||
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
|
||||
/// Instead, we drop it on a separate thread and optionally wait for completion.
|
||||
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-shutdown".to_string())
|
||||
.name("rt-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(rt);
|
||||
let _ = tx.send(());
|
||||
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
.unwrap();
|
||||
|
||||
if wait {
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
|
||||
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ use reth_node_builder::{
|
||||
PayloadTypes,
|
||||
};
|
||||
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
|
||||
use reth_tasks::TaskManager;
|
||||
use std::sync::Arc;
|
||||
use wallet::Wallet;
|
||||
|
||||
@@ -50,7 +49,7 @@ pub async fn setup<N>(
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
is_dev: bool,
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
|
||||
where
|
||||
N: NodeBuilderHelper,
|
||||
{
|
||||
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
|
||||
connect_nodes: bool,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
|
||||
@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
|
||||
use reth_primitives_traits::AlloyBlockHeader;
|
||||
use reth_provider::providers::BlockchainProvider;
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::sync::Arc;
|
||||
use tracing::{span, Instrument, Level};
|
||||
|
||||
@@ -110,11 +110,9 @@ where
|
||||
self,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -153,7 +151,7 @@ where
|
||||
let span = span!(Level::INFO, "node", idx);
|
||||
let node = N::default();
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.with_types_and_provider::<N, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -197,7 +195,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_stages_types::StageId;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tempfile::TempDir;
|
||||
use tracing::{debug, info, span, Level};
|
||||
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
|
||||
pub struct ChainImportResult {
|
||||
/// The nodes that were created
|
||||
pub nodes: Vec<NodeHelperType<EthereumNode>>,
|
||||
/// The task manager
|
||||
pub task_manager: TaskManager,
|
||||
/// The wallet for testing
|
||||
pub wallet: Wallet,
|
||||
/// Temporary directories that must be kept alive for the duration of the test
|
||||
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
+ Copy
|
||||
+ 'static,
|
||||
) -> eyre::Result<ChainImportResult> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
let node = EthereumNode::default();
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node_with_datadir(exec.clone(), datadir.clone())
|
||||
.testing_node_with_datadir(runtime.clone(), datadir.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
|
||||
|
||||
Ok(ChainImportResult {
|
||||
nodes,
|
||||
task_manager: tasks,
|
||||
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
|
||||
_temp_dirs: temp_dirs,
|
||||
})
|
||||
@@ -333,6 +330,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -397,6 +395,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -497,6 +496,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -210,7 +210,7 @@ where
|
||||
|
||||
let mut node_clients = Vec::new();
|
||||
match result {
|
||||
Ok((nodes, executor, _wallet)) => {
|
||||
Ok((nodes, _wallet)) => {
|
||||
// create HTTP clients for each node's RPC and Engine API endpoints
|
||||
for node in &nodes {
|
||||
node_clients.push(node.to_node_client()?);
|
||||
@@ -218,12 +218,11 @@ where
|
||||
|
||||
// spawn a separate task just to handle the shutdown
|
||||
tokio::spawn(async move {
|
||||
// keep nodes and executor in scope to ensure they're not dropped
|
||||
// keep nodes in scope to ensure they're not dropped
|
||||
let _nodes = nodes;
|
||||
let _executor = executor;
|
||||
// Wait for shutdown signal
|
||||
let _ = shutdown_rx.recv().await;
|
||||
// nodes and executor will be dropped here when the test completes
|
||||
// nodes will be dropped here when the test completes
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
let (nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -147,7 +147,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _wallet) =
|
||||
let (mut nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -201,7 +201,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -268,7 +268,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -336,7 +336,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -421,7 +421,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -485,7 +485,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Engine tree configuration.
|
||||
|
||||
use alloy_eips::merge::EPOCH_SLOTS;
|
||||
use core::time::Duration;
|
||||
|
||||
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
|
||||
/// Storage tries beyond this limit are cleared (but allocations preserved).
|
||||
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
|
||||
|
||||
/// Default timeout for the state root task before spawning a sequential fallback.
|
||||
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
|
||||
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
|
||||
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
|
||||
@@ -175,6 +179,11 @@ pub struct TreeConfig {
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum number of storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
/// Timeout for the state root task before spawning a sequential fallback computation.
|
||||
/// If `Some`, after waiting this duration for the state root task, a sequential state root
|
||||
/// computation is spawned in parallel and whichever finishes first is used.
|
||||
/// If `None`, the timeout fallback is disabled.
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -207,6 +216,7 @@ impl Default for TreeConfig {
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +251,7 @@ impl TreeConfig {
|
||||
disable_cache_metrics: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -270,6 +281,7 @@ impl TreeConfig {
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
state_root_task_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,4 +630,15 @@ impl TreeConfig {
|
||||
self.sparse_trie_max_storage_tries = max_tries;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the state root task timeout.
|
||||
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
|
||||
self.state_root_task_timeout
|
||||
}
|
||||
|
||||
/// Setter for state root task timeout.
|
||||
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
|
||||
self.state_root_task_timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
|
||||
#[cfg(feature = "std")]
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_payload_primitives::ExecutionPayload;
|
||||
|
||||
mod error;
|
||||
|
||||
@@ -201,6 +201,7 @@ mod tests {
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
|
||||
|
||||
@@ -141,6 +141,7 @@ test-utils = [
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
"reth-node-ethereum/test-utils",
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
|
||||
[[test]]
|
||||
|
||||
@@ -12,8 +12,7 @@ use rand::Rng;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_engine_tree::tree::{
|
||||
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_evm::OnStateHook;
|
||||
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
setup_provider(&factory, &state_updates).expect("failed to setup provider");
|
||||
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
|
||||
@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let pipeline = pipeline.take().expect("exists");
|
||||
self.pipeline_task_spawner.spawn_critical_blocking(
|
||||
self.pipeline_task_spawner.spawn_critical_blocking_task(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let result = pipeline.run_as_fut(Some(target)).await;
|
||||
|
||||
@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
|
||||
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
|
||||
|
||||
/// A wrapper of a state provider and a shared cache.
|
||||
///
|
||||
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
|
||||
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
|
||||
/// data for regular block execution. During regular block execution the cache doesn't need to be
|
||||
/// populated because the actual EVM database [`State`](revm::database::State) also caches
|
||||
/// internally during block execution and the cache is then updated after the block with the entire
|
||||
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
|
||||
/// also [`ExecutionCache::insert_state`].
|
||||
#[derive(Debug)]
|
||||
pub struct CachedStateProvider<S> {
|
||||
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
|
||||
/// The state provider
|
||||
state_provider: S,
|
||||
|
||||
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
|
||||
|
||||
/// Metrics for the cached state provider
|
||||
metrics: CachedStateMetrics,
|
||||
|
||||
/// If prewarm enabled we populate every cache miss
|
||||
prewarm: bool,
|
||||
}
|
||||
|
||||
impl<S> CachedStateProvider<S>
|
||||
where
|
||||
S: StateProvider,
|
||||
{
|
||||
impl<S> CachedStateProvider<S> {
|
||||
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
|
||||
/// [`CachedStateMetrics`].
|
||||
pub const fn new(
|
||||
@@ -102,27 +104,18 @@ where
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
) -> Self {
|
||||
Self { state_provider, caches, metrics, prewarm: false }
|
||||
Self { state_provider, caches, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> CachedStateProvider<S> {
|
||||
/// Enables pre-warm mode so that every cache miss is populated.
|
||||
///
|
||||
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
|
||||
/// the cache with data for regular block execution. During regular block execution the
|
||||
/// cache doesn't need to be populated because the actual EVM database
|
||||
/// [`State`](revm::database::State) also caches internally during block execution and the cache
|
||||
/// is then updated after the block with the entire [`BundleState`] output of that block which
|
||||
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
|
||||
pub const fn prewarm(mut self) -> Self {
|
||||
self.prewarm = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether this provider should pre-warm cache misses.
|
||||
const fn is_prewarm(&self) -> bool {
|
||||
self.prewarm
|
||||
impl<S> CachedStateProvider<S, true> {
|
||||
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
|
||||
pub const fn new_prewarm(
|
||||
state_provider: S,
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
) -> Self {
|
||||
Self { state_provider, caches, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
|
||||
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
|
||||
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_account_with(*address, || {
|
||||
self.state_provider.basic_account(address)
|
||||
})? {
|
||||
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
|
||||
Cached(T),
|
||||
}
|
||||
|
||||
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
|
||||
fn storage(
|
||||
&self,
|
||||
account: Address,
|
||||
storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
|
||||
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
|
||||
})? {
|
||||
@@ -360,9 +353,9 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
|
||||
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
|
||||
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_code_with(*code_hash, || {
|
||||
self.state_provider.bytecode_by_hash(code_hash)
|
||||
})? {
|
||||
@@ -378,7 +371,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
|
||||
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
|
||||
self.state_provider.state_root(hashed_state)
|
||||
}
|
||||
@@ -402,7 +397,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
|
||||
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn proof(
|
||||
&self,
|
||||
input: TrieInput,
|
||||
@@ -429,7 +426,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
|
||||
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn storage_root(
|
||||
&self,
|
||||
address: Address,
|
||||
@@ -457,7 +456,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
|
||||
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
|
||||
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
|
||||
self.state_provider.block_hash(number)
|
||||
}
|
||||
@@ -471,7 +470,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
|
||||
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
|
||||
self.state_provider.hashed_post_state(bundle_state)
|
||||
}
|
||||
|
||||
@@ -8,9 +8,17 @@ use reth_metrics::{
|
||||
metrics::{Counter, Gauge, Histogram},
|
||||
Metrics,
|
||||
};
|
||||
use reth_primitives_traits::constants::gas_units::MEGAGAS;
|
||||
use reth_trie::updates::TrieUpdates;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Width of each gas bucket in gas units (10 Mgas).
|
||||
const GAS_BUCKET_SIZE: u64 = 10 * MEGAGAS;
|
||||
|
||||
/// Number of gas buckets. The last bucket is a catch-all for everything above
|
||||
/// `(NUM_GAS_BUCKETS - 1) * GAS_BUCKET_SIZE`.
|
||||
const NUM_GAS_BUCKETS: usize = 5;
|
||||
|
||||
/// Metrics for the `EngineApi`.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EngineApiMetrics {
|
||||
@@ -235,6 +243,63 @@ impl ForkchoiceUpdatedMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-gas-bucket newPayload metrics, initialized once via [`Self::new_with_labels`].
|
||||
#[derive(Clone, Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
pub(crate) struct NewPayloadGasBucketMetrics {
|
||||
/// Latency for new payload calls in this gas bucket.
|
||||
pub(crate) new_payload_gas_bucket_latency: Histogram,
|
||||
/// Gas per second for new payload calls in this gas bucket.
|
||||
pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
|
||||
}
|
||||
|
||||
/// Holds pre-initialized [`NewPayloadGasBucketMetrics`] instances, one per gas bucket.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct GasBucketMetrics {
|
||||
buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
|
||||
}
|
||||
|
||||
impl Default for GasBucketMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buckets: std::array::from_fn(|i| {
|
||||
let label = Self::bucket_label(i);
|
||||
NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GasBucketMetrics {
|
||||
fn record(&self, gas_used: u64, elapsed: Duration) {
|
||||
let idx = Self::bucket_index(gas_used);
|
||||
self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
|
||||
self.buckets[idx]
|
||||
.new_payload_gas_bucket_gas_per_second
|
||||
.record(gas_used as f64 / elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
fn bucket_index(gas_used: u64) -> usize {
|
||||
let idx = gas_used / GAS_BUCKET_SIZE;
|
||||
(idx as usize).min(NUM_GAS_BUCKETS - 1)
|
||||
}
|
||||
|
||||
/// Returns a human-readable label like `<10M`, `10-20M`, … `>40M`.
|
||||
fn bucket_label(index: usize) -> String {
|
||||
let m = GAS_BUCKET_SIZE / 1_000_000;
|
||||
if index == 0 {
|
||||
format!("<{m}M")
|
||||
} else if index < NUM_GAS_BUCKETS - 1 {
|
||||
let lo = m * index as u64;
|
||||
let hi = lo + m;
|
||||
format!("{lo}-{hi}M")
|
||||
} else {
|
||||
let lo = m * index as u64;
|
||||
format!(">{lo}M")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for engine newPayload responses.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
@@ -245,6 +310,9 @@ pub(crate) struct NewPayloadStatusMetrics {
|
||||
/// Start time of the latest new payload call.
|
||||
#[metric(skip)]
|
||||
pub(crate) latest_start_at: Option<Instant>,
|
||||
/// Gas-bucket-labeled latency and gas/s histograms.
|
||||
#[metric(skip)]
|
||||
pub(crate) gas_bucket: GasBucketMetrics,
|
||||
/// The total count of new payload messages received.
|
||||
pub(crate) new_payload_messages: Counter,
|
||||
/// The total count of new payload messages that we responded to with
|
||||
@@ -321,6 +389,7 @@ impl NewPayloadStatusMetrics {
|
||||
self.new_payload_messages.increment(1);
|
||||
self.new_payload_latency.record(elapsed);
|
||||
self.new_payload_last.set(elapsed);
|
||||
self.gas_bucket.record(gas_used, elapsed);
|
||||
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
|
||||
self.forkchoice_updated_new_payload_time_diff
|
||||
.record(start - latest_forkchoice_updated_at);
|
||||
@@ -365,6 +434,8 @@ pub struct BlockValidationMetrics {
|
||||
pub state_root_parallel_fallback_total: Counter,
|
||||
/// Total number of times the state root task failed but the fallback succeeded.
|
||||
pub state_root_task_fallback_success_total: Counter,
|
||||
/// Total number of times the state root task timed out and a sequential fallback was spawned.
|
||||
pub state_root_task_timeout_total: Counter,
|
||||
/// Latest state root duration, ie the time spent blocked waiting for the state root.
|
||||
pub state_root_duration: Gauge,
|
||||
/// Histogram for state root duration ie the time spent blocked waiting for the state root
|
||||
|
||||
@@ -1510,7 +1510,7 @@ where
|
||||
.engine
|
||||
.failed_forkchoice_updated_response_deliveries
|
||||
.increment(1);
|
||||
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
@@ -1534,7 +1534,7 @@ where
|
||||
BeaconOnNewPayloadError::Internal(Box::new(e))
|
||||
}))
|
||||
{
|
||||
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
|
||||
self.metrics
|
||||
.engine
|
||||
.failed_new_payload_response_deliveries
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
//! Executor for mixed I/O and CPU workloads.
|
||||
|
||||
use reth_trie_parallel::root::get_tokio_runtime_handle;
|
||||
use tokio::{runtime::Handle, task::JoinHandle};
|
||||
|
||||
/// An executor for mixed I/O and CPU workloads.
|
||||
///
|
||||
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
|
||||
/// runtime if available or create its own.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WorkloadExecutor {
|
||||
inner: WorkloadExecutorInner,
|
||||
}
|
||||
|
||||
impl Default for WorkloadExecutor {
|
||||
fn default() -> Self {
|
||||
Self { inner: WorkloadExecutorInner::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkloadExecutor {
|
||||
/// Returns the handle to the tokio runtime
|
||||
pub(super) const fn handle(&self) -> &Handle {
|
||||
&self.inner.handle
|
||||
}
|
||||
|
||||
/// Runs the provided function on an executor dedicated to blocking operations.
|
||||
#[track_caller]
|
||||
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
self.inner.handle.spawn_blocking(func)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct WorkloadExecutorInner {
|
||||
handle: Handle,
|
||||
}
|
||||
|
||||
impl WorkloadExecutorInner {
|
||||
fn new() -> Self {
|
||||
Self { handle: get_tokio_runtime_handle() }
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,6 @@ use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
|
||||
use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use executor::WorkloadExecutor;
|
||||
use metrics::{Counter, Histogram};
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
@@ -24,8 +23,8 @@ use rayon::prelude::*;
|
||||
use reth_evm::{
|
||||
block::ExecutableTxParts,
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
|
||||
TxEnvFor,
|
||||
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
|
||||
SpecFor, TxEnvFor,
|
||||
};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
@@ -34,6 +33,7 @@ use reth_provider::{
|
||||
StateProviderFactory, StateReader,
|
||||
};
|
||||
use reth_revm::{db::BundleState, state::EvmState};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
@@ -55,7 +55,6 @@ use std::{
|
||||
use tracing::{debug, debug_span, instrument, warn, Span};
|
||||
|
||||
pub mod bal;
|
||||
pub mod executor;
|
||||
pub mod multiproof;
|
||||
mod preserved_sparse_trie;
|
||||
pub mod prewarm;
|
||||
@@ -109,7 +108,7 @@ where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
/// The executor used by to spawn tasks.
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
/// The most recent cache used for execution.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Metrics for trie operations
|
||||
@@ -146,13 +145,13 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// Returns a reference to the workload executor driving payload tasks.
|
||||
pub const fn executor(&self) -> &WorkloadExecutor {
|
||||
pub const fn executor(&self) -> &Runtime {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
/// Creates a new payload processor.
|
||||
pub fn new(
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
evm_config: Evm,
|
||||
config: &TreeConfig,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
@@ -236,7 +235,8 @@ where
|
||||
+ 'static,
|
||||
{
|
||||
// start preparing transactions immediately
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx) =
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
|
||||
let span = Span::current();
|
||||
let (to_sparse_trie, sparse_trie_rx) = channel();
|
||||
@@ -277,15 +277,7 @@ where
|
||||
|
||||
// Create and spawn the storage proof task
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
task_ctx,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
v2_proofs_enabled,
|
||||
);
|
||||
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
|
||||
|
||||
if config.disable_trie_cache() {
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
@@ -351,7 +343,8 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx) =
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
|
||||
let prewarm_handle =
|
||||
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
|
||||
@@ -364,11 +357,23 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction count threshold below which sequential signature recovery is used.
|
||||
///
|
||||
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
|
||||
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
|
||||
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
|
||||
/// for small blocks.
|
||||
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
|
||||
|
||||
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
|
||||
///
|
||||
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
|
||||
/// sequential iteration to avoid rayon overhead.
|
||||
#[expect(clippy::type_complexity)]
|
||||
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
|
||||
&self,
|
||||
transactions: I,
|
||||
transaction_count: usize,
|
||||
) -> (
|
||||
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
|
||||
@@ -377,22 +382,51 @@ where
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into();
|
||||
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
// Only send Ok(_) variants to prewarming task.
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
if transaction_count == 0 {
|
||||
// Empty block — nothing to do.
|
||||
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
|
||||
// Sequential path for small blocks — avoids rayon work-stealing setup and
|
||||
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
transaction_count,
|
||||
"using sequential sig recovery for small block"
|
||||
);
|
||||
self.executor.spawn_blocking(move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
for (idx, tx) in transactions.into_iter().enumerate() {
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// Parallel path — spawn on rayon for parallel signature recovery.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
transactions.into_par_iter().enumerate().for_each_with(
|
||||
ooo_tx,
|
||||
|ooo_tx, (idx, tx)| {
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
// Only send Ok(_) variants to prewarming task.
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to the execution task in order.
|
||||
@@ -423,7 +457,7 @@ where
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
env: ExecutionEnv<Evm>,
|
||||
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
@@ -432,11 +466,7 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
if self.disable_transaction_prewarming {
|
||||
// if no transactions should be executed we clear them but still spawn the task for
|
||||
// caching updates
|
||||
transactions = mpsc::channel().1;
|
||||
}
|
||||
let skip_prewarm = self.disable_transaction_prewarming;
|
||||
|
||||
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
|
||||
|
||||
@@ -465,7 +495,9 @@ where
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let mode = if let Some(bal) = bal {
|
||||
let mode = if skip_prewarm {
|
||||
PrewarmMode::Skipped
|
||||
} else if let Some(bal) = bal {
|
||||
PrewarmMode::BlockAccessList(bal)
|
||||
} else {
|
||||
PrewarmMode::Transactions(transactions)
|
||||
@@ -723,6 +755,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
|
||||
}
|
||||
|
||||
/// Takes the state root receiver out of the handle for use with custom waiting logic
|
||||
/// (e.g., timeout-based waiting).
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If payload processing was started without background tasks.
|
||||
pub const fn take_state_root_rx(
|
||||
&mut self,
|
||||
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
|
||||
self.state_root.take().expect("state_root is None")
|
||||
}
|
||||
|
||||
/// Returns a state hook to be used to send state updates to this task.
|
||||
///
|
||||
/// If a multiproof task is spawned the hook will notify it about new states.
|
||||
@@ -1001,9 +1045,7 @@ mod tests {
|
||||
use super::PayloadExecutionCache;
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
|
||||
payload_processor::{
|
||||
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
|
||||
},
|
||||
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
|
||||
precompile_cache::PrecompileCacheMap,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
@@ -1105,7 +1147,7 @@ mod tests {
|
||||
#[test]
|
||||
fn on_inserted_executed_block_populates_cache() {
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
@@ -1134,7 +1176,7 @@ mod tests {
|
||||
#[test]
|
||||
fn on_inserted_executed_block_skips_on_parent_mismatch() {
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
@@ -1269,7 +1311,7 @@ mod tests {
|
||||
}
|
||||
|
||||
let mut payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
|
||||
@@ -1547,17 +1547,11 @@ mod tests {
|
||||
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
|
||||
use revm_primitives::{B256, U256};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tokio::runtime::{Handle, Runtime};
|
||||
|
||||
/// Get a handle to the test runtime, creating it if necessary
|
||||
fn get_test_runtime_handle() -> Handle {
|
||||
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
|
||||
TEST_RT
|
||||
.get_or_init(|| {
|
||||
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
|
||||
})
|
||||
.handle()
|
||||
.clone()
|
||||
/// Get a test runtime, creating it if necessary
|
||||
fn get_test_runtime() -> &'static reth_tasks::Runtime {
|
||||
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
|
||||
TEST_RT.get_or_init(reth_tasks::Runtime::test)
|
||||
}
|
||||
|
||||
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
|
||||
@@ -1573,11 +1567,11 @@ mod tests {
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
let rt_handle = get_test_runtime_handle();
|
||||
let runtime = get_test_runtime();
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
|
||||
let task_ctx = ProofTaskCtx::new(overlay_factory);
|
||||
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
|
||||
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
|
||||
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ use crate::tree::{
|
||||
cached_state::{CachedStateProvider, SavedCache},
|
||||
payload_processor::{
|
||||
bal::{self, total_slots, BALSlotIter},
|
||||
executor::WorkloadExecutor,
|
||||
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
|
||||
PayloadExecutionCache,
|
||||
},
|
||||
@@ -37,6 +36,7 @@ use reth_provider::{
|
||||
StateReader,
|
||||
};
|
||||
use reth_revm::{database::StateProviderDatabase, state::EvmState};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::MultiProofTargets;
|
||||
use std::{
|
||||
ops::Range,
|
||||
@@ -49,13 +49,16 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, trace, warn, Span};
|
||||
|
||||
/// Determines the prewarming mode: transaction-based or BAL-based.
|
||||
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
|
||||
#[derive(Debug)]
|
||||
pub enum PrewarmMode<Tx> {
|
||||
/// Prewarm by executing transactions from a stream.
|
||||
Transactions(Receiver<Tx>),
|
||||
/// Prewarm by prefetching slots from a Block Access List.
|
||||
BlockAccessList(Arc<BlockAccessList>),
|
||||
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
|
||||
/// benefit). No workers are spawned.
|
||||
Skipped,
|
||||
}
|
||||
|
||||
/// A wrapper for transactions that includes their index in the block.
|
||||
@@ -78,7 +81,7 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// The executor used to spawn execution tasks.
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
/// Shared execution cache.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Context provided to execution tasks
|
||||
@@ -101,7 +104,7 @@ where
|
||||
{
|
||||
/// Initializes the task with the given transactions pending execution
|
||||
pub fn new(
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
execution_cache: PayloadExecutionCache,
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
@@ -416,6 +419,10 @@ where
|
||||
PrewarmMode::BlockAccessList(bal) => {
|
||||
self.run_bal_prewarm(bal, actions_tx);
|
||||
}
|
||||
PrewarmMode::Skipped => {
|
||||
let _ = actions_tx
|
||||
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
}
|
||||
}
|
||||
|
||||
let mut final_execution_outcome = None;
|
||||
@@ -528,11 +535,8 @@ where
|
||||
if let Some(saved_cache) = saved_cache {
|
||||
let caches = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
state_provider = Box::new(
|
||||
CachedStateProvider::new(state_provider, caches, cache_metrics)
|
||||
// ensure we pre-warm the cache
|
||||
.prewarm(),
|
||||
);
|
||||
state_provider =
|
||||
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
|
||||
}
|
||||
|
||||
let state_provider = StateProviderDatabase::new(state_provider);
|
||||
@@ -590,18 +594,11 @@ where
|
||||
return
|
||||
};
|
||||
|
||||
while let Ok(IndexedTransaction { index, tx }) = {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
|
||||
.entered();
|
||||
txs.recv()
|
||||
} {
|
||||
let enter = debug_span!(
|
||||
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
|
||||
let _enter = debug_span!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
"prewarm tx",
|
||||
index,
|
||||
tx_hash = %tx.tx().tx_hash(),
|
||||
is_success = tracing::field::Empty,
|
||||
gas_used = tracing::field::Empty,
|
||||
)
|
||||
.entered();
|
||||
|
||||
@@ -632,12 +629,6 @@ where
|
||||
};
|
||||
metrics.execution_duration.record(start.elapsed());
|
||||
|
||||
// record some basic information about the transactions
|
||||
enter.record("gas_used", res.result.gas_used());
|
||||
enter.record("is_success", res.result.is_success());
|
||||
|
||||
drop(enter);
|
||||
|
||||
// If the task was cancelled, stop execution, and exit.
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
break
|
||||
@@ -646,16 +637,12 @@ where
|
||||
// Only send outcome for transactions after the first txn
|
||||
// as the main execution will be just as fast
|
||||
if index > 0 {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
|
||||
.entered();
|
||||
let (targets, storage_targets) =
|
||||
multiproof_targets_from_state(res.state, v2_proofs_enabled);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
if let Some(to_multi_proof) = &to_multi_proof {
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
drop(_enter);
|
||||
}
|
||||
|
||||
metrics.total_runtime.record(start.elapsed());
|
||||
@@ -671,7 +658,7 @@ where
|
||||
fn spawn_workers<Tx>(
|
||||
self,
|
||||
workers_needed: usize,
|
||||
task_executor: &WorkloadExecutor,
|
||||
task_executor: &Runtime,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
done_tx: Sender<()>,
|
||||
) -> CrossbeamSender<IndexedTransaction<Tx>>
|
||||
@@ -708,7 +695,7 @@ where
|
||||
fn spawn_bal_worker(
|
||||
&self,
|
||||
idx: usize,
|
||||
executor: &WorkloadExecutor,
|
||||
executor: &Runtime,
|
||||
bal: Arc<BlockAccessList>,
|
||||
range: Range<usize>,
|
||||
done_tx: Sender<()>,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use super::executor::WorkloadExecutor;
|
||||
use crate::tree::{
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
@@ -13,6 +12,7 @@ use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use smallvec::SmallVec;
|
||||
@@ -44,8 +44,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
Cleared(SparseTrieTask<BPF, A, S>),
|
||||
Cached(SparseTrieCacheTask<A, S>),
|
||||
@@ -56,8 +56,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
match self {
|
||||
@@ -117,8 +117,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie task with the given trie.
|
||||
pub(super) const fn new(
|
||||
@@ -277,12 +277,12 @@ pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparse
|
||||
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrieExt + Default,
|
||||
S: SparseTrieExt + Default + Clone,
|
||||
A: SparseTrie + Default,
|
||||
S: SparseTrie + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_trie(
|
||||
executor: &WorkloadExecutor,
|
||||
executor: &Runtime,
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
@@ -486,7 +486,7 @@ where
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
level = "trace",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
@@ -518,7 +518,7 @@ where
|
||||
|
||||
/// Processes a hashed state update and encodes all state changes as trie updates.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
level = "trace",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::tree::{
|
||||
cached_state::CachedStateProvider,
|
||||
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
|
||||
instrumented_state::InstrumentedStateProvider,
|
||||
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
|
||||
payload_processor::PayloadProcessor,
|
||||
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
|
||||
sparse_trie::StateRootComputeOutcome,
|
||||
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
|
||||
@@ -17,7 +17,6 @@ use alloy_evm::Evm;
|
||||
use alloy_primitives::B256;
|
||||
|
||||
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
|
||||
use rayon::prelude::*;
|
||||
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
|
||||
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
|
||||
use reth_engine_primitives::{
|
||||
@@ -49,7 +48,7 @@ use revm_primitives::Address;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
panic::{self, AssertUnwindSafe},
|
||||
sync::Arc,
|
||||
sync::{mpsc::RecvTimeoutError, Arc},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
|
||||
@@ -134,6 +133,8 @@ where
|
||||
validator: V,
|
||||
/// Changeset cache for in-memory trie changesets
|
||||
changeset_cache: ChangesetCache,
|
||||
/// Task runtime for spawning parallel work.
|
||||
runtime: reth_tasks::Runtime,
|
||||
}
|
||||
|
||||
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
|
||||
@@ -166,10 +167,11 @@ where
|
||||
config: TreeConfig,
|
||||
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> Self {
|
||||
let precompile_cache_map = PrecompileCacheMap::default();
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
runtime.clone(),
|
||||
evm_config.clone(),
|
||||
&config,
|
||||
precompile_cache_map.clone(),
|
||||
@@ -186,6 +188,7 @@ where
|
||||
metrics: EngineApiMetrics::default(),
|
||||
validator,
|
||||
changeset_cache,
|
||||
runtime,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -228,35 +231,20 @@ where
|
||||
V: PayloadValidator<T, Block = N::Block>,
|
||||
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
|
||||
{
|
||||
match input {
|
||||
Ok(match input {
|
||||
BlockOrPayload::Payload(payload) => {
|
||||
let (iter, convert) = self
|
||||
let iter = self
|
||||
.evm_config
|
||||
.tx_iterator_for_payload(payload)
|
||||
.map_err(NewPayloadError::other)?
|
||||
.into();
|
||||
|
||||
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
|
||||
let convert = move |tx| {
|
||||
let Either::Left(tx) = tx else { unreachable!() };
|
||||
convert(tx).map(Either::Left).map_err(Either::Left)
|
||||
};
|
||||
|
||||
// Box the closure to satisfy the `Fn` bound both here and in the branch below
|
||||
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
|
||||
.map_err(NewPayloadError::other)?;
|
||||
Either::Left(iter)
|
||||
}
|
||||
BlockOrPayload::Block(block) => {
|
||||
let iter = Either::Right(
|
||||
block.body().clone_transactions().into_par_iter().map(Either::Right),
|
||||
);
|
||||
let convert = move |tx: Either<_, N::SignedTx>| {
|
||||
let Either::Right(tx) = tx else { unreachable!() };
|
||||
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
|
||||
};
|
||||
|
||||
Ok((iter, Box::new(convert)))
|
||||
let txs = block.body().clone_transactions();
|
||||
let convert = |tx: N::SignedTx| tx.try_into_recovered();
|
||||
Either::Right((txs, convert))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a [`ExecutionCtxFor`] for the given payload or block.
|
||||
@@ -515,7 +503,17 @@ where
|
||||
match strategy {
|
||||
StateRootStrategy::StateRootTask => {
|
||||
debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
|
||||
match handle.state_root() {
|
||||
|
||||
let task_result = ensure_ok_post_block!(
|
||||
self.await_state_root_with_timeout(
|
||||
&mut handle,
|
||||
overlay_factory.clone(),
|
||||
&hashed_state,
|
||||
),
|
||||
block
|
||||
);
|
||||
|
||||
match task_result {
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
|
||||
let elapsed = root_time.elapsed();
|
||||
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
|
||||
@@ -586,7 +584,7 @@ where
|
||||
}
|
||||
|
||||
let (root, updates) = ensure_ok_post_block!(
|
||||
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
|
||||
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
|
||||
block
|
||||
);
|
||||
|
||||
@@ -818,21 +816,18 @@ where
|
||||
|
||||
let tx = tx_result.map_err(BlockExecutionError::other)?;
|
||||
let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
|
||||
let tx_hash = <Tx as alloy_evm::RecoveredTx<InnerTx>>::tx(&tx).tx_hash();
|
||||
|
||||
senders.push(tx_signer);
|
||||
|
||||
let span = debug_span!(
|
||||
let _enter = debug_span!(
|
||||
target: "engine::tree",
|
||||
"execute tx",
|
||||
?tx_hash,
|
||||
gas_used = tracing::field::Empty,
|
||||
);
|
||||
let enter = span.entered();
|
||||
)
|
||||
.entered();
|
||||
trace!(target: "engine::tree", "Executing transaction");
|
||||
|
||||
let tx_start = Instant::now();
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
let current_len = executor.receipts().len();
|
||||
@@ -844,8 +839,6 @@ where
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
enter.record("gas_used", gas_used);
|
||||
}
|
||||
drop(exec_span);
|
||||
|
||||
@@ -874,7 +867,8 @@ where
|
||||
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
|
||||
let overlay_factory =
|
||||
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
|
||||
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
|
||||
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
|
||||
.incremental_root_with_updates()
|
||||
}
|
||||
|
||||
/// Compute state root for the given hashed post state in serial.
|
||||
@@ -883,7 +877,6 @@ where
|
||||
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
|
||||
/// trie updates for this block.
|
||||
fn compute_state_root_serial(
|
||||
&self,
|
||||
overlay_factory: OverlayStateProviderFactory<P>,
|
||||
hashed_state: &HashedPostState,
|
||||
) -> ProviderResult<(B256, TrieUpdates)> {
|
||||
@@ -901,6 +894,102 @@ where
|
||||
.root_with_updates()?)
|
||||
}
|
||||
|
||||
/// Awaits the state root from the background task, with an optional timeout fallback.
|
||||
///
|
||||
/// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
|
||||
/// state root task up to the timeout duration. If the task doesn't complete in time, a
|
||||
/// sequential state root computation is spawned via `spawn_blocking`. Both computations
|
||||
/// then race: the main thread polls the task receiver and the sequential result channel
|
||||
/// in a loop, returning whichever finishes first.
|
||||
///
|
||||
/// If no timeout is configured, this simply awaits the state root task without any fallback.
|
||||
///
|
||||
/// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
|
||||
/// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
|
||||
/// `Result` captures parallel state root task errors that can still fall back to serial.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_validator",
|
||||
name = "await_state_root",
|
||||
skip_all
|
||||
)]
|
||||
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
|
||||
&self,
|
||||
handle: &mut PayloadHandle<Tx, Err, R>,
|
||||
overlay_factory: OverlayStateProviderFactory<P>,
|
||||
hashed_state: &HashedPostState,
|
||||
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
|
||||
let Some(timeout) = self.config.state_root_task_timeout() else {
|
||||
return Ok(handle.state_root());
|
||||
};
|
||||
|
||||
let task_rx = handle.take_state_root_rx();
|
||||
|
||||
match task_rx.recv_timeout(timeout) {
|
||||
Ok(result) => Ok(result),
|
||||
Err(RecvTimeoutError::Disconnected) => {
|
||||
Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_validator",
|
||||
?timeout,
|
||||
"State root task timed out, spawning sequential fallback"
|
||||
);
|
||||
self.metrics.block_validation.state_root_task_timeout_total.increment(1);
|
||||
|
||||
let (seq_tx, seq_rx) =
|
||||
std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
|
||||
|
||||
let seq_overlay = overlay_factory;
|
||||
let seq_hashed_state = hashed_state.clone();
|
||||
self.payload_processor.executor().spawn_blocking(move || {
|
||||
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
|
||||
let _ = seq_tx.send(result);
|
||||
});
|
||||
|
||||
const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
|
||||
|
||||
loop {
|
||||
match task_rx.recv_timeout(POLL_INTERVAL) {
|
||||
Ok(result) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
source = "task",
|
||||
"State root timeout race won"
|
||||
);
|
||||
return Ok(result);
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"State root task dropped, waiting for sequential fallback"
|
||||
);
|
||||
let result = seq_rx.recv().map_err(|_| {
|
||||
ProviderError::other(std::io::Error::other(
|
||||
"both state root computations failed",
|
||||
))
|
||||
})?;
|
||||
let (state_root, trie_updates) = result?;
|
||||
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => {}
|
||||
}
|
||||
|
||||
if let Ok(result) = seq_rx.try_recv() {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
source = "sequential",
|
||||
"State root timeout race won"
|
||||
);
|
||||
let (state_root, trie_updates) = result?;
|
||||
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Compares trie updates from the state root task with serial state root computation.
|
||||
///
|
||||
/// This is used for debugging and validating the correctness of the parallel state root
|
||||
@@ -915,7 +1004,7 @@ where
|
||||
) {
|
||||
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
|
||||
|
||||
match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
|
||||
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
|
||||
Ok((serial_root, serial_trie_updates)) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
|
||||
@@ -203,6 +203,7 @@ impl TestHarness {
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
let tree = EngineApiTreeHandler::new(
|
||||
@@ -404,6 +405,7 @@ impl ValidatorTestHarness {
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache,
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
Self { harness, validator, metrics: TestMetrics::default() }
|
||||
|
||||
@@ -22,6 +22,7 @@ reth-node-core.workspace = true
|
||||
reth-node-ethereum.workspace = true
|
||||
reth-node-metrics.workspace = true
|
||||
reth-rpc-server-types.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext};
|
||||
use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
|
||||
use reth_node_metrics::recorder::install_prometheus_recorder;
|
||||
use reth_rpc_server_types::RpcModuleValidator;
|
||||
use reth_tasks::RayonConfig;
|
||||
use reth_tracing::{FileWorkerGuard, Layers};
|
||||
use std::{fmt, sync::Arc};
|
||||
|
||||
@@ -153,6 +154,16 @@ where
|
||||
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
|
||||
}
|
||||
|
||||
let rayon_config = RayonConfig {
|
||||
reserved_cpu_cores: command.engine.reserved_cpu_cores,
|
||||
proof_storage_worker_threads: command.engine.storage_worker_count,
|
||||
proof_account_worker_threads: command.engine.account_worker_count,
|
||||
..Default::default()
|
||||
};
|
||||
let runner = CliRunner::try_with_runtime_config(
|
||||
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
|
||||
)?;
|
||||
|
||||
runner.run_command_until_exit(|ctx| {
|
||||
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
|
||||
})
|
||||
|
||||
@@ -92,7 +92,7 @@ impl<
|
||||
/// This accepts a closure that is used to launch the node via the
|
||||
/// [`NodeCommand`](node::NodeCommand).
|
||||
///
|
||||
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
|
||||
/// This command will be run on the default tokio runtime.
|
||||
///
|
||||
///
|
||||
/// # Example
|
||||
@@ -143,7 +143,7 @@ impl<
|
||||
/// This accepts a closure that is used to launch the node via the
|
||||
/// [`NodeCommand`](node::NodeCommand).
|
||||
///
|
||||
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
|
||||
/// This command will be run on the default tokio runtime.
|
||||
pub fn run_with_components<N>(
|
||||
self,
|
||||
components: impl CliComponentsBuilder<N>,
|
||||
|
||||
@@ -273,6 +273,7 @@ where
|
||||
gas_limit: payload.payload.gas_limit(),
|
||||
basefee: payload.payload.saturated_base_fee_per_gas(),
|
||||
blob_excess_gas_and_price,
|
||||
slot_num: 0,
|
||||
};
|
||||
|
||||
Ok(EvmEnv { cfg_env, block_env })
|
||||
|
||||
@@ -108,8 +108,7 @@ impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExec
|
||||
result: ResultAndState::new(
|
||||
ExecutionResult::Success {
|
||||
reason: SuccessReason::Return,
|
||||
gas_used: 0,
|
||||
gas_refunded: 0,
|
||||
gas: Default::default(),
|
||||
logs: vec![],
|
||||
output: Output::Call(Bytes::from(vec![])),
|
||||
},
|
||||
|
||||
@@ -112,4 +112,5 @@ test-utils = [
|
||||
"reth-primitives-traits/test-utils",
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-stages-types/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
|
||||
@@ -107,26 +107,16 @@ impl EthereumNode {
|
||||
/// use reth_chainspec::MAINNET;
|
||||
/// use reth_node_ethereum::EthereumNode;
|
||||
///
|
||||
/// let factory = EthereumNode::provider_factory_builder()
|
||||
/// .open_read_only(MAINNET.clone(), "datadir")
|
||||
/// .unwrap();
|
||||
/// fn demo(runtime: reth_tasks::Runtime) {
|
||||
/// let factory = EthereumNode::provider_factory_builder()
|
||||
/// .open_read_only(MAINNET.clone(), "datadir", runtime)
|
||||
/// .unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Open a Providerfactory manually with all required components
|
||||
///
|
||||
/// ```no_run
|
||||
/// use reth_chainspec::ChainSpecBuilder;
|
||||
/// use reth_db::open_db_read_only;
|
||||
/// use reth_node_ethereum::EthereumNode;
|
||||
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
|
||||
///
|
||||
/// let factory = EthereumNode::provider_factory_builder()
|
||||
/// .db(open_db_read_only("db", Default::default()).unwrap())
|
||||
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
|
||||
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
|
||||
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
|
||||
/// .build_provider_factory();
|
||||
/// ```
|
||||
/// See also [`ProviderFactory::new`](reth_provider::ProviderFactory::new) for constructing
|
||||
/// a [`ProviderFactory`](reth_provider::ProviderFactory) manually with all required
|
||||
/// components.
|
||||
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {
|
||||
ProviderFactoryBuilder::default()
|
||||
}
|
||||
@@ -513,7 +503,7 @@ where
|
||||
// it doesn't impact the first block or the first gossiped blob transaction, so we
|
||||
// initialize this in the background
|
||||
let kzg_settings = validator.validator().kzg_settings().clone();
|
||||
ctx.task_executor().spawn_blocking(async move {
|
||||
ctx.task_executor().spawn_blocking_task(async move {
|
||||
let _ = kzg_settings.get();
|
||||
debug!(target: "reth::cli", "Initialized KZG settings");
|
||||
});
|
||||
|
||||
@@ -10,7 +10,7 @@ use reth_ethereum_primitives::PooledTransactionVariant;
|
||||
use reth_node_builder::{NodeBuilder, NodeHandle};
|
||||
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
@@ -20,8 +20,7 @@ use std::{
|
||||
#[tokio::test]
|
||||
async fn can_handle_blobs() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
let chain_spec = Arc::new(
|
||||
@@ -37,7 +36,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -92,8 +91,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
let chain_spec = Arc::new(
|
||||
@@ -107,7 +105,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
|
||||
.with_force_blob_sidecar_upcasting(),
|
||||
);
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -146,8 +144,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn blob_conversion_at_osaka() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
|
||||
// Osaka activates in 2 slots
|
||||
@@ -170,7 +167,7 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
|
||||
.with_force_blob_sidecar_upcasting(),
|
||||
);
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
|
||||
@@ -27,7 +27,7 @@ async fn can_run_eth_node_with_custom_genesis_number() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) =
|
||||
let (mut nodes, wallet) =
|
||||
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
|
||||
|
||||
let mut node = nodes.pop().unwrap();
|
||||
@@ -81,7 +81,7 @@ async fn custom_genesis_block_query_boundaries() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, _wallet) =
|
||||
let (mut nodes, _wallet) =
|
||||
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
|
||||
|
||||
let node = nodes.pop().unwrap();
|
||||
|
||||
@@ -9,20 +9,19 @@ use reth_node_core::args::DevArgs;
|
||||
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
|
||||
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
|
||||
use reth_rpc_eth_api::{helpers::EthTransactions, EthApiServer};
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_run_dev_node() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let node_config = NodeConfig::test()
|
||||
.with_chain(custom_chain())
|
||||
.with_dev(DevArgs { dev: true, ..Default::default() });
|
||||
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
|
||||
.with_components(EthereumNode::components())
|
||||
.with_add_ons(EthereumAddOns::default())
|
||||
@@ -37,15 +36,14 @@ async fn can_run_dev_node() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn can_run_dev_node_custom_attributes() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let node_config = NodeConfig::test()
|
||||
.with_chain(custom_chain())
|
||||
.with_dev(DevArgs { dev: true, ..Default::default() });
|
||||
let fee_recipient = Address::random();
|
||||
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
|
||||
.with_components(EthereumNode::components())
|
||||
.with_add_ons(EthereumAddOns::default())
|
||||
|
||||
@@ -14,14 +14,14 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_provider::BlockNumReader;
|
||||
use reth_rpc_api::TestingBuildBlockRequestV1;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_run_eth_node() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup::<EthereumNode>(
|
||||
1,
|
||||
Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
@@ -57,8 +57,7 @@ async fn can_run_eth_node() -> eyre::Result<()> {
|
||||
#[cfg(unix)]
|
||||
async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let exec = TaskManager::current();
|
||||
let exec = exec.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
// Chain spec with test allocs
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
@@ -76,7 +75,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http().with_auth_ipc());
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec)
|
||||
.testing_node(runtime)
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -105,8 +104,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
|
||||
#[cfg(unix)]
|
||||
async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let exec = TaskManager::current();
|
||||
let exec = exec.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
// Chain spec with test allocs
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
@@ -121,7 +119,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
|
||||
// Node setup
|
||||
let node_config = NodeConfig::test().with_chain(chain_spec);
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec)
|
||||
.testing_node(runtime)
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -139,7 +137,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
|
||||
async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup::<EthereumNode>(
|
||||
1,
|
||||
Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
@@ -190,8 +188,7 @@ async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
let chain_spec = Arc::new(
|
||||
@@ -208,7 +205,7 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
|
||||
);
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec)
|
||||
.testing_node(runtime)
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -278,7 +275,7 @@ async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
|
||||
.with_sparse_trie_prune_depth(2)
|
||||
.with_sparse_trie_max_storage_tries(100);
|
||||
|
||||
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, _wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
|
||||
@@ -37,7 +37,7 @@ async fn can_handle_invalid_payload_then_valid() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -154,7 +154,7 @@ async fn can_handle_multiple_invalid_payloads() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -255,7 +255,7 @@ async fn can_handle_invalid_payload_with_transactions() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration};
|
||||
async fn can_sync() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup::<EthereumNode>(
|
||||
2,
|
||||
Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
@@ -74,7 +74,7 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, _) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -116,7 +116,7 @@ async fn test_long_reorg() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, _) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -172,7 +172,7 @@ async fn test_reorg_through_backfill() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, _) = setup_engine::<EthereumNode>(
|
||||
2,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -236,7 +236,7 @@ async fn test_tx_propagation() -> eyre::Result<()> {
|
||||
};
|
||||
|
||||
// Setup 10 nodes
|
||||
let (mut nodes, _tasks, _) = setup_engine_with_connection::<EthereumNode>(
|
||||
let (mut nodes, _) = setup_engine_with_connection::<EthereumNode>(
|
||||
10,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
|
||||
@@ -12,7 +12,7 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_primitives_traits::Recovered;
|
||||
use reth_provider::CanonStateSubscriptions;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use reth_transaction_pool::{
|
||||
blobstore::InMemoryBlobStore, test_utils::OkValidator, BlockInfo, CoinbaseTipOrdering,
|
||||
EthPooledTransaction, Pool, PoolTransaction, TransactionOrigin, TransactionPool,
|
||||
@@ -24,8 +24,7 @@ use std::{sync::Arc, time::Duration};
|
||||
#[tokio::test]
|
||||
async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let executor = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let txpool = Pool::new(
|
||||
OkValidator::default(),
|
||||
@@ -49,7 +48,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(executor.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -63,13 +62,13 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
executor.spawn_critical(
|
||||
runtime.spawn_critical_task(
|
||||
"txpool maintenance task",
|
||||
reth_transaction_pool::maintain::maintain_transaction_pool_future(
|
||||
node.inner.provider.clone(),
|
||||
txpool.clone(),
|
||||
node.inner.provider.clone().canonical_state_stream(),
|
||||
executor.clone(),
|
||||
runtime.clone(),
|
||||
config,
|
||||
),
|
||||
);
|
||||
@@ -98,8 +97,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let executor = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let txpool = Pool::new(
|
||||
OkValidator::default(),
|
||||
@@ -124,7 +122,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(executor.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -135,13 +133,13 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
let w1 = wallets.first().unwrap();
|
||||
let w2 = wallets.last().unwrap();
|
||||
|
||||
executor.spawn_critical(
|
||||
runtime.spawn_critical_task(
|
||||
"txpool maintenance task",
|
||||
reth_transaction_pool::maintain::maintain_transaction_pool_future(
|
||||
node.inner.provider.clone(),
|
||||
txpool.clone(),
|
||||
node.inner.provider.clone().canonical_state_stream(),
|
||||
executor.clone(),
|
||||
runtime.clone(),
|
||||
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
|
||||
),
|
||||
);
|
||||
@@ -231,8 +229,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn maintain_txpool_commit() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
let tasks = TaskManager::current();
|
||||
let executor = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let txpool = Pool::new(
|
||||
OkValidator::default(),
|
||||
@@ -256,7 +253,7 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(executor.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
@@ -265,13 +262,13 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
|
||||
|
||||
let wallet = Wallet::default();
|
||||
|
||||
executor.spawn_critical(
|
||||
runtime.spawn_critical_task(
|
||||
"txpool maintenance task",
|
||||
reth_transaction_pool::maintain::maintain_transaction_pool_future(
|
||||
node.inner.provider.clone(),
|
||||
txpool.clone(),
|
||||
node.inner.provider.clone().canonical_state_stream(),
|
||||
executor.clone(),
|
||||
runtime.clone(),
|
||||
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
|
||||
),
|
||||
);
|
||||
|
||||
@@ -12,7 +12,7 @@ use reth_node_builder::{NodeBuilder, NodeHandle};
|
||||
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -29,8 +29,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
|
||||
let mut genesis: Genesis = MAINNET.genesis().clone();
|
||||
genesis.coinbase = address!("0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5");
|
||||
|
||||
let exec = TaskManager::current();
|
||||
let exec = exec.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
let expected_frame = expected_snapshot_frame()?;
|
||||
let prestate_mode = match &expected_frame {
|
||||
@@ -63,7 +62,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
|
||||
);
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec)
|
||||
.testing_node(runtime)
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
|
||||
@@ -21,7 +21,7 @@ use reth_node_core::{
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_payload_primitives::BuiltPayload;
|
||||
use reth_rpc_api::servers::AdminApiServer;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
@@ -57,7 +57,7 @@ async fn test_fee_history() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -142,7 +142,7 @@ async fn test_flashbots_validate_v3() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -224,7 +224,7 @@ async fn test_flashbots_validate_v4() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -314,7 +314,7 @@ async fn test_eth_config() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
@@ -344,8 +344,7 @@ async fn test_eth_config() -> eyre::Result<()> {
|
||||
async fn test_admin_external_ip() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let exec = TaskManager::current();
|
||||
let exec = exec.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
|
||||
// Chain spec with test allocs
|
||||
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
|
||||
@@ -363,7 +362,7 @@ async fn test_admin_external_ip() -> eyre::Result<()> {
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec)
|
||||
.testing_node(runtime)
|
||||
.node(EthereumNode::default())
|
||||
.launch()
|
||||
.await?;
|
||||
|
||||
@@ -142,7 +142,7 @@ async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
|
||||
let (mut nodes, _tasks, wallet) =
|
||||
let (mut nodes, wallet) =
|
||||
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
|
||||
.await?;
|
||||
let mut node = nodes.pop().unwrap();
|
||||
@@ -236,7 +236,7 @@ async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
|
||||
let (mut nodes, _tasks, wallet) =
|
||||
let (mut nodes, wallet) =
|
||||
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
|
||||
.await?;
|
||||
let mut node = nodes.pop().unwrap();
|
||||
@@ -311,7 +311,7 @@ async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
shanghai_spec(),
|
||||
false,
|
||||
@@ -421,7 +421,7 @@ async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Re
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
|
||||
let (mut nodes, _tasks, wallet) =
|
||||
let (mut nodes, wallet) =
|
||||
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
|
||||
.await?;
|
||||
let mut node = nodes.pop().unwrap();
|
||||
|
||||
@@ -29,7 +29,7 @@ async fn test_simulate_v1_with_max_fee_per_blob_gas_only() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
|
||||
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
false,
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_node_builder::{EngineNodeLauncher, FullNodeComponents, NodeBuilder, Nod
|
||||
use reth_node_ethereum::node::{EthereumAddOns, EthereumNode};
|
||||
use reth_provider::providers::BlockchainProvider;
|
||||
use reth_rpc_builder::Identity;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
|
||||
#[test]
|
||||
fn test_basic_setup() {
|
||||
@@ -46,13 +46,13 @@ fn test_basic_setup() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_eth_launcher() {
|
||||
let tasks = TaskManager::current();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
let config = NodeConfig::test();
|
||||
let db = create_test_rw_db();
|
||||
let _builder =
|
||||
NodeBuilder::new(config)
|
||||
.with_database(db)
|
||||
.with_launch_context(tasks.executor())
|
||||
.with_launch_context(runtime.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<
|
||||
NodeTypesWithDBAdapter<EthereumNode, Arc<TempDatabase<DatabaseEnv>>>,
|
||||
>>()
|
||||
@@ -64,7 +64,7 @@ async fn test_eth_launcher() {
|
||||
})
|
||||
.launch_with_fn(|builder| {
|
||||
let launcher = EngineNodeLauncher::new(
|
||||
tasks.executor(),
|
||||
runtime.clone(),
|
||||
builder.config().datadir(),
|
||||
Default::default(),
|
||||
);
|
||||
@@ -81,13 +81,13 @@ fn test_eth_launcher_with_tokio_runtime() {
|
||||
let custom_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
|
||||
|
||||
main_rt.block_on(async {
|
||||
let tasks = TaskManager::current();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
let config = NodeConfig::test();
|
||||
let db = create_test_rw_db();
|
||||
let _builder =
|
||||
NodeBuilder::new(config)
|
||||
.with_database(db)
|
||||
.with_launch_context(tasks.executor())
|
||||
.with_launch_context(runtime.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<
|
||||
NodeTypesWithDBAdapter<EthereumNode, Arc<TempDatabase<DatabaseEnv>>>,
|
||||
>>()
|
||||
@@ -101,7 +101,7 @@ fn test_eth_launcher_with_tokio_runtime() {
|
||||
})
|
||||
.launch_with_fn(|builder| {
|
||||
let launcher = EngineNodeLauncher::new(
|
||||
tasks.executor(),
|
||||
runtime.clone(),
|
||||
builder.config().datadir(),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
@@ -13,14 +13,14 @@ use reth_node_core::{
|
||||
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
|
||||
use reth_rpc_api::TestingBuildBlockRequestV1;
|
||||
use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection};
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::str::FromStr;
|
||||
use tempfile::tempdir;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn testing_rpc_build_block_works() -> eyre::Result<()> {
|
||||
let tasks = TaskManager::current();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
let mut rpc_args = reth_node_core::args::RpcServerArgs::default().with_http();
|
||||
rpc_args.http_api = Some(RpcModuleSelection::from_iter([RethRpcModule::Testing]));
|
||||
let tempdir = tempdir().expect("temp datadir");
|
||||
@@ -41,7 +41,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
|
||||
|
||||
let builder = NodeBuilder::new(config)
|
||||
.with_database(db)
|
||||
.with_launch_context(tasks.executor())
|
||||
.with_launch_context(runtime)
|
||||
.with_types::<EthereumNode>()
|
||||
.with_components(EthereumNode::components())
|
||||
.with_add_ons(EthereumAddOns::default())
|
||||
|
||||
@@ -100,6 +100,7 @@ test-utils = [
|
||||
"reth-node-builder?/test-utils",
|
||||
"reth-trie-db?/test-utils",
|
||||
"reth-codecs?/test-utils",
|
||||
"reth-tasks?/test-utils",
|
||||
]
|
||||
|
||||
full = [
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor, TxEnvFor};
|
||||
use alloy_consensus::transaction::Either;
|
||||
use alloy_evm::{block::ExecutableTxParts, RecoveredTx};
|
||||
use rayon::prelude::*;
|
||||
use reth_primitives_traits::TxTy;
|
||||
@@ -21,10 +22,55 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
|
||||
}
|
||||
|
||||
/// Converts a raw transaction into an executable transaction.
|
||||
///
|
||||
/// This trait abstracts the conversion logic (e.g., decoding, signature recovery) that is
|
||||
/// parallelized in the engine.
|
||||
pub trait ConvertTx<RawTx>: Send + Sync + 'static {
|
||||
/// The executable transaction type.
|
||||
type Tx;
|
||||
/// Errors that may occur during conversion.
|
||||
type Error;
|
||||
/// Converts a raw transaction.
|
||||
fn convert(&self, raw: RawTx) -> Result<Self::Tx, Self::Error>;
|
||||
}
|
||||
|
||||
// Blanket impl so closures still work.
|
||||
impl<F, RawTx, Tx, Err> ConvertTx<RawTx> for F
|
||||
where
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type Tx = Tx;
|
||||
type Error = Err;
|
||||
fn convert(&self, raw: RawTx) -> Result<Tx, Err> {
|
||||
self(raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B, RA, RB> ConvertTx<Either<RA, RB>> for Either<A, B>
|
||||
where
|
||||
A: ConvertTx<RA>,
|
||||
B: ConvertTx<RB>,
|
||||
{
|
||||
type Tx = Either<A::Tx, B::Tx>;
|
||||
type Error = Either<A::Error, B::Error>;
|
||||
fn convert(&self, raw: Either<RA, RB>) -> Result<Self::Tx, Self::Error> {
|
||||
match (self, raw) {
|
||||
(Self::Left(a), Either::Left(raw)) => {
|
||||
a.convert(raw).map(Either::Left).map_err(Either::Left)
|
||||
}
|
||||
(Self::Right(b), Either::Right(raw)) => {
|
||||
b.convert(raw).map(Either::Right).map_err(Either::Right)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
|
||||
/// used to convert them to an executable transaction. This tuple is used in the engine to
|
||||
/// parallelize heavy work like decoding or recovery.
|
||||
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
|
||||
pub trait ExecutableTxTuple: Send + 'static {
|
||||
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
|
||||
///
|
||||
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
|
||||
@@ -37,12 +83,16 @@ pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'sta
|
||||
|
||||
/// Iterator over [`ExecutableTxTuple::Tx`].
|
||||
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
|
||||
+ IntoIterator<Item = Self::RawTx>
|
||||
+ Send
|
||||
+ 'static;
|
||||
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
|
||||
/// and will be parallelized in the engine.
|
||||
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
|
||||
type Convert: ConvertTx<Self::RawTx, Tx = Self::Tx, Error = Self::Error>;
|
||||
|
||||
/// Decomposes into the raw transaction iterator and converter.
|
||||
fn into_parts(self) -> (Self::IntoIter, Self::Convert);
|
||||
}
|
||||
|
||||
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
|
||||
@@ -50,7 +100,10 @@ where
|
||||
RawTx: Send + Sync + 'static,
|
||||
Tx: Clone + Send + Sync + 'static,
|
||||
Err: core::error::Error + Send + Sync + 'static,
|
||||
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
|
||||
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator>
|
||||
+ IntoIterator<Item = RawTx>
|
||||
+ Send
|
||||
+ 'static,
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type RawTx = RawTx;
|
||||
@@ -59,6 +112,10 @@ where
|
||||
|
||||
type IntoIter = I;
|
||||
type Convert = F;
|
||||
|
||||
fn into_parts(self) -> (I, F) {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
@@ -76,3 +133,72 @@ where
|
||||
{
|
||||
type Recovered = <T::Tx as ExecutableTxParts<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
|
||||
}
|
||||
|
||||
/// Wraps `Either<L, R>` to implement both [`IntoParallelIterator`] and [`IntoIterator`],
|
||||
/// mapping items through [`Either::Left`] / [`Either::Right`] on demand without collecting.
|
||||
#[derive(Debug)]
|
||||
pub struct EitherIter<L, R>(Either<L, R>);
|
||||
|
||||
impl<L, R> IntoParallelIterator for EitherIter<L, R>
|
||||
where
|
||||
L: IntoParallelIterator,
|
||||
R: IntoParallelIterator,
|
||||
L::Iter: IndexedParallelIterator,
|
||||
R::Iter: IndexedParallelIterator,
|
||||
{
|
||||
type Item = Either<L::Item, R::Item>;
|
||||
type Iter = Either<
|
||||
rayon::iter::Map<L::Iter, fn(L::Item) -> Either<L::Item, R::Item>>,
|
||||
rayon::iter::Map<R::Iter, fn(R::Item) -> Either<L::Item, R::Item>>,
|
||||
>;
|
||||
|
||||
fn into_par_iter(self) -> Self::Iter {
|
||||
match self.0 {
|
||||
Either::Left(l) => Either::Left(l.into_par_iter().map(Either::Left)),
|
||||
Either::Right(r) => Either::Right(r.into_par_iter().map(Either::Right)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> IntoIterator for EitherIter<L, R>
|
||||
where
|
||||
L: IntoIterator,
|
||||
R: IntoIterator,
|
||||
{
|
||||
type Item = Either<L::Item, R::Item>;
|
||||
type IntoIter = Either<
|
||||
core::iter::Map<L::IntoIter, fn(L::Item) -> Either<L::Item, R::Item>>,
|
||||
core::iter::Map<R::IntoIter, fn(R::Item) -> Either<L::Item, R::Item>>,
|
||||
>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
match self.0 {
|
||||
Either::Left(l) => Either::Left(l.into_iter().map(Either::Left)),
|
||||
Either::Right(r) => Either::Right(r.into_iter().map(Either::Right)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: `EitherIter` is just a newtype over `Either<L, R>`.
|
||||
unsafe impl<L: Send, R: Send> Send for EitherIter<L, R> {}
|
||||
|
||||
impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
|
||||
type RawTx = Either<A::RawTx, B::RawTx>;
|
||||
type Tx = Either<A::Tx, B::Tx>;
|
||||
type Error = Either<A::Error, B::Error>;
|
||||
type IntoIter = EitherIter<A::IntoIter, B::IntoIter>;
|
||||
type Convert = Either<A::Convert, B::Convert>;
|
||||
|
||||
fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
|
||||
match self {
|
||||
Self::Left(a) => {
|
||||
let (iter, convert) = a.into_parts();
|
||||
(EitherIter(Either::Left(iter)), Either::Left(convert))
|
||||
}
|
||||
Self::Right(b) => {
|
||||
let (iter, convert) = b.into_parts();
|
||||
(EitherIter(Either::Right(iter)), Either::Right(convert))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ pub use aliases::*;
|
||||
#[cfg(feature = "std")]
|
||||
mod engine;
|
||||
#[cfg(feature = "std")]
|
||||
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use engine::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
pub mod metrics;
|
||||
|
||||
@@ -55,7 +55,7 @@ use reth_provider::{
|
||||
providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
|
||||
BlockReader, EthStorage, ProviderFactory,
|
||||
};
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
|
||||
use tempfile::TempDir;
|
||||
use thiserror::Error;
|
||||
@@ -175,8 +175,8 @@ pub struct TestExExHandle {
|
||||
pub events_rx: UnboundedReceiver<ExExEvent>,
|
||||
/// Channel for sending notifications to the Execution Extension
|
||||
pub notifications_tx: Sender<ExExNotification>,
|
||||
/// Node task manager
|
||||
pub tasks: TaskManager,
|
||||
/// Node task runtime
|
||||
pub runtime: Runtime,
|
||||
/// WAL temp directory handle
|
||||
_wal_directory: TempDir,
|
||||
}
|
||||
@@ -252,6 +252,7 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
chain_spec.clone(),
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)?;
|
||||
|
||||
let genesis_hash = init_genesis(&provider_factory)?;
|
||||
@@ -265,9 +266,9 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
)
|
||||
.await?;
|
||||
let network = network_manager.handle().clone();
|
||||
let tasks = TaskManager::current();
|
||||
let task_executor = tasks.executor();
|
||||
tasks.executor().spawn(network_manager);
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
|
||||
let task_executor = runtime.clone();
|
||||
runtime.spawn_task(network_manager);
|
||||
|
||||
let (_, payload_builder_handle) = NoopPayloadBuilderService::<EthEngineTypes>::new();
|
||||
|
||||
@@ -320,7 +321,7 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
provider_factory,
|
||||
events_rx,
|
||||
notifications_tx,
|
||||
tasks,
|
||||
runtime,
|
||||
_wal_directory: wal_directory,
|
||||
},
|
||||
))
|
||||
|
||||
@@ -83,4 +83,5 @@ test-utils = [
|
||||
"reth-primitives-traits/test-utils",
|
||||
"dep:reth-ethereum-primitives",
|
||||
"reth-ethereum-primitives?/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
|
||||
@@ -86,7 +86,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
|
||||
downloader,
|
||||
};
|
||||
|
||||
spawner.spawn(downloader.boxed());
|
||||
spawner.spawn_task(downloader.boxed());
|
||||
|
||||
Self { from_downloader: ReceiverStream::new(bodies_rx), to_downloader }
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ impl<H: Sealable + Send + Sync + Unpin + 'static> TaskDownloader<H> {
|
||||
updates: UnboundedReceiverStream::new(updates_rx),
|
||||
downloader,
|
||||
};
|
||||
spawner.spawn(downloader.boxed());
|
||||
spawner.spawn_task(downloader.boxed());
|
||||
|
||||
Self { from_downloader: ReceiverStream::new(headers_rx), to_downloader }
|
||||
}
|
||||
|
||||
@@ -139,6 +139,7 @@ test-utils = [
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
"dep:reth-evm-ethereum",
|
||||
"reth-evm-ethereum?/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
|
||||
[[bench]]
|
||||
|
||||
@@ -24,9 +24,12 @@ use crate::{
|
||||
import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
|
||||
listener::ConnectionListener,
|
||||
message::{NewBlockMessage, PeerMessage},
|
||||
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
|
||||
metrics::{
|
||||
BackedOffPeersMetrics, ClosedSessionsMetrics, DisconnectMetrics, NetworkMetrics,
|
||||
PendingSessionFailureMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
|
||||
},
|
||||
network::{NetworkHandle, NetworkHandleMessage},
|
||||
peers::PeersManager,
|
||||
peers::{BackoffReason, PeersManager},
|
||||
poll_nested_stream_with_budget,
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
required_block_filter::RequiredBlockFilter,
|
||||
@@ -139,6 +142,12 @@ pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
metrics: NetworkMetrics,
|
||||
/// Disconnect metrics for the Network
|
||||
disconnect_metrics: DisconnectMetrics,
|
||||
/// Closed sessions metrics, split by direction.
|
||||
closed_sessions_metrics: ClosedSessionsMetrics,
|
||||
/// Pending session failure metrics, split by direction.
|
||||
pending_session_failure_metrics: PendingSessionFailureMetrics,
|
||||
/// Backed off peers metrics, split by reason.
|
||||
backed_off_peers_metrics: BackedOffPeersMetrics,
|
||||
}
|
||||
|
||||
impl NetworkManager {
|
||||
@@ -354,6 +363,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
num_active_peers,
|
||||
metrics: Default::default(),
|
||||
disconnect_metrics: Default::default(),
|
||||
closed_sessions_metrics: Default::default(),
|
||||
pending_session_failure_metrics: Default::default(),
|
||||
backed_off_peers_metrics: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -860,13 +872,18 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
&peer_id,
|
||||
err,
|
||||
);
|
||||
self.backed_off_peers_metrics.increment_for_reason(
|
||||
BackoffReason::from_disconnect(err.as_disconnected()),
|
||||
);
|
||||
err.as_disconnected()
|
||||
} else {
|
||||
// Gracefully disconnected
|
||||
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
|
||||
self.backed_off_peers_metrics
|
||||
.increment_for_reason(BackoffReason::GracefulClose);
|
||||
None
|
||||
};
|
||||
self.metrics.closed_sessions.increment(1);
|
||||
self.closed_sessions_metrics.active.increment(1);
|
||||
self.update_active_connection_metrics();
|
||||
|
||||
if let Some(reason) = reason {
|
||||
@@ -891,7 +908,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
.state_mut()
|
||||
.peers_mut()
|
||||
.on_incoming_pending_session_dropped(remote_addr, err);
|
||||
self.metrics.pending_session_failures.increment(1);
|
||||
self.pending_session_failure_metrics.inbound.increment(1);
|
||||
if let Some(reason) = err.as_disconnected() {
|
||||
self.disconnect_metrics.increment(reason);
|
||||
}
|
||||
@@ -901,13 +918,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
.peers_mut()
|
||||
.on_incoming_pending_session_gracefully_closed();
|
||||
}
|
||||
self.metrics.closed_sessions.increment(1);
|
||||
self.closed_sessions_metrics.incoming_pending.increment(1);
|
||||
self.metrics
|
||||
.incoming_connections
|
||||
.set(self.swarm.state().peers().num_inbound_connections() as f64);
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
}
|
||||
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
|
||||
trace!(
|
||||
@@ -924,7 +938,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
&peer_id,
|
||||
err,
|
||||
);
|
||||
self.metrics.pending_session_failures.increment(1);
|
||||
self.pending_session_failure_metrics.outbound.increment(1);
|
||||
self.backed_off_peers_metrics.increment_for_reason(
|
||||
BackoffReason::from_disconnect(err.as_disconnected()),
|
||||
);
|
||||
if let Some(reason) = err.as_disconnected() {
|
||||
self.disconnect_metrics.increment(reason);
|
||||
}
|
||||
@@ -934,9 +951,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
.peers_mut()
|
||||
.on_outgoing_pending_session_gracefully_closed(&peer_id);
|
||||
}
|
||||
self.metrics.closed_sessions.increment(1);
|
||||
self.closed_sessions_metrics.outgoing_pending.increment(1);
|
||||
self.update_pending_connection_metrics();
|
||||
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
@@ -956,6 +972,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
&error,
|
||||
);
|
||||
|
||||
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
|
||||
@@ -22,12 +22,6 @@ pub struct NetworkMetrics {
|
||||
/// Number of peers known to the node
|
||||
pub(crate) tracked_peers: Gauge,
|
||||
|
||||
/// Cumulative number of failures of pending sessions
|
||||
pub(crate) pending_session_failures: Counter,
|
||||
|
||||
/// Total number of sessions closed
|
||||
pub(crate) closed_sessions: Counter,
|
||||
|
||||
/// Number of active incoming connections
|
||||
pub(crate) incoming_connections: Gauge,
|
||||
|
||||
@@ -77,6 +71,68 @@ pub struct NetworkMetrics {
|
||||
pub(crate) acc_duration_poll_swarm: Gauge,
|
||||
}
|
||||
|
||||
/// Metrics for closed sessions, split by direction.
|
||||
#[derive(Debug)]
|
||||
pub struct ClosedSessionsMetrics {
|
||||
/// Sessions closed from active (established) connections.
|
||||
pub active: Counter,
|
||||
/// Sessions closed from incoming pending connections.
|
||||
pub incoming_pending: Counter,
|
||||
/// Sessions closed from outgoing pending connections.
|
||||
pub outgoing_pending: Counter,
|
||||
}
|
||||
|
||||
impl Default for ClosedSessionsMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
active: metrics::counter!("network_closed_sessions", "direction" => "active"),
|
||||
incoming_pending: metrics::counter!("network_closed_sessions", "direction" => "incoming_pending"),
|
||||
outgoing_pending: metrics::counter!("network_closed_sessions", "direction" => "outgoing_pending"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for pending session failures, split by direction.
|
||||
#[derive(Debug)]
|
||||
pub struct PendingSessionFailureMetrics {
|
||||
/// Failures on incoming pending sessions.
|
||||
pub inbound: Counter,
|
||||
/// Failures on outgoing pending sessions.
|
||||
pub outbound: Counter,
|
||||
}
|
||||
|
||||
impl Default for PendingSessionFailureMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inbound: metrics::counter!("network_pending_session_failures", "direction" => "inbound"),
|
||||
outbound: metrics::counter!("network_pending_session_failures", "direction" => "outbound"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for backed off peers, split by reason.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "network.backed_off_peers")]
|
||||
pub struct BackedOffPeersMetrics {
|
||||
/// Peers backed off because they reported too many peers.
|
||||
pub too_many_peers: Counter,
|
||||
/// Peers backed off after a graceful session close.
|
||||
pub graceful_close: Counter,
|
||||
/// Peers backed off due to connection or protocol errors.
|
||||
pub connection_error: Counter,
|
||||
}
|
||||
|
||||
impl BackedOffPeersMetrics {
|
||||
/// Increments the counter for the given backoff reason.
|
||||
pub fn increment_for_reason(&self, reason: crate::peers::BackoffReason) {
|
||||
match reason {
|
||||
crate::peers::BackoffReason::TooManyPeers => self.too_many_peers.increment(1),
|
||||
crate::peers::BackoffReason::GracefulClose => self.graceful_close.increment(1),
|
||||
crate::peers::BackoffReason::ConnectionError => self.connection_error.increment(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for `SessionManager`
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "network")]
|
||||
|
||||
@@ -1260,6 +1260,27 @@ impl Display for InboundConnectionError {
|
||||
}
|
||||
}
|
||||
|
||||
/// The reason a peer was backed off.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum BackoffReason {
|
||||
/// The remote peer responded with `TooManyPeers` (0x04).
|
||||
TooManyPeers,
|
||||
/// The session was gracefully closed and we're backing off briefly.
|
||||
GracefulClose,
|
||||
/// A connection or protocol-level error occurred.
|
||||
ConnectionError,
|
||||
}
|
||||
|
||||
impl BackoffReason {
|
||||
/// Derives the backoff reason from an optional [`DisconnectReason`].
|
||||
pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
|
||||
match reason {
|
||||
Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
|
||||
_ => Self::ConnectionError,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use alloy_primitives::B512;
|
||||
|
||||
@@ -229,7 +229,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.executor.spawn(f.boxed());
|
||||
self.executor.spawn_task(f.boxed());
|
||||
}
|
||||
|
||||
/// Invoked on a received status update.
|
||||
|
||||
@@ -50,7 +50,7 @@ reth-rpc-eth-types.workspace = true
|
||||
reth-rpc-layer.workspace = true
|
||||
reth-stages.workspace = true
|
||||
reth-static-file.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-tasks = { workspace = true, features = ["rayon"] }
|
||||
reth-tokio-util.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
@@ -120,6 +120,7 @@ test-utils = [
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-node-ethereum/test-utils",
|
||||
"reth-primitives-traits/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
op = [
|
||||
"reth-db/op",
|
||||
|
||||
@@ -903,8 +903,8 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
.request_handler(self.provider().clone())
|
||||
.split_with_handle();
|
||||
|
||||
self.executor.spawn_critical_blocking("p2p txpool", Box::pin(txpool));
|
||||
self.executor.spawn_critical_blocking("p2p eth request handler", Box::pin(eth));
|
||||
self.executor.spawn_critical_blocking_task("p2p txpool", Box::pin(txpool));
|
||||
self.executor.spawn_critical_blocking_task("p2p eth request handler", Box::pin(eth));
|
||||
|
||||
let default_peers_path = self.config().datadir().known_peers();
|
||||
let known_peers_file = self.config().network.persistent_peers_file(default_peers_path);
|
||||
|
||||
@@ -324,7 +324,7 @@ mod test {
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_provider::noop::NoopProvider;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use reth_transaction_pool::noop::NoopTransactionPool;
|
||||
|
||||
#[test]
|
||||
@@ -345,9 +345,7 @@ mod test {
|
||||
|
||||
let task_executor = {
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
let handle = runtime.handle().clone();
|
||||
let manager = TaskManager::new(handle);
|
||||
manager.executor()
|
||||
Runtime::with_existing_handle(runtime.handle().clone()).unwrap()
|
||||
};
|
||||
|
||||
let node = NodeAdapter { components, task_executor, provider: NoopProvider::default() };
|
||||
|
||||
@@ -107,7 +107,8 @@ where
|
||||
let (payload_service, payload_service_handle) =
|
||||
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
|
||||
|
||||
ctx.task_executor().spawn_critical("payload builder service", Box::pin(payload_service));
|
||||
ctx.task_executor()
|
||||
.spawn_critical_task("payload builder service", Box::pin(payload_service));
|
||||
|
||||
Ok(payload_service_handle)
|
||||
}
|
||||
@@ -133,7 +134,7 @@ where
|
||||
) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
ctx.task_executor().spawn_critical("payload builder", async move {
|
||||
ctx.task_executor().spawn_critical_task("payload builder", async move {
|
||||
#[allow(clippy::collection_is_never_read)]
|
||||
let mut subscriptions = Vec::new();
|
||||
|
||||
|
||||
@@ -7,7 +7,8 @@ use reth_chainspec::EthereumHardforks;
|
||||
use reth_node_api::{BlockTy, NodeTypes, TxTy};
|
||||
use reth_transaction_pool::{
|
||||
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
|
||||
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
|
||||
SubPoolLimit, TransactionOrdering, TransactionPool, TransactionValidationTaskExecutor,
|
||||
TransactionValidator,
|
||||
};
|
||||
use std::future::Future;
|
||||
|
||||
@@ -174,10 +175,31 @@ where
|
||||
where
|
||||
BS: BlobStore,
|
||||
{
|
||||
let ctx = self.ctx;
|
||||
let transaction_pool = self.build(blob_store, pool_config);
|
||||
// Spawn maintenance tasks using standalone functions
|
||||
spawn_maintenance_tasks(ctx, transaction_pool.clone(), transaction_pool.config())?;
|
||||
self.build_with_ordering_and_spawn_maintenance_task(
|
||||
CoinbaseTipOrdering::default(),
|
||||
blob_store,
|
||||
pool_config,
|
||||
)
|
||||
}
|
||||
|
||||
/// Build the transaction pool with a custom [`TransactionOrdering`] and spawn its maintenance
|
||||
/// tasks.
|
||||
pub fn build_with_ordering_and_spawn_maintenance_task<BS, O>(
|
||||
self,
|
||||
ordering: O,
|
||||
blob_store: BS,
|
||||
pool_config: PoolConfig,
|
||||
) -> eyre::Result<reth_transaction_pool::Pool<TransactionValidationTaskExecutor<V>, O, BS>>
|
||||
where
|
||||
BS: BlobStore,
|
||||
O: TransactionOrdering<Transaction = V::Transaction>,
|
||||
{
|
||||
let TxPoolBuilder { ctx, validator, .. } = self;
|
||||
|
||||
let transaction_pool =
|
||||
reth_transaction_pool::Pool::new(validator, ordering, blob_store, pool_config.clone());
|
||||
|
||||
spawn_maintenance_tasks(ctx, transaction_pool.clone(), &pool_config)?;
|
||||
|
||||
Ok(transaction_pool)
|
||||
}
|
||||
@@ -256,7 +278,7 @@ where
|
||||
let chain_events = ctx.provider().canonical_state_stream();
|
||||
let client = ctx.provider().clone();
|
||||
|
||||
ctx.task_executor().spawn_critical(
|
||||
ctx.task_executor().spawn_critical_task(
|
||||
"txpool maintenance task",
|
||||
reth_transaction_pool::maintain::maintain_transaction_pool_future(
|
||||
client,
|
||||
|
||||
@@ -215,9 +215,7 @@ impl LaunchContext {
|
||||
/// Configure global settings this includes:
|
||||
///
|
||||
/// - Raising the file descriptor limit
|
||||
/// - Configuring the global rayon thread pool with available parallelism. Honoring
|
||||
/// engine.reserved-cpu-cores to reserve given number of cores for O while using at least 1
|
||||
/// core for the rayon thread pool
|
||||
/// - Configuring the global rayon thread pool for implicit `par_iter` usage
|
||||
pub fn configure_globals(&self, reserved_cpu_cores: usize) {
|
||||
// Raise the fd limit of the process.
|
||||
// Does not do anything on windows.
|
||||
@@ -229,14 +227,12 @@ impl LaunchContext {
|
||||
Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
|
||||
}
|
||||
|
||||
// Reserving the given number of CPU cores for the rest of OS.
|
||||
// Users can reserve more cores by setting engine.reserved-cpu-cores
|
||||
// Note: The global rayon thread pool will use at least one core.
|
||||
// Configure the implicit global rayon pool for `par_iter` usage.
|
||||
let num_threads = available_parallelism()
|
||||
.map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
|
||||
if let Err(err) = ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.thread_name(|i| format!("rayon-{i}"))
|
||||
.thread_name(|i| format!("rayon-{i:02}"))
|
||||
.build_global()
|
||||
{
|
||||
warn!(%err, "Failed to build global thread pool")
|
||||
@@ -503,6 +499,7 @@ where
|
||||
self.chain_spec(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
self.task_executor().clone(),
|
||||
)?
|
||||
.with_prune_modes(self.prune_modes())
|
||||
.with_changeset_cache(changeset_cache);
|
||||
@@ -558,7 +555,7 @@ where
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Pipeline should be run as blocking and panic if it fails.
|
||||
self.task_executor().spawn_critical_blocking(
|
||||
self.task_executor().spawn_critical_blocking_task(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
|
||||
@@ -678,7 +675,8 @@ where
|
||||
|
||||
debug!(target: "reth::cli", "Spawning stages metrics listener task");
|
||||
let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
|
||||
self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
|
||||
self.task_executor()
|
||||
.spawn_critical_task("stages metrics listener task", sync_metrics_listener);
|
||||
|
||||
LaunchContextWith {
|
||||
inner: self.inner,
|
||||
@@ -1105,7 +1103,7 @@ where
|
||||
// If engine events are provided, spawn listener for new payload reporting
|
||||
let ethstats_for_events = ethstats.clone();
|
||||
let task_executor = self.task_executor().clone();
|
||||
task_executor.spawn(Box::pin(async move {
|
||||
task_executor.spawn_task(Box::pin(async move {
|
||||
while let Some(event) = engine_events.next().await {
|
||||
use reth_engine_primitives::ConsensusEngineEvent;
|
||||
match event {
|
||||
@@ -1131,7 +1129,7 @@ where
|
||||
}));
|
||||
|
||||
// Spawn main ethstats service
|
||||
task_executor.spawn(Box::pin(async move { ethstats.run().await }));
|
||||
task_executor.spawn_task(Box::pin(async move { ethstats.run().await }));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -213,7 +213,7 @@ where
|
||||
handle
|
||||
.node
|
||||
.task_executor
|
||||
.spawn_critical("custom debug block provider consensus client", async move {
|
||||
.spawn_critical_task("custom debug block provider consensus client", async move {
|
||||
rpc_consensus_client.run().await
|
||||
});
|
||||
} else if let Some(url) = config.debug.rpc_consensus_url.clone() {
|
||||
@@ -234,7 +234,7 @@ where
|
||||
Arc::new(block_provider),
|
||||
);
|
||||
|
||||
handle.node.task_executor.spawn_critical("rpc-ws consensus client", async move {
|
||||
handle.node.task_executor.spawn_critical_task("rpc-ws consensus client", async move {
|
||||
rpc_consensus_client.run().await
|
||||
});
|
||||
} else if let Some(maybe_custom_etherscan_url) = config.debug.etherscan.clone() {
|
||||
@@ -262,9 +262,12 @@ where
|
||||
handle.node.add_ons_handle.beacon_engine_handle.clone(),
|
||||
Arc::new(block_provider),
|
||||
);
|
||||
handle.node.task_executor.spawn_critical("etherscan consensus client", async move {
|
||||
rpc_consensus_client.run().await
|
||||
});
|
||||
handle
|
||||
.node
|
||||
.task_executor
|
||||
.spawn_critical_task("etherscan consensus client", async move {
|
||||
rpc_consensus_client.run().await
|
||||
});
|
||||
}
|
||||
|
||||
if config.dev.dev {
|
||||
@@ -289,7 +292,7 @@ where
|
||||
};
|
||||
|
||||
let dev_mining_mode = handle.node.config.dev_mining_mode(pool);
|
||||
handle.node.task_executor.spawn_critical("local engine", async move {
|
||||
handle.node.task_executor.spawn_critical_task("local engine", async move {
|
||||
LocalMiner::new(
|
||||
blockchain_db,
|
||||
builder,
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user