mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
95 Commits
fix/slow-p
...
fix/mdbx-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9dbe5adea | ||
|
|
596a95fc04 | ||
|
|
5159f40452 | ||
|
|
7f0a6a67b1 | ||
|
|
d2a43a9288 | ||
|
|
369c629b9b | ||
|
|
6fec4603cf | ||
|
|
515fd597f3 | ||
|
|
126a7c9570 | ||
|
|
8aeee5018e | ||
|
|
210309ca76 | ||
|
|
551918b0d8 | ||
|
|
89677e1bd9 | ||
|
|
0e2b3afa3f | ||
|
|
5d551eab29 | ||
|
|
12c4c04f7d | ||
|
|
392f8e6e13 | ||
|
|
1a94d1f091 | ||
|
|
97ae89c7f0 | ||
|
|
a4921119e4 | ||
|
|
0f3d3695f5 | ||
|
|
54355dfc78 | ||
|
|
44a6035fa3 | ||
|
|
746baed2b1 | ||
|
|
e86c5fba53 | ||
|
|
485fa3448d | ||
|
|
0db3813941 | ||
|
|
52c2ae3362 | ||
|
|
b1d75f2771 | ||
|
|
ef80ee1687 | ||
|
|
8dacfb3d9c | ||
|
|
425a021e3b | ||
|
|
08c0d30ea7 | ||
|
|
84e970e4c9 | ||
|
|
020f20db42 | ||
|
|
f53929e0c8 | ||
|
|
4a8fbe15e3 | ||
|
|
a59e9832e6 | ||
|
|
07beb76cf7 | ||
|
|
3ddf0bd729 | ||
|
|
c3d92ddfc2 | ||
|
|
c0628dfbff | ||
|
|
a2aa1f18df | ||
|
|
d489f80f6b | ||
|
|
bf272c9432 | ||
|
|
ebb54d0dcc | ||
|
|
1d7367c389 | ||
|
|
824ae12d75 | ||
|
|
2db281e51d | ||
|
|
8367ba473e | ||
|
|
f2abad5f5c | ||
|
|
4673d77c03 | ||
|
|
33bcd60348 | ||
|
|
8a9b5d90f4 | ||
|
|
c26cfa3dcb | ||
|
|
13e59651f1 | ||
|
|
0f4995d1ea | ||
|
|
cff7e8be53 | ||
|
|
5433d7a4ac | ||
|
|
1866db4d50 | ||
|
|
c9b92550b6 | ||
|
|
8e81ebfc1f | ||
|
|
1363205b5d | ||
|
|
ed201cae0e | ||
|
|
a5b10f11ce | ||
|
|
a06644944f | ||
|
|
8eecad3d1d | ||
|
|
412f39e223 | ||
|
|
13106233e4 | ||
|
|
e63fef0e79 | ||
|
|
eed34254f5 | ||
|
|
b38d37a1e1 | ||
|
|
7efaf4ca97 | ||
|
|
ef708792a9 | ||
|
|
bcd74d021b | ||
|
|
0f0a181fe2 | ||
|
|
9678d6c76d | ||
|
|
7ceca70353 | ||
|
|
4412a501eb | ||
|
|
3ca5cf49b6 | ||
|
|
1d4603769f | ||
|
|
9bba8c7a98 | ||
|
|
6f0ef914b9 | ||
|
|
d756e8310a | ||
|
|
74a7ba581c | ||
|
|
a8980bf7c1 | ||
|
|
050d9f440f | ||
|
|
df33a8200f | ||
|
|
d3dab613fc | ||
|
|
1b31739adf | ||
|
|
6280abedd0 | ||
|
|
4c064a4d20 | ||
|
|
8d19a36492 | ||
|
|
78f2685ee9 | ||
|
|
fee7e997ff |
4
.github/workflows/docker-nightly.yml
vendored
4
.github/workflows/docker-nightly.yml
vendored
@@ -28,10 +28,14 @@ jobs:
|
||||
build:
|
||||
- name: 'Build and push the nightly reth image'
|
||||
command: 'make PROFILE=maxperf docker-build-push-nightly'
|
||||
- name: 'Build and push the nightly edge profiling reth image'
|
||||
command: 'make PROFILE=profiling docker-build-push-nightly-edge-profiling'
|
||||
- name: 'Build and push the nightly profiling reth image'
|
||||
command: 'make PROFILE=profiling docker-build-push-nightly-profiling'
|
||||
- name: 'Build and push the nightly op-reth image'
|
||||
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-nightly'
|
||||
- name: 'Build and push the nightly edge profiling op-reth image'
|
||||
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-edge-profiling'
|
||||
- name: 'Build and push the nightly profiling op-reth image'
|
||||
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-profiling'
|
||||
steps:
|
||||
|
||||
22
.github/workflows/hive.yml
vendored
22
.github/workflows/hive.yml
vendored
@@ -15,11 +15,21 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
prepare-reth:
|
||||
prepare-reth-stable:
|
||||
uses: ./.github/workflows/prepare-reth.yml
|
||||
with:
|
||||
image_tag: ghcr.io/paradigmxyz/reth:latest
|
||||
binary_name: reth
|
||||
cargo_features: "asm-keccak"
|
||||
artifact_name: "reth-stable"
|
||||
|
||||
prepare-reth-edge:
|
||||
uses: ./.github/workflows/prepare-reth.yml
|
||||
with:
|
||||
image_tag: ghcr.io/paradigmxyz/reth:latest
|
||||
binary_name: reth
|
||||
cargo_features: "asm-keccak edge"
|
||||
artifact_name: "reth-edge"
|
||||
|
||||
prepare-hive:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
@@ -77,6 +87,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
storage: [stable, edge]
|
||||
# ethereum/rpc to be deprecated:
|
||||
# https://github.com/ethereum/hive/pull/1117
|
||||
scenario:
|
||||
@@ -86,7 +97,7 @@ jobs:
|
||||
- sim: devp2p
|
||||
limit: discv4
|
||||
# started failing after https://github.com/ethereum/go-ethereum/pull/31843, no
|
||||
# action on our side, remove from here when we get unxpected passes on these tests
|
||||
# action on our side, remove from here when we get unexpected passes on these tests
|
||||
# - sim: devp2p
|
||||
# limit: eth
|
||||
# include:
|
||||
@@ -176,9 +187,10 @@ jobs:
|
||||
- sim: ethereum/eels/consume-rlp
|
||||
limit: .*tests/paris.*
|
||||
needs:
|
||||
- prepare-reth
|
||||
- prepare-reth-stable
|
||||
- prepare-reth-edge
|
||||
- prepare-hive
|
||||
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
runs-on:
|
||||
group: Reth
|
||||
permissions:
|
||||
@@ -197,7 +209,7 @@ jobs:
|
||||
- name: Download reth image
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: artifacts
|
||||
name: reth-${{ matrix.storage }}
|
||||
path: /tmp
|
||||
|
||||
- name: Load Docker images
|
||||
|
||||
7
.github/workflows/prepare-reth.yml
vendored
7
.github/workflows/prepare-reth.yml
vendored
@@ -21,6 +21,11 @@ on:
|
||||
required: false
|
||||
type: string
|
||||
description: "Optional cargo package path"
|
||||
artifact_name:
|
||||
required: false
|
||||
type: string
|
||||
default: "artifacts"
|
||||
description: "Name for the uploaded artifact"
|
||||
|
||||
jobs:
|
||||
prepare-reth:
|
||||
@@ -52,5 +57,5 @@ jobs:
|
||||
id: upload
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: artifacts
|
||||
name: ${{ inputs.artifact_name }}
|
||||
path: ./artifacts
|
||||
|
||||
27
.github/workflows/unit.yml
vendored
27
.github/workflows/unit.yml
vendored
@@ -19,29 +19,22 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
|
||||
strategy:
|
||||
matrix:
|
||||
type: [ethereum, optimism]
|
||||
storage: [stable, edge]
|
||||
include:
|
||||
- type: ethereum
|
||||
args: --features "asm-keccak ethereum" --locked
|
||||
partition: 1
|
||||
total_partitions: 2
|
||||
- type: ethereum
|
||||
args: --features "asm-keccak ethereum" --locked
|
||||
partition: 2
|
||||
total_partitions: 2
|
||||
features: asm-keccak ethereum
|
||||
exclude_args: ""
|
||||
- type: optimism
|
||||
args: --features "asm-keccak" --locked --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
|
||||
partition: 1
|
||||
total_partitions: 2
|
||||
- type: optimism
|
||||
args: --features "asm-keccak" --locked --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
|
||||
partition: 2
|
||||
total_partitions: 2
|
||||
features: asm-keccak
|
||||
exclude_args: --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -59,9 +52,9 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
${{ matrix.args }} --workspace \
|
||||
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
|
||||
${{ matrix.exclude_args }} --workspace \
|
||||
--exclude ef-tests --no-tests=warn \
|
||||
--partition hash:${{ matrix.partition }}/2 \
|
||||
-E "!kind(test) and not binary(e2e_testsuite)"
|
||||
|
||||
state:
|
||||
|
||||
@@ -18,7 +18,7 @@ Reth is a high-performance Ethereum execution client written in Rust, focusing o
|
||||
6. **Pipeline (`crates/stages/`)**: Staged sync architecture for blockchain synchronization
|
||||
7. **Trie (`crates/trie/`)**: Merkle Patricia Trie implementation with parallel state root computation
|
||||
8. **Node Builder (`crates/node/`)**: High-level node orchestration and configuration
|
||||
9 **The Consensus Engine (`crates/engine/`)**: Handles processing blocks received from the consensus layer with the Engine API (newPayload, forkchoiceUpdated)
|
||||
9. **The Consensus Engine (`crates/engine/`)**: Handles processing blocks received from the consensus layer with the Engine API (newPayload, forkchoiceUpdated)
|
||||
|
||||
### Key Design Principles
|
||||
|
||||
|
||||
@@ -51,9 +51,7 @@ elsewhere.
|
||||
<!-- - **Asking in the support Telegram:** The [Foundry Support Telegram][support-tg] is a fast and easy way to ask questions. -->
|
||||
<!-- - **Opening a discussion:** This repository comes with a discussions board where you can also ask for help. Click the "Discussions" tab at the top. -->
|
||||
|
||||
If you have reviewed existing documentation and still have questions, or you are having problems, you can get help by *
|
||||
*opening a discussion**. This repository comes with a discussions board where you can also ask for help. Click the "
|
||||
Discussions" tab at the top.
|
||||
If you have reviewed existing documentation and still have questions, or you are having problems, you can get help by **opening a discussion**. This repository comes with a discussions board where you can also ask for help. Click the "Discussions" tab at the top.
|
||||
|
||||
As Reth is still in heavy development, the documentation can be a bit scattered. The [Reth Docs][reth-docs] is our
|
||||
current best-effort attempt at keeping up-to-date information.
|
||||
|
||||
98
Cargo.lock
generated
98
Cargo.lock
generated
@@ -255,7 +255,6 @@ checksum = "926b2c0d34e641cf8b17bf54ce50fda16715b9f68ad878fa6128bae410c6f890"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"borsh",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -458,7 +457,6 @@ dependencies = [
|
||||
"proptest-derive 0.6.0",
|
||||
"rand 0.9.2",
|
||||
"rapidhash",
|
||||
"rayon",
|
||||
"ruint",
|
||||
"rustc-hash",
|
||||
"serde",
|
||||
@@ -4565,7 +4563,6 @@ dependencies = [
|
||||
"allocator-api2",
|
||||
"equivalent",
|
||||
"foldhash 0.2.0",
|
||||
"rayon",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
@@ -5069,7 +5066,6 @@ dependencies = [
|
||||
"arbitrary",
|
||||
"equivalent",
|
||||
"hashbrown 0.16.1",
|
||||
"rayon",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
@@ -5255,6 +5251,23 @@ version = "1.0.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc_pprof"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "74ff642505c7ce8d31c0d43ec0e235c6fd4585d9b8172d8f9dd04d36590200b5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
"mappings",
|
||||
"once_cell",
|
||||
"pprof_util",
|
||||
"tempfile",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jni"
|
||||
version = "0.21.1"
|
||||
@@ -5785,6 +5798,19 @@ dependencies = [
|
||||
"syn 2.0.113",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mappings"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db4d277bb50d4508057e7bddd7fcd19ef4a4cc38051b6a5a36868d75ae2cbeb9"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"pprof_util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match-lookup"
|
||||
version = "0.1.1"
|
||||
@@ -6525,7 +6551,7 @@ dependencies = [
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
"opentelemetry_sdk",
|
||||
"prost",
|
||||
"prost 0.14.1",
|
||||
"reqwest",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
@@ -6541,7 +6567,7 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"prost",
|
||||
"prost 0.14.1",
|
||||
"tonic",
|
||||
"tonic-prost",
|
||||
]
|
||||
@@ -6882,6 +6908,20 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "pprof_util"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4429d44e5e2c8a69399fc0070379201eed018e3df61e04eb7432811df073c224"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
"flate2",
|
||||
"num",
|
||||
"paste",
|
||||
"prost 0.13.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.21"
|
||||
@@ -7068,6 +7108,16 @@ dependencies = [
|
||||
"syn 2.0.113",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive 0.13.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.14.1"
|
||||
@@ -7075,7 +7125,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
"prost-derive 0.14.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.14.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.113",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7625,6 +7688,7 @@ version = "1.9.3"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"alloy-json-rpc",
|
||||
"alloy-network",
|
||||
"alloy-primitives",
|
||||
"alloy-provider",
|
||||
"alloy-pubsub",
|
||||
@@ -7646,6 +7710,7 @@ dependencies = [
|
||||
"reqwest",
|
||||
"reth-cli-runner",
|
||||
"reth-cli-util",
|
||||
"reth-engine-primitives",
|
||||
"reth-fs-util",
|
||||
"reth-node-api",
|
||||
"reth-node-core",
|
||||
@@ -7657,6 +7722,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tower",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7665,7 +7731,9 @@ version = "1.9.3"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-provider",
|
||||
"alloy-rpc-client",
|
||||
"alloy-rpc-types-eth",
|
||||
"alloy-transport-ws",
|
||||
"chrono",
|
||||
"clap",
|
||||
"csv",
|
||||
@@ -8270,7 +8338,6 @@ dependencies = [
|
||||
"reth-network-peers",
|
||||
"secp256k1 0.30.0",
|
||||
"sha2",
|
||||
"sha3",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -8410,7 +8477,6 @@ dependencies = [
|
||||
"reth-stages",
|
||||
"reth-stages-api",
|
||||
"reth-static-file",
|
||||
"reth-storage-errors",
|
||||
"reth-tasks",
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
@@ -8885,6 +8951,7 @@ dependencies = [
|
||||
"reth-tasks",
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
"reth-trie-common",
|
||||
"rmp-serde",
|
||||
"secp256k1 0.30.0",
|
||||
"tempfile",
|
||||
@@ -9481,18 +9548,25 @@ dependencies = [
|
||||
name = "reth-node-metrics"
|
||||
version = "1.9.3"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"eyre",
|
||||
"http",
|
||||
"http-body-util",
|
||||
"jemalloc_pprof",
|
||||
"jsonrpsee-server",
|
||||
"mappings",
|
||||
"metrics",
|
||||
"metrics-exporter-prometheus",
|
||||
"metrics-process",
|
||||
"metrics-util",
|
||||
"pprof_util",
|
||||
"procfs 0.17.0",
|
||||
"reqwest",
|
||||
"reth-fs-util",
|
||||
"reth-metrics",
|
||||
"reth-tasks",
|
||||
"socket2 0.5.10",
|
||||
"tempfile",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tokio",
|
||||
"tower",
|
||||
@@ -10280,6 +10354,7 @@ version = "1.9.3"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-dyn-abi",
|
||||
"alloy-eip7928",
|
||||
"alloy-eips",
|
||||
"alloy-evm",
|
||||
"alloy-genesis",
|
||||
@@ -10362,6 +10437,7 @@ dependencies = [
|
||||
name = "reth-rpc-api"
|
||||
version = "1.9.3"
|
||||
dependencies = [
|
||||
"alloy-eip7928",
|
||||
"alloy-eips",
|
||||
"alloy-genesis",
|
||||
"alloy-json-rpc",
|
||||
@@ -11109,6 +11185,8 @@ dependencies = [
|
||||
"reth-execution-errors",
|
||||
"reth-primitives-traits",
|
||||
"reth-provider",
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-trie",
|
||||
"reth-trie-common",
|
||||
"revm",
|
||||
@@ -13028,7 +13106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost",
|
||||
"prost 0.14.1",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
|
||||
@@ -487,7 +487,7 @@ revm-inspectors = "0.33.2"
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-dyn-abi = "1.4.1"
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.1.0" }
|
||||
alloy-eip7928 = { version = "0.1.0", default-features = false }
|
||||
alloy-evm = { version = "0.25.1", default-features = false }
|
||||
alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
@@ -684,6 +684,7 @@ ethereum_ssz = "0.9.0"
|
||||
ethereum_ssz_derive = "0.9.0"
|
||||
|
||||
# allocators
|
||||
jemalloc_pprof = { version = "0.8", default-features = false }
|
||||
tikv-jemalloc-ctl = "0.6"
|
||||
tikv-jemallocator = "0.6"
|
||||
tracy-client = "0.18.0"
|
||||
|
||||
10
Makefile
10
Makefile
@@ -276,6 +276,11 @@ docker-build-push-latest: ## Build and push a cross-arch Docker image tagged wit
|
||||
docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
|
||||
$(call docker_build_push,nightly,nightly)
|
||||
|
||||
.PHONY: docker-build-push-nightly-edge-profiling
|
||||
docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
|
||||
docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
|
||||
$(call docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
|
||||
|
||||
# Create a cross-arch Docker image with the given tags and push it
|
||||
define docker_build_push
|
||||
$(MAKE) build-x86_64-unknown-linux-gnu
|
||||
@@ -328,6 +333,11 @@ op-docker-build-push-latest: ## Build and push a cross-arch Docker image tagged
|
||||
op-docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
|
||||
$(call op_docker_build_push,nightly,nightly)
|
||||
|
||||
.PHONY: op-docker-build-push-nightly-edge-profiling
|
||||
op-docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
|
||||
op-docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
|
||||
$(call op_docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
|
||||
|
||||
# Note: This requires a buildx builder with emulation support. For example:
|
||||
#
|
||||
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
|
||||
|
||||
@@ -44,7 +44,7 @@ More historical context below:
|
||||
- We released 1.0 "production-ready" stable Reth in June 2024.
|
||||
- Reth completed an audit with [Sigma Prime](https://sigmaprime.io/), the developers of [Lighthouse](https://github.com/sigp/lighthouse), the Rust Consensus Layer implementation. Find it [here](./audit/sigma_prime_audit_v2.pdf).
|
||||
- Revm (the EVM used in Reth) underwent an audit with [Guido Vranken](https://x.com/guidovranken) (#1 [Ethereum Bug Bounty](https://ethereum.org/en/bug-bounty)). We will publish the results soon.
|
||||
- We released multiple iterative beta versions, up to [beta.9](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.9) on Monday June 3, 2024,the last beta release.
|
||||
- We released multiple iterative beta versions, up to [beta.9](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.9) on Monday June 3, 2024, the last beta release.
|
||||
- We released [beta](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.1) on Monday March 4, 2024, our first breaking change to the database model, providing faster query speed, smaller database footprint, and allowing "history" to be mounted on separate drives.
|
||||
- We shipped iterative improvements until the last alpha release on February 28, 2024, [0.1.0-alpha.21](https://github.com/paradigmxyz/reth/releases/tag/v0.1.0-alpha.21).
|
||||
- We [initially announced](https://www.paradigm.xyz/2023/06/reth-alpha) [0.1.0-alpha.1](https://github.com/paradigmxyz/reth/releases/tag/v0.1.0-alpha.1) on June 20, 2023.
|
||||
|
||||
@@ -25,7 +25,9 @@ reth-chainspec.workspace = true
|
||||
|
||||
# alloy
|
||||
alloy-provider = { workspace = true, features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
|
||||
alloy-rpc-types-eth.workspace = true
|
||||
alloy-transport-ws.workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
|
||||
# CLI and argument parsing
|
||||
|
||||
50
bin/reth-bench-compare/README.md
Normal file
50
bin/reth-bench-compare/README.md
Normal file
@@ -0,0 +1,50 @@
|
||||
# reth-bench-compare
|
||||
|
||||
Compare reth performance between two git references.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
reth-bench-compare \
|
||||
--baseline-ref main \
|
||||
--feature-ref my-feature \
|
||||
--blocks 100 \
|
||||
--wait-for-persistence
|
||||
```
|
||||
|
||||
## Arguments
|
||||
|
||||
| Argument | Description | Default | Required |
|
||||
|----------|-------------|---------|----------|
|
||||
| `--baseline-ref <REF>` | Git reference for baseline | - | Yes |
|
||||
| `--feature-ref <REF>` | Git reference to compare | - | Yes |
|
||||
| `--blocks <N>` | Number of blocks to benchmark | `100` | No |
|
||||
| `--chain <CHAIN>` | Chain to benchmark | `mainnet` | No |
|
||||
| `--datadir <PATH>` | Data directory path | OS-specific | No |
|
||||
| `--rpc-url <URL>` | RPC endpoint for block data | Chain default | No |
|
||||
| `--output-dir <PATH>` | Output directory | `./reth-bench-compare` | No |
|
||||
| `--wait-for-persistence` | Wait for block persistence | `false` | No |
|
||||
| `--persistence-threshold <N>` | Wait after every N+1 blocks | `2` | No |
|
||||
| `--wait-time <DURATION>` | Fixed delay (legacy) | - | No |
|
||||
| `--warmup-blocks <N>` | Cache warmup blocks | Same as `--blocks` | No |
|
||||
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
|
||||
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
|
||||
| `-vvvv` | Debug logging | Info | No |
|
||||
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
|
||||
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
|
||||
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
|
||||
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |
|
||||
| `--baseline-rustflags <FLAGS>` | RUSTFLAGS for baseline only | Inherits `--rustflags` | No |
|
||||
| `--feature-rustflags <FLAGS>` | RUSTFLAGS for feature only | Inherits `--rustflags` | No |
|
||||
| `--baseline-args <ARGS>` | Extra args for baseline node | - | No |
|
||||
| `--feature-args <ARGS>` | Extra args for feature node | - | No |
|
||||
| `--metrics-port <PORT>` | Metrics endpoint port | `5005` | No |
|
||||
| `--sudo` | Run with elevated privileges | `false` | No |
|
||||
|
||||
## Output
|
||||
|
||||
Results in `./reth-bench-compare/results/<timestamp>/`:
|
||||
- `comparison_report.json` - Metrics comparison
|
||||
- `per_block_comparison.csv` - Per-block statistics
|
||||
- `baseline/` and `feature/` - Individual run results
|
||||
- `latency_comparison.png` - Chart (if `--draw` used)
|
||||
@@ -18,6 +18,8 @@ pub(crate) struct BenchmarkRunner {
|
||||
rpc_url: String,
|
||||
jwt_secret: String,
|
||||
wait_time: Option<String>,
|
||||
wait_for_persistence: bool,
|
||||
persistence_threshold: Option<u64>,
|
||||
warmup_blocks: u64,
|
||||
}
|
||||
|
||||
@@ -28,6 +30,8 @@ impl BenchmarkRunner {
|
||||
rpc_url: args.get_rpc_url(),
|
||||
jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
|
||||
wait_time: args.wait_time.clone(),
|
||||
wait_for_persistence: args.wait_for_persistence,
|
||||
persistence_threshold: args.persistence_threshold,
|
||||
warmup_blocks: args.get_warmup_blocks(),
|
||||
}
|
||||
}
|
||||
@@ -96,13 +100,9 @@ impl BenchmarkRunner {
|
||||
&from_block.to_string(),
|
||||
"--to",
|
||||
&to_block.to_string(),
|
||||
"--wait-time=0ms", // Warmup should avoid persistence waits.
|
||||
]);
|
||||
|
||||
// Add wait-time argument if provided
|
||||
if let Some(ref wait_time) = self.wait_time {
|
||||
cmd.args(["--wait-time", wait_time]);
|
||||
}
|
||||
|
||||
cmd.env("RUST_LOG_STYLE", "never")
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
@@ -186,9 +186,16 @@ impl BenchmarkRunner {
|
||||
&output_dir.to_string_lossy(),
|
||||
]);
|
||||
|
||||
// Add wait-time argument if provided
|
||||
// Configure wait mode: wait-time takes precedence over persistence-based flow
|
||||
if let Some(ref wait_time) = self.wait_time {
|
||||
cmd.args(["--wait-time", wait_time]);
|
||||
} else if self.wait_for_persistence {
|
||||
cmd.arg("--wait-for-persistence");
|
||||
|
||||
// Add persistence threshold if specified
|
||||
if let Some(threshold) = self.persistence_threshold {
|
||||
cmd.args(["--persistence-threshold", &threshold.to_string()]);
|
||||
}
|
||||
}
|
||||
|
||||
cmd.env("RUST_LOG_STYLE", "never")
|
||||
|
||||
@@ -114,10 +114,29 @@ pub(crate) struct Args {
|
||||
#[arg(long)]
|
||||
pub profile: bool,
|
||||
|
||||
/// Wait time between engine API calls (passed to reth-bench)
|
||||
#[arg(long, value_name = "DURATION")]
|
||||
/// Optional fixed delay between engine API calls (passed to reth-bench).
|
||||
///
|
||||
/// When set, reth-bench uses wait-time mode and disables persistence-based flow.
|
||||
/// This flag remains for compatibility with older scripts.
|
||||
#[arg(long, value_name = "DURATION", hide = true)]
|
||||
pub wait_time: Option<String>,
|
||||
|
||||
/// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
|
||||
///
|
||||
/// When enabled, waits for every Nth block to be persisted using the
|
||||
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
|
||||
/// doesn't outpace persistence.
|
||||
#[arg(long)]
|
||||
pub wait_for_persistence: bool,
|
||||
|
||||
/// Engine persistence threshold (passed to reth-bench).
|
||||
///
|
||||
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
|
||||
/// matches the engine's default persistence threshold (2), so waits occur
|
||||
/// at blocks 3, 6, 9, etc.
|
||||
#[arg(long, value_name = "PERSISTENCE_THRESHOLD")]
|
||||
pub persistence_threshold: Option<u64>,
|
||||
|
||||
/// Number of blocks to run for cache warmup after clearing caches.
|
||||
/// If not specified, defaults to the same as --blocks
|
||||
#[arg(long, value_name = "N")]
|
||||
@@ -512,6 +531,7 @@ async fn run_compilation_phase(
|
||||
Ok((baseline_commit, feature_commit))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Run warmup phase to warm up caches before benchmarking
|
||||
async fn run_warmup_phase(
|
||||
git_manager: &GitManager,
|
||||
@@ -521,9 +541,15 @@ async fn run_warmup_phase(
|
||||
args: &Args,
|
||||
is_optimism: bool,
|
||||
baseline_commit: &str,
|
||||
starting_tip: u64,
|
||||
) -> Result<()> {
|
||||
info!("=== Running warmup phase ===");
|
||||
|
||||
// Unwind to starting block minus warmup blocks, so we end up back at starting_tip
|
||||
let warmup_blocks = args.get_warmup_blocks();
|
||||
let unwind_target = starting_tip.saturating_sub(warmup_blocks);
|
||||
node_manager.unwind_to_block(unwind_target).await?;
|
||||
|
||||
// Use baseline for warmup
|
||||
let warmup_ref = &args.baseline_ref;
|
||||
|
||||
@@ -552,12 +578,9 @@ async fn run_warmup_phase(
|
||||
node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
|
||||
|
||||
// Wait for node to be ready and get its current tip
|
||||
let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
|
||||
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
|
||||
info!("Warmup node is ready at tip: {}", current_tip);
|
||||
|
||||
// Store the tip we'll unwind back to
|
||||
let original_tip = current_tip;
|
||||
|
||||
// Clear filesystem caches before warmup run only (unless disabled)
|
||||
if args.no_clear_cache {
|
||||
info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
|
||||
@@ -568,12 +591,9 @@ async fn run_warmup_phase(
|
||||
// Run warmup to warm up caches
|
||||
benchmark_runner.run_warmup(current_tip).await?;
|
||||
|
||||
// Stop node before unwinding (node must be stopped to release database lock)
|
||||
// Stop node after warmup
|
||||
node_manager.stop_node(&mut node_process).await?;
|
||||
|
||||
// Unwind back to starting block after warmup
|
||||
node_manager.unwind_to_block(original_tip).await?;
|
||||
|
||||
info!("Warmup phase completed");
|
||||
Ok(())
|
||||
}
|
||||
@@ -595,6 +615,27 @@ async fn run_benchmark_workflow(
|
||||
let (baseline_commit, feature_commit) =
|
||||
run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;
|
||||
|
||||
// Switch to baseline reference and get the starting tip
|
||||
git_manager.switch_ref(&args.baseline_ref)?;
|
||||
let binary_path =
|
||||
compilation_manager.get_cached_binary_path_for_commit(&baseline_commit, is_optimism);
|
||||
if !binary_path.exists() {
|
||||
return Err(eyre!(
|
||||
"Cached baseline binary not found at {:?}. Compilation phase should have created it.",
|
||||
binary_path
|
||||
));
|
||||
}
|
||||
|
||||
// Start node briefly to get the current tip, then stop it
|
||||
info!("=== Determining initial block height ===");
|
||||
let additional_args = args.build_additional_args("baseline", args.baseline_args.as_ref());
|
||||
let (mut node_process, _) = node_manager
|
||||
.start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
|
||||
.await?;
|
||||
let starting_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
|
||||
info!("Node starting tip: {}", starting_tip);
|
||||
node_manager.stop_node(&mut node_process).await?;
|
||||
|
||||
// Run warmup phase before benchmarking (skip if warmup_blocks is 0)
|
||||
if args.get_warmup_blocks() > 0 {
|
||||
run_warmup_phase(
|
||||
@@ -605,6 +646,7 @@ async fn run_benchmark_workflow(
|
||||
args,
|
||||
is_optimism,
|
||||
&baseline_commit,
|
||||
starting_tip,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
@@ -620,6 +662,10 @@ async fn run_benchmark_workflow(
|
||||
let commit = commits[i];
|
||||
info!("=== Processing {} reference: {} ===", ref_type, git_ref);
|
||||
|
||||
// Unwind to starting block minus benchmark blocks, so we end up back at starting_tip
|
||||
let unwind_target = starting_tip.saturating_sub(args.blocks);
|
||||
node_manager.unwind_to_block(unwind_target).await?;
|
||||
|
||||
// Switch to target reference
|
||||
git_manager.switch_ref(git_ref)?;
|
||||
|
||||
@@ -653,17 +699,14 @@ async fn run_benchmark_workflow(
|
||||
node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
|
||||
|
||||
// Wait for node to be ready and get its current tip (wherever it is)
|
||||
let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
|
||||
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
|
||||
info!("Node is ready at tip: {}", current_tip);
|
||||
|
||||
// Store the tip we'll unwind back to
|
||||
let original_tip = current_tip;
|
||||
|
||||
// Calculate benchmark range
|
||||
// Note: reth-bench has an off-by-one error where it consumes the first block
|
||||
// of the range, so we add 1 to compensate and get exactly args.blocks blocks
|
||||
let from_block = original_tip;
|
||||
let to_block = original_tip + args.blocks;
|
||||
let from_block = current_tip;
|
||||
let to_block = current_tip + args.blocks;
|
||||
|
||||
// Run benchmark
|
||||
let output_dir = comparison_generator.get_ref_output_dir(ref_type);
|
||||
@@ -680,9 +723,6 @@ async fn run_benchmark_workflow(
|
||||
// Stop node
|
||||
node_manager.stop_node(&mut node_process).await?;
|
||||
|
||||
// Unwind back to original tip
|
||||
node_manager.unwind_to_block(original_tip).await?;
|
||||
|
||||
// Store results for comparison
|
||||
comparison_generator.add_ref_results(ref_type, &output_dir)?;
|
||||
|
||||
|
||||
@@ -99,6 +99,7 @@ pub(crate) struct RefInfo {
|
||||
/// Summary of the comparison between references.
|
||||
///
|
||||
/// Percent deltas are `(feature - baseline) / baseline * 100`:
|
||||
/// - `new_payload_latency_mean_change_percent`: percent changes of the per-block means.
|
||||
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
|
||||
/// per-block percentiles.
|
||||
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
|
||||
@@ -116,6 +117,7 @@ pub(crate) struct ComparisonSummary {
|
||||
pub per_block_latency_change_median_percent: f64,
|
||||
pub per_block_latency_change_std_dev_percent: f64,
|
||||
pub new_payload_total_latency_change_percent: f64,
|
||||
pub new_payload_latency_mean_change_percent: f64,
|
||||
pub new_payload_latency_p50_change_percent: f64,
|
||||
pub new_payload_latency_p90_change_percent: f64,
|
||||
pub new_payload_latency_p99_change_percent: f64,
|
||||
@@ -445,6 +447,10 @@ impl ComparisonGenerator {
|
||||
per_block_latency_change_median_percent,
|
||||
per_block_latency_change_std_dev_percent,
|
||||
new_payload_total_latency_change_percent,
|
||||
new_payload_latency_mean_change_percent: calc_percent_change(
|
||||
baseline.mean_new_payload_latency_ms,
|
||||
feature.mean_new_payload_latency_ms,
|
||||
),
|
||||
new_payload_latency_p50_change_percent: calc_percent_change(
|
||||
baseline.median_new_payload_latency_ms,
|
||||
feature.median_new_payload_latency_ms,
|
||||
@@ -575,6 +581,10 @@ impl ComparisonGenerator {
|
||||
" Total newPayload time change: {:+.2}%",
|
||||
summary.new_payload_total_latency_change_percent
|
||||
);
|
||||
println!(
|
||||
" NewPayload Latency mean: {:+.2}%",
|
||||
summary.new_payload_latency_mean_change_percent
|
||||
);
|
||||
println!(
|
||||
" NewPayload Latency p50: {:+.2}%",
|
||||
summary.new_payload_latency_p50_change_percent
|
||||
|
||||
@@ -121,8 +121,7 @@ impl CompilationManager {
|
||||
cmd.env("RUSTFLAGS", rustflags);
|
||||
info!("Using RUSTFLAGS: {rustflags}");
|
||||
|
||||
// Debug log the command
|
||||
debug!("Executing cargo command: {:?}", cmd);
|
||||
info!("Compiling {binary_name} with {cmd:?}");
|
||||
|
||||
let output = cmd.output().wrap_err("Failed to execute cargo build command")?;
|
||||
|
||||
@@ -231,8 +230,7 @@ impl CompilationManager {
|
||||
let mut cmd = Command::new("cargo");
|
||||
cmd.args(["install", "--locked", "samply"]);
|
||||
|
||||
// Debug log the command
|
||||
debug!("Executing cargo command: {:?}", cmd);
|
||||
info!("Installing samply with {cmd:?}");
|
||||
|
||||
let output = cmd.output().wrap_err("Failed to execute cargo install samply command")?;
|
||||
|
||||
@@ -307,8 +305,7 @@ impl CompilationManager {
|
||||
let mut cmd = Command::new("make");
|
||||
cmd.arg("install-reth-bench").current_dir(&self.repo_root);
|
||||
|
||||
// Debug log the command
|
||||
debug!("Executing make command: {:?}", cmd);
|
||||
info!("Compiling reth-bench with {cmd:?}");
|
||||
|
||||
let output = cmd.output().wrap_err("Failed to execute make install-reth-bench command")?;
|
||||
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
|
||||
use crate::cli::Args;
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_rpc_types_eth::SyncStatus;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use eyre::{eyre, OptionExt, Result, WrapErr};
|
||||
#[cfg(unix)]
|
||||
use nix::sys::signal::{killpg, Signal};
|
||||
@@ -18,6 +20,9 @@ use tokio::{
|
||||
};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Default websocket RPC port used by reth
|
||||
const DEFAULT_WS_RPC_PORT: u16 = 8546;
|
||||
|
||||
/// Manages reth node lifecycle and operations
|
||||
pub(crate) struct NodeManager {
|
||||
datadir: Option<String>,
|
||||
@@ -152,7 +157,10 @@ impl NodeManager {
|
||||
metrics_arg,
|
||||
"--http".to_string(),
|
||||
"--http.api".to_string(),
|
||||
"eth".to_string(),
|
||||
"eth,reth".to_string(),
|
||||
"--ws".to_string(),
|
||||
"--ws.api".to_string(),
|
||||
"eth,reth".to_string(),
|
||||
"--disable-discovery".to_string(),
|
||||
"--trusted-only".to_string(),
|
||||
]);
|
||||
@@ -359,8 +367,13 @@ impl NodeManager {
|
||||
Ok((child, reth_command))
|
||||
}
|
||||
|
||||
/// Wait for the node to be ready and return its current tip
|
||||
pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
|
||||
/// Wait for the node to be ready and return its current tip.
|
||||
///
|
||||
/// Fails early if the node process exits before becoming ready.
|
||||
pub(crate) async fn wait_for_node_ready_and_get_tip(
|
||||
&self,
|
||||
child: &mut tokio::process::Child,
|
||||
) -> Result<u64> {
|
||||
info!("Waiting for node to be ready and synced...");
|
||||
|
||||
let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
|
||||
@@ -371,8 +384,23 @@ impl NodeManager {
|
||||
let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
|
||||
let provider = ProviderBuilder::new().connect_http(url);
|
||||
|
||||
let start_time = tokio::time::Instant::now();
|
||||
let mut iteration = 0;
|
||||
|
||||
timeout(max_wait, async {
|
||||
loop {
|
||||
iteration += 1;
|
||||
debug!(
|
||||
"Readiness check iteration {} (elapsed: {:?})",
|
||||
iteration,
|
||||
start_time.elapsed()
|
||||
);
|
||||
|
||||
// Check if the node process has exited.
|
||||
if let Some(status) = child.try_wait()? {
|
||||
return Err(eyre!("Node process exited unexpectedly with {status}"));
|
||||
}
|
||||
|
||||
// First check if RPC is up and node is not syncing
|
||||
match provider.syncing().await {
|
||||
Ok(sync_result) => {
|
||||
@@ -381,24 +409,48 @@ impl NodeManager {
|
||||
debug!("Node is still syncing {sync_info:?}, waiting...");
|
||||
}
|
||||
_ => {
|
||||
debug!("HTTP RPC is up and node is not syncing, checking block number...");
|
||||
// Node is not syncing, now get the tip
|
||||
match provider.get_block_number().await {
|
||||
Ok(tip) => {
|
||||
info!("Node is ready and not syncing at block: {}", tip);
|
||||
return Ok(tip);
|
||||
debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
|
||||
|
||||
// Verify WebSocket RPC is ready (public endpoint, no JWT required)
|
||||
let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
|
||||
debug!("Attempting WebSocket connection to {} (public endpoint)", ws_url);
|
||||
let ws_connect = WsConnect::new(&ws_url);
|
||||
|
||||
match RpcClient::connect_pubsub(ws_connect).await
|
||||
{
|
||||
Ok(_) => {
|
||||
info!(
|
||||
"Node is ready (HTTP and WebSocket) at block: {} (took {:?}, {} iterations)",
|
||||
tip, start_time.elapsed(), iteration
|
||||
);
|
||||
return Ok(tip);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
|
||||
iteration, e
|
||||
);
|
||||
debug!("WebSocket error details: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to get block number: {}", e);
|
||||
debug!("Failed to get block number (iteration {}): {:?}", iteration, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Node RPC not ready yet or failed to check sync status: {}", e);
|
||||
debug!("Node RPC not ready yet or failed to check sync status (iteration {}): {:?}", iteration, e);
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Sleeping for {:?} before next check", check_interval);
|
||||
sleep(check_interval).await;
|
||||
}
|
||||
})
|
||||
|
||||
@@ -16,6 +16,7 @@ workspace = true
|
||||
# reth
|
||||
reth-cli-runner.workspace = true
|
||||
reth-cli-util.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-fs-util.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-node-core.workspace = true
|
||||
@@ -25,10 +26,11 @@ reth-tracing.workspace = true
|
||||
# alloy
|
||||
alloy-eips.workspace = true
|
||||
alloy-json-rpc.workspace = true
|
||||
alloy-network.workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false }
|
||||
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
|
||||
alloy-pubsub.workspace = true
|
||||
alloy-rpc-client.workspace = true
|
||||
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
|
||||
alloy-rpc-types-engine.workspace = true
|
||||
alloy-transport-http.workspace = true
|
||||
alloy-transport-ipc.workspace = true
|
||||
@@ -50,6 +52,9 @@ tracing.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
# url parsing
|
||||
url.workspace = true
|
||||
|
||||
# async
|
||||
async-trait.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -31,6 +31,14 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
|
||||
`reth-bench` contains different commands to benchmark different patterns of engine API calls.
|
||||
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
|
||||
|
||||
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
|
||||
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
|
||||
|
||||
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
|
||||
|
||||
By default, the WebSocket URL for persistence subscriptions is derived from `--engine-rpc-url` (converting to ws:// on port 8546). Use `--ws-rpc-url` to override this.
|
||||
|
||||
Below is an overview of how to run a benchmark:
|
||||
|
||||
### Setup
|
||||
|
||||
@@ -163,7 +163,7 @@ impl AuthenticatedTransport {
|
||||
|
||||
// shift the iat forward by one second so there is some buffer time
|
||||
let mut shifted_claims = inner_and_claims.1;
|
||||
shifted_claims.iat -= 1;
|
||||
shifted_claims.iat -= 30;
|
||||
|
||||
// if the claims are out of date, reset the inner transport
|
||||
if !shifted_claims.is_within_time_window() {
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
//! Runs the `reth bench` command, calling first newPayload for each block, then calling
|
||||
//! forkchoiceUpdated.
|
||||
//!
|
||||
//! Supports configurable waiting behavior:
|
||||
//! - **`--wait-time`**: Fixed sleep interval between blocks.
|
||||
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
|
||||
//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
|
||||
//! threshold. This ensures the benchmark doesn't outpace persistence.
|
||||
//!
|
||||
//! Both options can be used together or independently.
|
||||
|
||||
use crate::{
|
||||
bench::{
|
||||
@@ -11,16 +19,26 @@ use crate::{
|
||||
},
|
||||
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
|
||||
};
|
||||
use alloy_provider::Provider;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
use alloy_pubsub::SubscriptionStream;
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use clap::Parser;
|
||||
use csv::Writer;
|
||||
use eyre::{Context, OptionExt};
|
||||
use futures::StreamExt;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// `reth benchmark new-payload-fcu` command
|
||||
#[derive(Debug, Parser)]
|
||||
@@ -30,8 +48,31 @@ pub struct Command {
|
||||
rpc_url: String,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, default_value = "250ms", verbatim_doc_comment)]
|
||||
wait_time: Duration,
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
/// Wait for blocks to be persisted before sending the next batch.
|
||||
///
|
||||
/// When enabled, waits for every Nth block to be persisted using the
|
||||
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
|
||||
/// doesn't outpace persistence.
|
||||
///
|
||||
/// The subscription uses the regular RPC websocket endpoint (no JWT required).
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
wait_for_persistence: bool,
|
||||
|
||||
/// Engine persistence threshold used for deciding when to wait for persistence.
|
||||
///
|
||||
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
|
||||
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
|
||||
/// at blocks 3, 6, 9, etc.
|
||||
#[arg(
|
||||
long = "persistence-threshold",
|
||||
value_name = "PERSISTENCE_THRESHOLD",
|
||||
default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
persistence_threshold: u64,
|
||||
|
||||
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
|
||||
/// endpoint.
|
||||
@@ -50,6 +91,32 @@ pub struct Command {
|
||||
impl Command {
|
||||
/// Execute `benchmark new-payload-fcu` command
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
// Log mode configuration
|
||||
if let Some(duration) = self.wait_time {
|
||||
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
info!(
|
||||
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
|
||||
self.persistence_threshold + 1,
|
||||
self.persistence_threshold
|
||||
);
|
||||
}
|
||||
|
||||
// Set up waiter based on configured options (duration takes precedence)
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let sub = self.setup_persistence_subscription().await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
))
|
||||
}
|
||||
(None, false) => None,
|
||||
};
|
||||
|
||||
let BenchContext {
|
||||
benchmark_mode,
|
||||
block_provider,
|
||||
@@ -110,7 +177,6 @@ impl Command {
|
||||
}
|
||||
});
|
||||
|
||||
// put results in a summary vec so they can be printed at the end
|
||||
let mut results = Vec::new();
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
@@ -121,14 +187,12 @@ impl Command {
|
||||
total_wait_time += wait_start.elapsed();
|
||||
result
|
||||
} {
|
||||
// just put gas used here
|
||||
let gas_used = block.header.gas_used;
|
||||
let block_number = block.header.number;
|
||||
let transaction_count = block.transactions.len() as u64;
|
||||
|
||||
debug!(target: "reth-bench", ?block_number, "Sending payload",);
|
||||
debug!(target: "reth-bench", ?block_number, "Sending payload");
|
||||
|
||||
// construct fcu to call
|
||||
let forkchoice_state = ForkchoiceState {
|
||||
head_block_hash: head,
|
||||
safe_block_hash: safe,
|
||||
@@ -143,7 +207,6 @@ impl Command {
|
||||
|
||||
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
|
||||
|
||||
// calculate the total duration and the fcu latency, record
|
||||
let total_latency = start.elapsed();
|
||||
let fcu_latency = total_latency - new_payload_result.latency;
|
||||
let combined_result = CombinedResult {
|
||||
@@ -154,17 +217,15 @@ impl Command {
|
||||
total_latency,
|
||||
};
|
||||
|
||||
// current duration since the start of the benchmark minus the time
|
||||
// waiting for blocks
|
||||
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
|
||||
// We want to measure engine throughput, not RPC fetch latency.
|
||||
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
|
||||
|
||||
// convert gas used to gigagas, then compute gigagas per second
|
||||
info!(%combined_result);
|
||||
|
||||
// wait before sending the next payload
|
||||
tokio::time::sleep(self.wait_time).await;
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
}
|
||||
|
||||
// record the current result
|
||||
let gas_row =
|
||||
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
|
||||
results.push((gas_row, combined_result));
|
||||
@@ -175,24 +236,26 @@ impl Command {
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
// Drop waiter - we don't need to wait for final blocks to persist
|
||||
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
|
||||
drop(waiter);
|
||||
|
||||
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
|
||||
results.into_iter().unzip();
|
||||
|
||||
// write the csv output to files
|
||||
if let Some(path) = self.benchmark.output {
|
||||
// first write the combined results to a file
|
||||
// Write CSV output files
|
||||
if let Some(ref path) = self.benchmark.output {
|
||||
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
|
||||
info!("Writing engine api call latency output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(output_path)?;
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
for result in combined_results {
|
||||
writer.serialize(result)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
|
||||
// now write the gas output to a file
|
||||
let output_path = path.join(GAS_OUTPUT_SUFFIX);
|
||||
info!("Writing total gas output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(output_path)?;
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
for row in &gas_output_results {
|
||||
writer.serialize(row)?;
|
||||
}
|
||||
@@ -201,8 +264,8 @@ impl Command {
|
||||
info!("Finished writing benchmark output files to {:?}.", path);
|
||||
}
|
||||
|
||||
// accumulate the results and calculate the overall Ggas/s
|
||||
let gas_output = TotalGasOutput::new(gas_output_results)?;
|
||||
|
||||
info!(
|
||||
total_duration=?gas_output.total_duration,
|
||||
total_gas_used=?gas_output.total_gas_used,
|
||||
@@ -213,4 +276,278 @@ impl Command {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the websocket RPC URL used for the persistence subscription.
|
||||
///
|
||||
/// Preference:
|
||||
/// - If `--ws-rpc-url` is provided, use it directly.
|
||||
/// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
|
||||
///
|
||||
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
|
||||
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
|
||||
/// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme
|
||||
/// (http→ws, https→wss) and force the port to 8546.
|
||||
fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
|
||||
if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
|
||||
let parsed: Url = ws_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
|
||||
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
|
||||
Ok(parsed)
|
||||
} else {
|
||||
let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
engine_url = %self.benchmark.engine_rpc_url,
|
||||
%derived,
|
||||
"Derived WebSocket RPC URL from engine RPC URL"
|
||||
);
|
||||
Ok(derived)
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
|
||||
let ws_url = self.derive_ws_rpc_url()?;
|
||||
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
let provider: RootProvider<Ethereum> = RootProvider::new(client);
|
||||
|
||||
let subscription = provider
|
||||
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an engine API URL to the default RPC websocket URL.
|
||||
///
|
||||
/// Transformations:
|
||||
/// - `http` → `ws`
|
||||
/// - `https` → `wss`
|
||||
/// - `ws` / `wss` keep their scheme
|
||||
/// - Port is always set to `8546`, reth's default RPC websocket port.
|
||||
///
|
||||
/// This is used when we only know the engine API URL (typically `:8551`) but
|
||||
/// need to connect to the node's WS RPC endpoint for persistence events.
|
||||
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
|
||||
let url: Url = engine_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
|
||||
|
||||
let mut ws_url = url.clone();
|
||||
|
||||
match ws_url.scheme() {
|
||||
"http" => ws_url
|
||||
.set_scheme("ws")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
|
||||
"https" => ws_url
|
||||
.set_scheme("wss")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
|
||||
"ws" | "wss" => {}
|
||||
scheme => {
|
||||
return Err(eyre::eyre!(
|
||||
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
|
||||
|
||||
Ok(ws_url)
|
||||
}
|
||||
|
||||
/// Waits until the persistence subscription reports that `target` has been persisted.
|
||||
///
|
||||
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
|
||||
/// - the subscription stream ends unexpectedly, or
|
||||
/// - `timeout` elapses before `target` is observed.
|
||||
async fn wait_for_persistence(
|
||||
stream: &mut SubscriptionStream<BlockNumHash>,
|
||||
target: u64,
|
||||
last_persisted: &mut u64,
|
||||
timeout: Duration,
|
||||
) -> eyre::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
while *last_persisted < target {
|
||||
match stream.next().await {
|
||||
Some(persisted) => {
|
||||
*last_persisted = persisted.number;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted_block = ?last_persisted,
|
||||
"Received persistence notification"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
eyre::eyre!(
|
||||
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
|
||||
target,
|
||||
timeout,
|
||||
last_persisted
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
|
||||
/// The provider must be kept alive for the subscription to continue receiving events.
|
||||
struct PersistenceSubscription {
|
||||
_provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl PersistenceSubscription {
|
||||
const fn new(
|
||||
provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
) -> Self {
|
||||
Self { _provider: provider, stream }
|
||||
}
|
||||
|
||||
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Encapsulates the block waiting logic.
|
||||
///
|
||||
/// Provides a simple `on_block()` interface that handles both:
|
||||
/// - Fixed duration waits (when `wait_time` is set)
|
||||
/// - Persistence-based waits (when `subscription` is set)
|
||||
///
|
||||
/// For persistence mode, waits after every `(threshold + 1)` blocks.
|
||||
struct PersistenceWaiter {
|
||||
wait_time: Option<Duration>,
|
||||
subscription: Option<PersistenceSubscription>,
|
||||
blocks_sent: u64,
|
||||
last_persisted: u64,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWaiter {
|
||||
const fn with_duration(wait_time: Duration) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: None,
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold: 0,
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
const fn with_subscription(
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: None,
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.blocks_sent += 1;
|
||||
|
||||
if self.blocks_sent % (self.threshold + 1) == 0 {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
target_block = ?block_number,
|
||||
last_persisted = self.last_persisted,
|
||||
blocks_sent = self.blocks_sent,
|
||||
"Waiting for persistence"
|
||||
);
|
||||
|
||||
wait_for_persistence(
|
||||
subscription.stream_mut(),
|
||||
block_number,
|
||||
&mut self.last_persisted,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted = self.last_persisted,
|
||||
"Persistence caught up"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_engine_url_to_ws_url() {
|
||||
// http -> ws, always uses port 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "ws://localhost:8546/");
|
||||
|
||||
// https -> wss
|
||||
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "wss://localhost:8546/");
|
||||
|
||||
// Custom engine port still maps to 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
|
||||
assert_eq!(result.port(), Some(8546));
|
||||
|
||||
// Already ws passthrough
|
||||
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
|
||||
assert_eq!(result.scheme(), "ws");
|
||||
|
||||
// Invalid inputs
|
||||
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
|
||||
assert!(engine_url_to_ws_url("not a valid url").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_waiter_with_duration() {
|
||||
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
|
||||
|
||||
let start = Instant::now();
|
||||
waiter.on_block(1).await.unwrap();
|
||||
waiter.on_block(2).await.unwrap();
|
||||
waiter.on_block(3).await.unwrap();
|
||||
|
||||
// Should have waited ~3ms total
|
||||
assert!(start.elapsed() >= Duration::from_millis(3));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,6 +116,11 @@ jemalloc-prof = [
|
||||
"reth-cli-util/jemalloc",
|
||||
"reth-cli-util/jemalloc-prof",
|
||||
"reth-ethereum-cli/jemalloc-prof",
|
||||
"reth-node-metrics/jemalloc-prof",
|
||||
]
|
||||
jemalloc-symbols = [
|
||||
"jemalloc-prof",
|
||||
"reth-ethereum-cli/jemalloc-symbols",
|
||||
]
|
||||
jemalloc-unprefixed = [
|
||||
"reth-cli-util/jemalloc-unprefixed",
|
||||
@@ -166,6 +171,8 @@ min-trace-logs = [
|
||||
"reth-node-core/min-trace-logs",
|
||||
]
|
||||
|
||||
edge = ["reth-ethereum-cli/edge"]
|
||||
|
||||
[[bin]]
|
||||
name = "reth"
|
||||
path = "src/main.rs"
|
||||
|
||||
@@ -2,22 +2,46 @@
|
||||
//!
|
||||
//! ## Feature Flags
|
||||
//!
|
||||
//! ### Default Features
|
||||
//!
|
||||
//! - `jemalloc`: Uses [jemallocator](https://github.com/tikv/jemallocator) as the global allocator.
|
||||
//! This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc)
|
||||
//! for more info.
|
||||
//! - `otlp`: Enables [OpenTelemetry](https://opentelemetry.io/) metrics export to a configured OTLP
|
||||
//! collector endpoint.
|
||||
//! - `js-tracer`: Enables the `JavaScript` tracer for the `debug_trace` endpoints, allowing custom
|
||||
//! `JavaScript`-based transaction tracing.
|
||||
//! - `keccak-cache-global`: Enables global caching for Keccak256 hashes to improve performance.
|
||||
//! - `asm-keccak`: Replaces the default, pure-Rust implementation of Keccak256 with one implemented
|
||||
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
|
||||
//! details and supported targets.
|
||||
//!
|
||||
//! ### Allocator Features
|
||||
//!
|
||||
//! - `jemalloc-prof`: Enables [jemallocator's](https://github.com/tikv/jemallocator) heap profiling
|
||||
//! and leak detection functionality. See [jemalloc's opt.prof](https://jemalloc.net/jemalloc.3.html#opt.prof)
|
||||
//! documentation for usage details. This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc)
|
||||
//! for more info.
|
||||
//! - `asm-keccak`: replaces the default, pure-Rust implementation of Keccak256 with one implemented
|
||||
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
|
||||
//! details and supported targets
|
||||
//! documentation for usage details. This is **not recommended on Windows**.
|
||||
//! - `jemalloc-symbols`: Enables jemalloc symbols for profiling. Includes `jemalloc-prof`.
|
||||
//! - `jemalloc-unprefixed`: Uses unprefixed jemalloc symbols.
|
||||
//! - `tracy-allocator`: Enables [Tracy](https://github.com/wolfpld/tracy) profiler allocator
|
||||
//! integration for memory profiling.
|
||||
//! - `snmalloc`: Uses [snmalloc](https://github.com/snmalloc/snmalloc) as the global allocator. Use
|
||||
//! `--no-default-features` when enabling this, as jemalloc takes precedence.
|
||||
//! - `snmalloc-native`: Uses snmalloc with native CPU optimizations. Use `--no-default-features`
|
||||
//! when enabling this.
|
||||
//!
|
||||
//! ### Log Level Features
|
||||
//!
|
||||
//! - `min-error-logs`: Disables all logs below `error` level.
|
||||
//! - `min-warn-logs`: Disables all logs below `warn` level.
|
||||
//! - `min-info-logs`: Disables all logs below `info` level. This can speed up the node, since fewer
|
||||
//! calls to the logging component are made.
|
||||
//! - `min-debug-logs`: Disables all logs below `debug` level.
|
||||
//! - `min-trace-logs`: Disables all logs below `trace` level.
|
||||
//!
|
||||
//! ### Development Features
|
||||
//!
|
||||
//! - `dev`: Enables development mode features, including test vector generation commands.
|
||||
|
||||
#![doc(
|
||||
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
|
||||
@@ -170,7 +194,7 @@ pub mod rpc {
|
||||
pub use reth_rpc::eth::*;
|
||||
}
|
||||
|
||||
/// Re-exported from `reth_rpc::rpc`.
|
||||
/// Re-exported from `reth_rpc_server_types::result`.
|
||||
pub mod result {
|
||||
pub use reth_rpc_server_types::result::*;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,10 @@
|
||||
#[global_allocator]
|
||||
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
|
||||
|
||||
#[cfg(all(feature = "jemalloc-prof", unix))]
|
||||
#[unsafe(export_name = "_rjem_malloc_conf")]
|
||||
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
|
||||
|
||||
use clap::Parser;
|
||||
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
|
||||
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
|
||||
|
||||
@@ -33,6 +33,7 @@ where
|
||||
) -> Self {
|
||||
let (finalized_block, _) = watch::channel(finalized);
|
||||
let (safe_block, _) = watch::channel(safe);
|
||||
let (persisted_block, _) = watch::channel(None);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(ChainInfoInner {
|
||||
@@ -42,6 +43,7 @@ where
|
||||
canonical_head: RwLock::new(head),
|
||||
safe_block,
|
||||
finalized_block,
|
||||
persisted_block,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -97,6 +99,11 @@ where
|
||||
self.inner.finalized_block.borrow().as_ref().map(SealedHeader::num_hash)
|
||||
}
|
||||
|
||||
/// Returns the `BlockNumHash` of the persisted block.
|
||||
pub fn get_persisted_num_hash(&self) -> Option<BlockNumHash> {
|
||||
*self.inner.persisted_block.borrow()
|
||||
}
|
||||
|
||||
/// Sets the canonical head of the chain.
|
||||
pub fn set_canonical_head(&self, header: SealedHeader<N::BlockHeader>) {
|
||||
let number = header.number();
|
||||
@@ -130,6 +137,18 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
/// Sets the persisted block of the chain.
|
||||
pub fn set_persisted(&self, num_hash: BlockNumHash) {
|
||||
self.inner.persisted_block.send_if_modified(|current| {
|
||||
if current.map(|b| b.hash) != Some(num_hash.hash) {
|
||||
let _ = current.replace(num_hash);
|
||||
return true
|
||||
}
|
||||
|
||||
false
|
||||
});
|
||||
}
|
||||
|
||||
/// Subscribe to the finalized block.
|
||||
pub fn subscribe_finalized_block(
|
||||
&self,
|
||||
@@ -141,6 +160,11 @@ where
|
||||
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
|
||||
self.inner.safe_block.subscribe()
|
||||
}
|
||||
|
||||
/// Subscribe to the persisted block.
|
||||
pub fn subscribe_persisted_block(&self) -> watch::Receiver<Option<BlockNumHash>> {
|
||||
self.inner.persisted_block.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
/// Container type for all chain info fields
|
||||
@@ -159,11 +183,14 @@ struct ChainInfoInner<N: NodePrimitives = reth_ethereum_primitives::EthPrimitive
|
||||
safe_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
|
||||
/// The block that the beacon node considers finalized.
|
||||
finalized_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
|
||||
/// The last block that was persisted to disk.
|
||||
persisted_block: watch::Sender<Option<BlockNumHash>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::B256;
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_testing_utils::{generators, generators::random_header};
|
||||
|
||||
@@ -338,4 +365,28 @@ mod tests {
|
||||
// Assert that the BlockNumHash returned matches the safe header
|
||||
assert_eq!(tracker.get_safe_num_hash(), Some(safe_header.num_hash()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_persisted() {
|
||||
let mut rng = generators::rng();
|
||||
let header = random_header(&mut rng, 10, None);
|
||||
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header, None, None);
|
||||
|
||||
// Initial state: persisted block should be None
|
||||
assert!(tracker.get_persisted_num_hash().is_none());
|
||||
|
||||
// Set a persisted block
|
||||
let num_hash1 = BlockNumHash::new(10, B256::random());
|
||||
tracker.set_persisted(num_hash1);
|
||||
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash1));
|
||||
|
||||
// Setting the same block again should not change anything
|
||||
tracker.set_persisted(num_hash1);
|
||||
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash1));
|
||||
|
||||
// Set a different block
|
||||
let num_hash2 = BlockNumHash::new(20, B256::random());
|
||||
tracker.set_persisted(num_hash2);
|
||||
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash2));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,12 +37,19 @@ pub struct ComputedTrieData {
|
||||
|
||||
/// Trie input bundled with its anchor hash.
|
||||
///
|
||||
/// This is used to store the trie input and anchor hash for a block together.
|
||||
/// The `trie_input` contains the **cumulative** overlay of all in-memory ancestor blocks,
|
||||
/// not just this block's changes. Child blocks reuse the parent's overlay in O(1) by
|
||||
/// cloning the Arc-wrapped data.
|
||||
///
|
||||
/// The `anchor_hash` is metadata indicating which persisted base state this overlay
|
||||
/// sits on top of. It is CRITICAL for overlay reuse decisions: an overlay built on top
|
||||
/// of Anchor A cannot be reused for a block anchored to Anchor B, as it would result
|
||||
/// in an incorrect state.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AnchoredTrieInput {
|
||||
/// The persisted ancestor hash this trie input is anchored to.
|
||||
pub anchor_hash: B256,
|
||||
/// Trie input constructed from in-memory overlays.
|
||||
/// Cumulative trie input overlay from all in-memory ancestors.
|
||||
pub trie_input: Arc<TrieInputSorted>,
|
||||
}
|
||||
|
||||
@@ -62,7 +69,8 @@ static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
|
||||
/// Internal state for deferred trie data.
|
||||
enum DeferredState {
|
||||
/// Data is not yet available; raw inputs stored for fallback computation.
|
||||
Pending(PendingInputs),
|
||||
/// Wrapped in `Option` to allow taking ownership during computation.
|
||||
Pending(Option<PendingInputs>),
|
||||
/// Data has been computed and is ready.
|
||||
Ready(ComputedTrieData),
|
||||
}
|
||||
@@ -112,12 +120,12 @@ impl DeferredTrieData {
|
||||
ancestors: Vec<Self>,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: Arc::new(Mutex::new(DeferredState::Pending(PendingInputs {
|
||||
state: Arc::new(Mutex::new(DeferredState::Pending(Some(PendingInputs {
|
||||
hashed_state,
|
||||
trie_updates,
|
||||
anchor_hash,
|
||||
ancestors,
|
||||
}))),
|
||||
})))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,8 +146,9 @@ impl DeferredTrieData {
|
||||
///
|
||||
/// # Process
|
||||
/// 1. Sort the current block's hashed state and trie updates
|
||||
/// 2. Merge ancestor overlays (oldest -> newest, so later state takes precedence)
|
||||
/// 3. Extend the merged overlay with this block's sorted data
|
||||
/// 2. Reuse parent's cached overlay if available (O(1) - the common case)
|
||||
/// 3. Otherwise, rebuild overlay from ancestors (rare fallback)
|
||||
/// 4. Extend the overlay with this block's sorted data
|
||||
///
|
||||
/// Used by both the async background task and the synchronous fallback path.
|
||||
///
|
||||
@@ -147,49 +156,103 @@ impl DeferredTrieData {
|
||||
/// * `hashed_state` - Unsorted hashed post-state (account/storage changes) from execution
|
||||
/// * `trie_updates` - Unsorted trie node updates from state root computation
|
||||
/// * `anchor_hash` - The persisted ancestor hash this trie input is anchored to
|
||||
/// * `ancestors` - Deferred trie data from ancestor blocks for merging
|
||||
/// * `ancestors` - Deferred trie data from ancestor blocks for merging (oldest -> newest)
|
||||
pub fn sort_and_build_trie_input(
|
||||
hashed_state: &HashedPostState,
|
||||
trie_updates: &TrieUpdates,
|
||||
hashed_state: Arc<HashedPostState>,
|
||||
trie_updates: Arc<TrieUpdates>,
|
||||
anchor_hash: B256,
|
||||
ancestors: &[Self],
|
||||
) -> ComputedTrieData {
|
||||
// Sort the current block's hashed state and trie updates
|
||||
let sorted_hashed_state = Arc::new(hashed_state.clone_into_sorted());
|
||||
let sorted_trie_updates = Arc::new(trie_updates.clone_into_sorted());
|
||||
let sorted_hashed_state = match Arc::try_unwrap(hashed_state) {
|
||||
Ok(state) => state.into_sorted(),
|
||||
Err(arc) => arc.clone_into_sorted(),
|
||||
};
|
||||
let sorted_trie_updates = match Arc::try_unwrap(trie_updates) {
|
||||
Ok(updates) => updates.into_sorted(),
|
||||
Err(arc) => arc.clone_into_sorted(),
|
||||
};
|
||||
|
||||
// Merge trie data from ancestors (oldest -> newest so later state takes precedence)
|
||||
let mut overlay = TrieInputSorted::default();
|
||||
for ancestor in ancestors {
|
||||
let ancestor_data = ancestor.wait_cloned();
|
||||
{
|
||||
let state_mut = Arc::make_mut(&mut overlay.state);
|
||||
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
|
||||
}
|
||||
{
|
||||
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
|
||||
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
|
||||
}
|
||||
}
|
||||
// Reuse parent's overlay if available and anchors match.
|
||||
// We can only reuse the parent's overlay if it was built on top of the same
|
||||
// persisted anchor. If the anchor has changed (e.g., due to persistence),
|
||||
// the parent's overlay is relative to an old state and cannot be used.
|
||||
let overlay = if let Some(parent) = ancestors.last() {
|
||||
let parent_data = parent.wait_cloned();
|
||||
|
||||
// Extend overlay with current block's sorted data
|
||||
{
|
||||
let state_mut = Arc::make_mut(&mut overlay.state);
|
||||
state_mut.extend_ref(sorted_hashed_state.as_ref());
|
||||
}
|
||||
{
|
||||
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
|
||||
nodes_mut.extend_ref(sorted_trie_updates.as_ref());
|
||||
}
|
||||
match &parent_data.anchored_trie_input {
|
||||
// Case 1: Parent has cached overlay AND anchors match.
|
||||
Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input })
|
||||
if *parent_anchor == anchor_hash =>
|
||||
{
|
||||
// O(1): Reuse parent's overlay, extend with current block's data.
|
||||
let mut overlay = TrieInputSorted::new(
|
||||
Arc::clone(&trie_input.nodes),
|
||||
Arc::clone(&trie_input.state),
|
||||
Default::default(), // prefix_sets are per-block, not cumulative
|
||||
);
|
||||
// Only trigger COW clone if there's actually data to add.
|
||||
if !sorted_hashed_state.is_empty() {
|
||||
Arc::make_mut(&mut overlay.state).extend_ref(&sorted_hashed_state);
|
||||
}
|
||||
if !sorted_trie_updates.is_empty() {
|
||||
Arc::make_mut(&mut overlay.nodes).extend_ref(&sorted_trie_updates);
|
||||
}
|
||||
overlay
|
||||
}
|
||||
// Case 2: Parent exists but anchor mismatch or no cached overlay.
|
||||
// We must rebuild from the ancestors list (which only contains unpersisted blocks).
|
||||
_ => Self::merge_ancestors_into_overlay(
|
||||
ancestors,
|
||||
&sorted_hashed_state,
|
||||
&sorted_trie_updates,
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// Case 3: No in-memory ancestors (first block after persisted anchor).
|
||||
// Build overlay with just this block's data.
|
||||
Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates)
|
||||
};
|
||||
|
||||
ComputedTrieData::with_trie_input(
|
||||
sorted_hashed_state,
|
||||
sorted_trie_updates,
|
||||
Arc::new(sorted_hashed_state),
|
||||
Arc::new(sorted_trie_updates),
|
||||
anchor_hash,
|
||||
Arc::new(overlay),
|
||||
)
|
||||
}
|
||||
|
||||
/// Merge all ancestors and current block's data into a single overlay.
|
||||
///
|
||||
/// This is a rare fallback path, only used when no ancestor has a cached
|
||||
/// `anchored_trie_input` (e.g., blocks created via alternative constructors).
|
||||
/// In normal operation, the parent always has a cached overlay and this
|
||||
/// function is never called.
|
||||
///
|
||||
/// Iterates ancestors oldest -> newest, then extends with current block's data,
|
||||
/// so later state takes precedence.
|
||||
fn merge_ancestors_into_overlay(
|
||||
ancestors: &[Self],
|
||||
sorted_hashed_state: &HashedPostStateSorted,
|
||||
sorted_trie_updates: &TrieUpdatesSorted,
|
||||
) -> TrieInputSorted {
|
||||
let mut overlay = TrieInputSorted::default();
|
||||
|
||||
let state_mut = Arc::make_mut(&mut overlay.state);
|
||||
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
|
||||
|
||||
for ancestor in ancestors {
|
||||
let ancestor_data = ancestor.wait_cloned();
|
||||
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
|
||||
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
|
||||
}
|
||||
|
||||
// Extend with current block's sorted data last (takes precedence)
|
||||
state_mut.extend_ref(sorted_hashed_state);
|
||||
nodes_mut.extend_ref(sorted_trie_updates);
|
||||
|
||||
overlay
|
||||
}
|
||||
|
||||
/// Returns trie data, computing synchronously if the async task hasn't completed.
|
||||
///
|
||||
/// - If the async task has completed (`Ready`), returns the cached result.
|
||||
@@ -204,7 +267,7 @@ impl DeferredTrieData {
|
||||
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
|
||||
pub fn wait_cloned(&self) -> ComputedTrieData {
|
||||
let mut state = self.state.lock();
|
||||
match &*state {
|
||||
match &mut *state {
|
||||
// If the deferred trie data is ready, return the cached result.
|
||||
DeferredState::Ready(bundle) => {
|
||||
DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
|
||||
@@ -212,11 +275,14 @@ impl DeferredTrieData {
|
||||
}
|
||||
// If the deferred trie data is pending, compute the trie data synchronously and return
|
||||
// the result. This is the fallback path if the async task hasn't completed.
|
||||
DeferredState::Pending(inputs) => {
|
||||
DeferredState::Pending(maybe_inputs) => {
|
||||
DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
|
||||
|
||||
let inputs = maybe_inputs.take().expect("inputs must be present in Pending state");
|
||||
|
||||
let computed = Self::sort_and_build_trie_input(
|
||||
&inputs.hashed_state,
|
||||
&inputs.trie_updates,
|
||||
inputs.hashed_state,
|
||||
inputs.trie_updates,
|
||||
inputs.anchor_hash,
|
||||
&inputs.ancestors,
|
||||
);
|
||||
@@ -441,4 +507,365 @@ mod tests {
|
||||
let (_, account) = &overlay_state[0];
|
||||
assert_eq!(account.unwrap().nonce, 2);
|
||||
}
|
||||
|
||||
/// Helper to create a ready block with anchored trie input containing specific state.
|
||||
fn ready_block_with_state(
|
||||
anchor_hash: B256,
|
||||
accounts: Vec<(B256, Option<Account>)>,
|
||||
) -> DeferredTrieData {
|
||||
let hashed_state = Arc::new(HashedPostStateSorted::new(accounts, B256Map::default()));
|
||||
let trie_updates = Arc::default();
|
||||
let mut overlay = TrieInputSorted::default();
|
||||
Arc::make_mut(&mut overlay.state).extend_ref(hashed_state.as_ref());
|
||||
|
||||
DeferredTrieData::ready(ComputedTrieData {
|
||||
hashed_state,
|
||||
trie_updates,
|
||||
anchored_trie_input: Some(AnchoredTrieInput {
|
||||
anchor_hash,
|
||||
trie_input: Arc::new(overlay),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
/// Verifies that first block after anchor (no ancestors) creates empty base overlay.
|
||||
#[test]
|
||||
fn first_block_after_anchor_creates_empty_base() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let key = B256::with_last_byte(42);
|
||||
let account = Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None };
|
||||
|
||||
// First block after anchor - no ancestors
|
||||
let first_block = DeferredTrieData::pending(
|
||||
Arc::new(HashedPostState::default().with_accounts([(key, Some(account))])),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![], // No ancestors
|
||||
);
|
||||
|
||||
let result = first_block.wait_cloned();
|
||||
|
||||
// Should have overlay with just this block's data
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.anchor_hash, anchor);
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
|
||||
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
|
||||
assert_eq!(*found_key, key);
|
||||
assert_eq!(found_account.unwrap().nonce, 1);
|
||||
}
|
||||
|
||||
/// Verifies that parent's overlay is reused regardless of anchor.
|
||||
#[test]
|
||||
fn reuses_parent_overlay() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let key = B256::with_last_byte(42);
|
||||
let account = Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None };
|
||||
|
||||
// Create parent with anchored trie input
|
||||
let parent = ready_block_with_state(anchor, vec![(key, Some(account))]);
|
||||
|
||||
// Create child - should reuse parent's overlay
|
||||
let child = DeferredTrieData::pending(
|
||||
Arc::new(HashedPostState::default()),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![parent],
|
||||
);
|
||||
|
||||
let result = child.wait_cloned();
|
||||
|
||||
// Verify parent's account is in the overlay
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.anchor_hash, anchor);
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
|
||||
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
|
||||
assert_eq!(*found_key, key);
|
||||
assert_eq!(found_account.unwrap().nonce, 100);
|
||||
}
|
||||
|
||||
/// Verifies that parent's overlay is NOT reused when anchor changes (after persist).
|
||||
/// The overlay data is dependent on the anchor, so it must be rebuilt from the
|
||||
/// remaining ancestors.
|
||||
#[test]
|
||||
fn rebuilds_overlay_when_anchor_changes() {
|
||||
let old_anchor = B256::with_last_byte(1);
|
||||
let new_anchor = B256::with_last_byte(2);
|
||||
let key = B256::with_last_byte(42);
|
||||
let account = Account { nonce: 50, balance: U256::ZERO, bytecode_hash: None };
|
||||
|
||||
// Create parent with OLD anchor
|
||||
let parent = ready_block_with_state(old_anchor, vec![(key, Some(account))]);
|
||||
|
||||
// Create child with NEW anchor (simulates after persist)
|
||||
// Should NOT reuse parent's overlay because anchor changed
|
||||
let child = DeferredTrieData::pending(
|
||||
Arc::new(HashedPostState::default()),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
new_anchor,
|
||||
vec![parent],
|
||||
);
|
||||
|
||||
let result = child.wait_cloned();
|
||||
|
||||
// Verify result uses new anchor
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.anchor_hash, new_anchor);
|
||||
|
||||
// Crucially, since we provided `parent` in ancestors but it has a different anchor,
|
||||
// the code falls back to `merge_ancestors_into_overlay`.
|
||||
// `merge_ancestors_into_overlay` reads `parent.hashed_state` (which has the account).
|
||||
// So the account IS present, but it was obtained via REBUILD, not REUSE.
|
||||
// We can check `DEFERRED_TRIE_METRICS` if we want to be sure, but functionally:
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
|
||||
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
|
||||
assert_eq!(*found_key, key);
|
||||
assert_eq!(found_account.unwrap().nonce, 50);
|
||||
}
|
||||
|
||||
/// Verifies that parent without `anchored_trie_input` triggers rebuild path.
|
||||
#[test]
|
||||
fn rebuilds_when_parent_has_no_anchored_input() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let key = B256::with_last_byte(42);
|
||||
let account = Account { nonce: 25, balance: U256::ZERO, bytecode_hash: None };
|
||||
|
||||
// Create parent WITHOUT anchored trie input (e.g., from without_trie_input constructor)
|
||||
let parent_state =
|
||||
HashedPostStateSorted::new(vec![(key, Some(account))], B256Map::default());
|
||||
let parent = DeferredTrieData::ready(ComputedTrieData {
|
||||
hashed_state: Arc::new(parent_state),
|
||||
trie_updates: Arc::default(),
|
||||
anchored_trie_input: None, // No anchored input
|
||||
});
|
||||
|
||||
// Create child - should rebuild from parent's hashed_state
|
||||
let child = DeferredTrieData::pending(
|
||||
Arc::new(HashedPostState::default()),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![parent],
|
||||
);
|
||||
|
||||
let result = child.wait_cloned();
|
||||
|
||||
// Verify overlay is built and contains parent's data
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.anchor_hash, anchor);
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
|
||||
}
|
||||
|
||||
/// Verifies that a chain of blocks with matching anchors builds correct cumulative overlay.
|
||||
#[test]
|
||||
fn chain_of_blocks_builds_cumulative_overlay() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let key1 = B256::with_last_byte(1);
|
||||
let key2 = B256::with_last_byte(2);
|
||||
let key3 = B256::with_last_byte(3);
|
||||
|
||||
// Block 1: sets account at key1
|
||||
let block1 = ready_block_with_state(
|
||||
anchor,
|
||||
vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
|
||||
);
|
||||
|
||||
// Block 2: adds account at key2, ancestor is block1
|
||||
let block2_hashed = HashedPostState::default().with_accounts([(
|
||||
key2,
|
||||
Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let block2 = DeferredTrieData::pending(
|
||||
Arc::new(block2_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![block1.clone()],
|
||||
);
|
||||
// Compute block2's trie data
|
||||
let block2_computed = block2.wait_cloned();
|
||||
let block2_ready = DeferredTrieData::ready(block2_computed);
|
||||
|
||||
// Block 3: adds account at key3, ancestor is block2 (which includes block1)
|
||||
let block3_hashed = HashedPostState::default().with_accounts([(
|
||||
key3,
|
||||
Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let block3 = DeferredTrieData::pending(
|
||||
Arc::new(block3_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![block1, block2_ready],
|
||||
);
|
||||
|
||||
let result = block3.wait_cloned();
|
||||
|
||||
// Verify all three accounts are in the cumulative overlay
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 3);
|
||||
|
||||
// Accounts should be sorted by key (B256 ordering)
|
||||
let accounts = &overlay.trie_input.state.accounts;
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
|
||||
}
|
||||
|
||||
/// Verifies that child block's state overwrites parent's state for the same key.
|
||||
#[test]
|
||||
fn child_state_overwrites_parent() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let key = B256::with_last_byte(42);
|
||||
|
||||
// Parent sets nonce to 10
|
||||
let parent = ready_block_with_state(
|
||||
anchor,
|
||||
vec![(key, Some(Account { nonce: 10, balance: U256::ZERO, bytecode_hash: None }))],
|
||||
);
|
||||
|
||||
// Child overwrites nonce to 99
|
||||
let child_hashed = HashedPostState::default().with_accounts([(
|
||||
key,
|
||||
Some(Account { nonce: 99, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let child = DeferredTrieData::pending(
|
||||
Arc::new(child_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
vec![parent],
|
||||
);
|
||||
|
||||
let result = child.wait_cloned();
|
||||
|
||||
// Verify child's value wins (extend_ref uses later value)
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
// Note: extend_ref may result in duplicate keys; check the last occurrence
|
||||
let accounts = &overlay.trie_input.state.accounts;
|
||||
let last_account = accounts.iter().rfind(|(k, _)| *k == key).unwrap();
|
||||
assert_eq!(last_account.1.unwrap().nonce, 99);
|
||||
}
|
||||
|
||||
/// Stress test: verify O(N) behavior by building a chain of many blocks.
|
||||
/// This test ensures the fix doesn't regress - previously this would be O(N²).
|
||||
#[test]
|
||||
fn long_chain_builds_in_linear_time() {
|
||||
let anchor = B256::with_last_byte(1);
|
||||
let num_blocks = 50; // Enough to notice O(N²) vs O(N) difference
|
||||
|
||||
let mut ancestors: Vec<DeferredTrieData> = Vec::new();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..num_blocks {
|
||||
let key = B256::with_last_byte(i as u8);
|
||||
let account = Account { nonce: i as u64, balance: U256::ZERO, bytecode_hash: None };
|
||||
let hashed = HashedPostState::default().with_accounts([(key, Some(account))]);
|
||||
|
||||
let block = DeferredTrieData::pending(
|
||||
Arc::new(hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
anchor,
|
||||
ancestors.clone(),
|
||||
);
|
||||
|
||||
// Compute and add to ancestors for next iteration
|
||||
let computed = block.wait_cloned();
|
||||
ancestors.push(DeferredTrieData::ready(computed));
|
||||
}
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// With O(N) fix, 50 blocks should complete quickly (< 1 second)
|
||||
// With O(N²), this would take significantly longer
|
||||
assert!(
|
||||
elapsed < Duration::from_secs(2),
|
||||
"Chain of {num_blocks} blocks took {:?}, possible O(N²) regression",
|
||||
elapsed
|
||||
);
|
||||
|
||||
// Verify final overlay has all accounts
|
||||
let final_result = ancestors.last().unwrap().wait_cloned();
|
||||
let overlay = final_result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), num_blocks);
|
||||
}
|
||||
|
||||
/// Verifies that a multi-ancestor overlay is rebuilt when anchor changes.
|
||||
/// This simulates the "persist prefix then keep building" scenario where:
|
||||
/// 1. A chain of blocks is built with anchor A
|
||||
/// 2. Some blocks are persisted, changing anchor to B
|
||||
/// 3. New blocks must rebuild the overlay from the remaining ancestors
|
||||
#[test]
|
||||
fn multi_ancestor_overlay_rebuilt_after_anchor_change() {
|
||||
let old_anchor = B256::with_last_byte(1);
|
||||
let new_anchor = B256::with_last_byte(2);
|
||||
let key1 = B256::with_last_byte(1);
|
||||
let key2 = B256::with_last_byte(2);
|
||||
let key3 = B256::with_last_byte(3);
|
||||
let key4 = B256::with_last_byte(4);
|
||||
|
||||
// Build a chain of 3 blocks with old_anchor
|
||||
let block1 = ready_block_with_state(
|
||||
old_anchor,
|
||||
vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
|
||||
);
|
||||
|
||||
let block2_hashed = HashedPostState::default().with_accounts([(
|
||||
key2,
|
||||
Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let block2 = DeferredTrieData::pending(
|
||||
Arc::new(block2_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
old_anchor,
|
||||
vec![block1.clone()],
|
||||
);
|
||||
let block2_ready = DeferredTrieData::ready(block2.wait_cloned());
|
||||
|
||||
let block3_hashed = HashedPostState::default().with_accounts([(
|
||||
key3,
|
||||
Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let block3 = DeferredTrieData::pending(
|
||||
Arc::new(block3_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
old_anchor,
|
||||
vec![block1.clone(), block2_ready.clone()],
|
||||
);
|
||||
let block3_ready = DeferredTrieData::ready(block3.wait_cloned());
|
||||
|
||||
// Verify block3's overlay has all 3 accounts with old_anchor
|
||||
let block3_overlay = block3_ready.wait_cloned().anchored_trie_input.unwrap();
|
||||
assert_eq!(block3_overlay.anchor_hash, old_anchor);
|
||||
assert_eq!(block3_overlay.trie_input.state.accounts.len(), 3);
|
||||
|
||||
// Now simulate persist: create block4 with NEW anchor but same ancestors.
|
||||
// To verify correct rebuilding, we must provide ALL unpersisted ancestors.
|
||||
// If we only provided block3, the rebuild would only see block3's state.
|
||||
// We pass block1, block2, block3 to simulate that they are all still in memory
|
||||
// but the anchor check forces a rebuild (e.g. artificial anchor change).
|
||||
let block4_hashed = HashedPostState::default().with_accounts([(
|
||||
key4,
|
||||
Some(Account { nonce: 4, balance: U256::ZERO, bytecode_hash: None }),
|
||||
)]);
|
||||
let block4 = DeferredTrieData::pending(
|
||||
Arc::new(block4_hashed),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
new_anchor, // Different anchor - simulates post-persist
|
||||
vec![block1, block2_ready, block3_ready],
|
||||
);
|
||||
|
||||
let result = block4.wait_cloned();
|
||||
|
||||
// Verify:
|
||||
// 1. New anchor is used in result
|
||||
assert_eq!(result.anchor_hash(), Some(new_anchor));
|
||||
|
||||
// 2. All 4 accounts are in the overlay (rebuilt from ancestors + extended)
|
||||
let overlay = result.anchored_trie_input.as_ref().unwrap();
|
||||
assert_eq!(overlay.trie_input.state.accounts.len(), 4);
|
||||
|
||||
// 3. All accounts have correct values
|
||||
let accounts = &overlay.trie_input.state.accounts;
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
|
||||
assert!(accounts.iter().any(|(k, a)| *k == key4 && a.unwrap().nonce == 4));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,6 +317,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
/// This will update the links between blocks and remove all blocks that are [..
|
||||
/// `persisted_height`].
|
||||
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
|
||||
self.set_persisted(persisted_num_hash);
|
||||
// if the persisted hash is not in the canonical in memory state, do nothing, because it
|
||||
// means canonical blocks were not actually persisted.
|
||||
//
|
||||
@@ -444,6 +445,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
self.inner.chain_info_tracker.set_finalized(header);
|
||||
}
|
||||
|
||||
/// Persisted block setter.
|
||||
pub fn set_persisted(&self, num_hash: BlockNumHash) {
|
||||
self.inner.chain_info_tracker.set_persisted(num_hash);
|
||||
}
|
||||
|
||||
/// Canonical head getter.
|
||||
pub fn get_canonical_head(&self) -> SealedHeader<N::BlockHeader> {
|
||||
self.inner.chain_info_tracker.get_canonical_head()
|
||||
@@ -459,6 +465,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
self.inner.chain_info_tracker.get_safe_header()
|
||||
}
|
||||
|
||||
/// Persisted block `BlockNumHash` getter.
|
||||
pub fn get_persisted_num_hash(&self) -> Option<BlockNumHash> {
|
||||
self.inner.chain_info_tracker.get_persisted_num_hash()
|
||||
}
|
||||
|
||||
/// Returns the `SealedHeader` corresponding to the pending state.
|
||||
pub fn pending_sealed_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
|
||||
self.pending_state().map(|h| h.block_ref().recovered_block().clone_sealed_header())
|
||||
@@ -511,6 +522,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
|
||||
self.inner.chain_info_tracker.subscribe_finalized_block()
|
||||
}
|
||||
|
||||
/// Subscribe to new persisted block events.
|
||||
pub fn subscribe_persisted_block(&self) -> watch::Receiver<Option<BlockNumHash>> {
|
||||
self.inner.chain_info_tracker.subscribe_persisted_block()
|
||||
}
|
||||
|
||||
/// Attempts to send a new [`CanonStateNotification`] to all active Receiver handles.
|
||||
pub fn notify_canon_state(&self, event: CanonStateNotification<N>) {
|
||||
self.inner.canon_state_notification_sender.send(event).ok();
|
||||
@@ -930,6 +946,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
|
||||
chain.append_block(
|
||||
exec.recovered_block().clone(),
|
||||
exec.execution_outcome().clone(),
|
||||
exec.trie_updates(),
|
||||
exec.hashed_state(),
|
||||
);
|
||||
chain
|
||||
}));
|
||||
@@ -940,6 +958,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
|
||||
chain.append_block(
|
||||
exec.recovered_block().clone(),
|
||||
exec.execution_outcome().clone(),
|
||||
exec.trie_updates(),
|
||||
exec.hashed_state(),
|
||||
);
|
||||
chain
|
||||
}));
|
||||
@@ -947,6 +967,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
|
||||
chain.append_block(
|
||||
exec.recovered_block().clone(),
|
||||
exec.execution_outcome().clone(),
|
||||
exec.trie_updates(),
|
||||
exec.hashed_state(),
|
||||
);
|
||||
chain
|
||||
}));
|
||||
@@ -1530,13 +1552,24 @@ mod tests {
|
||||
// Test commit notification
|
||||
let chain_commit = NewCanonicalChain::Commit { new: vec![block0.clone(), block1.clone()] };
|
||||
|
||||
// Build expected trie updates map
|
||||
let mut expected_trie_updates = BTreeMap::new();
|
||||
expected_trie_updates.insert(0, block0.trie_updates());
|
||||
expected_trie_updates.insert(1, block1.trie_updates());
|
||||
|
||||
// Build expected hashed state map
|
||||
let mut expected_hashed_state = BTreeMap::new();
|
||||
expected_hashed_state.insert(0, block0.hashed_state());
|
||||
expected_hashed_state.insert(1, block1.hashed_state());
|
||||
|
||||
assert_eq!(
|
||||
chain_commit.to_chain_notification(),
|
||||
CanonStateNotification::Commit {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block0.recovered_block().clone(), block1.recovered_block().clone()],
|
||||
sample_execution_outcome.clone(),
|
||||
None
|
||||
expected_trie_updates,
|
||||
expected_hashed_state
|
||||
))
|
||||
}
|
||||
);
|
||||
@@ -1547,18 +1580,40 @@ mod tests {
|
||||
old: vec![block1.clone(), block2.clone()],
|
||||
};
|
||||
|
||||
// Build expected trie updates for old chain
|
||||
let mut old_trie_updates = BTreeMap::new();
|
||||
old_trie_updates.insert(1, block1.trie_updates());
|
||||
old_trie_updates.insert(2, block2.trie_updates());
|
||||
|
||||
// Build expected trie updates for new chain
|
||||
let mut new_trie_updates = BTreeMap::new();
|
||||
new_trie_updates.insert(1, block1a.trie_updates());
|
||||
new_trie_updates.insert(2, block2a.trie_updates());
|
||||
|
||||
// Build expected hashed state for old chain
|
||||
let mut old_hashed_state = BTreeMap::new();
|
||||
old_hashed_state.insert(1, block1.hashed_state());
|
||||
old_hashed_state.insert(2, block2.hashed_state());
|
||||
|
||||
// Build expected hashed state for new chain
|
||||
let mut new_hashed_state = BTreeMap::new();
|
||||
new_hashed_state.insert(1, block1a.hashed_state());
|
||||
new_hashed_state.insert(2, block2a.hashed_state());
|
||||
|
||||
assert_eq!(
|
||||
chain_reorg.to_chain_notification(),
|
||||
CanonStateNotification::Reorg {
|
||||
old: Arc::new(Chain::new(
|
||||
vec![block1.recovered_block().clone(), block2.recovered_block().clone()],
|
||||
sample_execution_outcome.clone(),
|
||||
None
|
||||
old_trie_updates,
|
||||
old_hashed_state
|
||||
)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()],
|
||||
sample_execution_outcome,
|
||||
None
|
||||
new_trie_updates,
|
||||
new_hashed_state
|
||||
))
|
||||
}
|
||||
);
|
||||
|
||||
@@ -23,7 +23,8 @@ mod notifications;
|
||||
pub use notifications::{
|
||||
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
|
||||
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
|
||||
ForkChoiceSubscriptions,
|
||||
ForkChoiceSubscriptions, PersistedBlockNotifications, PersistedBlockSubscriptions,
|
||||
WatchValueStream,
|
||||
};
|
||||
|
||||
mod memory_overlay;
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use crate::{
|
||||
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications,
|
||||
ForkChoiceSubscriptions,
|
||||
ForkChoiceSubscriptions, PersistedBlockNotifications, PersistedBlockSubscriptions,
|
||||
};
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_storage_api::noop::NoopProvider;
|
||||
@@ -27,3 +27,10 @@ impl<C: Send + Sync, N: NodePrimitives> ForkChoiceSubscriptions for NoopProvider
|
||||
ForkChoiceNotifications(rx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Send + Sync, N: NodePrimitives> PersistedBlockSubscriptions for NoopProvider<C, N> {
|
||||
fn subscribe_persisted_block(&self) -> PersistedBlockNotifications {
|
||||
let (_, rx) = watch::channel(None);
|
||||
PersistedBlockNotifications(rx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Canonical chain state notification trait and types.
|
||||
|
||||
use alloy_eips::eip2718::Encodable2718;
|
||||
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use reth_execution_types::{BlockReceipts, Chain};
|
||||
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
|
||||
@@ -205,22 +205,22 @@ pub trait ForkChoiceSubscriptions: Send + Sync {
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream for fork choice watch channels (pending, safe or finalized watchers)
|
||||
/// A stream that yields values from a `watch::Receiver<Option<T>>`, filtering out `None` values.
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct ForkChoiceStream<T> {
|
||||
pub struct WatchValueStream<T> {
|
||||
#[pin]
|
||||
st: WatchStream<Option<T>>,
|
||||
}
|
||||
|
||||
impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
|
||||
/// Creates a new `ForkChoiceStream`
|
||||
impl<T: Clone + Sync + Send + 'static> WatchValueStream<T> {
|
||||
/// Creates a new [`WatchValueStream`]
|
||||
pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
|
||||
Self { st: WatchStream::from_changes(rx) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
|
||||
impl<T: Clone + Sync + Send + 'static> Stream for WatchValueStream<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
@@ -234,6 +234,24 @@ impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Alias for [`WatchValueStream`] for fork choice watch channels.
|
||||
pub type ForkChoiceStream<T> = WatchValueStream<T>;
|
||||
|
||||
/// Wrapper around a watch receiver that receives persisted block notifications.
|
||||
#[derive(Debug, Deref, DerefMut)]
|
||||
pub struct PersistedBlockNotifications(pub watch::Receiver<Option<BlockNumHash>>);
|
||||
|
||||
/// A trait that allows subscribing to persisted block events.
|
||||
pub trait PersistedBlockSubscriptions: Send + Sync {
|
||||
/// Get notified when a new block is persisted to disk.
|
||||
fn subscribe_persisted_block(&self) -> PersistedBlockNotifications;
|
||||
|
||||
/// Convenience method to get a stream of the persisted blocks.
|
||||
fn persisted_block_stream(&self) -> WatchValueStream<BlockNumHash> {
|
||||
WatchValueStream::new(self.subscribe_persisted_block().0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -242,6 +260,7 @@ mod tests {
|
||||
use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
|
||||
use reth_execution_types::ExecutionOutcome;
|
||||
use reth_primitives_traits::SealedBlock;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[test]
|
||||
fn test_commit_notification() {
|
||||
@@ -260,7 +279,8 @@ mod tests {
|
||||
let chain: Arc<Chain> = Arc::new(Chain::new(
|
||||
vec![block1.clone(), block2.clone()],
|
||||
ExecutionOutcome::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
|
||||
// Create a commit notification
|
||||
@@ -295,12 +315,17 @@ mod tests {
|
||||
block3.set_block_number(3);
|
||||
block3.set_hash(block3_hash);
|
||||
|
||||
let old_chain: Arc<Chain> =
|
||||
Arc::new(Chain::new(vec![block1.clone()], ExecutionOutcome::default(), None));
|
||||
let old_chain: Arc<Chain> = Arc::new(Chain::new(
|
||||
vec![block1.clone()],
|
||||
ExecutionOutcome::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
let new_chain = Arc::new(Chain::new(
|
||||
vec![block2.clone(), block3.clone()],
|
||||
ExecutionOutcome::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
|
||||
// Create a reorg notification
|
||||
@@ -362,8 +387,12 @@ mod tests {
|
||||
let execution_outcome = ExecutionOutcome { receipts, ..Default::default() };
|
||||
|
||||
// Create a new chain segment with `block1` and `block2` and the execution outcome.
|
||||
let new_chain: Arc<Chain> =
|
||||
Arc::new(Chain::new(vec![block1.clone(), block2.clone()], execution_outcome, None));
|
||||
let new_chain: Arc<Chain> = Arc::new(Chain::new(
|
||||
vec![block1.clone(), block2.clone()],
|
||||
execution_outcome,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
|
||||
// Create a commit notification containing the new chain segment.
|
||||
let notification = CanonStateNotification::Commit { new: new_chain };
|
||||
@@ -420,8 +449,12 @@ mod tests {
|
||||
ExecutionOutcome { receipts: old_receipts, ..Default::default() };
|
||||
|
||||
// Create an old chain segment to be reverted, containing `old_block1`.
|
||||
let old_chain: Arc<Chain> =
|
||||
Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, None));
|
||||
let old_chain: Arc<Chain> = Arc::new(Chain::new(
|
||||
vec![old_block1.clone()],
|
||||
old_execution_outcome,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
|
||||
// Define block2 for the new chain segment, which will be committed.
|
||||
let mut body = BlockBody::<TransactionSigned>::default();
|
||||
@@ -449,7 +482,12 @@ mod tests {
|
||||
ExecutionOutcome { receipts: new_receipts, ..Default::default() };
|
||||
|
||||
// Create a new chain segment to be committed, containing `new_block1`.
|
||||
let new_chain = Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, None));
|
||||
let new_chain = Arc::new(Chain::new(
|
||||
vec![new_block1.clone()],
|
||||
new_execution_outcome,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
));
|
||||
|
||||
// Create a reorg notification with both reverted (old) and committed (new) chain segments.
|
||||
let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };
|
||||
|
||||
@@ -129,3 +129,5 @@ arbitrary = [
|
||||
"reth-primitives-traits/arbitrary",
|
||||
"reth-ethereum-primitives/arbitrary",
|
||||
]
|
||||
|
||||
edge = ["reth-db-common/edge", "reth-stages/rocksdb"]
|
||||
|
||||
@@ -107,13 +107,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
let (db, sfp) = match access {
|
||||
AccessRights::RW => (
|
||||
Arc::new(init_db(db_path, self.db.database_args())?),
|
||||
StaticFileProviderBuilder::read_write(sf_path)?
|
||||
StaticFileProviderBuilder::read_write(sf_path)
|
||||
.with_genesis_block_number(genesis_block_number)
|
||||
.build()?,
|
||||
),
|
||||
AccessRights::RO | AccessRights::RoInconsistent => {
|
||||
(Arc::new(open_db_read_only(&db_path, self.db.database_args())?), {
|
||||
let provider = StaticFileProviderBuilder::read_only(sf_path)?
|
||||
let provider = StaticFileProviderBuilder::read_only(sf_path)
|
||||
.with_genesis_block_number(genesis_block_number)
|
||||
.build()?;
|
||||
provider.watch_directory();
|
||||
|
||||
@@ -2,8 +2,8 @@ use alloy_primitives::{hex, BlockHash};
|
||||
use clap::Parser;
|
||||
use reth_db::{
|
||||
static_file::{
|
||||
ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask, ReceiptMask, TransactionMask,
|
||||
TransactionSenderMask,
|
||||
AccountChangesetMask, ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask,
|
||||
ReceiptMask, TransactionMask, TransactionSenderMask,
|
||||
},
|
||||
RawDupSort,
|
||||
};
|
||||
@@ -19,7 +19,7 @@ use reth_db_common::DbTool;
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_primitives_traits::ValueWithSubKey;
|
||||
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
|
||||
use reth_provider::{providers::ProviderNodeTypes, ChangeSetReader, StaticFileProviderFactory};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use tracing::error;
|
||||
|
||||
@@ -64,6 +64,10 @@ enum Subcommand {
|
||||
#[arg(value_parser = maybe_json_value_parser)]
|
||||
key: String,
|
||||
|
||||
/// The subkey to get content for, for example address in changeset
|
||||
#[arg(value_parser = maybe_json_value_parser)]
|
||||
subkey: Option<String>,
|
||||
|
||||
/// Output bytes instead of human-readable decoded value
|
||||
#[arg(long)]
|
||||
raw: bool,
|
||||
@@ -77,33 +81,77 @@ impl Command {
|
||||
Subcommand::Mdbx { table, key, subkey, end_key, end_subkey, raw } => {
|
||||
table.view(&GetValueViewer { tool, key, subkey, end_key, end_subkey, raw })?
|
||||
}
|
||||
Subcommand::StaticFile { segment, key, raw } => {
|
||||
let (key, mask): (u64, _) = match segment {
|
||||
Subcommand::StaticFile { segment, key, subkey, raw } => {
|
||||
let (key, subkey, mask): (u64, _, _) = match segment {
|
||||
StaticFileSegment::Headers => (
|
||||
table_key::<tables::Headers>(&key)?,
|
||||
None,
|
||||
<HeaderWithHashMask<HeaderTy<N>>>::MASK,
|
||||
),
|
||||
StaticFileSegment::Transactions => {
|
||||
(table_key::<tables::Transactions>(&key)?, <TransactionMask<TxTy<N>>>::MASK)
|
||||
}
|
||||
StaticFileSegment::Receipts => {
|
||||
(table_key::<tables::Receipts>(&key)?, <ReceiptMask<ReceiptTy<N>>>::MASK)
|
||||
}
|
||||
StaticFileSegment::Transactions => (
|
||||
table_key::<tables::Transactions>(&key)?,
|
||||
None,
|
||||
<TransactionMask<TxTy<N>>>::MASK,
|
||||
),
|
||||
StaticFileSegment::Receipts => (
|
||||
table_key::<tables::Receipts>(&key)?,
|
||||
None,
|
||||
<ReceiptMask<ReceiptTy<N>>>::MASK,
|
||||
),
|
||||
StaticFileSegment::TransactionSenders => (
|
||||
table_key::<tables::TransactionSenders>(&key)?,
|
||||
<TransactionSenderMask>::MASK,
|
||||
None,
|
||||
TransactionSenderMask::MASK,
|
||||
),
|
||||
StaticFileSegment::AccountChangeSets => {
|
||||
let subkey =
|
||||
table_subkey::<tables::AccountChangeSets>(subkey.as_deref()).ok();
|
||||
(
|
||||
table_key::<tables::AccountChangeSets>(&key)?,
|
||||
subkey,
|
||||
AccountChangesetMask::MASK,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let content = tool
|
||||
.provider_factory
|
||||
.static_file_provider()
|
||||
.get_segment_provider(segment, key)?
|
||||
.cursor()?
|
||||
.get(key.into(), mask)
|
||||
.map(|result| {
|
||||
result.map(|vec| vec.iter().map(|slice| slice.to_vec()).collect::<Vec<_>>())
|
||||
})?;
|
||||
// handle account changesets differently if a subkey is provided.
|
||||
if let StaticFileSegment::AccountChangeSets = segment {
|
||||
let Some(subkey) = subkey else {
|
||||
// get all changesets for the block
|
||||
let changesets = tool
|
||||
.provider_factory
|
||||
.static_file_provider()
|
||||
.account_block_changeset(key)?;
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&changesets)?);
|
||||
return Ok(())
|
||||
};
|
||||
|
||||
let account = tool
|
||||
.provider_factory
|
||||
.static_file_provider()
|
||||
.get_account_before_block(key, subkey)?;
|
||||
|
||||
if let Some(account) = account {
|
||||
println!("{}", serde_json::to_string_pretty(&account)?);
|
||||
} else {
|
||||
error!(target: "reth::cli", "No content for the given table key.");
|
||||
}
|
||||
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let content = tool.provider_factory.static_file_provider().find_static_file(
|
||||
segment,
|
||||
|provider| {
|
||||
let mut cursor = provider.cursor()?;
|
||||
cursor.get(key.into(), mask).map(|result| {
|
||||
result.map(|vec| {
|
||||
vec.iter().map(|slice| slice.to_vec()).collect::<Vec<_>>()
|
||||
})
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
match content {
|
||||
Some(content) => {
|
||||
@@ -139,6 +187,9 @@ impl Command {
|
||||
)?;
|
||||
println!("{}", serde_json::to_string_pretty(&sender)?);
|
||||
}
|
||||
StaticFileSegment::AccountChangeSets => {
|
||||
unreachable!("account changeset static files are special cased before this match")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
let access_rights =
|
||||
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
|
||||
db_exec!(self.env, tool, N, access_rights, {
|
||||
command.execute(&tool, ctx.task_executor.clone())?;
|
||||
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
|
||||
});
|
||||
}
|
||||
Subcommands::StaticFileHeader(command) => {
|
||||
|
||||
@@ -9,7 +9,10 @@ use reth_db_api::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_core::version::version_metadata;
|
||||
use reth_node_core::{
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
version::version_metadata,
|
||||
};
|
||||
use reth_node_metrics::{
|
||||
chain::ChainSpecInfo,
|
||||
hooks::Hooks,
|
||||
@@ -53,11 +56,13 @@ impl Command {
|
||||
self,
|
||||
tool: &DbTool<N>,
|
||||
task_executor: TaskExecutor,
|
||||
data_dir: &ChainPath<DataDirPath>,
|
||||
) -> eyre::Result<()> {
|
||||
// Set up metrics server if requested
|
||||
let _metrics_handle = if let Some(listen_addr) = self.metrics {
|
||||
let chain_name = tool.provider_factory.chain_spec().chain().to_string();
|
||||
let executor = task_executor.clone();
|
||||
let pprof_dump_dir = data_dir.pprof_dumps();
|
||||
|
||||
let handle = task_executor.spawn_critical("metrics server", async move {
|
||||
let config = MetricServerConfig::new(
|
||||
@@ -73,6 +78,7 @@ impl Command {
|
||||
ChainSpecInfo { name: chain_name },
|
||||
executor,
|
||||
Hooks::builder().build(),
|
||||
pprof_dump_dir,
|
||||
);
|
||||
|
||||
// Spawn the metrics server
|
||||
|
||||
@@ -40,12 +40,17 @@ enum Subcommands {
|
||||
#[clap(rename_all = "snake_case")]
|
||||
pub enum SetCommand {
|
||||
/// Store receipts in static files instead of the database
|
||||
ReceiptsInStaticFiles {
|
||||
Receipts {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store transaction senders in static files instead of the database
|
||||
TransactionSendersInStaticFiles {
|
||||
TransactionSenders {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store account changesets in static files instead of the database
|
||||
AccountChangesets {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
@@ -94,11 +99,12 @@ impl Command {
|
||||
storages_history_in_rocksdb: _,
|
||||
transaction_hash_numbers_in_rocksdb: _,
|
||||
account_history_in_rocksdb: _,
|
||||
account_changesets_in_static_files: _,
|
||||
} = settings.unwrap_or_else(StorageSettings::legacy);
|
||||
|
||||
// Update the setting based on the key
|
||||
match cmd {
|
||||
SetCommand::ReceiptsInStaticFiles { value } => {
|
||||
SetCommand::Receipts { value } => {
|
||||
if settings.receipts_in_static_files == value {
|
||||
println!("receipts_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
@@ -106,7 +112,7 @@ impl Command {
|
||||
settings.receipts_in_static_files = value;
|
||||
println!("Set receipts_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::TransactionSendersInStaticFiles { value } => {
|
||||
SetCommand::TransactionSenders { value } => {
|
||||
if settings.transaction_senders_in_static_files == value {
|
||||
println!("transaction_senders_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
@@ -114,6 +120,14 @@ impl Command {
|
||||
settings.transaction_senders_in_static_files = value;
|
||||
println!("Set transaction_senders_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::AccountChangesets { value } => {
|
||||
if settings.account_changesets_in_static_files == value {
|
||||
println!("account_changesets_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.account_changesets_in_static_files = value;
|
||||
println!("Set account_changesets_in_static_files = {}", value);
|
||||
}
|
||||
}
|
||||
|
||||
// Write updated settings
|
||||
|
||||
@@ -69,9 +69,7 @@ pub async fn import_blocks_from_file<N>(
|
||||
provider_factory: ProviderFactory<N>,
|
||||
config: &Config,
|
||||
executor: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
consensus: Arc<
|
||||
impl FullConsensus<N::Primitives, Error = reth_consensus::ConsensusError> + 'static,
|
||||
>,
|
||||
consensus: Arc<impl FullConsensus<N::Primitives> + 'static>,
|
||||
) -> eyre::Result<ImportResult>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
@@ -198,7 +196,7 @@ pub fn build_import_pipeline_impl<N, C, E>(
|
||||
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
C: FullConsensus<N::Primitives, Error = reth_consensus::ConsensusError> + 'static,
|
||||
C: FullConsensus<N::Primitives> + 'static,
|
||||
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
{
|
||||
if !file_client.has_canonical_blocks() {
|
||||
|
||||
@@ -99,6 +99,7 @@ where
|
||||
/// * Headers: It will push an empty block.
|
||||
/// * Transactions: It will not push any tx, only increments the end block range.
|
||||
/// * Receipts: It will not push any receipt, only increments the end block range.
|
||||
/// * TransactionSenders: If the segment exists, increments the end block range.
|
||||
fn append_dummy_chain<N, F>(
|
||||
sf_provider: &StaticFileProvider<N>,
|
||||
target_height: BlockNumber,
|
||||
@@ -110,8 +111,15 @@ where
|
||||
{
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
|
||||
// Spawn jobs for incrementing the block end range of transactions and receipts
|
||||
for segment in [StaticFileSegment::Transactions, StaticFileSegment::Receipts] {
|
||||
// Spawn jobs for incrementing the block end range of transactions, receipts, and senders.
|
||||
for segment in [
|
||||
StaticFileSegment::Transactions,
|
||||
StaticFileSegment::Receipts,
|
||||
StaticFileSegment::TransactionSenders,
|
||||
] {
|
||||
if sf_provider.get_highest_static_file_block(segment).is_none() {
|
||||
continue
|
||||
}
|
||||
let tx_clone = tx.clone();
|
||||
let provider = sf_provider.clone();
|
||||
std::thread::spawn(move || {
|
||||
@@ -151,9 +159,15 @@ where
|
||||
|
||||
// If, for any reason, rayon crashes this verifies if all segments are at the same
|
||||
// target_height.
|
||||
for segment in
|
||||
[StaticFileSegment::Headers, StaticFileSegment::Receipts, StaticFileSegment::Transactions]
|
||||
{
|
||||
for segment in [
|
||||
StaticFileSegment::Headers,
|
||||
StaticFileSegment::Receipts,
|
||||
StaticFileSegment::Transactions,
|
||||
StaticFileSegment::TransactionSenders,
|
||||
] {
|
||||
if sf_provider.get_highest_static_file_block(segment).is_none() {
|
||||
continue
|
||||
}
|
||||
assert_eq!(
|
||||
sf_provider.latest_writer(segment)?.user_header().block_end(),
|
||||
Some(target_height),
|
||||
|
||||
@@ -87,6 +87,9 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
.unwrap_or_default();
|
||||
writer.prune_transaction_senders(to_delete, 0)?;
|
||||
}
|
||||
StaticFileSegment::AccountChangeSets => {
|
||||
writer.prune_account_changesets(highest_block)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::setup;
|
||||
use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
|
||||
use reth_consensus::{noop::NoopConsensus, FullConsensus};
|
||||
use reth_db::DatabaseEnv;
|
||||
use reth_db_api::{
|
||||
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
|
||||
where
|
||||
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
|
||||
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
|
||||
C: FullConsensus<E::Primitives> + 'static,
|
||||
{
|
||||
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
|
||||
|
||||
@@ -169,7 +169,7 @@ fn dry_run<N, E, C>(
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
|
||||
C: FullConsensus<E::Primitives> + 'static,
|
||||
{
|
||||
info!(target: "reth::cli", "Executing stage. [dry-run]");
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use super::setup;
|
||||
use alloy_primitives::{Address, BlockNumber};
|
||||
use eyre::Result;
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_consensus::{ConsensusError, FullConsensus};
|
||||
use reth_consensus::FullConsensus;
|
||||
use reth_db::DatabaseEnv;
|
||||
use reth_db_api::{database::Database, models::BlockNumberAddress, table::TableImporter, tables};
|
||||
use reth_db_common::DbTool;
|
||||
@@ -31,7 +31,7 @@ pub(crate) async fn dump_merkle_stage<N>(
|
||||
output_datadir: ChainPath<DataDirPath>,
|
||||
should_run: bool,
|
||||
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
|
||||
consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
|
||||
consensus: impl FullConsensus<N::Primitives> + 'static,
|
||||
) -> Result<()>
|
||||
where
|
||||
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
|
||||
@@ -79,7 +79,7 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
|
||||
tip_block_number: u64,
|
||||
output_db: &DatabaseEnv,
|
||||
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
|
||||
consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
|
||||
consensus: impl FullConsensus<N::Primitives> + 'static,
|
||||
) -> eyre::Result<()> {
|
||||
let (from, to) = range;
|
||||
let provider = db_tool.provider_factory.database_provider_rw()?;
|
||||
|
||||
@@ -153,6 +153,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
|
||||
}
|
||||
})
|
||||
.build(),
|
||||
data_dir.pprof_dumps(),
|
||||
);
|
||||
|
||||
MetricServer::new(config).serve().await?;
|
||||
|
||||
@@ -18,8 +18,8 @@ use tracing::{debug, error, trace};
|
||||
///
|
||||
/// Provides utilities for running a cli command to completion.
|
||||
#[derive(Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct CliRunner {
|
||||
config: CliRunnerConfig,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
@@ -29,12 +29,18 @@ impl CliRunner {
|
||||
///
|
||||
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
|
||||
Ok(Self { tokio_runtime: tokio_runtime()? })
|
||||
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
|
||||
}
|
||||
|
||||
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
|
||||
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
Self { tokio_runtime }
|
||||
Self { config: CliRunnerConfig::new(), tokio_runtime }
|
||||
}
|
||||
|
||||
/// Sets the [`CliRunnerConfig`] for this runner.
|
||||
pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
|
||||
self.config = config;
|
||||
self
|
||||
}
|
||||
|
||||
/// Executes an async block on the runtime and blocks until completion.
|
||||
@@ -74,7 +80,7 @@ impl CliRunner {
|
||||
// after the command has finished or exit signal was received we shutdown the task
|
||||
// manager which fires the shutdown signal to all tasks spawned via the task
|
||||
// executor and awaiting on tasks spawned with graceful shutdown
|
||||
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
@@ -128,7 +134,7 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
// Shutdown the runtime on a separate thread
|
||||
@@ -211,6 +217,38 @@ pub struct CliContext {
|
||||
pub task_executor: TaskExecutor,
|
||||
}
|
||||
|
||||
/// Default timeout for graceful shutdown of tasks.
|
||||
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Configuration for [`CliRunner`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CliRunnerConfig {
|
||||
/// Timeout for graceful shutdown of tasks.
|
||||
///
|
||||
/// After the command completes, this is the maximum time to wait for spawned tasks
|
||||
/// to finish before forcefully terminating them.
|
||||
pub graceful_shutdown_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for CliRunnerConfig {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl CliRunnerConfig {
|
||||
/// Creates a new config with default values.
|
||||
pub const fn new() -> Self {
|
||||
Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
|
||||
}
|
||||
|
||||
/// Sets the graceful shutdown timeout.
|
||||
pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.graceful_shutdown_timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
|
||||
/// enabled
|
||||
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
|
||||
@@ -437,6 +437,8 @@ pub struct BlocksPerFileConfig {
|
||||
pub receipts: Option<u64>,
|
||||
/// Number of blocks per file for the transaction senders segment.
|
||||
pub transaction_senders: Option<u64>,
|
||||
/// Number of blocks per file for the account changesets segment.
|
||||
pub account_change_sets: Option<u64>,
|
||||
}
|
||||
|
||||
impl StaticFilesConfig {
|
||||
@@ -444,8 +446,13 @@ impl StaticFilesConfig {
|
||||
///
|
||||
/// Returns an error if any blocks per file value is zero.
|
||||
pub fn validate(&self) -> eyre::Result<()> {
|
||||
let BlocksPerFileConfig { headers, transactions, receipts, transaction_senders } =
|
||||
self.blocks_per_file;
|
||||
let BlocksPerFileConfig {
|
||||
headers,
|
||||
transactions,
|
||||
receipts,
|
||||
transaction_senders,
|
||||
account_change_sets,
|
||||
} = self.blocks_per_file;
|
||||
eyre::ensure!(headers != Some(0), "Headers segment blocks per file must be greater than 0");
|
||||
eyre::ensure!(
|
||||
transactions != Some(0),
|
||||
@@ -459,13 +466,22 @@ impl StaticFilesConfig {
|
||||
transaction_senders != Some(0),
|
||||
"Transaction senders segment blocks per file must be greater than 0"
|
||||
);
|
||||
eyre::ensure!(
|
||||
account_change_sets != Some(0),
|
||||
"Account changesets segment blocks per file must be greater than 0"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts the blocks per file configuration into a [`HashMap`] per segment.
|
||||
pub fn as_blocks_per_file_map(&self) -> HashMap<StaticFileSegment, u64> {
|
||||
let BlocksPerFileConfig { headers, transactions, receipts, transaction_senders } =
|
||||
self.blocks_per_file;
|
||||
let BlocksPerFileConfig {
|
||||
headers,
|
||||
transactions,
|
||||
receipts,
|
||||
transaction_senders,
|
||||
account_change_sets,
|
||||
} = self.blocks_per_file;
|
||||
|
||||
let mut map = HashMap::new();
|
||||
// Iterating over all possible segments allows us to do an exhaustive match here,
|
||||
@@ -476,6 +492,7 @@ impl StaticFilesConfig {
|
||||
StaticFileSegment::Transactions => transactions,
|
||||
StaticFileSegment::Receipts => receipts,
|
||||
StaticFileSegment::TransactionSenders => transaction_senders,
|
||||
StaticFileSegment::AccountChangeSets => account_change_sets,
|
||||
};
|
||||
|
||||
if let Some(blocks_per_file) = blocks_per_file {
|
||||
@@ -527,7 +544,7 @@ impl PruneConfig {
|
||||
|
||||
/// Returns whether there is any kind of receipt pruning configuration.
|
||||
pub fn has_receipts_pruning(&self) -> bool {
|
||||
self.segments.receipts.is_some() || !self.segments.receipts_log_filter.is_empty()
|
||||
self.segments.has_receipts_pruning()
|
||||
}
|
||||
|
||||
/// Merges values from `other` into `self`.
|
||||
|
||||
@@ -500,13 +500,11 @@ mod tests {
|
||||
let expected_blob_gas_used = 10 * DATA_GAS_PER_BLOB;
|
||||
|
||||
// validate blob, it should fail blob gas used validation
|
||||
assert_eq!(
|
||||
validate_block_pre_execution(&block, &chain_spec),
|
||||
Err(ConsensusError::BlobGasUsedDiff(GotExpected {
|
||||
got: 1,
|
||||
expected: expected_blob_gas_used
|
||||
}))
|
||||
);
|
||||
assert!(matches!(
|
||||
validate_block_pre_execution(&block, &chain_spec).unwrap_err(),
|
||||
ConsensusError::BlobGasUsedDiff(diff)
|
||||
if diff.got == 1 && diff.expected == expected_blob_gas_used
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -517,10 +515,10 @@ mod tests {
|
||||
|
||||
// Test exceeding default - should fail
|
||||
let header_33 = Header { extra_data: Bytes::from(vec![0; 33]), ..Default::default() };
|
||||
assert_eq!(
|
||||
validate_header_extra_data(&header_33, 32),
|
||||
Err(ConsensusError::ExtraDataExceedsMax { len: 33 })
|
||||
);
|
||||
assert!(matches!(
|
||||
validate_header_extra_data(&header_33, 32).unwrap_err(),
|
||||
ConsensusError::ExtraDataExceedsMax { len } if len == 33
|
||||
));
|
||||
|
||||
// Test with custom larger limit - should pass
|
||||
assert!(validate_header_extra_data(&header_33, 64).is_ok());
|
||||
|
||||
@@ -11,9 +11,10 @@
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
use alloc::{boxed::Box, fmt::Debug, string::String, vec::Vec};
|
||||
use alloc::{boxed::Box, fmt::Debug, string::String, sync::Arc, vec::Vec};
|
||||
use alloy_consensus::Header;
|
||||
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256};
|
||||
use core::error::Error;
|
||||
use reth_execution_types::BlockExecutionResult;
|
||||
use reth_primitives_traits::{
|
||||
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
|
||||
@@ -49,15 +50,12 @@ pub trait FullConsensus<N: NodePrimitives>: Consensus<N::Block> {
|
||||
/// Consensus is a protocol that chooses canonical chain.
|
||||
#[auto_impl::auto_impl(&, Arc)]
|
||||
pub trait Consensus<B: Block>: HeaderValidator<B::Header> {
|
||||
/// The error type related to consensus.
|
||||
type Error;
|
||||
|
||||
/// Ensures that body field values match the header.
|
||||
fn validate_body_against_header(
|
||||
&self,
|
||||
body: &B::Body,
|
||||
header: &SealedHeader<B::Header>,
|
||||
) -> Result<(), Self::Error>;
|
||||
) -> Result<(), ConsensusError>;
|
||||
|
||||
/// Validate a block disregarding world state, i.e. things that can be checked before sender
|
||||
/// recovery and execution.
|
||||
@@ -69,7 +67,7 @@ pub trait Consensus<B: Block>: HeaderValidator<B::Header> {
|
||||
/// **This should not be called for the genesis block**.
|
||||
///
|
||||
/// Note: validating blocks does not include other validations of the Consensus
|
||||
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), Self::Error>;
|
||||
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError>;
|
||||
}
|
||||
|
||||
/// `HeaderValidator` is a protocol that validates headers and their relationships.
|
||||
@@ -125,7 +123,7 @@ pub trait HeaderValidator<H = Header>: Debug + Send + Sync {
|
||||
}
|
||||
|
||||
/// Consensus Errors
|
||||
#[derive(Debug, PartialEq, Eq, Clone, thiserror::Error)]
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
pub enum ConsensusError {
|
||||
/// Error when the gas used in the header exceeds the gas limit.
|
||||
#[error("block used gas ({gas_used}) is greater than gas limit ({gas_limit})")]
|
||||
@@ -410,6 +408,9 @@ pub enum ConsensusError {
|
||||
/// Other, likely an injected L2 error.
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
/// Other unspecified error.
|
||||
#[error(transparent)]
|
||||
Custom(#[from] Arc<dyn Error + Send + Sync>),
|
||||
}
|
||||
|
||||
impl ConsensusError {
|
||||
@@ -447,3 +448,34 @@ pub struct TxGasLimitTooHighErr {
|
||||
/// The maximum allowed gas limit
|
||||
pub max_allowed: u64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("Custom L2 consensus error")]
|
||||
struct CustomL2Error;
|
||||
|
||||
#[test]
|
||||
fn test_custom_error_conversion() {
|
||||
// Test conversion from custom error to ConsensusError
|
||||
let custom_err = CustomL2Error;
|
||||
let arc_err: Arc<dyn Error + Send + Sync> = Arc::new(custom_err);
|
||||
let consensus_err: ConsensusError = arc_err.into();
|
||||
|
||||
// Verify it's the Custom variant
|
||||
assert!(matches!(consensus_err, ConsensusError::Custom(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_custom_error_display() {
|
||||
let custom_err = CustomL2Error;
|
||||
let arc_err: Arc<dyn Error + Send + Sync> = Arc::new(custom_err);
|
||||
let consensus_err: ConsensusError = arc_err.into();
|
||||
|
||||
// Verify the error message is preserved through transparent attribute
|
||||
let error_message = format!("{}", consensus_err);
|
||||
assert_eq!(error_message, "Custom L2 consensus error");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,19 +55,17 @@ impl<H> HeaderValidator<H> for NoopConsensus {
|
||||
}
|
||||
|
||||
impl<B: Block> Consensus<B> for NoopConsensus {
|
||||
type Error = ConsensusError;
|
||||
|
||||
/// Validates body against header (no-op implementation).
|
||||
fn validate_body_against_header(
|
||||
&self,
|
||||
_body: &B::Body,
|
||||
_header: &SealedHeader<B::Header>,
|
||||
) -> Result<(), Self::Error> {
|
||||
) -> Result<(), ConsensusError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates block before execution (no-op implementation).
|
||||
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), Self::Error> {
|
||||
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), ConsensusError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,13 +61,11 @@ impl<N: NodePrimitives> FullConsensus<N> for TestConsensus {
|
||||
}
|
||||
|
||||
impl<B: Block> Consensus<B> for TestConsensus {
|
||||
type Error = ConsensusError;
|
||||
|
||||
fn validate_body_against_header(
|
||||
&self,
|
||||
_body: &B::Body,
|
||||
_header: &SealedHeader<B::Header>,
|
||||
) -> Result<(), Self::Error> {
|
||||
) -> Result<(), ConsensusError> {
|
||||
if self.fail_body_against_header() {
|
||||
Err(ConsensusError::BaseFeeMissing)
|
||||
} else {
|
||||
@@ -75,7 +73,7 @@ impl<B: Block> Consensus<B> for TestConsensus {
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), Self::Error> {
|
||||
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), ConsensusError> {
|
||||
if self.fail_validation() {
|
||||
Err(ConsensusError::BaseFeeMissing)
|
||||
} else {
|
||||
|
||||
@@ -113,7 +113,6 @@ pub async fn setup_engine_with_chain_import(
|
||||
let rocksdb_dir_path = datadir.join("rocksdb");
|
||||
|
||||
// Initialize the database using init_db (same as CLI import command)
|
||||
// Use the same database arguments as the node will use
|
||||
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
|
||||
let db_env = reth_db::init_db(&db_path, db_args)?;
|
||||
let db = Arc::new(db_env);
|
||||
@@ -317,7 +316,8 @@ mod tests {
|
||||
|
||||
// Import the chain
|
||||
{
|
||||
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
|
||||
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
|
||||
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
|
||||
let db = Arc::new(db_env);
|
||||
|
||||
let provider_factory: ProviderFactory<
|
||||
@@ -475,7 +475,8 @@ mod tests {
|
||||
let datadir = temp_dir.path().join("datadir");
|
||||
std::fs::create_dir_all(&datadir).unwrap();
|
||||
let db_path = datadir.join("db");
|
||||
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
|
||||
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
|
||||
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
|
||||
let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
|
||||
|
||||
// Create static files path
|
||||
|
||||
@@ -135,6 +135,8 @@ pub struct TreeConfig {
|
||||
storage_worker_count: usize,
|
||||
/// Number of account proof worker threads.
|
||||
account_worker_count: usize,
|
||||
/// Whether to enable V2 storage proofs.
|
||||
enable_proof_v2: bool,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -163,6 +165,7 @@ impl Default for TreeConfig {
|
||||
allow_unwind_canonical_header: false,
|
||||
storage_worker_count: default_storage_worker_count(),
|
||||
account_worker_count: default_account_worker_count(),
|
||||
enable_proof_v2: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,6 +197,7 @@ impl TreeConfig {
|
||||
allow_unwind_canonical_header: bool,
|
||||
storage_worker_count: usize,
|
||||
account_worker_count: usize,
|
||||
enable_proof_v2: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -219,6 +223,7 @@ impl TreeConfig {
|
||||
allow_unwind_canonical_header,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
enable_proof_v2,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -500,4 +505,15 @@ impl TreeConfig {
|
||||
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
|
||||
self
|
||||
}
|
||||
|
||||
/// Return whether V2 storage proofs are enabled.
|
||||
pub const fn enable_proof_v2(&self) -> bool {
|
||||
self.enable_proof_v2
|
||||
}
|
||||
|
||||
/// Setter for whether to enable V2 storage proofs.
|
||||
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
|
||||
self.enable_proof_v2 = enable_proof_v2;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_consensus::{ConsensusError, FullConsensus};
|
||||
use reth_consensus::FullConsensus;
|
||||
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
|
||||
use reth_engine_tree::{
|
||||
backfill::PipelineSync,
|
||||
@@ -70,7 +70,7 @@ where
|
||||
/// Constructor for `EngineService`.
|
||||
#[expect(clippy::too_many_arguments)]
|
||||
pub fn new<V, C>(
|
||||
consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<N::Primitives>>,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
client: Client,
|
||||
incoming_requests: EngineMessageStream<N::Payload>,
|
||||
|
||||
@@ -29,7 +29,6 @@ reth-provider.workspace = true
|
||||
reth-prune.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-stages-api.workspace = true
|
||||
reth-storage-errors.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-trie-parallel.workspace = true
|
||||
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::B256;
|
||||
use futures::FutureExt;
|
||||
use reth_consensus::{Consensus, ConsensusError};
|
||||
use reth_consensus::Consensus;
|
||||
use reth_network_p2p::{
|
||||
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
|
||||
BlockClient,
|
||||
@@ -81,7 +81,7 @@ where
|
||||
B: Block,
|
||||
{
|
||||
/// Create a new instance
|
||||
pub fn new(client: Client, consensus: Arc<dyn Consensus<B, Error = ConsensusError>>) -> Self {
|
||||
pub fn new(client: Client, consensus: Arc<dyn Consensus<B>>) -> Self {
|
||||
Self {
|
||||
full_block_client: FullBlockClient::new(client, consensus),
|
||||
inflight_full_block_requests: Vec::new(),
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::{
|
||||
download::{BlockDownloader, DownloadAction, DownloadOutcome},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::Sender;
|
||||
use futures::{Stream, StreamExt};
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
|
||||
@@ -15,7 +16,6 @@ use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
fmt::Display,
|
||||
sync::mpsc::Sender,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::metrics::PersistenceMetrics;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_errors::ProviderError;
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
@@ -15,7 +16,6 @@ use std::{
|
||||
time::Instant,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::{debug, error};
|
||||
|
||||
/// Writes parts of reth's in memory tree state to the database and static files.
|
||||
@@ -183,13 +183,13 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
|
||||
///
|
||||
/// First, header, transaction, and receipt-related data should be written to static files.
|
||||
/// Then the execution history-related data will be written to the database.
|
||||
SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
|
||||
SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),
|
||||
|
||||
/// Removes block data above the given block number from the database.
|
||||
///
|
||||
/// This will first update checkpoints from the database, then remove actual block data from
|
||||
/// static files.
|
||||
RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
|
||||
RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),
|
||||
|
||||
/// Update the persisted finalized block on disk
|
||||
SaveFinalizedBlock(u64),
|
||||
@@ -261,7 +261,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
pub fn save_blocks(
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<T>>,
|
||||
tx: oneshot::Sender<Option<BlockNumHash>>,
|
||||
tx: CrossbeamSender<Option<BlockNumHash>>,
|
||||
) -> Result<(), SendError<PersistenceAction<T>>> {
|
||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||
}
|
||||
@@ -290,7 +290,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
pub fn remove_blocks_above(
|
||||
&self,
|
||||
block_num: u64,
|
||||
tx: oneshot::Sender<Option<BlockNumHash>>,
|
||||
tx: CrossbeamSender<Option<BlockNumHash>>,
|
||||
) -> Result<(), SendError<PersistenceAction<T>>> {
|
||||
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
|
||||
}
|
||||
@@ -319,22 +319,22 @@ mod tests {
|
||||
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_blocks_empty() {
|
||||
#[test]
|
||||
fn test_save_blocks_empty() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
let blocks = vec![];
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let hash = rx.await.unwrap();
|
||||
let hash = rx.recv().unwrap();
|
||||
assert_eq!(hash, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_blocks_single_block() {
|
||||
#[test]
|
||||
fn test_save_blocks_single_block() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
let block_number = 0;
|
||||
@@ -344,37 +344,35 @@ mod tests {
|
||||
let block_hash = executed.recovered_block().hash();
|
||||
|
||||
let blocks = vec![executed];
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let BlockNumHash { hash: actual_hash, number: _ } =
|
||||
tokio::time::timeout(std::time::Duration::from_secs(10), rx)
|
||||
.await
|
||||
.expect("test timed out")
|
||||
.expect("channel closed unexpectedly")
|
||||
.expect("no hash returned");
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx
|
||||
.recv_timeout(std::time::Duration::from_secs(10))
|
||||
.expect("test timed out")
|
||||
.expect("no hash returned");
|
||||
|
||||
assert_eq!(block_hash, actual_hash);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_blocks_multiple_blocks() {
|
||||
#[test]
|
||||
fn test_save_blocks_multiple_blocks() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
let mut test_block_builder = TestBlockBuilder::eth();
|
||||
let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
|
||||
assert_eq!(last_hash, actual_hash);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save_blocks_multiple_calls() {
|
||||
#[test]
|
||||
fn test_save_blocks_multiple_calls() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let persistence_handle = default_persistence_handle();
|
||||
|
||||
@@ -383,11 +381,11 @@ mod tests {
|
||||
for range in ranges {
|
||||
let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
persistence_handle.save_blocks(blocks, tx).unwrap();
|
||||
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
|
||||
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
|
||||
assert_eq!(last_hash, actual_hash);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -629,6 +629,11 @@ impl SavedCache {
|
||||
Arc::strong_count(&self.usage_guard) == 1
|
||||
}
|
||||
|
||||
/// Returns the current strong count of the usage guard.
|
||||
pub(crate) fn usage_count(&self) -> usize {
|
||||
Arc::strong_count(&self.usage_guard)
|
||||
}
|
||||
|
||||
/// Returns the [`ExecutionCache`] belonging to the tracked hash.
|
||||
pub(crate) const fn cache(&self) -> &ExecutionCache {
|
||||
&self.caches
|
||||
|
||||
@@ -6,15 +6,13 @@ use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError};
|
||||
use reth_evm::execute::InternalBlockExecutionError;
|
||||
use reth_payload_primitives::NewPayloadError;
|
||||
use reth_primitives_traits::{Block, BlockBody, SealedBlock};
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
|
||||
/// This is an error that can come from advancing persistence. Either this can be a
|
||||
/// [`TryRecvError`], or this can be a [`ProviderError`]
|
||||
/// This is an error that can come from advancing persistence.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AdvancePersistenceError {
|
||||
/// An error that can be from failing to receive a value from persistence
|
||||
#[error(transparent)]
|
||||
RecvError(#[from] TryRecvError),
|
||||
/// The persistence channel was closed unexpectedly
|
||||
#[error("persistence channel closed")]
|
||||
ChannelClosed,
|
||||
/// A provider error
|
||||
#[error(transparent)]
|
||||
Provider(#[from] ProviderError),
|
||||
|
||||
@@ -321,7 +321,7 @@ impl NewPayloadStatusMetrics {
|
||||
}
|
||||
|
||||
/// Metrics for non-execution related block validation.
|
||||
#[derive(Metrics)]
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "sync.block_validation")]
|
||||
pub(crate) struct BlockValidationMetrics {
|
||||
/// Total number of storage tries updated in the state root calculation
|
||||
@@ -348,6 +348,14 @@ pub(crate) struct BlockValidationMetrics {
|
||||
pub(crate) post_execution_validation_duration: Histogram,
|
||||
/// Total duration of the new payload call
|
||||
pub(crate) total_duration: Histogram,
|
||||
/// Size of `HashedPostStateSorted` (`total_len`)
|
||||
pub(crate) hashed_post_state_size: Histogram,
|
||||
/// Size of `TrieUpdatesSorted` (`total_len`)
|
||||
pub(crate) trie_updates_sorted_size: Histogram,
|
||||
/// Size of `AnchoredTrieInput` overlay `TrieUpdatesSorted` (`total_len`)
|
||||
pub(crate) anchored_overlay_trie_updates_size: Histogram,
|
||||
/// Size of `AnchoredTrieInput` overlay `HashedPostStateSorted` (`total_len`)
|
||||
pub(crate) anchored_overlay_hashed_state_size: Histogram,
|
||||
}
|
||||
|
||||
impl BlockValidationMetrics {
|
||||
|
||||
@@ -37,18 +37,12 @@ use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages_api::ControlFlow;
|
||||
use revm::state::EvmState;
|
||||
use state::TreeState;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
ops,
|
||||
sync::{
|
||||
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
|
||||
Arc,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use tokio::sync::{
|
||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||
oneshot::{self, error::TryRecvError},
|
||||
oneshot,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@@ -240,7 +234,7 @@ where
|
||||
C: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
provider: P,
|
||||
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<N>>,
|
||||
payload_validator: V,
|
||||
/// Keeps track of internals such as executed and buffered blocks.
|
||||
state: EngineApiTreeState<N>,
|
||||
@@ -326,7 +320,7 @@ where
|
||||
#[expect(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
provider: P,
|
||||
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<N>>,
|
||||
payload_validator: V,
|
||||
outgoing: UnboundedSender<EngineApiEvent<N>>,
|
||||
state: EngineApiTreeState<N>,
|
||||
@@ -338,7 +332,7 @@ where
|
||||
engine_kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = std::sync::mpsc::channel();
|
||||
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
|
||||
|
||||
Self {
|
||||
provider,
|
||||
@@ -368,7 +362,7 @@ where
|
||||
#[expect(clippy::complexity)]
|
||||
pub fn spawn_new(
|
||||
provider: P,
|
||||
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<N>>,
|
||||
payload_validator: V,
|
||||
persistence: PersistenceHandle<N>,
|
||||
payload_builder: PayloadBuilderHandle<T>,
|
||||
@@ -423,8 +417,8 @@ where
|
||||
/// This will block the current thread and process incoming messages.
|
||||
pub fn run(mut self) {
|
||||
loop {
|
||||
match self.try_recv_engine_message() {
|
||||
Ok(Some(msg)) => {
|
||||
match self.wait_for_event() {
|
||||
LoopEvent::EngineMessage(msg) => {
|
||||
debug!(target: "engine::tree", %msg, "received new engine message");
|
||||
match self.on_engine_message(msg) {
|
||||
Ok(ops::ControlFlow::Break(())) => return,
|
||||
@@ -435,15 +429,22 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
|
||||
LoopEvent::PersistenceComplete { result, start_time } => {
|
||||
if let Err(err) = self.on_persistence_complete(result, start_time) {
|
||||
error!(target: "engine::tree", %err, "Persistence complete handling failed");
|
||||
return
|
||||
}
|
||||
}
|
||||
Err(_err) => {
|
||||
error!(target: "engine::tree", "Engine channel disconnected");
|
||||
LoopEvent::Disconnected => {
|
||||
error!(target: "engine::tree", "Channel disconnected");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Always check if we need to trigger new persistence after any event:
|
||||
// - After engine messages: new blocks may have been inserted that exceed the
|
||||
// persistence threshold
|
||||
// - After persistence completion: we can now persist more blocks if needed
|
||||
if let Err(err) = self.advance_persistence() {
|
||||
error!(target: "engine::tree", %err, "Advancing persistence failed");
|
||||
return
|
||||
@@ -451,6 +452,47 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until the next event is ready: either an incoming engine message or a persistence
|
||||
/// completion (if one is in progress).
|
||||
///
|
||||
/// Uses biased selection to prioritize persistence completion to update in-memory state and
|
||||
/// unblock further writes.
|
||||
fn wait_for_event(&mut self) -> LoopEvent<T, N> {
|
||||
// Take ownership of persistence rx if present
|
||||
let maybe_persistence = self.persistence_state.rx.take();
|
||||
|
||||
if let Some((persistence_rx, start_time, action)) = maybe_persistence {
|
||||
// Biased select prioritizes persistence completion to update in memory state and
|
||||
// unblock further writes
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(persistence_rx) -> result => {
|
||||
// Don't put it back - consumed (oneshot-like behavior)
|
||||
match result {
|
||||
Ok(value) => LoopEvent::PersistenceComplete {
|
||||
result: value,
|
||||
start_time,
|
||||
},
|
||||
Err(_) => LoopEvent::Disconnected,
|
||||
}
|
||||
},
|
||||
recv(self.incoming) -> msg => {
|
||||
// Put the persistence rx back - we didn't consume it
|
||||
self.persistence_state.rx = Some((persistence_rx, start_time, action));
|
||||
match msg {
|
||||
Ok(m) => LoopEvent::EngineMessage(m),
|
||||
Err(_) => LoopEvent::Disconnected,
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// No persistence in progress - just wait on incoming
|
||||
match self.incoming.recv() {
|
||||
Ok(m) => LoopEvent::EngineMessage(m),
|
||||
Err(_) => LoopEvent::Disconnected,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Invoked when previously requested blocks were downloaded.
|
||||
///
|
||||
/// If the block count exceeds the configured batch size we're allowed to execute at once, this
|
||||
@@ -1191,39 +1233,13 @@ where
|
||||
.with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
|
||||
}
|
||||
|
||||
/// Attempts to receive the next engine request.
|
||||
///
|
||||
/// If there's currently no persistence action in progress, this will block until a new request
|
||||
/// is received. If there's a persistence action in progress, this will try to receive the
|
||||
/// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
|
||||
/// received in time.
|
||||
///
|
||||
/// Returns an error if the engine channel is disconnected.
|
||||
#[expect(clippy::type_complexity)]
|
||||
fn try_recv_engine_message(
|
||||
&self,
|
||||
) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
|
||||
if self.persistence_state.in_progress() {
|
||||
// try to receive the next request with a timeout to not block indefinitely
|
||||
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(msg) => Ok(Some(msg)),
|
||||
Err(err) => match err {
|
||||
RecvTimeoutError::Timeout => Ok(None),
|
||||
RecvTimeoutError::Disconnected => Err(RecvError),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
self.incoming.recv().map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper method to remove blocks and set the persistence state. This ensures we keep track of
|
||||
/// the current persistence action while we're removing blocks.
|
||||
fn remove_blocks(&mut self, new_tip_num: u64) {
|
||||
debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
|
||||
if new_tip_num < self.persistence_state.last_persisted_block.number {
|
||||
debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
|
||||
self.persistence_state.start_remove(new_tip_num, rx);
|
||||
}
|
||||
@@ -1245,35 +1261,17 @@ where
|
||||
.expect("Checked non-empty persisting blocks");
|
||||
|
||||
debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
|
||||
|
||||
self.persistence_state.start_save(highest_num_hash, rx);
|
||||
}
|
||||
|
||||
/// Attempts to advance the persistence state.
|
||||
/// Triggers new persistence actions if no persistence task is currently in progress.
|
||||
///
|
||||
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
|
||||
/// or send a new persistence action if necessary.
|
||||
/// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
|
||||
/// Persistence completion is handled separately via the `wait_for_event` method.
|
||||
fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
|
||||
if self.persistence_state.in_progress() {
|
||||
let (mut rx, start_time, current_action) = self
|
||||
.persistence_state
|
||||
.rx
|
||||
.take()
|
||||
.expect("if a persistence task is in progress Receiver must be Some");
|
||||
// Check if persistence has complete
|
||||
match rx.try_recv() {
|
||||
Ok(last_persisted_hash_num) => {
|
||||
self.on_persistence_complete(last_persisted_hash_num, start_time)?;
|
||||
}
|
||||
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
|
||||
Err(TryRecvError::Empty) => {
|
||||
self.persistence_state.rx = Some((rx, start_time, current_action))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.persistence_state.in_progress() {
|
||||
if let Some(new_tip_num) = self.find_disk_reorg()? {
|
||||
self.remove_blocks(new_tip_num)
|
||||
@@ -1306,7 +1304,7 @@ where
|
||||
loop {
|
||||
// Wait for any in-progress persistence to complete (blocking)
|
||||
if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
|
||||
let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
|
||||
let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
|
||||
self.on_persistence_complete(result, start_time)?;
|
||||
}
|
||||
|
||||
@@ -1322,6 +1320,31 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to poll for a completed persistence task (non-blocking).
|
||||
///
|
||||
/// Returns `true` if a persistence task was completed, `false` otherwise.
|
||||
#[cfg(test)]
|
||||
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
|
||||
let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
match rx.try_recv() {
|
||||
Ok(result) => {
|
||||
self.on_persistence_complete(result, start_time)?;
|
||||
Ok(true)
|
||||
}
|
||||
Err(crossbeam_channel::TryRecvError::Empty) => {
|
||||
// Not ready yet, put it back
|
||||
self.persistence_state.rx = Some((rx, start_time, action));
|
||||
Ok(false)
|
||||
}
|
||||
Err(crossbeam_channel::TryRecvError::Disconnected) => {
|
||||
Err(AdvancePersistenceError::ChannelClosed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a completed persistence task.
|
||||
fn on_persistence_complete(
|
||||
&mut self,
|
||||
@@ -2848,6 +2871,26 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Events received in the main engine loop.
|
||||
#[derive(Debug)]
|
||||
enum LoopEvent<T, N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
T: PayloadTypes,
|
||||
{
|
||||
/// An engine API message was received.
|
||||
EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
|
||||
/// A persistence task completed.
|
||||
PersistenceComplete {
|
||||
/// The result of the persistence operation.
|
||||
result: Option<BlockNumHash>,
|
||||
/// When the persistence operation started.
|
||||
start_time: Instant,
|
||||
},
|
||||
/// A channel was disconnected.
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
|
||||
/// variant.
|
||||
///
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Configured sparse trie enum for switching between serial and parallel implementations.
|
||||
|
||||
use alloy_primitives::B256;
|
||||
use reth_trie::{Nibbles, ProofTrieNode, TrieMasks, TrieNode};
|
||||
use reth_trie::{BranchNodeMasks, Nibbles, ProofTrieNode, TrieNode};
|
||||
use reth_trie_sparse::{
|
||||
errors::SparseTrieResult, provider::TrieNodeProvider, LeafLookup, LeafLookupError,
|
||||
SerialSparseTrie, SparseTrieInterface, SparseTrieUpdates,
|
||||
@@ -44,7 +44,7 @@ impl SparseTrieInterface for ConfiguredSparseTrie {
|
||||
fn with_root(
|
||||
self,
|
||||
root: TrieNode,
|
||||
masks: TrieMasks,
|
||||
masks: Option<BranchNodeMasks>,
|
||||
retain_updates: bool,
|
||||
) -> SparseTrieResult<Self> {
|
||||
match self {
|
||||
@@ -75,7 +75,7 @@ impl SparseTrieInterface for ConfiguredSparseTrie {
|
||||
&mut self,
|
||||
path: Nibbles,
|
||||
node: TrieNode,
|
||||
masks: TrieMasks,
|
||||
masks: Option<BranchNodeMasks>,
|
||||
) -> SparseTrieResult<()> {
|
||||
match self {
|
||||
Self::Serial(trie) => trie.reveal_node(path, node, masks),
|
||||
|
||||
@@ -274,24 +274,23 @@ where
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let v2_proofs_enabled = config.enable_proof_v2();
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
task_ctx,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
v2_proofs_enabled,
|
||||
);
|
||||
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
to_multi_proof,
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof,
|
||||
);
|
||||
|
||||
// wire the multiproof task to the prewarm task
|
||||
let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
|
||||
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
@@ -316,7 +315,7 @@ where
|
||||
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
|
||||
|
||||
PayloadHandle {
|
||||
to_multi_proof,
|
||||
to_multi_proof: Some(to_multi_proof),
|
||||
prewarm_handle,
|
||||
state_root: Some(state_root_rx),
|
||||
transactions: execution_rx,
|
||||
@@ -492,38 +491,40 @@ where
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
{
|
||||
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
|
||||
// there's none to reuse.
|
||||
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
|
||||
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
|
||||
let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
|
||||
ConfiguredSparseTrie::Serial(Default::default())
|
||||
} else {
|
||||
ConfiguredSparseTrie::Parallel(Box::new(
|
||||
ParallelSparseTrie::default()
|
||||
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
|
||||
))
|
||||
});
|
||||
ClearedSparseStateTrie::from_state_trie(
|
||||
SparseStateTrie::new()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true),
|
||||
)
|
||||
});
|
||||
|
||||
let task =
|
||||
SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
|
||||
sparse_trie_rx,
|
||||
proof_worker_handle,
|
||||
self.trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
);
|
||||
|
||||
let disable_parallel_sparse_trie = self.disable_parallel_sparse_trie;
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let span = Span::current();
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
|
||||
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
|
||||
// if there's none to reuse.
|
||||
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
|
||||
let default_trie = SparseTrie::blind_from(if disable_parallel_sparse_trie {
|
||||
ConfiguredSparseTrie::Serial(Default::default())
|
||||
} else {
|
||||
ConfiguredSparseTrie::Parallel(Box::new(
|
||||
ParallelSparseTrie::default()
|
||||
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
|
||||
))
|
||||
});
|
||||
ClearedSparseStateTrie::from_state_trie(
|
||||
SparseStateTrie::new()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true),
|
||||
)
|
||||
});
|
||||
|
||||
let task = SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
|
||||
sparse_trie_rx,
|
||||
proof_worker_handle,
|
||||
trie_metrics,
|
||||
sparse_state_trie,
|
||||
);
|
||||
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
@@ -775,12 +776,34 @@ impl ExecutionCache {
|
||||
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
|
||||
}
|
||||
|
||||
cache
|
||||
.as_ref()
|
||||
if let Some(c) = cache.as_ref() {
|
||||
let cached_hash = c.executed_block_hash();
|
||||
// Check that the cache hash matches the parent hash of the current block. It won't
|
||||
// match in case it's a fork block.
|
||||
let hash_matches = cached_hash == parent_hash;
|
||||
// Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
|
||||
// a reference to this cache. We can only reuse it when we have exclusive access.
|
||||
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
|
||||
.cloned()
|
||||
let available = c.is_available();
|
||||
let usage_count = c.usage_count();
|
||||
|
||||
debug!(
|
||||
target: "engine::caching",
|
||||
%cached_hash,
|
||||
%parent_hash,
|
||||
hash_matches,
|
||||
available,
|
||||
usage_count,
|
||||
"Existing cache found"
|
||||
);
|
||||
|
||||
if hash_matches && available {
|
||||
return Some(c.clone());
|
||||
}
|
||||
} else {
|
||||
debug!(target: "engine::caching", %parent_hash, "No cache found");
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Clears the tracked cache
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -29,7 +29,6 @@ use alloy_evm::Database;
|
||||
use alloy_primitives::{keccak256, map::B256Set, B256};
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use metrics::{Counter, Gauge, Histogram};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
|
||||
use reth_execution_types::ExecutionOutcome;
|
||||
use reth_metrics::Metrics;
|
||||
@@ -619,8 +618,7 @@ where
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
|
||||
.entered();
|
||||
let targets = multiproof_targets_from_state(res.state);
|
||||
let storage_targets = targets.storage_targets_count();
|
||||
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
|
||||
drop(_enter);
|
||||
@@ -767,33 +765,37 @@ where
|
||||
|
||||
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
|
||||
/// given state.
|
||||
fn multiproof_targets_from_state(state: EvmState) -> MultiProofTargets {
|
||||
state
|
||||
.into_par_iter()
|
||||
.filter_map(|(address, account)| {
|
||||
// if the account was not touched, or if the account was selfdestructed, do not
|
||||
// fetch proofs for it
|
||||
//
|
||||
// Since selfdestruct can only happen in the same transaction, we can skip
|
||||
// prefetching proofs for selfdestructed accounts
|
||||
//
|
||||
// See: https://eips.ethereum.org/EIPS/eip-6780
|
||||
if !account.is_touched() || account.is_selfdestructed() {
|
||||
return None;
|
||||
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
|
||||
let mut targets = MultiProofTargets::with_capacity(state.len());
|
||||
let mut storage_targets = 0;
|
||||
for (addr, account) in state {
|
||||
// if the account was not touched, or if the account was selfdestructed, do not
|
||||
// fetch proofs for it
|
||||
//
|
||||
// Since selfdestruct can only happen in the same transaction, we can skip
|
||||
// prefetching proofs for selfdestructed accounts
|
||||
//
|
||||
// See: https://eips.ethereum.org/EIPS/eip-6780
|
||||
if !account.is_touched() || account.is_selfdestructed() {
|
||||
continue
|
||||
}
|
||||
|
||||
let mut storage_set =
|
||||
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
|
||||
for (key, slot) in account.storage {
|
||||
// do nothing if unchanged
|
||||
if !slot.is_changed() {
|
||||
continue
|
||||
}
|
||||
|
||||
let hashed_address = keccak256(address);
|
||||
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
|
||||
}
|
||||
|
||||
let storage_set: B256Set = account
|
||||
.storage
|
||||
.into_iter()
|
||||
.filter(|(_, slot)| slot.is_changed())
|
||||
.map(|(key, _)| keccak256(B256::new(key.to_be_bytes())))
|
||||
.collect();
|
||||
storage_targets += storage_set.len();
|
||||
targets.insert(keccak256(addr), storage_set);
|
||||
}
|
||||
|
||||
Some((hashed_address, storage_set))
|
||||
})
|
||||
.collect()
|
||||
(targets, storage_targets)
|
||||
}
|
||||
|
||||
/// The events the pre-warm task can handle.
|
||||
|
||||
@@ -121,8 +121,9 @@ where
|
||||
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
|
||||
})?;
|
||||
|
||||
self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
|
||||
self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
|
||||
let end = Instant::now();
|
||||
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
|
||||
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
|
||||
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
@@ -173,7 +174,7 @@ where
|
||||
.par_bridge()
|
||||
.map(|(address, storage, storage_trie)| {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
|
||||
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
|
||||
.entered();
|
||||
|
||||
trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
|
||||
|
||||
@@ -34,13 +34,12 @@ use reth_primitives_traits::{
|
||||
SealedHeader, SignerRecoverable,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockReader,
|
||||
DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome, HashedPostStateProvider,
|
||||
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
|
||||
StateProviderFactory, StateReader, TrieReader,
|
||||
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
|
||||
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome,
|
||||
HashedPostStateProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
|
||||
StateProvider, StateProviderFactory, StateReader, TrieReader,
|
||||
};
|
||||
use reth_revm::db::State;
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
|
||||
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
|
||||
use revm_primitives::Address;
|
||||
@@ -112,7 +111,7 @@ where
|
||||
/// Provider for database access.
|
||||
provider: P,
|
||||
/// Consensus implementation for validation.
|
||||
consensus: Arc<dyn FullConsensus<Evm::Primitives, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
|
||||
/// EVM configuration.
|
||||
evm_config: Evm,
|
||||
/// Configuration for the tree.
|
||||
@@ -136,8 +135,15 @@ impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
P: DatabaseProviderFactory<
|
||||
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
Provider: BlockReader
|
||||
+ TrieReader
|
||||
+ StageCheckpointReader
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ BlockNumReader,
|
||||
> + BlockReader<Header = N::BlockHeader>
|
||||
+ ChangeSetReader
|
||||
+ BlockNumReader
|
||||
+ StateProviderFactory
|
||||
+ StateReader
|
||||
+ HashedPostStateProvider
|
||||
@@ -149,7 +155,7 @@ where
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
provider: P,
|
||||
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn FullConsensus<N>>,
|
||||
evm_config: Evm,
|
||||
validator: V,
|
||||
config: TreeConfig,
|
||||
@@ -616,7 +622,8 @@ where
|
||||
.without_state_clear()
|
||||
.build();
|
||||
|
||||
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env.clone());
|
||||
let spec_id = *env.evm_env.spec_id();
|
||||
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
|
||||
let ctx =
|
||||
self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
|
||||
let mut executor = self.evm_config.create_executor(evm, ctx);
|
||||
@@ -632,7 +639,7 @@ where
|
||||
CachedPrecompile::wrap(
|
||||
precompile,
|
||||
self.precompile_cache_map.cache_for_address(*address),
|
||||
*env.evm_env.spec_id(),
|
||||
spec_id,
|
||||
Some(metrics),
|
||||
)
|
||||
});
|
||||
@@ -713,8 +720,7 @@ where
|
||||
|
||||
Ok(StateRoot::new(&provider, &provider)
|
||||
.with_prefix_sets(prefix_sets.freeze())
|
||||
.root_with_updates()
|
||||
.map_err(Into::<DatabaseError>::into)?)
|
||||
.root_with_updates()?)
|
||||
}
|
||||
|
||||
/// Validates the block after execution.
|
||||
@@ -1080,16 +1086,33 @@ where
|
||||
ancestors,
|
||||
);
|
||||
let deferred_handle_task = deferred_trie_data.clone();
|
||||
let deferred_compute_duration =
|
||||
self.metrics.block_validation.deferred_trie_compute_duration.clone();
|
||||
let block_validation_metrics = self.metrics.block_validation.clone();
|
||||
|
||||
// Spawn background task to compute trie data. Calling `wait_cloned` will compute from
|
||||
// the stored inputs and cache the result, so subsequent calls return immediately.
|
||||
let compute_trie_input_task = move || {
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| {
|
||||
let compute_start = Instant::now();
|
||||
let _ = deferred_handle_task.wait_cloned();
|
||||
deferred_compute_duration.record(compute_start.elapsed().as_secs_f64());
|
||||
let computed = deferred_handle_task.wait_cloned();
|
||||
block_validation_metrics
|
||||
.deferred_trie_compute_duration
|
||||
.record(compute_start.elapsed().as_secs_f64());
|
||||
|
||||
// Record sizes of the computed trie data
|
||||
block_validation_metrics
|
||||
.hashed_post_state_size
|
||||
.record(computed.hashed_state.total_len() as f64);
|
||||
block_validation_metrics
|
||||
.trie_updates_sorted_size
|
||||
.record(computed.trie_updates.total_len() as f64);
|
||||
if let Some(anchored) = &computed.anchored_trie_input {
|
||||
block_validation_metrics
|
||||
.anchored_overlay_trie_updates_size
|
||||
.record(anchored.trie_input.nodes.total_len() as f64);
|
||||
block_validation_metrics
|
||||
.anchored_overlay_hashed_state_size
|
||||
.record(anchored.trie_input.state.total_len() as f64);
|
||||
}
|
||||
}));
|
||||
|
||||
if result.is_err() {
|
||||
@@ -1185,10 +1208,17 @@ pub trait EngineValidator<
|
||||
impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
|
||||
where
|
||||
P: DatabaseProviderFactory<
|
||||
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
Provider: BlockReader
|
||||
+ TrieReader
|
||||
+ StageCheckpointReader
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ BlockNumReader,
|
||||
> + BlockReader<Header = N::BlockHeader>
|
||||
+ StateProviderFactory
|
||||
+ StateReader
|
||||
+ ChangeSetReader
|
||||
+ BlockNumReader
|
||||
+ HashedPostStateProvider
|
||||
+ Clone
|
||||
+ 'static,
|
||||
|
||||
@@ -22,12 +22,12 @@
|
||||
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::Receiver as CrossbeamReceiver;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::trace;
|
||||
|
||||
/// The state of the persistence task.
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Debug)]
|
||||
pub struct PersistenceState {
|
||||
/// Hash and number of the last block persisted.
|
||||
///
|
||||
@@ -36,7 +36,7 @@ pub struct PersistenceState {
|
||||
/// Receiver end of channel where the result of the persistence task will be
|
||||
/// sent when done. A None value means there's no persistence task in progress.
|
||||
pub(crate) rx:
|
||||
Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
|
||||
Option<(CrossbeamReceiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
|
||||
}
|
||||
|
||||
impl PersistenceState {
|
||||
@@ -50,7 +50,7 @@ impl PersistenceState {
|
||||
pub(crate) fn start_remove(
|
||||
&mut self,
|
||||
new_tip_num: u64,
|
||||
rx: oneshot::Receiver<Option<BlockNumHash>>,
|
||||
rx: CrossbeamReceiver<Option<BlockNumHash>>,
|
||||
) {
|
||||
self.rx =
|
||||
Some((rx, Instant::now(), CurrentPersistenceAction::RemovingBlocks { new_tip_num }));
|
||||
@@ -60,7 +60,7 @@ impl PersistenceState {
|
||||
pub(crate) fn start_save(
|
||||
&mut self,
|
||||
highest: BlockNumHash,
|
||||
rx: oneshot::Receiver<Option<BlockNumHash>>,
|
||||
rx: CrossbeamReceiver<Option<BlockNumHash>>,
|
||||
) {
|
||||
self.rx = Some((rx, Instant::now(), CurrentPersistenceAction::SavingBlocks { highest }));
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ use std::{
|
||||
collections::BTreeMap,
|
||||
str::FromStr,
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
mpsc::{Receiver, Sender},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
@@ -97,6 +97,7 @@ struct TestChannel<T> {
|
||||
impl<T: Send + 'static> TestChannel<T> {
|
||||
/// Creates a new test channel
|
||||
fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
|
||||
use std::sync::mpsc::channel;
|
||||
let (original_tx, original_rx) = channel();
|
||||
let (wrapped_tx, wrapped_rx) = channel();
|
||||
let (release_tx, release_rx) = channel();
|
||||
@@ -143,7 +144,9 @@ struct TestHarness {
|
||||
BasicEngineValidator<MockEthProvider, MockEvmConfig, MockEngineValidator>,
|
||||
MockEvmConfig,
|
||||
>,
|
||||
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
|
||||
to_tree_tx: crossbeam_channel::Sender<
|
||||
FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>,
|
||||
>,
|
||||
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
|
||||
blocks: Vec<ExecutedBlock>,
|
||||
action_rx: Receiver<PersistenceAction>,
|
||||
@@ -153,6 +156,7 @@ struct TestHarness {
|
||||
|
||||
impl TestHarness {
|
||||
fn new(chain_spec: Arc<ChainSpec>) -> Self {
|
||||
use std::sync::mpsc::channel;
|
||||
let (action_tx, action_rx) = channel();
|
||||
Self::with_persistence_channel(chain_spec, action_tx, action_rx)
|
||||
}
|
||||
@@ -205,7 +209,7 @@ impl TestHarness {
|
||||
engine_api_tree_state,
|
||||
canonical_in_memory_state,
|
||||
persistence_handle,
|
||||
PersistenceState::default(),
|
||||
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
|
||||
payload_builder,
|
||||
// always assume enough parallelism for tests
|
||||
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
|
||||
@@ -399,10 +403,8 @@ impl ValidatorTestHarness {
|
||||
|
||||
/// Configure `PersistenceState` for specific persistence scenarios
|
||||
fn start_persistence_operation(&mut self, action: CurrentPersistenceAction) {
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
// Create a dummy receiver for testing - it will never receive a value
|
||||
let (_tx, rx) = oneshot::channel();
|
||||
let (_tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
match action {
|
||||
CurrentPersistenceAction::SavingBlocks { highest } => {
|
||||
@@ -498,11 +500,17 @@ fn test_tree_persist_block_batch() {
|
||||
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
|
||||
|
||||
// process the message
|
||||
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
|
||||
let msg = match test_harness.tree.wait_for_event() {
|
||||
super::LoopEvent::EngineMessage(msg) => msg,
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
};
|
||||
let _ = test_harness.tree.on_engine_message(msg).unwrap();
|
||||
|
||||
// we now should receive the other batch
|
||||
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
|
||||
let msg = match test_harness.tree.wait_for_event() {
|
||||
super::LoopEvent::EngineMessage(msg) => msg,
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
};
|
||||
match msg {
|
||||
FromEngine::DownloadedBlocks(blocks) => {
|
||||
assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
|
||||
@@ -753,8 +761,8 @@ async fn test_tree_state_on_new_head_reorg() {
|
||||
})
|
||||
);
|
||||
|
||||
// after advancing persistence, we should be at `None` for the next action
|
||||
test_harness.tree.advance_persistence().unwrap();
|
||||
// after polling persistence completion, we should be at `None` for the next action
|
||||
test_harness.tree.try_poll_persistence().unwrap();
|
||||
let current_action = test_harness.tree.persistence_state.current_action().cloned();
|
||||
assert_eq!(current_action, None);
|
||||
|
||||
|
||||
@@ -51,7 +51,12 @@ jemalloc = [
|
||||
"reth-node-metrics/jemalloc",
|
||||
]
|
||||
jemalloc-prof = [
|
||||
"reth-node-core/jemalloc",
|
||||
"jemalloc",
|
||||
"reth-node-metrics/jemalloc-prof",
|
||||
]
|
||||
jemalloc-symbols = [
|
||||
"jemalloc-prof",
|
||||
"reth-node-metrics/jemalloc-symbols",
|
||||
]
|
||||
tracy-allocator = []
|
||||
|
||||
@@ -81,3 +86,5 @@ min-trace-logs = [
|
||||
"tracing/release_max_level_trace",
|
||||
"reth-node-core/min-trace-logs",
|
||||
]
|
||||
|
||||
edge = ["reth-cli-commands/edge"]
|
||||
|
||||
@@ -84,17 +84,15 @@ where
|
||||
B: Block,
|
||||
ChainSpec: EthChainSpec<Header = B::Header> + EthereumHardforks + Debug + Send + Sync,
|
||||
{
|
||||
type Error = ConsensusError;
|
||||
|
||||
fn validate_body_against_header(
|
||||
&self,
|
||||
body: &B::Body,
|
||||
header: &SealedHeader<B::Header>,
|
||||
) -> Result<(), Self::Error> {
|
||||
) -> Result<(), ConsensusError> {
|
||||
validate_body_against_header(body, header.header())
|
||||
}
|
||||
|
||||
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), Self::Error> {
|
||||
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError> {
|
||||
validate_block_pre_execution(block, &self.chain_spec)
|
||||
}
|
||||
}
|
||||
@@ -228,10 +226,12 @@ mod tests {
|
||||
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
|
||||
let child = header_with_gas_limit((parent.gas_limit + 5) as u64);
|
||||
|
||||
assert_eq!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
|
||||
Ok(())
|
||||
);
|
||||
assert!(validate_against_parent_gas_limit(
|
||||
&child,
|
||||
&parent,
|
||||
&ChainSpec::<Header>::default()
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -239,10 +239,11 @@ mod tests {
|
||||
let parent = header_with_gas_limit(MINIMUM_GAS_LIMIT);
|
||||
let child = header_with_gas_limit(MINIMUM_GAS_LIMIT - 1);
|
||||
|
||||
assert_eq!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
|
||||
Err(ConsensusError::GasLimitInvalidMinimum { child_gas_limit: child.gas_limit as u64 })
|
||||
);
|
||||
assert!(matches!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
|
||||
ConsensusError::GasLimitInvalidMinimum { child_gas_limit }
|
||||
if child_gas_limit == child.gas_limit as u64
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -252,13 +253,11 @@ mod tests {
|
||||
parent.gas_limit + parent.gas_limit / GAS_LIMIT_BOUND_DIVISOR + 1,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
|
||||
Err(ConsensusError::GasLimitInvalidIncrease {
|
||||
parent_gas_limit: parent.gas_limit,
|
||||
child_gas_limit: child.gas_limit,
|
||||
})
|
||||
);
|
||||
assert!(matches!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
|
||||
ConsensusError::GasLimitInvalidIncrease { parent_gas_limit, child_gas_limit }
|
||||
if parent_gas_limit == parent.gas_limit && child_gas_limit == child.gas_limit
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -266,10 +265,12 @@ mod tests {
|
||||
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
|
||||
let child = header_with_gas_limit(parent.gas_limit - 5);
|
||||
|
||||
assert_eq!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
|
||||
Ok(())
|
||||
);
|
||||
assert!(validate_against_parent_gas_limit(
|
||||
&child,
|
||||
&parent,
|
||||
&ChainSpec::<Header>::default()
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -279,13 +280,11 @@ mod tests {
|
||||
parent.gas_limit - parent.gas_limit / GAS_LIMIT_BOUND_DIVISOR - 1,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
|
||||
Err(ConsensusError::GasLimitInvalidDecrease {
|
||||
parent_gas_limit: parent.gas_limit,
|
||||
child_gas_limit: child.gas_limit,
|
||||
})
|
||||
);
|
||||
assert!(matches!(
|
||||
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
|
||||
ConsensusError::GasLimitInvalidDecrease { parent_gas_limit, child_gas_limit }
|
||||
if parent_gas_limit == parent.gas_limit && child_gas_limit == child.gas_limit
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -300,9 +299,8 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
EthBeaconConsensus::new(chain_spec).validate_header(&SealedHeader::seal_slow(header,)),
|
||||
Ok(())
|
||||
);
|
||||
assert!(EthBeaconConsensus::new(chain_spec)
|
||||
.validate_header(&SealedHeader::seal_slow(header,))
|
||||
.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,18 +170,16 @@ mod tests {
|
||||
let expected_receipts_root = B256::random();
|
||||
let expected_logs_bloom = calculated_logs_bloom;
|
||||
|
||||
assert_eq!(
|
||||
assert!(matches!(
|
||||
compare_receipts_root_and_logs_bloom(
|
||||
calculated_receipts_root,
|
||||
calculated_logs_bloom,
|
||||
expected_receipts_root,
|
||||
expected_logs_bloom
|
||||
),
|
||||
Err(ConsensusError::BodyReceiptRootDiff(
|
||||
GotExpected { got: calculated_receipts_root, expected: expected_receipts_root }
|
||||
.into()
|
||||
))
|
||||
);
|
||||
).unwrap_err(),
|
||||
ConsensusError::BodyReceiptRootDiff(diff)
|
||||
if diff.got == calculated_receipts_root && diff.expected == expected_receipts_root
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -192,16 +190,15 @@ mod tests {
|
||||
let expected_receipts_root = calculated_receipts_root;
|
||||
let expected_logs_bloom = Bloom::random();
|
||||
|
||||
assert_eq!(
|
||||
assert!(matches!(
|
||||
compare_receipts_root_and_logs_bloom(
|
||||
calculated_receipts_root,
|
||||
calculated_logs_bloom,
|
||||
expected_receipts_root,
|
||||
expected_logs_bloom
|
||||
),
|
||||
Err(ConsensusError::BodyBloomLogDiff(
|
||||
GotExpected { got: calculated_logs_bloom, expected: expected_logs_bloom }.into()
|
||||
))
|
||||
);
|
||||
).unwrap_err(),
|
||||
ConsensusError::BodyBloomLogDiff(diff)
|
||||
if diff.got == calculated_logs_bloom && diff.expected == expected_logs_bloom
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,8 +24,10 @@ use crate::BuiltPayloadConversionError;
|
||||
/// Contains the built payload.
|
||||
///
|
||||
/// According to the [engine API specification](https://github.com/ethereum/execution-apis/blob/main/src/engine/README.md) the execution layer should build the initial version of the payload with an empty transaction set and then keep update it in order to maximize the revenue.
|
||||
/// Therefore, the empty-block here is always available and full-block will be set/updated
|
||||
/// afterward.
|
||||
///
|
||||
/// This struct represents a single built block at a point in time. The payload building process
|
||||
/// creates a sequence of these payloads, starting with an empty block and progressively including
|
||||
/// more transactions.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EthBuiltPayload<N: NodePrimitives = EthPrimitives> {
|
||||
/// Identifier of the payload
|
||||
|
||||
@@ -29,6 +29,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
|
||||
.expect("valid datadir"),
|
||||
static_files_path: Some(tempdir.path().join("static")),
|
||||
rocksdb_path: Some(tempdir.path().join("rocksdb")),
|
||||
pprof_dumps_path: Some(tempdir.path().join("pprof")),
|
||||
};
|
||||
let config = NodeConfig::test().with_datadir_args(datadir_args).with_rpc(rpc_args);
|
||||
let db = create_test_rw_db();
|
||||
|
||||
@@ -152,6 +152,15 @@ jemalloc = [
|
||||
"reth-ethereum-cli?/jemalloc",
|
||||
"reth-node-core?/jemalloc",
|
||||
]
|
||||
jemalloc-prof = [
|
||||
"jemalloc",
|
||||
"reth-cli-util?/jemalloc-prof",
|
||||
"reth-ethereum-cli?/jemalloc-prof",
|
||||
]
|
||||
jemalloc-symbols = [
|
||||
"jemalloc-prof",
|
||||
"reth-ethereum-cli?/jemalloc-symbols",
|
||||
]
|
||||
js-tracer = [
|
||||
"rpc",
|
||||
"reth-rpc/js-tracer",
|
||||
|
||||
@@ -7,7 +7,7 @@ use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
|
||||
use thiserror::Error;
|
||||
|
||||
/// State root errors.
|
||||
#[derive(Error, PartialEq, Eq, Clone, Debug)]
|
||||
#[derive(Error, Clone, Debug)]
|
||||
pub enum StateRootError {
|
||||
/// Internal database error.
|
||||
#[error(transparent)]
|
||||
@@ -15,19 +15,25 @@ pub enum StateRootError {
|
||||
/// Storage root error.
|
||||
#[error(transparent)]
|
||||
StorageRootError(#[from] StorageRootError),
|
||||
/// Provider error when loading prefix sets
|
||||
#[error(transparent)]
|
||||
PrefixSetLoadError(#[from] ProviderError),
|
||||
}
|
||||
|
||||
impl From<StateRootError> for DatabaseError {
|
||||
fn from(err: StateRootError) -> Self {
|
||||
match err {
|
||||
impl From<StateRootError> for ProviderError {
|
||||
fn from(value: StateRootError) -> Self {
|
||||
match value {
|
||||
StateRootError::Database(err) |
|
||||
StateRootError::StorageRootError(StorageRootError::Database(err)) => err,
|
||||
StateRootError::StorageRootError(StorageRootError::Database(err)) => {
|
||||
Self::Database(err)
|
||||
}
|
||||
StateRootError::PrefixSetLoadError(err) => err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage root error.
|
||||
#[derive(Error, PartialEq, Eq, Clone, Debug)]
|
||||
#[derive(Error, Clone, Debug)]
|
||||
pub enum StorageRootError {
|
||||
/// Internal database error.
|
||||
#[error(transparent)]
|
||||
@@ -43,7 +49,7 @@ impl From<StorageRootError> for DatabaseError {
|
||||
}
|
||||
|
||||
/// State proof errors.
|
||||
#[derive(Error, PartialEq, Eq, Clone, Debug)]
|
||||
#[derive(Error, Clone, Debug)]
|
||||
pub enum StateProofError {
|
||||
/// Internal database error.
|
||||
#[error(transparent)]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Contains [Chain], a chain of blocks and their final state.
|
||||
|
||||
use crate::ExecutionOutcome;
|
||||
use alloc::{borrow::Cow, collections::BTreeMap, vec::Vec};
|
||||
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc, vec::Vec};
|
||||
use alloy_consensus::{transaction::Recovered, BlockHeader};
|
||||
use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
|
||||
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
|
||||
@@ -10,8 +10,7 @@ use reth_primitives_traits::{
|
||||
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
|
||||
RecoveredBlock, SealedHeader,
|
||||
};
|
||||
use reth_trie_common::updates::TrieUpdates;
|
||||
use revm::database::BundleState;
|
||||
use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
|
||||
|
||||
/// A chain of blocks and their final state.
|
||||
///
|
||||
@@ -35,10 +34,10 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
|
||||
///
|
||||
/// Additionally, it includes the individual state changes that led to the current state.
|
||||
execution_outcome: ExecutionOutcome<N::Receipt>,
|
||||
/// State trie updates after block is added to the chain.
|
||||
/// NOTE: Currently, trie updates are present only for
|
||||
/// single-block chains that extend the canonical chain.
|
||||
trie_updates: Option<TrieUpdates>,
|
||||
/// State trie updates for each block in the chain, keyed by block number.
|
||||
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
|
||||
/// Hashed post state for each block in the chain, keyed by block number.
|
||||
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
|
||||
}
|
||||
|
||||
type ChainTxReceiptMeta<'a, N> = (
|
||||
@@ -54,6 +53,7 @@ impl<N: NodePrimitives> Default for Chain<N> {
|
||||
blocks: Default::default(),
|
||||
execution_outcome: Default::default(),
|
||||
trie_updates: Default::default(),
|
||||
hashed_state: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,22 +67,27 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
pub fn new(
|
||||
blocks: impl IntoIterator<Item = RecoveredBlock<N::Block>>,
|
||||
execution_outcome: ExecutionOutcome<N::Receipt>,
|
||||
trie_updates: Option<TrieUpdates>,
|
||||
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
|
||||
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
|
||||
) -> Self {
|
||||
let blocks =
|
||||
blocks.into_iter().map(|b| (b.header().number(), b)).collect::<BTreeMap<_, _>>();
|
||||
debug_assert!(!blocks.is_empty(), "Chain should have at least one block");
|
||||
|
||||
Self { blocks, execution_outcome, trie_updates }
|
||||
Self { blocks, execution_outcome, trie_updates, hashed_state }
|
||||
}
|
||||
|
||||
/// Create new Chain from a single block and its state.
|
||||
pub fn from_block(
|
||||
block: RecoveredBlock<N::Block>,
|
||||
execution_outcome: ExecutionOutcome<N::Receipt>,
|
||||
trie_updates: Option<TrieUpdates>,
|
||||
trie_updates: Arc<TrieUpdatesSorted>,
|
||||
hashed_state: Arc<HashedPostStateSorted>,
|
||||
) -> Self {
|
||||
Self::new([block], execution_outcome, trie_updates)
|
||||
let block_number = block.header().number();
|
||||
let trie_updates_map = BTreeMap::from([(block_number, trie_updates)]);
|
||||
let hashed_state_map = BTreeMap::from([(block_number, hashed_state)]);
|
||||
Self::new([block], execution_outcome, trie_updates_map, hashed_state_map)
|
||||
}
|
||||
|
||||
/// Get the blocks in this chain.
|
||||
@@ -100,14 +105,37 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
self.blocks.values().map(|block| block.clone_sealed_header())
|
||||
}
|
||||
|
||||
/// Get cached trie updates for this chain.
|
||||
pub const fn trie_updates(&self) -> Option<&TrieUpdates> {
|
||||
self.trie_updates.as_ref()
|
||||
/// Get all trie updates for this chain.
|
||||
pub const fn trie_updates(&self) -> &BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
|
||||
&self.trie_updates
|
||||
}
|
||||
|
||||
/// Remove cached trie updates for this chain.
|
||||
/// Get trie updates for a specific block number.
|
||||
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<&Arc<TrieUpdatesSorted>> {
|
||||
self.trie_updates.get(&block_number)
|
||||
}
|
||||
|
||||
/// Remove all trie updates for this chain.
|
||||
pub fn clear_trie_updates(&mut self) {
|
||||
self.trie_updates.take();
|
||||
self.trie_updates.clear();
|
||||
}
|
||||
|
||||
/// Get all hashed states for this chain.
|
||||
pub const fn hashed_state(&self) -> &BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
|
||||
&self.hashed_state
|
||||
}
|
||||
|
||||
/// Get hashed state for a specific block number.
|
||||
pub fn hashed_state_at(
|
||||
&self,
|
||||
block_number: BlockNumber,
|
||||
) -> Option<&Arc<HashedPostStateSorted>> {
|
||||
self.hashed_state.get(&block_number)
|
||||
}
|
||||
|
||||
/// Remove all hashed states for this chain.
|
||||
pub fn clear_hashed_state(&mut self) {
|
||||
self.hashed_state.clear();
|
||||
}
|
||||
|
||||
/// Get execution outcome of this chain
|
||||
@@ -120,12 +148,6 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
&mut self.execution_outcome
|
||||
}
|
||||
|
||||
/// Prepends the given state to the current state.
|
||||
pub fn prepend_state(&mut self, state: BundleState) {
|
||||
self.execution_outcome.prepend_state(state);
|
||||
self.trie_updates.take(); // invalidate cached trie updates
|
||||
}
|
||||
|
||||
/// Return true if chain is empty and has no blocks.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.blocks.is_empty()
|
||||
@@ -161,11 +183,23 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
/// Destructure the chain into its inner components:
|
||||
/// 1. The blocks contained in the chain.
|
||||
/// 2. The execution outcome representing the final state.
|
||||
/// 3. The optional trie updates.
|
||||
/// 3. The trie updates map.
|
||||
/// 4. The hashed state map.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn into_inner(
|
||||
self,
|
||||
) -> (ChainBlocks<'static, N::Block>, ExecutionOutcome<N::Receipt>, Option<TrieUpdates>) {
|
||||
(ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_updates)
|
||||
) -> (
|
||||
ChainBlocks<'static, N::Block>,
|
||||
ExecutionOutcome<N::Receipt>,
|
||||
BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
|
||||
BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
|
||||
) {
|
||||
(
|
||||
ChainBlocks { blocks: Cow::Owned(self.blocks) },
|
||||
self.execution_outcome,
|
||||
self.trie_updates,
|
||||
self.hashed_state,
|
||||
)
|
||||
}
|
||||
|
||||
/// Destructure the chain into its inner components:
|
||||
@@ -295,10 +329,14 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
&mut self,
|
||||
block: RecoveredBlock<N::Block>,
|
||||
execution_outcome: ExecutionOutcome<N::Receipt>,
|
||||
trie_updates: Arc<TrieUpdatesSorted>,
|
||||
hashed_state: Arc<HashedPostStateSorted>,
|
||||
) {
|
||||
self.blocks.insert(block.header().number(), block);
|
||||
let block_number = block.header().number();
|
||||
self.blocks.insert(block_number, block);
|
||||
self.execution_outcome.extend(execution_outcome);
|
||||
self.trie_updates.take(); // reset
|
||||
self.trie_updates.insert(block_number, trie_updates);
|
||||
self.hashed_state.insert(block_number, hashed_state);
|
||||
}
|
||||
|
||||
/// Merge two chains by appending the given chain into the current one.
|
||||
@@ -317,7 +355,8 @@ impl<N: NodePrimitives> Chain<N> {
|
||||
// Insert blocks from other chain
|
||||
self.blocks.extend(other.blocks);
|
||||
self.execution_outcome.extend(other.execution_outcome);
|
||||
self.trie_updates.take(); // reset
|
||||
self.trie_updates.extend(other.trie_updates);
|
||||
self.hashed_state.extend(other.hashed_state);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -442,14 +481,13 @@ pub struct BlockReceipts<T = reth_ethereum_primitives::Receipt> {
|
||||
#[cfg(feature = "serde-bincode-compat")]
|
||||
pub(super) mod serde_bincode_compat {
|
||||
use crate::{serde_bincode_compat, ExecutionOutcome};
|
||||
use alloc::{borrow::Cow, collections::BTreeMap};
|
||||
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc};
|
||||
use alloy_primitives::BlockNumber;
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_primitives_traits::{
|
||||
serde_bincode_compat::{RecoveredBlock, SerdeBincodeCompat},
|
||||
Block, NodePrimitives,
|
||||
};
|
||||
use reth_trie_common::serde_bincode_compat::updates::TrieUpdates;
|
||||
use serde::{ser::SerializeMap, Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_with::{DeserializeAs, SerializeAs};
|
||||
|
||||
@@ -469,6 +507,7 @@ pub(super) mod serde_bincode_compat {
|
||||
/// }
|
||||
/// ```
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(bound = "")]
|
||||
pub struct Chain<'a, N = EthPrimitives>
|
||||
where
|
||||
N: NodePrimitives<
|
||||
@@ -477,7 +516,19 @@ pub(super) mod serde_bincode_compat {
|
||||
{
|
||||
blocks: RecoveredBlocks<'a, N::Block>,
|
||||
execution_outcome: serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
|
||||
trie_updates: Option<TrieUpdates<'a>>,
|
||||
#[serde(default, rename = "trie_updates_legacy")]
|
||||
_trie_updates_legacy:
|
||||
Option<reth_trie_common::serde_bincode_compat::updates::TrieUpdates<'a>>,
|
||||
#[serde(default)]
|
||||
trie_updates: BTreeMap<
|
||||
BlockNumber,
|
||||
reth_trie_common::serde_bincode_compat::updates::TrieUpdatesSorted<'a>,
|
||||
>,
|
||||
#[serde(default)]
|
||||
hashed_state: BTreeMap<
|
||||
BlockNumber,
|
||||
reth_trie_common::serde_bincode_compat::hashed_state::HashedPostStateSorted<'a>,
|
||||
>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -530,7 +581,17 @@ pub(super) mod serde_bincode_compat {
|
||||
Self {
|
||||
blocks: RecoveredBlocks(Cow::Borrowed(&value.blocks)),
|
||||
execution_outcome: value.execution_outcome.as_repr(),
|
||||
trie_updates: value.trie_updates.as_ref().map(Into::into),
|
||||
_trie_updates_legacy: None,
|
||||
trie_updates: value
|
||||
.trie_updates
|
||||
.iter()
|
||||
.map(|(k, v)| (*k, v.as_ref().into()))
|
||||
.collect(),
|
||||
hashed_state: value
|
||||
.hashed_state
|
||||
.iter()
|
||||
.map(|(k, v)| (*k, v.as_ref().into()))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -545,7 +606,16 @@ pub(super) mod serde_bincode_compat {
|
||||
Self {
|
||||
blocks: value.blocks.0.into_owned(),
|
||||
execution_outcome: ExecutionOutcome::from_repr(value.execution_outcome),
|
||||
trie_updates: value.trie_updates.map(Into::into),
|
||||
trie_updates: value
|
||||
.trie_updates
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Arc::new(v.into())))
|
||||
.collect(),
|
||||
hashed_state: value
|
||||
.hashed_state
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Arc::new(v.into())))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -589,6 +659,8 @@ pub(super) mod serde_bincode_compat {
|
||||
|
||||
#[test]
|
||||
fn test_chain_bincode_roundtrip() {
|
||||
use alloc::collections::BTreeMap;
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct Data {
|
||||
@@ -603,7 +675,8 @@ pub(super) mod serde_bincode_compat {
|
||||
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
|
||||
.unwrap()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
),
|
||||
};
|
||||
|
||||
@@ -620,7 +693,7 @@ mod tests {
|
||||
use alloy_consensus::TxType;
|
||||
use alloy_primitives::{Address, B256};
|
||||
use reth_ethereum_primitives::Receipt;
|
||||
use revm::{primitives::HashMap, state::AccountInfo};
|
||||
use revm::{database::BundleState, primitives::HashMap, state::AccountInfo};
|
||||
|
||||
#[test]
|
||||
fn chain_append() {
|
||||
@@ -703,8 +776,12 @@ mod tests {
|
||||
let mut block_state_extended = execution_outcome1;
|
||||
block_state_extended.extend(execution_outcome2);
|
||||
|
||||
let chain: Chain =
|
||||
Chain::new(vec![block1.clone(), block2.clone()], block_state_extended, None);
|
||||
let chain: Chain = Chain::new(
|
||||
vec![block1.clone(), block2.clone()],
|
||||
block_state_extended,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
);
|
||||
|
||||
// return tip state
|
||||
assert_eq!(
|
||||
|
||||
@@ -396,7 +396,7 @@ impl ExecutionOutcome {
|
||||
/// Returns the ethereum receipt root for all recorded receipts.
|
||||
///
|
||||
/// Note: this function calculated Bloom filters for every receipt and created merkle trees
|
||||
/// of receipt. This is a expensive operation.
|
||||
/// of receipt. This is an expensive operation.
|
||||
pub fn ethereum_receipts_root(&self, block_number: BlockNumber) -> Option<B256> {
|
||||
self.generic_receipts_root_slow(
|
||||
block_number,
|
||||
|
||||
@@ -57,6 +57,7 @@ reth-evm-ethereum.workspace = true
|
||||
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
|
||||
reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
reth-testing-utils.workspace = true
|
||||
reth-trie-common = { workspace = true, features = ["serde-bincode-compat"] }
|
||||
|
||||
alloy-genesis.workspace = true
|
||||
|
||||
@@ -76,6 +77,7 @@ serde = [
|
||||
"rand/serde",
|
||||
"secp256k1/serde",
|
||||
"reth-primitives-traits/serde",
|
||||
"reth-trie-common/serde",
|
||||
"reth-prune-types/serde",
|
||||
"reth-config/serde",
|
||||
"reth-ethereum-primitives/serde",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::StreamBackfillJob;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
ops::RangeInclusive,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -148,7 +149,7 @@ where
|
||||
executor.into_state().take_bundle(),
|
||||
results,
|
||||
);
|
||||
let chain = Chain::new(blocks, outcome, None);
|
||||
let chain = Chain::new(blocks, outcome, BTreeMap::new(), BTreeMap::new());
|
||||
Ok(chain)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,11 +38,14 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
|
||||
/// or 17 minutes of 1-second blocks.
|
||||
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
|
||||
|
||||
/// The maximum number of blocks allowed in the WAL before emitting a warning.
|
||||
/// Default maximum number of blocks allowed in the WAL before emitting a warning.
|
||||
///
|
||||
/// This constant defines the threshold for the Write-Ahead Log (WAL) size. If the number of blocks
|
||||
/// in the WAL exceeds this limit, a warning is logged to indicate potential issues.
|
||||
pub const WAL_BLOCKS_WARNING: usize = 128;
|
||||
/// This constant defines the default threshold for the Write-Ahead Log (WAL) size. If the number
|
||||
/// of blocks in the WAL exceeds this limit, a warning is logged to indicate potential issues.
|
||||
///
|
||||
/// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with
|
||||
/// faster block times, this value should be increased proportionally to avoid excessive warnings.
|
||||
pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
|
||||
|
||||
/// The source of the notification.
|
||||
///
|
||||
@@ -247,6 +250,8 @@ pub struct ExExManager<P, N: NodePrimitives> {
|
||||
wal: Wal<N>,
|
||||
/// A stream of finalized headers.
|
||||
finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
|
||||
/// The threshold for the number of blocks in the WAL before emitting a warning.
|
||||
wal_blocks_warning: usize,
|
||||
|
||||
/// A handle to the `ExEx` manager.
|
||||
handle: ExExManagerHandle<N>,
|
||||
@@ -306,6 +311,7 @@ where
|
||||
|
||||
wal,
|
||||
finalized_header_stream,
|
||||
wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
|
||||
|
||||
handle: ExExManagerHandle {
|
||||
exex_tx: handle_tx,
|
||||
@@ -324,6 +330,16 @@ where
|
||||
self.handle.clone()
|
||||
}
|
||||
|
||||
/// Sets the threshold for the number of blocks in the WAL before emitting a warning.
|
||||
///
|
||||
/// For L2 chains with faster block times, this value should be increased proportionally
|
||||
/// to avoid excessive warnings. For example, a chain with 2-second block times might use
|
||||
/// a value 6x higher than the default.
|
||||
pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
|
||||
self.wal_blocks_warning = threshold;
|
||||
self
|
||||
}
|
||||
|
||||
/// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
|
||||
/// readiness to receive notifications.
|
||||
fn update_capacity(&self) {
|
||||
@@ -390,10 +406,11 @@ where
|
||||
.unwrap();
|
||||
|
||||
self.wal.finalize(lowest_finished_height)?;
|
||||
if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
|
||||
if self.wal.num_blocks() > self.wal_blocks_warning {
|
||||
warn!(
|
||||
target: "exex::manager",
|
||||
blocks = ?self.wal.num_blocks(),
|
||||
threshold = self.wal_blocks_warning,
|
||||
"WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
|
||||
);
|
||||
}
|
||||
@@ -670,6 +687,7 @@ mod tests {
|
||||
BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
|
||||
};
|
||||
use reth_testing_utils::generators::{self, random_block, BlockParams};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
|
||||
let (tx, rx) = watch::channel(None);
|
||||
@@ -771,7 +789,12 @@ mod tests {
|
||||
block1.set_block_number(10);
|
||||
|
||||
let notification1 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block1.clone()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
// Push the first notification
|
||||
@@ -789,7 +812,12 @@ mod tests {
|
||||
block2.set_block_number(20);
|
||||
|
||||
let notification2 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block2.clone()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
exex_manager.push_notification(notification2.clone());
|
||||
@@ -832,7 +860,12 @@ mod tests {
|
||||
block1.set_block_number(10);
|
||||
|
||||
let notification1 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block1.clone()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
exex_manager.push_notification(notification1.clone());
|
||||
@@ -1060,6 +1093,7 @@ mod tests {
|
||||
vec![Default::default()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -1127,7 +1161,8 @@ mod tests {
|
||||
// Setup a notification
|
||||
let notification = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block1.clone(), block2.clone()],
|
||||
vec![Default::default()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
@@ -1174,7 +1209,12 @@ mod tests {
|
||||
block1.set_block_number(10);
|
||||
|
||||
let notification = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block1.clone()],
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
@@ -1320,10 +1360,20 @@ mod tests {
|
||||
);
|
||||
|
||||
let genesis_notification = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![genesis_block.clone()],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
let notification = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block.clone()],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
let (finalized_headers_tx, rx) = watch::channel(None);
|
||||
|
||||
@@ -460,6 +460,7 @@ mod tests {
|
||||
Chain, DBProvider, DatabaseProviderFactory,
|
||||
};
|
||||
use reth_testing_utils::generators::{self, random_block, BlockParams};
|
||||
use std::collections::BTreeMap;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -499,7 +500,8 @@ mod tests {
|
||||
)
|
||||
.try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -567,7 +569,8 @@ mod tests {
|
||||
.seal_slow()
|
||||
.try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -634,7 +637,8 @@ mod tests {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![exex_head_block.clone().try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
wal.commit(&exex_head_notification)?;
|
||||
@@ -648,7 +652,8 @@ mod tests {
|
||||
)
|
||||
.try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -705,7 +710,8 @@ mod tests {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![exex_head_block.clone().try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
wal.commit(&exex_head_notification)?;
|
||||
@@ -724,7 +730,8 @@ mod tests {
|
||||
)
|
||||
.try_recover()?],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
|
||||
@@ -243,7 +243,7 @@ mod tests {
|
||||
use reth_testing_utils::generators::{
|
||||
self, random_block, random_block_range, BlockParams, BlockRangeParams,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
|
||||
wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
|
||||
@@ -303,25 +303,38 @@ mod tests {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![blocks[0].clone(), blocks[1].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
let reverted_notification = ExExNotification::ChainReverted {
|
||||
old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
|
||||
old: Arc::new(Chain::new(
|
||||
vec![blocks[1].clone()],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
let committed_notification_2 = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block_1_reorged.clone(), blocks[2].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
let reorged_notification = ExExNotification::ChainReorged {
|
||||
old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
|
||||
old: Arc::new(Chain::new(
|
||||
vec![blocks[2].clone()],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block_2_reorged.clone(), blocks[3].clone()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
|
||||
@@ -178,12 +178,22 @@ where
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Storage;
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::{
|
||||
map::{HashMap, HashSet},
|
||||
B256, U256,
|
||||
};
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_provider::Chain;
|
||||
use reth_testing_utils::generators::{self, random_block};
|
||||
use std::{fs::File, sync::Arc};
|
||||
use reth_trie_common::{
|
||||
updates::{StorageTrieUpdates, TrieUpdates},
|
||||
BranchNodeCompact, HashedPostState, HashedStorage, Nibbles,
|
||||
};
|
||||
use std::{collections::BTreeMap, fs::File, sync::Arc};
|
||||
|
||||
// wal with 1 block and tx
|
||||
// wal with 1 block and tx (old 3-field format)
|
||||
// <https://github.com/paradigmxyz/reth/issues/15012>
|
||||
#[test]
|
||||
fn decode_notification_wal() {
|
||||
@@ -202,6 +212,24 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// wal with 1 block and tx (new 4-field format with trie updates and hashed state)
|
||||
#[test]
|
||||
fn decode_notification_wal_new_format() {
|
||||
let wal = include_bytes!("../../test-data/new_format.wal");
|
||||
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
|
||||
'_,
|
||||
reth_ethereum_primitives::EthPrimitives,
|
||||
> = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
|
||||
let notification: ExExNotification = notification.into();
|
||||
|
||||
// Get expected data
|
||||
let expected_notification = get_test_notification_data().unwrap();
|
||||
assert_eq!(
|
||||
¬ification, &expected_notification,
|
||||
"Decoded notification should match expected static data"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_roundtrip() -> eyre::Result<()> {
|
||||
let mut rng = generators::rng();
|
||||
@@ -213,8 +241,18 @@ mod tests {
|
||||
let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
|
||||
|
||||
let notification = ExExNotification::ChainReorged {
|
||||
new: Arc::new(Chain::new(vec![new_block], Default::default(), None)),
|
||||
old: Arc::new(Chain::new(vec![old_block], Default::default(), None)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![new_block],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
old: Arc::new(Chain::new(
|
||||
vec![old_block],
|
||||
Default::default(),
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
};
|
||||
|
||||
// Do a round trip serialization and deserialization
|
||||
@@ -229,6 +267,97 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate a new WAL file for testing.
|
||||
///
|
||||
/// Run this test with `--ignored` to generate a new test WAL file:
|
||||
/// ```sh
|
||||
/// cargo test -p reth-exex generate_test_wal -- --ignored --nocapture
|
||||
/// ```
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn generate_test_wal() -> eyre::Result<()> {
|
||||
use std::io::Write;
|
||||
|
||||
let notification = get_test_notification_data()?;
|
||||
|
||||
// Serialize the notification
|
||||
let notification_compat =
|
||||
reth_exex_types::serde_bincode_compat::ExExNotification::from(¬ification);
|
||||
let encoded = rmp_serde::encode::to_vec(¬ification_compat)?;
|
||||
|
||||
// Write to test-data directory
|
||||
let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
|
||||
std::fs::create_dir_all(&test_data_dir)?;
|
||||
|
||||
let output_path = test_data_dir.join("new_format.wal");
|
||||
let mut file = File::create(&output_path)?;
|
||||
file.write_all(&encoded)?;
|
||||
|
||||
println!("Generated WAL file at: {}", output_path.display());
|
||||
println!("File size: {} bytes", encoded.len());
|
||||
println!("✓ WAL file created successfully!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function to generate deterministic test data for WAL tests
|
||||
fn get_test_notification_data(
|
||||
) -> eyre::Result<ExExNotification<reth_ethereum_primitives::EthPrimitives>> {
|
||||
use reth_ethereum_primitives::Block;
|
||||
use reth_primitives_traits::Block as _;
|
||||
|
||||
// Create a block with a transaction
|
||||
let block = Block::default().seal_slow().try_recover()?;
|
||||
let block_number = block.header().number();
|
||||
|
||||
let hashed_address = B256::from([1; 32]);
|
||||
let storage_key = B256::from([2; 32]);
|
||||
|
||||
let trie_updates = TrieUpdates {
|
||||
account_nodes: HashMap::from_iter([
|
||||
(Nibbles::from_nibbles_unchecked([0x01]), BranchNodeCompact::default()),
|
||||
(Nibbles::from_nibbles_unchecked([0x02]), BranchNodeCompact::default()),
|
||||
]),
|
||||
removed_nodes: HashSet::from_iter([Nibbles::from_nibbles_unchecked([0x03])]),
|
||||
storage_tries: HashMap::from_iter([(
|
||||
hashed_address,
|
||||
StorageTrieUpdates {
|
||||
is_deleted: false,
|
||||
storage_nodes: HashMap::from_iter([(
|
||||
Nibbles::from_nibbles_unchecked([0x04]),
|
||||
BranchNodeCompact::default(),
|
||||
)]),
|
||||
removed_nodes: Default::default(),
|
||||
},
|
||||
)]),
|
||||
};
|
||||
|
||||
let hashed_state = HashedPostState {
|
||||
accounts: HashMap::from_iter([(
|
||||
hashed_address,
|
||||
Some(Account { nonce: 1, ..Default::default() }),
|
||||
)]),
|
||||
storages: HashMap::from_iter([(
|
||||
hashed_address,
|
||||
HashedStorage {
|
||||
wiped: false,
|
||||
storage: HashMap::from_iter([(storage_key, U256::from(101))]),
|
||||
},
|
||||
)]),
|
||||
};
|
||||
|
||||
let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
|
||||
ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(
|
||||
vec![block],
|
||||
Default::default(),
|
||||
BTreeMap::from([(block_number, Arc::new(trie_updates.into_sorted()))]),
|
||||
BTreeMap::from([(block_number, Arc::new(hashed_state.into_sorted()))]),
|
||||
)),
|
||||
};
|
||||
Ok(notification)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_files_range() -> eyre::Result<()> {
|
||||
let temp_dir = tempfile::tempdir()?;
|
||||
|
||||
BIN
crates/exex/exex/test-data/new_format.wal
Normal file
BIN
crates/exex/exex/test-data/new_format.wal
Normal file
Binary file not shown.
@@ -201,7 +201,7 @@ pub(super) mod serde_bincode_compat {
|
||||
use reth_primitives_traits::RecoveredBlock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
#[test]
|
||||
fn test_exex_notification_bincode_roundtrip() {
|
||||
@@ -222,13 +222,15 @@ pub(super) mod serde_bincode_compat {
|
||||
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
|
||||
.unwrap()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
new: Arc::new(Chain::new(
|
||||
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
|
||||
.unwrap()],
|
||||
Default::default(),
|
||||
None,
|
||||
BTreeMap::new(),
|
||||
BTreeMap::new(),
|
||||
)),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -70,10 +70,11 @@ impl<T> Clone for UnboundedMeteredSender<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
|
||||
/// A wrapper type around [`UnboundedReceiver`](mpsc::UnboundedReceiver) that updates metrics on
|
||||
/// receive.
|
||||
#[derive(Debug)]
|
||||
pub struct UnboundedMeteredReceiver<T> {
|
||||
/// The [Receiver](mpsc::UnboundedReceiver) that this wraps around
|
||||
/// The [`UnboundedReceiver`](mpsc::UnboundedReceiver) that this wraps around
|
||||
receiver: mpsc::UnboundedReceiver<T>,
|
||||
/// Holds metrics for this type
|
||||
metrics: MeteredReceiverMetrics,
|
||||
|
||||
@@ -459,7 +459,7 @@ pub struct Discv4Service {
|
||||
ingress: IngressReceiver,
|
||||
/// Sender for sending outgoing messages
|
||||
///
|
||||
/// Sends outgoind messages to the UDP task.
|
||||
/// Sends outgoing messages to the UDP task.
|
||||
egress: EgressSender,
|
||||
/// Buffered pending pings to apply backpressure.
|
||||
///
|
||||
@@ -479,7 +479,7 @@ pub struct Discv4Service {
|
||||
pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
|
||||
/// Currently active ENR requests
|
||||
pending_enr_requests: HashMap<PeerId, EnrRequestState>,
|
||||
/// Copy of he sender half of the commands channel for [Discv4]
|
||||
/// Copy of the sender half of the commands channel for [Discv4]
|
||||
to_service: mpsc::UnboundedSender<Discv4Command>,
|
||||
/// Receiver half of the commands channel for [Discv4]
|
||||
commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
|
||||
|
||||
@@ -5,7 +5,7 @@ use alloy_primitives::BlockNumber;
|
||||
use futures::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use reth_config::BodiesConfig;
|
||||
use reth_consensus::{Consensus, ConsensusError};
|
||||
use reth_consensus::Consensus;
|
||||
use reth_network_p2p::{
|
||||
bodies::{
|
||||
client::BodiesClient,
|
||||
@@ -41,7 +41,7 @@ pub struct BodiesDownloader<
|
||||
/// The bodies client
|
||||
client: Arc<C>,
|
||||
/// The consensus client
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
/// The database handle
|
||||
provider: Provider,
|
||||
/// The maximum number of non-empty blocks per one request
|
||||
@@ -307,12 +307,14 @@ where
|
||||
{
|
||||
type Block = B;
|
||||
|
||||
/// Set a new download range (exclusive).
|
||||
/// Set a new download range (inclusive).
|
||||
///
|
||||
/// This method will drain all queued bodies, filter out ones outside the range and put them
|
||||
/// back into the buffer.
|
||||
/// If there are any bodies between the range start and last queued body that have not been
|
||||
/// downloaded or are not in progress, they will be re-requested.
|
||||
/// If the provided range is a suffix of the current range with the same end block, the
|
||||
/// existing download already covers it and the call is a no-op.
|
||||
/// If the range starts immediately after the current range, it is treated as the next
|
||||
/// consecutive range and appended without resetting the in-flight state.
|
||||
/// For all other ranges, the downloader state is cleared and the new range replaces the old
|
||||
/// one.
|
||||
fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
|
||||
// Check if the range is valid.
|
||||
if range.is_empty() {
|
||||
@@ -577,7 +579,7 @@ impl BodiesDownloaderBuilder {
|
||||
pub fn build<B, C, Provider>(
|
||||
self,
|
||||
client: C,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
provider: Provider,
|
||||
) -> BodiesDownloader<B, C, Provider>
|
||||
where
|
||||
|
||||
@@ -4,7 +4,7 @@ use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::BlockNumber;
|
||||
use futures::{stream::FuturesUnordered, Stream};
|
||||
use futures_util::StreamExt;
|
||||
use reth_consensus::{Consensus, ConsensusError};
|
||||
use reth_consensus::Consensus;
|
||||
use reth_network_p2p::{
|
||||
bodies::{client::BodiesClient, response::BlockResponse},
|
||||
error::DownloadResult,
|
||||
@@ -58,7 +58,7 @@ where
|
||||
pub(crate) fn push_new_request(
|
||||
&mut self,
|
||||
client: Arc<C>,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
request: Vec<SealedHeader<B::Header>>,
|
||||
) {
|
||||
// Set last max requested block number
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::B256;
|
||||
use futures::{Future, FutureExt};
|
||||
use reth_consensus::{Consensus, ConsensusError};
|
||||
use reth_consensus::Consensus;
|
||||
use reth_network_p2p::{
|
||||
bodies::{client::BodiesClient, response::BlockResponse},
|
||||
error::{DownloadError, DownloadResult},
|
||||
@@ -38,7 +38,7 @@ use std::{
|
||||
/// and eventually disconnected.
|
||||
pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
|
||||
client: Arc<C>,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
metrics: BodyDownloaderMetrics,
|
||||
/// Metrics for individual responses. This can be used to observe how the size (in bytes) of
|
||||
/// responses change while bodies are being downloaded.
|
||||
@@ -60,7 +60,7 @@ where
|
||||
/// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
|
||||
pub(crate) fn new(
|
||||
client: Arc<C>,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
metrics: BodyDownloaderMetrics,
|
||||
) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -42,7 +42,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use reth_consensus::{Consensus, ConsensusError};
|
||||
/// use reth_consensus::Consensus;
|
||||
/// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader};
|
||||
/// use reth_network_p2p::bodies::client::BodiesClient;
|
||||
/// use reth_primitives_traits::{Block, InMemorySize};
|
||||
@@ -55,7 +55,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
|
||||
/// Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
|
||||
/// >(
|
||||
/// client: Arc<C>,
|
||||
/// consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
/// consensus: Arc<dyn Consensus<B>>,
|
||||
/// provider: Provider,
|
||||
/// ) {
|
||||
/// let downloader =
|
||||
|
||||
@@ -86,7 +86,7 @@ impl<B: FullBlock> FileClient<B> {
|
||||
/// Create a new file client from a file path.
|
||||
pub async fn new<P: AsRef<Path>>(
|
||||
path: P,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
) -> Result<Self, FileClientError> {
|
||||
let file = File::open(path).await?;
|
||||
Self::from_file(file, consensus).await
|
||||
@@ -95,7 +95,7 @@ impl<B: FullBlock> FileClient<B> {
|
||||
/// Initialize the [`FileClient`] with a file directly.
|
||||
pub(crate) async fn from_file(
|
||||
mut file: File,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
) -> Result<Self, FileClientError> {
|
||||
// get file len from metadata before reading
|
||||
let metadata = file.metadata().await?;
|
||||
@@ -200,7 +200,7 @@ impl<B: FullBlock> FileClient<B> {
|
||||
}
|
||||
|
||||
struct FileClientBuilder<B: Block> {
|
||||
pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
pub consensus: Arc<dyn Consensus<B>>,
|
||||
pub parent_header: Option<SealedHeader<B::Header>>,
|
||||
}
|
||||
|
||||
@@ -562,7 +562,7 @@ impl ChunkedFileReader {
|
||||
/// are available before processing. For plain files, it uses the original chunking logic.
|
||||
pub async fn next_chunk<B: FullBlock>(
|
||||
&mut self,
|
||||
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
|
||||
consensus: Arc<dyn Consensus<B>>,
|
||||
parent_header: Option<SealedHeader<B::Header>>,
|
||||
) -> Result<Option<FileClient<B>>, FileClientError> {
|
||||
let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
|
||||
@@ -726,7 +726,7 @@ mod tests {
|
||||
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p0, p1, p2]));
|
||||
assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
|
||||
assert!(downloader.next().await.is_none());
|
||||
assert!(downloader.next().await.is_none());
|
||||
}
|
||||
|
||||
@@ -1464,7 +1464,7 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p0, p1, p2,]));
|
||||
assert_eq!(headers.unwrap(), vec![p0, p1, p2,]);
|
||||
assert!(downloader.buffered_responses.is_empty());
|
||||
assert!(downloader.next().await.is_none());
|
||||
assert!(downloader.next().await.is_none());
|
||||
@@ -1496,18 +1496,18 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p0]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p0]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p1]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p1]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p2]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p2]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
assert!(downloader.next().await.is_none());
|
||||
@@ -1539,18 +1539,18 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p0]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p0]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p1]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p1]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p2]));
|
||||
let headers = headers.unwrap();
|
||||
assert_eq!(headers, vec![p2]);
|
||||
assert_eq!(headers.capacity(), headers.len());
|
||||
|
||||
assert!(downloader.next().await.is_none());
|
||||
|
||||
@@ -223,11 +223,11 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p0]));
|
||||
assert_eq!(headers.unwrap(), vec![p0]);
|
||||
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p1]));
|
||||
assert_eq!(headers.unwrap(), vec![p1]);
|
||||
let headers = downloader.next().await.unwrap();
|
||||
assert_eq!(headers, Ok(vec![p2]));
|
||||
assert_eq!(headers.unwrap(), vec![p2]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,6 @@ secp256k1 = { workspace = true, features = ["global-context", "std", "recovery",
|
||||
rand_08.workspace = true
|
||||
concat-kdf.workspace = true
|
||||
sha2.workspace = true
|
||||
sha3.workspace = true
|
||||
aes.workspace = true
|
||||
hmac.workspace = true
|
||||
block-padding.workspace = true
|
||||
|
||||
@@ -9,12 +9,12 @@ use crate::{
|
||||
use aes::{cipher::StreamCipher, Aes128, Aes256};
|
||||
use alloy_primitives::{
|
||||
bytes::{BufMut, Bytes, BytesMut},
|
||||
B128, B256, B512 as PeerId,
|
||||
Keccak256, B128, B256, B512 as PeerId,
|
||||
};
|
||||
use alloy_rlp::{Encodable, Rlp, RlpEncodable, RlpMaxEncodedLen};
|
||||
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
|
||||
use ctr::Ctr64BE;
|
||||
use digest::{crypto_common::KeyIvInit, Digest};
|
||||
use digest::crypto_common::KeyIvInit;
|
||||
use rand_08::{thread_rng as rng, Rng};
|
||||
use reth_network_peers::{id2pk, pk2id};
|
||||
use secp256k1::{
|
||||
@@ -22,7 +22,6 @@ use secp256k1::{
|
||||
PublicKey, SecretKey, SECP256K1,
|
||||
};
|
||||
use sha2::Sha256;
|
||||
use sha3::Keccak256;
|
||||
|
||||
const PROTOCOL_VERSION: usize = 4;
|
||||
|
||||
|
||||
@@ -10,11 +10,10 @@
|
||||
//! For more information, refer to the [Ethereum MAC specification](https://github.com/ethereum/devp2p/blob/master/rlpx.md#mac).
|
||||
|
||||
use aes::Aes256Enc;
|
||||
use alloy_primitives::{B128, B256};
|
||||
use alloy_primitives::{Keccak256, B128, B256};
|
||||
use block_padding::NoPadding;
|
||||
use cipher::BlockEncrypt;
|
||||
use digest::KeyInit;
|
||||
use sha3::{Digest, Keccak256};
|
||||
|
||||
/// [`Ethereum MAC`](https://github.com/ethereum/devp2p/blob/master/rlpx.md#mac) state.
|
||||
///
|
||||
@@ -57,7 +56,7 @@ impl MAC {
|
||||
self.hasher.update(data);
|
||||
let prev = self.digest();
|
||||
let aes = Aes256Enc::new_from_slice(self.secret.as_ref()).unwrap();
|
||||
let mut encrypted = self.digest().0;
|
||||
let mut encrypted = prev.0;
|
||||
|
||||
aes.encrypt_padded::<NoPadding>(&mut encrypted, B128::len_bytes()).unwrap();
|
||||
for i in 0..16 {
|
||||
|
||||
@@ -757,12 +757,12 @@ impl RequestTxHashes {
|
||||
Self::new(HashSet::with_capacity_and_hasher(capacity, Default::default()))
|
||||
}
|
||||
|
||||
/// Returns an new empty instance.
|
||||
/// Returns a new empty instance.
|
||||
fn empty() -> Self {
|
||||
Self::new(HashSet::default())
|
||||
}
|
||||
|
||||
/// Retains the given number of elements, returning and iterator over the rest.
|
||||
/// Retains the given number of elements, returning an iterator over the rest.
|
||||
pub fn retain_count(&mut self, count: usize) -> Self {
|
||||
let rest_capacity = self.hashes.len().saturating_sub(count);
|
||||
if rest_capacity == 0 {
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
use crate::{
|
||||
eth_requests::EthRequestHandler,
|
||||
transactions::{
|
||||
config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
|
||||
config::{
|
||||
AnnouncementFilteringPolicy, StrictEthAnnouncementFilter, TransactionPropagationKind,
|
||||
},
|
||||
policy::NetworkPolicies,
|
||||
TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig,
|
||||
},
|
||||
@@ -84,17 +86,41 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
|
||||
}
|
||||
|
||||
/// Creates a new [`TransactionsManager`] and wires it to the network.
|
||||
pub fn transactions_with_policy<Pool: TransactionPool>(
|
||||
///
|
||||
/// Uses the default [`StrictEthAnnouncementFilter`] for announcement filtering.
|
||||
pub fn transactions_with_policy<Pool: TransactionPool, P: TransactionPropagationPolicy<N>>(
|
||||
self,
|
||||
pool: Pool,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
propagation_policy: impl TransactionPropagationPolicy<N>,
|
||||
propagation_policy: P,
|
||||
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
|
||||
self.transactions_with_policies(
|
||||
pool,
|
||||
transactions_manager_config,
|
||||
propagation_policy,
|
||||
StrictEthAnnouncementFilter::default(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a new [`TransactionsManager`] with custom propagation and announcement policies.
|
||||
///
|
||||
/// This allows chains with custom transaction types (like CATX) to configure
|
||||
/// the announcement filter to accept their transaction types.
|
||||
pub fn transactions_with_policies<
|
||||
Pool: TransactionPool,
|
||||
P: TransactionPropagationPolicy<N>,
|
||||
A: AnnouncementFilteringPolicy<N>,
|
||||
>(
|
||||
self,
|
||||
pool: Pool,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
propagation_policy: P,
|
||||
announcement_policy: A,
|
||||
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
|
||||
let Self { mut network, request_handler, .. } = self;
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
network.set_transactions(tx);
|
||||
let handle = network.handle().clone();
|
||||
let announcement_policy = StrictEthAnnouncementFilter::default();
|
||||
let policies = NetworkPolicies::new(propagation_policy, announcement_policy);
|
||||
|
||||
let transactions = TransactionsManager::with_policy(
|
||||
|
||||
@@ -1245,7 +1245,7 @@ pub enum PeerAction {
|
||||
PeerRemoved(PeerId),
|
||||
}
|
||||
|
||||
/// Error thrown when a incoming connection is rejected right away
|
||||
/// Error thrown when an incoming connection is rejected right away
|
||||
#[derive(Debug, Error, PartialEq, Eq)]
|
||||
pub enum InboundConnectionError {
|
||||
/// The remote's ip address is banned
|
||||
|
||||
@@ -637,7 +637,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
//
|
||||
// known txns have already been successfully fetched or received over gossip.
|
||||
//
|
||||
// most hashes will be filtered out here since this the mempool protocol is a gossip
|
||||
// most hashes will be filtered out here since the mempool protocol is a gossip
|
||||
// protocol, healthy peers will send many of the same hashes.
|
||||
//
|
||||
let hashes_count_pre_pool_filter = partially_valid_msg.len();
|
||||
@@ -2000,7 +2000,7 @@ impl<N: NetworkPrimitives> PeerMetadata<N> {
|
||||
&self.request_tx
|
||||
}
|
||||
|
||||
/// Return a
|
||||
/// Returns a mutable reference to the seen transactions LRU cache.
|
||||
pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
|
||||
&mut self.seen_transactions
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user