mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
85 Commits
mediocrego
...
klkvr/spar
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1549e93eac | ||
|
|
48e1270dec | ||
|
|
821974a6a6 | ||
|
|
6183adb2db | ||
|
|
1b0c54a0a9 | ||
|
|
3aaa8daefc | ||
|
|
bb97b80116 | ||
|
|
81d1aa1eb4 | ||
|
|
d6d1a090e5 | ||
|
|
b52700dd95 | ||
|
|
65201a1abf | ||
|
|
b7cb06b88f | ||
|
|
5b86a17331 | ||
|
|
914ab7d5e6 | ||
|
|
7e38827df5 | ||
|
|
dd2c5b7c53 | ||
|
|
ed528fb975 | ||
|
|
25a97f0be4 | ||
|
|
844e531a3f | ||
|
|
ef860c20fb | ||
|
|
755d1e5f61 | ||
|
|
7d1bd7c9f8 | ||
|
|
93c2455cef | ||
|
|
6c661eb868 | ||
|
|
c8b84404ae | ||
|
|
b722a7f360 | ||
|
|
f0eb2aad7c | ||
|
|
ea6da11cf4 | ||
|
|
20029d3c1a | ||
|
|
40eb3bb7b4 | ||
|
|
8875c8da25 | ||
|
|
d87b48c9fc | ||
|
|
95778a0cc1 | ||
|
|
17359dadd9 | ||
|
|
bab01a7bba | ||
|
|
82960045c9 | ||
|
|
513fca16e9 | ||
|
|
ad242a2002 | ||
|
|
840d6066a6 | ||
|
|
1f315b4f3d | ||
|
|
fa604aa3dc | ||
|
|
4b851acfdb | ||
|
|
785933a8ef | ||
|
|
eb5455de68 | ||
|
|
cb9bf1fe3a | ||
|
|
99973c7781 | ||
|
|
9d89d4cb5e | ||
|
|
f07b37f029 | ||
|
|
10b87c8a2e | ||
|
|
095d021969 | ||
|
|
0c192ef514 | ||
|
|
946c74a538 | ||
|
|
2455fa4ff5 | ||
|
|
0585dc1180 | ||
|
|
8b985c102a | ||
|
|
6fefbfc65f | ||
|
|
2012602909 | ||
|
|
c84b3475a4 | ||
|
|
f3107565f3 | ||
|
|
ac6a0fa372 | ||
|
|
5899cd4188 | ||
|
|
a3b76591b7 | ||
|
|
ab84d61f91 | ||
|
|
8c1724af13 | ||
|
|
3f6354c2a4 | ||
|
|
ba64eb5fc7 | ||
|
|
1b2935763b | ||
|
|
cf3eef1874 | ||
|
|
cc3f3d7062 | ||
|
|
92ce4f3af0 | ||
|
|
28e5911482 | ||
|
|
257ccd41e9 | ||
|
|
a53ef64a54 | ||
|
|
74f9e386ed | ||
|
|
45ff349b00 | ||
|
|
d057c4e9e6 | ||
|
|
405b40f02f | ||
|
|
868fffe0dd | ||
|
|
b288c4e259 | ||
|
|
4e26a6edc9 | ||
|
|
26f788d5ca | ||
|
|
9433ac5666 | ||
|
|
0796a20f24 | ||
|
|
2633d3e513 | ||
|
|
1f8fb7e58f |
@@ -1,20 +0,0 @@
|
||||
# Changelogs configuration for reth
|
||||
# https://github.com/wevm/changelogs
|
||||
|
||||
# How to bump packages that depend on changed packages
|
||||
dependent_bump = "patch"
|
||||
|
||||
[changelog]
|
||||
# Generate per-crate changelogs (vs single root changelog)
|
||||
format = "per-crate"
|
||||
|
||||
# Fixed groups: all always share the same version
|
||||
# reth binaries share version
|
||||
[[fixed]]
|
||||
members = ["reth", "op-reth"]
|
||||
|
||||
# Packages to ignore (internal/test-only crates)
|
||||
ignore = [
|
||||
"reth-testing-utils",
|
||||
"reth-bench",
|
||||
]
|
||||
@@ -1,5 +0,0 @@
|
||||
---
|
||||
reth-engine-tree: patch
|
||||
---
|
||||
|
||||
Reordered cache size calculations in `ExecutionCache::new` to group related operations together.
|
||||
@@ -1,6 +0,0 @@
|
||||
---
|
||||
reth: patch
|
||||
op-reth: patch
|
||||
---
|
||||
|
||||
Added automated changelog generation infrastructure using wevm/changelogs-rs with Claude Code integration. Configured per-crate changelog format with fixed version groups for reth binaries and exclusions for internal test utilities.
|
||||
@@ -1,5 +0,0 @@
|
||||
---
|
||||
reth: patch
|
||||
---
|
||||
|
||||
Updated Alloy dependencies from 1.5.2 to 1.6.1.
|
||||
59
.github/scripts/verify_image_arch.sh
vendored
59
.github/scripts/verify_image_arch.sh
vendored
@@ -1,59 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Verifies that Docker images have the expected architectures.
|
||||
#
|
||||
# Usage:
|
||||
# ./verify_image_arch.sh <targets> <registry> <ethereum_tags> <optimism_tags>
|
||||
#
|
||||
# Environment:
|
||||
# DRY_RUN=true - Skip actual verification, just print what would be checked.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
TARGETS="${1:-}"
|
||||
REGISTRY="${2:-}"
|
||||
ETHEREUM_TAGS="${3:-}"
|
||||
OPTIMISM_TAGS="${4:-}"
|
||||
DRY_RUN="${DRY_RUN:-false}"
|
||||
|
||||
verify_image() {
|
||||
local image="$1"
|
||||
shift
|
||||
local expected_archs=("$@")
|
||||
|
||||
echo "Checking $image..."
|
||||
|
||||
if [[ "$DRY_RUN" == "true" ]]; then
|
||||
echo " [dry-run] Would verify architectures: ${expected_archs[*]}"
|
||||
return 0
|
||||
fi
|
||||
|
||||
manifest=$(docker manifest inspect "$image" 2>/dev/null) || {
|
||||
echo "::error::Failed to inspect manifest for $image"
|
||||
return 1
|
||||
}
|
||||
|
||||
for arch in "${expected_archs[@]}"; do
|
||||
if ! echo "$manifest" | jq -e ".manifests[] | select(.platform.architecture == \"$arch\" and .platform.os == \"linux\")" > /dev/null; then
|
||||
echo "::error::Missing architecture $arch for $image"
|
||||
return 1
|
||||
fi
|
||||
echo " ✓ linux/$arch"
|
||||
done
|
||||
}
|
||||
|
||||
if [[ "$TARGETS" == *"nightly"* ]]; then
|
||||
verify_image "${REGISTRY}/reth:nightly" amd64 arm64
|
||||
verify_image "${REGISTRY}/op-reth:nightly" amd64 arm64
|
||||
verify_image "${REGISTRY}/reth:nightly-profiling" amd64
|
||||
verify_image "${REGISTRY}/reth:nightly-edge-profiling" amd64
|
||||
verify_image "${REGISTRY}/op-reth:nightly-profiling" amd64
|
||||
else
|
||||
for tag in $(echo "$ETHEREUM_TAGS" | tr ',' ' '); do
|
||||
verify_image "$tag" amd64 arm64
|
||||
done
|
||||
for tag in $(echo "$OPTIMISM_TAGS" | tr ',' ' '); do
|
||||
verify_image "$tag" amd64 arm64
|
||||
done
|
||||
fi
|
||||
|
||||
echo "All image architectures verified successfully"
|
||||
21
.github/workflows/changelog.yml
vendored
21
.github/workflows/changelog.yml
vendored
@@ -1,21 +0,0 @@
|
||||
name: Changelog
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
changelog:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
ref: ${{ github.head_ref }}
|
||||
- run: npm install -g @anthropic-ai/claude-code
|
||||
- uses: wevm/changelogs-rs/gen@master
|
||||
with:
|
||||
ai: 'claude -p'
|
||||
env:
|
||||
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
|
||||
10
.github/workflows/docker.yml
vendored
10
.github/workflows/docker.yml
vendored
@@ -102,13 +102,3 @@ jobs:
|
||||
set: |
|
||||
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
|
||||
optimism.tags=${{ steps.params.outputs.optimism_tags }}
|
||||
|
||||
- name: Verify image architectures
|
||||
env:
|
||||
DRY_RUN: ${{ github.event_name == 'workflow_dispatch' && inputs.dry_run }}
|
||||
run: |
|
||||
./.github/scripts/verify_image_arch.sh \
|
||||
"${{ steps.params.outputs.targets }}" \
|
||||
"ghcr.io/${{ github.repository_owner }}" \
|
||||
"${{ steps.params.outputs.ethereum_tags }}" \
|
||||
"${{ steps.params.outputs.optimism_tags }}"
|
||||
|
||||
2
.github/workflows/unit.yml
vendored
2
.github/workflows/unit.yml
vendored
@@ -90,7 +90,7 @@ jobs:
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
- run: cargo nextest run --release -p ef-tests --features "asm-keccak ef-tests"
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
|
||||
11
CLAUDE.md
11
CLAUDE.md
@@ -38,7 +38,7 @@ Reth is a high-performance Ethereum execution client written in Rust, focusing o
|
||||
|
||||
2. **Linting**: Run clippy with all features
|
||||
```bash
|
||||
cargo +nightly clippy --workspace --lib --examples --tests --benches --all-features
|
||||
RUSTFLAGS="-D warnings" cargo +nightly clippy --workspace --lib --examples --tests --benches --all-features --locked
|
||||
```
|
||||
|
||||
3. **Testing**: Use nextest for faster test execution
|
||||
@@ -169,11 +169,12 @@ Based on PR patterns, avoid:
|
||||
Before submitting changes, ensure:
|
||||
|
||||
1. **Format Check**: `cargo +nightly fmt --all --check`
|
||||
2. **Clippy**: No warnings
|
||||
2. **Clippy**: No warnings with `RUSTFLAGS="-D warnings"`
|
||||
3. **Tests Pass**: All unit and integration tests
|
||||
4. **Documentation**: Update relevant docs and add doc comments with `cargo docs --document-private-items`
|
||||
5. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
|
||||
|
||||
|
||||
### Opening PRs against <https://github.com/paradigmxyz/reth>
|
||||
|
||||
Label PRs appropriately, first check the available labels and then apply the relevant ones:
|
||||
@@ -348,10 +349,10 @@ Let's say you want to fix a bug where external IP resolution fails on startup:
|
||||
}
|
||||
```
|
||||
|
||||
5. **Run checks** (IMPORTANT!):
|
||||
5. **Run checks**:
|
||||
```bash
|
||||
cargo +nightly fmt --all
|
||||
cargo clippy --workspace --all-features # Make sure WHOLE WORKSPACE compiles!
|
||||
cargo clippy --all-features
|
||||
cargo test -p reth-discv4
|
||||
```
|
||||
|
||||
@@ -373,7 +374,7 @@ Let's say you want to fix a bug where external IP resolution fails on startup:
|
||||
cargo +nightly fmt --all
|
||||
|
||||
# Run lints
|
||||
cargo +nightly clippy --workspace --all-features
|
||||
RUSTFLAGS="-D warnings" cargo +nightly clippy --workspace --all-features --locked
|
||||
|
||||
# Run tests
|
||||
cargo nextest run --workspace
|
||||
|
||||
813
Cargo.lock
generated
813
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
107
Cargo.toml
107
Cargo.toml
@@ -480,7 +480,7 @@ revm-primitives = { version = "22.0.0", default-features = false }
|
||||
revm-interpreter = { version = "32.0.0", default-features = false }
|
||||
revm-database-interface = { version = "9.0.0", default-features = false }
|
||||
op-revm = { version = "15.0.0", default-features = false }
|
||||
revm-inspectors = "0.34.2"
|
||||
revm-inspectors = "0.34.1"
|
||||
|
||||
# eth
|
||||
alloy-dyn-abi = "1.5.4"
|
||||
@@ -490,42 +490,42 @@ alloy-sol-types = { version = "1.5.4", default-features = false }
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-evm = { version = "0.27.0", default-features = false }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
alloy-trie = { version = "0.9.1", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
|
||||
alloy-consensus = { version = "1.6.1", default-features = false }
|
||||
alloy-contract = { version = "1.6.1", default-features = false }
|
||||
alloy-eips = { version = "1.6.1", default-features = false }
|
||||
alloy-genesis = { version = "1.6.1", default-features = false }
|
||||
alloy-json-rpc = { version = "1.6.1", default-features = false }
|
||||
alloy-network = { version = "1.6.1", default-features = false }
|
||||
alloy-network-primitives = { version = "1.6.1", default-features = false }
|
||||
alloy-provider = { version = "1.6.1", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-client = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types = { version = "1.6.1", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.6.1", default-features = false }
|
||||
alloy-serde = { version = "1.6.1", default-features = false }
|
||||
alloy-signer = { version = "1.6.1", default-features = false }
|
||||
alloy-signer-local = { version = "1.6.1", default-features = false }
|
||||
alloy-transport = { version = "1.6.1" }
|
||||
alloy-transport-http = { version = "1.6.1", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.6.1", default-features = false }
|
||||
alloy-transport-ws = { version = "1.6.1", default-features = false }
|
||||
alloy-consensus = { version = "1.5.2", default-features = false }
|
||||
alloy-contract = { version = "1.5.2", default-features = false }
|
||||
alloy-eips = { version = "1.5.2", default-features = false }
|
||||
alloy-genesis = { version = "1.5.2", default-features = false }
|
||||
alloy-json-rpc = { version = "1.5.2", default-features = false }
|
||||
alloy-network = { version = "1.5.2", default-features = false }
|
||||
alloy-network-primitives = { version = "1.5.2", default-features = false }
|
||||
alloy-provider = { version = "1.5.2", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-client = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types = { version = "1.5.2", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.5.2", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.5.2", default-features = false }
|
||||
alloy-serde = { version = "1.5.2", default-features = false }
|
||||
alloy-signer = { version = "1.5.2", default-features = false }
|
||||
alloy-signer-local = { version = "1.5.2", default-features = false }
|
||||
alloy-transport = { version = "1.5.2" }
|
||||
alloy-transport-http = { version = "1.5.2", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.5.2", default-features = false }
|
||||
alloy-transport-ws = { version = "1.5.2", default-features = false }
|
||||
|
||||
# op
|
||||
alloy-op-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-op-evm = { version = "0.27.0", default-features = false }
|
||||
alloy-op-hardforks = "0.4.4"
|
||||
op-alloy-rpc-types = { version = "0.23.1", default-features = false }
|
||||
op-alloy-rpc-types-engine = { version = "0.23.1", default-features = false }
|
||||
@@ -543,7 +543,7 @@ backon = { version = "1.2", default-features = false, features = ["std-blocking-
|
||||
bincode = "1.3"
|
||||
bitflags = "2.4"
|
||||
boyer-moore-magiclen = "0.2.16"
|
||||
bytes = { version = "1.11.1", default-features = false }
|
||||
bytes = { version = "1.5", default-features = false }
|
||||
brotli = "8"
|
||||
cfg-if = "1.0"
|
||||
clap = "4"
|
||||
@@ -560,9 +560,9 @@ humantime-serde = "1.1"
|
||||
itertools = { version = "0.14", default-features = false }
|
||||
linked_hash_set = "0.1"
|
||||
lz4 = "1.28.1"
|
||||
modular-bitfield = "0.13.1"
|
||||
modular-bitfield = "0.11.2"
|
||||
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
|
||||
nybbles = { version = "0.4.8", default-features = false }
|
||||
nybbles = { version = "0.4.2", default-features = false }
|
||||
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
|
||||
parking_lot = "0.12"
|
||||
paste = "1.0"
|
||||
@@ -589,13 +589,13 @@ zstd = "0.13"
|
||||
byteorder = "1"
|
||||
fixed-cache = { version = "0.1.7", features = ["stats"] }
|
||||
moka = "0.12"
|
||||
tar-no-std = { version = "0.4.2", default-features = false }
|
||||
miniz_oxide = { version = "0.9.0", default-features = false }
|
||||
tar-no-std = { version = "0.3.2", default-features = false }
|
||||
miniz_oxide = { version = "0.8.4", default-features = false }
|
||||
chrono = "0.4.41"
|
||||
|
||||
# metrics
|
||||
metrics = "0.24.0"
|
||||
metrics-derive = "0.1.1"
|
||||
metrics-derive = "0.1"
|
||||
metrics-exporter-prometheus = { version = "0.18.0", default-features = false }
|
||||
metrics-process = "2.1.0"
|
||||
metrics-util = { default-features = false, version = "0.20.0" }
|
||||
@@ -607,7 +607,7 @@ quote = "1.0"
|
||||
# tokio
|
||||
tokio = { version = "1.44.2", default-features = false }
|
||||
tokio-stream = "0.1.11"
|
||||
tokio-tungstenite = "0.28.0"
|
||||
tokio-tungstenite = "0.26.2"
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
|
||||
# async
|
||||
@@ -620,7 +620,7 @@ futures-util = { version = "0.3", default-features = false }
|
||||
hyper = "1.3"
|
||||
hyper-util = "0.1.5"
|
||||
pin-project = "1.0.12"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
|
||||
reqwest = { version = "0.12", default-features = false }
|
||||
tracing-futures = "0.2"
|
||||
tower = "0.5"
|
||||
tower-http = "0.6"
|
||||
@@ -640,6 +640,7 @@ jsonrpsee-types = "0.26.0"
|
||||
http = "1.0"
|
||||
http-body = "1.0"
|
||||
http-body-util = "0.1.2"
|
||||
jsonwebtoken = "9"
|
||||
proptest-arbitrary-interop = "0.1.0"
|
||||
|
||||
# crypto
|
||||
@@ -653,7 +654,7 @@ rand_08 = { package = "rand", version = "0.8" }
|
||||
c-kzg = "2.1.5"
|
||||
|
||||
# config
|
||||
toml = "0.9"
|
||||
toml = "0.8"
|
||||
|
||||
# rocksdb
|
||||
rocksdb = { version = "0.24" }
|
||||
@@ -672,16 +673,16 @@ assert_matches = "1.5.0"
|
||||
criterion = { package = "codspeed-criterion-compat", version = "4.3" }
|
||||
insta = "1.41"
|
||||
proptest = "1.7"
|
||||
proptest-derive = "0.7"
|
||||
proptest-derive = "0.5"
|
||||
similar-asserts = { version = "1.5.0", features = ["serde"] }
|
||||
tempfile = "3.20"
|
||||
test-fuzz = "7"
|
||||
rstest = "0.26.1"
|
||||
rstest = "0.24.0"
|
||||
test-case = "3"
|
||||
|
||||
# ssz encoding
|
||||
ethereum_ssz = "0.10.1"
|
||||
ethereum_ssz_derive = "0.10.1"
|
||||
ethereum_ssz = "0.9.0"
|
||||
ethereum_ssz_derive = "0.9.0"
|
||||
|
||||
# allocators
|
||||
jemalloc_pprof = { version = "0.8", default-features = false }
|
||||
@@ -693,14 +694,14 @@ snmalloc-rs = { version = "0.3.7", features = ["build_cc"] }
|
||||
aes = "0.8.1"
|
||||
ahash = "0.8"
|
||||
anyhow = "1.0"
|
||||
bindgen = { version = "0.72", default-features = false }
|
||||
block-padding = "0.3"
|
||||
bindgen = { version = "0.71", default-features = false }
|
||||
block-padding = "0.3.2"
|
||||
cc = "1.2.15"
|
||||
cipher = "0.4.3"
|
||||
comfy-table = "7.0"
|
||||
concat-kdf = "0.1.0"
|
||||
crossbeam-channel = "0.5.13"
|
||||
crossterm = "0.29.0"
|
||||
crossterm = "0.28.0"
|
||||
csv = "1.3.0"
|
||||
ctrlc = "3.4"
|
||||
ctr = "0.9.2"
|
||||
@@ -713,7 +714,7 @@ hmac = "0.12.1"
|
||||
human_bytes = "0.4.1"
|
||||
indexmap = "2"
|
||||
interprocess = "2.2.0"
|
||||
lz4_flex = { version = "0.12", default-features = false }
|
||||
lz4_flex = { version = "0.11", default-features = false }
|
||||
memmap2 = "0.9.4"
|
||||
mev-share-sse = { version = "0.5.0", default-features = false }
|
||||
num-traits = "0.2.15"
|
||||
@@ -721,15 +722,15 @@ page_size = "0.6.0"
|
||||
parity-scale-codec = "3.2.1"
|
||||
plain_hasher = "0.2"
|
||||
pretty_assertions = "1.4"
|
||||
ratatui = { version = "0.30", default-features = false }
|
||||
ringbuffer = "0.16.0"
|
||||
ratatui = { version = "0.29", default-features = false }
|
||||
ringbuffer = "0.15.0"
|
||||
rmp-serde = "1.3"
|
||||
roaring = "0.11.3"
|
||||
roaring = "0.10.2"
|
||||
rolling-file = "0.2.0"
|
||||
sha3 = "0.10.5"
|
||||
snap = "1.1.1"
|
||||
socket2 = { version = "0.6", default-features = false }
|
||||
sysinfo = { version = "0.38", default-features = false }
|
||||
socket2 = { version = "0.5", default-features = false }
|
||||
sysinfo = { version = "0.33", default-features = false }
|
||||
tracing-journald = "0.3"
|
||||
tracing-logfmt = "=0.3.5"
|
||||
tracing-samply = "0.1"
|
||||
|
||||
@@ -29,8 +29,9 @@ ARG MANIFEST_PATH=bin/reth
|
||||
ARG BUILD_PROFILE=release
|
||||
ENV BUILD_PROFILE=$BUILD_PROFILE
|
||||
|
||||
# Extra Cargo flags (can be overridden, otherwise set per-platform below)
|
||||
# Extra Cargo flags
|
||||
ARG RUSTFLAGS=""
|
||||
ENV RUSTFLAGS="$RUSTFLAGS"
|
||||
|
||||
# Extra Cargo features
|
||||
ARG FEATURES=""
|
||||
@@ -45,18 +46,11 @@ ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
|
||||
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
|
||||
|
||||
# Build application
|
||||
# Platform-specific RUSTFLAGS: amd64 uses x86-64-v3 (Haswell+) with pclmulqdq for rocksdb
|
||||
ARG TARGETPLATFORM
|
||||
COPY --exclude=.git . .
|
||||
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
|
||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
if [ -n "$RUSTFLAGS" ]; then \
|
||||
export RUSTFLAGS="$RUSTFLAGS"; \
|
||||
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
|
||||
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
|
||||
fi && \
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
|
||||
|
||||
RUN sccache --show-stats || true
|
||||
|
||||
@@ -56,7 +56,7 @@ ctrlc.workspace = true
|
||||
shlex.workspace = true
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
nix = { version = "0.31", features = ["signal", "process"] }
|
||||
nix = { version = "0.29", features = ["signal", "process"] }
|
||||
|
||||
[features]
|
||||
default = ["jemalloc"]
|
||||
|
||||
@@ -186,12 +186,10 @@ impl BenchmarkRunner {
|
||||
&output_dir.to_string_lossy(),
|
||||
]);
|
||||
|
||||
// Configure wait mode: both can be used together
|
||||
// When both are set: wait at least wait_time, and also wait for persistence if needed
|
||||
// Configure wait mode: wait-time takes precedence over persistence-based flow
|
||||
if let Some(ref wait_time) = self.wait_time {
|
||||
cmd.args(["--wait-time", wait_time]);
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
} else if self.wait_for_persistence {
|
||||
cmd.arg("--wait-for-persistence");
|
||||
|
||||
// Add persistence threshold if specified
|
||||
|
||||
@@ -116,9 +116,9 @@ pub(crate) struct Args {
|
||||
|
||||
/// Optional fixed delay between engine API calls (passed to reth-bench).
|
||||
///
|
||||
/// Can be combined with `--wait-for-persistence`: when both are set,
|
||||
/// waits at least this duration, and also waits for persistence if needed.
|
||||
#[arg(long, value_name = "DURATION")]
|
||||
/// When set, reth-bench uses wait-time mode and disables persistence-based flow.
|
||||
/// This flag remains for compatibility with older scripts.
|
||||
#[arg(long, value_name = "DURATION", hide = true)]
|
||||
pub wait_time: Option<String>,
|
||||
|
||||
/// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
|
||||
@@ -126,9 +126,6 @@ pub(crate) struct Args {
|
||||
/// When enabled, waits for every Nth block to be persisted using the
|
||||
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
|
||||
/// doesn't outpace persistence.
|
||||
///
|
||||
/// Can be combined with `--wait-time`: when both are set, waits at least
|
||||
/// wait-time, and also waits for persistence if the block hasn't been persisted yet.
|
||||
#[arg(long)]
|
||||
pub wait_for_persistence: bool,
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ op-alloy-consensus = { workspace = true, features = ["alloy-compat"] }
|
||||
op-alloy-rpc-types-engine = { workspace = true, features = ["serde"] }
|
||||
|
||||
# reqwest
|
||||
reqwest.workspace = true
|
||||
reqwest = { workspace = true, default-features = false, features = ["rustls-tls-native-roots"] }
|
||||
|
||||
# tower
|
||||
tower.workspace = true
|
||||
|
||||
@@ -572,22 +572,13 @@ impl Command {
|
||||
|
||||
for i in 0..self.count {
|
||||
// Get initial batch of transactions for this payload
|
||||
let Some(mut result) = tx_buffer.take_batch().await else {
|
||||
info!(
|
||||
payloads_built = i,
|
||||
payloads_requested = self.count,
|
||||
"Transaction source exhausted, stopping"
|
||||
);
|
||||
break;
|
||||
};
|
||||
let mut result = tx_buffer
|
||||
.take_batch()
|
||||
.await
|
||||
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
|
||||
|
||||
if result.transactions.is_empty() {
|
||||
info!(
|
||||
payloads_built = i,
|
||||
payloads_requested = self.count,
|
||||
"No more transactions available, stopping"
|
||||
);
|
||||
break;
|
||||
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
|
||||
}
|
||||
|
||||
// Build with retry - may need to request more transactions
|
||||
|
||||
@@ -96,23 +96,9 @@ impl Command {
|
||||
);
|
||||
}
|
||||
|
||||
// Set up waiter based on configured options
|
||||
// When both are set: wait at least wait_time, and also wait for persistence if needed
|
||||
// Set up waiter based on configured options (duration takes precedence)
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), true) => {
|
||||
let ws_url = derive_ws_rpc_url(
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
))
|
||||
}
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
|
||||
@@ -4,8 +4,6 @@
|
||||
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
|
||||
//! - **Persistence-based waits**: Wait for blocks to be persisted using
|
||||
//! `reth_subscribePersistedBlock` subscription
|
||||
//! - **Combined mode**: Wait at least the fixed duration, and also wait for persistence if the
|
||||
//! block hasn't been persisted yet (whichever takes longer)
|
||||
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
@@ -221,39 +219,14 @@ impl PersistenceWaiter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a waiter that combines both duration and persistence waiting.
|
||||
///
|
||||
/// Waits at least `wait_time` between blocks, and also waits for persistence
|
||||
/// if the block hasn't been persisted yet (whichever takes longer).
|
||||
pub(crate) const fn with_duration_and_subscription(
|
||||
wait_time: Duration,
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
///
|
||||
/// When both `wait_time` and `subscription` are set (combined mode):
|
||||
/// - Always waits at least `wait_time`
|
||||
/// - Additionally waits for persistence if we're at a persistence checkpoint
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
// Always wait for the fixed duration if configured
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Check persistence if subscription is configured
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
@@ -148,20 +148,9 @@ impl Command {
|
||||
);
|
||||
}
|
||||
|
||||
// Set up waiter based on configured options
|
||||
// When both are set: wait at least wait_time, and also wait for persistence if needed
|
||||
// Set up waiter based on configured options (duration takes precedence)
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
))
|
||||
}
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{
|
||||
};
|
||||
use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
|
||||
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
|
||||
use alloy_primitives::{map::B256Map, BlockNumber, TxHash, B256};
|
||||
use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256};
|
||||
use parking_lot::RwLock;
|
||||
use reth_chainspec::ChainInfo;
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
@@ -57,7 +57,7 @@ pub(crate) struct InMemoryStateMetrics {
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct InMemoryState<N: NodePrimitives = EthPrimitives> {
|
||||
/// All canonical blocks that are not on disk yet.
|
||||
blocks: RwLock<B256Map<Arc<BlockState<N>>>>,
|
||||
blocks: RwLock<HashMap<B256, Arc<BlockState<N>>>>,
|
||||
/// Mapping of block numbers to block hashes.
|
||||
numbers: RwLock<BTreeMap<u64, B256>>,
|
||||
/// The pending block that has not yet been made canonical.
|
||||
@@ -68,7 +68,7 @@ pub(crate) struct InMemoryState<N: NodePrimitives = EthPrimitives> {
|
||||
|
||||
impl<N: NodePrimitives> InMemoryState<N> {
|
||||
pub(crate) fn new(
|
||||
blocks: B256Map<Arc<BlockState<N>>>,
|
||||
blocks: HashMap<B256, Arc<BlockState<N>>>,
|
||||
numbers: BTreeMap<u64, B256>,
|
||||
pending: Option<BlockState<N>>,
|
||||
) -> Self {
|
||||
@@ -184,7 +184,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
/// Create a new in-memory state with the given blocks, numbers, pending state, and optional
|
||||
/// finalized header.
|
||||
pub fn new(
|
||||
blocks: B256Map<Arc<BlockState<N>>>,
|
||||
blocks: HashMap<B256, Arc<BlockState<N>>>,
|
||||
numbers: BTreeMap<u64, B256>,
|
||||
pending: Option<BlockState<N>>,
|
||||
finalized: Option<SealedHeader<N::BlockHeader>>,
|
||||
@@ -209,7 +209,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
|
||||
/// Create an empty state.
|
||||
pub fn empty() -> Self {
|
||||
Self::new(B256Map::default(), BTreeMap::new(), None, None, None)
|
||||
Self::new(HashMap::default(), BTreeMap::new(), None, None, None)
|
||||
}
|
||||
|
||||
/// Create a new in memory state with the given local head and finalized header
|
||||
@@ -1176,7 +1176,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_in_memory_state_impl_state_by_hash() {
|
||||
let mut state_by_hash = B256Map::default();
|
||||
let mut state_by_hash = HashMap::default();
|
||||
let number = rand::rng().random::<u64>();
|
||||
let mut test_block_builder: TestBlockBuilder = TestBlockBuilder::default();
|
||||
let state = Arc::new(create_mock_state(&mut test_block_builder, number, B256::random()));
|
||||
@@ -1190,7 +1190,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_in_memory_state_impl_state_by_number() {
|
||||
let mut state_by_hash = B256Map::default();
|
||||
let mut state_by_hash = HashMap::default();
|
||||
let mut hash_by_number = BTreeMap::new();
|
||||
|
||||
let number = rand::rng().random::<u64>();
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_in_memory_state_impl_head_state() {
|
||||
let mut state_by_hash = B256Map::default();
|
||||
let mut state_by_hash = HashMap::default();
|
||||
let mut hash_by_number = BTreeMap::new();
|
||||
let mut test_block_builder: TestBlockBuilder = TestBlockBuilder::default();
|
||||
let state1 = Arc::new(create_mock_state(&mut test_block_builder, 1, B256::random()));
|
||||
@@ -1237,7 +1237,7 @@ mod tests {
|
||||
let pending_hash = pending_state.hash();
|
||||
|
||||
let in_memory_state =
|
||||
InMemoryState::new(B256Map::default(), BTreeMap::new(), Some(pending_state));
|
||||
InMemoryState::new(HashMap::default(), BTreeMap::new(), Some(pending_state));
|
||||
|
||||
let result = in_memory_state.pending_state();
|
||||
assert!(result.is_some());
|
||||
@@ -1249,7 +1249,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_in_memory_state_impl_no_pending_state() {
|
||||
let in_memory_state: InMemoryState =
|
||||
InMemoryState::new(B256Map::default(), BTreeMap::new(), None);
|
||||
InMemoryState::new(HashMap::default(), BTreeMap::new(), None);
|
||||
|
||||
assert_eq!(in_memory_state.pending_state(), None);
|
||||
}
|
||||
@@ -1380,7 +1380,7 @@ mod tests {
|
||||
let state2 = Arc::new(BlockState::with_parent(block2.clone(), Some(state1.clone())));
|
||||
let state3 = Arc::new(BlockState::with_parent(block3.clone(), Some(state2.clone())));
|
||||
|
||||
let mut blocks = B256Map::default();
|
||||
let mut blocks = HashMap::default();
|
||||
blocks.insert(block1.recovered_block().hash(), state1);
|
||||
blocks.insert(block2.recovered_block().hash(), state2);
|
||||
blocks.insert(block3.recovered_block().hash(), state3);
|
||||
@@ -1427,7 +1427,7 @@ mod tests {
|
||||
fn test_canonical_in_memory_state_canonical_chain_single_block() {
|
||||
let block = TestBlockBuilder::eth().get_executed_block_with_number(1, B256::random());
|
||||
let hash = block.recovered_block().hash();
|
||||
let mut blocks = B256Map::default();
|
||||
let mut blocks = HashMap::default();
|
||||
blocks.insert(hash, Arc::new(BlockState::new(block)));
|
||||
let mut numbers = BTreeMap::new();
|
||||
numbers.insert(1, hash);
|
||||
|
||||
@@ -541,7 +541,7 @@ impl<H: BlockHeader> ChainSpec<H> {
|
||||
}
|
||||
}
|
||||
|
||||
bf_params.first().map(|(_, params)| *params).unwrap_or_else(BaseFeeParams::ethereum)
|
||||
bf_params.first().map(|(_, params)| *params).unwrap_or(BaseFeeParams::ethereum())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,4 +133,4 @@ arbitrary = [
|
||||
"reth-ethereum-primitives/arbitrary",
|
||||
]
|
||||
|
||||
edge = ["reth-db-common/edge", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
|
||||
edge = ["reth-db-common/edge", "reth-stages/rocksdb", "reth-provider/rocksdb"]
|
||||
|
||||
@@ -121,16 +121,14 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
|
||||
let (db, sfp) = match access {
|
||||
AccessRights::RW => (
|
||||
init_db(db_path, self.db.database_args())?,
|
||||
Arc::new(init_db(db_path, self.db.database_args())?),
|
||||
StaticFileProviderBuilder::read_write(sf_path)
|
||||
.with_metrics()
|
||||
.with_genesis_block_number(genesis_block_number)
|
||||
.build()?,
|
||||
),
|
||||
AccessRights::RO | AccessRights::RoInconsistent => {
|
||||
(open_db_read_only(&db_path, self.db.database_args())?, {
|
||||
(Arc::new(open_db_read_only(&db_path, self.db.database_args())?), {
|
||||
let provider = StaticFileProviderBuilder::read_only(sf_path)
|
||||
.with_metrics()
|
||||
.with_genesis_block_number(genesis_block_number)
|
||||
.build()?;
|
||||
provider.watch_directory();
|
||||
@@ -162,16 +160,16 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
fn create_provider_factory<N: CliNodeTypes>(
|
||||
&self,
|
||||
config: &Config,
|
||||
db: DatabaseEnv,
|
||||
db: Arc<DatabaseEnv>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
access: AccessRights,
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
{
|
||||
let prune_modes = config.prune.segments.clone();
|
||||
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, DatabaseEnv>>::new(
|
||||
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::new(
|
||||
db,
|
||||
self.chain.clone(),
|
||||
static_file_provider,
|
||||
@@ -202,7 +200,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
||||
|
||||
// Builds and executes an unwind-only pipeline
|
||||
let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, DatabaseEnv>>::builder()
|
||||
let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder()
|
||||
.add_stages(DefaultStages::new(
|
||||
factory.clone(),
|
||||
tip_rx,
|
||||
@@ -231,7 +229,7 @@ pub struct Environment<N: NodeTypes> {
|
||||
/// Configuration for reth node
|
||||
pub config: Config,
|
||||
/// Provider factory.
|
||||
pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
/// Datadir path.
|
||||
pub data_dir: ChainPath<DataDirPath>,
|
||||
}
|
||||
@@ -263,8 +261,8 @@ impl AccessRights {
|
||||
/// Helper alias to satisfy `FullNodeTypes` bound on [`Node`] trait generic.
|
||||
type FullTypesAdapter<T> = FullNodeTypesAdapter<
|
||||
T,
|
||||
DatabaseEnv,
|
||||
BlockchainProvider<NodeTypesWithDBAdapter<T, DatabaseEnv>>,
|
||||
Arc<DatabaseEnv>,
|
||||
BlockchainProvider<NodeTypesWithDBAdapter<T, Arc<DatabaseEnv>>>,
|
||||
>;
|
||||
|
||||
/// Helper trait with a common set of requirements for the
|
||||
|
||||
@@ -17,6 +17,7 @@ use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProvider
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use std::{
|
||||
hash::{BuildHasher, Hasher},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
@@ -89,7 +90,7 @@ impl Command {
|
||||
/// Execute `db checksum` command
|
||||
pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
self,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
) -> eyre::Result<()> {
|
||||
warn!("This command should be run without the node running!");
|
||||
|
||||
@@ -116,7 +117,7 @@ fn checksum_hasher() -> impl Hasher {
|
||||
}
|
||||
|
||||
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
segment: StaticFileSegment,
|
||||
start_block: Option<u64>,
|
||||
end_block: Option<u64>,
|
||||
|
||||
@@ -9,7 +9,7 @@ use reth_db_api::table::Table;
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDBAdapter;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use std::{hash::Hasher, time::Instant};
|
||||
use std::{hash::Hasher, sync::Arc, time::Instant};
|
||||
use tracing::info;
|
||||
|
||||
/// RocksDB tables that can be checksummed.
|
||||
@@ -36,7 +36,7 @@ impl RocksDbTable {
|
||||
|
||||
/// Computes a checksum for a RocksDB table.
|
||||
pub fn checksum_rocksdb<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
table: RocksDbTable,
|
||||
limit: Option<usize>,
|
||||
) -> eyre::Result<()> {
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::{
|
||||
hash::Hash,
|
||||
io::Write,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -55,7 +56,7 @@ impl Command {
|
||||
/// then written to a file in the output directory.
|
||||
pub fn execute<T: NodeTypes>(
|
||||
self,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<T, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<T, Arc<DatabaseEnv>>>,
|
||||
) -> eyre::Result<()> {
|
||||
warn!("Make sure the node is not running when running `reth db diff`!");
|
||||
// open second db
|
||||
|
||||
@@ -7,7 +7,7 @@ use reth_db::{transaction::DbTx, DatabaseEnv};
|
||||
use reth_db_api::{database::Database, table::Table, RawValue, TableViewer, Tables};
|
||||
use reth_db_common::{DbTool, ListFilter};
|
||||
use reth_node_builder::{NodeTypes, NodeTypesWithDBAdapter};
|
||||
use std::cell::RefCell;
|
||||
use std::{cell::RefCell, sync::Arc};
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
@@ -55,7 +55,7 @@ impl Command {
|
||||
/// Execute `db list` command
|
||||
pub fn execute<N: NodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
self,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
) -> eyre::Result<()> {
|
||||
self.table.view(&ListTableViewer { tool, args: &self })
|
||||
}
|
||||
@@ -89,7 +89,7 @@ impl Command {
|
||||
}
|
||||
|
||||
struct ListTableViewer<'a, N: NodeTypes> {
|
||||
tool: &'a DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &'a DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
args: &'a Command,
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_provider::providers::ProviderNodeTypes;
|
||||
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
thread,
|
||||
@@ -231,7 +230,7 @@ impl Command {
|
||||
thread::scope(|s| {
|
||||
let handles: Vec<_> = (0..num_threads)
|
||||
.map(|thread_id| {
|
||||
spawn_scoped_os_thread(s, "db-state-worker", move || {
|
||||
s.spawn(move || {
|
||||
loop {
|
||||
// Get next chunk to process
|
||||
let chunk_idx = {
|
||||
|
||||
@@ -16,7 +16,7 @@ use reth_provider::{
|
||||
RocksDBProviderFactory,
|
||||
};
|
||||
use reth_static_file_types::SegmentRangeInclusive;
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
/// The arguments for the `reth db stats` command
|
||||
@@ -48,7 +48,7 @@ impl Command {
|
||||
pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
self,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
) -> eyre::Result<()> {
|
||||
if self.checksum {
|
||||
let checksum_report = self.checksum_report(tool)?;
|
||||
@@ -72,7 +72,7 @@ impl Command {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn db_stats_table<N: NodeTypesWithDB<DB = DatabaseEnv>>(
|
||||
fn db_stats_table<N: NodeTypesWithDB<DB = Arc<DatabaseEnv>>>(
|
||||
&self,
|
||||
tool: &DbTool<N>,
|
||||
) -> eyre::Result<ComfyTable> {
|
||||
|
||||
@@ -227,9 +227,8 @@ where
|
||||
|
||||
// Handle errors
|
||||
if let Err(err) = res {
|
||||
error!("{err}");
|
||||
error!("{:?}", err)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -242,7 +241,6 @@ fn event_loop<B: Backend, F, T: Table>(
|
||||
) -> io::Result<()>
|
||||
where
|
||||
F: FnMut(usize, usize) -> Vec<TableRow<T>>,
|
||||
io::Error: From<B::Error>,
|
||||
{
|
||||
let mut last_tick = Instant::now();
|
||||
let mut running = true;
|
||||
|
||||
@@ -2,7 +2,7 @@ use futures::Future;
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_db::DatabaseEnv;
|
||||
use reth_node_builder::{NodeBuilder, WithLaunchContext};
|
||||
use std::fmt;
|
||||
use std::{fmt, sync::Arc};
|
||||
|
||||
/// A trait for launching a reth node with custom configuration strategies.
|
||||
///
|
||||
@@ -30,7 +30,7 @@ where
|
||||
/// * `builder_args` - Extension arguments for configuration
|
||||
fn entrypoint(
|
||||
self,
|
||||
builder: WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
builder: WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
builder_args: Ext,
|
||||
) -> impl Future<Output = eyre::Result<()>>;
|
||||
}
|
||||
@@ -58,7 +58,7 @@ impl<F> FnLauncher<F> {
|
||||
where
|
||||
C: ChainSpecParser,
|
||||
F: AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> eyre::Result<()>,
|
||||
{
|
||||
@@ -77,13 +77,13 @@ where
|
||||
C: ChainSpecParser,
|
||||
Ext: clap::Args + fmt::Debug,
|
||||
F: AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> eyre::Result<()>,
|
||||
{
|
||||
fn entrypoint(
|
||||
self,
|
||||
builder: WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
builder: WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
builder_args: Ext,
|
||||
) -> impl Future<Output = eyre::Result<()>> {
|
||||
(self.func)(builder, builder_args)
|
||||
|
||||
@@ -206,7 +206,7 @@ where
|
||||
let db_path = data_dir.db();
|
||||
|
||||
tracing::info!(target: "reth::cli", path = ?db_path, "Opening database");
|
||||
let database = init_db(db_path.clone(), self.db.database_args())?.with_metrics();
|
||||
let database = Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics());
|
||||
|
||||
if with_unused_ports {
|
||||
node_config = node_config.with_unused_ports();
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Command that runs pruning.
|
||||
//! Command that runs pruning without any limits.
|
||||
use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
|
||||
use clap::Parser;
|
||||
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_cli_util::cancellation::CancellationToken;
|
||||
use reth_node_builder::common::metrics_hooks;
|
||||
use reth_node_core::{args::MetricArgs, version::version_metadata};
|
||||
use reth_node_metrics::{
|
||||
@@ -12,14 +11,12 @@ use reth_node_metrics::{
|
||||
server::{MetricServer, MetricServerConfig},
|
||||
version::VersionInfo,
|
||||
};
|
||||
#[cfg(all(unix, feature = "edge"))]
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_prune::PrunerBuilder;
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
/// Prunes according to the configuration
|
||||
/// Prunes according to the configuration without any limits
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct PruneCommand<C: ChainSpecParser> {
|
||||
#[command(flatten)]
|
||||
@@ -53,7 +50,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
build_profile: version_metadata().build_profile_name.as_ref(),
|
||||
},
|
||||
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
|
||||
ctx.task_executor.clone(),
|
||||
ctx.task_executor,
|
||||
metrics_hooks(&provider_factory),
|
||||
data_dir.pprof_dumps(),
|
||||
);
|
||||
@@ -72,66 +69,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
// Delete data which has been copied to static files.
|
||||
if let Some(prune_tip) = lowest_static_file_height {
|
||||
info!(target: "reth::cli", ?prune_tip, ?config, "Pruning data from database...");
|
||||
|
||||
// Set up cancellation token for graceful shutdown on Ctrl+C
|
||||
let cancellation = CancellationToken::new();
|
||||
let cancellation_clone = cancellation.clone();
|
||||
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
|
||||
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
|
||||
cancellation_clone.cancel();
|
||||
});
|
||||
|
||||
// Use batched pruning with a limit to bound memory, running in a loop until complete.
|
||||
//
|
||||
// A limit of 20_000_000 results in a max memory usage of ~5G.
|
||||
const DELETE_LIMIT: usize = 20_000_000;
|
||||
// Run the pruner according to the configuration, and don't enforce any limits on it
|
||||
let mut pruner = PrunerBuilder::new(config)
|
||||
.delete_limit(DELETE_LIMIT)
|
||||
.build_with_provider_factory(provider_factory.clone());
|
||||
.delete_limit(usize::MAX)
|
||||
.build_with_provider_factory(provider_factory);
|
||||
|
||||
let mut total_pruned = 0usize;
|
||||
loop {
|
||||
if cancellation.is_cancelled() {
|
||||
info!(target: "reth::cli", total_pruned, "Pruning interrupted by user");
|
||||
break;
|
||||
}
|
||||
|
||||
let output = pruner.run(prune_tip)?;
|
||||
let batch_pruned: usize = output.segments.iter().map(|(_, seg)| seg.pruned).sum();
|
||||
total_pruned = total_pruned.saturating_add(batch_pruned);
|
||||
|
||||
// Check if all segments are finished (not just the overall progress,
|
||||
// since the pruner sets overall progress from the last segment only)
|
||||
let all_segments_finished =
|
||||
output.segments.iter().all(|(_, seg)| seg.progress.is_finished());
|
||||
|
||||
if all_segments_finished {
|
||||
info!(target: "reth::cli", total_pruned, "Pruned data from database");
|
||||
break;
|
||||
}
|
||||
|
||||
if batch_pruned == 0 {
|
||||
return Err(eyre::eyre!(
|
||||
"pruner made no progress but reported more data remaining; \
|
||||
aborting to prevent infinite loop"
|
||||
));
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
batch_pruned,
|
||||
total_pruned,
|
||||
"Pruning batch complete, continuing..."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Flush and compact RocksDB to reclaim disk space after pruning
|
||||
#[cfg(all(unix, feature = "edge"))]
|
||||
{
|
||||
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
|
||||
provider_factory.rocksdb_provider().flush_and_compact()?;
|
||||
info!(target: "reth::cli", "RocksDB compaction complete");
|
||||
pruner.run(prune_tip)?;
|
||||
info!(target: "reth::cli", "Pruned data from database");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -26,7 +26,7 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
|
||||
consensus: C,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
N: ProviderNodeTypes<DB = DatabaseEnv>,
|
||||
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
|
||||
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
C: FullConsensus<E::Primitives> + 'static,
|
||||
{
|
||||
@@ -39,7 +39,7 @@ where
|
||||
if should_run {
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
|
||||
@@ -10,9 +10,10 @@ use reth_provider::{
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = DatabaseEnv>>(
|
||||
pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
|
||||
db_tool: &DbTool<N>,
|
||||
from: BlockNumber,
|
||||
to: BlockNumber,
|
||||
@@ -35,7 +36,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
|
||||
if should_run {
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
|
||||
@@ -9,9 +9,10 @@ use reth_provider::{
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = DatabaseEnv>>(
|
||||
pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
|
||||
db_tool: &DbTool<N>,
|
||||
from: u64,
|
||||
to: u64,
|
||||
@@ -25,7 +26,7 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
|
||||
if should_run {
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
|
||||
@@ -34,7 +34,7 @@ pub(crate) async fn dump_merkle_stage<N>(
|
||||
consensus: impl FullConsensus<N::Primitives> + 'static,
|
||||
) -> Result<()>
|
||||
where
|
||||
N: ProviderNodeTypes<DB = DatabaseEnv>,
|
||||
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
|
||||
{
|
||||
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
|
||||
|
||||
@@ -59,7 +59,7 @@ where
|
||||
if should_run {
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
|
||||
@@ -158,7 +158,7 @@ enum Subcommands {
|
||||
|
||||
impl Subcommands {
|
||||
/// Returns the block to unwind to. The returned block will stay in database.
|
||||
fn unwind_target<N: ProviderNodeTypes<DB = DatabaseEnv>>(
|
||||
fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
|
||||
&self,
|
||||
factory: ProviderFactory<N>,
|
||||
) -> eyre::Result<u64> {
|
||||
|
||||
@@ -83,7 +83,22 @@ impl CliRunner {
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
// complete.
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(tokio_runtime);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -122,7 +137,19 @@ impl CliRunner {
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
// Shutdown the runtime on a separate thread
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(tokio_runtime);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -152,7 +179,13 @@ impl CliRunner {
|
||||
tokio_runtime
|
||||
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
|
||||
|
||||
tokio_shutdown(tokio_runtime, false);
|
||||
// drop the tokio runtime on a separate thread because drop blocks until its pools
|
||||
// (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
|
||||
// the current thread but we want to exit right away.
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || drop(tokio_runtime))
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -219,14 +252,7 @@ impl CliRunnerConfig {
|
||||
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
|
||||
/// enabled
|
||||
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
|
||||
// This prevents the costly process of spawning new threads on every
|
||||
// new block, and instead reuses the existing threads.
|
||||
.thread_keep_alive(Duration::from_secs(15))
|
||||
.thread_name("tokio-rt")
|
||||
.build()
|
||||
tokio::runtime::Builder::new_multi_thread().enable_all().build()
|
||||
}
|
||||
|
||||
/// Runs the given future to completion or until a critical task panicked.
|
||||
@@ -295,27 +321,3 @@ where
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
|
||||
///
|
||||
/// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
/// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
/// complete.
|
||||
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
// Shutdown the runtime on a separate thread
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(rt);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
if wait {
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ auto_impl.workspace = true
|
||||
derive_more.workspace = true
|
||||
futures.workspace = true
|
||||
eyre.workspace = true
|
||||
reqwest.workspace = true
|
||||
reqwest = { workspace = true, features = ["rustls-tls"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
tokio = { workspace = true, features = ["time"] }
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -95,7 +95,7 @@ where
|
||||
let block_hash = payload.block_hash();
|
||||
let block_number = payload.block_number();
|
||||
|
||||
previous_block_hashes.enqueue(block_hash);
|
||||
previous_block_hashes.push(block_hash);
|
||||
|
||||
// Send new events to execution client
|
||||
let _ = self.engine_handle.new_payload(payload).await;
|
||||
@@ -160,7 +160,7 @@ mod tests {
|
||||
|
||||
// Push hashes 0..65
|
||||
for i in 0..65u8 {
|
||||
buffer.enqueue(B256::with_last_byte(i));
|
||||
buffer.push(B256::with_last_byte(i));
|
||||
}
|
||||
|
||||
// offset=0 should return the most recent (64)
|
||||
@@ -181,7 +181,7 @@ mod tests {
|
||||
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
|
||||
|
||||
// With only 1 entry, only offset=0 works
|
||||
buffer.enqueue(B256::with_last_byte(1));
|
||||
buffer.push(B256::with_last_byte(1));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 1), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), None);
|
||||
@@ -189,7 +189,7 @@ mod tests {
|
||||
|
||||
// With 33 entries, offset=32 works but offset=64 doesn't
|
||||
for i in 2..=33u8 {
|
||||
buffer.enqueue(B256::with_last_byte(i));
|
||||
buffer.push(B256::with_last_byte(i));
|
||||
}
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), None);
|
||||
|
||||
@@ -114,22 +114,22 @@ pub async fn setup_engine_with_chain_import(
|
||||
|
||||
// Initialize the database using init_db (same as CLI import command)
|
||||
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
|
||||
let db = reth_db::init_db(&db_path, db_args)?;
|
||||
let db_env = reth_db::init_db(&db_path, db_args)?;
|
||||
let db = Arc::new(db_env);
|
||||
|
||||
// Create a provider factory with the initialized database (use regular DB, not
|
||||
// TempDatabase) We need to specify the node types properly for the adapter
|
||||
let provider_factory =
|
||||
ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(
|
||||
static_files_path.clone(),
|
||||
)?,
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)?;
|
||||
let provider_factory = ProviderFactory::<
|
||||
NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>,
|
||||
>::new(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
reth_db_common::init::init_genesis(&provider_factory)?;
|
||||
@@ -320,10 +320,11 @@ mod tests {
|
||||
// Import the chain
|
||||
{
|
||||
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
|
||||
let db = reth_db::init_db(&db_path, db_args).unwrap();
|
||||
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
|
||||
let db = Arc::new(db_env);
|
||||
|
||||
let provider_factory: ProviderFactory<
|
||||
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
|
||||
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
|
||||
> = ProviderFactory::new(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
@@ -384,10 +385,11 @@ mod tests {
|
||||
|
||||
// Now reopen the database and verify checkpoints are still there
|
||||
{
|
||||
let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
|
||||
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
|
||||
let db = Arc::new(db_env);
|
||||
|
||||
let provider_factory: ProviderFactory<
|
||||
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
|
||||
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
|
||||
> = ProviderFactory::new(
|
||||
db,
|
||||
chain_spec.clone(),
|
||||
|
||||
@@ -528,12 +528,8 @@ impl TreeConfig {
|
||||
}
|
||||
|
||||
/// Setter for the number of storage proof worker threads.
|
||||
///
|
||||
/// No-op if it's [`None`].
|
||||
pub fn with_storage_worker_count_opt(mut self, storage_worker_count: Option<usize>) -> Self {
|
||||
if let Some(count) = storage_worker_count {
|
||||
self.storage_worker_count = count.max(MIN_WORKER_COUNT);
|
||||
}
|
||||
pub fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
|
||||
self.storage_worker_count = storage_worker_count.max(MIN_WORKER_COUNT);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -543,12 +539,8 @@ impl TreeConfig {
|
||||
}
|
||||
|
||||
/// Setter for the number of account proof worker threads.
|
||||
///
|
||||
/// No-op if it's [`None`].
|
||||
pub fn with_account_worker_count_opt(mut self, account_worker_count: Option<usize>) -> Self {
|
||||
if let Some(count) = account_worker_count {
|
||||
self.account_worker_count = count.max(MIN_WORKER_COUNT);
|
||||
}
|
||||
pub fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
|
||||
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ reth-engine-tree.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-network-p2p.workspace = true
|
||||
reth-payload-builder.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-prune.workspace = true
|
||||
reth-stages-api.workspace = true
|
||||
|
||||
@@ -14,6 +14,7 @@ pub use reth_engine_tree::{
|
||||
chain::{ChainEvent, ChainOrchestrator},
|
||||
engine::EngineApiEvent,
|
||||
};
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network_p2p::BlockClient;
|
||||
use reth_node_types::{BlockTy, NodeTypes};
|
||||
@@ -96,7 +97,7 @@ where
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ reth-evm = { workspace = true, features = ["metrics"] }
|
||||
reth-network-p2p.workspace = true
|
||||
reth-payload-builder.workspace = true
|
||||
reth-payload-primitives.workspace = true
|
||||
reth-primitives-traits = { workspace = true, features = ["rayon", "dashmap"] }
|
||||
reth-primitives-traits.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-prune.workspace = true
|
||||
@@ -62,18 +62,19 @@ metrics.workspace = true
|
||||
reth-metrics = { workspace = true, features = ["common"] }
|
||||
|
||||
# misc
|
||||
dashmap.workspace = true
|
||||
schnellru.workspace = true
|
||||
rayon.workspace = true
|
||||
tracing.workspace = true
|
||||
derive_more.workspace = true
|
||||
parking_lot.workspace = true
|
||||
crossbeam-channel.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
|
||||
# optional deps for test-utils
|
||||
reth-prune-types = { workspace = true, optional = true }
|
||||
reth-stages = { workspace = true, optional = true }
|
||||
reth-static-file = { workspace = true, optional = true }
|
||||
reth-tracing = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
@@ -132,6 +133,7 @@ test-utils = [
|
||||
"reth-stages-api/test-utils",
|
||||
"reth-stages/test-utils",
|
||||
"reth-static-file",
|
||||
"reth-tracing",
|
||||
"reth-trie/test-utils",
|
||||
"reth-trie-common/test-utils",
|
||||
"reth-trie-db/test-utils",
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::{map::B256Set, B256};
|
||||
use alloy_primitives::B256;
|
||||
use futures::FutureExt;
|
||||
use reth_consensus::Consensus;
|
||||
use reth_network_p2p::{
|
||||
@@ -12,7 +12,7 @@ use reth_network_p2p::{
|
||||
use reth_primitives_traits::{Block, SealedBlock};
|
||||
use std::{
|
||||
cmp::{Ordering, Reverse},
|
||||
collections::{binary_heap::PeekMut, BinaryHeap, VecDeque},
|
||||
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
|
||||
fmt::Debug,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
@@ -109,7 +109,7 @@ where
|
||||
}
|
||||
|
||||
/// Processes a block set download request.
|
||||
fn download_block_set(&mut self, hashes: B256Set) {
|
||||
fn download_block_set(&mut self, hashes: HashSet<B256>) {
|
||||
for hash in hashes {
|
||||
self.download_full_block(hash);
|
||||
}
|
||||
@@ -397,7 +397,7 @@ mod tests {
|
||||
|
||||
// send block set download request
|
||||
block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(
|
||||
B256Set::from_iter([tip.hash(), tip.parent_hash]),
|
||||
HashSet::from([tip.hash(), tip.parent_hash]),
|
||||
)));
|
||||
|
||||
// ensure we have TOTAL_BLOCKS in flight full block request
|
||||
@@ -440,7 +440,7 @@ mod tests {
|
||||
)));
|
||||
|
||||
// send block set download request
|
||||
let download_set = B256Set::from_iter([tip.hash(), tip.parent_hash]);
|
||||
let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
|
||||
block_downloader
|
||||
.on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
|
||||
download::{BlockDownloader, DownloadAction, DownloadOutcome},
|
||||
};
|
||||
use alloy_primitives::{map::B256Set, B256};
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::Sender;
|
||||
use futures::{Stream, StreamExt};
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
@@ -14,6 +14,7 @@ use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_payload_primitives::PayloadTypes;
|
||||
use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
fmt::Display,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
@@ -340,7 +341,7 @@ pub enum RequestHandlerEvent<T> {
|
||||
#[derive(Debug)]
|
||||
pub enum DownloadRequest {
|
||||
/// Download the given set of blocks.
|
||||
BlockSet(B256Set),
|
||||
BlockSet(HashSet<B256>),
|
||||
/// Download the given range of blocks.
|
||||
BlockRange(B256, u64),
|
||||
}
|
||||
@@ -348,6 +349,6 @@ pub enum DownloadRequest {
|
||||
impl DownloadRequest {
|
||||
/// Returns a [`DownloadRequest`] for a single block.
|
||||
pub fn single_block(hash: B256) -> Self {
|
||||
Self::BlockSet(B256Set::from_iter([hash]))
|
||||
Self::BlockSet(HashSet::from([hash]))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) struct PersistenceMetrics {
|
||||
/// How long it took for blocks to be saved
|
||||
pub(crate) save_blocks_duration_seconds: Histogram,
|
||||
/// How many blocks we persist at once.
|
||||
pub(crate) save_blocks_batch_size: Histogram,
|
||||
pub(crate) save_blocks_block_count: Histogram,
|
||||
/// How long it took for blocks to be pruned
|
||||
pub(crate) prune_before_duration_seconds: Histogram,
|
||||
}
|
||||
|
||||
@@ -11,13 +11,8 @@ use reth_provider::{
|
||||
};
|
||||
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
|
||||
use reth_stages_api::{MetricEvent, MetricEventsSender};
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc::{Receiver, SendError, Sender},
|
||||
Arc,
|
||||
},
|
||||
thread::JoinHandle,
|
||||
sync::mpsc::{Receiver, SendError, Sender},
|
||||
time::Instant,
|
||||
};
|
||||
use thiserror::Error;
|
||||
@@ -45,12 +40,6 @@ where
|
||||
metrics: PersistenceMetrics,
|
||||
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
/// Pending finalized block number to be committed with the next block save.
|
||||
/// This avoids triggering a separate fsync for each finalized block update.
|
||||
pending_finalized_block: Option<u64>,
|
||||
/// Pending safe block number to be committed with the next block save.
|
||||
/// This avoids triggering a separate fsync for each safe block update.
|
||||
pending_safe_block: Option<u64>,
|
||||
}
|
||||
|
||||
impl<N> PersistenceService<N>
|
||||
@@ -64,15 +53,7 @@ where
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
incoming,
|
||||
pruner,
|
||||
metrics: PersistenceMetrics::default(),
|
||||
sync_metrics_tx,
|
||||
pending_finalized_block: None,
|
||||
pending_safe_block: None,
|
||||
}
|
||||
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
|
||||
}
|
||||
|
||||
/// Prunes block data before the given block number according to the configured prune
|
||||
@@ -125,10 +106,14 @@ where
|
||||
}
|
||||
}
|
||||
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
|
||||
self.pending_finalized_block = Some(finalized_block);
|
||||
let provider = self.provider.database_provider_rw()?;
|
||||
provider.save_finalized_block_number(finalized_block)?;
|
||||
provider.commit()?;
|
||||
}
|
||||
PersistenceAction::SaveSafeBlock(safe_block) => {
|
||||
self.pending_safe_block = Some(safe_block);
|
||||
let provider = self.provider.database_provider_rw()?;
|
||||
provider.save_safe_block_number(safe_block)?;
|
||||
provider.commit()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -153,39 +138,26 @@ where
|
||||
}
|
||||
|
||||
fn on_save_blocks(
|
||||
&mut self,
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<N::Primitives>>,
|
||||
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
|
||||
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
|
||||
let block_count = blocks.len();
|
||||
|
||||
// Take any pending finalized/safe block updates to commit together
|
||||
let pending_finalized = self.pending_finalized_block.take();
|
||||
let pending_safe = self.pending_safe_block.take();
|
||||
|
||||
debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
if last_block.is_some() {
|
||||
let provider_rw = self.provider.database_provider_rw()?;
|
||||
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
|
||||
|
||||
// Commit pending finalized/safe block updates in the same transaction
|
||||
if let Some(finalized) = pending_finalized {
|
||||
provider_rw.save_finalized_block_number(finalized)?;
|
||||
}
|
||||
if let Some(safe) = pending_safe {
|
||||
provider_rw.save_safe_block_number(safe)?;
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
}
|
||||
|
||||
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
|
||||
|
||||
self.metrics.save_blocks_batch_size.record(block_count as f64);
|
||||
self.metrics.save_blocks_block_count.record(block_count as f64);
|
||||
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
|
||||
|
||||
Ok(last_block)
|
||||
@@ -232,25 +204,15 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
|
||||
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
|
||||
/// The channel used to communicate with the persistence service
|
||||
sender: Sender<PersistenceAction<N>>,
|
||||
/// Guard that joins the service thread when all handles are dropped.
|
||||
/// Uses `Arc` so the handle remains `Clone`.
|
||||
_service_guard: Arc<ServiceGuard>,
|
||||
}
|
||||
|
||||
impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
/// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
|
||||
///
|
||||
/// This is intended for testing purposes where you want to mock the persistence service.
|
||||
/// For production use, prefer [`spawn_service`](Self::spawn_service).
|
||||
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
|
||||
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
|
||||
pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
|
||||
///
|
||||
/// The returned handle can be cloned and shared. When all clones are dropped, the service
|
||||
/// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are
|
||||
/// released.
|
||||
pub fn spawn_service<N>(
|
||||
provider_factory: ProviderFactory<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
@@ -262,19 +224,22 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
// create the initial channels
|
||||
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
|
||||
|
||||
// construct persistence handle
|
||||
let persistence_handle = PersistenceHandle::new(db_service_tx);
|
||||
|
||||
// spawn the persistence service
|
||||
let db_service =
|
||||
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
|
||||
let join_handle = spawn_os_thread("persistence", || {
|
||||
if let Err(err) = db_service.run() {
|
||||
error!(target: "engine::persistence", ?err, "Persistence service failed");
|
||||
}
|
||||
});
|
||||
std::thread::Builder::new()
|
||||
.name("Persistence Service".to_string())
|
||||
.spawn(|| {
|
||||
if let Err(err) = db_service.run() {
|
||||
error!(target: "engine::persistence", ?err, "Persistence service failed");
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
PersistenceHandle {
|
||||
sender: db_service_tx,
|
||||
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
|
||||
}
|
||||
persistence_handle
|
||||
}
|
||||
|
||||
/// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
|
||||
@@ -302,10 +267,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||
}
|
||||
|
||||
/// Queues the finalized block number to be persisted on disk.
|
||||
///
|
||||
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
|
||||
/// call to avoid triggering a separate fsync for each update.
|
||||
/// Persists the finalized block number on disk.
|
||||
pub fn save_finalized_block_number(
|
||||
&self,
|
||||
finalized_block: u64,
|
||||
@@ -313,10 +275,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
|
||||
}
|
||||
|
||||
/// Queues the safe block number to be persisted on disk.
|
||||
///
|
||||
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
|
||||
/// call to avoid triggering a separate fsync for each update.
|
||||
/// Persists the safe block number on disk.
|
||||
pub fn save_safe_block_number(
|
||||
&self,
|
||||
safe_block: u64,
|
||||
@@ -338,27 +297,6 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard that joins the persistence service thread when dropped.
|
||||
///
|
||||
/// This ensures graceful shutdown - the service thread completes before resources like
|
||||
/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle
|
||||
/// can be cloned while sharing the same guard.
|
||||
struct ServiceGuard(Option<JoinHandle<()>>);
|
||||
|
||||
impl std::fmt::Debug for ServiceGuard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ServiceGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(join_handle) = self.0.take() {
|
||||
let _ = join_handle.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -385,12 +323,12 @@ mod tests {
|
||||
#[test]
|
||||
fn test_save_blocks_empty() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let handle = default_persistence_handle();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
let blocks = vec![];
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let hash = rx.recv().unwrap();
|
||||
assert_eq!(hash, None);
|
||||
@@ -399,7 +337,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_save_blocks_single_block() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let handle = default_persistence_handle();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
let block_number = 0;
|
||||
let mut test_block_builder = TestBlockBuilder::eth();
|
||||
let executed =
|
||||
@@ -409,7 +347,7 @@ mod tests {
|
||||
let blocks = vec![executed];
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx
|
||||
.recv_timeout(std::time::Duration::from_secs(10))
|
||||
@@ -422,14 +360,14 @@ mod tests {
|
||||
#[test]
|
||||
fn test_save_blocks_multiple_blocks() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let handle = default_persistence_handle();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
let mut test_block_builder = TestBlockBuilder::eth();
|
||||
let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
|
||||
assert_eq!(last_hash, actual_hash);
|
||||
}
|
||||
@@ -437,7 +375,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_save_blocks_multiple_calls() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let handle = default_persistence_handle();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
let ranges = [0..1, 1..2, 2..4, 4..5];
|
||||
let mut test_block_builder = TestBlockBuilder::eth();
|
||||
@@ -446,7 +384,7 @@ mod tests {
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
|
||||
assert_eq!(last_hash, actual_hash);
|
||||
|
||||
@@ -76,8 +76,7 @@ impl CacheConfig for EpochCacheConfig {
|
||||
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
|
||||
|
||||
/// A wrapper of a state provider and a shared cache.
|
||||
#[derive(Debug)]
|
||||
pub struct CachedStateProvider<S> {
|
||||
pub(crate) struct CachedStateProvider<S> {
|
||||
/// The state provider
|
||||
state_provider: S,
|
||||
|
||||
@@ -97,7 +96,7 @@ where
|
||||
{
|
||||
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
|
||||
/// [`CachedStateMetrics`].
|
||||
pub const fn new(
|
||||
pub(crate) const fn new(
|
||||
state_provider: S,
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
@@ -115,7 +114,7 @@ impl<S> CachedStateProvider<S> {
|
||||
/// [`State`](revm::database::State) also caches internally during block execution and the cache
|
||||
/// is then updated after the block with the entire [`BundleState`] output of that block which
|
||||
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
|
||||
pub const fn prewarm(mut self) -> Self {
|
||||
pub(crate) const fn prewarm(mut self) -> Self {
|
||||
self.prewarm = true;
|
||||
self
|
||||
}
|
||||
@@ -132,7 +131,7 @@ impl<S> CachedStateProvider<S> {
|
||||
/// and the fixed-cache internal stats (collisions, size, capacity).
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "sync.caching")]
|
||||
pub struct CachedStateMetrics {
|
||||
pub(crate) struct CachedStateMetrics {
|
||||
/// Number of times a new execution cache was created
|
||||
execution_cache_created_total: Counter,
|
||||
|
||||
@@ -187,7 +186,7 @@ pub struct CachedStateMetrics {
|
||||
|
||||
impl CachedStateMetrics {
|
||||
/// Sets all values to zero, indicating that a new block is being executed.
|
||||
pub fn reset(&self) {
|
||||
pub(crate) fn reset(&self) {
|
||||
// code cache
|
||||
self.code_cache_hits.set(0);
|
||||
self.code_cache_misses.set(0);
|
||||
@@ -205,7 +204,7 @@ impl CachedStateMetrics {
|
||||
}
|
||||
|
||||
/// Returns a new zeroed-out instance of [`CachedStateMetrics`].
|
||||
pub fn zeroed() -> Self {
|
||||
pub(crate) fn zeroed() -> Self {
|
||||
let zeroed = Self::default();
|
||||
zeroed.reset();
|
||||
zeroed
|
||||
@@ -327,7 +326,7 @@ impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
|
||||
|
||||
/// Represents the status of a key in the cache.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum CachedStatus<T> {
|
||||
pub(crate) enum CachedStatus<T> {
|
||||
/// The key is not in the cache (or was invalidated). The value was recalculated.
|
||||
NotCached(T),
|
||||
/// The key exists in cache and has a specific value.
|
||||
@@ -488,7 +487,7 @@ impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider
|
||||
/// Since EIP-6780, SELFDESTRUCT only works within the same transaction where the
|
||||
/// contract was created, so we don't need to handle clearing the storage.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ExecutionCache {
|
||||
pub(crate) struct ExecutionCache {
|
||||
/// Cache for contract bytecode, keyed by code hash.
|
||||
code_cache: Arc<FixedCache<B256, Option<Bytecode>, FbBuildHasher<32>>>,
|
||||
|
||||
@@ -520,7 +519,7 @@ impl ExecutionCache {
|
||||
///
|
||||
/// Fixed-cache requires power-of-two sizes for efficient indexing.
|
||||
/// With epochs enabled, the minimum size is 4096 entries.
|
||||
pub const fn bytes_to_entries(size_bytes: usize, entry_size: usize) -> usize {
|
||||
pub(crate) const fn bytes_to_entries(size_bytes: usize, entry_size: usize) -> usize {
|
||||
let entries = size_bytes / entry_size;
|
||||
// Round down to nearest power of two
|
||||
let rounded = if entries == 0 { 1 } else { (entries + 1).next_power_of_two() >> 1 };
|
||||
@@ -533,10 +532,10 @@ impl ExecutionCache {
|
||||
}
|
||||
|
||||
/// Build an [`ExecutionCache`] struct, so that execution caches can be easily cloned.
|
||||
pub fn new(total_cache_size: usize) -> Self {
|
||||
let code_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
|
||||
pub(crate) fn new(total_cache_size: usize) -> Self {
|
||||
let storage_cache_size = (total_cache_size * 8888) / 10000; // 88.88% of total
|
||||
let account_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
|
||||
let code_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
|
||||
|
||||
let code_capacity = Self::bytes_to_entries(code_cache_size, CODE_CACHE_ENTRY_SIZE);
|
||||
let storage_capacity = Self::bytes_to_entries(storage_cache_size, STORAGE_CACHE_ENTRY_SIZE);
|
||||
@@ -567,7 +566,7 @@ impl ExecutionCache {
|
||||
}
|
||||
|
||||
/// Gets code from cache, or inserts using the provided function.
|
||||
pub fn get_or_try_insert_code_with<E>(
|
||||
pub(crate) fn get_or_try_insert_code_with<E>(
|
||||
&self,
|
||||
hash: B256,
|
||||
f: impl FnOnce() -> Result<Option<Bytecode>, E>,
|
||||
@@ -586,7 +585,7 @@ impl ExecutionCache {
|
||||
}
|
||||
|
||||
/// Gets storage from cache, or inserts using the provided function.
|
||||
pub fn get_or_try_insert_storage_with<E>(
|
||||
pub(crate) fn get_or_try_insert_storage_with<E>(
|
||||
&self,
|
||||
address: Address,
|
||||
key: StorageKey,
|
||||
@@ -606,7 +605,7 @@ impl ExecutionCache {
|
||||
}
|
||||
|
||||
/// Gets account from cache, or inserts using the provided function.
|
||||
pub fn get_or_try_insert_account_with<E>(
|
||||
pub(crate) fn get_or_try_insert_account_with<E>(
|
||||
&self,
|
||||
address: Address,
|
||||
f: impl FnOnce() -> Result<Option<Account>, E>,
|
||||
@@ -625,7 +624,12 @@ impl ExecutionCache {
|
||||
}
|
||||
|
||||
/// Insert storage value into cache.
|
||||
pub fn insert_storage(&self, address: Address, key: StorageKey, value: Option<StorageValue>) {
|
||||
pub(crate) fn insert_storage(
|
||||
&self,
|
||||
address: Address,
|
||||
key: StorageKey,
|
||||
value: Option<StorageValue>,
|
||||
) {
|
||||
self.storage_cache.insert((address, key), value.unwrap_or_default());
|
||||
}
|
||||
|
||||
@@ -658,8 +662,7 @@ impl ExecutionCache {
|
||||
///
|
||||
/// Returns an error if the state updates are inconsistent and should be discarded.
|
||||
#[instrument(level = "debug", target = "engine::caching", skip_all)]
|
||||
#[expect(clippy::result_unit_err)]
|
||||
pub fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
|
||||
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
|
||||
.entered();
|
||||
@@ -768,7 +771,7 @@ impl ExecutionCache {
|
||||
/// A saved cache that has been used for executing a specific block, which has been updated for its
|
||||
/// execution.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SavedCache {
|
||||
pub(crate) struct SavedCache {
|
||||
/// The hash of the block these caches were used to execute.
|
||||
hash: B256,
|
||||
|
||||
@@ -788,43 +791,43 @@ pub struct SavedCache {
|
||||
|
||||
impl SavedCache {
|
||||
/// Creates a new instance with the internals
|
||||
pub fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
|
||||
pub(super) fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
|
||||
Self { hash, caches, metrics, usage_guard: Arc::new(()), disable_cache_metrics: false }
|
||||
}
|
||||
|
||||
/// Sets whether to disable cache metrics recording.
|
||||
pub const fn with_disable_cache_metrics(mut self, disable: bool) -> Self {
|
||||
pub(super) const fn with_disable_cache_metrics(mut self, disable: bool) -> Self {
|
||||
self.disable_cache_metrics = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the hash for this cache
|
||||
pub const fn executed_block_hash(&self) -> B256 {
|
||||
pub(crate) const fn executed_block_hash(&self) -> B256 {
|
||||
self.hash
|
||||
}
|
||||
|
||||
/// Splits the cache into its caches, metrics, and `disable_cache_metrics` flag, consuming it.
|
||||
pub fn split(self) -> (ExecutionCache, CachedStateMetrics, bool) {
|
||||
pub(crate) fn split(self) -> (ExecutionCache, CachedStateMetrics, bool) {
|
||||
(self.caches, self.metrics, self.disable_cache_metrics)
|
||||
}
|
||||
|
||||
/// Returns true if the cache is available for use (no other tasks are currently using it).
|
||||
pub fn is_available(&self) -> bool {
|
||||
pub(crate) fn is_available(&self) -> bool {
|
||||
Arc::strong_count(&self.usage_guard) == 1
|
||||
}
|
||||
|
||||
/// Returns the current strong count of the usage guard.
|
||||
pub fn usage_count(&self) -> usize {
|
||||
pub(crate) fn usage_count(&self) -> usize {
|
||||
Arc::strong_count(&self.usage_guard)
|
||||
}
|
||||
|
||||
/// Returns the [`ExecutionCache`] belonging to the tracked hash.
|
||||
pub const fn cache(&self) -> &ExecutionCache {
|
||||
pub(crate) const fn cache(&self) -> &ExecutionCache {
|
||||
&self.caches
|
||||
}
|
||||
|
||||
/// Returns the metrics associated with this cache.
|
||||
pub const fn metrics(&self) -> &CachedStateMetrics {
|
||||
pub(crate) const fn metrics(&self) -> &CachedStateMetrics {
|
||||
&self.metrics
|
||||
}
|
||||
|
||||
|
||||
@@ -13,13 +13,13 @@ use std::time::{Duration, Instant};
|
||||
|
||||
/// Metrics for the `EngineApi`.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EngineApiMetrics {
|
||||
pub(crate) struct EngineApiMetrics {
|
||||
/// Engine API-specific metrics.
|
||||
pub engine: EngineMetrics,
|
||||
pub(crate) engine: EngineMetrics,
|
||||
/// Block executor metrics.
|
||||
pub executor: ExecutorMetrics,
|
||||
pub(crate) executor: ExecutorMetrics,
|
||||
/// Metrics for block validation
|
||||
pub block_validation: BlockValidationMetrics,
|
||||
pub(crate) block_validation: BlockValidationMetrics,
|
||||
/// Canonical chain and reorg related metrics
|
||||
pub tree: TreeMetrics,
|
||||
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
|
||||
@@ -32,7 +32,7 @@ impl EngineApiMetrics {
|
||||
///
|
||||
/// This method updates metrics for execution time, gas usage, and the number
|
||||
/// of accounts, storage slots and bytecodes updated.
|
||||
pub fn record_block_execution<R>(
|
||||
pub(crate) fn record_block_execution<R>(
|
||||
&self,
|
||||
output: &BlockExecutionOutput<R>,
|
||||
execution_duration: Duration,
|
||||
@@ -59,27 +59,27 @@ impl EngineApiMetrics {
|
||||
}
|
||||
|
||||
/// Returns a reference to the executor metrics for use in state hooks.
|
||||
pub const fn executor_metrics(&self) -> &ExecutorMetrics {
|
||||
pub(crate) const fn executor_metrics(&self) -> &ExecutorMetrics {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
/// Records the duration of block pre-execution changes (e.g., beacon root update).
|
||||
pub fn record_pre_execution(&self, elapsed: Duration) {
|
||||
pub(crate) fn record_pre_execution(&self, elapsed: Duration) {
|
||||
self.executor.pre_execution_histogram.record(elapsed);
|
||||
}
|
||||
|
||||
/// Records the duration of block post-execution changes (e.g., finalization).
|
||||
pub fn record_post_execution(&self, elapsed: Duration) {
|
||||
pub(crate) fn record_post_execution(&self, elapsed: Duration) {
|
||||
self.executor.post_execution_histogram.record(elapsed);
|
||||
}
|
||||
|
||||
/// Records the time spent waiting for the next transaction from the iterator.
|
||||
pub fn record_transaction_wait(&self, elapsed: Duration) {
|
||||
pub(crate) fn record_transaction_wait(&self, elapsed: Duration) {
|
||||
self.executor.transaction_wait_histogram.record(elapsed);
|
||||
}
|
||||
|
||||
/// Records the duration of a single transaction execution.
|
||||
pub fn record_transaction_execution(&self, elapsed: Duration) {
|
||||
pub(crate) fn record_transaction_execution(&self, elapsed: Duration) {
|
||||
self.executor.transaction_execution_histogram.record(elapsed);
|
||||
}
|
||||
}
|
||||
@@ -87,7 +87,7 @@ impl EngineApiMetrics {
|
||||
/// Metrics for the entire blockchain tree
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "blockchain_tree")]
|
||||
pub struct TreeMetrics {
|
||||
pub(crate) struct TreeMetrics {
|
||||
/// The highest block number in the canonical chain
|
||||
pub canonical_chain_height: Gauge,
|
||||
/// The number of reorgs
|
||||
@@ -103,7 +103,7 @@ pub struct TreeMetrics {
|
||||
/// Metrics for the `EngineApi`.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
pub struct EngineMetrics {
|
||||
pub(crate) struct EngineMetrics {
|
||||
/// Engine API forkchoiceUpdated response type metrics
|
||||
#[metric(skip)]
|
||||
pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
|
||||
@@ -336,42 +336,42 @@ pub(crate) struct BalMetrics {
|
||||
/// Metrics for non-execution related block validation.
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "sync.block_validation")]
|
||||
pub struct BlockValidationMetrics {
|
||||
pub(crate) struct BlockValidationMetrics {
|
||||
/// Total number of storage tries updated in the state root calculation
|
||||
pub state_root_storage_tries_updated_total: Counter,
|
||||
pub(crate) state_root_storage_tries_updated_total: Counter,
|
||||
/// Total number of times the parallel state root computation fell back to regular.
|
||||
pub state_root_parallel_fallback_total: Counter,
|
||||
pub(crate) state_root_parallel_fallback_total: Counter,
|
||||
/// Total number of times the state root task failed but the fallback succeeded.
|
||||
pub state_root_task_fallback_success_total: Counter,
|
||||
pub(crate) state_root_task_fallback_success_total: Counter,
|
||||
/// Latest state root duration, ie the time spent blocked waiting for the state root.
|
||||
pub state_root_duration: Gauge,
|
||||
pub(crate) state_root_duration: Gauge,
|
||||
/// Histogram for state root duration ie the time spent blocked waiting for the state root
|
||||
pub state_root_histogram: Histogram,
|
||||
pub(crate) state_root_histogram: Histogram,
|
||||
/// Histogram of deferred trie computation duration.
|
||||
pub deferred_trie_compute_duration: Histogram,
|
||||
pub(crate) deferred_trie_compute_duration: Histogram,
|
||||
/// Payload conversion and validation latency
|
||||
pub payload_validation_duration: Gauge,
|
||||
pub(crate) payload_validation_duration: Gauge,
|
||||
/// Histogram of payload validation latency
|
||||
pub payload_validation_histogram: Histogram,
|
||||
pub(crate) payload_validation_histogram: Histogram,
|
||||
/// Payload processor spawning duration
|
||||
pub spawn_payload_processor: Histogram,
|
||||
pub(crate) spawn_payload_processor: Histogram,
|
||||
/// Post-execution validation duration
|
||||
pub post_execution_validation_duration: Histogram,
|
||||
pub(crate) post_execution_validation_duration: Histogram,
|
||||
/// Total duration of the new payload call
|
||||
pub total_duration: Histogram,
|
||||
pub(crate) total_duration: Histogram,
|
||||
/// Size of `HashedPostStateSorted` (`total_len`)
|
||||
pub hashed_post_state_size: Histogram,
|
||||
pub(crate) hashed_post_state_size: Histogram,
|
||||
/// Size of `TrieUpdatesSorted` (`total_len`)
|
||||
pub trie_updates_sorted_size: Histogram,
|
||||
pub(crate) trie_updates_sorted_size: Histogram,
|
||||
/// Size of `AnchoredTrieInput` overlay `TrieUpdatesSorted` (`total_len`)
|
||||
pub anchored_overlay_trie_updates_size: Histogram,
|
||||
pub(crate) anchored_overlay_trie_updates_size: Histogram,
|
||||
/// Size of `AnchoredTrieInput` overlay `HashedPostStateSorted` (`total_len`)
|
||||
pub anchored_overlay_hashed_state_size: Histogram,
|
||||
pub(crate) anchored_overlay_hashed_state_size: Histogram,
|
||||
}
|
||||
|
||||
impl BlockValidationMetrics {
|
||||
/// Records a new state root time, updating both the histogram and state root gauge
|
||||
pub fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
|
||||
pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
|
||||
self.state_root_storage_tries_updated_total
|
||||
.increment(trie_output.storage_tries_ref().len() as u64);
|
||||
self.state_root_duration.set(elapsed_as_secs);
|
||||
@@ -380,7 +380,7 @@ impl BlockValidationMetrics {
|
||||
|
||||
/// Records a new payload validation time, updating both the histogram and the payload
|
||||
/// validation gauge
|
||||
pub fn record_payload_validation(&self, elapsed_as_secs: f64) {
|
||||
pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
|
||||
self.payload_validation_duration.set(elapsed_as_secs);
|
||||
self.payload_validation_histogram.record(elapsed_as_secs);
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{
|
||||
chain::FromOrchestrator,
|
||||
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
|
||||
persistence::PersistenceHandle,
|
||||
tree::{error::InsertPayloadError, payload_validator::TreeCtx},
|
||||
tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
|
||||
};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
|
||||
@@ -37,7 +37,6 @@ use reth_provider::{
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages_api::ControlFlow;
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use revm::state::EvmState;
|
||||
use state::TreeState;
|
||||
@@ -56,19 +55,18 @@ pub mod error;
|
||||
pub mod instrumented_state;
|
||||
mod invalid_headers;
|
||||
mod metrics;
|
||||
pub mod payload_processor;
|
||||
mod payload_processor;
|
||||
pub mod payload_validator;
|
||||
mod persistence_state;
|
||||
pub mod precompile_cache;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
#[expect(unused)]
|
||||
mod trie_updates;
|
||||
|
||||
use crate::tree::error::AdvancePersistenceError;
|
||||
pub use block_buffer::BlockBuffer;
|
||||
pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
|
||||
pub use invalid_headers::InvalidHeaderCache;
|
||||
pub use metrics::EngineApiMetrics;
|
||||
pub use payload_processor::*;
|
||||
pub use payload_validator::{BasicEngineValidator, EngineValidator};
|
||||
pub use persistence_state::PersistenceState;
|
||||
@@ -160,16 +158,6 @@ impl<N: NodePrimitives> EngineApiTreeState<N> {
|
||||
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the tree state.
|
||||
pub const fn tree_state(&self) -> &TreeState<N> {
|
||||
&self.tree_state
|
||||
}
|
||||
|
||||
/// Returns true if the block has been marked as invalid.
|
||||
pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
|
||||
self.invalid_headers.get(hash).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// The outcome of a tree operation.
|
||||
@@ -432,7 +420,7 @@ where
|
||||
changeset_cache,
|
||||
);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
spawn_os_thread("engine", || task.run());
|
||||
std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
|
||||
(incoming, outgoing)
|
||||
}
|
||||
|
||||
@@ -581,9 +569,6 @@ where
|
||||
&mut self,
|
||||
payload: T::ExecutionData,
|
||||
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
|
||||
let _trace_guard =
|
||||
reth_tracing::runtime::maybe_trace_newpayload_block(payload.block_number());
|
||||
|
||||
trace!(target: "engine::tree", "invoked new payload");
|
||||
|
||||
// start timing for the new payload process
|
||||
@@ -972,13 +957,14 @@ where
|
||||
&self,
|
||||
canonical_header: &SealedHeader<N::BlockHeader>,
|
||||
) -> ProviderResult<()> {
|
||||
// Load the block into memory if it's not already present
|
||||
self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
|
||||
let new_head_number = canonical_header.number();
|
||||
let new_head_hash = canonical_header.hash();
|
||||
|
||||
// Update the canonical head header
|
||||
self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
|
||||
|
||||
Ok(())
|
||||
// Load the block into memory if it's not already present
|
||||
self.ensure_block_in_memory(new_head_number, new_head_hash)
|
||||
}
|
||||
|
||||
/// Ensures a block is loaded into memory if not already present.
|
||||
@@ -2615,27 +2601,19 @@ where
|
||||
let block_num_hash = block_id.block;
|
||||
debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
|
||||
|
||||
// Check if block already exists - first in memory, then DB only if it could be persisted
|
||||
if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
|
||||
convert_to_block(self, input)?;
|
||||
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
|
||||
}
|
||||
|
||||
// Only query DB if block could be persisted (number <= last persisted block).
|
||||
// New blocks from CL always have number > last persisted, so skip DB lookup for them.
|
||||
if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
|
||||
match self.provider.sealed_header_by_hash(block_num_hash.hash) {
|
||||
Err(err) => {
|
||||
let block = convert_to_block(self, input)?;
|
||||
return Err(InsertBlockError::new(block, err.into()).into());
|
||||
}
|
||||
Ok(Some(_)) => {
|
||||
convert_to_block(self, input)?;
|
||||
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
|
||||
}
|
||||
Ok(None) => {}
|
||||
match self.sealed_header_by_hash(block_num_hash.hash) {
|
||||
Err(err) => {
|
||||
let block = convert_to_block(self, input)?;
|
||||
return Err(InsertBlockError::new(block, err.into()).into());
|
||||
}
|
||||
}
|
||||
Ok(Some(_)) => {
|
||||
// We now assume that we already have this block in the tree. However, we need to
|
||||
// run the conversion to ensure that the block hash is valid.
|
||||
convert_to_block(self, input)?;
|
||||
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
||||
// Ensure that the parent state is available.
|
||||
match self.state_provider_builder(block_id.parent) {
|
||||
|
||||
@@ -21,7 +21,7 @@ pub fn total_slots(bal: &BlockAccessList) -> usize {
|
||||
/// first, followed by read-only slots. The iterator intelligently skips accounts and slots
|
||||
/// outside the specified range for efficient traversal.
|
||||
#[derive(Debug)]
|
||||
pub struct BALSlotIter<'a> {
|
||||
pub(crate) struct BALSlotIter<'a> {
|
||||
bal: &'a BlockAccessList,
|
||||
range: Range<usize>,
|
||||
current_index: usize,
|
||||
@@ -34,7 +34,7 @@ pub struct BALSlotIter<'a> {
|
||||
|
||||
impl<'a> BALSlotIter<'a> {
|
||||
/// Creates a new iterator over storage slots within the specified range.
|
||||
pub fn new(bal: &'a BlockAccessList, range: Range<usize>) -> Self {
|
||||
pub(crate) fn new(bal: &'a BlockAccessList, range: Range<usize>) -> Self {
|
||||
let mut iter = Self { bal, range, current_index: 0, account_idx: 0, slot_idx: 0 };
|
||||
iter.skip_to_range_start();
|
||||
iter
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
//! Executor for mixed I/O and CPU workloads.
|
||||
|
||||
use reth_trie_parallel::root::get_tokio_runtime_handle;
|
||||
use tokio::{runtime::Handle, task::JoinHandle};
|
||||
use std::{sync::OnceLock, time::Duration};
|
||||
use tokio::{
|
||||
runtime::{Builder, Handle, Runtime},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
/// An executor for mixed I/O and CPU workloads.
|
||||
///
|
||||
@@ -24,7 +27,7 @@ impl WorkloadExecutor {
|
||||
&self.inner.handle
|
||||
}
|
||||
|
||||
/// Runs the provided function on an executor dedicated to blocking operations.
|
||||
/// Shorthand for [`Runtime::spawn_blocking`]
|
||||
#[track_caller]
|
||||
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
|
||||
where
|
||||
@@ -42,6 +45,29 @@ struct WorkloadExecutorInner {
|
||||
|
||||
impl WorkloadExecutorInner {
|
||||
fn new() -> Self {
|
||||
Self { handle: get_tokio_runtime_handle() }
|
||||
fn get_runtime_handle() -> Handle {
|
||||
Handle::try_current().unwrap_or_else(|_| {
|
||||
// Create a new runtime if no runtime is available
|
||||
static RT: OnceLock<Runtime> = OnceLock::new();
|
||||
|
||||
let rt = RT.get_or_init(|| {
|
||||
Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
// Keep the threads alive for at least the block time, which is 12 seconds
|
||||
// at the time of writing, plus a little extra.
|
||||
//
|
||||
// This is to prevent the costly process of spawning new threads on every
|
||||
// new block, and instead reuse the existing
|
||||
// threads.
|
||||
.thread_keep_alive(Duration::from_secs(15))
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
rt.handle().clone()
|
||||
})
|
||||
}
|
||||
|
||||
Self { handle: get_runtime_handle() }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use executor::WorkloadExecutor;
|
||||
use metrics::{Counter, Histogram};
|
||||
use metrics::Counter;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
use prewarm::PrewarmMetrics;
|
||||
@@ -145,7 +145,7 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// Returns a reference to the workload executor driving payload tasks.
|
||||
pub const fn executor(&self) -> &WorkloadExecutor {
|
||||
pub(super) const fn executor(&self) -> &WorkloadExecutor {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
@@ -235,7 +235,8 @@ where
|
||||
+ 'static,
|
||||
{
|
||||
// start preparing transactions immediately
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx, transaction_count_hint) =
|
||||
self.spawn_tx_iterator(transactions);
|
||||
|
||||
let span = Span::current();
|
||||
let (to_sparse_trie, sparse_trie_rx) = channel();
|
||||
@@ -259,6 +260,7 @@ where
|
||||
self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
transaction_count_hint,
|
||||
provider_builder.clone(),
|
||||
None, // Don't send proof targets when BAL is present
|
||||
Some(bal),
|
||||
@@ -269,6 +271,7 @@ where
|
||||
self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
transaction_count_hint,
|
||||
provider_builder.clone(),
|
||||
Some(to_multi_proof.clone()),
|
||||
None,
|
||||
@@ -342,7 +345,7 @@ where
|
||||
///
|
||||
/// Returns a [`PayloadHandle`] to communicate with the task.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
|
||||
pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
|
||||
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
|
||||
&self,
|
||||
env: ExecutionEnv<Evm>,
|
||||
transactions: I,
|
||||
@@ -352,10 +355,10 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
|
||||
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
|
||||
let prewarm_handle =
|
||||
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
|
||||
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
|
||||
PayloadHandle {
|
||||
to_multi_proof: None,
|
||||
prewarm_handle,
|
||||
@@ -373,15 +376,19 @@ where
|
||||
) -> (
|
||||
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
|
||||
usize,
|
||||
) {
|
||||
let (transactions, convert) = transactions.into();
|
||||
let transactions = transactions.into_par_iter();
|
||||
let transaction_count_hint = transactions.len();
|
||||
|
||||
let (ooo_tx, ooo_rx) = mpsc::channel();
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into();
|
||||
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
self.executor.spawn_blocking(move || {
|
||||
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
@@ -417,14 +424,16 @@ where
|
||||
}
|
||||
});
|
||||
|
||||
(prewarm_rx, execute_rx)
|
||||
(prewarm_rx, execute_rx, transaction_count_hint)
|
||||
}
|
||||
|
||||
/// Spawn prewarming optionally wired to the multiproof task for target updates.
|
||||
#[expect(clippy::too_many_arguments)]
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
env: ExecutionEnv<Evm>,
|
||||
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
transaction_count_hint: usize,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
@@ -459,6 +468,7 @@ where
|
||||
self.execution_cache.clone(),
|
||||
prewarm_ctx,
|
||||
to_multi_proof,
|
||||
transaction_count_hint,
|
||||
self.prewarm_max_concurrency,
|
||||
);
|
||||
|
||||
@@ -513,27 +523,20 @@ where
|
||||
) {
|
||||
let preserved_sparse_trie = self.sparse_state_trie.clone();
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let span = Span::current();
|
||||
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
|
||||
let prune_depth = self.sparse_trie_prune_depth;
|
||||
let max_storage_tries = self.sparse_trie_max_storage_tries;
|
||||
let chunk_size =
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor", "sparse_trie_task")
|
||||
.entered();
|
||||
let _enter = span.entered();
|
||||
|
||||
// Reuse a stored SparseStateTrie if available, applying continuation logic.
|
||||
// If this payload's parent state root matches the preserved trie's anchor,
|
||||
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
|
||||
// keep allocations.
|
||||
let start = Instant::now();
|
||||
let preserved = preserved_sparse_trie.take();
|
||||
trie_metrics
|
||||
.sparse_trie_cache_wait_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
|
||||
let sparse_state_trie = preserved
|
||||
let sparse_state_trie = preserved_sparse_trie
|
||||
.take()
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
@@ -559,16 +562,18 @@ where
|
||||
sparse_state_trie,
|
||||
))
|
||||
} else {
|
||||
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
|
||||
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new(
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
chunk_size,
|
||||
))
|
||||
};
|
||||
|
||||
let result = task.run();
|
||||
if let Err(e) = &result {
|
||||
tracing::error!(target: "engine::tree::payload_processor", "State root computation failed: {e:?}");
|
||||
}
|
||||
// Capture the computed state_root before sending the result
|
||||
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
|
||||
|
||||
@@ -587,14 +592,11 @@ where
|
||||
target: "engine::tree::payload_processor",
|
||||
"State root receiver dropped, clearing trie"
|
||||
);
|
||||
let (trie, deferred) = task.into_cleared_trie(
|
||||
let trie = task.into_cleared_trie(
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
// Drop guard before deferred to release lock before expensive deallocations
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -602,9 +604,9 @@ where
|
||||
// A failed computation may have left the trie in a partially updated state.
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
|
||||
let deferred = if let Some(state_root) = computed_state_root {
|
||||
if let Some(state_root) = computed_state_root {
|
||||
let start = std::time::Instant::now();
|
||||
let (trie, deferred) = task.into_trie_for_reuse(
|
||||
let trie = task.into_trie_for_reuse(
|
||||
prune_depth,
|
||||
max_storage_tries,
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
@@ -614,22 +616,17 @@ where
|
||||
.into_trie_for_reuse_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
guard.store(PreservedSparseTrie::anchored(trie, state_root));
|
||||
deferred
|
||||
} else {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"State root computation failed, clearing trie"
|
||||
);
|
||||
let (trie, deferred) = task.into_cleared_trie(
|
||||
let trie = task.into_cleared_trie(
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
deferred
|
||||
};
|
||||
// Drop guard before deferred to release lock before expensive deallocations
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -640,7 +637,7 @@ where
|
||||
///
|
||||
/// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
|
||||
/// hitting the database, maintaining performance consistency.
|
||||
pub fn on_inserted_executed_block(
|
||||
pub(crate) fn on_inserted_executed_block(
|
||||
&self,
|
||||
block_with_parent: BlockWithParent,
|
||||
bundle_state: &BundleState,
|
||||
@@ -737,19 +734,19 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
}
|
||||
|
||||
/// Returns a clone of the caches used by prewarming
|
||||
pub fn caches(&self) -> Option<ExecutionCache> {
|
||||
pub(super) fn caches(&self) -> Option<ExecutionCache> {
|
||||
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
|
||||
}
|
||||
|
||||
/// Returns a clone of the cache metrics used by prewarming
|
||||
pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
|
||||
pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
|
||||
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
|
||||
}
|
||||
|
||||
/// Terminates the pre-warming transaction processing.
|
||||
///
|
||||
/// Note: This does not terminate the task yet.
|
||||
pub fn stop_prewarming_execution(&self) {
|
||||
pub(super) fn stop_prewarming_execution(&self) {
|
||||
self.prewarm_handle.stop_prewarming_execution()
|
||||
}
|
||||
|
||||
@@ -760,7 +757,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
/// path without cloning the expensive `BundleState`.
|
||||
///
|
||||
/// Returns a sender for the channel that should be notified on block validation success.
|
||||
pub fn terminate_caching(
|
||||
pub(super) fn terminate_caching(
|
||||
&mut self,
|
||||
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
|
||||
) -> Option<mpsc::Sender<()>> {
|
||||
@@ -780,7 +777,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
|
||||
/// prewarm task without cloning the expensive `BundleState`.
|
||||
#[derive(Debug)]
|
||||
pub struct CacheTaskHandle<R> {
|
||||
pub(crate) struct CacheTaskHandle<R> {
|
||||
/// The shared cache the task operates with.
|
||||
saved_cache: Option<SavedCache>,
|
||||
/// Channel to the spawned prewarm task if any
|
||||
@@ -791,7 +788,7 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
|
||||
/// Terminates the pre-warming transaction processing.
|
||||
///
|
||||
/// Note: This does not terminate the task yet.
|
||||
pub fn stop_prewarming_execution(&self) {
|
||||
pub(super) fn stop_prewarming_execution(&self) {
|
||||
self.to_prewarm_task
|
||||
.as_ref()
|
||||
.map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
|
||||
@@ -802,7 +799,7 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
|
||||
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
|
||||
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
|
||||
#[must_use = "sender must be used and notified on block validation success"]
|
||||
pub fn terminate_caching(
|
||||
pub(super) fn terminate_caching(
|
||||
&mut self,
|
||||
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
|
||||
) -> Option<mpsc::Sender<()>> {
|
||||
@@ -856,7 +853,7 @@ impl<R> Drop for CacheTaskHandle<R> {
|
||||
/// - Prepares data for state root proof computation
|
||||
/// - Runs concurrently but must not interfere with cache saves
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct PayloadExecutionCache {
|
||||
struct PayloadExecutionCache {
|
||||
/// Guarded cloneable cache identified by a block hash.
|
||||
inner: Arc<RwLock<Option<SavedCache>>>,
|
||||
/// Metrics for cache operations.
|
||||
@@ -875,7 +872,6 @@ impl PayloadExecutionCache {
|
||||
let cache = self.inner.read();
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
|
||||
if elapsed.as_millis() > 5 {
|
||||
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
|
||||
}
|
||||
@@ -937,7 +933,7 @@ impl PayloadExecutionCache {
|
||||
///
|
||||
/// Violating this requirement can result in cache corruption, incorrect state data,
|
||||
/// and potential consensus failures.
|
||||
pub fn update_with_guard<F>(&self, update_fn: F)
|
||||
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
|
||||
where
|
||||
F: FnOnce(&mut Option<SavedCache>),
|
||||
{
|
||||
@@ -953,8 +949,6 @@ pub(crate) struct ExecutionCacheMetrics {
|
||||
/// Counter for when the execution cache was unavailable because other threads
|
||||
/// (e.g., prewarming) are still using it.
|
||||
pub(crate) execution_cache_in_use: Counter,
|
||||
/// Time spent waiting for execution cache mutex to become available.
|
||||
pub(crate) execution_cache_wait_duration: Histogram,
|
||||
}
|
||||
|
||||
/// EVM context required to execute a block.
|
||||
@@ -970,10 +964,6 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
|
||||
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
|
||||
/// the trie can be reused directly.
|
||||
pub parent_state_root: B256,
|
||||
/// Number of transactions in the block.
|
||||
/// Used to determine parallel worker count for prewarming.
|
||||
/// A value of 0 indicates the count is unknown.
|
||||
pub transaction_count: usize,
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
|
||||
@@ -986,7 +976,6 @@ where
|
||||
hash: Default::default(),
|
||||
parent_hash: Default::default(),
|
||||
parent_state_root: Default::default(),
|
||||
transaction_count: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use reth_trie_parallel::{
|
||||
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
|
||||
ProofWorkerHandle,
|
||||
},
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2},
|
||||
};
|
||||
use revm_primitives::map::{hash_map, B256Map};
|
||||
use std::{collections::BTreeMap, sync::Arc, time::Instant};
|
||||
@@ -63,7 +63,7 @@ const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
|
||||
|
||||
/// The default max targets, for limiting the number of account and storage proof targets to be
|
||||
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
|
||||
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
|
||||
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
|
||||
|
||||
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
|
||||
/// state.
|
||||
@@ -100,7 +100,7 @@ impl SparseTrieUpdate {
|
||||
|
||||
/// Messages used internally by the multi proof task.
|
||||
#[derive(Debug)]
|
||||
pub enum MultiProofMessage {
|
||||
pub(super) enum MultiProofMessage {
|
||||
/// Prefetch proof targets
|
||||
PrefetchProofs(VersionedMultiProofTargets),
|
||||
/// New state update from transaction execution with its source
|
||||
@@ -257,7 +257,7 @@ fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiP
|
||||
|
||||
/// A set of multiproof targets which can be either in the legacy or V2 representations.
|
||||
#[derive(Debug)]
|
||||
pub enum VersionedMultiProofTargets {
|
||||
pub(super) enum VersionedMultiProofTargets {
|
||||
/// Legacy targets
|
||||
Legacy(MultiProofTargets),
|
||||
/// V2 targets
|
||||
@@ -363,7 +363,9 @@ impl VersionedMultiProofTargets {
|
||||
Self::Legacy(targets) => {
|
||||
Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
|
||||
}
|
||||
Self::V2(targets) => Box::new(targets.chunks(chunk_size).map(Self::V2)),
|
||||
Self::V2(targets) => {
|
||||
Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -583,8 +585,6 @@ pub(crate) struct MultiProofTaskMetrics {
|
||||
pub last_proof_wait_time_histogram: Histogram,
|
||||
/// Time spent preparing the sparse trie for reuse after state root computation.
|
||||
pub into_trie_for_reuse_duration_histogram: Histogram,
|
||||
/// Time spent waiting for preserved sparse trie cache to become available.
|
||||
pub sparse_trie_cache_wait_duration_histogram: Histogram,
|
||||
}
|
||||
|
||||
/// Standalone task that receives a transaction state stream and updates relevant
|
||||
|
||||
@@ -49,8 +49,7 @@ use std::{
|
||||
use tracing::{debug, debug_span, instrument, trace, warn, Span};
|
||||
|
||||
/// Determines the prewarming mode: transaction-based or BAL-based.
|
||||
#[derive(Debug)]
|
||||
pub enum PrewarmMode<Tx> {
|
||||
pub(super) enum PrewarmMode<Tx> {
|
||||
/// Prewarm by executing transactions from a stream.
|
||||
Transactions(Receiver<Tx>),
|
||||
/// Prewarm by prefetching slots from a Block Access List.
|
||||
@@ -70,8 +69,7 @@ struct IndexedTransaction<Tx> {
|
||||
/// individually in parallel.
|
||||
///
|
||||
/// Note: This task runs until cancelled externally.
|
||||
#[derive(Debug)]
|
||||
pub struct PrewarmCacheTask<N, P, Evm>
|
||||
pub(super) struct PrewarmCacheTask<N, P, Evm>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
@@ -84,6 +82,8 @@ where
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
/// How many transactions should be executed in parallel
|
||||
max_concurrency: usize,
|
||||
/// The number of transactions to be processed
|
||||
transaction_count_hint: usize,
|
||||
/// Sender to emit evm state outcome messages, if any.
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
/// Receiver for events produced by tx execution
|
||||
@@ -99,11 +99,12 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
/// Initializes the task with the given transactions pending execution
|
||||
pub fn new(
|
||||
pub(super) fn new(
|
||||
executor: WorkloadExecutor,
|
||||
execution_cache: PayloadExecutionCache,
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
transaction_count_hint: usize,
|
||||
max_concurrency: usize,
|
||||
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
|
||||
let (actions_tx, actions_rx) = channel();
|
||||
@@ -111,7 +112,7 @@ where
|
||||
trace!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
max_concurrency,
|
||||
transaction_count = ctx.env.transaction_count,
|
||||
transaction_count_hint,
|
||||
"Initialized prewarm task"
|
||||
);
|
||||
|
||||
@@ -121,6 +122,7 @@ where
|
||||
execution_cache,
|
||||
ctx,
|
||||
max_concurrency,
|
||||
transaction_count_hint,
|
||||
to_multi_proof,
|
||||
actions_rx,
|
||||
parent_span: Span::current(),
|
||||
@@ -144,6 +146,7 @@ where
|
||||
let executor = self.executor.clone();
|
||||
let ctx = self.ctx.clone();
|
||||
let max_concurrency = self.max_concurrency;
|
||||
let transaction_count_hint = self.transaction_count_hint;
|
||||
let span = Span::current();
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
@@ -151,14 +154,13 @@ where
|
||||
|
||||
let (done_tx, done_rx) = mpsc::channel();
|
||||
|
||||
// When transaction_count is 0, it means the count is unknown. In this case, spawn
|
||||
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
|
||||
// max workers to handle potentially many transactions in parallel rather
|
||||
// than bottlenecking on a single worker.
|
||||
let transaction_count = ctx.env.transaction_count;
|
||||
let workers_needed = if transaction_count == 0 {
|
||||
let workers_needed = if transaction_count_hint == 0 {
|
||||
max_concurrency
|
||||
} else {
|
||||
transaction_count.min(max_concurrency)
|
||||
transaction_count_hint.min(max_concurrency)
|
||||
};
|
||||
|
||||
// Spawn workers
|
||||
@@ -368,8 +370,11 @@ where
|
||||
name = "prewarm and caching",
|
||||
skip_all
|
||||
)]
|
||||
pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
|
||||
where
|
||||
pub(super) fn run<Tx>(
|
||||
self,
|
||||
mode: PrewarmMode<Tx>,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
) where
|
||||
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
|
||||
{
|
||||
// Spawn execution tasks based on mode
|
||||
@@ -431,29 +436,23 @@ where
|
||||
|
||||
/// Context required by tx execution tasks.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PrewarmContext<N, P, Evm>
|
||||
pub(super) struct PrewarmContext<N, P, Evm>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// The execution environment.
|
||||
pub env: ExecutionEnv<Evm>,
|
||||
/// The EVM configuration.
|
||||
pub evm_config: Evm,
|
||||
/// The saved cache.
|
||||
pub saved_cache: Option<SavedCache>,
|
||||
pub(super) env: ExecutionEnv<Evm>,
|
||||
pub(super) evm_config: Evm,
|
||||
pub(super) saved_cache: Option<SavedCache>,
|
||||
/// Provider to obtain the state
|
||||
pub provider: StateProviderBuilder<N, P>,
|
||||
/// The metrics for the prewarm task.
|
||||
pub metrics: PrewarmMetrics,
|
||||
pub(super) provider: StateProviderBuilder<N, P>,
|
||||
pub(super) metrics: PrewarmMetrics,
|
||||
/// An atomic bool that tells prewarm tasks to not start any more execution.
|
||||
pub terminate_execution: Arc<AtomicBool>,
|
||||
/// Whether the precompile cache is disabled.
|
||||
pub precompile_cache_disabled: bool,
|
||||
/// The precompile cache map.
|
||||
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
pub(super) terminate_execution: Arc<AtomicBool>,
|
||||
pub(super) precompile_cache_disabled: bool,
|
||||
pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// Whether V2 proof calculation is enabled.
|
||||
pub v2_proofs_enabled: bool,
|
||||
pub(super) v2_proofs_enabled: bool,
|
||||
}
|
||||
|
||||
impl<N, P, Evm> PrewarmContext<N, P, Evm>
|
||||
@@ -853,8 +852,7 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
|
||||
///
|
||||
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
|
||||
/// execution path without cloning the expensive `BundleState`.
|
||||
#[derive(Debug)]
|
||||
pub enum PrewarmTaskEvent<R> {
|
||||
pub(super) enum PrewarmTaskEvent<R> {
|
||||
/// Forcefully terminate all remaining transaction execution.
|
||||
TerminateTransactionExecution,
|
||||
/// Forcefully terminate the task on demand and update the shared cache with the given output
|
||||
@@ -884,7 +882,7 @@ pub enum PrewarmTaskEvent<R> {
|
||||
/// Metrics for transactions prewarming.
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "sync.prewarm")]
|
||||
pub struct PrewarmMetrics {
|
||||
pub(crate) struct PrewarmMetrics {
|
||||
/// The number of transactions to prewarm
|
||||
pub(crate) transactions: Gauge,
|
||||
/// A histogram of the number of transactions to prewarm
|
||||
|
||||
@@ -3,19 +3,19 @@
|
||||
use crate::tree::{
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
VersionedMultiProofTargets,
|
||||
},
|
||||
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use alloy_rlp::Decodable;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
TrieAccount, EMPTY_ROOT_HASH,
|
||||
};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{
|
||||
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
DeferredDrops, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use smallvec::SmallVec;
|
||||
@@ -72,7 +72,7 @@ where
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> SparseStateTrie<A, S> {
|
||||
match self {
|
||||
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
Self::Cached(task) => task.into_trie_for_reuse(
|
||||
@@ -88,7 +88,7 @@ where
|
||||
self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> SparseStateTrie<A, S> {
|
||||
match self {
|
||||
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
@@ -135,7 +135,6 @@ where
|
||||
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
|
||||
/// to the trie. Once all updates are processed, computes and returns the final state root.
|
||||
#[instrument(
|
||||
name = "SparseTrieTask::run",
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
@@ -199,17 +198,13 @@ where
|
||||
mut self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.clear();
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
let deferred = self.trie.take_deferred_drops();
|
||||
(self.trie, deferred)
|
||||
self.trie
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
@@ -222,15 +217,6 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
|
||||
/// The size of proof targets chunk to spawn in one calculation.
|
||||
/// If None, chunking is disabled and all targets are processed in a single proof.
|
||||
chunk_size: Option<usize>,
|
||||
/// If this number is exceeded and chunking is enabled, then this will override whether or not
|
||||
/// there are any active workers and force chunking across workers. This is to prevent tasks
|
||||
/// which are very long from hitting a single worker.
|
||||
max_targets_for_chunking: usize,
|
||||
|
||||
/// Account trie updates.
|
||||
account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage trie updates. hashed address -> slot -> update.
|
||||
@@ -255,16 +241,12 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
/// Cache of storage proof targets that have already been fetched/requested from the proof
|
||||
/// workers. account -> slot -> lowest `min_len` requested.
|
||||
fetched_storage_targets: B256Map<B256Map<u8>>,
|
||||
/// Reusable buffer for RLP encoding of accounts.
|
||||
account_rlp_buf: Vec<u8>,
|
||||
/// Whether the last state update has been received.
|
||||
finished_state_updates: bool,
|
||||
/// Pending targets to be dispatched to the proof workers.
|
||||
pending_targets: MultiProofTargetsV2,
|
||||
/// Number of pending execution/prewarming updates received but not yet passed to
|
||||
/// `update_leaves`.
|
||||
/// Number of pending updates that were received but not yet processed.
|
||||
pending_updates: usize,
|
||||
|
||||
/// Metrics for the sparse trie.
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
@@ -274,13 +256,12 @@ where
|
||||
A: SparseTrieExt + Default,
|
||||
S: SparseTrieExt + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_trie(
|
||||
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
|
||||
pub(super) fn new(
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
chunk_size: Option<usize>,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
Self {
|
||||
@@ -289,14 +270,11 @@ where
|
||||
updates,
|
||||
proof_worker_handle,
|
||||
trie,
|
||||
chunk_size,
|
||||
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
account_updates: Default::default(),
|
||||
storage_updates: Default::default(),
|
||||
pending_account_updates: Default::default(),
|
||||
fetched_account_targets: Default::default(),
|
||||
fetched_storage_targets: Default::default(),
|
||||
account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
|
||||
finished_state_updates: Default::default(),
|
||||
pending_targets: Default::default(),
|
||||
pending_updates: Default::default(),
|
||||
@@ -313,11 +291,10 @@ where
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.prune(prune_depth, max_storage_tries);
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
let deferred = self.trie.take_deferred_drops();
|
||||
(self.trie, deferred)
|
||||
self.trie
|
||||
}
|
||||
|
||||
/// Clears and shrinks the trie, discarding all state.
|
||||
@@ -328,11 +305,10 @@ where
|
||||
mut self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.clear();
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
let deferred = self.trie.take_deferred_drops();
|
||||
(self.trie, deferred)
|
||||
self.trie
|
||||
}
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
@@ -342,7 +318,6 @@ where
|
||||
///
|
||||
/// This concludes once the last state update has been received and processed.
|
||||
#[instrument(
|
||||
name = "SparseTrieCacheTask::run",
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
@@ -371,33 +346,24 @@ where
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
while let Ok(next) = self.proof_result_rx.try_recv() {
|
||||
let ProofResult::V2(res) = next.result? else {
|
||||
while let Ok(res) = self.proof_result_rx.try_recv() {
|
||||
let ProofResult::V2(res) = res.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
result.extend(res);
|
||||
}
|
||||
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
}
|
||||
|
||||
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
|
||||
// If we don't have any pending messages, we can spend some time on computing
|
||||
// storage roots and promoting account updates.
|
||||
self.dispatch_pending_targets();
|
||||
self.promote_pending_account_updates()?;
|
||||
self.process_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
|
||||
// If we don't have any pending updates OR we've accumulated a lot already, apply
|
||||
// them to the trie,
|
||||
} else if self.updates.is_empty() || self.pending_updates > 100 {
|
||||
self.process_leaf_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() ||
|
||||
self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default()
|
||||
{
|
||||
// Make sure to dispatch targets if we don't have any updates or if we've
|
||||
// accumulated a lot of them.
|
||||
} else if self.updates.is_empty() || self.pending_targets.chunking_length() > 100 {
|
||||
self.dispatch_pending_targets();
|
||||
}
|
||||
|
||||
@@ -424,7 +390,6 @@ where
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
/// Processes a [`MultiProofMessage`].
|
||||
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
|
||||
match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
|
||||
@@ -520,8 +485,6 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies all account and storage leaf updates to corresponding tries and collects any new
|
||||
/// multiproof targets.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
@@ -530,21 +493,26 @@ where
|
||||
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.pending_updates = 0;
|
||||
|
||||
// Start with processing all storage updates in parallel.
|
||||
let storage_results = self
|
||||
.storage_updates
|
||||
// Make sure that tries exist for all addresses that have updates.
|
||||
for address in self.storage_updates.keys() {
|
||||
self.trie.get_or_create_storage_trie_mut(*address);
|
||||
}
|
||||
|
||||
let storage_results: Vec<_> = self
|
||||
.trie
|
||||
.storage_tries()
|
||||
.iter_mut()
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
.filter_map(|(address, trie)| {
|
||||
let updates = self.storage_updates.remove(address)?;
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
|
||||
(address, updates, fetched, trie)
|
||||
Some((address, updates, fetched, trie))
|
||||
})
|
||||
.par_bridge()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
.map(|(address, mut updates, mut fetched, trie)| {
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
trie.update_leaves(&mut updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
@@ -557,13 +525,13 @@ where
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
SparseTrieResult::Ok((address, targets, fetched, updates))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
for (address, targets, fetched, updates) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.trie.insert_storage_trie(*address, trie);
|
||||
self.storage_updates.insert(*address, updates);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
|
||||
@@ -571,17 +539,6 @@ where
|
||||
}
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.process_account_leaf_updates()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
///
|
||||
/// Returns whether any updates were drained (applied to the trie).
|
||||
fn process_account_leaf_updates(&mut self) -> SparseTrieResult<bool> {
|
||||
let updates_len_before = self.account_updates.len();
|
||||
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
@@ -602,18 +559,16 @@ where
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(self.account_updates.len() < updates_len_before)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterates through all storage tries for which all updates were processed, computes their
|
||||
/// storage roots, and promotes corresponding pending account updates into proper leaf updates
|
||||
/// for accounts trie.
|
||||
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
|
||||
fn process_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.process_leaf_updates()?;
|
||||
|
||||
if self.pending_account_updates.is_empty() {
|
||||
@@ -622,7 +577,7 @@ where
|
||||
|
||||
let roots = self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
.storage_tries()
|
||||
.par_iter_mut()
|
||||
.filter(|(address, _)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
@@ -635,10 +590,10 @@ where
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (addr, storage_root) in roots {
|
||||
for (address, storage_root) in roots {
|
||||
// If the storage root is known and we have a pending update for this account, encode it
|
||||
// into a proper update.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*address) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
@@ -647,70 +602,77 @@ where
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
self.account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(&mut self.account_rlp_buf);
|
||||
self.account_rlp_buf.clone()
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(account.unwrap_or_default().into_trie_account(storage_root))
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
self.account_updates.insert(*address, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
// Now handle pending account updates that can be upgraded to a proper update.
|
||||
let account_rlp_buf = &mut self.account_rlp_buf;
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
// Now promote pending account updates if possible.
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
|
||||
Some(encoded).filter(|encoded| !encoded.is_empty())
|
||||
} else if !self.account_updates.contains_key(addr) {
|
||||
self.trie.get_account_value(addr)
|
||||
} else {
|
||||
// Needs to be revealed first
|
||||
return true;
|
||||
};
|
||||
|
||||
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
|
||||
|
||||
let (account, storage_root) = if let Some(account) = account.take() {
|
||||
// If account is Some(_) here it means it didn't have any storage updates
|
||||
// and we can fetch the storage root directly from the account trie.
|
||||
//
|
||||
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
|
||||
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
|
||||
|
||||
(account, storage_root)
|
||||
} else {
|
||||
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
|
||||
};
|
||||
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
|
||||
Vec::new()
|
||||
} else {
|
||||
let account = account.unwrap_or_default().into_trie_account(storage_root);
|
||||
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(account)
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
|
||||
Some(encoded).filter(|encoded| !encoded.is_empty())
|
||||
} else if !self.account_updates.contains_key(addr) {
|
||||
self.trie.get_account_value(addr)
|
||||
} else {
|
||||
// Needs to be revealed first
|
||||
return true;
|
||||
};
|
||||
|
||||
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
|
||||
|
||||
let (account, storage_root) = if let Some(account) = account.take() {
|
||||
// If account is Some(_) here it means it didn't have any storage updates
|
||||
// and we can fetch the storage root directly from the account trie.
|
||||
//
|
||||
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
|
||||
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
|
||||
|
||||
(account, storage_root)
|
||||
} else {
|
||||
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
|
||||
};
|
||||
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
|
||||
Vec::new()
|
||||
} else {
|
||||
account_rlp_buf.clear();
|
||||
account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
|
||||
account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
// Only exit when no new updates are processed.
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if !self.process_account_leaf_updates()? {
|
||||
break
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -726,8 +688,8 @@ where
|
||||
dispatch_with_chunking(
|
||||
std::mem::take(&mut self.pending_targets),
|
||||
chunking_length,
|
||||
self.chunk_size,
|
||||
self.max_targets_for_chunking,
|
||||
Some(60),
|
||||
300,
|
||||
self.proof_worker_handle.available_account_workers(),
|
||||
self.proof_worker_handle.available_storage_workers(),
|
||||
MultiProofTargetsV2::chunks,
|
||||
@@ -800,7 +762,7 @@ where
|
||||
.storages
|
||||
.into_iter()
|
||||
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
|
||||
.par_bridge_buffered()
|
||||
.par_bridge()
|
||||
.map(|(address, storage, storage_trie)| {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
|
||||
|
||||
@@ -407,7 +407,6 @@ where
|
||||
hash: input.hash(),
|
||||
parent_hash: input.parent_hash(),
|
||||
parent_state_root: parent_block.state_root(),
|
||||
transaction_count: input.transaction_count(),
|
||||
};
|
||||
|
||||
// Plan the strategy used for state root computation.
|
||||
@@ -520,14 +519,6 @@ where
|
||||
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
|
||||
// we double check the state root here for good measure
|
||||
if state_root == block.header().state_root() {
|
||||
// Compare trie updates with serial computation if configured
|
||||
if self.config.always_compare_trie_updates() {
|
||||
self.compare_trie_updates_with_serial(
|
||||
overlay_factory.clone(),
|
||||
&hashed_state,
|
||||
trie_updates.clone(),
|
||||
);
|
||||
}
|
||||
maybe_state_root = Some((state_root, trie_updates, elapsed))
|
||||
} else {
|
||||
warn!(
|
||||
@@ -903,62 +894,6 @@ where
|
||||
.root_with_updates()?)
|
||||
}
|
||||
|
||||
/// Compares trie updates from the state root task with serial state root computation.
|
||||
///
|
||||
/// This is used for debugging and validating the correctness of the parallel state root
|
||||
/// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
|
||||
/// method runs a separate serial state root computation and compares the resulting trie
|
||||
/// updates.
|
||||
fn compare_trie_updates_with_serial(
|
||||
&self,
|
||||
overlay_factory: OverlayStateProviderFactory<P>,
|
||||
hashed_state: &HashedPostState,
|
||||
task_trie_updates: TrieUpdates,
|
||||
) {
|
||||
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
|
||||
|
||||
match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
|
||||
Ok((serial_root, serial_trie_updates)) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
?serial_root,
|
||||
"Serial state root computation finished for comparison"
|
||||
);
|
||||
|
||||
// Get a database provider to use as trie cursor factory
|
||||
match overlay_factory.database_provider_ro() {
|
||||
Ok(provider) => {
|
||||
if let Err(err) = super::trie_updates::compare_trie_updates(
|
||||
&provider,
|
||||
task_trie_updates,
|
||||
serial_trie_updates,
|
||||
) {
|
||||
warn!(
|
||||
target: "engine::tree::payload_validator",
|
||||
%err,
|
||||
"Error comparing trie updates"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_validator",
|
||||
%err,
|
||||
"Failed to get database provider for trie update comparison"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_validator",
|
||||
%err,
|
||||
"Failed to compute serial state root for comparison"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates the block after execution.
|
||||
///
|
||||
/// This performs:
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
|
||||
|
||||
use alloy_primitives::Bytes;
|
||||
use dashmap::DashMap;
|
||||
use moka::policy::EvictionPolicy;
|
||||
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
|
||||
use reth_primitives_traits::dashmap::DashMap;
|
||||
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
|
||||
use revm_primitives::Address;
|
||||
use std::{hash::Hash, sync::Arc};
|
||||
@@ -21,8 +21,7 @@ impl<S> PrecompileCacheMap<S>
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
/// Get the precompile cache for the given address.
|
||||
pub fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
|
||||
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
|
||||
// Try just using `.get` first to avoid acquiring a write lock.
|
||||
if let Some(cache) = self.0.get(&address) {
|
||||
return cache.clone();
|
||||
@@ -91,7 +90,7 @@ impl<S> CacheEntry<S> {
|
||||
|
||||
/// A cache for precompile inputs / outputs.
|
||||
#[derive(Debug)]
|
||||
pub struct CachedPrecompile<S>
|
||||
pub(crate) struct CachedPrecompile<S>
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
@@ -110,7 +109,7 @@ where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
/// `CachedPrecompile` constructor.
|
||||
pub const fn new(
|
||||
pub(crate) const fn new(
|
||||
precompile: DynPrecompile,
|
||||
cache: PrecompileCache<S>,
|
||||
spec_id: S,
|
||||
@@ -119,8 +118,7 @@ where
|
||||
Self { precompile, cache, spec_id, metrics }
|
||||
}
|
||||
|
||||
/// Wrap the given precompile in a cached precompile.
|
||||
pub fn wrap(
|
||||
pub(crate) fn wrap(
|
||||
precompile: DynPrecompile,
|
||||
cache: PrecompileCache<S>,
|
||||
spec_id: S,
|
||||
@@ -198,18 +196,18 @@ where
|
||||
/// Metrics for the cached precompile.
|
||||
#[derive(reth_metrics::Metrics, Clone)]
|
||||
#[metrics(scope = "sync.caching")]
|
||||
pub struct CachedPrecompileMetrics {
|
||||
pub(crate) struct CachedPrecompileMetrics {
|
||||
/// Precompile cache hits
|
||||
pub precompile_cache_hits: metrics::Counter,
|
||||
precompile_cache_hits: metrics::Counter,
|
||||
|
||||
/// Precompile cache misses
|
||||
pub precompile_cache_misses: metrics::Counter,
|
||||
precompile_cache_misses: metrics::Counter,
|
||||
|
||||
/// Precompile cache size. Uses the LRU cache length as the size metric.
|
||||
pub precompile_cache_size: metrics::Gauge,
|
||||
precompile_cache_size: metrics::Gauge,
|
||||
|
||||
/// Precompile execution errors.
|
||||
pub precompile_errors: metrics::Counter,
|
||||
precompile_errors: metrics::Counter,
|
||||
}
|
||||
|
||||
impl CachedPrecompileMetrics {
|
||||
@@ -217,7 +215,7 @@ impl CachedPrecompileMetrics {
|
||||
///
|
||||
/// Adds address as an `address` label padded with zeros to at least two hex symbols, prefixed
|
||||
/// by `0x`.
|
||||
pub fn new_with_address(address: Address) -> Self {
|
||||
pub(crate) fn new_with_address(address: Address) -> Self {
|
||||
Self::new_with_labels(&[("address", format!("0x{address:02x}"))])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use crate::engine::EngineApiKind;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_primitives::{
|
||||
map::{B256Map, B256Set},
|
||||
map::{HashMap, HashSet},
|
||||
BlockNumber, B256,
|
||||
};
|
||||
use reth_chain_state::{DeferredTrieData, EthPrimitives, ExecutedBlock, LazyOverlay};
|
||||
@@ -25,7 +25,7 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
|
||||
/// __All__ unique executed blocks by block hash that are connected to the canonical chain.
|
||||
///
|
||||
/// This includes blocks of all forks.
|
||||
pub(crate) blocks_by_hash: B256Map<ExecutedBlock<N>>,
|
||||
pub(crate) blocks_by_hash: HashMap<B256, ExecutedBlock<N>>,
|
||||
/// Executed blocks grouped by their respective block number.
|
||||
///
|
||||
/// This maps unique block number to all known blocks for that height.
|
||||
@@ -33,7 +33,7 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
|
||||
/// Note: there can be multiple blocks at the same height due to forks.
|
||||
pub(crate) blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock<N>>>,
|
||||
/// Map of any parent block hash to its children.
|
||||
pub(crate) parent_to_child: B256Map<B256Set>,
|
||||
pub(crate) parent_to_child: HashMap<B256, HashSet<B256>>,
|
||||
/// Currently tracked canonical head of the chain.
|
||||
pub(crate) current_canonical_head: BlockNumHash,
|
||||
/// The engine API variant of this handler
|
||||
@@ -48,34 +48,37 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
|
||||
|
||||
impl<N: NodePrimitives> TreeState<N> {
|
||||
/// Returns a new, empty tree state that points to the given canonical head.
|
||||
pub fn new(current_canonical_head: BlockNumHash, engine_kind: EngineApiKind) -> Self {
|
||||
pub(crate) fn new(current_canonical_head: BlockNumHash, engine_kind: EngineApiKind) -> Self {
|
||||
Self {
|
||||
blocks_by_hash: B256Map::default(),
|
||||
blocks_by_hash: HashMap::default(),
|
||||
blocks_by_number: BTreeMap::new(),
|
||||
current_canonical_head,
|
||||
parent_to_child: B256Map::default(),
|
||||
parent_to_child: HashMap::default(),
|
||||
engine_kind,
|
||||
cached_canonical_overlay: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Resets the state and points to the given canonical head.
|
||||
pub fn reset(&mut self, current_canonical_head: BlockNumHash) {
|
||||
pub(crate) fn reset(&mut self, current_canonical_head: BlockNumHash) {
|
||||
*self = Self::new(current_canonical_head, self.engine_kind);
|
||||
}
|
||||
|
||||
/// Returns the number of executed blocks stored.
|
||||
pub fn block_count(&self) -> usize {
|
||||
pub(crate) fn block_count(&self) -> usize {
|
||||
self.blocks_by_hash.len()
|
||||
}
|
||||
|
||||
/// Returns the [`ExecutedBlock`] by hash.
|
||||
pub fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
|
||||
pub(crate) fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
|
||||
self.blocks_by_hash.get(&hash)
|
||||
}
|
||||
|
||||
/// Returns the sealed block header by hash.
|
||||
pub fn sealed_header_by_hash(&self, hash: &B256) -> Option<SealedHeader<N::BlockHeader>> {
|
||||
pub(crate) fn sealed_header_by_hash(
|
||||
&self,
|
||||
hash: &B256,
|
||||
) -> Option<SealedHeader<N::BlockHeader>> {
|
||||
self.blocks_by_hash.get(hash).map(|b| b.sealed_block().sealed_header().clone())
|
||||
}
|
||||
|
||||
@@ -84,7 +87,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// highest persisted block connected to this chain.
|
||||
///
|
||||
/// Returns `None` if the block for the given hash is not found.
|
||||
pub fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
|
||||
pub(crate) fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
|
||||
let block = self.blocks_by_hash.get(&hash).cloned()?;
|
||||
let mut parent_hash = block.recovered_block().parent_hash();
|
||||
let mut blocks = vec![block];
|
||||
@@ -157,7 +160,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
}
|
||||
|
||||
/// Insert executed block into the state.
|
||||
pub fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
|
||||
pub(crate) fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
|
||||
let hash = executed.recovered_block().hash();
|
||||
let parent_hash = executed.recovered_block().parent_hash();
|
||||
let block_number = executed.recovered_block().number();
|
||||
@@ -178,7 +181,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// ## Returns
|
||||
///
|
||||
/// The removed block and the block hashes of its children.
|
||||
fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, B256Set)> {
|
||||
fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, HashSet<B256>)> {
|
||||
let executed = self.blocks_by_hash.remove(&hash)?;
|
||||
|
||||
// Remove this block from collection of children of its parent block.
|
||||
@@ -213,7 +216,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
}
|
||||
|
||||
/// Returns whether or not the hash is part of the canonical chain.
|
||||
pub fn is_canonical(&self, hash: B256) -> bool {
|
||||
pub(crate) fn is_canonical(&self, hash: B256) -> bool {
|
||||
let mut current_block = self.current_canonical_head.hash;
|
||||
if current_block == hash {
|
||||
return true
|
||||
@@ -231,7 +234,11 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
|
||||
/// Removes canonical blocks below the upper bound, only if the last persisted hash is
|
||||
/// part of the canonical chain.
|
||||
pub fn remove_canonical_until(&mut self, upper_bound: BlockNumber, last_persisted_hash: B256) {
|
||||
pub(crate) fn remove_canonical_until(
|
||||
&mut self,
|
||||
upper_bound: BlockNumber,
|
||||
last_persisted_hash: B256,
|
||||
) {
|
||||
debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
|
||||
|
||||
// If the last persisted hash is not canonical, then we don't want to remove any canonical
|
||||
@@ -256,7 +263,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
|
||||
/// Removes all blocks that are below the finalized block, as well as removing non-canonical
|
||||
/// sidechains that fork from below the finalized block.
|
||||
pub fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
|
||||
pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
|
||||
let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
|
||||
|
||||
// We remove disconnected sidechains in three steps:
|
||||
@@ -316,7 +323,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// NOTE: if the finalized block is greater than the upper bound, the only blocks that will be
|
||||
/// removed are canonical blocks and sidechains that fork below the `upper_bound`. This is the
|
||||
/// same behavior as if the `finalized_num` were `Some(upper_bound)`.
|
||||
pub fn remove_until(
|
||||
pub(crate) fn remove_until(
|
||||
&mut self,
|
||||
upper_bound: BlockNumHash,
|
||||
last_persisted_hash: B256,
|
||||
@@ -354,22 +361,22 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
}
|
||||
|
||||
/// Updates the canonical head to the given block.
|
||||
pub const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
|
||||
pub(crate) const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
|
||||
self.current_canonical_head = new_head;
|
||||
}
|
||||
|
||||
/// Returns the tracked canonical head.
|
||||
pub const fn canonical_head(&self) -> &BlockNumHash {
|
||||
pub(crate) const fn canonical_head(&self) -> &BlockNumHash {
|
||||
&self.current_canonical_head
|
||||
}
|
||||
|
||||
/// Returns the block hash of the canonical head.
|
||||
pub const fn canonical_block_hash(&self) -> B256 {
|
||||
pub(crate) const fn canonical_block_hash(&self) -> B256 {
|
||||
self.canonical_head().hash
|
||||
}
|
||||
|
||||
/// Returns the block number of the canonical head.
|
||||
pub const fn canonical_block_number(&self) -> BlockNumber {
|
||||
pub(crate) const fn canonical_block_number(&self) -> BlockNumber {
|
||||
self.canonical_head().number
|
||||
}
|
||||
}
|
||||
@@ -379,7 +386,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
/// Determines if the second block is a descendant of the first block.
|
||||
///
|
||||
/// If the two blocks are the same, this returns `false`.
|
||||
pub fn is_descendant(
|
||||
pub(crate) fn is_descendant(
|
||||
&self,
|
||||
first: BlockNumHash,
|
||||
second: alloy_eips::eip1898::BlockWithParent,
|
||||
@@ -489,7 +496,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[1].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
|
||||
);
|
||||
|
||||
assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
|
||||
@@ -498,7 +505,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[2].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
|
||||
);
|
||||
assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
|
||||
|
||||
@@ -586,11 +593,11 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
|
||||
);
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -636,11 +643,11 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
|
||||
);
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -686,11 +693,11 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
|
||||
);
|
||||
assert_eq!(
|
||||
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
|
||||
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
|
||||
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_trie_db::ChangesetCache;
|
||||
|
||||
use alloy_eips::eip1898::BlockWithParent;
|
||||
use alloy_primitives::{
|
||||
map::{B256Map, B256Set},
|
||||
map::{HashMap, HashSet},
|
||||
Bytes, B256,
|
||||
};
|
||||
use alloy_rlp::Decodable;
|
||||
@@ -28,7 +28,6 @@ use reth_ethereum_primitives::{Block, EthPrimitives};
|
||||
use reth_evm_ethereum::MockEvmConfig;
|
||||
use reth_primitives_traits::Block as _;
|
||||
use reth_provider::test_utils::MockEthProvider;
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
str::FromStr,
|
||||
@@ -235,11 +234,11 @@ impl TestHarness {
|
||||
}
|
||||
|
||||
fn with_blocks(mut self, blocks: Vec<ExecutedBlock>) -> Self {
|
||||
let mut blocks_by_hash = B256Map::default();
|
||||
let mut blocks_by_hash = HashMap::default();
|
||||
let mut blocks_by_number = BTreeMap::new();
|
||||
let mut state_by_hash = B256Map::default();
|
||||
let mut state_by_hash = HashMap::default();
|
||||
let mut hash_by_number = BTreeMap::new();
|
||||
let mut parent_to_child: B256Map<B256Set> = B256Map::default();
|
||||
let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
|
||||
let mut parent_hash = B256::ZERO;
|
||||
|
||||
for block in &blocks {
|
||||
@@ -539,7 +538,10 @@ async fn test_tree_persist_blocks() {
|
||||
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
|
||||
.collect();
|
||||
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
|
||||
spawn_os_thread("engine", || test_harness.tree.run());
|
||||
std::thread::Builder::new()
|
||||
.name("Engine Task".to_string())
|
||||
.spawn(|| test_harness.tree.run())
|
||||
.unwrap();
|
||||
|
||||
// send a message to the tree to enter the main loop.
|
||||
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
|
||||
@@ -957,7 +959,7 @@ async fn test_engine_tree_fcu_missing_head() {
|
||||
let event = test_harness.from_tree_rx.recv().await.unwrap();
|
||||
match event {
|
||||
EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
|
||||
let expected_block_set = B256Set::from_iter([missing_block.hash()]);
|
||||
let expected_block_set = HashSet::from_iter([missing_block.hash()]);
|
||||
assert_eq!(actual_block_set, expected_block_set);
|
||||
}
|
||||
_ => panic!("Unexpected event: {event:#?}"),
|
||||
@@ -1002,7 +1004,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
|
||||
let event = test_harness.from_tree_rx.recv().await.unwrap();
|
||||
match event {
|
||||
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
|
||||
assert_eq!(hash_set, B256Set::from_iter([main_chain_last_hash]));
|
||||
assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
|
||||
}
|
||||
_ => panic!("Unexpected event: {event:#?}"),
|
||||
}
|
||||
@@ -1011,7 +1013,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
|
||||
let event = test_harness.from_tree_rx.recv().await.unwrap();
|
||||
match event {
|
||||
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
|
||||
assert_eq!(hash_set, B256Set::from_iter([main_chain_last_hash]));
|
||||
assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
|
||||
}
|
||||
_ => panic!("Unexpected event: {event:#?}"),
|
||||
}
|
||||
@@ -1987,7 +1989,10 @@ mod forkchoice_updated_tests {
|
||||
let action_rx = test_harness.action_rx;
|
||||
|
||||
// Spawn tree in background thread
|
||||
spawn_os_thread("engine", || test_harness.tree.run());
|
||||
std::thread::Builder::new()
|
||||
.name("Engine Task".to_string())
|
||||
.spawn(|| test_harness.tree.run())
|
||||
.unwrap();
|
||||
|
||||
// Send terminate request
|
||||
to_tree_tx
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
use alloy_primitives::{
|
||||
map::{B256Map, HashMap},
|
||||
B256,
|
||||
};
|
||||
use alloy_primitives::{map::HashMap, B256};
|
||||
use reth_db::DatabaseError;
|
||||
use reth_trie::{
|
||||
trie_cursor::{TrieCursor, TrieCursorFactory},
|
||||
@@ -22,7 +19,7 @@ struct EntryDiff<T> {
|
||||
struct TrieUpdatesDiff {
|
||||
account_nodes: HashMap<Nibbles, EntryDiff<Option<BranchNodeCompact>>>,
|
||||
removed_nodes: HashMap<Nibbles, EntryDiff<bool>>,
|
||||
storage_tries: B256Map<StorageTrieUpdatesDiff>,
|
||||
storage_tries: HashMap<B256, StorageTrieUpdatesDiff>,
|
||||
}
|
||||
|
||||
impl TrieUpdatesDiff {
|
||||
@@ -101,7 +98,7 @@ impl StorageTrieUpdatesDiff {
|
||||
|
||||
/// Compares the trie updates from state root task, regular state root calculation and database,
|
||||
/// and logs the differences if there's any.
|
||||
pub(crate) fn compare_trie_updates(
|
||||
pub(super) fn compare_trie_updates(
|
||||
trie_cursor_factory: impl TrieCursorFactory,
|
||||
task: TrieUpdates,
|
||||
regular: TrieUpdates,
|
||||
@@ -189,8 +186,7 @@ fn compare_storage_trie_updates<C: TrieCursor>(
|
||||
task: &mut StorageTrieUpdates,
|
||||
regular: &mut StorageTrieUpdates,
|
||||
) -> Result<StorageTrieUpdatesDiff, DatabaseError> {
|
||||
// Check if the storage trie exists by seeking to the first entry
|
||||
let database_not_exists = trie_cursor()?.seek(Nibbles::default())?.is_none();
|
||||
let database_not_exists = trie_cursor()?.next()?.is_none();
|
||||
let mut diff = StorageTrieUpdatesDiff {
|
||||
// If the deletion is a no-op, meaning that the entry is not in the
|
||||
// database, do not add it to the diff.
|
||||
|
||||
@@ -20,6 +20,8 @@ reth-era.workspace = true
|
||||
# http
|
||||
bytes.workspace = true
|
||||
reqwest.workspace = true
|
||||
reqwest.default-features = false
|
||||
reqwest.features = ["stream", "rustls-tls-native-roots"]
|
||||
|
||||
# async
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -86,7 +86,7 @@ where
|
||||
mut self,
|
||||
components: impl CliComponentsBuilder<N>,
|
||||
launcher: impl AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> Result<()>,
|
||||
) -> Result<()>
|
||||
@@ -132,7 +132,7 @@ pub(crate) fn run_commands_with<C, Ext, Rpc, N, SubCmd>(
|
||||
runner: CliRunner,
|
||||
components: impl CliComponentsBuilder<N>,
|
||||
launcher: impl AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> Result<()>,
|
||||
) -> Result<()>
|
||||
|
||||
@@ -131,7 +131,7 @@ impl<
|
||||
/// ````
|
||||
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
C: ChainSpecParser<ChainSpec = ChainSpec>,
|
||||
{
|
||||
@@ -148,7 +148,7 @@ impl<
|
||||
self,
|
||||
components: impl CliComponentsBuilder<N>,
|
||||
launcher: impl AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> eyre::Result<()>,
|
||||
) -> eyre::Result<()>
|
||||
@@ -180,7 +180,7 @@ impl<
|
||||
/// ```
|
||||
pub fn with_runner<L, Fut>(self, runner: CliRunner, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
C: ChainSpecParser<ChainSpec = ChainSpec>,
|
||||
{
|
||||
@@ -196,7 +196,7 @@ impl<
|
||||
runner: CliRunner,
|
||||
components: impl CliComponentsBuilder<N>,
|
||||
launcher: impl AsyncFnOnce(
|
||||
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
|
||||
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
|
||||
Ext,
|
||||
) -> eyre::Result<()>,
|
||||
) -> eyre::Result<()>
|
||||
|
||||
@@ -119,9 +119,10 @@ impl EthereumNode {
|
||||
/// use reth_db::open_db_read_only;
|
||||
/// use reth_node_ethereum::EthereumNode;
|
||||
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// let factory = EthereumNode::provider_factory_builder()
|
||||
/// .db(open_db_read_only("db", Default::default()).unwrap())
|
||||
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
|
||||
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
|
||||
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
|
||||
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
|
||||
|
||||
@@ -100,12 +100,10 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
|
||||
ChainSpecBuilder::default().chain(MAINNET.chain).genesis(genesis).osaka_activated().build(),
|
||||
);
|
||||
let genesis_hash = chain_spec.genesis_hash();
|
||||
let node_config = NodeConfig::test().with_chain(chain_spec).with_unused_ports().with_rpc(
|
||||
RpcServerArgs::default()
|
||||
.with_unused_ports()
|
||||
.with_http()
|
||||
.with_force_blob_sidecar_upcasting(),
|
||||
);
|
||||
let node_config = NodeConfig::test()
|
||||
.with_chain(chain_spec)
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.node(EthereumNode::default())
|
||||
@@ -127,7 +125,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
|
||||
let blob_tx_hash = node.rpc.inject_tx(blob_tx).await?;
|
||||
// fetch it from rpc
|
||||
let envelope = node.rpc.envelope_by_hash(blob_tx_hash).await?;
|
||||
// assert that sidecar was converted to eip7594 (force upcasting is enabled)
|
||||
// assert that sidecar was converted to eip7594
|
||||
assert!(envelope.as_eip4844().unwrap().tx().sidecar().unwrap().is_eip7594());
|
||||
// validate sidecar
|
||||
TransactionTestContext::validate_sidecar(envelope);
|
||||
@@ -163,12 +161,10 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
|
||||
.build(),
|
||||
);
|
||||
let genesis_hash = chain_spec.genesis_hash();
|
||||
let node_config = NodeConfig::test().with_chain(chain_spec).with_unused_ports().with_rpc(
|
||||
RpcServerArgs::default()
|
||||
.with_unused_ports()
|
||||
.with_http()
|
||||
.with_force_blob_sidecar_upcasting(),
|
||||
);
|
||||
let node_config = NodeConfig::test()
|
||||
.with_chain(chain_spec)
|
||||
.with_unused_ports()
|
||||
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node(exec.clone())
|
||||
.node(EthereumNode::default())
|
||||
|
||||
@@ -511,8 +511,9 @@ mod compact {
|
||||
total_length += flags.len() + buffer.len();
|
||||
buf.put_slice(&flags);
|
||||
if zstd {
|
||||
reth_zstd_compressors::with_receipt_compressor(|compressor| {
|
||||
let compressed = compressor.compress(&buffer).expect("Failed to compress.");
|
||||
reth_zstd_compressors::RECEIPT_COMPRESSOR.with(|compressor| {
|
||||
let compressed =
|
||||
compressor.borrow_mut().compress(&buffer).expect("Failed to compress.");
|
||||
buf.put(compressed.as_slice());
|
||||
});
|
||||
} else {
|
||||
@@ -524,7 +525,8 @@ mod compact {
|
||||
fn from_compact(buf: &[u8], _len: usize) -> (Self, &[u8]) {
|
||||
let (flags, mut buf) = ReceiptFlags::from(buf);
|
||||
if flags.__zstd() != 0 {
|
||||
reth_zstd_compressors::with_receipt_decompressor(|decompressor| {
|
||||
reth_zstd_compressors::RECEIPT_DECOMPRESSOR.with(|decompressor| {
|
||||
let decompressor = &mut decompressor.borrow_mut();
|
||||
let decompressed = decompressor.decompress(buf);
|
||||
let original_buf = buf;
|
||||
let mut buf: &[u8] = decompressed;
|
||||
|
||||
@@ -577,11 +577,19 @@ impl reth_codecs::Compact for TransactionSigned {
|
||||
|
||||
let tx_bits = if zstd_bit {
|
||||
let mut tmp = Vec::with_capacity(256);
|
||||
reth_zstd_compressors::with_tx_compressor(|compressor| {
|
||||
if cfg!(feature = "std") {
|
||||
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
|
||||
let mut compressor = compressor.borrow_mut();
|
||||
let tx_bits = self.transaction.to_compact(&mut tmp);
|
||||
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
|
||||
tx_bits as u8
|
||||
})
|
||||
} else {
|
||||
let mut compressor = reth_zstd_compressors::create_tx_compressor();
|
||||
let tx_bits = self.transaction.to_compact(&mut tmp);
|
||||
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
|
||||
tx_bits as u8
|
||||
})
|
||||
}
|
||||
} else {
|
||||
self.transaction.to_compact(buf) as u8
|
||||
};
|
||||
@@ -603,13 +611,26 @@ impl reth_codecs::Compact for TransactionSigned {
|
||||
|
||||
let zstd_bit = bitflags >> 3;
|
||||
let (transaction, buf) = if zstd_bit != 0 {
|
||||
reth_zstd_compressors::with_tx_decompressor(|decompressor| {
|
||||
// TODO: enforce that zstd is only present at a "top" level type
|
||||
if cfg!(feature = "std") {
|
||||
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
|
||||
let mut decompressor = decompressor.borrow_mut();
|
||||
|
||||
// TODO: enforce that zstd is only present at a "top" level type
|
||||
|
||||
let transaction_type = (bitflags & 0b110) >> 1;
|
||||
let (transaction, _) =
|
||||
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
|
||||
|
||||
(transaction, buf)
|
||||
})
|
||||
} else {
|
||||
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
|
||||
let transaction_type = (bitflags & 0b110) >> 1;
|
||||
let (transaction, _) =
|
||||
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
|
||||
|
||||
(transaction, buf)
|
||||
})
|
||||
}
|
||||
} else {
|
||||
let transaction_type = bitflags >> 1;
|
||||
Transaction::from_compact(buf, transaction_type)
|
||||
|
||||
@@ -36,6 +36,7 @@ rayon = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-ethereum-forks.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
@@ -46,6 +47,7 @@ std = [
|
||||
"alloy-primitives/std",
|
||||
"alloy-consensus/std",
|
||||
"revm/std",
|
||||
"reth-ethereum-forks/std",
|
||||
"alloy-evm/std",
|
||||
"reth-execution-errors/std",
|
||||
"reth-execution-types/std",
|
||||
|
||||
@@ -57,12 +57,6 @@ pub enum StateProofError {
|
||||
/// RLP decoding error.
|
||||
#[error(transparent)]
|
||||
Rlp(#[from] alloy_rlp::Error),
|
||||
/// Trie inconsistency detected during proof calculation.
|
||||
///
|
||||
/// This occurs when cached trie nodes disagree with the leaf data, causing
|
||||
/// proof calculation to be unable to make forward progress.
|
||||
#[error("trie inconsistency: {0}")]
|
||||
TrieInconsistency(alloc::string::String),
|
||||
}
|
||||
|
||||
impl From<StateProofError> for ProviderError {
|
||||
@@ -70,7 +64,6 @@ impl From<StateProofError> for ProviderError {
|
||||
match value {
|
||||
StateProofError::Database(error) => Self::Database(error),
|
||||
StateProofError::Rlp(error) => Self::Rlp(error),
|
||||
StateProofError::TrieInconsistency(msg) => Self::Database(DatabaseError::Other(msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
use crate::{BlockExecutionOutput, BlockExecutionResult};
|
||||
use alloc::{vec, vec::Vec};
|
||||
use alloy_eips::eip7685::Requests;
|
||||
use alloy_primitives::{
|
||||
logs_bloom,
|
||||
map::{AddressMap, B256Map, HashMap},
|
||||
Address, BlockNumber, Bloom, Log, B256, U256,
|
||||
};
|
||||
use alloy_primitives::{logs_bloom, map::HashMap, Address, BlockNumber, Bloom, Log, B256, U256};
|
||||
use reth_primitives_traits::{Account, Bytecode, Receipt, StorageEntry};
|
||||
use reth_trie_common::{HashedPostState, KeyHasher};
|
||||
use revm::{
|
||||
@@ -14,13 +10,14 @@ use revm::{
|
||||
};
|
||||
|
||||
/// Type used to initialize revms bundle state.
|
||||
pub type BundleStateInit = AddressMap<(Option<Account>, Option<Account>, B256Map<(U256, U256)>)>;
|
||||
pub type BundleStateInit =
|
||||
HashMap<Address, (Option<Account>, Option<Account>, HashMap<B256, (U256, U256)>)>;
|
||||
|
||||
/// Types used inside `RevertsInit` to initialize revms reverts.
|
||||
pub type AccountRevertInit = (Option<Option<Account>>, Vec<StorageEntry>);
|
||||
|
||||
/// Type used to initialize revms reverts.
|
||||
pub type RevertsInit = HashMap<BlockNumber, AddressMap<AccountRevertInit>>;
|
||||
pub type RevertsInit = HashMap<BlockNumber, HashMap<Address, AccountRevertInit>>;
|
||||
|
||||
/// Represents a changed account
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
@@ -617,12 +614,12 @@ mod tests {
|
||||
);
|
||||
|
||||
// Create a BundleStateInit object and insert initial data
|
||||
let mut state_init: BundleStateInit = AddressMap::default();
|
||||
let mut state_init: BundleStateInit = HashMap::default();
|
||||
state_init
|
||||
.insert(Address::new([2; 20]), (None, Some(Account::default()), B256Map::default()));
|
||||
.insert(Address::new([2; 20]), (None, Some(Account::default()), HashMap::default()));
|
||||
|
||||
// Create an AddressMap for account reverts and insert initial data
|
||||
let mut revert_inner: AddressMap<AccountRevertInit> = AddressMap::default();
|
||||
// Create a HashMap for account reverts and insert initial data
|
||||
let mut revert_inner: HashMap<Address, AccountRevertInit> = HashMap::default();
|
||||
revert_inner.insert(Address::new([2; 20]), (None, vec![]));
|
||||
|
||||
// Create a RevertsInit object and insert the revert_inner data
|
||||
|
||||
@@ -41,7 +41,6 @@ metrics.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
reth-tracing.workspace = true
|
||||
alloy-primitives = { workspace = true, features = ["rand"] }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
secp256k1 = { workspace = true, features = ["std", "rand"] }
|
||||
rand_08.workspace = true
|
||||
|
||||
@@ -30,12 +30,12 @@ tokio-stream.workspace = true
|
||||
hickory-resolver = { workspace = true, features = ["tokio"] }
|
||||
|
||||
# misc
|
||||
dashmap = { workspace = true, features = ["inline"] }
|
||||
data-encoding.workspace = true
|
||||
linked_hash_set.workspace = true
|
||||
schnellru.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
parking_lot.workspace = true
|
||||
serde = { workspace = true, optional = true }
|
||||
serde_with = { workspace = true, optional = true }
|
||||
|
||||
@@ -56,9 +56,9 @@ serde = [
|
||||
"alloy-primitives/serde",
|
||||
"enr/serde",
|
||||
"linked_hash_set/serde",
|
||||
"parking_lot/serde",
|
||||
"rand/serde",
|
||||
"secp256k1/serde",
|
||||
"hickory-resolver/serde",
|
||||
"reth-ethereum-forks/serde",
|
||||
"dashmap/serde",
|
||||
]
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
//! Perform DNS lookups
|
||||
|
||||
use dashmap::DashMap;
|
||||
use hickory_resolver::name_server::ConnectionProvider;
|
||||
pub use hickory_resolver::{ResolveError, TokioResolver};
|
||||
use std::future::Future;
|
||||
use parking_lot::RwLock;
|
||||
use std::{collections::HashMap, future::Future};
|
||||
use tracing::trace;
|
||||
|
||||
/// A type that can lookup DNS entries
|
||||
@@ -72,25 +72,25 @@ impl Resolver for DnsResolver {
|
||||
|
||||
/// A [Resolver] that uses an in memory map to lookup entries
|
||||
#[derive(Debug, Default)]
|
||||
pub struct MapResolver(DashMap<String, String>);
|
||||
pub struct MapResolver(RwLock<HashMap<String, String>>);
|
||||
|
||||
// === impl MapResolver ===
|
||||
|
||||
impl MapResolver {
|
||||
/// Inserts a key-value pair into the map.
|
||||
pub fn insert(&self, k: String, v: String) -> Option<String> {
|
||||
self.0.insert(k, v)
|
||||
self.0.write().insert(k, v)
|
||||
}
|
||||
|
||||
/// Returns the value corresponding to the key
|
||||
pub fn get(&self, k: &str) -> Option<String> {
|
||||
self.0.get(k).map(|entry| entry.value().clone())
|
||||
self.0.read().get(k).cloned()
|
||||
}
|
||||
|
||||
/// Removes a key from the map, returning the value at the key if the key was previously in the
|
||||
/// map.
|
||||
pub fn remove(&self, k: &str) -> Option<String> {
|
||||
self.0.remove(k).map(|(_, v)| v)
|
||||
self.0.write().remove(k)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -621,11 +621,12 @@ mod tests {
|
||||
bodies::test_utils::{insert_headers, zip_blocks},
|
||||
test_utils::{generate_bodies, TestBodiesClient},
|
||||
};
|
||||
use alloy_primitives::{map::B256Map, B256};
|
||||
use alloy_primitives::B256;
|
||||
use assert_matches::assert_matches;
|
||||
use reth_consensus::test_utils::TestConsensus;
|
||||
use reth_provider::test_utils::create_test_provider_factory;
|
||||
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
|
||||
use std::collections::HashMap;
|
||||
|
||||
// Check that the blocks are emitted in order of block number, not in order of
|
||||
// first-downloaded
|
||||
@@ -673,7 +674,7 @@ mod tests {
|
||||
let bodies = blocks
|
||||
.into_iter()
|
||||
.map(|block| (block.hash(), block.into_body()))
|
||||
.collect::<B256Map<_>>();
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
insert_headers(&factory, &headers);
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::map::B256Map;
|
||||
use alloy_primitives::B256;
|
||||
use reth_ethereum_primitives::BlockBody;
|
||||
use reth_network_p2p::bodies::response::BlockResponse;
|
||||
use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
|
||||
@@ -11,10 +11,11 @@ use reth_provider::{
|
||||
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderFactory, StaticFileSegment,
|
||||
StaticFileWriter,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub(crate) fn zip_blocks<'a, B: Block>(
|
||||
headers: impl Iterator<Item = &'a SealedHeader<B::Header>>,
|
||||
bodies: &mut B256Map<B::Body>,
|
||||
bodies: &mut HashMap<B256, B::Body>,
|
||||
) -> Vec<BlockResponse<B>> {
|
||||
headers
|
||||
.into_iter()
|
||||
@@ -31,7 +32,7 @@ pub(crate) fn zip_blocks<'a, B: Block>(
|
||||
|
||||
pub(crate) fn create_raw_bodies(
|
||||
headers: impl IntoIterator<Item = SealedHeader>,
|
||||
bodies: &mut B256Map<BlockBody>,
|
||||
bodies: &mut HashMap<B256, BlockBody>,
|
||||
) -> Vec<reth_ethereum_primitives::Block> {
|
||||
headers
|
||||
.into_iter()
|
||||
|
||||
@@ -704,7 +704,7 @@ mod tests {
|
||||
FileClient::from_file(file.into(), NoopConsensus::arc())
|
||||
.await
|
||||
.unwrap()
|
||||
.with_bodies(bodies.clone().into_iter().collect()),
|
||||
.with_bodies(bodies.clone()),
|
||||
);
|
||||
let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
|
||||
client.clone(),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use alloy_primitives::{map::B256Map, B256};
|
||||
use alloy_primitives::B256;
|
||||
use reth_ethereum_primitives::BlockBody;
|
||||
use reth_network_p2p::{
|
||||
bodies::client::{BodiesClient, BodiesFut},
|
||||
@@ -7,6 +7,7 @@ use reth_network_p2p::{
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
ops::RangeInclusive,
|
||||
sync::{
|
||||
@@ -20,7 +21,7 @@ use tokio::sync::Mutex;
|
||||
/// A [`BodiesClient`] for testing.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TestBodiesClient {
|
||||
bodies: Arc<Mutex<B256Map<BlockBody>>>,
|
||||
bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
|
||||
should_delay: bool,
|
||||
max_batch_size: Option<usize>,
|
||||
times_requested: AtomicU64,
|
||||
@@ -28,7 +29,7 @@ pub struct TestBodiesClient {
|
||||
}
|
||||
|
||||
impl TestBodiesClient {
|
||||
pub(crate) fn with_bodies(mut self, bodies: B256Map<BlockBody>) -> Self {
|
||||
pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
|
||||
self.bodies = Arc::new(Mutex::new(bodies));
|
||||
self
|
||||
}
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
|
||||
#[cfg(any(test, feature = "file-client"))]
|
||||
use crate::{bodies::test_utils::create_raw_bodies, file_codec::BlockFileCodec};
|
||||
use alloy_primitives::{map::B256Map, B256};
|
||||
use alloy_primitives::B256;
|
||||
use reth_ethereum_primitives::BlockBody;
|
||||
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
|
||||
use std::ops::RangeInclusive;
|
||||
use std::{collections::HashMap, ops::RangeInclusive};
|
||||
|
||||
mod bodies_client;
|
||||
pub use bodies_client::TestBodiesClient;
|
||||
@@ -19,7 +19,7 @@ pub(crate) const TEST_SCOPE: &str = "downloaders.test";
|
||||
/// Generate a set of bodies and their corresponding block hashes
|
||||
pub(crate) fn generate_bodies(
|
||||
range: RangeInclusive<u64>,
|
||||
) -> (Vec<SealedHeader>, B256Map<BlockBody>) {
|
||||
) -> (Vec<SealedHeader>, HashMap<B256, BlockBody>) {
|
||||
let mut rng = generators::rng();
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
@@ -38,7 +38,7 @@ pub(crate) fn generate_bodies(
|
||||
#[cfg(any(test, feature = "file-client"))]
|
||||
pub(crate) async fn generate_bodies_file(
|
||||
range: RangeInclusive<u64>,
|
||||
) -> (tokio::fs::File, Vec<SealedHeader>, B256Map<BlockBody>) {
|
||||
) -> (tokio::fs::File, Vec<SealedHeader>, HashMap<B256, BlockBody>) {
|
||||
use futures::SinkExt;
|
||||
use std::io::SeekFrom;
|
||||
use tokio::{fs::File, io::AsyncSeekExt};
|
||||
|
||||
@@ -8,13 +8,13 @@ use crate::{
|
||||
};
|
||||
use alloy_consensus::Header;
|
||||
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
|
||||
use alloy_primitives::{map::B256Map, B256};
|
||||
use alloy_primitives::B256;
|
||||
use parking_lot::Mutex;
|
||||
use reth_eth_wire_types::HeadersDirection;
|
||||
use reth_ethereum_primitives::{Block, BlockBody};
|
||||
use reth_network_peers::{PeerId, WithPeerId};
|
||||
use reth_primitives_traits::{SealedBlock, SealedHeader};
|
||||
use std::{ops::RangeInclusive, sync::Arc};
|
||||
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
|
||||
|
||||
/// A headers+bodies client that stores the headers and bodies in memory, with an artificial soft
|
||||
/// bodies response limit that is set to 20 by default.
|
||||
@@ -22,8 +22,8 @@ use std::{ops::RangeInclusive, sync::Arc};
|
||||
/// This full block client can be [Clone]d and shared between multiple tasks.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TestFullBlockClient {
|
||||
headers: Arc<Mutex<B256Map<Header>>>,
|
||||
bodies: Arc<Mutex<B256Map<BlockBody>>>,
|
||||
headers: Arc<Mutex<HashMap<B256, Header>>>,
|
||||
bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
|
||||
// soft response limit, max number of bodies to respond with
|
||||
soft_limit: usize,
|
||||
}
|
||||
@@ -31,8 +31,8 @@ pub struct TestFullBlockClient {
|
||||
impl Default for TestFullBlockClient {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
headers: Arc::new(Mutex::new(B256Map::default())),
|
||||
bodies: Arc::new(Mutex::new(B256Map::default())),
|
||||
headers: Arc::new(Mutex::new(HashMap::default())),
|
||||
bodies: Arc::new(Mutex::new(HashMap::default())),
|
||||
soft_limit: 20,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,8 +251,6 @@ impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
|
||||
}
|
||||
|
||||
/// Creates a preconfigured node for testing purposes with a specific datadir.
|
||||
///
|
||||
/// The entire `datadir` will be cleaned up when the node is dropped.
|
||||
#[cfg(feature = "test-utils")]
|
||||
pub fn testing_node_with_datadir(
|
||||
mut self,
|
||||
@@ -270,7 +268,7 @@ impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
|
||||
let data_dir =
|
||||
path.unwrap_or_chain_default(self.config.chain.chain(), self.config.datadir.clone());
|
||||
|
||||
let db = reth_db::test_utils::create_test_rw_db_with_datadir(data_dir.data_dir());
|
||||
let db = reth_db::test_utils::create_test_rw_db_with_path(data_dir.db());
|
||||
|
||||
WithLaunchContext { builder: self.with_database(db), task_executor }
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Pool component for the node builder.
|
||||
|
||||
use crate::{BuilderContext, FullNodeTypes};
|
||||
use alloy_primitives::map::AddressSet;
|
||||
use alloy_primitives::Address;
|
||||
use reth_chain_state::CanonStateSubscriptions;
|
||||
use reth_chainspec::EthereumHardforks;
|
||||
use reth_node_api::{BlockTy, NodeTypes, TxTy};
|
||||
@@ -9,7 +9,7 @@ use reth_transaction_pool::{
|
||||
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
|
||||
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
|
||||
};
|
||||
use std::future::Future;
|
||||
use std::{collections::HashSet, future::Future};
|
||||
|
||||
/// A type that knows how to build the transaction pool.
|
||||
pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
|
||||
@@ -62,7 +62,7 @@ pub struct PoolBuilderConfigOverrides {
|
||||
/// Minimum base fee required by the protocol.
|
||||
pub minimal_protocol_basefee: Option<u64>,
|
||||
/// Addresses that will be considered as local. Above exemptions apply.
|
||||
pub local_addresses: AddressSet,
|
||||
pub local_addresses: HashSet<Address>,
|
||||
/// Additional tasks to validate new transactions.
|
||||
pub additional_validation_tasks: Option<usize>,
|
||||
}
|
||||
|
||||
@@ -66,8 +66,8 @@ use reth_node_metrics::{
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
|
||||
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
|
||||
BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
|
||||
ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune::{PruneModes, PrunerBuilder};
|
||||
@@ -236,7 +236,7 @@ impl LaunchContext {
|
||||
.map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
|
||||
if let Err(err) = ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.thread_name(|i| format!("rayon-{i}"))
|
||||
.thread_name(|i| format!("reth-rayon-{i}"))
|
||||
.build_global()
|
||||
{
|
||||
warn!(%err, "Failed to build global thread pool")
|
||||
@@ -507,10 +507,32 @@ where
|
||||
.with_prune_modes(self.prune_modes())
|
||||
.with_changeset_cache(changeset_cache);
|
||||
|
||||
// Check consistency between the database and static files, returning
|
||||
// the unwind targets for each storage layer if inconsistencies are
|
||||
// found.
|
||||
let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
|
||||
// Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
|
||||
// earliest consistent block.
|
||||
//
|
||||
// Order matters:
|
||||
// 1) heal static files (no pruning)
|
||||
// 2) check RocksDB (needs static-file tx data)
|
||||
// 3) check static-file checkpoints vs MDBX (may prune)
|
||||
//
|
||||
// Compute one unwind target and run a single unwind.
|
||||
|
||||
let provider_ro = factory.database_provider_ro()?;
|
||||
|
||||
// Step 1: heal file-level inconsistencies (no pruning)
|
||||
factory.static_file_provider().check_file_consistency(&provider_ro)?;
|
||||
|
||||
// Step 2: RocksDB consistency check (needs static files tx data)
|
||||
let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;
|
||||
|
||||
// Step 3: Static file checkpoint consistency (may prune)
|
||||
let static_file_unwind = factory
|
||||
.static_file_provider()
|
||||
.check_consistency(&provider_ro)?
|
||||
.map(|target| match target {
|
||||
PipelineTarget::Unwind(block) => block,
|
||||
PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
|
||||
});
|
||||
|
||||
// Take the minimum block number to ensure all storage layers are consistent.
|
||||
let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
|
||||
|
||||
@@ -134,12 +134,6 @@ impl EngineNodeLauncher {
|
||||
|
||||
let node_config = ctx.node_config();
|
||||
|
||||
// Configure trace-level logging for specific block if requested
|
||||
if let Some(block_num) = node_config.debug.trace_block {
|
||||
info!(target: "reth::cli", block_num, "Trace-level logging enabled for newPayload block");
|
||||
reth_tracing::runtime::set_trace_block(Some(block_num));
|
||||
}
|
||||
|
||||
// We always assume that node is syncing after a restart
|
||||
network_handle.update_sync_state(SyncState::Syncing);
|
||||
|
||||
|
||||
@@ -218,9 +218,9 @@ impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> DerefMut for FullNode<N
|
||||
}
|
||||
|
||||
/// Helper type alias to define [`FullNode`] for a given [`Node`].
|
||||
pub type FullNodeFor<N, DB = DatabaseEnv> =
|
||||
pub type FullNodeFor<N, DB = Arc<DatabaseEnv>> =
|
||||
FullNode<NodeAdapter<RethFullAdapter<DB, N>>, <N as Node<RethFullAdapter<DB, N>>>::AddOns>;
|
||||
|
||||
/// Helper type alias to define [`NodeHandle`] for a given [`Node`].
|
||||
pub type NodeHandleFor<N, DB = DatabaseEnv> =
|
||||
pub type NodeHandleFor<N, DB = Arc<DatabaseEnv>> =
|
||||
NodeHandle<NodeAdapter<RethFullAdapter<DB, N>>, <N as Node<RethFullAdapter<DB, N>>>::AddOns>;
|
||||
|
||||
@@ -1192,7 +1192,6 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
|
||||
.pending_block_kind(self.config.pending_block_kind)
|
||||
.raw_tx_forwarder(self.config.raw_tx_forwarder)
|
||||
.evm_memory_limit(self.config.rpc_evm_memory_limit)
|
||||
.force_blob_sidecar_upcasting(self.config.force_blob_sidecar_upcasting)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -108,13 +108,6 @@ pub struct DebugArgs {
|
||||
/// the backfill, but did not yet receive any new blocks.
|
||||
#[arg(long = "debug.startup-sync-state-idle", help_heading = "Debug")]
|
||||
pub startup_sync_state_idle: bool,
|
||||
|
||||
/// Enable trace-level logging for a specific block during `engine_newPayload` processing.
|
||||
///
|
||||
/// This is useful for debugging block execution issues. Once the block is processed,
|
||||
/// trace logging is automatically disabled (one-shot behavior).
|
||||
#[arg(long = "debug.trace-block", help_heading = "Debug", value_name = "BLOCK_NUMBER")]
|
||||
pub trace_block: Option<u64>,
|
||||
}
|
||||
|
||||
impl Default for DebugArgs {
|
||||
@@ -134,7 +127,6 @@ impl Default for DebugArgs {
|
||||
healthy_node_rpc_url: None,
|
||||
ethstats: None,
|
||||
startup_sync_state_idle: false,
|
||||
trace_block: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
//! clap [Args](clap::Args) for engine purposes
|
||||
|
||||
use clap::{builder::Resettable, Args};
|
||||
use reth_engine_primitives::{
|
||||
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
};
|
||||
use reth_engine_primitives::{TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE};
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use crate::node_config::{
|
||||
@@ -41,8 +38,6 @@ pub struct DefaultEngineValues {
|
||||
disable_proof_v2: bool,
|
||||
cache_metrics_disabled: bool,
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
}
|
||||
|
||||
impl DefaultEngineValues {
|
||||
@@ -184,18 +179,6 @@ impl DefaultEngineValues {
|
||||
self.enable_sparse_trie_as_cache = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the sparse trie prune depth by default
|
||||
pub const fn with_sparse_trie_prune_depth(mut self, v: usize) -> Self {
|
||||
self.sparse_trie_prune_depth = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum number of storage tries to retain after sparse trie pruning by default
|
||||
pub const fn with_sparse_trie_max_storage_tries(mut self, v: usize) -> Self {
|
||||
self.sparse_trie_max_storage_tries = v;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DefaultEngineValues {
|
||||
@@ -222,8 +205,6 @@ impl Default for DefaultEngineValues {
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -355,14 +336,6 @@ pub struct EngineArgs {
|
||||
/// Enable sparse trie as cache.
|
||||
#[arg(long = "engine.enable-sparse-trie-as-cache", default_value_t = DefaultEngineValues::get_global().enable_sparse_trie_as_cache, conflicts_with = "disable_proof_v2")]
|
||||
pub enable_sparse_trie_as_cache: bool,
|
||||
|
||||
/// Sparse trie prune depth.
|
||||
#[arg(long = "engine.sparse-trie-prune-depth", default_value_t = DefaultEngineValues::get_global().sparse_trie_prune_depth, requires = "enable_sparse_trie_as_cache")]
|
||||
pub sparse_trie_prune_depth: usize,
|
||||
|
||||
/// Maximum number of storage tries to retain after sparse trie pruning.
|
||||
#[arg(long = "engine.sparse-trie-max-storage-tries", default_value_t = DefaultEngineValues::get_global().sparse_trie_max_storage_tries, requires = "enable_sparse_trie_as_cache")]
|
||||
pub sparse_trie_max_storage_tries: usize,
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
@@ -390,8 +363,6 @@ impl Default for EngineArgs {
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
} = DefaultEngineValues::get_global().clone();
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -419,8 +390,6 @@ impl Default for EngineArgs {
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -428,7 +397,7 @@ impl Default for EngineArgs {
|
||||
impl EngineArgs {
|
||||
/// Creates a [`TreeConfig`] from the engine arguments.
|
||||
pub fn tree_config(&self) -> TreeConfig {
|
||||
TreeConfig::default()
|
||||
let mut config = TreeConfig::default()
|
||||
.with_persistence_threshold(self.persistence_threshold)
|
||||
.with_memory_block_buffer_target(self.memory_block_buffer_target)
|
||||
.with_legacy_state_root(self.legacy_state_root_task_enabled)
|
||||
@@ -445,14 +414,21 @@ impl EngineArgs {
|
||||
.with_always_process_payload_attributes_on_canonical_head(
|
||||
self.always_process_payload_attributes_on_canonical_head,
|
||||
)
|
||||
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
|
||||
.with_storage_worker_count_opt(self.storage_worker_count)
|
||||
.with_account_worker_count_opt(self.account_worker_count)
|
||||
.with_disable_proof_v2(self.disable_proof_v2)
|
||||
.without_cache_metrics(self.cache_metrics_disabled)
|
||||
.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache)
|
||||
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
|
||||
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
|
||||
.with_unwind_canonical_header(self.allow_unwind_canonical_header);
|
||||
|
||||
if let Some(count) = self.storage_worker_count {
|
||||
config = config.with_storage_worker_count(count);
|
||||
}
|
||||
|
||||
if let Some(count) = self.account_worker_count {
|
||||
config = config.with_account_worker_count(count);
|
||||
}
|
||||
|
||||
config = config.with_disable_proof_v2(self.disable_proof_v2);
|
||||
config = config.without_cache_metrics(self.cache_metrics_disabled);
|
||||
config = config.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache);
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
@@ -503,9 +479,7 @@ mod tests {
|
||||
account_worker_count: Some(8),
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: true,
|
||||
enable_sparse_trie_as_cache: true,
|
||||
sparse_trie_prune_depth: 10,
|
||||
sparse_trie_max_storage_tries: 100,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
};
|
||||
|
||||
let parsed_args = CommandParser::<EngineArgs>::parse_from([
|
||||
@@ -536,11 +510,6 @@ mod tests {
|
||||
"--engine.account-worker-count",
|
||||
"8",
|
||||
"--engine.disable-cache-metrics",
|
||||
"--engine.enable-sparse-trie-as-cache",
|
||||
"--engine.sparse-trie-prune-depth",
|
||||
"10",
|
||||
"--engine.sparse-trie-max-storage-tries",
|
||||
"100",
|
||||
])
|
||||
.args;
|
||||
|
||||
|
||||
@@ -8,13 +8,9 @@ use reth_tracing::{
|
||||
};
|
||||
use std::{fmt, fmt::Display};
|
||||
use tracing::{level_filters::LevelFilter, Level};
|
||||
|
||||
/// Constant to convert megabytes to bytes
|
||||
const MB_TO_BYTES: u64 = 1024 * 1024;
|
||||
|
||||
const PROFILER_TRACING_FILTER: &str =
|
||||
"info,engine=debug,trie=debug,providers=debug,rpc=debug,sync=debug,pruner=debug";
|
||||
|
||||
/// The log configuration.
|
||||
#[derive(Debug, Args)]
|
||||
#[command(next_help_heading = "Logging")]
|
||||
@@ -74,7 +70,7 @@ pub struct LogArgs {
|
||||
long = "log.samply.filter",
|
||||
value_name = "FILTER",
|
||||
global = true,
|
||||
default_value = PROFILER_TRACING_FILTER,
|
||||
default_value = "debug",
|
||||
hide = true
|
||||
)]
|
||||
pub samply_filter: String,
|
||||
@@ -88,7 +84,7 @@ pub struct LogArgs {
|
||||
long = "log.tracy.filter",
|
||||
value_name = "FILTER",
|
||||
global = true,
|
||||
default_value = PROFILER_TRACING_FILTER,
|
||||
default_value = "debug",
|
||||
hide = true
|
||||
)]
|
||||
pub tracy_filter: String,
|
||||
|
||||
@@ -6,7 +6,7 @@ use clap::{builder::RangedU64ValueParser, Args};
|
||||
use reth_chainspec::EthereumHardforks;
|
||||
use reth_config::config::PruneConfig;
|
||||
use reth_prune_types::{
|
||||
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE,
|
||||
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_UNWIND_SAFE_DISTANCE,
|
||||
};
|
||||
use std::{collections::BTreeMap, ops::Not, sync::OnceLock};
|
||||
|
||||
@@ -81,7 +81,7 @@ impl Default for DefaultPruningValues {
|
||||
minimal_prune_modes: PruneModes {
|
||||
sender_recovery: Some(PruneMode::Full),
|
||||
transaction_lookup: Some(PruneMode::Full),
|
||||
receipts: Some(PruneMode::Distance(MINIMUM_DISTANCE)),
|
||||
receipts: Some(PruneMode::Full),
|
||||
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
bodies_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::args::{
|
||||
types::{MaxU32, ZeroAsNoneU64},
|
||||
GasPriceOracleArgs, RpcStateCacheArgs,
|
||||
};
|
||||
use alloy_primitives::map::AddressSet;
|
||||
use alloy_primitives::Address;
|
||||
use alloy_rpc_types_engine::JwtSecret;
|
||||
use clap::{
|
||||
builder::{PossibleValue, RangedU64ValueParser, Resettable, TypedValueParser},
|
||||
@@ -15,6 +15,7 @@ use reth_cli_util::{parse_duration_from_secs_or_ms, parse_ether_value};
|
||||
use reth_rpc_eth_types::builder::config::PendingBlockKind;
|
||||
use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
ffi::OsStr,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
path::PathBuf,
|
||||
@@ -88,7 +89,7 @@ pub struct DefaultRpcServerArgs {
|
||||
rpc_proof_permits: usize,
|
||||
rpc_pending_block: PendingBlockKind,
|
||||
rpc_forwarder: Option<Url>,
|
||||
builder_disallow: Option<AddressSet>,
|
||||
builder_disallow: Option<HashSet<Address>>,
|
||||
rpc_state_cache: RpcStateCacheArgs,
|
||||
gas_price_oracle: GasPriceOracleArgs,
|
||||
rpc_send_raw_transaction_sync_timeout: Duration,
|
||||
@@ -334,7 +335,7 @@ impl DefaultRpcServerArgs {
|
||||
}
|
||||
|
||||
/// Set the default builder disallow addresses
|
||||
pub fn with_builder_disallow(mut self, v: Option<AddressSet>) -> Self {
|
||||
pub fn with_builder_disallow(mut self, v: Option<HashSet<Address>>) -> Self {
|
||||
self.builder_disallow = v;
|
||||
self
|
||||
}
|
||||
@@ -620,8 +621,8 @@ pub struct RpcServerArgs {
|
||||
|
||||
/// Path to file containing disallowed addresses, json-encoded list of strings. Block
|
||||
/// validation API will reject blocks containing transactions from these addresses.
|
||||
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<AddressSet>, default_value = Resettable::from(DefaultRpcServerArgs::get_global().builder_disallow.as_ref().map(|v| format!("{:?}", v).into())))]
|
||||
pub builder_disallow: Option<AddressSet>,
|
||||
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<HashSet<Address>>, default_value = Resettable::from(DefaultRpcServerArgs::get_global().builder_disallow.as_ref().map(|v| format!("{:?}", v).into())))]
|
||||
pub builder_disallow: Option<HashSet<Address>>,
|
||||
|
||||
/// State cache configuration.
|
||||
#[command(flatten)]
|
||||
@@ -646,14 +647,6 @@ pub struct RpcServerArgs {
|
||||
/// transactions from the same sender will also be skipped.
|
||||
#[arg(long = "testing.skip-invalid-transactions", default_value_t = true)]
|
||||
pub testing_skip_invalid_transactions: bool,
|
||||
|
||||
/// Force upcasting EIP-4844 blob sidecars to EIP-7594 format when Osaka is active.
|
||||
///
|
||||
/// When enabled, blob transactions submitted via `eth_sendRawTransaction` with EIP-4844
|
||||
/// sidecars will be automatically converted to EIP-7594 format if the next block is Osaka.
|
||||
/// By default this is disabled, meaning transactions are submitted as-is.
|
||||
#[arg(long = "rpc.force-blob-sidecar-upcasting", default_value_t = false)]
|
||||
pub rpc_force_blob_sidecar_upcasting: bool,
|
||||
}
|
||||
|
||||
impl RpcServerArgs {
|
||||
@@ -775,12 +768,6 @@ impl RpcServerArgs {
|
||||
self.rpc_send_raw_transaction_sync_timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables forced blob sidecar upcasting from EIP-4844 to EIP-7594 format.
|
||||
pub const fn with_force_blob_sidecar_upcasting(mut self) -> Self {
|
||||
self.rpc_force_blob_sidecar_upcasting = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RpcServerArgs {
|
||||
@@ -873,7 +860,6 @@ impl Default for RpcServerArgs {
|
||||
gas_price_oracle,
|
||||
rpc_send_raw_transaction_sync_timeout,
|
||||
testing_skip_invalid_transactions: true,
|
||||
rpc_force_blob_sidecar_upcasting: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1050,7 +1036,6 @@ mod tests {
|
||||
},
|
||||
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
|
||||
testing_skip_invalid_transactions: true,
|
||||
rpc_force_blob_sidecar_upcasting: false,
|
||||
};
|
||||
|
||||
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([
|
||||
|
||||
@@ -21,7 +21,7 @@ alloy-primitives.workspace = true
|
||||
alloy-consensus.workspace = true
|
||||
|
||||
tokio.workspace = true
|
||||
tokio-tungstenite.workspace = true
|
||||
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
|
||||
futures-util.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ tempfile = { workspace = true, optional = true }
|
||||
tikv-jemalloc-ctl = { workspace = true, optional = true, features = ["stats"] }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
procfs = "0.18.0"
|
||||
procfs = "0.17.0"
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest.workspace = true
|
||||
|
||||
@@ -37,7 +37,7 @@ pub use commands::{import::ImportOpCommand, import_receipts::ImportReceiptsOpCom
|
||||
use reth_optimism_chainspec::OpChainSpec;
|
||||
use reth_rpc_server_types::{DefaultRpcModuleValidator, RpcModuleValidator};
|
||||
|
||||
use std::{ffi::OsString, fmt, marker::PhantomData};
|
||||
use std::{ffi::OsString, fmt, marker::PhantomData, sync::Arc};
|
||||
|
||||
use chainspec::OpChainSpecParser;
|
||||
use clap::Parser;
|
||||
@@ -121,7 +121,7 @@ where
|
||||
/// [`NodeCommand`](reth_cli_commands::node::NodeCommand).
|
||||
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
{
|
||||
self.with_runner(CliRunner::try_default_runtime()?, launcher)
|
||||
@@ -130,7 +130,7 @@ where
|
||||
/// Execute the configured cli command with the provided [`CliRunner`].
|
||||
pub fn with_runner<L, Fut>(self, runner: CliRunner, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
{
|
||||
let mut this = self.configure();
|
||||
|
||||
@@ -293,11 +293,7 @@ mod tests {
|
||||
use alloy_consensus::{Header, Receipt};
|
||||
use alloy_eips::eip7685::Requests;
|
||||
use alloy_genesis::Genesis;
|
||||
use alloy_primitives::{
|
||||
bytes,
|
||||
map::{AddressMap, B256Map, HashMap},
|
||||
Address, LogData, B256,
|
||||
};
|
||||
use alloy_primitives::{bytes, map::HashMap, Address, LogData, B256};
|
||||
use op_revm::OpSpecId;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_evm::execute::ProviderError;
|
||||
@@ -592,12 +588,12 @@ mod tests {
|
||||
);
|
||||
|
||||
// Create a BundleStateInit object and insert initial data
|
||||
let mut state_init: BundleStateInit = AddressMap::default();
|
||||
let mut state_init: BundleStateInit = HashMap::default();
|
||||
state_init
|
||||
.insert(Address::new([2; 20]), (None, Some(Account::default()), B256Map::default()));
|
||||
.insert(Address::new([2; 20]), (None, Some(Account::default()), HashMap::default()));
|
||||
|
||||
// Create an AddressMap for account reverts and insert initial data
|
||||
let mut revert_inner: AddressMap<AccountRevertInit> = AddressMap::default();
|
||||
// Create a HashMap for account reverts and insert initial data
|
||||
let mut revert_inner: HashMap<Address, AccountRevertInit> = HashMap::default();
|
||||
revert_inner.insert(Address::new([2; 20]), (None, vec![]));
|
||||
|
||||
// Create a RevertsInit object and insert the revert_inner data
|
||||
|
||||
@@ -38,7 +38,7 @@ op-alloy-rpc-types-engine = { workspace = true, features = ["k256"] }
|
||||
|
||||
# io
|
||||
tokio.workspace = true
|
||||
tokio-tungstenite.workspace = true
|
||||
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
|
||||
serde_json.workspace = true
|
||||
url.workspace = true
|
||||
futures-util.workspace = true
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user