mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
70 Commits
dan/static
...
matt/make-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1e406ad1d | ||
|
|
83620dae57 | ||
|
|
35fc3b684f | ||
|
|
75ca930237 | ||
|
|
514b2898aa | ||
|
|
d6af5793e5 | ||
|
|
01f3e58229 | ||
|
|
78c6c9c10f | ||
|
|
039c61e93f | ||
|
|
b545252285 | ||
|
|
6f7c8ad2c9 | ||
|
|
1204674e1a | ||
|
|
764246d5ea | ||
|
|
5356c0480e | ||
|
|
79e52ad2e0 | ||
|
|
c52ff7045c | ||
|
|
ec6e3032f0 | ||
|
|
cea62ade29 | ||
|
|
a66e38c08c | ||
|
|
843b5f3c3c | ||
|
|
c45ccc3e38 | ||
|
|
a6d6a21524 | ||
|
|
f1ed523b20 | ||
|
|
dc39df5746 | ||
|
|
c574a3f7b7 | ||
|
|
7bb5c579e0 | ||
|
|
614a68532b | ||
|
|
648a2b8cf1 | ||
|
|
9cfa8a9566 | ||
|
|
a1c1885fe2 | ||
|
|
dca5852213 | ||
|
|
c94b728af1 | ||
|
|
868ac9d77b | ||
|
|
1e2e33e951 | ||
|
|
598f228e21 | ||
|
|
996121f0a5 | ||
|
|
e7da50a502 | ||
|
|
3020540066 | ||
|
|
f82d143d0c | ||
|
|
bebc532e0e | ||
|
|
0df9791bea | ||
|
|
09adb83922 | ||
|
|
c12b6d4c90 | ||
|
|
7a78044587 | ||
|
|
f88538e033 | ||
|
|
63dff64b8a | ||
|
|
233590cefd | ||
|
|
40962ef6fc | ||
|
|
2f121b099b | ||
|
|
0470050c05 | ||
|
|
cbc416b82a | ||
|
|
3fddefbd38 | ||
|
|
f97a6530c1 | ||
|
|
80e3e1c79d | ||
|
|
ee37c25a4b | ||
|
|
c01f9688e2 | ||
|
|
815a75833e | ||
|
|
59c4e24296 | ||
|
|
d5b5caa439 | ||
|
|
47f1999654 | ||
|
|
3ac5637bd1 | ||
|
|
4cec99ed13 | ||
|
|
2f73835483 | ||
|
|
ed20a40649 | ||
|
|
080a9cfc10 | ||
|
|
c4cd5c9b7b | ||
|
|
ce2a194fb7 | ||
|
|
6dcab51c97 | ||
|
|
4db23809cc | ||
|
|
8399bafdbe |
6
.changelog/rare-tigers-nod.md
Normal file
6
.changelog/rare-tigers-nod.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-trie-sparse: patch
|
||||
reth-engine-tree: patch
|
||||
---
|
||||
|
||||
Removed the `skip_proof_node_filtering` flag, `revealed_account_paths`/`revealed_paths` tracking, and the `filter_revealed_v2_proof_nodes` function from the sparse trie implementation. Also removed the corresponding skipped-nodes metrics, simplifying the proof node reveal path to always pass nodes directly to the sparse trie without pre-filtering.
|
||||
8
.changelog/vain-bats-bark.md
Normal file
8
.changelog/vain-bats-bark.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
reth-trie-common: minor
|
||||
reth-trie: minor
|
||||
reth-trie-parallel: minor
|
||||
reth-engine-tree: patch
|
||||
---
|
||||
|
||||
Moved `ProofV2Target`, `MultiProofTargetsV2`, and `ChunkedMultiProofTargetsV2` from `reth-trie-parallel::targets_v2` into a new `reth-trie-common::target_v2` module, making these types available at a lower level without pulling in the full parallel trie crate. Added a `multiproof_v2` method to `Proof` in `reth-trie` that generates a state multiproof using the V2 proof calculator with synchronous account value encoding.
|
||||
9
.github/scripts/bench-slack-notify.js
vendored
9
.github/scripts/bench-slack-notify.js
vendored
@@ -118,9 +118,12 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
|
||||
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
|
||||
|
||||
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
|
||||
const countsLine = warmup
|
||||
? `*Warmup:* ${warmup} | *Blocks:* ${summary.blocks}`
|
||||
: `*Blocks:* ${summary.blocks}`;
|
||||
const cores = process.env.BENCH_CORES || '0';
|
||||
const countsParts = [];
|
||||
if (warmup) countsParts.push(`*Warmup:* ${warmup}`);
|
||||
countsParts.push(`*Blocks:* ${summary.blocks}`);
|
||||
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
|
||||
const countsLine = countsParts.join(' | ');
|
||||
|
||||
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
|
||||
|
||||
|
||||
13
.github/scripts/bench-slack-users.json
vendored
13
.github/scripts/bench-slack-users.json
vendored
@@ -9,5 +9,16 @@
|
||||
"gakonst": "U092SEPDM40",
|
||||
"Rjected": "U09F6SCKRGT",
|
||||
"DaniPopes": "U09FAT8EK2A",
|
||||
"emmajam": "U0A34UN92HW"
|
||||
"emmajam": "U0A34UN92HW",
|
||||
"onbjerg": "U09FB0UK5AA",
|
||||
"fgimenez": "U09G3GP7CSU",
|
||||
"rakita": "U09FB3Z2M7Y",
|
||||
"jxom": "U09F72MG083",
|
||||
"tmm": "U0AD0U8E88N",
|
||||
"pepyakin": "U0A7HKMGEHJ",
|
||||
"grandizzy": "U09F8DBDDRT",
|
||||
"SuperFluffy": "U095BKHB2Q4",
|
||||
"kamsz": "U0A2563UBRD",
|
||||
"zerosnacks": "U09FARPMN74",
|
||||
"samczsun": "U096R14E4H3"
|
||||
}
|
||||
|
||||
19
.github/scripts/hive/expected_failures.yaml
vendored
19
.github/scripts/hive/expected_failures.yaml
vendored
@@ -16,28 +16,13 @@ rpc-compat:
|
||||
# syncing mode, the test expects syncing to be false on start
|
||||
- eth_syncing/check-syncing (reth)
|
||||
|
||||
# no fix due to https://github.com/paradigmxyz/reth/issues/8732
|
||||
engine-withdrawals:
|
||||
- Withdrawals Fork On Genesis (Paris) (reth)
|
||||
- Withdrawals Fork on Block 1 (Paris) (reth)
|
||||
- Withdrawals Fork on Block 2 (Paris) (reth)
|
||||
- Withdrawals Fork on Block 3 (Paris) (reth)
|
||||
- Withdraw to a single account (Paris) (reth)
|
||||
- Withdraw to two accounts (Paris) (reth)
|
||||
- Withdraw many accounts (Paris) (reth)
|
||||
- Withdraw zero amount (Paris) (reth)
|
||||
- Empty Withdrawals (Paris) (reth)
|
||||
- Corrupted Block Hash Payload (INVALID) (Paris) (reth)
|
||||
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
|
||||
engine-withdrawals: [ ]
|
||||
|
||||
engine-api: [ ]
|
||||
|
||||
# no fix due to https://github.com/paradigmxyz/reth/issues/8732
|
||||
engine-cancun:
|
||||
- Invalid PayloadAttributes, Missing BeaconRoot, Syncing=True (Cancun) (reth)
|
||||
# the test fails with older versions of the code for which it passed before, probably related to changes
|
||||
# in hive or its dependencies
|
||||
- Blob Transaction Ordering, Multiple Clients (Cancun) (reth)
|
||||
|
||||
sync: [ ]
|
||||
|
||||
@@ -49,7 +34,7 @@ engine-auth: [ ]
|
||||
# The test artificially creates an empty account with storage, then tests EIP-7610's behavior.
|
||||
# On mainnet, ~25 such accounts exist as contract addresses (derived from keccak(prefix, caller,
|
||||
# nonce/salt), not from public keys). No private key exists for contract addresses. To trigger
|
||||
# this with EIP-7702, you'd need to recover a private key from one of the already deployed contract addresses - mathematically impossible.
|
||||
# this with EIP-7702, you'd need to recover a private key from one of the already deployed contract addresses - mathematically impossible.
|
||||
#
|
||||
# tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_*
|
||||
# Requires hash collision on create2 address to target already deployed accounts with storage.
|
||||
|
||||
11
.github/scripts/hive/ignored_tests.yaml
vendored
11
.github/scripts/hive/ignored_tests.yaml
vendored
@@ -11,17 +11,6 @@
|
||||
#
|
||||
# When a test should no longer be ignored, remove it from this list.
|
||||
|
||||
# flaky
|
||||
engine-withdrawals:
|
||||
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
|
||||
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
|
||||
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
|
||||
# P2P sync timing issue in hive Docker environment: secondary client returns SYNCING but
|
||||
# peer discovery/connection doesn't complete within the timeout when running with
|
||||
# --sim.parallelism 16. Not a correctness bug, purely a CI timing issue.
|
||||
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
|
||||
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
|
||||
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
|
||||
engine-cancun:
|
||||
- Transaction Re-Org, New Payload on Revert Back (Cancun) (reth)
|
||||
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)
|
||||
|
||||
48
.github/workflows/bench.yml
vendored
48
.github/workflows/bench.yml
vendored
@@ -7,10 +7,6 @@
|
||||
# same block range (snapshot recovered between runs) to compare performance.
|
||||
|
||||
on:
|
||||
# TODO: Disabled temporarily for https://github.com/CodSpeedHQ/runner/issues/55
|
||||
# merge_group:
|
||||
push:
|
||||
branches: [main]
|
||||
issue_comment:
|
||||
types: [created, edited]
|
||||
workflow_dispatch:
|
||||
@@ -60,46 +56,6 @@ permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
codspeed:
|
||||
if: github.event_name == 'push'
|
||||
runs-on: depot-ubuntu-latest
|
||||
concurrency:
|
||||
group: bench-codspeed-${{ github.head_ref || github.run_id }}
|
||||
cancel-in-progress: true
|
||||
strategy:
|
||||
matrix:
|
||||
partition: [1, 2]
|
||||
total_partitions: [2]
|
||||
include:
|
||||
- partition: 1
|
||||
crates: "-p reth-primitives -p reth-trie-common -p reth-trie-sparse"
|
||||
- partition: 2
|
||||
crates: "-p reth-trie"
|
||||
name: codspeed (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
submodules: true
|
||||
ref: ${{ github.event_name == 'issue_comment' && format('refs/pull/{0}/merge', github.event.issue.number) || '' }}
|
||||
- uses: rui314/setup-mold@v1
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
- uses: mozilla-actions/sccache-action@v0.0.9
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: Install cargo-codspeed
|
||||
uses: taiki-e/install-action@v2
|
||||
with:
|
||||
tool: cargo-codspeed
|
||||
- name: Build the benchmark target(s)
|
||||
run: cargo codspeed build --profile profiling --features test-utils ${{ matrix.crates }}
|
||||
- name: Run the benchmarks
|
||||
uses: CodSpeedHQ/action@v4
|
||||
with:
|
||||
run: cargo codspeed run ${{ matrix.crates }}
|
||||
mode: instrumentation
|
||||
token: ${{ secrets.CODSPEED_TOKEN }}
|
||||
|
||||
reth-bench-ack:
|
||||
if: |
|
||||
(github.event_name == 'issue_comment' && github.event.issue.pull_request && (startsWith(github.event.comment.body, '@decofe bench') || startsWith(github.event.comment.body, 'derek bench'))) ||
|
||||
@@ -295,7 +251,6 @@ jobs:
|
||||
});
|
||||
allRuns.push(...r);
|
||||
}
|
||||
// Only count runs that trigger reth-bench (not push-triggered codspeed runs)
|
||||
const benchRuns = allRuns.filter(r => r.event === 'issue_comment' || r.event === 'workflow_dispatch');
|
||||
const thisRun = benchRuns.find(r => r.id === context.runId);
|
||||
const thisCreatedAt = thisRun ? new Date(thisRun.created_at) : new Date();
|
||||
@@ -411,6 +366,9 @@ jobs:
|
||||
BENCH_CORES: ${{ needs.reth-bench-ack.outputs.cores }}
|
||||
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
|
||||
steps:
|
||||
- name: Clean up previous bench-work
|
||||
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
|
||||
|
||||
- name: Resolve checkout ref
|
||||
id: checkout-ref
|
||||
uses: actions/github-script@v8
|
||||
|
||||
30
.github/workflows/pr-audit.yml
vendored
Normal file
30
.github/workflows/pr-audit.yml
vendored
Normal file
@@ -0,0 +1,30 @@
|
||||
name: Pull request audit
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [labeled]
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
runs-on: ubuntu-latest
|
||||
if: github.event.label.name == 'cyclops'
|
||||
steps:
|
||||
- name: Publish event
|
||||
run: |
|
||||
set -euo pipefail
|
||||
|
||||
echo "${{ secrets.EVENTS_KEY }}" > ${{ runner.temp }}/key
|
||||
echo "${{ secrets.EVENTS_CERT }}" > ${{ runner.temp }}/cert
|
||||
|
||||
curl -sf -o /dev/null -X POST ${{ secrets.EVENTS_ARGS }} \
|
||||
-H "Content-Type: application/json" \
|
||||
--key ${{ runner.temp }}/key \
|
||||
--cert ${{ runner.temp }}/cert \
|
||||
-d '{
|
||||
"repository": "${{ github.repository }}",
|
||||
"event": "pr_audit",
|
||||
"data": {
|
||||
"pr_number": ${{ github.event.pull_request.number }},
|
||||
"sha": "${{ github.event.pull_request.head.sha }}"
|
||||
}
|
||||
}'
|
||||
6
.github/workflows/release.yml
vendored
6
.github/workflows/release.yml
vendored
@@ -73,18 +73,22 @@ jobs:
|
||||
os: ubuntu-24.04
|
||||
profile: maxperf
|
||||
allow_fail: false
|
||||
rustflags: "-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"
|
||||
- target: aarch64-unknown-linux-gnu
|
||||
os: ubuntu-24.04-arm
|
||||
profile: maxperf
|
||||
allow_fail: false
|
||||
rustflags: ""
|
||||
- target: x86_64-apple-darwin
|
||||
os: macos-14
|
||||
profile: maxperf
|
||||
allow_fail: false
|
||||
rustflags: "-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"
|
||||
- target: aarch64-apple-darwin
|
||||
os: macos-14
|
||||
profile: maxperf
|
||||
allow_fail: false
|
||||
rustflags: ""
|
||||
build:
|
||||
- command: build
|
||||
binary: reth
|
||||
@@ -110,7 +114,7 @@ jobs:
|
||||
echo "MACOSX_DEPLOYMENT_TARGET=$(xcrun -sdk macosx --show-sdk-platform-version)" >> $GITHUB_ENV
|
||||
|
||||
- name: Build Reth
|
||||
run: make PROFILE=${{ matrix.configs.profile }} ${{ matrix.build.command }}-${{ matrix.configs.target }}
|
||||
run: make PROFILE=${{ matrix.configs.profile }} EXTRA_RUSTFLAGS="${{ matrix.configs.rustflags }}" ${{ matrix.build.command }}-${{ matrix.configs.target }}
|
||||
- name: Move binary
|
||||
run: |
|
||||
mkdir artifacts
|
||||
|
||||
72
CLAUDE.md
72
CLAUDE.md
@@ -191,6 +191,78 @@ The `book` CI job (`.github/workflows/lint.yml`) enforces this by regenerating t
|
||||
|
||||
### Opening PRs against <https://github.com/paradigmxyz/reth>
|
||||
|
||||
#### Titles
|
||||
|
||||
Use [Conventional Commits](https://www.conventionalcommits.org/) with an optional scope:
|
||||
|
||||
```
|
||||
<type>(<scope>): <short description>
|
||||
```
|
||||
|
||||
**Types**: `feat`, `fix`, `perf`, `refactor`, `docs`, `test`, `chore`
|
||||
|
||||
**Scope** (optional): crate or area, e.g. `evm`, `trie`, `rpc`, `engine`, `net`
|
||||
|
||||
Examples:
|
||||
- `fix(rpc): correct gas estimation for ERC-20 transfers`
|
||||
- `perf: batch trie updates to reduce cursor overhead`
|
||||
- `feat(engine): add new_payload_interval metric`
|
||||
|
||||
#### Descriptions
|
||||
|
||||
Keep it short. Say what changed and why — nothing more.
|
||||
|
||||
**Do:**
|
||||
- Write 1–3 sentences summarizing the change
|
||||
- Explain _why_ if the diff doesn't make it obvious
|
||||
- Link related issues or EIPs
|
||||
- Include benchmark numbers for perf changes
|
||||
|
||||
**Don't:**
|
||||
- List every file changed — that's what the diff is for
|
||||
- Repeat the title in the body
|
||||
- Add "Files changed" or "Changes" sections
|
||||
- Write walls of text that go stale when the diff is updated
|
||||
- Use filler like "This PR introduces...", "comprehensive", "robust", "enhance", "leverage"
|
||||
|
||||
**Template:**
|
||||
|
||||
```
|
||||
Closes #<issue>
|
||||
|
||||
<what changed, 1-3 sentences>
|
||||
|
||||
<why, if not obvious from the diff>
|
||||
```
|
||||
|
||||
**Good example:**
|
||||
|
||||
```
|
||||
Closes #16800
|
||||
|
||||
Adds fallback for external IP resolution so node startup doesn't fail
|
||||
when STUN is unreachable. Falls back to the configured default.
|
||||
```
|
||||
|
||||
**Bad example:**
|
||||
|
||||
```
|
||||
## Summary
|
||||
This PR introduces comprehensive improvements to the IP resolution system.
|
||||
|
||||
## Changes
|
||||
- Modified `crates/net/discv4/src/lib.rs` to add fallback
|
||||
- Modified `crates/net/discv4/src/config.rs` to add default IP
|
||||
- Added tests in `crates/net/discv4/src/tests/ip.rs`
|
||||
|
||||
## Files Changed
|
||||
- crates/net/discv4/src/lib.rs
|
||||
- crates/net/discv4/src/config.rs
|
||||
- crates/net/discv4/src/tests/ip.rs
|
||||
```
|
||||
|
||||
#### Labels and CI
|
||||
|
||||
Label PRs appropriately, first check the available labels and then apply the relevant ones:
|
||||
* when changes are RPC related, add A-rpc label
|
||||
* when changes are docs related, add C-docs label
|
||||
|
||||
454
Cargo.lock
generated
454
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
19
Cargo.toml
19
Cargo.toml
@@ -454,7 +454,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-evm = { version = "0.28.1", default-features = false }
|
||||
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
|
||||
@@ -489,14 +489,9 @@ alloy-transport-ipc = { version = "1.7.3", default-features = false }
|
||||
alloy-transport-ws = { version = "1.7.3", default-features = false }
|
||||
|
||||
# op
|
||||
alloy-op-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-op-hardforks = "0.4.4"
|
||||
op-alloy-rpc-types = { version = "0.23.1", default-features = false }
|
||||
op-alloy-rpc-types-engine = { version = "0.23.1", default-features = false }
|
||||
op-alloy-network = { version = "0.23.1", default-features = false }
|
||||
op-alloy-consensus = { version = "0.23.1", default-features = false }
|
||||
op-alloy-rpc-jsonrpsee = { version = "0.23.1", default-features = false }
|
||||
op-alloy-flz = { version = "0.13.1", default-features = false }
|
||||
op-alloy-rpc-types = { version = "0.24.0", default-features = false }
|
||||
op-alloy-rpc-types-engine = { version = "0.24.0", default-features = false }
|
||||
op-alloy-consensus = { version = "0.24.0", default-features = false }
|
||||
|
||||
# misc
|
||||
either = { version = "1.15.0", default-features = false }
|
||||
@@ -523,6 +518,7 @@ humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = { version = "0.14", default-features = false }
|
||||
linked_hash_set = "0.1"
|
||||
libc = "0.2"
|
||||
lz4 = "1.28.1"
|
||||
modular-bitfield = "0.13.1"
|
||||
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
|
||||
@@ -665,6 +661,7 @@ cipher = "0.4.3"
|
||||
comfy-table = "7.0"
|
||||
concat-kdf = "0.1.0"
|
||||
crossbeam-channel = "0.5.13"
|
||||
crossbeam-queue = "0.3"
|
||||
crossbeam-utils = "0.8"
|
||||
crossterm = "0.29.0"
|
||||
csv = "1.3.0"
|
||||
@@ -741,10 +738,8 @@ ipnet = "2.11"
|
||||
# alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
|
||||
|
||||
# op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
|
||||
# op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
|
||||
# op-alloy-rpc-types = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
|
||||
# op-alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
|
||||
# op-alloy-rpc-jsonrpsee = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
|
||||
#
|
||||
# revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "1207e33" }
|
||||
#
|
||||
@@ -755,9 +750,7 @@ ipnet = "2.11"
|
||||
# jsonrpsee-types = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
|
||||
|
||||
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "df124c0" }
|
||||
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "df124c0" }
|
||||
|
||||
# revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "3020ea8" }
|
||||
|
||||
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "072c248" }
|
||||
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "072c248" }
|
||||
|
||||
7
Makefile
7
Makefile
@@ -17,6 +17,9 @@ FEATURES ?=
|
||||
# Cargo profile for builds. Default is for local builds, CI uses an override.
|
||||
PROFILE ?= release
|
||||
|
||||
# Extra RUSTFLAGS to append to build targets (e.g., "-C target-cpu=x86-64-v3")
|
||||
EXTRA_RUSTFLAGS ?=
|
||||
|
||||
# Extra flags for Cargo
|
||||
CARGO_INSTALL_EXTRA_FLAGS ?=
|
||||
|
||||
@@ -74,7 +77,7 @@ build-debug: ## Build the reth binary into `target/debug` directory.
|
||||
cargo build --bin reth --features "$(FEATURES)"
|
||||
# Builds the reth binary natively.
|
||||
build-native-%:
|
||||
cargo build --bin reth --target $* --features "$(FEATURES)" --profile "$(PROFILE)"
|
||||
$(if $(EXTRA_RUSTFLAGS),RUSTFLAGS="$(EXTRA_RUSTFLAGS)") cargo build --bin reth --target $* --features "$(FEATURES)" --profile "$(PROFILE)"
|
||||
|
||||
# The following commands use `cross` to build a cross-compile.
|
||||
#
|
||||
@@ -96,7 +99,7 @@ build-aarch64-unknown-linux-gnu: export JEMALLOC_SYS_WITH_LG_PAGE=16
|
||||
# Note: The additional rustc compiler flags are for intrinsics needed by MDBX.
|
||||
# See: https://github.com/cross-rs/cross/wiki/FAQ#undefined-reference-with-build-std
|
||||
build-%:
|
||||
RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \
|
||||
RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc $(EXTRA_RUSTFLAGS)" \
|
||||
cross build --bin reth --target $* --features "$(FEATURES)" --profile "$(PROFILE)"
|
||||
|
||||
# Unfortunately we can't easily use cross to build for Darwin because of licensing issues.
|
||||
|
||||
@@ -31,6 +31,8 @@ pub(crate) struct BenchContext {
|
||||
pub(crate) is_optimism: bool,
|
||||
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
|
||||
pub(crate) use_reth_namespace: bool,
|
||||
/// Whether to fetch and replay RLP-encoded blocks.
|
||||
pub(crate) rlp_blocks: bool,
|
||||
}
|
||||
|
||||
impl BenchContext {
|
||||
@@ -142,7 +144,8 @@ impl BenchContext {
|
||||
};
|
||||
|
||||
let next_block = first_block.header.number + 1;
|
||||
let use_reth_namespace = bench_args.reth_new_payload;
|
||||
let rlp_blocks = bench_args.rlp_blocks;
|
||||
let use_reth_namespace = bench_args.reth_new_payload || rlp_blocks;
|
||||
Ok(Self {
|
||||
auth_provider,
|
||||
block_provider,
|
||||
@@ -150,6 +153,7 @@ impl BenchContext {
|
||||
next_block,
|
||||
is_optimism,
|
||||
use_reth_namespace,
|
||||
rlp_blocks,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use reth_chainspec::ChainSpec;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
|
||||
use reth_rpc_api::RethNewPayloadInput;
|
||||
use std::{path::PathBuf, time::Instant};
|
||||
use tracing::info;
|
||||
|
||||
@@ -184,34 +185,32 @@ impl Command {
|
||||
Some(new_payload_version),
|
||||
)?;
|
||||
|
||||
let (version, params) = if self.reth_new_payload {
|
||||
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?)
|
||||
} else {
|
||||
(Some(version), params)
|
||||
};
|
||||
|
||||
// Save payload to file with version info for replay
|
||||
let payload_path =
|
||||
self.output.join(format!("payload_block_{}.json", block.header.number));
|
||||
let file = GasRampPayloadFile {
|
||||
version: version as u8,
|
||||
version: version.map(|v| v as u8),
|
||||
block_hash,
|
||||
params: params.clone(),
|
||||
execution_data: Some(execution_data.clone()),
|
||||
};
|
||||
let payload_json = serde_json::to_string_pretty(&file)?;
|
||||
std::fs::write(&payload_path, &payload_json)?;
|
||||
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
|
||||
|
||||
let reth_data = self.reth_new_payload.then_some(execution_data);
|
||||
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
|
||||
let _ = call_new_payload_with_reth(&provider, version, params).await?;
|
||||
|
||||
let forkchoice_state = ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash: block_hash,
|
||||
finalized_block_hash: block_hash,
|
||||
};
|
||||
call_forkchoice_updated_with_reth(
|
||||
&provider,
|
||||
version,
|
||||
forkchoice_state,
|
||||
self.reth_new_payload,
|
||||
)
|
||||
.await?;
|
||||
call_forkchoice_updated_with_reth(&provider, version, forkchoice_state).await?;
|
||||
|
||||
parent_header = block.header;
|
||||
parent_hash = block_hash;
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::{
|
||||
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
|
||||
},
|
||||
};
|
||||
use alloy_provider::Provider;
|
||||
use alloy_provider::{ext::DebugApi, Provider};
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
@@ -154,6 +154,7 @@ impl Command {
|
||||
mut next_block,
|
||||
is_optimism,
|
||||
use_reth_namespace,
|
||||
rlp_blocks,
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
@@ -186,6 +187,21 @@ impl Command {
|
||||
}
|
||||
};
|
||||
|
||||
let rlp = if rlp_blocks {
|
||||
let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
|
||||
Ok(rlp) => rlp,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
|
||||
let _ = error_sender
|
||||
.send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
|
||||
break;
|
||||
}
|
||||
};
|
||||
Some(rlp)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let head_block_hash = block.header.hash;
|
||||
let safe_block_hash = block_provider
|
||||
.get_block_by_number(block.header.number.saturating_sub(32).into());
|
||||
@@ -207,7 +223,7 @@ impl Command {
|
||||
|
||||
next_block += 1;
|
||||
if let Err(e) = sender
|
||||
.send((block, head_block_hash, safe_block_hash, finalized_block_hash))
|
||||
.send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
|
||||
.await
|
||||
{
|
||||
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
|
||||
@@ -221,7 +237,7 @@ impl Command {
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
while let Some((block, head, safe, finalized)) = {
|
||||
while let Some((block, head, safe, finalized, rlp)) = {
|
||||
let wait_start = Instant::now();
|
||||
let result = receiver.recv().await;
|
||||
total_wait_time += wait_start.elapsed();
|
||||
@@ -240,11 +256,11 @@ impl Command {
|
||||
finalized_block_hash: finalized,
|
||||
};
|
||||
|
||||
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
|
||||
let (version, params) =
|
||||
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
|
||||
let start = Instant::now();
|
||||
let reth_data = use_reth_namespace.then_some(execution_data);
|
||||
let server_timings =
|
||||
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
|
||||
call_new_payload_with_reth(&auth_provider, version, params).await?;
|
||||
|
||||
let np_latency =
|
||||
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
|
||||
@@ -263,13 +279,7 @@ impl Command {
|
||||
};
|
||||
|
||||
let fcu_start = Instant::now();
|
||||
call_forkchoice_updated_with_reth(
|
||||
&auth_provider,
|
||||
version,
|
||||
forkchoice_state,
|
||||
use_reth_namespace,
|
||||
)
|
||||
.await?;
|
||||
call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
|
||||
let fcu_latency = fcu_start.elapsed();
|
||||
|
||||
let total_latency = if server_timings.is_some() {
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::{
|
||||
},
|
||||
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
|
||||
};
|
||||
use alloy_provider::Provider;
|
||||
use alloy_provider::{ext::DebugApi, Provider};
|
||||
use clap::Parser;
|
||||
use csv::Writer;
|
||||
use eyre::{Context, OptionExt};
|
||||
@@ -51,6 +51,7 @@ impl Command {
|
||||
mut next_block,
|
||||
is_optimism,
|
||||
use_reth_namespace,
|
||||
rlp_blocks,
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
@@ -83,8 +84,21 @@ impl Command {
|
||||
}
|
||||
};
|
||||
|
||||
let rlp = if rlp_blocks {
|
||||
let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
|
||||
else {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
|
||||
let _ = error_sender
|
||||
.send(eyre::eyre!("Failed to fetch raw block {next_block}"));
|
||||
break;
|
||||
};
|
||||
Some(rlp)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
next_block += 1;
|
||||
if let Err(e) = sender.send(block).await {
|
||||
if let Err(e) = sender.send((block, rlp)).await {
|
||||
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
|
||||
break;
|
||||
}
|
||||
@@ -96,7 +110,7 @@ impl Command {
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
while let Some(block) = {
|
||||
while let Some((block, rlp)) = {
|
||||
let wait_start = Instant::now();
|
||||
let result = receiver.recv().await;
|
||||
total_wait_time += wait_start.elapsed();
|
||||
@@ -108,12 +122,12 @@ impl Command {
|
||||
|
||||
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
|
||||
|
||||
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
|
||||
let (version, params) =
|
||||
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
|
||||
|
||||
let start = Instant::now();
|
||||
let reth_data = use_reth_namespace.then_some(execution_data);
|
||||
let server_timings =
|
||||
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
|
||||
call_new_payload_with_reth(&auth_provider, version, params).await?;
|
||||
|
||||
let latency =
|
||||
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
|
||||
|
||||
@@ -22,14 +22,14 @@ pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct GasRampPayloadFile {
|
||||
/// Engine API version (1-5).
|
||||
pub(crate) version: u8,
|
||||
///
|
||||
/// `None` indicates that `reth_newPayload` should be used.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) version: Option<u8>,
|
||||
/// The block hash for FCU.
|
||||
pub(crate) block_hash: B256,
|
||||
/// The params to pass to newPayload.
|
||||
pub(crate) params: serde_json::Value,
|
||||
/// The execution data for `reth_newPayload`.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
|
||||
}
|
||||
|
||||
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
|
||||
|
||||
@@ -38,6 +38,7 @@ use eyre::Context;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
use reth_rpc_api::RethNewPayloadInput;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
time::{Duration, Instant},
|
||||
@@ -161,7 +162,9 @@ struct GasRampPayload {
|
||||
/// Block number from filename.
|
||||
block_number: u64,
|
||||
/// Engine API version for newPayload.
|
||||
version: EngineApiMessageVersion,
|
||||
///
|
||||
/// `None` indicates that `reth_newPayload` should be used.
|
||||
version: Option<EngineApiMessageVersion>,
|
||||
/// The file contents.
|
||||
file: GasRampPayloadFile,
|
||||
}
|
||||
@@ -273,13 +276,10 @@ impl Command {
|
||||
"Executing gas ramp payload (newPayload + FCU)"
|
||||
);
|
||||
|
||||
let reth_data =
|
||||
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
|
||||
let _ = call_new_payload_with_reth(
|
||||
&auth_provider,
|
||||
payload.version,
|
||||
payload.file.params.clone(),
|
||||
reth_data,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -288,13 +288,7 @@ impl Command {
|
||||
safe_block_hash: parent_hash,
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
call_forkchoice_updated_with_reth(
|
||||
&auth_provider,
|
||||
payload.version,
|
||||
fcu_state,
|
||||
self.reth_new_payload,
|
||||
)
|
||||
.await?;
|
||||
call_forkchoice_updated_with_reth(&auth_provider, payload.version, fcu_state).await?;
|
||||
|
||||
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
|
||||
|
||||
@@ -342,31 +336,34 @@ impl Command {
|
||||
"Sending newPayload"
|
||||
);
|
||||
|
||||
let params = serde_json::to_value((
|
||||
execution_payload.clone(),
|
||||
Vec::<B256>::new(),
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
))?;
|
||||
let (version, params) = if self.reth_new_payload {
|
||||
let reth_data = ExecutionData {
|
||||
payload: execution_payload.clone().into(),
|
||||
sidecar: ExecutionPayloadSidecar::v4(
|
||||
CancunPayloadFields {
|
||||
versioned_hashes: Vec::new(),
|
||||
parent_beacon_block_root: B256::ZERO,
|
||||
},
|
||||
PraguePayloadFields {
|
||||
requests: envelope.execution_requests.clone().into(),
|
||||
},
|
||||
),
|
||||
};
|
||||
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(reth_data),))?)
|
||||
} else {
|
||||
(
|
||||
Some(EngineApiMessageVersion::V4),
|
||||
serde_json::to_value((
|
||||
execution_payload.clone(),
|
||||
Vec::<B256>::new(),
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
))?,
|
||||
)
|
||||
};
|
||||
|
||||
let reth_data = self.reth_new_payload.then(|| ExecutionData {
|
||||
payload: execution_payload.clone().into(),
|
||||
sidecar: ExecutionPayloadSidecar::v4(
|
||||
CancunPayloadFields {
|
||||
versioned_hashes: Vec::new(),
|
||||
parent_beacon_block_root: B256::ZERO,
|
||||
},
|
||||
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
|
||||
),
|
||||
});
|
||||
|
||||
let server_timings = call_new_payload_with_reth(
|
||||
&auth_provider,
|
||||
EngineApiMessageVersion::V4,
|
||||
params,
|
||||
reth_data,
|
||||
)
|
||||
.await?;
|
||||
let server_timings =
|
||||
call_new_payload_with_reth(&auth_provider, version, params).await?;
|
||||
|
||||
let np_latency =
|
||||
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
|
||||
@@ -391,13 +388,7 @@ impl Command {
|
||||
};
|
||||
|
||||
let fcu_start = Instant::now();
|
||||
call_forkchoice_updated_with_reth(
|
||||
&auth_provider,
|
||||
EngineApiMessageVersion::V4,
|
||||
fcu_state,
|
||||
self.reth_new_payload,
|
||||
)
|
||||
.await?;
|
||||
call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
|
||||
let fcu_latency = fcu_start.elapsed();
|
||||
|
||||
let total_latency =
|
||||
@@ -558,13 +549,18 @@ impl Command {
|
||||
let file: GasRampPayloadFile = serde_json::from_str(&content)
|
||||
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
|
||||
|
||||
let version = match file.version {
|
||||
1 => EngineApiMessageVersion::V1,
|
||||
2 => EngineApiMessageVersion::V2,
|
||||
3 => EngineApiMessageVersion::V3,
|
||||
4 => EngineApiMessageVersion::V4,
|
||||
5 => EngineApiMessageVersion::V5,
|
||||
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
|
||||
let version = if let Some(version) = file.version {
|
||||
match version {
|
||||
1 => EngineApiMessageVersion::V1,
|
||||
2 => EngineApiMessageVersion::V2,
|
||||
3 => EngineApiMessageVersion::V3,
|
||||
4 => EngineApiMessageVersion::V4,
|
||||
5 => EngineApiMessageVersion::V5,
|
||||
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
|
||||
}
|
||||
.into()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
info!(
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! before sending additional calls.
|
||||
|
||||
use alloy_eips::eip7685::Requests;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
|
||||
use alloy_rpc_types_engine::{
|
||||
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
|
||||
@@ -12,6 +12,7 @@ use alloy_rpc_types_engine::{
|
||||
use alloy_transport::TransportResult;
|
||||
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
use reth_rpc_api::RethNewPayloadInput;
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error};
|
||||
@@ -169,7 +170,15 @@ where
|
||||
pub(crate) fn block_to_new_payload(
|
||||
block: AnyRpcBlock,
|
||||
is_optimism: bool,
|
||||
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
|
||||
rlp: Option<Bytes>,
|
||||
reth_new_payload: bool,
|
||||
) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
|
||||
if let Some(rlp) = rlp {
|
||||
return Ok((
|
||||
None,
|
||||
serde_json::to_value((RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),))?,
|
||||
));
|
||||
}
|
||||
let block = block
|
||||
.into_inner()
|
||||
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
|
||||
@@ -181,7 +190,14 @@ pub(crate) fn block_to_new_payload(
|
||||
|
||||
// Convert to execution payload
|
||||
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
|
||||
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
|
||||
let (version, params, execution_data) =
|
||||
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
|
||||
|
||||
if reth_new_payload {
|
||||
Ok((None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?))
|
||||
} else {
|
||||
Ok((Some(version), params))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an execution payload and sidecar into versioned engine API params and an
|
||||
@@ -266,17 +282,15 @@ pub(crate) fn payload_to_new_payload(
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
|
||||
provider: P,
|
||||
version: EngineApiMessageVersion,
|
||||
version: Option<EngineApiMessageVersion>,
|
||||
params: serde_json::Value,
|
||||
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
|
||||
call_new_payload_with_reth(provider, version, params, None).await
|
||||
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
|
||||
call_new_payload_with_reth(provider, version, params).await
|
||||
}
|
||||
|
||||
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RethPayloadStatus {
|
||||
#[serde(flatten)]
|
||||
status: PayloadStatus,
|
||||
latency_us: u64,
|
||||
#[serde(default)]
|
||||
persistence_wait_us: Option<u64>,
|
||||
@@ -300,72 +314,50 @@ pub(crate) struct NewPayloadTimingBreakdown {
|
||||
}
|
||||
|
||||
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
|
||||
/// `reth_execution_data` is provided.
|
||||
/// `version` is provided.
|
||||
///
|
||||
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
|
||||
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
|
||||
/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
|
||||
///
|
||||
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
|
||||
/// the standard engine namespace.
|
||||
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
|
||||
provider: P,
|
||||
version: EngineApiMessageVersion,
|
||||
version: Option<EngineApiMessageVersion>,
|
||||
params: serde_json::Value,
|
||||
reth_execution_data: Option<ExecutionData>,
|
||||
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
|
||||
if let Some(execution_data) = reth_execution_data {
|
||||
let method = "reth_newPayload";
|
||||
let reth_params = serde_json::to_value((execution_data.clone(),))
|
||||
.expect("ExecutionData serialization cannot fail");
|
||||
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
|
||||
let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
|
||||
|
||||
debug!(target: "reth-bench", method, "Sending newPayload");
|
||||
debug!(target: "reth-bench", method, "Sending newPayload");
|
||||
|
||||
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
|
||||
let resp = loop {
|
||||
let resp: serde_json::Value = provider.client().request(method, ¶ms).await?;
|
||||
let status = PayloadStatus::deserialize(&resp)?;
|
||||
|
||||
while !resp.status.is_valid() {
|
||||
if resp.status.is_invalid() {
|
||||
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
|
||||
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
|
||||
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
|
||||
)))
|
||||
}
|
||||
if resp.status.is_syncing() {
|
||||
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
|
||||
"invalid range: no canonical state found for parent of requested block",
|
||||
))
|
||||
}
|
||||
resp = provider.client().request(method, &reth_params).await?;
|
||||
if status.is_valid() {
|
||||
break resp;
|
||||
}
|
||||
|
||||
Ok(Some(NewPayloadTimingBreakdown {
|
||||
latency: Duration::from_micros(resp.latency_us),
|
||||
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
|
||||
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
|
||||
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
|
||||
}))
|
||||
} else {
|
||||
let method = version.method_name();
|
||||
|
||||
debug!(target: "reth-bench", method, "Sending newPayload");
|
||||
|
||||
let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?;
|
||||
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
|
||||
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
|
||||
std::io::Error::other(format!("Invalid {method}: {status:?}")),
|
||||
)))
|
||||
}
|
||||
if status.is_syncing() {
|
||||
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
|
||||
"invalid range: no canonical state found for parent of requested block",
|
||||
))
|
||||
}
|
||||
status = provider.client().request(method, ¶ms).await?;
|
||||
if status.is_invalid() {
|
||||
return Err(eyre::eyre!("Invalid {method}: {status:?}"));
|
||||
}
|
||||
Ok(None)
|
||||
if status.is_syncing() {
|
||||
return Err(eyre::eyre!(
|
||||
"invalid range: no canonical state found for parent of requested block"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if version.is_some() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let resp: RethPayloadStatus = serde_json::from_value(resp)?;
|
||||
|
||||
Ok(Some(NewPayloadTimingBreakdown {
|
||||
latency: Duration::from_micros(resp.latency_us),
|
||||
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
|
||||
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
|
||||
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
|
||||
@@ -403,20 +395,25 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
|
||||
P: Provider<N> + EngineApiValidWaitExt<N>,
|
||||
>(
|
||||
provider: P,
|
||||
message_version: EngineApiMessageVersion,
|
||||
message_version: Option<EngineApiMessageVersion>,
|
||||
forkchoice_state: ForkchoiceState,
|
||||
use_reth: bool,
|
||||
) -> TransportResult<ForkchoiceUpdated> {
|
||||
if use_reth {
|
||||
if let Some(message_version) = message_version {
|
||||
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
|
||||
} else {
|
||||
let method = "reth_forkchoiceUpdated";
|
||||
let reth_params = serde_json::to_value((forkchoice_state,))
|
||||
.expect("ForkchoiceState serialization cannot fail");
|
||||
|
||||
debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
|
||||
|
||||
let mut resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
|
||||
loop {
|
||||
let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
|
||||
|
||||
if resp.is_valid() {
|
||||
break Ok(resp)
|
||||
}
|
||||
|
||||
while !resp.is_valid() {
|
||||
if resp.is_invalid() {
|
||||
error!(target: "reth-bench", ?resp, "Invalid {method}");
|
||||
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
|
||||
@@ -428,11 +425,6 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
|
||||
"invalid range: no canonical state found for parent of requested block",
|
||||
))
|
||||
}
|
||||
resp = provider.client().request(method, &reth_params).await?;
|
||||
}
|
||||
|
||||
Ok(resp)
|
||||
} else {
|
||||
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +56,6 @@ alloy-signer.workspace = true
|
||||
alloy-signer-local.workspace = true
|
||||
rand.workspace = true
|
||||
revm-state.workspace = true
|
||||
criterion.workspace = true
|
||||
|
||||
[features]
|
||||
serde = [
|
||||
@@ -86,8 +85,3 @@ test-utils = [
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
]
|
||||
rayon = ["dep:rayon"]
|
||||
|
||||
[[bench]]
|
||||
name = "canonical_hashes_range"
|
||||
harness = false
|
||||
required-features = ["test-utils"]
|
||||
|
||||
@@ -1,96 +0,0 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use reth_chain_state::{
|
||||
test_utils::TestBlockBuilder, ExecutedBlock, MemoryOverlayStateProviderRef,
|
||||
};
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_storage_api::{noop::NoopProvider, BlockHashReader};
|
||||
|
||||
criterion_group!(benches, bench_canonical_hashes_range);
|
||||
criterion_main!(benches);
|
||||
|
||||
fn bench_canonical_hashes_range(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("canonical_hashes_range");
|
||||
|
||||
let scenarios = [("small", 10), ("medium", 100), ("large", 1000)];
|
||||
|
||||
for (name, num_blocks) in scenarios {
|
||||
group.bench_function(format!("{}_blocks_{}", name, num_blocks), |b| {
|
||||
let (provider, blocks) = setup_provider_with_blocks(num_blocks);
|
||||
let start_block = blocks[0].recovered_block().number;
|
||||
let end_block = blocks[num_blocks / 2].recovered_block().number;
|
||||
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
provider
|
||||
.canonical_hashes_range(black_box(start_block), black_box(end_block))
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
let (provider, blocks) = setup_provider_with_blocks(500);
|
||||
let base_block = blocks[100].recovered_block().number;
|
||||
|
||||
let range_sizes = [1, 10, 50, 100, 250];
|
||||
for range_size in range_sizes {
|
||||
group.bench_function(format!("range_size_{}", range_size), |b| {
|
||||
let end_block = base_block + range_size;
|
||||
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
provider
|
||||
.canonical_hashes_range(black_box(base_block), black_box(end_block))
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
// Benchmark edge cases
|
||||
group.bench_function("no_in_memory_matches", |b| {
|
||||
let (provider, blocks) = setup_provider_with_blocks(100);
|
||||
let first_block = blocks[0].recovered_block().number;
|
||||
let start_block = first_block - 50;
|
||||
let end_block = first_block - 10;
|
||||
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
provider
|
||||
.canonical_hashes_range(black_box(start_block), black_box(end_block))
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
group.bench_function("all_in_memory_matches", |b| {
|
||||
let (provider, blocks) = setup_provider_with_blocks(100);
|
||||
let first_block = blocks[0].recovered_block().number;
|
||||
let last_block = blocks[blocks.len() - 1].recovered_block().number;
|
||||
|
||||
b.iter(|| {
|
||||
black_box(
|
||||
provider
|
||||
.canonical_hashes_range(black_box(first_block), black_box(last_block + 1))
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn setup_provider_with_blocks(
|
||||
num_blocks: usize,
|
||||
) -> (MemoryOverlayStateProviderRef<'static, EthPrimitives>, Vec<ExecutedBlock<EthPrimitives>>) {
|
||||
let mut builder = TestBlockBuilder::<EthPrimitives>::default();
|
||||
|
||||
let blocks: Vec<_> = builder.get_executed_blocks(1000..1000 + num_blocks as u64).collect();
|
||||
|
||||
let historical = Box::new(NoopProvider::default());
|
||||
let provider = MemoryOverlayStateProviderRef::new(historical, blocks.clone());
|
||||
|
||||
(provider, blocks)
|
||||
}
|
||||
@@ -855,15 +855,9 @@ impl From<Genesis> for ChainSpec {
|
||||
// those networks we use the activation
|
||||
// blocks of those networks
|
||||
match genesis.config.chain_id {
|
||||
1 => {
|
||||
if ttd == MAINNET_PARIS_TTD {
|
||||
return Some(MAINNET_PARIS_BLOCK)
|
||||
}
|
||||
}
|
||||
11155111 => {
|
||||
if ttd == SEPOLIA_PARIS_TTD {
|
||||
return Some(SEPOLIA_PARIS_BLOCK)
|
||||
}
|
||||
1 if ttd == MAINNET_PARIS_TTD => return Some(MAINNET_PARIS_BLOCK),
|
||||
11155111 if ttd == SEPOLIA_PARIS_TTD => {
|
||||
return Some(SEPOLIA_PARIS_BLOCK)
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
||||
@@ -19,6 +19,7 @@ mod list;
|
||||
mod prune_checkpoints;
|
||||
mod repair_trie;
|
||||
mod settings;
|
||||
mod stage_checkpoints;
|
||||
mod state;
|
||||
mod static_file_header;
|
||||
mod stats;
|
||||
@@ -70,6 +71,8 @@ pub enum Subcommands {
|
||||
Settings(settings::Command),
|
||||
/// View or set prune checkpoints
|
||||
PruneCheckpoints(prune_checkpoints::Command),
|
||||
// View or set stage checkpoints
|
||||
StageCheckpoints(stage_checkpoints::Command),
|
||||
/// Gets storage size information for an account
|
||||
AccountStorage(account_storage::Command),
|
||||
/// Gets account state and storage at a specific block
|
||||
@@ -213,6 +216,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::StageCheckpoints(command) => {
|
||||
db_exec!(self.env, tool, N, command.access_rights(), {
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::AccountStorage(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
|
||||
297
crates/cli/commands/src/db/stage_checkpoints.rs
Normal file
297
crates/cli/commands/src/db/stage_checkpoints.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
//! `reth db stage-checkpoints` command for viewing and setting stage checkpoint values.
|
||||
|
||||
use clap::{Args, Parser, Subcommand, ValueEnum};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
|
||||
StageCheckpointWriter,
|
||||
};
|
||||
use reth_stages::StageId;
|
||||
|
||||
use crate::common::AccessRights;
|
||||
|
||||
/// `reth db stage-checkpoints` subcommand
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Command {
|
||||
#[command(subcommand)]
|
||||
command: Subcommands,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Returns database access rights required for the command.
|
||||
pub fn access_rights(&self) -> AccessRights {
|
||||
match &self.command {
|
||||
Subcommands::Get { .. } => AccessRights::RO,
|
||||
Subcommands::Set(_) => AccessRights::RW,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the command
|
||||
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
match self.command {
|
||||
Subcommands::Get { stage } => Self::get(tool, stage),
|
||||
Subcommands::Set(args) => Self::set(tool, args),
|
||||
}
|
||||
}
|
||||
|
||||
fn get<N: ProviderNodeTypes>(tool: &DbTool<N>, stage: Option<StageArg>) -> eyre::Result<()> {
|
||||
let provider = tool.provider_factory.provider()?;
|
||||
|
||||
match stage {
|
||||
Some(stage) => {
|
||||
let stage_id = stage.into();
|
||||
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
|
||||
println!("{stage_id}: {checkpoint:?}");
|
||||
}
|
||||
None => {
|
||||
let mut checkpoints = provider.get_all_checkpoints()?;
|
||||
checkpoints.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
for (stage, checkpoint) in checkpoints {
|
||||
println!("{stage}: {checkpoint:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set<N: ProviderNodeTypes>(tool: &DbTool<N>, args: SetArgs) -> eyre::Result<()> {
|
||||
let stage_id: StageId = args.stage.into();
|
||||
let provider_rw = tool.provider_factory.database_provider_rw()?;
|
||||
|
||||
let previous = provider_rw.get_stage_checkpoint(stage_id)?;
|
||||
let mut checkpoint = previous.unwrap_or_default();
|
||||
checkpoint.block_number = args.block_number;
|
||||
|
||||
if args.clear_stage_unit {
|
||||
checkpoint.stage_checkpoint = None;
|
||||
}
|
||||
|
||||
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
|
||||
|
||||
provider_rw.commit()?;
|
||||
|
||||
println!("Updated checkpoint for {stage_id}: {checkpoint:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
enum Subcommands {
|
||||
/// Get stage checkpoint(s) from database.
|
||||
Get {
|
||||
/// Specific stage to query. If omitted, shows all stages.
|
||||
#[arg(long, value_enum)]
|
||||
stage: Option<StageArg>,
|
||||
},
|
||||
/// Set a stage checkpoint.
|
||||
Set(SetArgs),
|
||||
}
|
||||
|
||||
/// Arguments for the `set` subcommand.
|
||||
#[derive(Debug, Args)]
|
||||
pub struct SetArgs {
|
||||
/// Stage to update.
|
||||
#[arg(long, value_enum)]
|
||||
stage: StageArg,
|
||||
|
||||
/// Block number to set as stage checkpoint.
|
||||
#[arg(long)]
|
||||
block_number: u64,
|
||||
|
||||
/// Clear stage-specific unit checkpoint payload.
|
||||
#[arg(long)]
|
||||
clear_stage_unit: bool,
|
||||
}
|
||||
|
||||
/// CLI-friendly stage names.
|
||||
#[derive(Debug, Clone, Copy, ValueEnum)]
|
||||
#[clap(rename_all = "kebab-case")]
|
||||
pub enum StageArg {
|
||||
Era,
|
||||
Headers,
|
||||
Bodies,
|
||||
SenderRecovery,
|
||||
Execution,
|
||||
PruneSenderRecovery,
|
||||
MerkleUnwind,
|
||||
AccountHashing,
|
||||
StorageHashing,
|
||||
MerkleExecute,
|
||||
TransactionLookup,
|
||||
IndexStorageHistory,
|
||||
IndexAccountHistory,
|
||||
Prune,
|
||||
Finish,
|
||||
}
|
||||
|
||||
impl From<StageArg> for StageId {
|
||||
fn from(arg: StageArg) -> Self {
|
||||
match arg {
|
||||
StageArg::Era => Self::Era,
|
||||
StageArg::Headers => Self::Headers,
|
||||
StageArg::Bodies => Self::Bodies,
|
||||
StageArg::SenderRecovery => Self::SenderRecovery,
|
||||
StageArg::Execution => Self::Execution,
|
||||
StageArg::PruneSenderRecovery => Self::PruneSenderRecovery,
|
||||
StageArg::MerkleUnwind => Self::MerkleUnwind,
|
||||
StageArg::AccountHashing => Self::AccountHashing,
|
||||
StageArg::StorageHashing => Self::StorageHashing,
|
||||
StageArg::MerkleExecute => Self::MerkleExecute,
|
||||
StageArg::TransactionLookup => Self::TransactionLookup,
|
||||
StageArg::IndexStorageHistory => Self::IndexStorageHistory,
|
||||
StageArg::IndexAccountHistory => Self::IndexAccountHistory,
|
||||
StageArg::Prune => Self::Prune,
|
||||
StageArg::Finish => Self::Finish,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use clap::Parser;
|
||||
use reth_provider::{
|
||||
test_utils::create_test_provider_factory, DBProvider, DatabaseProviderFactory,
|
||||
StageCheckpointReader, StageCheckpointWriter,
|
||||
};
|
||||
use reth_stages::StageCheckpoint;
|
||||
|
||||
#[test]
|
||||
fn parse_set_args() {
|
||||
let command = Command::parse_from([
|
||||
"stage-checkpoints",
|
||||
"set",
|
||||
"--stage",
|
||||
"headers",
|
||||
"--block-number",
|
||||
"123",
|
||||
]);
|
||||
|
||||
assert!(matches!(
|
||||
command.command,
|
||||
Subcommands::Set(SetArgs {
|
||||
stage: StageArg::Headers,
|
||||
block_number: 123,
|
||||
clear_stage_unit: false,
|
||||
})
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_overwrites_block_number() {
|
||||
let provider_factory = create_test_provider_factory();
|
||||
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
|
||||
|
||||
{
|
||||
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
|
||||
provider_rw
|
||||
.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(10))
|
||||
.expect("save checkpoint");
|
||||
provider_rw.commit().expect("commit initial checkpoint");
|
||||
}
|
||||
|
||||
let command = Command {
|
||||
command: Subcommands::Set(SetArgs {
|
||||
stage: StageArg::Headers,
|
||||
block_number: 42,
|
||||
clear_stage_unit: false,
|
||||
}),
|
||||
};
|
||||
|
||||
command.execute(&tool).expect("execute command");
|
||||
|
||||
let provider = provider_factory.provider().expect("provider");
|
||||
let checkpoint = provider
|
||||
.get_stage_checkpoint(StageId::Headers)
|
||||
.expect("get stage checkpoint")
|
||||
.expect("missing stage checkpoint");
|
||||
|
||||
assert_eq!(checkpoint.block_number, 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_preserves_stage_unit_checkpoint_unless_cleared() {
|
||||
let provider_factory = create_test_provider_factory();
|
||||
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
|
||||
|
||||
{
|
||||
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
|
||||
let checkpoint = StageCheckpoint::new(10).with_block_range(&StageId::Execution, 5, 10);
|
||||
provider_rw
|
||||
.save_stage_checkpoint(StageId::Execution, checkpoint)
|
||||
.expect("save checkpoint");
|
||||
provider_rw.commit().expect("commit initial checkpoint");
|
||||
}
|
||||
|
||||
Command {
|
||||
command: Subcommands::Set(SetArgs {
|
||||
stage: StageArg::Execution,
|
||||
block_number: 11,
|
||||
clear_stage_unit: false,
|
||||
}),
|
||||
}
|
||||
.execute(&tool)
|
||||
.expect("execute command");
|
||||
|
||||
let provider = provider_factory.provider().expect("provider");
|
||||
let checkpoint = provider
|
||||
.get_stage_checkpoint(StageId::Execution)
|
||||
.expect("get stage checkpoint")
|
||||
.expect("missing stage checkpoint");
|
||||
assert!(checkpoint.stage_checkpoint.is_some());
|
||||
|
||||
Command {
|
||||
command: Subcommands::Set(SetArgs {
|
||||
stage: StageArg::Execution,
|
||||
block_number: 12,
|
||||
clear_stage_unit: true,
|
||||
}),
|
||||
}
|
||||
.execute(&tool)
|
||||
.expect("execute command");
|
||||
|
||||
let checkpoint = provider_factory
|
||||
.provider()
|
||||
.expect("provider")
|
||||
.get_stage_checkpoint(StageId::Execution)
|
||||
.expect("get stage checkpoint")
|
||||
.expect("missing stage checkpoint");
|
||||
assert!(checkpoint.stage_checkpoint.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_preserves_checkpoint_progress() {
|
||||
let provider_factory = create_test_provider_factory();
|
||||
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
|
||||
|
||||
{
|
||||
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
|
||||
provider_rw
|
||||
.save_stage_checkpoint(StageId::MerkleExecute, StageCheckpoint::new(10))
|
||||
.expect("save checkpoint");
|
||||
provider_rw
|
||||
.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![1, 2, 3])
|
||||
.expect("save progress");
|
||||
provider_rw.commit().expect("commit initial checkpoint");
|
||||
}
|
||||
|
||||
Command {
|
||||
command: Subcommands::Set(SetArgs {
|
||||
stage: StageArg::MerkleExecute,
|
||||
block_number: 20,
|
||||
clear_stage_unit: false,
|
||||
}),
|
||||
}
|
||||
.execute(&tool)
|
||||
.expect("execute command");
|
||||
|
||||
let provider = provider_factory.provider().expect("provider");
|
||||
let progress = provider
|
||||
.get_stage_checkpoint_progress(StageId::MerkleExecute)
|
||||
.expect("get stage checkpoint progress");
|
||||
|
||||
assert_eq!(progress, Some(vec![1, 2, 3]));
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ use reth_db_api::{
|
||||
};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_provider::providers::ProviderNodeTypes;
|
||||
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
|
||||
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
|
||||
use reth_tasks::spawn_scoped_os_thread;
|
||||
use std::{
|
||||
@@ -17,7 +17,7 @@ use std::{
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{error, info};
|
||||
use tracing::info;
|
||||
|
||||
/// Log progress every 5 seconds
|
||||
const LOG_INTERVAL: Duration = Duration::from_secs(30);
|
||||
@@ -152,17 +152,11 @@ impl Command {
|
||||
let mut storage_keys = BTreeSet::new();
|
||||
|
||||
if history_in_rocksdb {
|
||||
error!(
|
||||
target: "reth::cli",
|
||||
"Historical storage queries with RocksDB backend are not yet supported. \
|
||||
Use MDBX for storage history or query current state without --block."
|
||||
);
|
||||
return Ok(());
|
||||
self.collect_staticfile_storage_keys(tool, address, &mut storage_keys)?;
|
||||
} else {
|
||||
self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
|
||||
}
|
||||
|
||||
// Collect keys from MDBX StorageChangeSets using parallel scanning
|
||||
self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
@@ -207,6 +201,63 @@ impl Command {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collects storage keys from static file StorageChangeSets (storage_v2).
|
||||
fn collect_staticfile_storage_keys<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
&self,
|
||||
tool: &DbTool<N>,
|
||||
address: Address,
|
||||
keys: &mut BTreeSet<B256>,
|
||||
) -> eyre::Result<()> {
|
||||
let tip = tool.provider_factory.provider()?.best_block_number()?;
|
||||
|
||||
if tip == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
tip,
|
||||
"Scanning static file storage changesets"
|
||||
);
|
||||
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
let walker = static_file_provider.walk_storage_changeset_range(0..=tip);
|
||||
|
||||
let mut total_scanned = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
for changeset_result in walker {
|
||||
let (block_addr, storage_entry) = changeset_result?;
|
||||
total_scanned += 1;
|
||||
|
||||
if block_addr.address() == address {
|
||||
keys.insert(storage_entry.key);
|
||||
}
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
entries_scanned = total_scanned,
|
||||
unique_keys = keys.len(),
|
||||
"Scanning static file storage changesets"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
total_entries = total_scanned,
|
||||
unique_keys = keys.len(),
|
||||
"Finished static file storage changeset scan"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collects storage keys from MDBX StorageChangeSets using parallel block range scanning.
|
||||
fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
&self,
|
||||
|
||||
@@ -297,21 +297,18 @@ where
|
||||
}
|
||||
|
||||
match event {
|
||||
Event::Key(key) => {
|
||||
if key.kind == event::KeyEventKind::Press {
|
||||
match key.code {
|
||||
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
|
||||
KeyCode::Down => app.next(),
|
||||
KeyCode::Up => app.previous(),
|
||||
KeyCode::Right => app.next_page(),
|
||||
KeyCode::Left => app.previous_page(),
|
||||
KeyCode::Char('G') => {
|
||||
app.mode = ViewMode::GoToPage;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Event::Key(key) if key.kind == event::KeyEventKind::Press => match key.code {
|
||||
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
|
||||
KeyCode::Down => app.next(),
|
||||
KeyCode::Up => app.previous(),
|
||||
KeyCode::Right => app.next_page(),
|
||||
KeyCode::Left => app.previous_page(),
|
||||
KeyCode::Char('G') => {
|
||||
app.mode = ViewMode::GoToPage;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Event::Key(_) => {}
|
||||
Event::Mouse(e) => match e.kind {
|
||||
MouseEventKind::ScrollDown => app.next(),
|
||||
MouseEventKind::ScrollUp => app.previous(),
|
||||
|
||||
@@ -9,26 +9,6 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
/// How close to the canonical head we persist blocks.
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
|
||||
/// Returns the default number of storage worker threads based on available parallelism.
|
||||
fn default_storage_worker_count() -> usize {
|
||||
#[cfg(feature = "std")]
|
||||
{
|
||||
std::thread::available_parallelism().map_or(8, |n| n.get() * 2)
|
||||
}
|
||||
#[cfg(not(feature = "std"))]
|
||||
{
|
||||
8
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the default number of account worker threads.
|
||||
///
|
||||
/// Account workers coordinate storage proof collection and account trie traversal.
|
||||
/// They are set to the same count as storage workers for simplicity.
|
||||
fn default_account_worker_count() -> usize {
|
||||
default_storage_worker_count()
|
||||
}
|
||||
|
||||
/// The size of proof targets chunk to spawn in one multiproof calculation.
|
||||
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
|
||||
|
||||
@@ -147,10 +127,6 @@ pub struct TreeConfig {
|
||||
always_process_payload_attributes_on_canonical_head: bool,
|
||||
/// Whether to unwind canonical header to ancestor during forkchoice updates.
|
||||
allow_unwind_canonical_header: bool,
|
||||
/// Number of storage proof worker threads.
|
||||
storage_worker_count: usize,
|
||||
/// Number of account proof worker threads.
|
||||
account_worker_count: usize,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
/// Depth for sparse trie pruning after state root computation.
|
||||
@@ -187,8 +163,6 @@ impl Default for TreeConfig {
|
||||
state_root_fallback: false,
|
||||
always_process_payload_attributes_on_canonical_head: false,
|
||||
allow_unwind_canonical_header: false,
|
||||
storage_worker_count: default_storage_worker_count(),
|
||||
account_worker_count: default_account_worker_count(),
|
||||
disable_cache_metrics: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
@@ -220,8 +194,6 @@ impl TreeConfig {
|
||||
state_root_fallback: bool,
|
||||
always_process_payload_attributes_on_canonical_head: bool,
|
||||
allow_unwind_canonical_header: bool,
|
||||
storage_worker_count: usize,
|
||||
account_worker_count: usize,
|
||||
disable_cache_metrics: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
@@ -246,8 +218,6 @@ impl TreeConfig {
|
||||
state_root_fallback,
|
||||
always_process_payload_attributes_on_canonical_head,
|
||||
allow_unwind_canonical_header,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
disable_cache_metrics,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
@@ -479,42 +449,6 @@ impl TreeConfig {
|
||||
self.has_enough_parallelism && !self.legacy_state_root
|
||||
}
|
||||
|
||||
/// Return the number of storage proof worker threads.
|
||||
pub const fn storage_worker_count(&self) -> usize {
|
||||
self.storage_worker_count
|
||||
}
|
||||
|
||||
/// Setter for the number of storage proof worker threads.
|
||||
///
|
||||
/// No-op if it's [`None`].
|
||||
pub const fn with_storage_worker_count_opt(
|
||||
mut self,
|
||||
storage_worker_count: Option<usize>,
|
||||
) -> Self {
|
||||
if let Some(count) = storage_worker_count {
|
||||
self.storage_worker_count = count;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Return the number of account proof worker threads.
|
||||
pub const fn account_worker_count(&self) -> usize {
|
||||
self.account_worker_count
|
||||
}
|
||||
|
||||
/// Setter for the number of account proof worker threads.
|
||||
///
|
||||
/// No-op if it's [`None`].
|
||||
pub const fn with_account_worker_count_opt(
|
||||
mut self,
|
||||
account_worker_count: Option<usize>,
|
||||
) -> Self {
|
||||
if let Some(count) = account_worker_count {
|
||||
self.account_worker_count = count;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether cache metrics recording is disabled.
|
||||
pub const fn disable_cache_metrics(&self) -> bool {
|
||||
self.disable_cache_metrics
|
||||
|
||||
@@ -98,7 +98,6 @@ reth-e2e-test-utils.workspace = true
|
||||
revm-state.workspace = true
|
||||
|
||||
assert_matches.workspace = true
|
||||
criterion.workspace = true
|
||||
eyre.workspace = true
|
||||
serde_json.workspace = true
|
||||
crossbeam-channel.workspace = true
|
||||
@@ -106,14 +105,6 @@ proptest.workspace = true
|
||||
rand.workspace = true
|
||||
rand_08.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "channel_perf"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "state_root_task"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
test-utils = [
|
||||
"reth-chain-state/test-utils",
|
||||
|
||||
@@ -1,138 +0,0 @@
|
||||
//! Benchmark comparing `std::sync::mpsc` and `crossbeam` channels for `StateRootTask`.
|
||||
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use alloy_primitives::{B256, U256};
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
|
||||
use proptest::test_runner::TestRunner;
|
||||
use rand::Rng;
|
||||
use revm_primitives::{Address, HashMap};
|
||||
use revm_state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot};
|
||||
use std::{hint::black_box, thread};
|
||||
|
||||
/// Creates a mock state with the specified number of accounts for benchmarking
|
||||
fn create_bench_state(num_accounts: usize) -> EvmState {
|
||||
let mut runner = TestRunner::deterministic();
|
||||
let mut rng = runner.rng().clone();
|
||||
let mut state_changes = HashMap::default();
|
||||
|
||||
for i in 0..num_accounts {
|
||||
let storage =
|
||||
EvmStorage::from_iter([(U256::from(i), EvmStorageSlot::new(U256::from(i + 1), 0))]);
|
||||
|
||||
let account = Account {
|
||||
info: AccountInfo {
|
||||
balance: U256::from(100),
|
||||
nonce: 10,
|
||||
code_hash: B256::from_slice(&rng.random::<[u8; 32]>()),
|
||||
code: Default::default(),
|
||||
account_id: None,
|
||||
},
|
||||
original_info: Box::new(AccountInfo::default()),
|
||||
storage,
|
||||
status: AccountStatus::empty(),
|
||||
transaction_id: 0,
|
||||
};
|
||||
|
||||
let address = Address::with_last_byte(i as u8);
|
||||
state_changes.insert(address, account);
|
||||
}
|
||||
|
||||
state_changes
|
||||
}
|
||||
|
||||
/// Simulated `StateRootTask` with `std::sync::mpsc`
|
||||
struct StdStateRootTask {
|
||||
rx: std::sync::mpsc::Receiver<EvmState>,
|
||||
}
|
||||
|
||||
impl StdStateRootTask {
|
||||
const fn new(rx: std::sync::mpsc::Receiver<EvmState>) -> Self {
|
||||
Self { rx }
|
||||
}
|
||||
|
||||
fn run(self) {
|
||||
while let Ok(state) = self.rx.recv() {
|
||||
black_box(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Simulated `StateRootTask` with `crossbeam-channel`
|
||||
struct CrossbeamStateRootTask {
|
||||
rx: crossbeam_channel::Receiver<EvmState>,
|
||||
}
|
||||
|
||||
impl CrossbeamStateRootTask {
|
||||
const fn new(rx: crossbeam_channel::Receiver<EvmState>) -> Self {
|
||||
Self { rx }
|
||||
}
|
||||
|
||||
fn run(self) {
|
||||
while let Ok(state) = self.rx.recv() {
|
||||
black_box(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmarks the performance of different channel implementations for state streaming
|
||||
fn bench_state_stream(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("state_stream_channels");
|
||||
group.sample_size(10);
|
||||
|
||||
for size in &[1, 10, 100] {
|
||||
let bench_setup = || {
|
||||
let states: Vec<_> = (0..100).map(|_| create_bench_state(*size)).collect();
|
||||
states
|
||||
};
|
||||
|
||||
group.bench_with_input(BenchmarkId::new("std_channel", size), size, |b, _| {
|
||||
b.iter_batched(
|
||||
bench_setup,
|
||||
|states| {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let task = StdStateRootTask::new(rx);
|
||||
|
||||
let processor = thread::spawn(move || {
|
||||
task.run();
|
||||
});
|
||||
|
||||
for state in states {
|
||||
tx.send(state).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
processor.join().unwrap();
|
||||
},
|
||||
BatchSize::LargeInput,
|
||||
);
|
||||
});
|
||||
|
||||
group.bench_with_input(BenchmarkId::new("crossbeam_channel", size), size, |b, _| {
|
||||
b.iter_batched(
|
||||
bench_setup,
|
||||
|states| {
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
let task = CrossbeamStateRootTask::new(rx);
|
||||
|
||||
let processor = thread::spawn(move || {
|
||||
task.run();
|
||||
});
|
||||
|
||||
for state in states {
|
||||
tx.send(state).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
processor.join().unwrap();
|
||||
},
|
||||
BatchSize::LargeInput,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_state_stream);
|
||||
criterion_main!(benches);
|
||||
@@ -1,272 +0,0 @@
|
||||
//! Benchmark for `StateRootTask` complete workflow, including sending state
|
||||
//! updates using the incoming messages sender and waiting for the final result.
|
||||
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use alloy_consensus::constants::KECCAK_EMPTY;
|
||||
use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::{Address, B256};
|
||||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use proptest::test_runner::TestRunner;
|
||||
use rand::Rng;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_engine_tree::tree::{
|
||||
precompile_cache::PrecompileCacheMap, ExecutionEnv, PayloadProcessor, StateProviderBuilder,
|
||||
TreeConfig,
|
||||
};
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_evm::OnStateHook;
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
use reth_primitives_traits::{Account as RethAccount, Recovered, StorageEntry};
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, OverlayStateProviderFactory},
|
||||
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
|
||||
AccountReader, ChainSpecProvider, HashingWriter, ProviderFactory,
|
||||
};
|
||||
use revm_primitives::{HashMap, U256};
|
||||
use revm_state::{Account as RevmAccount, AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
|
||||
use std::{hint::black_box, sync::Arc};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct BenchParams {
|
||||
num_accounts: usize,
|
||||
updates_per_account: usize,
|
||||
storage_slots_per_account: usize,
|
||||
selfdestructs_per_update: usize,
|
||||
}
|
||||
|
||||
/// Generates a series of random state updates with configurable accounts,
|
||||
/// storage, and self-destructs
|
||||
fn create_bench_state_updates(params: &BenchParams) -> Vec<EvmState> {
|
||||
let mut runner = TestRunner::deterministic();
|
||||
let mut rng = runner.rng().clone();
|
||||
let all_addresses: Vec<Address> =
|
||||
(0..params.num_accounts).map(|_| Address::random_with(&mut rng)).collect();
|
||||
let mut updates = Vec::with_capacity(params.updates_per_account);
|
||||
|
||||
for _ in 0..params.updates_per_account {
|
||||
let mut state_update = EvmState::default();
|
||||
let num_accounts_in_update = rng.random_range(1..=params.num_accounts);
|
||||
|
||||
// regular updates for randomly selected accounts
|
||||
for &address in &all_addresses[0..num_accounts_in_update] {
|
||||
// randomly choose to self-destruct with probability
|
||||
// (selfdestructs/accounts)
|
||||
let is_selfdestruct = rng
|
||||
.random_bool(params.selfdestructs_per_update as f64 / params.num_accounts as f64);
|
||||
|
||||
let account = if is_selfdestruct {
|
||||
RevmAccount {
|
||||
info: AccountInfo::default(),
|
||||
storage: HashMap::default(),
|
||||
status: AccountStatus::SelfDestructed,
|
||||
transaction_id: 0,
|
||||
original_info: Box::new(AccountInfo::default()),
|
||||
}
|
||||
} else {
|
||||
RevmAccount {
|
||||
info: AccountInfo {
|
||||
balance: U256::from(rng.random::<u64>()),
|
||||
nonce: rng.random::<u64>(),
|
||||
code_hash: KECCAK_EMPTY,
|
||||
code: Some(Default::default()),
|
||||
account_id: None,
|
||||
},
|
||||
storage: (0..rng.random_range(0..=params.storage_slots_per_account))
|
||||
.map(|_| {
|
||||
(
|
||||
U256::from(rng.random::<u64>()),
|
||||
EvmStorageSlot::new_changed(
|
||||
U256::ZERO,
|
||||
U256::from(rng.random::<u64>()),
|
||||
0,
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
status: AccountStatus::Touched,
|
||||
original_info: Box::new(AccountInfo::default()),
|
||||
transaction_id: 0,
|
||||
}
|
||||
};
|
||||
|
||||
state_update.insert(address, account);
|
||||
}
|
||||
|
||||
updates.push(state_update);
|
||||
}
|
||||
|
||||
updates
|
||||
}
|
||||
|
||||
fn convert_revm_to_reth_account(revm_account: &RevmAccount) -> Option<RethAccount> {
|
||||
match revm_account.status {
|
||||
AccountStatus::SelfDestructed => None,
|
||||
_ => Some(RethAccount {
|
||||
balance: revm_account.info.balance,
|
||||
nonce: revm_account.info.nonce,
|
||||
bytecode_hash: if revm_account.info.code_hash == KECCAK_EMPTY {
|
||||
None
|
||||
} else {
|
||||
Some(revm_account.info.code_hash)
|
||||
},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies state updates to the provider, ensuring self-destructs only affect
|
||||
/// existing accounts
|
||||
fn setup_provider(
|
||||
factory: &ProviderFactory<MockNodeTypesWithDB>,
|
||||
state_updates: &[EvmState],
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
for update in state_updates {
|
||||
let provider_rw = factory.provider_rw()?;
|
||||
|
||||
let mut account_updates = Vec::with_capacity(update.len());
|
||||
|
||||
for (address, account) in update {
|
||||
// only process self-destructs if account exists, always process
|
||||
// other updates
|
||||
let should_process = match account.status {
|
||||
AccountStatus::SelfDestructed => {
|
||||
provider_rw.basic_account(address).ok().flatten().is_some()
|
||||
}
|
||||
_ => true,
|
||||
};
|
||||
|
||||
if should_process {
|
||||
account_updates.push((
|
||||
*address,
|
||||
convert_revm_to_reth_account(account),
|
||||
(account.status == AccountStatus::Touched).then(|| {
|
||||
account
|
||||
.storage
|
||||
.iter()
|
||||
.map(|(slot, value)| StorageEntry {
|
||||
key: B256::from(*slot),
|
||||
value: value.present_value,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// update in the provider account and its storage (if available)
|
||||
for (address, account, maybe_storage) in account_updates {
|
||||
provider_rw.insert_account_for_hashing(std::iter::once((address, account)))?;
|
||||
if let Some(storage) = maybe_storage {
|
||||
provider_rw
|
||||
.insert_storage_for_hashing(std::iter::once((address, storage.into_iter())))?;
|
||||
}
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn bench_state_root(c: &mut Criterion) {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let mut group = c.benchmark_group("state_root");
|
||||
|
||||
let scenarios = vec![
|
||||
BenchParams {
|
||||
num_accounts: 100,
|
||||
updates_per_account: 5,
|
||||
storage_slots_per_account: 10,
|
||||
selfdestructs_per_update: 2,
|
||||
},
|
||||
BenchParams {
|
||||
num_accounts: 1000,
|
||||
updates_per_account: 10,
|
||||
storage_slots_per_account: 20,
|
||||
selfdestructs_per_update: 5,
|
||||
},
|
||||
BenchParams {
|
||||
num_accounts: 500,
|
||||
updates_per_account: 8,
|
||||
storage_slots_per_account: 15,
|
||||
selfdestructs_per_update: 20,
|
||||
},
|
||||
];
|
||||
|
||||
for params in scenarios {
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new(
|
||||
"state_root_task",
|
||||
format!(
|
||||
"accounts_{}_updates_{}_slots_{}_selfdestructs_{}",
|
||||
params.num_accounts,
|
||||
params.updates_per_account,
|
||||
params.storage_slots_per_account,
|
||||
params.selfdestructs_per_update
|
||||
),
|
||||
),
|
||||
¶ms,
|
||||
|b, params| {
|
||||
b.iter_with_setup(
|
||||
|| {
|
||||
let factory = create_test_provider_factory_with_chain_spec(Arc::new(
|
||||
ChainSpec::default(),
|
||||
));
|
||||
let genesis_hash = init_genesis(&factory).unwrap();
|
||||
let state_updates = create_bench_state_updates(params);
|
||||
setup_provider(&factory, &state_updates).expect("failed to setup provider");
|
||||
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
);
|
||||
let provider = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
(genesis_hash, payload_processor, provider, state_updates)
|
||||
},
|
||||
|(genesis_hash, mut payload_processor, provider, state_updates)| {
|
||||
black_box({
|
||||
let mut handle = payload_processor.spawn(
|
||||
ExecutionEnv::test_default(),
|
||||
(
|
||||
Vec::<
|
||||
Result<
|
||||
Recovered<TransactionSigned>,
|
||||
core::convert::Infallible,
|
||||
>,
|
||||
>::new(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(
|
||||
provider,
|
||||
reth_trie_db::ChangesetCache::new(),
|
||||
),
|
||||
&TreeConfig::default(),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook();
|
||||
|
||||
for (i, update) in state_updates.into_iter().enumerate() {
|
||||
state_hook.on_state(StateChangeSource::Transaction(i), &update);
|
||||
}
|
||||
drop(state_hook);
|
||||
|
||||
handle.state_root().expect("task failed")
|
||||
});
|
||||
},
|
||||
)
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_state_root);
|
||||
criterion_main!(benches);
|
||||
@@ -427,6 +427,14 @@ where
|
||||
(incoming, outgoing)
|
||||
}
|
||||
|
||||
/// Returns a [`TreeOutcome`] indicating the forkchoice head is valid and canonical.
|
||||
fn valid_outcome(state: ForkchoiceState) -> TreeOutcome<OnForkChoiceUpdated> {
|
||||
TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
|
||||
PayloadStatusEnum::Valid,
|
||||
Some(state.head_block_hash),
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns a new [`Sender`] to send messages to this type.
|
||||
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
|
||||
self.incoming_tx.clone()
|
||||
@@ -1118,11 +1126,7 @@ where
|
||||
}
|
||||
|
||||
// The head block is already canonical
|
||||
let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
|
||||
PayloadStatusEnum::Valid,
|
||||
Some(state.head_block_hash),
|
||||
)));
|
||||
Ok(Some(outcome))
|
||||
Ok(Some(Self::valid_outcome(state)))
|
||||
}
|
||||
|
||||
/// Applies chain update for the new head block and processes payload attributes.
|
||||
@@ -1183,12 +1187,7 @@ where
|
||||
|
||||
// The head block is already canonical and we're not processing payload attributes,
|
||||
// so we're not triggering a payload job and can return right away
|
||||
|
||||
let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
|
||||
PayloadStatusEnum::Valid,
|
||||
Some(state.head_block_hash),
|
||||
)));
|
||||
return Ok(Some(outcome));
|
||||
return Ok(Some(Self::valid_outcome(state)));
|
||||
}
|
||||
|
||||
// Ensure we can apply a new chain update for the head block
|
||||
@@ -1208,11 +1207,7 @@ where
|
||||
return Ok(Some(TreeOutcome::new(updated)));
|
||||
}
|
||||
|
||||
let outcome = TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
|
||||
PayloadStatusEnum::Valid,
|
||||
Some(state.head_block_hash),
|
||||
)));
|
||||
return Ok(Some(outcome));
|
||||
return Ok(Some(Self::valid_outcome(state)));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@@ -2583,6 +2578,51 @@ where
|
||||
Some(TreeEvent::Download(request))
|
||||
}
|
||||
|
||||
/// Handles a downloaded block that was successfully inserted as valid.
|
||||
///
|
||||
/// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
|
||||
/// If it matches a non-head sync target (safe or finalized), makes it canonical inline
|
||||
/// and triggers a download for the remaining blocks towards the actual head.
|
||||
/// Otherwise, tries to connect buffered blocks.
|
||||
fn on_valid_downloaded_block(
|
||||
&mut self,
|
||||
block_num_hash: BlockNumHash,
|
||||
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
|
||||
// check if we just inserted a block that's part of sync targets,
|
||||
// i.e. head, safe, or finalized
|
||||
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
|
||||
sync_target.contains(block_num_hash.hash)
|
||||
{
|
||||
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
|
||||
|
||||
if sync_target.head_block_hash == block_num_hash.hash {
|
||||
// we just inserted the sync target head block, make it canonical
|
||||
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
|
||||
sync_target_head: block_num_hash.hash,
|
||||
})))
|
||||
}
|
||||
|
||||
// This block is part of the sync target (safe or finalized) but not the
|
||||
// head. Make it canonical and try to connect any buffered children, then
|
||||
// continue downloading towards the actual head if needed.
|
||||
self.make_canonical(block_num_hash.hash)?;
|
||||
self.try_connect_buffered_blocks(block_num_hash)?;
|
||||
|
||||
// Check if we've reached the sync target head after connecting buffered
|
||||
// blocks (e.g. the head block may have already been buffered).
|
||||
if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
|
||||
let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
|
||||
trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
|
||||
return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
|
||||
}
|
||||
|
||||
return Ok(None)
|
||||
}
|
||||
trace!(target: "engine::tree", "appended downloaded block");
|
||||
self.try_connect_buffered_blocks(block_num_hash)?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Invoked with a block downloaded from the network
|
||||
///
|
||||
/// Returns an event with the appropriate action to take, such as:
|
||||
@@ -2605,22 +2645,11 @@ where
|
||||
|
||||
// try to append the block
|
||||
match self.insert_block(block) {
|
||||
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
|
||||
// check if we just inserted a block that's part of sync targets,
|
||||
// i.e. head, safe, or finalized
|
||||
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
|
||||
sync_target.contains(block_num_hash.hash)
|
||||
{
|
||||
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
|
||||
|
||||
// we just inserted a block that we know is part of the canonical chain, so we
|
||||
// can make it canonical
|
||||
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
|
||||
sync_target_head: block_num_hash.hash,
|
||||
})))
|
||||
}
|
||||
trace!(target: "engine::tree", "appended downloaded block");
|
||||
self.try_connect_buffered_blocks(block_num_hash)?;
|
||||
Ok(
|
||||
InsertPayloadOk::Inserted(BlockStatus::Valid) |
|
||||
InsertPayloadOk::AlreadySeen(BlockStatus::Valid),
|
||||
) => {
|
||||
return self.on_valid_downloaded_block(block_num_hash);
|
||||
}
|
||||
Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
|
||||
// block is not connected to the canonical head, we need to download
|
||||
|
||||
@@ -44,7 +44,7 @@ use reth_trie_sparse::{
|
||||
use std::{
|
||||
ops::Not,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
atomic::{AtomicBool, AtomicUsize},
|
||||
mpsc::{self, channel},
|
||||
Arc,
|
||||
},
|
||||
@@ -282,40 +282,19 @@ where
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
|
||||
let span = Span::current();
|
||||
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
|
||||
|
||||
let parent_state_root = env.parent_state_root;
|
||||
let transaction_count = env.transaction_count;
|
||||
let chunk_size = config.multiproof_chunk_size();
|
||||
let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
|
||||
let prewarm_handle = self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
provider_builder,
|
||||
Some(to_multi_proof.clone()),
|
||||
Some(state_root_handle.to_multi_proof.clone()),
|
||||
bal,
|
||||
);
|
||||
|
||||
// Create and spawn the storage proof task.
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
|
||||
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
|
||||
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
|
||||
self.spawn_sparse_trie_task(
|
||||
proof_handle,
|
||||
state_root_tx,
|
||||
from_multi_proof,
|
||||
parent_state_root,
|
||||
chunk_size,
|
||||
);
|
||||
|
||||
PayloadHandle {
|
||||
to_multi_proof: Some(to_multi_proof),
|
||||
state_root_handle: Some(state_root_handle),
|
||||
prewarm_handle,
|
||||
state_root: Some(state_root_rx),
|
||||
transactions: execution_rx,
|
||||
_span: span,
|
||||
}
|
||||
@@ -339,14 +318,55 @@ where
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
|
||||
PayloadHandle {
|
||||
to_multi_proof: None,
|
||||
state_root_handle: None,
|
||||
prewarm_handle,
|
||||
state_root: None,
|
||||
transactions: execution_rx,
|
||||
_span: Span::current(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns state root computation pipeline (multiproof + sparse trie tasks).
|
||||
///
|
||||
/// The returned [`StateRootHandle`] provides:
|
||||
/// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
|
||||
/// execution.
|
||||
/// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
|
||||
/// state root.
|
||||
///
|
||||
/// The state hook **must** be dropped after execution to signal the end of state updates.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
|
||||
pub fn spawn_state_root<F>(
|
||||
&mut self,
|
||||
multiproof_provider_factory: F,
|
||||
env: &ExecutionEnv<Evm>,
|
||||
config: &TreeConfig,
|
||||
) -> StateRootHandle
|
||||
where
|
||||
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
|
||||
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
|
||||
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
|
||||
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
self.spawn_sparse_trie_task(
|
||||
proof_handle,
|
||||
state_root_tx,
|
||||
from_multi_proof,
|
||||
env.parent_state_root,
|
||||
config.multiproof_chunk_size(),
|
||||
);
|
||||
|
||||
StateRootHandle::new(to_multi_proof, state_root_rx)
|
||||
}
|
||||
|
||||
/// Transaction count threshold below which proof workers are halved, since fewer transactions
|
||||
/// produce fewer state changes and most workers would be idle overhead.
|
||||
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
|
||||
@@ -409,6 +429,7 @@ where
|
||||
// few transactions are recovered sequentially and sent immediately before
|
||||
// entering the parallel iterator for the remainder.
|
||||
let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
|
||||
let executor = self.executor.clone();
|
||||
self.executor.spawn_blocking_named("tx-iterator", move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
let mut all: Vec<_> = transactions.into_iter().collect();
|
||||
@@ -424,15 +445,15 @@ where
|
||||
.map(|(i, tx)| {
|
||||
let idx = i + prefetch;
|
||||
let tx = convert.convert(tx);
|
||||
(idx, tx)
|
||||
})
|
||||
.for_each_ordered_in(executor.cpu_pool(), |(idx, tx)| {
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
|
||||
let _ = prewarm_tx.send((idx, tx.clone()));
|
||||
tx
|
||||
});
|
||||
(idx, tx)
|
||||
})
|
||||
.for_each_ordered(|(idx, tx)| {
|
||||
let _ = execute_tx.send(tx);
|
||||
debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
|
||||
});
|
||||
@@ -465,6 +486,8 @@ where
|
||||
|
||||
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
|
||||
|
||||
let executed_tx_index = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// configure prewarming
|
||||
let prewarm_ctx = PrewarmContext {
|
||||
env,
|
||||
@@ -473,6 +496,7 @@ where
|
||||
provider: provider_builder,
|
||||
metrics: PrewarmMetrics::default(),
|
||||
terminate_execution: Arc::new(AtomicBool::new(false)),
|
||||
executed_tx_index: Arc::clone(&executed_tx_index),
|
||||
precompile_cache_disabled: self.precompile_cache_disabled,
|
||||
precompile_cache_map: self.precompile_cache_map.clone(),
|
||||
};
|
||||
@@ -498,7 +522,7 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task) }
|
||||
CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
|
||||
}
|
||||
|
||||
/// Returns the cache for the given parent hash.
|
||||
@@ -579,7 +603,7 @@ where
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie.with_skip_proof_node_filtering(true),
|
||||
sparse_state_trie,
|
||||
chunk_size,
|
||||
);
|
||||
|
||||
@@ -629,6 +653,12 @@ where
|
||||
trie_metrics
|
||||
.into_trie_for_reuse_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
trie_metrics
|
||||
.sparse_trie_retained_memory_bytes
|
||||
.set(trie.memory_size() as f64);
|
||||
trie_metrics
|
||||
.sparse_trie_retained_storage_tries
|
||||
.set(trie.retained_storage_tries_count() as f64);
|
||||
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
|
||||
deferred
|
||||
} else {
|
||||
@@ -725,20 +755,79 @@ fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle to a background state root computation task.
|
||||
///
|
||||
/// Unlike [`PayloadHandle`], this does not include transaction iteration or cache prewarming.
|
||||
/// It only provides access to the state root computation via [`Self::state_hook`] and
|
||||
/// [`Self::state_root`].
|
||||
///
|
||||
/// Created by [`PayloadProcessor::spawn_state_root`].
|
||||
#[derive(Debug)]
|
||||
pub struct StateRootHandle {
|
||||
/// Channel for evm state updates to the multiproof pipeline.
|
||||
to_multi_proof: CrossbeamSender<MultiProofMessage>,
|
||||
/// Receiver for the computed state root.
|
||||
state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
|
||||
}
|
||||
|
||||
impl StateRootHandle {
|
||||
/// Creates a new state root handle.
|
||||
pub const fn new(
|
||||
to_multi_proof: CrossbeamSender<MultiProofMessage>,
|
||||
state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
) -> Self {
|
||||
Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
|
||||
}
|
||||
|
||||
/// Returns a state hook that streams state updates to the background state root task.
|
||||
///
|
||||
/// The hook must be dropped after execution completes to signal the end of state updates.
|
||||
pub fn state_hook(&self) -> impl OnStateHook {
|
||||
let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
|
||||
|
||||
move |source: StateChangeSource, state: &EvmState| {
|
||||
let _ =
|
||||
to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Awaits the state root computation result.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If called more than once.
|
||||
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
self.state_root_rx
|
||||
.take()
|
||||
.expect("state_root already taken")
|
||||
.recv()
|
||||
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
|
||||
}
|
||||
|
||||
/// Takes the state root receiver for use with custom waiting logic (e.g., timeouts).
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If called more than once.
|
||||
pub const fn take_state_root_rx(
|
||||
&mut self,
|
||||
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
|
||||
self.state_root_rx.take().expect("state_root already taken")
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle to all the spawned tasks.
|
||||
///
|
||||
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
|
||||
/// caching task without cloning the expensive `BundleState`.
|
||||
#[derive(Debug)]
|
||||
pub struct PayloadHandle<Tx, Err, R> {
|
||||
/// Channel for evm state updates
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
/// Handle to the background state root computation, if spawned.
|
||||
state_root_handle: Option<StateRootHandle>,
|
||||
// must include the receiver of the state root wired to the sparse trie
|
||||
prewarm_handle: CacheTaskHandle<R>,
|
||||
/// Stream of block transactions
|
||||
transactions: mpsc::Receiver<Result<Tx, Err>>,
|
||||
/// Receiver for the state root
|
||||
state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
|
||||
/// Span for tracing
|
||||
_span: Span,
|
||||
}
|
||||
@@ -756,11 +845,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
skip_all
|
||||
)]
|
||||
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
self.state_root
|
||||
.take()
|
||||
.expect("state_root is None")
|
||||
.recv()
|
||||
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
|
||||
self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
|
||||
}
|
||||
|
||||
/// Takes the state root receiver out of the handle for use with custom waiting logic
|
||||
@@ -772,21 +857,14 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
pub const fn take_state_root_rx(
|
||||
&mut self,
|
||||
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
|
||||
self.state_root.take().expect("state_root is None")
|
||||
self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
|
||||
}
|
||||
|
||||
/// Returns a state hook to be used to send state updates to this task.
|
||||
///
|
||||
/// If a multiproof task is spawned the hook will notify it about new states.
|
||||
pub fn state_hook(&self) -> impl OnStateHook {
|
||||
// convert the channel into a `StateHookSender` that emits an event on drop
|
||||
let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
|
||||
|
||||
move |source: StateChangeSource, state: &EvmState| {
|
||||
if let Some(sender) = &to_multi_proof {
|
||||
let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
|
||||
}
|
||||
}
|
||||
pub fn state_hook(&self) -> Option<impl OnStateHook> {
|
||||
self.state_root_handle.as_ref().map(|handle| handle.state_hook())
|
||||
}
|
||||
|
||||
/// Returns a clone of the caches used by prewarming
|
||||
@@ -799,6 +877,14 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
|
||||
}
|
||||
|
||||
/// Returns a reference to the shared executed transaction index counter.
|
||||
///
|
||||
/// The main execution loop should store `index + 1` after executing each transaction so that
|
||||
/// prewarm workers can skip transactions that have already been processed.
|
||||
pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
|
||||
&self.prewarm_handle.executed_tx_index
|
||||
}
|
||||
|
||||
/// Terminates the pre-warming transaction processing.
|
||||
///
|
||||
/// Note: This does not terminate the task yet.
|
||||
@@ -822,9 +908,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
|
||||
/// Returns iterator yielding transactions from the stream.
|
||||
pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
|
||||
core::iter::repeat_with(|| self.transactions.recv())
|
||||
.take_while(|res| res.is_ok())
|
||||
.map(|res| res.unwrap())
|
||||
self.transactions.iter()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -838,6 +922,9 @@ pub struct CacheTaskHandle<R> {
|
||||
saved_cache: Option<SavedCache>,
|
||||
/// Channel to the spawned prewarm task if any
|
||||
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
|
||||
/// Shared counter tracking the next transaction index to be executed by the main execution
|
||||
/// loop. Prewarm workers skip transactions below this index.
|
||||
executed_tx_index: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
|
||||
@@ -1366,7 +1453,7 @@ mod tests {
|
||||
None, // No BAL for test
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook();
|
||||
let mut state_hook = handle.state_hook().expect("state hook is None");
|
||||
|
||||
for (i, update) in state_updates.into_iter().enumerate() {
|
||||
state_hook.on_state(StateChangeSource::Transaction(i), &update);
|
||||
|
||||
@@ -8,7 +8,7 @@ use metrics::{Gauge, Histogram};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{HashedPostState, HashedStorage};
|
||||
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
|
||||
use reth_trie_common::MultiProofTargetsV2;
|
||||
use std::sync::Arc;
|
||||
use tracing::trace;
|
||||
|
||||
@@ -77,10 +77,11 @@ pub enum MultiProofMessage {
|
||||
/// This should trigger once the block has been executed (after) the last state update has been
|
||||
/// sent. This triggers the exit condition of the multi proof task.
|
||||
#[derive(Deref, Debug)]
|
||||
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
|
||||
pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);
|
||||
|
||||
impl StateHookSender {
|
||||
pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
|
||||
/// Creates a new [`StateHookSender`] wrapping the given channel sender.
|
||||
pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
|
||||
Self(inner)
|
||||
}
|
||||
}
|
||||
@@ -189,6 +190,11 @@ pub(crate) struct MultiProofTaskMetrics {
|
||||
pub into_trie_for_reuse_duration_histogram: Histogram,
|
||||
/// Time spent waiting for preserved sparse trie cache to become available.
|
||||
pub sparse_trie_cache_wait_duration_histogram: Histogram,
|
||||
|
||||
/// Retained memory of the preserved sparse trie cache in bytes.
|
||||
pub sparse_trie_retained_memory_bytes: Gauge,
|
||||
/// Number of storage tries retained in the preserved sparse trie cache.
|
||||
pub sparse_trie_retained_storage_tries: Gauge,
|
||||
}
|
||||
|
||||
/// Dispatches work items as a single unit or in chunks based on target size and worker
|
||||
|
||||
@@ -33,9 +33,9 @@ use reth_provider::{
|
||||
};
|
||||
use reth_revm::{database::StateProviderDatabase, state::EvmState};
|
||||
use reth_tasks::{pool::WorkerPool, Runtime};
|
||||
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
|
||||
use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
mpsc::{self, channel, Receiver, Sender},
|
||||
Arc,
|
||||
};
|
||||
@@ -149,7 +149,7 @@ where
|
||||
});
|
||||
|
||||
while let Ok((index, tx)) = pending.recv() {
|
||||
if ctx.terminate_execution.load(Ordering::Relaxed) {
|
||||
if ctx.should_stop() {
|
||||
trace!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
"Termination requested, stopping transaction distribution"
|
||||
@@ -157,6 +157,11 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
// skip transactions already executed by the main loop
|
||||
if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tx_count += 1;
|
||||
let parent_span = Span::current();
|
||||
s.spawn(move |_| {
|
||||
@@ -202,13 +207,18 @@ where
|
||||
Tx: ExecutableTxFor<Evm>,
|
||||
{
|
||||
WorkerPool::with_worker_mut(|worker| {
|
||||
let Some((evm, metrics, terminate_execution)) =
|
||||
let Some(evm) =
|
||||
worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
if ctx.should_stop() {
|
||||
return;
|
||||
}
|
||||
|
||||
// skip if main execution has already processed this transaction
|
||||
if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -225,25 +235,25 @@ where
|
||||
sender=%tx.signer(),
|
||||
"Error when executing prewarm transaction",
|
||||
);
|
||||
metrics.transaction_errors.increment(1);
|
||||
ctx.metrics.transaction_errors.increment(1);
|
||||
return;
|
||||
}
|
||||
};
|
||||
metrics.execution_duration.record(start.elapsed());
|
||||
ctx.metrics.execution_duration.record(start.elapsed());
|
||||
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
if ctx.should_stop() {
|
||||
return;
|
||||
}
|
||||
|
||||
if index > 0 {
|
||||
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
ctx.metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
if let Some(to_multi_proof) = to_multi_proof {
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
}
|
||||
|
||||
metrics.total_runtime.record(start.elapsed());
|
||||
ctx.metrics.total_runtime.record(start.elapsed());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -348,7 +358,7 @@ where
|
||||
bal.par_iter().for_each_init(
|
||||
|| (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
|
||||
|(ctx, provider), account| {
|
||||
if ctx.terminate_execution.load(Ordering::Relaxed) {
|
||||
if ctx.should_stop() {
|
||||
return;
|
||||
}
|
||||
ctx.prefetch_bal_account(provider, account);
|
||||
@@ -442,7 +452,7 @@ where
|
||||
PrewarmTaskEvent::TerminateTransactionExecution => {
|
||||
// stop tx processing
|
||||
debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
|
||||
self.ctx.terminate_execution.store(true, Ordering::Relaxed);
|
||||
self.ctx.stop();
|
||||
}
|
||||
PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
|
||||
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
|
||||
@@ -497,6 +507,10 @@ where
|
||||
pub metrics: PrewarmMetrics,
|
||||
/// An atomic bool that tells prewarm tasks to not start any more execution.
|
||||
pub terminate_execution: Arc<AtomicBool>,
|
||||
/// Shared counter tracking the next transaction index to be executed by the main execution
|
||||
/// loop. Prewarm workers skip transactions with `index < counter` since those have already
|
||||
/// been executed.
|
||||
pub executed_tx_index: Arc<AtomicUsize>,
|
||||
/// Whether the precompile cache is disabled.
|
||||
pub precompile_cache_disabled: bool,
|
||||
/// The precompile cache map.
|
||||
@@ -505,11 +519,8 @@ where
|
||||
|
||||
/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
|
||||
/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
|
||||
type PrewarmEvmState<Evm> = Option<(
|
||||
EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>,
|
||||
PrewarmMetrics,
|
||||
Arc<AtomicBool>,
|
||||
)>;
|
||||
type PrewarmEvmState<Evm> =
|
||||
Option<EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>>;
|
||||
|
||||
impl<N, P, Evm> PrewarmContext<N, P, Evm>
|
||||
where
|
||||
@@ -517,7 +528,7 @@ where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
Evm: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
/// Creates a per-thread EVM, metrics handle, and termination flag for prewarming.
|
||||
/// Creates a per-thread EVM for prewarming.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
|
||||
fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
|
||||
let mut state_provider = match self.provider.build() {
|
||||
@@ -558,7 +569,7 @@ where
|
||||
|
||||
if !self.precompile_cache_disabled {
|
||||
// Only cache pure precompiles to avoid issues with stateful precompiles
|
||||
evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
|
||||
evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
|
||||
CachedPrecompile::wrap(
|
||||
precompile,
|
||||
self.precompile_cache_map.cache_for_address(*address),
|
||||
@@ -568,7 +579,19 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
Some((evm, self.metrics.clone(), self.terminate_execution.clone()))
|
||||
Some(evm)
|
||||
}
|
||||
|
||||
/// Returns `true` if prewarming should stop.
|
||||
#[inline]
|
||||
pub fn should_stop(&self) -> bool {
|
||||
self.terminate_execution.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Signals all prewarm tasks to stop execution.
|
||||
#[inline]
|
||||
pub fn stop(&self) {
|
||||
self.terminate_execution.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Prefetches a single account and all its storage slots from the BAL into the cache.
|
||||
@@ -620,8 +643,6 @@ where
|
||||
/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
|
||||
/// given state.
|
||||
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
|
||||
use reth_trie::proof_v2;
|
||||
|
||||
let mut targets = MultiProofTargetsV2::default();
|
||||
let mut storage_target_count = 0;
|
||||
for (addr, account) in state {
|
||||
@@ -647,7 +668,7 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize
|
||||
}
|
||||
|
||||
let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
|
||||
storage_slots.push(proof_v2::Target::from(hashed_slot));
|
||||
storage_slots.push(ProofV2Target::from(hashed_slot));
|
||||
}
|
||||
|
||||
storage_target_count += storage_slots.len();
|
||||
|
||||
@@ -16,15 +16,15 @@ use rayon::iter::ParallelIterator;
|
||||
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount,
|
||||
EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
|
||||
TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
};
|
||||
use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{
|
||||
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
|
||||
},
|
||||
root::ParallelStateRootError,
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
};
|
||||
#[cfg(feature = "trie-debug")]
|
||||
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
|
||||
@@ -509,12 +509,12 @@ where
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
targets.push(ProofV2Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
targets.push(ProofV2Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -551,13 +551,13 @@ where
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.push_account_target(Target::new(target).with_min_len(min_len));
|
||||
.push_account_target(ProofV2Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.push_account_target(Target::new(target).with_min_len(min_len));
|
||||
.push_account_target(ProofV2Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
})?;
|
||||
@@ -734,13 +734,13 @@ impl PendingTargets {
|
||||
}
|
||||
|
||||
/// Adds a target to the account targets.
|
||||
fn push_account_target(&mut self, target: Target) {
|
||||
fn push_account_target(&mut self, target: ProofV2Target) {
|
||||
self.targets.account_targets.push(target);
|
||||
self.len += 1;
|
||||
}
|
||||
|
||||
/// Extends storage targets for the given address.
|
||||
fn extend_storage_targets(&mut self, address: &B256, targets: Vec<Target>) {
|
||||
fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
|
||||
self.len += targets.len();
|
||||
self.targets.storage_targets.entry(*address).or_default().extend(targets);
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use reth_engine_primitives::{
|
||||
use reth_errors::{BlockExecutionError, ProviderResult};
|
||||
use reth_evm::{
|
||||
block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
|
||||
SpecFor,
|
||||
OnStateHook, SpecFor,
|
||||
};
|
||||
use reth_payload_primitives::{
|
||||
BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
|
||||
@@ -50,7 +50,11 @@ use revm_primitives::Address;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
panic::{self, AssertUnwindSafe},
|
||||
sync::{mpsc::RecvTimeoutError, Arc},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
mpsc::RecvTimeoutError,
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
|
||||
|
||||
@@ -838,19 +842,21 @@ where
|
||||
|
||||
if !self.config.precompile_cache_disabled() {
|
||||
let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
|
||||
executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
|
||||
let metrics = self
|
||||
.precompile_cache_metrics
|
||||
.entry(*address)
|
||||
.or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
|
||||
.clone();
|
||||
CachedPrecompile::wrap(
|
||||
precompile,
|
||||
self.precompile_cache_map.cache_for_address(*address),
|
||||
spec_id,
|
||||
Some(metrics),
|
||||
)
|
||||
});
|
||||
executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
|
||||
|address, precompile| {
|
||||
let metrics = self
|
||||
.precompile_cache_metrics
|
||||
.entry(*address)
|
||||
.or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
|
||||
.clone();
|
||||
CachedPrecompile::wrap(
|
||||
precompile,
|
||||
self.precompile_cache_map.cache_for_address(*address),
|
||||
spec_id,
|
||||
Some(metrics),
|
||||
)
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn background task to compute receipt root and logs bloom incrementally.
|
||||
@@ -864,7 +870,10 @@ where
|
||||
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
|
||||
|
||||
let transaction_count = input.transaction_count();
|
||||
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
|
||||
let executed_tx_index = Arc::clone(handle.executed_tx_index());
|
||||
let executor = executor.with_state_hook(
|
||||
handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
|
||||
);
|
||||
|
||||
let execution_start = Instant::now();
|
||||
|
||||
@@ -874,6 +883,7 @@ where
|
||||
transaction_count,
|
||||
handle.iter_transactions(),
|
||||
&receipt_tx,
|
||||
&executed_tx_index,
|
||||
)?;
|
||||
drop(receipt_tx);
|
||||
|
||||
@@ -913,6 +923,7 @@ where
|
||||
transaction_count: usize,
|
||||
transactions: impl Iterator<Item = Result<Tx, Err>>,
|
||||
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
|
||||
executed_tx_index: &AtomicUsize,
|
||||
) -> Result<(E, Vec<Address>), BlockExecutionError>
|
||||
where
|
||||
E: BlockExecutor<Receipt = N::Receipt>,
|
||||
@@ -959,6 +970,9 @@ where
|
||||
executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
// advance the shared counter so prewarm workers skip already-executed txs
|
||||
executed_tx_index.store(senders.len(), Ordering::Relaxed);
|
||||
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
|
||||
|
||||
use alloy_primitives::Bytes;
|
||||
use alloy_primitives::{
|
||||
map::{DefaultHashBuilder, FbBuildHasher},
|
||||
Bytes,
|
||||
};
|
||||
use moka::policy::EvictionPolicy;
|
||||
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
|
||||
use reth_primitives_traits::dashmap::DashMap;
|
||||
@@ -13,7 +16,7 @@ const MAX_CACHE_SIZE: u32 = 10_000;
|
||||
|
||||
/// Stores caches for each precompile.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
|
||||
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>, FbBuildHasher<20>>>)
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
|
||||
|
||||
@@ -37,9 +40,7 @@ where
|
||||
|
||||
/// Cache for precompiles, for each input stores the result.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PrecompileCache<S>(
|
||||
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
|
||||
)
|
||||
pub struct PrecompileCache<S>(moka::sync::Cache<Bytes, CacheEntry<S>, DefaultHashBuilder>)
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
///
|
||||
/// Both parent hash and anchor hash must match to ensure the overlay is valid.
|
||||
/// This prevents using a stale overlay after persistence has advanced the anchor.
|
||||
pub(crate) fn get_cached_overlay(
|
||||
pub fn get_cached_overlay(
|
||||
&self,
|
||||
parent_hash: B256,
|
||||
expected_anchor: B256,
|
||||
|
||||
@@ -2041,3 +2041,126 @@ mod forkchoice_updated_tests {
|
||||
assert_eq!(last_persisted_number, canonical_tip);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests that `on_valid_downloaded_block` triggers a download for the actual head block when
|
||||
/// the block matches a non-head sync target (safe or finalized).
|
||||
///
|
||||
/// This exercises the exact code path fixed in `on_downloaded_block`: after `insert_block`
|
||||
/// returns `Inserted(Valid)`, `on_valid_downloaded_block` checks `sync_target.contains()`.
|
||||
/// If the block is NOT the head, it should make canonical inline and emit a `Download`
|
||||
/// event for the head — rather than returning `MakeCanonical` which would stop the download
|
||||
/// pipeline.
|
||||
///
|
||||
/// Reproduces the hive test failure:
|
||||
/// "Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts -
|
||||
/// No Transactions: Timeout while waiting for secondary client to sync"
|
||||
#[test]
|
||||
fn test_on_valid_downloaded_non_head_sync_target_continues_to_head() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = MAINNET.clone();
|
||||
let mut test_harness = TestHarness::new(chain_spec);
|
||||
|
||||
// Build blocks: genesis (0) and safe block (1).
|
||||
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
|
||||
let genesis = &blocks[0];
|
||||
let safe_block = &blocks[1];
|
||||
|
||||
// Insert genesis and safe block into the tree. The safe block must be in the tree
|
||||
// for `make_canonical` to succeed inside `on_valid_downloaded_block`.
|
||||
test_harness = test_harness.with_blocks(vec![genesis.clone(), safe_block.clone()]);
|
||||
|
||||
let genesis_hash = genesis.recovered_block().hash();
|
||||
let safe_hash = safe_block.recovered_block().hash();
|
||||
let head_hash = B256::random(); // head block is unknown — hasn't been downloaded yet
|
||||
|
||||
// Reset canonical head to genesis so the safe block is in tree but not yet canonical.
|
||||
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
|
||||
|
||||
// Set the forkchoice tracker to SYNCING with head != safe.
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: head_hash,
|
||||
safe_block_hash: safe_hash,
|
||||
finalized_block_hash: genesis_hash,
|
||||
};
|
||||
test_harness
|
||||
.tree
|
||||
.state
|
||||
.forkchoice_state_tracker
|
||||
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
|
||||
|
||||
// Call on_valid_downloaded_block — this is called by on_downloaded_block after
|
||||
// insert_block returns Inserted(Valid).
|
||||
let safe_num_hash = safe_block.recovered_block().num_hash();
|
||||
let result = test_harness.tree.on_valid_downloaded_block(safe_num_hash).unwrap();
|
||||
|
||||
// With the fix: the engine makes safe canonical inline, then emits Download for head.
|
||||
// Without the fix: it would return MakeCanonical{safe_hash} and never download head.
|
||||
match result {
|
||||
Some(TreeEvent::Download(DownloadRequest::BlockSet(hashes))) => {
|
||||
assert!(
|
||||
hashes.contains(&head_hash),
|
||||
"Expected download for head block {head_hash}, got {hashes:?}"
|
||||
);
|
||||
}
|
||||
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
|
||||
panic!(
|
||||
"BUG: returned MakeCanonical for non-head block {sync_target_head} \
|
||||
instead of downloading the actual head {head_hash}"
|
||||
);
|
||||
}
|
||||
other => panic!("Expected Download event for head block, got: {other:?}"),
|
||||
}
|
||||
|
||||
// Verify the safe block was made canonical.
|
||||
assert_eq!(
|
||||
test_harness.tree.state.tree_state.canonical_block_hash(),
|
||||
safe_hash,
|
||||
"Safe block should be canonical after on_valid_downloaded_block"
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that `on_valid_downloaded_block` returns `MakeCanonical` when the downloaded block
|
||||
/// IS the sync target head (the normal non-buggy path).
|
||||
#[test]
|
||||
fn test_on_valid_downloaded_head_sync_target_returns_make_canonical() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = MAINNET.clone();
|
||||
let mut test_harness = TestHarness::new(chain_spec);
|
||||
|
||||
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
|
||||
let genesis = &blocks[0];
|
||||
let head_block = &blocks[1];
|
||||
|
||||
test_harness = test_harness.with_blocks(vec![genesis.clone(), head_block.clone()]);
|
||||
|
||||
let genesis_hash = genesis.recovered_block().hash();
|
||||
let head_hash = head_block.recovered_block().hash();
|
||||
|
||||
// Reset canonical head to genesis.
|
||||
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
|
||||
|
||||
// Set the forkchoice tracker: head == the downloaded block.
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: head_hash,
|
||||
safe_block_hash: head_hash,
|
||||
finalized_block_hash: genesis_hash,
|
||||
};
|
||||
test_harness
|
||||
.tree
|
||||
.state
|
||||
.forkchoice_state_tracker
|
||||
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
|
||||
|
||||
let head_num_hash = head_block.recovered_block().num_hash();
|
||||
let result = test_harness.tree.on_valid_downloaded_block(head_num_hash).unwrap();
|
||||
|
||||
// When the downloaded block IS the head, should return MakeCanonical.
|
||||
match result {
|
||||
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
|
||||
assert_eq!(sync_target_head, head_hash);
|
||||
}
|
||||
other => panic!("Expected MakeCanonical for head block, got: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ impl EngineMessageStore {
|
||||
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
|
||||
}
|
||||
}
|
||||
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
|
||||
Ok(filenames_by_ts.into_values().flatten())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ where
|
||||
(EthEvmConfig::ethereum(spec.clone()), Arc::new(EthBeaconConsensus::new(spec)))
|
||||
};
|
||||
|
||||
self.run_with_components::<EthereumNode>(components, |builder, ext| async move {
|
||||
self.run_with_components::<EthereumNode>(components, async move |builder, ext| {
|
||||
launcher.entrypoint(builder, ext).await
|
||||
})
|
||||
}
|
||||
@@ -127,6 +127,9 @@ where
|
||||
|
||||
self.init_tracing(&runner)?;
|
||||
|
||||
// Deprioritize background threads spawned by tracing/OTel libraries.
|
||||
reth_tasks::utils::deprioritize_background_threads();
|
||||
|
||||
// Install the prometheus recorder to be sure to record all metrics
|
||||
install_prometheus_recorder();
|
||||
|
||||
|
||||
@@ -242,7 +242,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_valid_gas_limit_increase() {
|
||||
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
|
||||
let child = header_with_gas_limit((parent.gas_limit + 5) as u64);
|
||||
let child = header_with_gas_limit(parent.gas_limit + 5);
|
||||
|
||||
assert!(validate_against_parent_gas_limit(
|
||||
&child,
|
||||
@@ -260,7 +260,7 @@ mod tests {
|
||||
assert!(matches!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
|
||||
ConsensusError::GasLimitInvalidMinimum { child_gas_limit }
|
||||
if child_gas_limit == child.gas_limit as u64
|
||||
if child_gas_limit == child.gas_limit
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -28,10 +28,6 @@ alloy-evm.workspace = true
|
||||
alloy-consensus.workspace = true
|
||||
alloy-rpc-types-engine.workspace = true
|
||||
|
||||
# Misc
|
||||
parking_lot = { workspace = true, optional = true }
|
||||
derive_more = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
reth-testing-utils.workspace = true
|
||||
reth-evm = { workspace = true, features = ["test-utils"] }
|
||||
@@ -54,14 +50,11 @@ std = [
|
||||
"reth-primitives-traits/std",
|
||||
"revm/std",
|
||||
"reth-ethereum-primitives/std",
|
||||
"derive_more?/std",
|
||||
"alloy-rpc-types-engine/std",
|
||||
"reth-storage-errors/std",
|
||||
]
|
||||
test-utils = [
|
||||
"std",
|
||||
"dep:parking_lot",
|
||||
"dep:derive_more",
|
||||
"reth-chainspec/test-utils",
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
"reth-evm/test-utils",
|
||||
|
||||
@@ -3,7 +3,7 @@ use alloy_consensus::{
|
||||
proofs::{self, calculate_receipt_root},
|
||||
Block, BlockBody, BlockHeader, Header, TxReceipt, EMPTY_OMMER_ROOT_HASH,
|
||||
};
|
||||
use alloy_eips::merge::BEACON_NONCE;
|
||||
use alloy_eips::{eip4895::Withdrawals, merge::BEACON_NONCE};
|
||||
use alloy_evm::{block::BlockExecutorFactory, eth::EthBlockExecutionCtx};
|
||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||
use reth_evm::execute::{BlockAssembler, BlockAssemblerInput, BlockExecutionError};
|
||||
@@ -61,7 +61,7 @@ where
|
||||
let withdrawals = self
|
||||
.chain_spec
|
||||
.is_shanghai_active_at_timestamp(timestamp)
|
||||
.then(|| ctx.withdrawals.map(|w| w.into_owned()).unwrap_or_default());
|
||||
.then(|| Withdrawals::new(ctx.withdrawals.map(|w| w.into_owned()).unwrap_or_default()));
|
||||
|
||||
let withdrawals_root =
|
||||
withdrawals.as_deref().map(|w| proofs::calculate_withdrawals_root(w));
|
||||
|
||||
@@ -192,7 +192,7 @@ where
|
||||
parent_hash: block.header().parent_hash,
|
||||
parent_beacon_block_root: block.header().parent_beacon_block_root,
|
||||
ommers: &block.body().ommers,
|
||||
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
|
||||
withdrawals: block.body().withdrawals.as_ref().map(|w| Cow::Borrowed(w.as_slice())),
|
||||
extra_data: block.header().extra_data.clone(),
|
||||
})
|
||||
}
|
||||
@@ -207,7 +207,7 @@ where
|
||||
parent_hash: parent.hash(),
|
||||
parent_beacon_block_root: attributes.parent_beacon_block_root,
|
||||
ommers: &[],
|
||||
withdrawals: attributes.withdrawals.map(Cow::Owned),
|
||||
withdrawals: attributes.withdrawals.map(|w| Cow::Owned(w.into_inner())),
|
||||
extra_data: attributes.extra_data,
|
||||
})
|
||||
}
|
||||
@@ -287,7 +287,7 @@ where
|
||||
parent_hash: payload.parent_hash(),
|
||||
parent_beacon_block_root: payload.sidecar.parent_beacon_block_root(),
|
||||
ommers: &[],
|
||||
withdrawals: payload.payload.withdrawals().map(|w| Cow::Owned(w.clone().into())),
|
||||
withdrawals: payload.payload.withdrawals().map(|w| Cow::Borrowed(w.as_slice())),
|
||||
extra_data: payload.payload.as_v1().extra_data.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,221 +1,8 @@
|
||||
use crate::EthEvmConfig;
|
||||
use alloc::{boxed::Box, sync::Arc, vec, vec::Vec};
|
||||
use alloy_consensus::{Header, TxType};
|
||||
use alloy_eips::eip7685::Requests;
|
||||
use alloy_evm::precompiles::PrecompilesMap;
|
||||
use alloy_primitives::Bytes;
|
||||
use alloy_rpc_types_engine::ExecutionData;
|
||||
use parking_lot::Mutex;
|
||||
use reth_ethereum_primitives::{Receipt, TransactionSigned};
|
||||
use reth_evm::{
|
||||
block::{
|
||||
BlockExecutionError, BlockExecutor, BlockExecutorFactory, BlockExecutorFor, ExecutableTx,
|
||||
},
|
||||
eth::{EthBlockExecutionCtx, EthEvmContext, EthTxResult},
|
||||
ConfigureEngineEvm, ConfigureEvm, Database, EthEvm, EthEvmFactory, Evm, EvmEnvFor, EvmFactory,
|
||||
ExecutableTxIterator, ExecutionCtxFor, RecoveredTx,
|
||||
};
|
||||
use reth_execution_types::{BlockExecutionResult, ExecutionOutcome};
|
||||
use reth_primitives_traits::{BlockTy, SealedBlock, SealedHeader};
|
||||
use revm::{
|
||||
context::result::{ExecutionResult, HaltReason, Output, ResultAndState, SuccessReason},
|
||||
database::State,
|
||||
Inspector,
|
||||
};
|
||||
use reth_evm::noop::NoopEvmConfig;
|
||||
|
||||
/// A helper type alias for mocked block executor provider.
|
||||
pub type MockExecutorProvider = MockEvmConfig;
|
||||
|
||||
/// A block executor provider that returns mocked execution results.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MockEvmConfig {
|
||||
inner: EthEvmConfig,
|
||||
exec_results: Arc<Mutex<Vec<ExecutionOutcome>>>,
|
||||
}
|
||||
|
||||
impl Default for MockEvmConfig {
|
||||
fn default() -> Self {
|
||||
Self { inner: EthEvmConfig::mainnet(), exec_results: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
impl MockEvmConfig {
|
||||
/// Extend the mocked execution results
|
||||
pub fn extend(&self, results: impl IntoIterator<Item = impl Into<ExecutionOutcome>>) {
|
||||
self.exec_results.lock().extend(results.into_iter().map(Into::into));
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockExecutorFactory for MockEvmConfig {
|
||||
type EvmFactory = EthEvmFactory;
|
||||
type ExecutionCtx<'a> = EthBlockExecutionCtx<'a>;
|
||||
type Receipt = Receipt;
|
||||
type Transaction = TransactionSigned;
|
||||
|
||||
fn evm_factory(&self) -> &Self::EvmFactory {
|
||||
self.inner.evm_factory()
|
||||
}
|
||||
|
||||
fn create_executor<'a, DB, I>(
|
||||
&'a self,
|
||||
evm: EthEvm<&'a mut State<DB>, I, PrecompilesMap>,
|
||||
_ctx: Self::ExecutionCtx<'a>,
|
||||
) -> impl BlockExecutorFor<'a, Self, DB, I>
|
||||
where
|
||||
DB: Database + 'a,
|
||||
I: Inspector<<Self::EvmFactory as EvmFactory>::Context<&'a mut State<DB>>> + 'a,
|
||||
{
|
||||
MockExecutor {
|
||||
result: self.exec_results.lock().pop().unwrap(),
|
||||
evm,
|
||||
hook: None,
|
||||
receipts: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Mock executor that returns a fixed execution result.
|
||||
#[derive(derive_more::Debug)]
|
||||
pub struct MockExecutor<'a, DB: Database, I> {
|
||||
result: ExecutionOutcome,
|
||||
evm: EthEvm<&'a mut State<DB>, I, PrecompilesMap>,
|
||||
#[debug(skip)]
|
||||
hook: Option<Box<dyn reth_evm::OnStateHook>>,
|
||||
receipts: Vec<Receipt>,
|
||||
}
|
||||
|
||||
impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExecutor
|
||||
for MockExecutor<'a, DB, I>
|
||||
{
|
||||
type Evm = EthEvm<&'a mut State<DB>, I, PrecompilesMap>;
|
||||
type Transaction = TransactionSigned;
|
||||
type Receipt = Receipt;
|
||||
type Result = EthTxResult<HaltReason, TxType>;
|
||||
|
||||
fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn receipts(&self) -> &[Self::Receipt] {
|
||||
&self.receipts
|
||||
}
|
||||
|
||||
fn execute_transaction_without_commit(
|
||||
&mut self,
|
||||
tx: impl ExecutableTx<Self>,
|
||||
) -> Result<Self::Result, BlockExecutionError> {
|
||||
Ok(EthTxResult {
|
||||
result: ResultAndState::new(
|
||||
ExecutionResult::Success {
|
||||
reason: SuccessReason::Return,
|
||||
gas_used: 0,
|
||||
gas_refunded: 0,
|
||||
logs: vec![],
|
||||
output: Output::Call(Bytes::from(vec![])),
|
||||
},
|
||||
Default::default(),
|
||||
),
|
||||
tx_type: tx.into_parts().1.tx().tx_type(),
|
||||
blob_gas_used: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn commit_transaction(&mut self, _output: Self::Result) -> Result<u64, BlockExecutionError> {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
fn finish(
|
||||
self,
|
||||
) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
|
||||
let Self { result, mut evm, .. } = self;
|
||||
let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = result;
|
||||
let result = BlockExecutionResult {
|
||||
receipts: receipts.into_iter().flatten().collect(),
|
||||
requests: requests.into_iter().fold(Requests::default(), |mut reqs, req| {
|
||||
reqs.extend(req);
|
||||
reqs
|
||||
}),
|
||||
gas_used: 0,
|
||||
blob_gas_used: 0,
|
||||
};
|
||||
|
||||
evm.db_mut().bundle_state = bundle;
|
||||
|
||||
Ok((evm, result))
|
||||
}
|
||||
|
||||
fn set_state_hook(&mut self, hook: Option<Box<dyn reth_evm::OnStateHook>>) {
|
||||
self.hook = hook;
|
||||
}
|
||||
|
||||
fn evm(&self) -> &Self::Evm {
|
||||
&self.evm
|
||||
}
|
||||
|
||||
fn evm_mut(&mut self) -> &mut Self::Evm {
|
||||
&mut self.evm
|
||||
}
|
||||
}
|
||||
|
||||
impl ConfigureEvm for MockEvmConfig {
|
||||
type BlockAssembler = <EthEvmConfig as ConfigureEvm>::BlockAssembler;
|
||||
type BlockExecutorFactory = Self;
|
||||
type Error = <EthEvmConfig as ConfigureEvm>::Error;
|
||||
type NextBlockEnvCtx = <EthEvmConfig as ConfigureEvm>::NextBlockEnvCtx;
|
||||
type Primitives = <EthEvmConfig as ConfigureEvm>::Primitives;
|
||||
|
||||
fn block_executor_factory(&self) -> &Self::BlockExecutorFactory {
|
||||
self
|
||||
}
|
||||
|
||||
fn block_assembler(&self) -> &Self::BlockAssembler {
|
||||
self.inner.block_assembler()
|
||||
}
|
||||
|
||||
fn evm_env(&self, header: &Header) -> Result<EvmEnvFor<Self>, Self::Error> {
|
||||
self.inner.evm_env(header)
|
||||
}
|
||||
|
||||
fn next_evm_env(
|
||||
&self,
|
||||
parent: &Header,
|
||||
attributes: &Self::NextBlockEnvCtx,
|
||||
) -> Result<EvmEnvFor<Self>, Self::Error> {
|
||||
self.inner.next_evm_env(parent, attributes)
|
||||
}
|
||||
|
||||
fn context_for_block<'a>(
|
||||
&self,
|
||||
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
|
||||
) -> Result<reth_evm::ExecutionCtxFor<'a, Self>, Self::Error> {
|
||||
self.inner.context_for_block(block)
|
||||
}
|
||||
|
||||
fn context_for_next_block(
|
||||
&self,
|
||||
parent: &SealedHeader,
|
||||
attributes: Self::NextBlockEnvCtx,
|
||||
) -> Result<reth_evm::ExecutionCtxFor<'_, Self>, Self::Error> {
|
||||
self.inner.context_for_next_block(parent, attributes)
|
||||
}
|
||||
}
|
||||
|
||||
impl ConfigureEngineEvm<ExecutionData> for MockEvmConfig {
|
||||
fn evm_env_for_payload(&self, payload: &ExecutionData) -> Result<EvmEnvFor<Self>, Self::Error> {
|
||||
self.inner.evm_env_for_payload(payload)
|
||||
}
|
||||
|
||||
fn context_for_payload<'a>(
|
||||
&self,
|
||||
payload: &'a ExecutionData,
|
||||
) -> Result<ExecutionCtxFor<'a, Self>, Self::Error> {
|
||||
self.inner.context_for_payload(payload)
|
||||
}
|
||||
|
||||
fn tx_iterator_for_payload(
|
||||
&self,
|
||||
payload: &ExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
self.inner.tx_iterator_for_payload(payload)
|
||||
}
|
||||
}
|
||||
/// Mock for EVM config.
|
||||
pub type MockEvmConfig = NoopEvmConfig<EthEvmConfig>;
|
||||
|
||||
@@ -315,7 +315,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
|
||||
&'a self,
|
||||
evm: EvmFor<Self, &'a mut State<DB>, I>,
|
||||
ctx: <Self::BlockExecutorFactory as BlockExecutorFactory>::ExecutionCtx<'a>,
|
||||
) -> impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB, I>
|
||||
) -> impl BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>, I>
|
||||
where
|
||||
DB: Database,
|
||||
I: InspectorFor<Self, &'a mut State<DB>> + 'a,
|
||||
@@ -328,7 +328,8 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
|
||||
&'a self,
|
||||
db: &'a mut State<DB>,
|
||||
block: &'a SealedBlock<<Self::Primitives as NodePrimitives>::Block>,
|
||||
) -> Result<impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB>, Self::Error> {
|
||||
) -> Result<impl BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>>, Self::Error>
|
||||
{
|
||||
let evm = self.evm_for_block(db, block.header())?;
|
||||
let ctx = self.context_for_block(block)?;
|
||||
Ok(self.create_executor(evm, ctx))
|
||||
@@ -356,7 +357,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
|
||||
ctx: <Self::BlockExecutorFactory as BlockExecutorFactory>::ExecutionCtx<'a>,
|
||||
) -> impl BlockBuilder<
|
||||
Primitives = Self::Primitives,
|
||||
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, DB, I>,
|
||||
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>, I>,
|
||||
>
|
||||
where
|
||||
DB: Database,
|
||||
@@ -408,7 +409,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
|
||||
) -> Result<
|
||||
impl BlockBuilder<
|
||||
Primitives = Self::Primitives,
|
||||
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, DB>,
|
||||
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>>,
|
||||
>,
|
||||
Self::Error,
|
||||
> {
|
||||
|
||||
@@ -70,3 +70,27 @@ where
|
||||
self.inner().context_for_next_block(parent, attributes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<Inner, T> crate::ConfigureEngineEvm<T> for NoopEvmConfig<Inner>
|
||||
where
|
||||
Inner: crate::ConfigureEngineEvm<T>,
|
||||
{
|
||||
fn evm_env_for_payload(&self, payload: &T) -> Result<EvmEnvFor<Self>, Self::Error> {
|
||||
self.inner().evm_env_for_payload(payload)
|
||||
}
|
||||
|
||||
fn context_for_payload<'a>(
|
||||
&self,
|
||||
payload: &'a T,
|
||||
) -> Result<crate::ExecutionCtxFor<'a, Self>, Self::Error> {
|
||||
self.inner().context_for_payload(payload)
|
||||
}
|
||||
|
||||
fn tx_iterator_for_payload(
|
||||
&self,
|
||||
payload: &T,
|
||||
) -> Result<impl crate::ExecutableTxIterator<Self>, Self::Error> {
|
||||
self.inner().tx_iterator_for_payload(payload)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,6 +202,18 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
self.blocks().iter().map(|block| block.1)
|
||||
}
|
||||
|
||||
/// Returns an iterator over all transactions in the chain.
|
||||
pub fn transactions_iter(&self) -> impl Iterator<Item = &N::SignedTx> + '_ {
|
||||
self.blocks_iter().flat_map(|block| block.body().transactions())
|
||||
}
|
||||
|
||||
/// Returns an iterator over all [`Recovered`] transaction references in the chain.
|
||||
pub fn transactions_recovered_iter(
|
||||
&self,
|
||||
) -> impl Iterator<Item = Recovered<&N::SignedTx>> + '_ {
|
||||
self.blocks_iter().flat_map(|block| block.transactions_recovered())
|
||||
}
|
||||
|
||||
/// Returns an iterator over all blocks and their receipts in the chain.
|
||||
pub fn blocks_and_receipts(
|
||||
&self,
|
||||
|
||||
@@ -376,7 +376,7 @@ mod tests {
|
||||
);
|
||||
|
||||
for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
|
||||
pipeline_results.iter().zip(backfill_results.into_iter()).enumerate()
|
||||
pipeline_results.iter().zip(backfill_results).enumerate()
|
||||
{
|
||||
backfill_output.state.reverts.sort();
|
||||
|
||||
|
||||
@@ -505,9 +505,6 @@ where
|
||||
}
|
||||
let buffer_full = this.buffer.len() >= this.max_capacity;
|
||||
|
||||
// Update capacity
|
||||
this.update_capacity();
|
||||
|
||||
// Advance all poll senders
|
||||
let mut min_id = usize::MAX;
|
||||
for idx in (0..this.exex_handles.len()).rev() {
|
||||
|
||||
@@ -489,6 +489,8 @@ pub struct Discv4Service {
|
||||
lookup_interval: Interval,
|
||||
/// Used to rotate targets to lookup
|
||||
lookup_rotator: LookupTargetRotator,
|
||||
/// Whether we still need to reset the lookup interval on the first bootnode pong.
|
||||
pending_lookup_reset: bool,
|
||||
/// Interval when to recheck active requests
|
||||
evict_expired_requests_interval: Interval,
|
||||
/// Interval when to resend pings.
|
||||
@@ -503,6 +505,8 @@ pub struct Discv4Service {
|
||||
received_pongs: PongTable,
|
||||
/// Interval used to expire additionally tracked nodes
|
||||
expire_interval: Interval,
|
||||
/// Cached signed `FindNode` packet to avoid redundant ECDSA signing during lookups.
|
||||
cached_find_node: Option<CachedFindNode>,
|
||||
}
|
||||
|
||||
impl Discv4Service {
|
||||
@@ -598,11 +602,13 @@ impl Discv4Service {
|
||||
ping_interval,
|
||||
evict_expired_requests_interval,
|
||||
lookup_rotator,
|
||||
pending_lookup_reset: config.enable_lookup,
|
||||
resolve_external_ip_interval: config.resolve_external_ip_interval(),
|
||||
config,
|
||||
queued_events: Default::default(),
|
||||
received_pongs: Default::default(),
|
||||
expire_interval: tokio::time::interval(EXPIRE_DURATION),
|
||||
cached_find_node: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -811,9 +817,12 @@ impl Discv4Service {
|
||||
fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
|
||||
trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
|
||||
ctx.mark_queried(node.id);
|
||||
let id = ctx.target();
|
||||
let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
|
||||
self.send_packet(msg, node.udp_addr());
|
||||
let (payload, hash) = self.find_node_packet(ctx.target());
|
||||
let to = node.udp_addr();
|
||||
trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
|
||||
let _ = self.egress.try_send((payload, to)).map_err(|err| {
|
||||
debug!(target: "discv4", %err, "dropped outgoing packet");
|
||||
});
|
||||
self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
|
||||
}
|
||||
|
||||
@@ -955,10 +964,8 @@ impl Discv4Service {
|
||||
|
||||
// Check if ENR was updated
|
||||
match (last_enr_seq, old_enr) {
|
||||
(Some(new), Some(old)) => {
|
||||
if new > old {
|
||||
self.send_enr_request(record);
|
||||
}
|
||||
(Some(new), Some(old)) if new > old => {
|
||||
self.send_enr_request(record);
|
||||
}
|
||||
(Some(_), None) => {
|
||||
// got an ENR
|
||||
@@ -1076,6 +1083,19 @@ impl Discv4Service {
|
||||
hash
|
||||
}
|
||||
|
||||
/// Returns a signed `FindNode` packet for `target`, reusing a cached payload when possible.
|
||||
fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
|
||||
let expire = self.find_node_expiration();
|
||||
let cache_ttl = self.config.request_timeout / 4;
|
||||
CachedFindNode::get_or_sign(
|
||||
&mut self.cached_find_node,
|
||||
target,
|
||||
cache_ttl,
|
||||
&self.secret_key,
|
||||
expire,
|
||||
)
|
||||
}
|
||||
|
||||
/// Message handler for an incoming `Ping`
|
||||
fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
|
||||
if self.is_expired(ping.expire) {
|
||||
@@ -1195,10 +1215,8 @@ impl Discv4Service {
|
||||
} else {
|
||||
// Request ENR if included in the ping
|
||||
match (ping.enr_sq, old_enr) {
|
||||
(Some(new), Some(old)) => {
|
||||
if new > old {
|
||||
self.send_enr_request(record);
|
||||
}
|
||||
(Some(new), Some(old)) if new > old => {
|
||||
self.send_enr_request(record);
|
||||
}
|
||||
(Some(_), None) => {
|
||||
self.send_enr_request(record);
|
||||
@@ -1295,9 +1313,15 @@ impl Discv4Service {
|
||||
match reason {
|
||||
PingReason::InitialInsert => {
|
||||
self.update_on_pong(node, pong.enr_sq);
|
||||
// Reset the lookup interval so the next poll_tick fires immediately,
|
||||
// rather than waiting the full ~20s for the first lookup.
|
||||
if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
|
||||
self.pending_lookup_reset = false;
|
||||
self.lookup_interval.reset();
|
||||
}
|
||||
}
|
||||
PingReason::EstablishBond => {
|
||||
// same as `InitialInsert` which renews the bond if the peer is in the table
|
||||
// no initial lookup needed here since the node was already in the table.
|
||||
self.update_on_pong(node, pong.enr_sq);
|
||||
}
|
||||
PingReason::RePing => {
|
||||
@@ -1355,10 +1379,8 @@ impl Discv4Service {
|
||||
_ => return,
|
||||
};
|
||||
match (fork_id, old_fork_id) {
|
||||
(Some(new), Some(old)) => {
|
||||
if new != old {
|
||||
self.notify(DiscoveryUpdate::EnrForkId(record, new))
|
||||
}
|
||||
(Some(new), Some(old)) if new != old => {
|
||||
self.notify(DiscoveryUpdate::EnrForkId(record, new))
|
||||
}
|
||||
(Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
|
||||
_ => {}
|
||||
@@ -2288,6 +2310,41 @@ impl FindNodeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cached signed `FindNode` packet to avoid redundant ECDSA signing during Kademlia lookups.
|
||||
#[derive(Debug)]
|
||||
struct CachedFindNode {
|
||||
target: PeerId,
|
||||
payload: Bytes,
|
||||
hash: B256,
|
||||
cached_at: Instant,
|
||||
}
|
||||
|
||||
impl CachedFindNode {
|
||||
/// Returns the cached `(payload, hash)` if the target matches and the cache is still fresh,
|
||||
/// or signs a new packet, updates the cache, and returns it.
|
||||
fn get_or_sign(
|
||||
cache: &mut Option<Self>,
|
||||
target: PeerId,
|
||||
ttl: Duration,
|
||||
secret_key: &secp256k1::SecretKey,
|
||||
expire: u64,
|
||||
) -> (Bytes, B256) {
|
||||
if let Some(c) = cache.as_ref() &&
|
||||
c.target == target &&
|
||||
c.cached_at.elapsed() < ttl
|
||||
{
|
||||
return (c.payload.clone(), c.hash);
|
||||
}
|
||||
|
||||
let msg = Message::FindNode(FindNode { id: target, expire });
|
||||
let (payload, hash) = msg.encode(secret_key);
|
||||
|
||||
*cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
|
||||
|
||||
(payload, hash)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct EnrRequestState {
|
||||
// Timestamp when the request was sent.
|
||||
@@ -2961,27 +3018,21 @@ mod tests {
|
||||
let event = poll_fn(|cx| service_1.poll(cx)).await;
|
||||
assert_eq!(event, Discv4Event::Ping);
|
||||
|
||||
// we now wait for PONG
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
|
||||
match event {
|
||||
Discv4Event::EnrRequest => {
|
||||
// since we support enr in the ping it may also request the enr
|
||||
// Drain events from service_2 until we see the Pong. Intermediate EnrRequest and
|
||||
// FindNode events are expected: ENR requests come from the ping handshake, and FindNode
|
||||
// arrives because service_1 resets its lookup interval on the first bootnode pong.
|
||||
tokio::time::timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
match event {
|
||||
Discv4Event::EnrRequest => {
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
assert_eq!(event, Discv4Event::Pong);
|
||||
}
|
||||
Discv4Event::Pong => {}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
Discv4Event::Pong => break,
|
||||
Discv4Event::EnrRequest | Discv4Event::FindNode => {}
|
||||
ev => unreachable!("{ev:?}"),
|
||||
}
|
||||
}
|
||||
Discv4Event::Pong => {}
|
||||
ev => unreachable!("{ev:?}"),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("timed out waiting for Pong from service_2");
|
||||
|
||||
// endpoint is proven
|
||||
match service_2.kbuckets.entry(&key1) {
|
||||
@@ -3064,4 +3115,109 @@ mod tests {
|
||||
// Assert bootnode did not appear in update stream
|
||||
assert!(bootnode_appeared, "Bootnode should appear in update stream");
|
||||
}
|
||||
|
||||
fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
|
||||
let key = kad_key(record.id);
|
||||
let _ = service.kbuckets.insert_or_update(
|
||||
&key,
|
||||
NodeEntry::new_proven(record),
|
||||
NodeStatus {
|
||||
direction: ConnectionDirection::Incoming,
|
||||
state: ConnectionState::Connected,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
|
||||
let echo_hash = B256::random();
|
||||
service.pending_pings.insert(
|
||||
record.id,
|
||||
PingRequest {
|
||||
sent_at: Instant::now(),
|
||||
node: record,
|
||||
echo_hash,
|
||||
reason: PingReason::InitialInsert,
|
||||
},
|
||||
);
|
||||
echo_hash
|
||||
}
|
||||
|
||||
fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
|
||||
Pong {
|
||||
to: rng_endpoint(&mut rand_08::thread_rng()),
|
||||
echo: echo_hash,
|
||||
expire: service.ping_expiration(),
|
||||
enr_sq: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_on_first_bootnode_pong() {
|
||||
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
let config = Discv4Config::builder().add_boot_node(record).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
// 1. initial state
|
||||
assert!(service.pending_lookup_reset);
|
||||
|
||||
// 2. setup: proven bootnode + pending InitialInsert ping
|
||||
insert_proven_node(&mut service, record);
|
||||
let echo_hash = insert_initial_ping(&mut service, record);
|
||||
|
||||
// 3. input: pong arrives
|
||||
service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
|
||||
|
||||
// 4. flag should be consumed — interval was reset
|
||||
assert!(!service.pending_lookup_reset, "flag should be consumed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_fires_only_once() {
|
||||
let records: Vec<_> = (0..2)
|
||||
.map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
|
||||
.collect();
|
||||
let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
// 1. setup: two proven bootnodes with pending InitialInsert pings
|
||||
for &r in &records {
|
||||
insert_proven_node(&mut service, r);
|
||||
}
|
||||
let hashes: Vec<_> =
|
||||
records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
|
||||
|
||||
// 2. first pong -> consumes the flag (resets the interval)
|
||||
service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
|
||||
assert!(!service.pending_lookup_reset);
|
||||
|
||||
// 3. second pong -> flag already consumed, no second reset
|
||||
service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
|
||||
assert!(!service.pending_lookup_reset);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_not_triggered_by_non_bootnode() {
|
||||
let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
let config = Discv4Config::builder().add_boot_node(bootnode).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
assert!(service.pending_lookup_reset);
|
||||
|
||||
// a non-bootnode pong should not consume the flag
|
||||
let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
insert_proven_node(&mut service, stranger);
|
||||
let echo_hash = insert_initial_ping(&mut service, stranger);
|
||||
service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
|
||||
|
||||
assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_disabled_when_lookup_disabled() {
|
||||
let config = Discv4Config::builder().enable_lookup(false).build();
|
||||
let (_discv4, service) = create_discv4_with_config(config).await;
|
||||
|
||||
// flag should be false when lookups are disabled
|
||||
assert!(!service.pending_lookup_reset);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -580,7 +580,7 @@ pub fn spawn_populate_kbuckets_bg(
|
||||
let metrics = metrics.discovered_peers;
|
||||
let mut kbucket_index = MAX_KBUCKET_INDEX;
|
||||
let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
|
||||
task::spawn(Box::pin(async move {
|
||||
task::spawn(async move {
|
||||
// make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
|
||||
// log2distance from local node
|
||||
for i in (0..bootstrap_lookup_countdown).rev() {
|
||||
@@ -622,7 +622,7 @@ pub fn spawn_populate_kbuckets_bg(
|
||||
|
||||
tokio::time::sleep(lookup_interval).await;
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
/// Gets the next lookup target, based on which bucket is currently being targeted.
|
||||
|
||||
@@ -52,6 +52,7 @@ where
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.inner.clear();
|
||||
self.last_requested_block_number.take();
|
||||
self.metrics.clear();
|
||||
}
|
||||
/// Add new request to the queue.
|
||||
/// Expects a sorted list of headers.
|
||||
|
||||
@@ -60,6 +60,15 @@ impl BodyDownloaderMetrics {
|
||||
_error => self.unexpected_errors.increment(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear all gauge metrics by setting them to 0.
|
||||
pub fn clear(&self) {
|
||||
self.in_flight_requests.set(0);
|
||||
self.buffered_responses.set(0);
|
||||
self.buffered_blocks.set(0);
|
||||
self.buffered_blocks_size_bytes.set(0);
|
||||
self.queued_blocks.set(0);
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for an individual response, i.e. the size in bytes, and length (number of bodies) in the
|
||||
|
||||
@@ -6,6 +6,7 @@ use alloc::vec::Vec;
|
||||
use alloy_eips::BlockHashOrNumber;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
|
||||
use derive_more::{Deref, IntoIterator};
|
||||
use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
|
||||
|
||||
/// A request for a peer to return block headers starting at the requested block.
|
||||
@@ -39,7 +40,17 @@ pub struct GetBlockHeaders {
|
||||
}
|
||||
|
||||
/// The response to [`GetBlockHeaders`], containing headers if any headers were found.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
pub struct BlockHeaders<H = alloy_consensus::Header>(
|
||||
@@ -56,7 +67,17 @@ impl<H> From<Vec<H>> for BlockHeaders<H> {
|
||||
}
|
||||
|
||||
/// A request for a peer to return block bodies for the given block hashes.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -73,7 +94,17 @@ impl From<Vec<B256>> for GetBlockBodies {
|
||||
|
||||
/// The response to [`GetBlockBodies`], containing the block bodies that the peer knows about if
|
||||
/// any were found.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
pub struct BlockBodies<B = reth_ethereum_primitives::BlockBody>(
|
||||
|
||||
@@ -16,7 +16,17 @@ use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_primitives_traits::{Block, SignedTransaction};
|
||||
|
||||
/// This informs peers of new blocks that have appeared on the network.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -31,12 +41,7 @@ pub struct NewBlockHashes(
|
||||
impl NewBlockHashes {
|
||||
/// Returns the latest block in the list of blocks.
|
||||
pub fn latest(&self) -> Option<&BlockHashNumber> {
|
||||
self.0.iter().fold(None, |latest, block| {
|
||||
if let Some(latest) = latest {
|
||||
return if latest.number > block.number { Some(latest) } else { Some(block) }
|
||||
}
|
||||
Some(block)
|
||||
})
|
||||
self.iter().max_by_key(|b| b.number)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,7 +104,17 @@ generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBloc
|
||||
|
||||
/// This informs peers of transactions that have appeared on the network and are not yet included
|
||||
/// in a block.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp, 10)]
|
||||
@@ -111,7 +126,7 @@ pub struct Transactions<T = TransactionSigned>(
|
||||
impl<T: SignedTransaction> Transactions<T> {
|
||||
/// Returns `true` if the list of transactions contains any blob transactions.
|
||||
pub fn has_eip4844(&self) -> bool {
|
||||
self.0.iter().any(|tx| tx.is_eip4844())
|
||||
self.iter().any(|tx| tx.is_eip4844())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,7 +146,9 @@ impl<T> From<Transactions<T>> for Vec<T> {
|
||||
///
|
||||
/// The list of transactions is constructed on per-peers basis, but the underlying transaction
|
||||
/// objects are shared.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)]
|
||||
#[derive(
|
||||
Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Deref, IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp, 20)]
|
||||
pub struct SharedTransactions<T = TransactionSigned>(
|
||||
@@ -180,7 +197,7 @@ impl NewPooledTransactionHashes {
|
||||
/// Returns an iterator over all transaction hashes.
|
||||
pub fn iter_hashes(&self) -> impl Iterator<Item = &B256> + '_ {
|
||||
match self {
|
||||
Self::Eth66(msg) => msg.0.iter(),
|
||||
Self::Eth66(msg) => msg.iter(),
|
||||
Self::Eth68(msg) => msg.hashes.iter(),
|
||||
}
|
||||
}
|
||||
@@ -212,7 +229,7 @@ impl NewPooledTransactionHashes {
|
||||
/// Returns an iterator over all transaction hashes.
|
||||
pub fn into_iter_hashes(self) -> impl Iterator<Item = B256> {
|
||||
match self {
|
||||
Self::Eth66(msg) => msg.0.into_iter(),
|
||||
Self::Eth66(msg) => msg.into_iter(),
|
||||
Self::Eth68(msg) => msg.hashes.into_iter(),
|
||||
}
|
||||
}
|
||||
@@ -310,7 +327,17 @@ impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
|
||||
|
||||
/// This informs peers of transaction hashes for transactions that have appeared on the network,
|
||||
/// but have not been included in a block.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
|
||||
@@ -4,11 +4,22 @@ use alloc::vec::Vec;
|
||||
use alloy_consensus::{ReceiptWithBloom, RlpDecodableReceipt, RlpEncodableReceipt, TxReceipt};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
|
||||
use derive_more::{Deref, IntoIterator};
|
||||
use reth_codecs_derive::add_arbitrary_tests;
|
||||
use reth_ethereum_primitives::Receipt;
|
||||
|
||||
/// A request for transaction receipts from the given block hashes.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -55,7 +66,7 @@ impl alloy_rlp::Decodable for GetReceipts70 {
|
||||
|
||||
/// The response to [`GetReceipts`], containing receipt lists that correspond to each block
|
||||
/// requested.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Default)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Default, Deref, IntoIterator)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -85,7 +96,9 @@ impl<T: RlpDecodableReceipt> alloy_rlp::Decodable for Receipts<T> {
|
||||
/// Eth/69 receipt response type that removes bloom filters from the protocol.
|
||||
///
|
||||
/// This is effectively a subset of [`Receipts`].
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)]
|
||||
#[derive(
|
||||
Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Deref, IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -103,8 +116,7 @@ impl<T: TxReceipt> Receipts69<T> {
|
||||
/// every receipt.
|
||||
pub fn into_with_bloom(self) -> Receipts<T> {
|
||||
Receipts(
|
||||
self.0
|
||||
.into_iter()
|
||||
self.into_iter()
|
||||
.map(|receipts| receipts.into_iter().map(|r| r.into_with_bloom()).collect())
|
||||
.collect(),
|
||||
)
|
||||
|
||||
@@ -3,12 +3,23 @@
|
||||
use alloc::vec::Vec;
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
|
||||
use derive_more::{Deref, IntoIterator};
|
||||
use reth_codecs_derive::add_arbitrary_tests;
|
||||
|
||||
/// A request for state tree nodes corresponding to the given hashes.
|
||||
/// This message was removed in `eth/67`, only clients running `eth/66` or earlier will respond to
|
||||
/// this message.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -19,7 +30,17 @@ pub struct GetNodeData(pub Vec<B256>);
|
||||
///
|
||||
/// Not all nodes are guaranteed to be returned by the peer.
|
||||
/// This message was removed in `eth/67`.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
|
||||
@@ -9,7 +9,17 @@ use derive_more::{Constructor, Deref, IntoIterator};
|
||||
use reth_codecs_derive::add_arbitrary_tests;
|
||||
|
||||
/// A list of transaction hashes that the peer would like transaction bodies for.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Eq,
|
||||
RlpEncodableWrapper,
|
||||
RlpDecodableWrapper,
|
||||
Default,
|
||||
Deref,
|
||||
IntoIterator,
|
||||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
#[add_arbitrary_tests(rlp)]
|
||||
@@ -55,7 +65,7 @@ pub struct PooledTransactions<T = PooledTransaction>(
|
||||
impl<T: Encodable2718> PooledTransactions<T> {
|
||||
/// Returns an iterator over the transaction hashes in this response.
|
||||
pub fn hashes(&self) -> impl Iterator<Item = B256> + '_ {
|
||||
self.0.iter().map(|tx| tx.trie_hash())
|
||||
self.iter().map(|tx| tx.trie_hash())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,10 +35,11 @@ pub enum EthVersion {
|
||||
|
||||
impl EthVersion {
|
||||
/// The latest known eth version
|
||||
pub const LATEST: Self = Self::Eth69;
|
||||
pub const LATEST: Self = Self::Eth70;
|
||||
|
||||
/// All known eth versions
|
||||
pub const ALL_VERSIONS: &'static [Self] = &[Self::Eth69, Self::Eth68, Self::Eth67, Self::Eth66];
|
||||
pub const ALL_VERSIONS: &'static [Self] =
|
||||
&[Self::Eth70, Self::Eth69, Self::Eth68, Self::Eth67, Self::Eth66];
|
||||
|
||||
/// Returns true if the version is eth/66
|
||||
pub const fn is_eth66(&self) -> bool {
|
||||
|
||||
@@ -121,7 +121,7 @@ impl<St> RlpxProtocolMultiplexer<St> {
|
||||
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
|
||||
P2PStreamError: Into<Err>,
|
||||
{
|
||||
self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move {
|
||||
self.into_satellite_stream_with_tuple_handshake(cap, async move |proxy| {
|
||||
let st = handshake(proxy).await?;
|
||||
Ok((st, ()))
|
||||
})
|
||||
@@ -223,17 +223,18 @@ impl<St> RlpxProtocolMultiplexer<St> {
|
||||
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
|
||||
{
|
||||
let eth_cap = self.inner.conn.shared_capabilities().eth_version()?;
|
||||
self.into_satellite_stream_with_tuple_handshake(&Capability::eth(eth_cap), move |proxy| {
|
||||
let handshake = handshake.clone();
|
||||
async move {
|
||||
self.into_satellite_stream_with_tuple_handshake(
|
||||
&Capability::eth(eth_cap),
|
||||
async move |proxy| {
|
||||
let handshake = handshake.clone();
|
||||
let mut unauth = UnauthProxy { inner: proxy };
|
||||
let their_status = handshake
|
||||
.handshake(&mut unauth, status, fork_filter, HANDSHAKE_TIMEOUT)
|
||||
.await?;
|
||||
let eth_stream = EthStream::new(eth_cap, unauth.into_inner());
|
||||
Ok((eth_stream, their_status))
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -812,14 +813,11 @@ mod tests {
|
||||
|
||||
let multiplexer = RlpxProtocolMultiplexer::new(conn);
|
||||
let _satellite = multiplexer
|
||||
.into_satellite_stream_with_handshake(
|
||||
eth.capability().as_ref(),
|
||||
move |proxy| async move {
|
||||
UnauthedEthStream::new(proxy)
|
||||
.handshake::<EthNetworkPrimitives>(status, fork_filter)
|
||||
.await
|
||||
},
|
||||
)
|
||||
.into_satellite_stream_with_handshake(eth.capability().as_ref(), async move |proxy| {
|
||||
UnauthedEthStream::new(proxy)
|
||||
.handshake::<EthNetworkPrimitives>(status, fork_filter)
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -940,7 +940,7 @@ mod tests {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let handle = tokio::spawn(Box::pin(async move {
|
||||
let handle = tokio::spawn(async move {
|
||||
// roughly based off of the design of tokio::net::TcpListener
|
||||
let (incoming, _) = listener.accept().await.unwrap();
|
||||
let stream = crate::PassthroughCodec::default().framed(incoming);
|
||||
@@ -960,7 +960,7 @@ mod tests {
|
||||
panic!("expected mismatched protocol version error, got {other_err:?}")
|
||||
}
|
||||
}
|
||||
}));
|
||||
});
|
||||
|
||||
let outgoing = TcpStream::connect(local_addr).await.unwrap();
|
||||
let sink = crate::PassthroughCodec::default().framed(outgoing);
|
||||
|
||||
@@ -97,9 +97,6 @@ alloy-genesis.workspace = true
|
||||
url.workspace = true
|
||||
secp256k1 = { workspace = true, features = ["rand"] }
|
||||
|
||||
## Benchmarks
|
||||
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
|
||||
|
||||
[features]
|
||||
serde = [
|
||||
"dep:serde",
|
||||
@@ -141,13 +138,3 @@ test-utils = [
|
||||
"reth-evm-ethereum?/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
|
||||
[[bench]]
|
||||
name = "broadcast"
|
||||
required-features = ["test-utils"]
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "tx_manager_hash_fetching"
|
||||
required-features = ["test-utils"]
|
||||
harness = false
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
#![allow(missing_docs)]
|
||||
use alloy_primitives::U256;
|
||||
use criterion::*;
|
||||
use futures::StreamExt;
|
||||
use rand::SeedableRng;
|
||||
use reth_network::{test_utils::Testnet, NetworkEventListenerProvider};
|
||||
use reth_network_api::Peers;
|
||||
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
|
||||
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction};
|
||||
use std::sync::Arc;
|
||||
use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc::unbounded_channel};
|
||||
|
||||
criterion_group!(
|
||||
name = broadcast_benches;
|
||||
config = Criterion::default();
|
||||
targets = broadcast_ingress_bench
|
||||
);
|
||||
|
||||
pub fn broadcast_ingress_bench(c: &mut Criterion) {
|
||||
let rt = TokioRuntime::new().unwrap();
|
||||
|
||||
let mut group = c.benchmark_group("Broadcast Ingress");
|
||||
group.sample_size(10);
|
||||
group.bench_function("receive_broadcasts", move |b| {
|
||||
b.to_async(&rt).iter_with_setup(
|
||||
|| {
|
||||
// `b.to_async(rt)` automatically enters the
|
||||
// runtime context and simply calling `block_on` here will cause the code to panic.
|
||||
tokio::task::block_in_place(|| {
|
||||
tokio::runtime::Handle::current().block_on(async {
|
||||
let provider = MockEthProvider::default();
|
||||
let mut net = Testnet::create_with(2, provider.clone()).await;
|
||||
|
||||
let mut peer0 = net.remove_peer(0);
|
||||
let (tx, transactions_rx) = unbounded_channel();
|
||||
peer0.network_mut().set_transactions(tx);
|
||||
let mut events0 = peer0.handle().event_listener();
|
||||
let net = net.with_eth_pool();
|
||||
let handle = net.spawn();
|
||||
let peer1 = handle.peers()[0].network().clone();
|
||||
let peer0_id = peer0.peer_id();
|
||||
peer1.add_peer(peer0_id, peer0.local_addr());
|
||||
|
||||
// await connection
|
||||
tokio::select! {
|
||||
_ = events0.next() => {}
|
||||
_ = &mut peer0 => {}
|
||||
}
|
||||
|
||||
// prepare some transactions
|
||||
let mut tx_gen =
|
||||
TransactionGenerator::new(rand::rngs::StdRng::seed_from_u64(0));
|
||||
let num_broadcasts = 10;
|
||||
for _ in 0..num_broadcasts {
|
||||
for _ in 0..2 {
|
||||
let mut txs = Vec::new();
|
||||
let tx = tx_gen.gen_eip1559_pooled();
|
||||
// ensure the sender has balance
|
||||
provider.add_account(
|
||||
tx.sender(),
|
||||
ExtendedAccount::new(0, U256::from(100_000_000)),
|
||||
);
|
||||
txs.push(Arc::new(tx.transaction().clone().into_inner()));
|
||||
peer1.send_transactions(peer0_id, txs);
|
||||
}
|
||||
}
|
||||
(num_broadcasts, transactions_rx, peer0, handle)
|
||||
})
|
||||
})
|
||||
},
|
||||
|(num_txs, mut transactions_rx, mut peer0, _handle)| async move {
|
||||
let mut count = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = transactions_rx.recv() => {
|
||||
count += 1;
|
||||
if count == num_txs {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = &mut peer0 => {
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
criterion_main!(broadcast_benches);
|
||||
@@ -1,142 +0,0 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use alloy_primitives::{B256, U256};
|
||||
use criterion::{measurement::WallTime, *};
|
||||
use rand::SeedableRng;
|
||||
use reth_eth_wire::EthVersion;
|
||||
use reth_eth_wire_types::EthNetworkPrimitives;
|
||||
use reth_network::{
|
||||
test_utils::{
|
||||
transactions::{buffer_hash_to_tx_fetcher, new_mock_session},
|
||||
Testnet,
|
||||
},
|
||||
transactions::{
|
||||
fetcher::TransactionFetcher, TransactionFetcherConfig, TransactionPropagationMode::Max,
|
||||
TransactionsManagerConfig,
|
||||
},
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
|
||||
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool};
|
||||
use std::collections::HashMap;
|
||||
use tokio::runtime::Runtime as TokioRuntime;
|
||||
|
||||
criterion_group!(
|
||||
name = tx_fetch_benches;
|
||||
config = Criterion::default();
|
||||
targets = tx_fetch_bench, fetch_pending_hashes,
|
||||
);
|
||||
|
||||
pub fn benchmark_fetch_pending_hashes(group: &mut BenchmarkGroup<'_, WallTime>, peers_num: usize) {
|
||||
let mut tx_fetcher = TransactionFetcher::<EthNetworkPrimitives>::default();
|
||||
let mut peers = HashMap::default();
|
||||
|
||||
for _i in 0..peers_num {
|
||||
// NOTE: the worst case, each tx in the cache belongs to a different peer.
|
||||
let peer = PeerId::random();
|
||||
let hash = B256::random();
|
||||
|
||||
let (mut peer_data, _) = new_mock_session(peer, EthVersion::Eth66);
|
||||
peer_data.seen_transactions_mut().insert(hash);
|
||||
peers.insert(peer, peer_data);
|
||||
|
||||
buffer_hash_to_tx_fetcher(&mut tx_fetcher, hash, peer, 0, None);
|
||||
}
|
||||
|
||||
let group_id = format!("fetch pending hashes, peers num: {peers_num}");
|
||||
|
||||
group.bench_function(group_id, |b| {
|
||||
b.iter(|| {
|
||||
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
pub fn fetch_pending_hashes(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("Fetch Pending Hashes");
|
||||
|
||||
for peers in [5, 10, 20, 100, 1000, 10000, 100000] {
|
||||
benchmark_fetch_pending_hashes(&mut group, peers);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
pub fn tx_fetch_bench(c: &mut Criterion) {
|
||||
let rt = TokioRuntime::new().unwrap();
|
||||
|
||||
let mut group = c.benchmark_group("Transaction Fetch");
|
||||
group.sample_size(30);
|
||||
|
||||
group.bench_function("fetch_transactions", |b| {
|
||||
b.to_async(&rt).iter_with_setup(
|
||||
|| {
|
||||
tokio::task::block_in_place(|| {
|
||||
tokio::runtime::Handle::current().block_on(async {
|
||||
let tx_manager_config = TransactionsManagerConfig {
|
||||
propagation_mode: Max(0),
|
||||
transaction_fetcher_config: TransactionFetcherConfig {
|
||||
max_inflight_requests: 1,
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let provider = MockEthProvider::default();
|
||||
let num_peers = 10;
|
||||
let net = Testnet::create_with(num_peers, provider.clone()).await;
|
||||
|
||||
// install request handlers
|
||||
let net = net.with_eth_pool_config(tx_manager_config);
|
||||
let handle = net.spawn();
|
||||
|
||||
// connect all the peers first
|
||||
handle.connect_peers().await;
|
||||
|
||||
let listening_peer = &handle.peers()[num_peers - 1];
|
||||
let listening_peer_tx_listener =
|
||||
listening_peer.pool().unwrap().pending_transactions_listener();
|
||||
|
||||
let num_tx_per_peer = 10;
|
||||
|
||||
for i in 1..num_peers {
|
||||
let peer = &handle.peers()[i];
|
||||
let peer_pool = peer.pool().unwrap();
|
||||
|
||||
for _ in 0..num_tx_per_peer {
|
||||
let mut tx_gen =
|
||||
TransactionGenerator::new(rand::rngs::StdRng::seed_from_u64(0));
|
||||
|
||||
let tx = tx_gen.gen_eip1559_pooled();
|
||||
let sender = tx.sender();
|
||||
provider.add_account(
|
||||
sender,
|
||||
ExtendedAccount::new(0, U256::from(100_000_000)),
|
||||
);
|
||||
peer_pool.add_external_transaction(tx.clone()).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Total expected transactions
|
||||
let total_expected_tx = num_tx_per_peer * (num_peers - 1);
|
||||
|
||||
(listening_peer_tx_listener, total_expected_tx)
|
||||
})
|
||||
})
|
||||
},
|
||||
|(mut listening_peer_tx_listener, total_expected_tx)| async move {
|
||||
let mut received_tx = 0;
|
||||
while listening_peer_tx_listener.recv().await.is_some() {
|
||||
received_tx += 1;
|
||||
if received_tx >= total_expected_tx {
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_main!(tx_fetch_benches);
|
||||
@@ -301,6 +301,20 @@ impl Discovery {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Discovery {
|
||||
fn drop(&mut self) {
|
||||
if let Some(discv4) = &self.discv4 {
|
||||
discv4.terminate();
|
||||
}
|
||||
if let Some(handle) = self._discv4_service.take() {
|
||||
handle.abort();
|
||||
}
|
||||
if let Some(handle) = self._dns_disc_service.take() {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Discovery {
|
||||
type Item = DiscoveryEvent;
|
||||
|
||||
|
||||
@@ -171,7 +171,7 @@ where
|
||||
|
||||
let mut total_bytes = 0;
|
||||
|
||||
for hash in request.0 {
|
||||
for hash in request {
|
||||
if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
|
||||
let body = block.into_body();
|
||||
total_bytes += body.length();
|
||||
@@ -304,7 +304,7 @@ where
|
||||
let mut receipts = Vec::new();
|
||||
let mut total_bytes = 0;
|
||||
|
||||
for hash in request.0 {
|
||||
for hash in request {
|
||||
if let Some(receipts_by_block) =
|
||||
self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
|
||||
{
|
||||
|
||||
@@ -11,6 +11,7 @@ use reth_network_p2p::{
|
||||
error::{PeerRequestResult, RequestError},
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
priority::Priority,
|
||||
receipts::client::{ReceiptsClient, ReceiptsFut},
|
||||
BlockClient,
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
@@ -102,6 +103,24 @@ impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> ReceiptsClient for FetchClient<N> {
|
||||
type Receipt = N::Receipt;
|
||||
type Output = ReceiptsFut<N::Receipt>;
|
||||
|
||||
fn get_receipts_with_priority(&self, request: Vec<B256>, priority: Priority) -> Self::Output {
|
||||
let (response, rx) = oneshot::channel();
|
||||
if self
|
||||
.request_tx
|
||||
.send(DownloadRequest::GetReceipts { request, response, priority })
|
||||
.is_ok()
|
||||
{
|
||||
Box::pin(FlattenedResponse::from(rx))
|
||||
} else {
|
||||
Box::pin(future::err(RequestError::ChannelClosed))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
|
||||
type Block = N::Block;
|
||||
}
|
||||
|
||||
@@ -8,13 +8,15 @@ use crate::{message::BlockRequest, session::BlockRangeInfo};
|
||||
use alloy_primitives::B256;
|
||||
use futures::StreamExt;
|
||||
use reth_eth_wire::{
|
||||
Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
|
||||
Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts,
|
||||
NetworkPrimitives,
|
||||
};
|
||||
use reth_network_api::test_utils::PeersHandle;
|
||||
use reth_network_p2p::{
|
||||
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
|
||||
headers::client::HeadersRequest,
|
||||
priority::Priority,
|
||||
receipts::client::ReceiptsResponse,
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::ReputationChangeKind;
|
||||
@@ -32,6 +34,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
|
||||
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
|
||||
type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
|
||||
|
||||
/// Manages data fetching operations.
|
||||
///
|
||||
@@ -45,6 +48,8 @@ pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
|
||||
/// Currently active [`GetBlockBodies`] requests
|
||||
inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
|
||||
/// Currently active `GetReceipts` requests
|
||||
inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
|
||||
/// The list of _available_ peers for requests.
|
||||
peers: HashMap<PeerId, Peer>,
|
||||
/// The handle to the peers manager
|
||||
@@ -67,6 +72,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
Self {
|
||||
inflight_headers_requests: Default::default(),
|
||||
inflight_bodies_requests: Default::default(),
|
||||
inflight_receipts_requests: Default::default(),
|
||||
peers: Default::default(),
|
||||
peers_handle,
|
||||
num_active_peers,
|
||||
@@ -114,6 +120,9 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
if let Some(req) = self.inflight_bodies_requests.remove(peer) {
|
||||
let _ = req.response.send(Err(RequestError::ConnectionDropped));
|
||||
}
|
||||
if let Some(req) = self.inflight_receipts_requests.remove(peer) {
|
||||
let _ = req.response.send(Err(RequestError::ConnectionDropped));
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the block information for the peer.
|
||||
@@ -256,6 +265,11 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
self.inflight_bodies_requests.insert(peer_id, inflight);
|
||||
BlockRequest::GetBlockBodies(GetBlockBodies(request))
|
||||
}
|
||||
DownloadRequest::GetReceipts { request, response, .. } => {
|
||||
let inflight = Request { request: (), response };
|
||||
self.inflight_receipts_requests.insert(peer_id, inflight);
|
||||
BlockRequest::GetReceipts(GetReceipts(request))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,6 +344,30 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Called on a `GetReceipts` response from a peer.
|
||||
///
|
||||
/// All receipt variants (legacy with bloom, eth/69, eth/70) are expected to be normalized
|
||||
/// to [`ReceiptsResponse`] by the caller before invoking this method.
|
||||
pub(crate) fn on_receipts_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
res: RequestResult<ReceiptsResponse<N::Receipt>>,
|
||||
) -> Option<BlockResponseOutcome> {
|
||||
let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());
|
||||
|
||||
if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
|
||||
let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
|
||||
}
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
peer.last_response_likely_bad = is_likely_bad_response;
|
||||
|
||||
if peer.state.on_request_finished() && !is_likely_bad_response {
|
||||
return self.followup_request(peer_id)
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns a new [`FetchClient`] that can send requests to this type.
|
||||
pub(crate) fn client(&self) -> FetchClient<N> {
|
||||
FetchClient {
|
||||
@@ -453,6 +491,8 @@ enum PeerState {
|
||||
GetBlockHeaders,
|
||||
/// Peer is handling a `GetBlockBodies` request.
|
||||
GetBlockBodies,
|
||||
/// Peer is handling a `GetReceipts` request.
|
||||
GetReceipts,
|
||||
/// Peer session is about to close
|
||||
Closing,
|
||||
}
|
||||
@@ -491,6 +531,7 @@ struct Request<Req, Resp> {
|
||||
|
||||
/// Requests that can be sent to the Syncer from a [`FetchClient`]
|
||||
#[derive(Debug)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
|
||||
/// Download the requested headers and send response through channel
|
||||
GetBlockHeaders {
|
||||
@@ -498,13 +539,19 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
|
||||
response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
|
||||
priority: Priority,
|
||||
},
|
||||
/// Download the requested headers and send response through channel
|
||||
/// Download the requested bodies and send response through channel
|
||||
GetBlockBodies {
|
||||
request: Vec<B256>,
|
||||
response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
|
||||
priority: Priority,
|
||||
range_hint: Option<RangeInclusive<u64>>,
|
||||
},
|
||||
/// Download receipts for the given block hashes and send response through channel
|
||||
GetReceipts {
|
||||
request: Vec<B256>,
|
||||
response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
|
||||
priority: Priority,
|
||||
},
|
||||
}
|
||||
|
||||
// === impl DownloadRequest ===
|
||||
@@ -515,15 +562,16 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
|
||||
match self {
|
||||
Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
|
||||
Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
|
||||
Self::GetReceipts { .. } => PeerState::GetReceipts,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the requested priority of this request
|
||||
const fn get_priority(&self) -> &Priority {
|
||||
match self {
|
||||
Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
|
||||
priority
|
||||
}
|
||||
Self::GetBlockHeaders { priority, .. } |
|
||||
Self::GetBlockBodies { priority, .. } |
|
||||
Self::GetReceipts { priority, .. } => priority,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -543,6 +591,7 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
|
||||
BestPeerRequirements::FullBlock
|
||||
}
|
||||
}
|
||||
Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1146,4 +1195,178 @@ mod tests {
|
||||
!peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
|
||||
);
|
||||
}
|
||||
|
||||
/// Creates a `StateFetcher` with a single idle peer and returns both.
|
||||
fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
|
||||
let manager = PeersManager::new(PeersConfig::default());
|
||||
let mut fetcher =
|
||||
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
|
||||
let peer_id = B512::random();
|
||||
|
||||
fetcher.new_active_peer(
|
||||
peer_id,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Arc::new(Capabilities::from(vec![])),
|
||||
Default::default(),
|
||||
None,
|
||||
);
|
||||
(fetcher, peer_id)
|
||||
}
|
||||
|
||||
/// Inserts an inflight receipts request into the fetcher and returns the
|
||||
/// `oneshot::Receiver` that the final response will be sent through.
|
||||
fn insert_inflight_receipts(
|
||||
fetcher: &mut StateFetcher<EthNetworkPrimitives>,
|
||||
peer_id: PeerId,
|
||||
) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
|
||||
fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
|
||||
rx
|
||||
}
|
||||
|
||||
// ---- Receipts: basic dispatch ----
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_poll_dispatches_receipts_to_peer() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let (tx, _rx) = oneshot::channel();
|
||||
fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
|
||||
request: vec![B256::ZERO],
|
||||
response: tx,
|
||||
priority: Priority::default(),
|
||||
});
|
||||
|
||||
let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
|
||||
fetcher.poll(cx)
|
||||
else {
|
||||
panic!("expected Ready(BlockRequest)");
|
||||
};
|
||||
assert_eq!(dispatched_peer, peer_id);
|
||||
assert!(matches!(request, BlockRequest::GetReceipts(_)));
|
||||
|
||||
// Peer should now be in GetReceipts state
|
||||
assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
|
||||
// Inflight request should be tracked
|
||||
assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
|
||||
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
// ---- Receipts: response handling ----
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipts_complete_response_resolves_and_idles_peer() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
|
||||
let rx = insert_inflight_receipts(&mut fetcher, peer_id);
|
||||
|
||||
let resp = ReceiptsResponse::new(vec![vec![]]);
|
||||
let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
|
||||
|
||||
// No queued requests, so no followup
|
||||
assert!(outcome.is_none());
|
||||
// Peer back to idle
|
||||
assert!(fetcher.peers[&peer_id].state.is_idle());
|
||||
// Inflight cleaned up
|
||||
assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
|
||||
|
||||
// Caller receives the response
|
||||
let result = rx.await.unwrap().unwrap();
|
||||
assert_eq!(result.1.receipts.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipts_empty_response_marks_peer_bad() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
|
||||
|
||||
let resp = ReceiptsResponse::new(vec![]);
|
||||
let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
|
||||
|
||||
assert!(fetcher.peers[&peer_id].last_response_likely_bad);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipts_error_forwards_and_marks_peer_bad() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
let rx = insert_inflight_receipts(&mut fetcher, peer_id);
|
||||
|
||||
let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
|
||||
|
||||
assert!(fetcher.peers[&peer_id].last_response_likely_bad);
|
||||
// Error is forwarded to the caller
|
||||
let result = rx.await.unwrap();
|
||||
assert_eq!(result.unwrap_err(), RequestError::Timeout);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_session_closed_cancels_inflight_receipts() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
let rx = insert_inflight_receipts(&mut fetcher, peer_id);
|
||||
|
||||
fetcher.on_session_closed(&peer_id);
|
||||
|
||||
assert!(!fetcher.peers.contains_key(&peer_id));
|
||||
assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
|
||||
|
||||
let result = rx.await.unwrap();
|
||||
assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipts_response_triggers_followup() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
|
||||
// Queue a bodies request as a followup candidate
|
||||
let (followup_tx, _followup_rx) = oneshot::channel();
|
||||
fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
|
||||
request: vec![B256::random()],
|
||||
response: followup_tx,
|
||||
priority: Priority::default(),
|
||||
range_hint: None,
|
||||
});
|
||||
|
||||
let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
|
||||
|
||||
let resp = ReceiptsResponse::new(vec![vec![]]);
|
||||
let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
|
||||
|
||||
assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prepare_block_request_creates_inflight_receipts() {
|
||||
let (mut fetcher, peer_id) = fetcher_with_peer();
|
||||
let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
|
||||
|
||||
let (tx, _rx) = oneshot::channel();
|
||||
let req = DownloadRequest::GetReceipts {
|
||||
request: hashes.clone(),
|
||||
response: tx,
|
||||
priority: Priority::default(),
|
||||
};
|
||||
|
||||
let block_request = fetcher.prepare_block_request(peer_id, req);
|
||||
|
||||
// Returns a GetReceipts block request with the same hashes
|
||||
match block_request {
|
||||
BlockRequest::GetReceipts(ref get) => {
|
||||
assert_eq!(get.0, hashes);
|
||||
}
|
||||
other => panic!("expected GetReceipts, got {other:?}"),
|
||||
}
|
||||
|
||||
// Peer state transitions to GetReceipts
|
||||
assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
|
||||
|
||||
// Inflight request is tracked
|
||||
assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -431,19 +431,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
|
||||
/// Returns an iterator over all peers in the peer set.
|
||||
pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
|
||||
self.swarm.state().peers().iter_peers()
|
||||
self.swarm.peers().iter_peers()
|
||||
}
|
||||
|
||||
/// Returns the number of peers in the peer set.
|
||||
pub fn num_known_peers(&self) -> usize {
|
||||
self.swarm.state().peers().num_known_peers()
|
||||
self.swarm.peers().num_known_peers()
|
||||
}
|
||||
|
||||
/// Returns a new [`PeersHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`PeersHandle`] can be used to interact with the network's peer set.
|
||||
pub fn peers_handle(&self) -> PeersHandle {
|
||||
self.swarm.state().peers().handle()
|
||||
self.swarm.peers().handle()
|
||||
}
|
||||
|
||||
/// Collect the peers from the [`NetworkManager`] and write them to the given
|
||||
@@ -452,7 +452,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
/// Only persists peers that are not currently backed off or banned. Includes metadata like
|
||||
/// peer kind, fork ID, and reputation.
|
||||
pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
|
||||
let peers = self.swarm.state().peers().persistable_peers().collect::<Vec<_>>();
|
||||
let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
|
||||
persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
|
||||
reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
|
||||
Ok(())
|
||||
@@ -730,10 +730,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
NetworkHandleMessage::ReputationChange(peer_id, kind) => {
|
||||
self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
|
||||
self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
|
||||
}
|
||||
NetworkHandleMessage::GetReputationById(peer_id, tx) => {
|
||||
let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
|
||||
let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
|
||||
}
|
||||
NetworkHandleMessage::FetchClient(tx) => {
|
||||
let _ = tx.send(self.fetch_client());
|
||||
@@ -756,7 +756,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
|
||||
}
|
||||
NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
|
||||
let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
|
||||
let peer_ids = self.swarm.peers().peers_by_kind(kind);
|
||||
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
|
||||
}
|
||||
NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
|
||||
@@ -791,7 +791,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
self.metrics.total_incoming_connections.increment(1);
|
||||
self.metrics
|
||||
.incoming_connections
|
||||
.set(self.swarm.state().peers().num_inbound_connections() as f64);
|
||||
.set(self.swarm.peers().num_inbound_connections() as f64);
|
||||
}
|
||||
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
|
||||
trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
|
||||
@@ -829,7 +829,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
}
|
||||
|
||||
if direction.is_outgoing() {
|
||||
self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
|
||||
self.swarm.peers_mut().on_active_outgoing_established(peer_id);
|
||||
}
|
||||
|
||||
self.update_active_connection_metrics();
|
||||
@@ -857,12 +857,12 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
SwarmEvent::PeerAdded(peer_id) => {
|
||||
trace!(target: "net", ?peer_id, "Peer added");
|
||||
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
|
||||
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
|
||||
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
|
||||
}
|
||||
SwarmEvent::PeerRemoved(peer_id) => {
|
||||
trace!(target: "net", ?peer_id, "Peer dropped");
|
||||
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
|
||||
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
|
||||
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
|
||||
}
|
||||
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
|
||||
let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
|
||||
@@ -877,23 +877,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
);
|
||||
|
||||
// Capture direction before state is reset to Idle
|
||||
let is_inbound = self.swarm.state().peers().is_inbound_peer(&peer_id);
|
||||
let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);
|
||||
|
||||
let reason = if let Some(ref err) = error {
|
||||
// If the connection was closed due to an error, we report
|
||||
// the peer
|
||||
self.swarm.state_mut().peers_mut().on_active_session_dropped(
|
||||
&remote_addr,
|
||||
&peer_id,
|
||||
err,
|
||||
);
|
||||
self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
|
||||
self.backed_off_peers_metrics.increment_for_reason(
|
||||
BackoffReason::from_disconnect(err.as_disconnected()),
|
||||
);
|
||||
err.as_disconnected()
|
||||
} else {
|
||||
// Gracefully disconnected
|
||||
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
|
||||
self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
|
||||
self.backed_off_peers_metrics
|
||||
.increment_for_reason(BackoffReason::GracefulClose);
|
||||
None
|
||||
@@ -908,9 +904,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
self.disconnect_metrics.increment_outbound(reason);
|
||||
}
|
||||
}
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
|
||||
self.event_sender
|
||||
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
|
||||
}
|
||||
@@ -940,7 +934,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
self.closed_sessions_metrics.incoming_pending.increment(1);
|
||||
self.metrics
|
||||
.incoming_connections
|
||||
.set(self.swarm.state().peers().num_inbound_connections() as f64);
|
||||
.set(self.swarm.peers().num_inbound_connections() as f64);
|
||||
}
|
||||
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
|
||||
trace!(
|
||||
@@ -952,7 +946,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
);
|
||||
|
||||
if let Some(ref err) = error {
|
||||
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
|
||||
self.swarm.peers_mut().on_outgoing_pending_session_dropped(
|
||||
&remote_addr,
|
||||
&peer_id,
|
||||
err,
|
||||
@@ -972,9 +966,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
}
|
||||
self.closed_sessions_metrics.outgoing_pending.increment(1);
|
||||
self.update_pending_connection_metrics();
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
|
||||
}
|
||||
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
|
||||
trace!(
|
||||
@@ -985,16 +977,14 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
"Outgoing connection error"
|
||||
);
|
||||
|
||||
self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
|
||||
self.swarm.peers_mut().on_outgoing_connection_failure(
|
||||
&remote_addr,
|
||||
&peer_id,
|
||||
&error,
|
||||
);
|
||||
|
||||
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
|
||||
self.metrics
|
||||
.backed_off_peers
|
||||
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
|
||||
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
|
||||
self.update_pending_connection_metrics();
|
||||
}
|
||||
SwarmEvent::BadMessage { peer_id } => {
|
||||
@@ -1052,12 +1042,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
/// Updates the metrics for active,established connections
|
||||
#[inline]
|
||||
fn update_active_connection_metrics(&self) {
|
||||
self.metrics
|
||||
.incoming_connections
|
||||
.set(self.swarm.state().peers().num_inbound_connections() as f64);
|
||||
self.metrics
|
||||
.outgoing_connections
|
||||
.set(self.swarm.state().peers().num_outbound_connections() as f64);
|
||||
self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
|
||||
self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
|
||||
}
|
||||
|
||||
/// Updates the metrics for pending connections
|
||||
@@ -1065,7 +1051,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
fn update_pending_connection_metrics(&self) {
|
||||
self.metrics
|
||||
.pending_outgoing_connections
|
||||
.set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
|
||||
.set(self.swarm.peers().num_pending_outbound_connections() as f64);
|
||||
self.metrics
|
||||
.total_pending_connections
|
||||
.set(self.swarm.sessions().num_pending_connections() as f64);
|
||||
|
||||
@@ -9,9 +9,9 @@ use alloy_primitives::{Bytes, B256};
|
||||
use futures::FutureExt;
|
||||
use reth_eth_wire::{
|
||||
message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
|
||||
EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
|
||||
NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions,
|
||||
Receipts, SharedTransactions, Transactions,
|
||||
EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
|
||||
NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData,
|
||||
PooledTransactions, Receipts, SharedTransactions, Transactions,
|
||||
};
|
||||
use reth_eth_wire_types::RawCapabilityMessage;
|
||||
use reth_network_api::PeerRequest;
|
||||
@@ -77,6 +77,11 @@ pub enum BlockRequest {
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetBlockBodies(GetBlockBodies),
|
||||
|
||||
/// Requests receipts from the peer.
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetReceipts(GetReceipts),
|
||||
}
|
||||
|
||||
/// Corresponding variant for [`PeerRequest`].
|
||||
|
||||
@@ -862,7 +862,7 @@ impl PeersManager {
|
||||
}
|
||||
}
|
||||
|
||||
if kind.filter(|kind| kind.is_trusted()).is_some() {
|
||||
if kind.is_some_and(|kind| kind.is_trusted()) {
|
||||
// also track the peer in the peer id set
|
||||
self.trusted_peer_ids.insert(peer_id);
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
if headers.0.is_empty() {
|
||||
if headers.is_empty() {
|
||||
info!(
|
||||
target: "net::filter",
|
||||
"Peer {} does not have required block {}, banning",
|
||||
|
||||
@@ -1145,7 +1145,7 @@ mod tests {
|
||||
|
||||
let expected_disconnect = DisconnectReason::UselessPeer;
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
|
||||
let msg = client_stream.next().await.unwrap().unwrap_err();
|
||||
assert_eq!(msg.as_disconnected().unwrap(), expected_disconnect);
|
||||
});
|
||||
@@ -1168,7 +1168,7 @@ mod tests {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |client_stream| {
|
||||
drop(client_stream);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await
|
||||
});
|
||||
@@ -1198,7 +1198,7 @@ mod tests {
|
||||
|
||||
let num_messages = 100;
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
|
||||
for _ in 0..num_messages {
|
||||
client_stream
|
||||
.send(EthMessage::NewPooledTransactionHashes66(Vec::new().into()))
|
||||
@@ -1234,7 +1234,7 @@ mod tests {
|
||||
let request_timeout = Duration::from_millis(100);
|
||||
let drop_timeout = Duration::from_millis(1500);
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |client_stream| {
|
||||
let _client_stream = client_stream;
|
||||
tokio::time::sleep(drop_timeout * 60).await;
|
||||
});
|
||||
@@ -1287,7 +1287,7 @@ mod tests {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
|
||||
let _ = tokio::time::timeout(Duration::from_secs(5), client_stream.next()).await;
|
||||
client_stream.into_inner().disconnect(DisconnectReason::UselessPeer).await.unwrap();
|
||||
});
|
||||
@@ -1315,7 +1315,7 @@ mod tests {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
|
||||
client_stream
|
||||
.send(EthMessage::NewPooledTransactionHashes68(Default::default()))
|
||||
.await
|
||||
|
||||
@@ -13,11 +13,12 @@ use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::B256;
|
||||
use rand::seq::SliceRandom;
|
||||
use reth_eth_wire::{
|
||||
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
|
||||
NewBlockHashes, NewBlockPayload, UnifiedStatus,
|
||||
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, GetReceipts70,
|
||||
NetworkPrimitives, NewBlockHashes, NewBlockPayload, UnifiedStatus,
|
||||
};
|
||||
use reth_ethereum_forks::ForkId;
|
||||
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
|
||||
use reth_network_p2p::receipts::client::ReceiptsResponse;
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::{PeerAddr, PeerKind};
|
||||
use reth_primitives_traits::Block;
|
||||
@@ -382,8 +383,8 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
///
|
||||
/// Caution: this will replace an already pending response. It's the responsibility of the
|
||||
/// caller to select the peer.
|
||||
fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
|
||||
if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
|
||||
fn handle_block_request(&mut self, peer_id: PeerId, request: BlockRequest) {
|
||||
if let Some(ref mut peer) = self.active_peers.get_mut(&peer_id) {
|
||||
let (request, response) = match request {
|
||||
BlockRequest::GetBlockHeaders(request) => {
|
||||
let (response, rx) = oneshot::channel();
|
||||
@@ -397,6 +398,30 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
let response = PeerResponse::BlockBodies { response: rx };
|
||||
(request, response)
|
||||
}
|
||||
BlockRequest::GetReceipts(request) => {
|
||||
if peer.capabilities.supports_eth_v70() {
|
||||
let (response, rx) = oneshot::channel();
|
||||
let request = PeerRequest::GetReceipts70 {
|
||||
request: GetReceipts70 {
|
||||
first_block_receipt_index: 0,
|
||||
block_hashes: request.0,
|
||||
},
|
||||
response,
|
||||
};
|
||||
let response = PeerResponse::Receipts70 { response: rx };
|
||||
(request, response)
|
||||
} else if peer.capabilities.supports_eth_v69() {
|
||||
let (response, rx) = oneshot::channel();
|
||||
let request = PeerRequest::GetReceipts69 { request, response };
|
||||
let response = PeerResponse::Receipts69 { response: rx };
|
||||
(request, response)
|
||||
} else {
|
||||
let (response, rx) = oneshot::channel();
|
||||
let request = PeerRequest::GetReceipts { request, response };
|
||||
let response = PeerResponse::Receipts { response: rx };
|
||||
(request, response)
|
||||
}
|
||||
}
|
||||
};
|
||||
let _ = peer.request_tx.to_session_tx.try_send(request);
|
||||
peer.pending_response = Some(response);
|
||||
@@ -428,6 +453,27 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
PeerResponseResult::BlockBodies(res) => {
|
||||
self.state_fetcher.on_block_bodies_response(peer, res)
|
||||
}
|
||||
PeerResponseResult::Receipts(res) => {
|
||||
// Legacy eth/66-68: strip bloom filters and wrap in ReceiptsResponse
|
||||
let normalized = res.map(|blocks| {
|
||||
let receipts = blocks
|
||||
.into_iter()
|
||||
.map(|block_receipts| {
|
||||
block_receipts.into_iter().map(|rwb| rwb.receipt).collect()
|
||||
})
|
||||
.collect();
|
||||
ReceiptsResponse::new(receipts)
|
||||
});
|
||||
self.state_fetcher.on_receipts_response(peer, normalized)
|
||||
}
|
||||
PeerResponseResult::Receipts69(res) => {
|
||||
let normalized = res.map(ReceiptsResponse::new);
|
||||
self.state_fetcher.on_receipts_response(peer, normalized)
|
||||
}
|
||||
PeerResponseResult::Receipts70(res) => {
|
||||
let normalized = res.map(ReceiptsResponse::from);
|
||||
self.state_fetcher.on_receipts_response(peer, normalized)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
@@ -525,7 +571,6 @@ pub(crate) struct ActivePeer<N: NetworkPrimitives> {
|
||||
/// Best block of the peer.
|
||||
pub(crate) best_hash: B256,
|
||||
/// The capabilities of the remote peer.
|
||||
#[expect(dead_code)]
|
||||
pub(crate) capabilities: Arc<Capabilities>,
|
||||
/// A communication channel directly to the session task.
|
||||
pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
listener::{ConnectionListener, ListenerEvent},
|
||||
message::PeerMessage,
|
||||
peers::InboundConnectionError,
|
||||
peers::{InboundConnectionError, PeersManager},
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
|
||||
state::{NetworkState, StateAction},
|
||||
@@ -98,6 +98,16 @@ impl<N: NetworkPrimitives> Swarm<N> {
|
||||
pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
|
||||
&mut self.sessions
|
||||
}
|
||||
|
||||
/// Access to the [`PeersManager`].
|
||||
pub(crate) const fn peers(&self) -> &PeersManager {
|
||||
self.state.peers()
|
||||
}
|
||||
|
||||
/// Mutable access to the [`PeersManager`].
|
||||
pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
|
||||
self.state.peers_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> Swarm<N> {
|
||||
@@ -190,9 +200,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
|
||||
return None
|
||||
}
|
||||
// ensure we can handle an incoming connection from this address
|
||||
if let Err(err) =
|
||||
self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
|
||||
{
|
||||
if let Err(err) = self.peers_mut().on_incoming_pending_session(remote_addr.ip()) {
|
||||
match err {
|
||||
InboundConnectionError::IpBanned => {
|
||||
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
|
||||
@@ -256,21 +264,21 @@ impl<N: NetworkPrimitives> Swarm<N> {
|
||||
//
|
||||
// When disabled (default), peers without a fork ID are admitted immediately.
|
||||
// Peers that *do* carry a fork ID are always validated against ours.
|
||||
let enforce = self.state().peers().enforce_enr_fork_id();
|
||||
let enforce = self.peers().enforce_enr_fork_id();
|
||||
let allow = match fork_id {
|
||||
Some(f) => self.sessions.is_valid_fork_id(f),
|
||||
None => !enforce,
|
||||
};
|
||||
if allow {
|
||||
self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
|
||||
self.peers_mut().add_peer(peer_id, addr, fork_id);
|
||||
}
|
||||
}
|
||||
StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
|
||||
if self.sessions.is_valid_fork_id(fork_id) {
|
||||
self.state_mut().peers_mut().add_peer(peer_id, addr, Some(fork_id));
|
||||
self.peers_mut().add_peer(peer_id, addr, Some(fork_id));
|
||||
} else {
|
||||
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
|
||||
self.state_mut().peers_mut().remove_peer(peer_id);
|
||||
self.peers_mut().remove_peer(peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -279,18 +287,18 @@ impl<N: NetworkPrimitives> Swarm<N> {
|
||||
|
||||
/// Set network connection state to `ShuttingDown`
|
||||
pub(crate) const fn on_shutdown_requested(&mut self) {
|
||||
self.state_mut().peers_mut().on_shutdown();
|
||||
self.peers_mut().on_shutdown();
|
||||
}
|
||||
|
||||
/// Checks if the node's network connection state is '`ShuttingDown`'
|
||||
#[inline]
|
||||
pub(crate) const fn is_shutting_down(&self) -> bool {
|
||||
self.state().peers().connection_state().is_shutting_down()
|
||||
self.peers().connection_state().is_shutting_down()
|
||||
}
|
||||
|
||||
/// Set network connection state to `Hibernate` or `Active`
|
||||
pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
|
||||
self.state_mut().peers_mut().on_network_state_change(network_state);
|
||||
self.peers_mut().on_network_state_change(network_state);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1295,7 +1295,6 @@ where
|
||||
let has_blob_txs = msg.has_eip4844();
|
||||
|
||||
let non_blob_txs = msg
|
||||
.0
|
||||
.into_iter()
|
||||
.map(N::PooledTransaction::try_from)
|
||||
.filter_map(Result::ok)
|
||||
@@ -1344,9 +1343,23 @@ where
|
||||
return
|
||||
}
|
||||
|
||||
let mut transactions = transactions.0;
|
||||
|
||||
// Truncate to remaining capacity early to bound work on all subsequent processing.
|
||||
// Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
|
||||
// malicious and we avoid wasting CPU on them.
|
||||
let capacity = self.remaining_pool_import_capacity();
|
||||
if transactions.len() > capacity {
|
||||
let skipped = transactions.len() - capacity;
|
||||
transactions.truncate(capacity);
|
||||
self.metrics
|
||||
.skipped_transactions_pending_pool_imports_at_capacity
|
||||
.increment(skipped as u64);
|
||||
trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
|
||||
}
|
||||
|
||||
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
|
||||
let client_version = peer.client_version.clone();
|
||||
let mut transactions = transactions.0;
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
@@ -1365,20 +1378,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// 1. filter out txns already inserted into pool
|
||||
let txns_count_pre_pool_filter = transactions.len();
|
||||
self.pool.retain_unknown(&mut transactions);
|
||||
if txns_count_pre_pool_filter > transactions.len() {
|
||||
let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
|
||||
self.metrics
|
||||
.occurrences_transactions_already_in_pool
|
||||
.increment(already_known_txns_count as u64);
|
||||
}
|
||||
|
||||
// tracks the quality of the given transactions
|
||||
let mut has_bad_transactions = false;
|
||||
|
||||
// Remove known and invalid transactions
|
||||
// 1. Remove known, already-tracked, and invalid transactions first since these are
|
||||
// cheap in-memory checks against local maps
|
||||
transactions.retain(|tx| {
|
||||
if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
|
||||
entry.get_mut().insert(peer_id);
|
||||
@@ -1397,16 +1401,14 @@ where
|
||||
true
|
||||
});
|
||||
|
||||
// Truncate to remaining capacity before recovery to avoid wasting CPU on transactions
|
||||
// that won't be imported anyway.
|
||||
let capacity = self.remaining_pool_import_capacity();
|
||||
if transactions.len() > capacity {
|
||||
let skipped = transactions.len() - capacity;
|
||||
transactions.truncate(capacity);
|
||||
// 2. filter out txns already inserted into pool
|
||||
let txns_count_pre_pool_filter = transactions.len();
|
||||
self.pool.retain_unknown(&mut transactions);
|
||||
if txns_count_pre_pool_filter > transactions.len() {
|
||||
let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
|
||||
self.metrics
|
||||
.skipped_transactions_pending_pool_imports_at_capacity
|
||||
.increment(skipped as u64);
|
||||
trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
|
||||
.occurrences_transactions_already_in_pool
|
||||
.increment(already_known_txns_count as u64);
|
||||
}
|
||||
|
||||
let txs_len = transactions.len();
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::{
|
||||
download::DownloadClient,
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
priority::Priority,
|
||||
receipts::client::ReceiptsClient,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
|
||||
@@ -75,3 +76,19 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> ReceiptsClient for Either<A, B>
|
||||
where
|
||||
A: ReceiptsClient,
|
||||
B: ReceiptsClient<Receipt = A::Receipt>,
|
||||
{
|
||||
type Receipt = A::Receipt;
|
||||
type Output = Either<A::Output, B::Output>;
|
||||
|
||||
fn get_receipts_with_priority(&self, hashes: Vec<B256>, priority: Priority) -> Self::Output {
|
||||
match self {
|
||||
Self::Left(a) => Either::Left(a.get_receipts_with_priority(hashes, priority)),
|
||||
Self::Right(b) => Either::Right(b.get_receipts_with_priority(hashes, priority)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,9 @@ pub mod download;
|
||||
/// Traits for implementing P2P block body clients.
|
||||
pub mod bodies;
|
||||
|
||||
/// Traits for implementing P2P receipt clients.
|
||||
pub mod receipts;
|
||||
|
||||
/// A downloader that combines two different downloaders/client implementations.
|
||||
pub mod either;
|
||||
|
||||
@@ -51,6 +54,7 @@ pub mod test_utils;
|
||||
|
||||
pub use bodies::client::BodiesClient;
|
||||
pub use headers::client::HeadersClient;
|
||||
pub use receipts::client::ReceiptsClient;
|
||||
use reth_primitives_traits::Block;
|
||||
|
||||
/// Helper trait that unifies network behaviour needed for fetching entire blocks.
|
||||
|
||||
65
crates/net/p2p/src/receipts/client.rs
Normal file
65
crates/net/p2p/src/receipts/client.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
|
||||
use alloy_consensus::TxReceipt;
|
||||
use alloy_primitives::B256;
|
||||
use futures::Future;
|
||||
use reth_eth_wire_types::Receipts70;
|
||||
|
||||
/// The receipts future type
|
||||
pub type ReceiptsFut<R = reth_ethereum_primitives::Receipt> =
|
||||
Pin<Box<dyn Future<Output = PeerRequestResult<ReceiptsResponse<R>>> + Send + Sync>>;
|
||||
|
||||
/// Response from a receipts request.
|
||||
///
|
||||
/// **Note for [`ReceiptsClient`] callers:** the network layer handles eth/70
|
||||
/// continuation rounds internally, so `last_block_incomplete` is always `false`
|
||||
/// by the time this response reaches a [`ReceiptsClient`] consumer. The field
|
||||
/// exists for internal use by the fetcher during multi-round assembly.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ReceiptsResponse<R> {
|
||||
/// Receipts grouped by block, in the same order as the requested hashes.
|
||||
pub receipts: Vec<Vec<R>>,
|
||||
/// When `true`, the **last** block in [`receipts`](Self::receipts) was
|
||||
/// truncated by the remote peer (eth/70 `Receipts70.last_block_incomplete`).
|
||||
///
|
||||
/// This is used internally by the fetcher to drive continuation rounds.
|
||||
/// Responses surfaced through [`ReceiptsClient`] always have this set to
|
||||
/// `false`.
|
||||
pub last_block_incomplete: bool,
|
||||
}
|
||||
|
||||
impl<R> ReceiptsResponse<R> {
|
||||
/// Creates a complete (non-truncated) response.
|
||||
#[inline]
|
||||
pub const fn new(receipts: Vec<Vec<R>>) -> Self {
|
||||
Self { receipts, last_block_incomplete: false }
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> From<Receipts70<R>> for ReceiptsResponse<R> {
|
||||
fn from(r70: Receipts70<R>) -> Self {
|
||||
Self { receipts: r70.receipts, last_block_incomplete: r70.last_block_incomplete }
|
||||
}
|
||||
}
|
||||
|
||||
/// A client capable of downloading block receipts from peers.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait ReceiptsClient: DownloadClient {
|
||||
/// The receipt type this client fetches.
|
||||
type Receipt: TxReceipt;
|
||||
|
||||
/// The output of the request future for querying block receipts.
|
||||
type Output: Future<Output = PeerRequestResult<ReceiptsResponse<Self::Receipt>>>
|
||||
+ Sync
|
||||
+ Send
|
||||
+ Unpin;
|
||||
|
||||
/// Fetches the receipts for the requested block hashes.
|
||||
fn get_receipts(&self, hashes: Vec<B256>) -> Self::Output {
|
||||
self.get_receipts_with_priority(hashes, Priority::Normal)
|
||||
}
|
||||
|
||||
/// Fetches the receipts for the requested block hashes with priority.
|
||||
fn get_receipts_with_priority(&self, hashes: Vec<B256>, priority: Priority) -> Self::Output;
|
||||
}
|
||||
2
crates/net/p2p/src/receipts/mod.rs
Normal file
2
crates/net/p2p/src/receipts/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
/// Traits and types for receipt clients.
|
||||
pub mod client;
|
||||
@@ -1,7 +1,9 @@
|
||||
mod bodies;
|
||||
mod full_block;
|
||||
mod headers;
|
||||
mod receipts;
|
||||
|
||||
pub use bodies::*;
|
||||
pub use full_block::*;
|
||||
pub use headers::*;
|
||||
pub use receipts::*;
|
||||
|
||||
49
crates/net/p2p/src/test_utils/receipts.rs
Normal file
49
crates/net/p2p/src/test_utils/receipts.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use crate::{
|
||||
download::DownloadClient,
|
||||
error::PeerRequestResult,
|
||||
priority::Priority,
|
||||
receipts::client::{ReceiptsClient, ReceiptsFut, ReceiptsResponse},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use futures::FutureExt;
|
||||
use reth_ethereum_primitives::Receipt;
|
||||
use reth_network_peers::PeerId;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
/// A test client for fetching receipts
|
||||
pub struct TestReceiptsClient<F> {
|
||||
/// The function that is called on each receipt request.
|
||||
pub responder: F,
|
||||
}
|
||||
|
||||
impl<F> Debug for TestReceiptsClient<F> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("TestReceiptsClient").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Sync + Send> DownloadClient for TestReceiptsClient<F> {
|
||||
fn report_bad_message(&self, _peer_id: PeerId) {}
|
||||
|
||||
fn num_connected_peers(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> ReceiptsClient for TestReceiptsClient<F>
|
||||
where
|
||||
F: Fn(Vec<B256>) -> PeerRequestResult<ReceiptsResponse<Receipt>> + Send + Sync,
|
||||
{
|
||||
type Receipt = Receipt;
|
||||
type Output = ReceiptsFut;
|
||||
|
||||
fn get_receipts_with_priority(&self, hashes: Vec<B256>, _priority: Priority) -> Self::Output {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = tx.send((self.responder)(hashes));
|
||||
Box::pin(rx.map(|x| match x {
|
||||
Ok(value) => value,
|
||||
Err(err) => Err(err.into()),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
//! `NodeRecord` type that uses a domain instead of an IP.
|
||||
|
||||
use crate::{NodeRecord, PeerId};
|
||||
use alloc::string::{String, ToString};
|
||||
use crate::{NodeRecord, NodeRecordParseError, PeerId};
|
||||
use alloc::string::ToString;
|
||||
use core::{
|
||||
fmt::{self, Write},
|
||||
net::IpAddr,
|
||||
num::ParseIntError,
|
||||
str::FromStr,
|
||||
};
|
||||
use serde_with::{DeserializeFromStr, SerializeDisplay};
|
||||
@@ -113,20 +112,6 @@ impl fmt::Display for TrustedPeer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Possible error types when parsing a [`NodeRecord`]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum NodeRecordParseError {
|
||||
/// Invalid url
|
||||
#[error("Failed to parse url: {0}")]
|
||||
InvalidUrl(String),
|
||||
/// Invalid id
|
||||
#[error("Failed to parse id")]
|
||||
InvalidId(String),
|
||||
/// Invalid discport
|
||||
#[error("Failed to discport query: {0}")]
|
||||
Discport(ParseIntError),
|
||||
}
|
||||
|
||||
impl FromStr for TrustedPeer {
|
||||
type Err = NodeRecordParseError;
|
||||
|
||||
|
||||
@@ -903,15 +903,15 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
.request_handler(self.provider().clone())
|
||||
.split_with_handle();
|
||||
|
||||
self.executor.spawn_critical_blocking_task("p2p txpool", Box::pin(txpool));
|
||||
self.executor.spawn_critical_blocking_task("p2p eth request handler", Box::pin(eth));
|
||||
self.executor.spawn_critical_blocking_task("p2p txpool", txpool);
|
||||
self.executor.spawn_critical_blocking_task("p2p eth request handler", eth);
|
||||
|
||||
let default_peers_path = self.config().datadir().known_peers();
|
||||
let known_peers_file = self.config().network.persistent_peers_file(default_peers_path);
|
||||
self.executor.spawn_critical_with_graceful_shutdown_signal(
|
||||
"p2p network task",
|
||||
|shutdown| {
|
||||
Box::pin(network.run_until_graceful_shutdown(shutdown, |network| {
|
||||
network.run_until_graceful_shutdown(shutdown, |network| {
|
||||
if let Some(peers_file) = known_peers_file {
|
||||
let num_known_peers = network.num_known_peers();
|
||||
trace!(target: "reth::cli", peers_file=?peers_file, num_peers=%num_known_peers, "Saving current peers");
|
||||
@@ -924,7 +924,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -107,8 +107,7 @@ where
|
||||
let (payload_service, payload_service_handle) =
|
||||
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
|
||||
|
||||
ctx.task_executor()
|
||||
.spawn_critical_task("payload builder service", Box::pin(payload_service));
|
||||
ctx.task_executor().spawn_critical_task("payload builder service", payload_service);
|
||||
|
||||
Ok(payload_service_handle)
|
||||
}
|
||||
|
||||
@@ -557,13 +557,10 @@ where
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Pipeline should be run as blocking and panic if it fails.
|
||||
self.task_executor().spawn_critical_blocking_task(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
|
||||
let _ = tx.send(result);
|
||||
}),
|
||||
);
|
||||
self.task_executor().spawn_critical_blocking_task("pipeline task", async move {
|
||||
let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
|
||||
let _ = tx.send(result);
|
||||
});
|
||||
rx.await?.inspect_err(|err| {
|
||||
error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
|
||||
})?;
|
||||
@@ -1105,7 +1102,7 @@ where
|
||||
// If engine events are provided, spawn listener for new payload reporting
|
||||
let ethstats_for_events = ethstats.clone();
|
||||
let task_executor = self.task_executor().clone();
|
||||
task_executor.spawn_task(Box::pin(async move {
|
||||
task_executor.spawn_task(async move {
|
||||
while let Some(event) = engine_events.next().await {
|
||||
use reth_engine_primitives::ConsensusEngineEvent;
|
||||
match event {
|
||||
@@ -1128,10 +1125,10 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
});
|
||||
|
||||
// Spawn main ethstats service
|
||||
task_executor.spawn_task(Box::pin(async move { ethstats.run().await }));
|
||||
task_executor.spawn_task(async move { ethstats.run().await });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -256,11 +256,11 @@ impl EngineNodeLauncher {
|
||||
|
||||
ctx.task_executor().spawn_critical_task(
|
||||
"events task",
|
||||
Box::pin(node::handle_events(
|
||||
node::handle_events(
|
||||
Some(Box::new(ctx.components().network().clone())),
|
||||
Some(ctx.head().number),
|
||||
events,
|
||||
)),
|
||||
),
|
||||
);
|
||||
|
||||
let RpcHandle {
|
||||
@@ -375,7 +375,7 @@ impl EngineNodeLauncher {
|
||||
|
||||
let _ = exit.send(res);
|
||||
};
|
||||
ctx.task_executor().spawn_critical_task("consensus engine", Box::pin(consensus_engine));
|
||||
ctx.task_executor().spawn_critical_task("consensus engine", consensus_engine);
|
||||
|
||||
let engine_events_for_ethstats = engine_events.new_listener();
|
||||
|
||||
|
||||
@@ -993,12 +993,9 @@ where
|
||||
|
||||
let new_canonical_blocks = node.provider().canonical_state_stream();
|
||||
let c = cache.clone();
|
||||
node.task_executor().spawn_critical_task(
|
||||
"cache canonical blocks task",
|
||||
Box::pin(async move {
|
||||
cache_new_blocks_task(c, new_canonical_blocks).await;
|
||||
}),
|
||||
);
|
||||
node.task_executor().spawn_critical_task("cache canonical blocks task", async move {
|
||||
cache_new_blocks_task(c, new_canonical_blocks).await;
|
||||
});
|
||||
|
||||
let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size());
|
||||
let ctx = EthApiCtx {
|
||||
|
||||
@@ -78,6 +78,10 @@ pub struct BenchmarkArgs {
|
||||
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
pub reth_new_payload: bool,
|
||||
|
||||
/// Fetch and replay RLP-encoded blocks. Implies `reth_new_payload`.
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
pub rlp_blocks: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -461,8 +461,6 @@ impl EngineArgs {
|
||||
self.always_process_payload_attributes_on_canonical_head,
|
||||
)
|
||||
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
|
||||
.with_storage_worker_count_opt(self.storage_worker_count)
|
||||
.with_account_worker_count_opt(self.account_worker_count)
|
||||
.without_cache_metrics(self.cache_metrics_disabled)
|
||||
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
|
||||
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
|
||||
|
||||
@@ -557,7 +557,7 @@ pub struct DiscoveryArgs {
|
||||
/// The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is
|
||||
/// IPv6, or `--discovery.addr.ipv6` is set.
|
||||
#[arg(id = "discovery.v5.port.ipv6", long = "discovery.v5.port.ipv6", value_name = "DISCOVERY_V5_PORT_IPV6",
|
||||
default_value = None, default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
|
||||
default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
|
||||
pub discv5_port_ipv6: u16,
|
||||
|
||||
/// The interval in seconds at which to carry out periodic lookup queries, for the whole
|
||||
|
||||
@@ -648,11 +648,8 @@ where
|
||||
let shutdown_tx = shutdown_tx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let head = canonical_stream.next().await;
|
||||
if let Some(head) = head &&
|
||||
head_tx.send(head).await.is_err()
|
||||
{
|
||||
while let Some(head) = canonical_stream.next().await {
|
||||
if head_tx.send(head).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,43 +130,34 @@ impl MetricServer {
|
||||
|
||||
tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
|
||||
|
||||
task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
|
||||
Box::pin(async move {
|
||||
loop {
|
||||
let io = tokio::select! {
|
||||
_ = &mut signal => break,
|
||||
io = listener.accept() => {
|
||||
match io {
|
||||
Ok((stream, _remote_addr)) => stream,
|
||||
Err(err) => {
|
||||
tracing::error!(%err, "failed to accept connection");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
|
||||
let io = tokio::select! {
|
||||
_ = &mut signal => break,
|
||||
io = listener.accept() => {
|
||||
match io {
|
||||
Ok((stream, _remote_addr)) => stream,
|
||||
Err(err) => {
|
||||
tracing::error!(%err, "failed to accept connection");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let handle = install_prometheus_recorder();
|
||||
let hook = hook.clone();
|
||||
let pprof_dump_dir = pprof_dump_dir.clone();
|
||||
let service = tower::service_fn(move |req: Request<_>| {
|
||||
let response =
|
||||
handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir);
|
||||
async move { Ok::<_, Infallible>(response) }
|
||||
});
|
||||
|
||||
let mut shutdown = signal.clone().ignore_guard();
|
||||
tokio::task::spawn(async move {
|
||||
let _ = jsonrpsee_server::serve_with_graceful_shutdown(
|
||||
io,
|
||||
service,
|
||||
&mut shutdown,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let handle = install_prometheus_recorder();
|
||||
let hook = hook.clone();
|
||||
let pprof_dump_dir = pprof_dump_dir.clone();
|
||||
let service = tower::service_fn(move |req: Request<_>| {
|
||||
let response = handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir);
|
||||
async move { Ok::<_, Infallible>(response) }
|
||||
});
|
||||
|
||||
let mut shutdown = signal.clone().ignore_guard();
|
||||
tokio::task::spawn(async move {
|
||||
let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
|
||||
.await
|
||||
.inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
|
||||
});
|
||||
});
|
||||
|
||||
Ok(())
|
||||
@@ -183,36 +174,34 @@ impl MetricServer {
|
||||
let client = Client::builder()
|
||||
.build()
|
||||
.wrap_err("Could not create HTTP client to push metrics to gateway")?;
|
||||
task_executor.spawn_with_graceful_shutdown_signal(move |mut signal| {
|
||||
Box::pin(async move {
|
||||
tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
|
||||
let handle = install_prometheus_recorder();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut signal => {
|
||||
tracing::info!("Shutting down task to push metrics to gateway");
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(interval) => {
|
||||
hooks.iter().for_each(|hook| hook());
|
||||
let metrics = handle.handle().render();
|
||||
match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
|
||||
Ok(response) => {
|
||||
if !response.status().is_success() {
|
||||
tracing::warn!(
|
||||
status = %response.status(),
|
||||
"Failed to push metrics to gateway"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(%err, "Failed to push metrics to gateway");
|
||||
task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
|
||||
tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
|
||||
let handle = install_prometheus_recorder();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut signal => {
|
||||
tracing::info!("Shutting down task to push metrics to gateway");
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(interval) => {
|
||||
hooks.iter().for_each(|hook| hook());
|
||||
let metrics = handle.handle().render();
|
||||
match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
|
||||
Ok(response) => {
|
||||
if !response.status().is_success() {
|
||||
tracing::warn!(
|
||||
status = %response.status(),
|
||||
"Failed to push metrics to gateway"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(%err, "Failed to push metrics to gateway");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -376,48 +376,56 @@ where
|
||||
return Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
// check if the interval is reached
|
||||
while this.interval.poll_tick(cx).is_ready() {
|
||||
// start a new job if there is no pending block, we haven't reached the deadline,
|
||||
// and the payload isn't frozen
|
||||
if this.pending_block.is_none() && !this.best_payload.is_frozen() {
|
||||
this.spawn_build_job();
|
||||
}
|
||||
}
|
||||
|
||||
// poll the pending block
|
||||
if let Some(mut fut) = this.pending_block.take() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(outcome)) => match outcome {
|
||||
BuildOutcome::Better { payload, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
|
||||
this.best_payload = PayloadState::Best(payload);
|
||||
loop {
|
||||
// Wait for any pending build to complete before polling the next tick.
|
||||
//
|
||||
// This avoids consuming interval ticks while a build is still in-flight,
|
||||
// which would delay the follow-up build by a full interval even though
|
||||
// the current attempt has already finished.
|
||||
if let Some(mut fut) = this.pending_block.take() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(outcome)) => match outcome {
|
||||
BuildOutcome::Better { payload, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
|
||||
this.best_payload = PayloadState::Best(payload);
|
||||
}
|
||||
BuildOutcome::Freeze(payload) => {
|
||||
debug!(target: "payload_builder", "payload frozen, no further building will occur");
|
||||
this.best_payload = PayloadState::Frozen(payload);
|
||||
}
|
||||
BuildOutcome::Aborted { fees, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
|
||||
}
|
||||
BuildOutcome::Cancelled => {
|
||||
unreachable!("the cancel signal never fired")
|
||||
}
|
||||
},
|
||||
Poll::Ready(Err(error)) => {
|
||||
// job failed, but we simply try again next interval
|
||||
debug!(target: "payload_builder", %error, "payload build attempt failed");
|
||||
this.metrics.inc_failed_payload_builds();
|
||||
}
|
||||
BuildOutcome::Freeze(payload) => {
|
||||
debug!(target: "payload_builder", "payload frozen, no further building will occur");
|
||||
this.best_payload = PayloadState::Frozen(payload);
|
||||
Poll::Pending => {
|
||||
this.pending_block = Some(fut);
|
||||
return Poll::Pending
|
||||
}
|
||||
BuildOutcome::Aborted { fees, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
|
||||
}
|
||||
BuildOutcome::Cancelled => {
|
||||
unreachable!("the cancel signal never fired")
|
||||
}
|
||||
},
|
||||
Poll::Ready(Err(error)) => {
|
||||
// job failed, but we simply try again next interval
|
||||
debug!(target: "payload_builder", %error, "payload build attempt failed");
|
||||
this.metrics.inc_failed_payload_builds();
|
||||
}
|
||||
Poll::Pending => {
|
||||
this.pending_block = Some(fut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
if this.best_payload.is_frozen() {
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
// Wait for the next build interval tick.
|
||||
//
|
||||
// The loop is needed because `poll_tick` does not register a waker
|
||||
// when it returns `Ready`, so we must loop back after spawning a job
|
||||
// to reach a point that *does* register one (the pending block poll above).
|
||||
ready!(this.interval.poll_tick(cx));
|
||||
this.spawn_build_job()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,12 +50,12 @@ where
|
||||
tracing::info!("MyMiddleware processed call {}", req.method);
|
||||
let count = self.count.clone();
|
||||
let service = self.service.clone();
|
||||
Box::pin(async move {
|
||||
async move {
|
||||
let rp = service.call(req).await;
|
||||
// Modify the state.
|
||||
count.fetch_add(1, Ordering::Relaxed);
|
||||
rp
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
|
||||
|
||||
@@ -8,7 +8,7 @@ use jsonrpsee_types::error::{
|
||||
};
|
||||
use reth_engine_primitives::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError};
|
||||
use reth_payload_builder_primitives::PayloadBuilderError;
|
||||
use reth_payload_primitives::EngineObjectValidationError;
|
||||
use reth_payload_primitives::{EngineObjectValidationError, VersionSpecificValidationError};
|
||||
use thiserror::Error;
|
||||
|
||||
/// The Engine API result type
|
||||
@@ -117,11 +117,14 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
|
||||
EngineObjectValidationError::Payload(_) |
|
||||
EngineObjectValidationError::InvalidParams(_) |
|
||||
// Per Engine API spec, structure validation errors for PayloadAttributes
|
||||
// (e.g., missing withdrawals post-Shanghai, missing parentBeaconBlockRoot
|
||||
// post-Cancun) should return -32602 "Invalid params", not -38003.
|
||||
// (e.g., missing withdrawals post-Shanghai) should return -32602 "Invalid params".
|
||||
// See: https://github.com/ethereum/execution-apis/blob/main/src/engine/shanghai.md
|
||||
// Fixes: https://github.com/paradigmxyz/reth/issues/8732
|
||||
EngineObjectValidationError::PayloadAttributes(_),
|
||||
EngineObjectValidationError::PayloadAttributes(
|
||||
VersionSpecificValidationError::WithdrawalsNotSupportedInV1 |
|
||||
VersionSpecificValidationError::NoWithdrawalsPostShanghai |
|
||||
VersionSpecificValidationError::HasWithdrawalsPreShanghai,
|
||||
),
|
||||
) |
|
||||
EngineApiError::UnexpectedRequestsHash => {
|
||||
// Note: the data field is not required by the spec, but is also included by other
|
||||
@@ -145,6 +148,16 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
|
||||
Some(ErrorData::new(error)),
|
||||
)
|
||||
}
|
||||
EngineApiError::EngineObjectValidationError(
|
||||
EngineObjectValidationError::PayloadAttributes(
|
||||
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3 |
|
||||
VersionSpecificValidationError::NoParentBeaconBlockRootPostCancun,
|
||||
),
|
||||
) => jsonrpsee_types::error::ErrorObject::owned(
|
||||
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
|
||||
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
|
||||
Some(ErrorData::new(error)),
|
||||
),
|
||||
EngineApiError::EngineObjectValidationError(
|
||||
EngineObjectValidationError::UnsupportedFork,
|
||||
) => jsonrpsee_types::error::ErrorObject::owned(
|
||||
@@ -198,8 +211,6 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_rpc_types_engine::ForkchoiceUpdateError;
|
||||
use reth_payload_primitives::VersionSpecificValidationError;
|
||||
|
||||
#[track_caller]
|
||||
fn ensure_engine_rpc_error(
|
||||
code: i32,
|
||||
@@ -265,5 +276,16 @@ mod tests {
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// Beacon root shape mismatches on PayloadAttributes are reported as -38003.
|
||||
ensure_engine_rpc_error(
|
||||
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
|
||||
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
|
||||
EngineApiError::EngineObjectValidationError(
|
||||
EngineObjectValidationError::PayloadAttributes(
|
||||
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3,
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,8 +74,12 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
|
||||
block_id: BlockId,
|
||||
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send {
|
||||
async move {
|
||||
// If no pending block from provider, build the pending block locally.
|
||||
if block_id.is_pending() {
|
||||
if self.pending_block_kind().is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// If no pending block from provider, build the pending block locally.
|
||||
if let Some(pending) = self.local_pending_block().await? {
|
||||
return Ok(Some(pending.block.body().transaction_count()));
|
||||
}
|
||||
@@ -180,6 +184,10 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
|
||||
{
|
||||
async move {
|
||||
if block_id.is_pending() {
|
||||
if self.pending_block_kind().is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// First, try to get the pending block from the provider, in case we already
|
||||
// received the actual pending block from the CL.
|
||||
if let Some((block, receipts)) = self
|
||||
@@ -284,6 +292,10 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt {
|
||||
> + Send {
|
||||
async move {
|
||||
if block_id.is_pending() {
|
||||
if self.pending_block_kind().is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Pending block can be fetched directly without need for caching
|
||||
if let Some(pending_block) =
|
||||
self.provider().pending_block().map_err(Self::Error::from_eth_err)?
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user