Compare commits

..

85 Commits

Author SHA1 Message Date
Arsenii Kulikov
1549e93eac fix 2026-02-01 19:00:59 +04:00
Arsenii Kulikov
48e1270dec wip 2026-02-01 18:19:52 +04:00
Georgios Konstantopoulos
821974a6a6 fix(trie): use full_key in proof_required_fn when path is prefix
When calling proof_required_fn in update_leaves, use the original full_key
instead of the padded path when path is a prefix of full_path. This ensures
proof targets are correctly associated with the original key for cache lookups.

The min_len is always derived from path.len() regardless of which key is used.

Introduces proof_target_for_path helper to DRY up the logic across all three
call sites (removal, update/insert, and touched).
2026-01-31 11:53:26 +00:00
Arsenii Kulikov
6183adb2db Merge branch 'klkvr/sparse-trie-cache' into klkvr/sparse-trie-cache-preserve 2026-01-30 23:40:00 +04:00
Arsenii Kulikov
1b0c54a0a9 wip 2026-01-30 22:50:49 +04:00
Arsenii Kulikov
3aaa8daefc wip 2026-01-30 22:40:27 +04:00
Arsenii Kulikov
bb97b80116 wip 2026-01-30 22:25:33 +04:00
Arsenii Kulikov
81d1aa1eb4 Merge branch 'mattsse/sparse-trie-preservation' into klkvr/sparse-trie-cache-preserve 2026-01-30 21:55:32 +04:00
Arsenii Kulikov
d6d1a090e5 fix 2026-01-30 21:51:55 +04:00
Arsenii Kulikov
b52700dd95 Merge branch 'main' into klkvr/sparse-trie-cache 2026-01-30 21:50:32 +04:00
Matthias Seitz
65201a1abf fix docs 2026-01-30 18:49:43 +01:00
Matthias Seitz
b7cb06b88f update take updates 2026-01-30 18:42:11 +01:00
Arsenii Kulikov
5b86a17331 fix 2026-01-30 21:25:30 +04:00
Arsenii Kulikov
914ab7d5e6 fix 2026-01-30 21:22:53 +04:00
Matthias Seitz
7e38827df5 docs: fix broken intra-doc link in StorageSettings
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:18:59 +01:00
Arsenii Kulikov
dd2c5b7c53 wip 2026-01-30 21:18:53 +04:00
Matthias Seitz
ed528fb975 fix(trie): update prune test expectations for hash stub counting
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:12:29 +01:00
Arsenii Kulikov
25a97f0be4 wip 2026-01-30 20:59:13 +04:00
Arsenii Kulikov
844e531a3f wip 2026-01-30 20:55:49 +04:00
Matthias Seitz
ef860c20fb refactor: PreservedSparseTrie as enum, anchor by state root
- Make PreservedSparseTrie an enum with Anchored and Cleared variants
- Anchor trie by computed state_root instead of block_hash
- Add parent_state_root to ExecutionEnv for continuation checks
- Feature-gate prune functions with cfg(feature = "std")
- Add reveal-on-first-try hitrate tracking to update_leaves
- Add prune stats (nodes_converted, hot_subtries_skipped)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 17:46:13 +01:00
Arsenii Kulikov
755d1e5f61 wip 2026-01-30 20:45:34 +04:00
Arsenii Kulikov
7d1bd7c9f8 wip 2026-01-30 20:41:19 +04:00
Arsenii Kulikov
93c2455cef wip 2026-01-30 20:39:51 +04:00
Arsenii Kulikov
6c661eb868 instrument 2026-01-30 20:36:25 +04:00
Matthias Seitz
c8b84404ae fix: typo and update revealed_node_count to size_hint in test
Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:41:50 +01:00
Arsenii Kulikov
b722a7f360 wip 2026-01-30 19:21:31 +04:00
Matthias Seitz
f0eb2aad7c refactor(trie): rename Heat structs to Modified and cleanup
- Rename HotSubtries to ModifiedSubtries
- Rename SubtrieHeat to SubtrieModifications
- Rename TrieHeatState to TrieModificationState
- Rename StorageTrieHeat to StorageTrieModifications
- Merge run/run_inner into single run method in sparse_trie.rs
- Feature-gate rayon::join with cfg(feature = "std")

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:14:41 +01:00
Arsenii Kulikov
ea6da11cf4 wip 2026-01-30 18:55:13 +04:00
Arsenii Kulikov
20029d3c1a wip 2026-01-30 18:52:21 +04:00
Arsenii Kulikov
40eb3bb7b4 wip 2026-01-30 18:42:24 +04:00
Matthias Seitz
8875c8da25 Merge branch 'main' into mattsse/sparse-trie-preservation 2026-01-30 15:35:11 +01:00
Arsenii Kulikov
d87b48c9fc wip 2026-01-30 18:31:52 +04:00
Arsenii Kulikov
95778a0cc1 wip 2026-01-30 18:26:01 +04:00
Arsenii Kulikov
17359dadd9 wip 2026-01-30 18:19:23 +04:00
Arsenii Kulikov
bab01a7bba wip 2026-01-30 17:27:51 +04:00
Arsenii Kulikov
82960045c9 wip 2026-01-30 17:25:15 +04:00
Brian Picciano
513fca16e9 always clear Cleared variant 2026-01-30 14:25:03 +01:00
Brian Picciano
ad242a2002 Restore conditional spawn of SparseTrieCacheTask 2026-01-30 14:21:15 +01:00
Arsenii Kulikov
840d6066a6 wip 2026-01-30 16:15:19 +04:00
Brian Picciano
1f315b4f3d Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-30 13:11:36 +01:00
Arsenii Kulikov
fa604aa3dc wip 2026-01-30 16:02:28 +04:00
Arsenii Kulikov
4b851acfdb wip 2026-01-30 15:53:15 +04:00
Arsenii Kulikov
785933a8ef wip 2026-01-30 15:38:08 +04:00
Arsenii Kulikov
eb5455de68 wip 2026-01-30 14:38:58 +04:00
Arsenii Kulikov
cb9bf1fe3a wip 2026-01-30 14:34:38 +04:00
Matthias Seitz
99973c7781 chore: add debug println statements for size hints and prune stats
Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:44:40 +01:00
Matthias Seitz
9d89d4cb5e feat(trie): add heat tracking to ParallelSparseTrie pruning
- Add SubtrieHeat to track modification frequency of lower subtries
- Hot subtries (heat > 0) are skipped during pruning to avoid expensive re-reveals
- Heat increments by 2 on modification, decays by 1 each prune cycle
- Add subtrie_for_path_mut_untracked for internal operations that shouldn't affect heat
- Update tests to account for heat decay cycles

Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:32:25 +01:00
Matthias Seitz
f07b37f029 fix: restore shrink_to logic to distribute capacity among all tries
Distribute capacity equally among account trie and all storage tries
(both active and cleared) for proper memory management.

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:20:23 +01:00
Matthias Seitz
10b87c8a2e feat(trie): add heat-based tracking with prune backlog for storage tries
- Replace generation-based recency tracking with heat-based tracking
- Track heat (incremented on access) and prune_backlog (cycles since last prune)
- Prune every other cycle using prune_backlog threshold
- Add StorageTriesPruneStats for debug logging of prune operations
- Use debug! logging instead of println for timing information

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 03:05:36 +01:00
Matthias Seitz
095d021969 fix: track heat on storage reveal instead of leaf update
Heat should be tracked when storage proofs are revealed (actual access),
not on leaf updates. This is more accurate because:
- Reveals indicate the storage was actually needed for this block
- Multiple updates to the same storage in one block count as one access
- The modified_this_cycle deduplication handles repeated reveals

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:09:28 +01:00
Matthias Seitz
0c192ef514 refactor: move StorageTrieHeat after StorageTries impl blocks
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:01:47 +01:00
Matthias Seitz
946c74a538 feat: replace generation-based recency with heat-based tracking for storage tries
- Add StorageTrieHeat struct with decay mechanism similar to SubtrieHeat
- Heat incremented by 2 on actual leaf updates (not insert/remove operations)
- Heat decays by 1 each prune cycle for unmodified tries
- Score = size * heat_multiplier for smarter eviction decisions
- Only prune hot tries (already pruned cold ones in previous cycles)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:00:18 +01:00
Matthias Seitz
2455fa4ff5 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:17:10 +01:00
Matthias Seitz
0585dc1180 chore: add detailed timing debug for StorageTries::prune
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:08:32 +01:00
Matthias Seitz
8b985c102a chore: add timing debug for prune branches
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:07:19 +01:00
Matthias Seitz
6fefbfc65f chore: remove unused requested_proof_targets field, add timing debug
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:01:35 +01:00
Matthias Seitz
2012602909 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:44:21 +01:00
Matthias Seitz
c84b3475a4 chore: simplify take_updates and remove SparseTrieUpdates::with_capacity
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:40:58 +01:00
Arsenii Kulikov
f3107565f3 clippy 2026-01-30 03:25:42 +04:00
Arsenii Kulikov
ac6a0fa372 wip 2026-01-30 03:14:57 +04:00
Arsenii Kulikov
5899cd4188 fix 2026-01-30 03:14:42 +04:00
Arsenii Kulikov
a3b76591b7 wip 2026-01-30 03:08:49 +04:00
Matthias Seitz
ab84d61f91 chore: simplify take_updates implementation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:59:44 +01:00
Matthias Seitz
8c1724af13 chore: remove parallel iteration from shrink_to
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:58:05 +01:00
Georgios Konstantopoulos
3f6354c2a4 chore: fmt
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:52:38 +00:00
Matthias Seitz
ba64eb5fc7 merge: origin/main into mattsse/sparse-trie-preservation
Merged upstream changes with the sparse trie preservation feature:
- Resolved conflicts in TreeConfig, payload processor, and sparse trie modules
- Kept simplified generation-based recency tracking for storage tries
- Preserved score-based pruning strategy (size * recency)
- Retained size_hint() O(1) optimization for SparseTrieExt

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:50:50 +01:00
Arsenii Kulikov
1b2935763b wip 2026-01-30 01:16:06 +04:00
Matthias Seitz
cf3eef1874 chore: remove debug timing prints
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:49 +01:00
Matthias Seitz
cc3f3d7062 perf(trie): simplify sparse trie pruning and shrinking
- Add size_hint() to SparseTrieExt trait for O(1) size estimation
- Simplify storage trie eviction: keep top N by score (size * recency_multiplier)
- Only prune recently modified tries (age <= 2) above size threshold (1000 nodes)
- Fix shrink_to: don't shrink cleared tries, simpler capacity distribution
- Remove LRU dirty tracking complexity in favor of generation-based recency

Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:04 +01:00
Matthias Seitz
92ce4f3af0 wip 2026-01-29 20:54:57 +01:00
Georgios Konstantopoulos
28e5911482 Revert "fix(trie): address clippy lints for sparse trie pruning"
This reverts commit 257ccd41e9.
2026-01-29 18:35:03 +00:00
Georgios Konstantopoulos
257ccd41e9 fix(trie): address clippy lints for sparse trie pruning
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 18:32:18 +00:00
Matthias Seitz
a53ef64a54 perf(trie): optimize prune with LRU eviction and dirty tracking
- Add generation-based LRU tracking for storage tries
- Only prune dirty tries (modified since last prune)
- Evict non-dirty tries first when over limit, then LRU evict oldest
- Use O(n) select_nth_unstable for eviction instead of O(n log n) sort
- Parallelize shrink_to for storage tries
- Track trie modifications via insert_storage_trie

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0a25-8a84-7739-b5eb-ccf486a76629
2026-01-29 18:10:35 +01:00
Brian Picciano
74f9e386ed add histogram for reuse timing 2026-01-29 15:20:12 +01:00
Brian Picciano
45ff349b00 refactor: move sparse trie constants to reth-engine-primitives
Remove DEFAULT_SPARSE_TRIE_PRUNE_DEPTH and DEFAULT_MAX_PRESERVED_STORAGE_TRIES
from reth-trie-sparse and define them in reth-engine-primitives instead.
2026-01-29 13:47:46 +00:00
Brian Picciano
d057c4e9e6 refactor: also reuse DEFAULT_SPARSE_TRIE_PRUNE_DEPTH from reth-trie-sparse 2026-01-29 13:46:36 +00:00
Brian Picciano
405b40f02f refactor: address review comments
- Reuse DEFAULT_MAX_PRESERVED_STORAGE_TRIES from reth-trie-sparse instead of duplicating constant
- Apply take_updates with_capacity pattern to sparse/src/trie.rs to match sparse-parallel
2026-01-29 13:45:23 +00:00
Brian Picciano
868fffe0dd nitpick comment 2026-01-29 14:29:10 +01:00
Brian Picciano
b288c4e259 Fix PST::take_updates not replacing with Some if was previously enabled 2026-01-29 14:27:15 +01:00
Brian Picciano
4e26a6edc9 Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-29 12:48:51 +01:00
Matthias Seitz
26f788d5ca feat(engine): enable sparse trie structure reuse for continuation payloads
When a new payload's parent matches the preserved trie's block hash,
the pruned trie structure is now reused directly instead of being cleared.
This enables incremental trie updates without rebuilding from scratch.

For non-continuation payloads, the trie is still cleared but allocations
are preserved for memory efficiency.

Amp-Thread-ID: https://ampcode.com/threads/T-019c06cb-b73a-756c-963b-c1ef3db6a6bb
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:30 +01:00
Matthias Seitz
9433ac5666 fix(engine): prevent race condition in sparse trie preservation
Acquire guard before sending state root result to ensure the next block's
take() blocks until we've stored the trie for reuse.

Also always clear the trie (keeping allocations) because the pruned trie
cannot be directly reused - prune() deletes values needed for state root
computation and clears revealed_account_paths which breaks proof filtering.

Amp-Thread-ID: https://ampcode.com/threads/T-019c060d-2327-741f-b8d2-6d246e5d521e
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:26 +01:00
Matthias Seitz
0796a20f24 fix(engine): clear trie on failed state root computation
A failed computation may leave the trie in a partially updated state.
Only preserve the trie for reuse when computation succeeds.

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
2026-01-28 18:20:09 +01:00
Matthias Seitz
2633d3e513 refactor(engine): improve sparse trie task API and error handling
- Change run() to take &mut self instead of consuming self
- Add into_cleared_trie() for invalid/cancelled payloads (skips pruning)
- Clear trie when state root receiver is dropped
- Improve documentation with cross-references

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:55:16 +01:00
Matthias Seitz
1f8fb7e58f feat(engine): preserve sparse trie across payload validations
Implement sparse trie preservation to reduce memory allocations when
validating consecutive blocks (parent-child relationship).

Changes:
- Add PreservedSparseTrie to store trie with its computed block hash
- Add SharedPreservedSparseTrie newtype for cleaner handle passing
- Add prune_depth and max_storage_tries config to TreeConfig
- Add SparseStateTrie::clear() and shrink_to() methods
- Prune trie after sending state root result (non-blocking)
- Reuse pruned trie when new payload's parent matches stored block hash
- Clear trie (keeping allocations) when not a continuation

RETH-183

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:44:55 +01:00
235 changed files with 2450 additions and 6293 deletions

View File

@@ -1,20 +0,0 @@
# Changelogs configuration for reth
# https://github.com/wevm/changelogs
# How to bump packages that depend on changed packages
dependent_bump = "patch"
[changelog]
# Generate per-crate changelogs (vs single root changelog)
format = "per-crate"
# Fixed groups: all always share the same version
# reth binaries share version
[[fixed]]
members = ["reth", "op-reth"]
# Packages to ignore (internal/test-only crates)
ignore = [
"reth-testing-utils",
"reth-bench",
]

View File

@@ -1,5 +0,0 @@
---
reth-engine-tree: patch
---
Reordered cache size calculations in `ExecutionCache::new` to group related operations together.

View File

@@ -1,6 +0,0 @@
---
reth: patch
op-reth: patch
---
Added automated changelog generation infrastructure using wevm/changelogs-rs with Claude Code integration. Configured per-crate changelog format with fixed version groups for reth binaries and exclusions for internal test utilities.

View File

@@ -1,5 +0,0 @@
---
reth: patch
---
Updated Alloy dependencies from 1.5.2 to 1.6.1.

View File

@@ -1,59 +0,0 @@
#!/usr/bin/env bash
# Verifies that Docker images have the expected architectures.
#
# Usage:
# ./verify_image_arch.sh <targets> <registry> <ethereum_tags> <optimism_tags>
#
# Environment:
# DRY_RUN=true - Skip actual verification, just print what would be checked.
set -euo pipefail
TARGETS="${1:-}"
REGISTRY="${2:-}"
ETHEREUM_TAGS="${3:-}"
OPTIMISM_TAGS="${4:-}"
DRY_RUN="${DRY_RUN:-false}"
verify_image() {
local image="$1"
shift
local expected_archs=("$@")
echo "Checking $image..."
if [[ "$DRY_RUN" == "true" ]]; then
echo " [dry-run] Would verify architectures: ${expected_archs[*]}"
return 0
fi
manifest=$(docker manifest inspect "$image" 2>/dev/null) || {
echo "::error::Failed to inspect manifest for $image"
return 1
}
for arch in "${expected_archs[@]}"; do
if ! echo "$manifest" | jq -e ".manifests[] | select(.platform.architecture == \"$arch\" and .platform.os == \"linux\")" > /dev/null; then
echo "::error::Missing architecture $arch for $image"
return 1
fi
echo " ✓ linux/$arch"
done
}
if [[ "$TARGETS" == *"nightly"* ]]; then
verify_image "${REGISTRY}/reth:nightly" amd64 arm64
verify_image "${REGISTRY}/op-reth:nightly" amd64 arm64
verify_image "${REGISTRY}/reth:nightly-profiling" amd64
verify_image "${REGISTRY}/reth:nightly-edge-profiling" amd64
verify_image "${REGISTRY}/op-reth:nightly-profiling" amd64
else
for tag in $(echo "$ETHEREUM_TAGS" | tr ',' ' '); do
verify_image "$tag" amd64 arm64
done
for tag in $(echo "$OPTIMISM_TAGS" | tr ',' ' '); do
verify_image "$tag" amd64 arm64
done
fi
echo "All image architectures verified successfully"

View File

@@ -1,21 +0,0 @@
name: Changelog
on:
workflow_dispatch:
jobs:
changelog:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
ref: ${{ github.head_ref }}
- run: npm install -g @anthropic-ai/claude-code
- uses: wevm/changelogs-rs/gen@master
with:
ai: 'claude -p'
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}

View File

@@ -102,13 +102,3 @@ jobs:
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
optimism.tags=${{ steps.params.outputs.optimism_tags }}
- name: Verify image architectures
env:
DRY_RUN: ${{ github.event_name == 'workflow_dispatch' && inputs.dry_run }}
run: |
./.github/scripts/verify_image_arch.sh \
"${{ steps.params.outputs.targets }}" \
"ghcr.io/${{ github.repository_owner }}" \
"${{ steps.params.outputs.ethereum_tags }}" \
"${{ steps.params.outputs.optimism_tags }}"

View File

@@ -90,7 +90,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
- run: cargo nextest run --release -p ef-tests --features "asm-keccak ef-tests"
doc:
name: doc tests

View File

@@ -38,7 +38,7 @@ Reth is a high-performance Ethereum execution client written in Rust, focusing o
2. **Linting**: Run clippy with all features
```bash
cargo +nightly clippy --workspace --lib --examples --tests --benches --all-features
RUSTFLAGS="-D warnings" cargo +nightly clippy --workspace --lib --examples --tests --benches --all-features --locked
```
3. **Testing**: Use nextest for faster test execution
@@ -169,11 +169,12 @@ Based on PR patterns, avoid:
Before submitting changes, ensure:
1. **Format Check**: `cargo +nightly fmt --all --check`
2. **Clippy**: No warnings
2. **Clippy**: No warnings with `RUSTFLAGS="-D warnings"`
3. **Tests Pass**: All unit and integration tests
4. **Documentation**: Update relevant docs and add doc comments with `cargo docs --document-private-items`
5. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
### Opening PRs against <https://github.com/paradigmxyz/reth>
Label PRs appropriately, first check the available labels and then apply the relevant ones:
@@ -348,10 +349,10 @@ Let's say you want to fix a bug where external IP resolution fails on startup:
}
```
5. **Run checks** (IMPORTANT!):
5. **Run checks**:
```bash
cargo +nightly fmt --all
cargo clippy --workspace --all-features # Make sure WHOLE WORKSPACE compiles!
cargo clippy --all-features
cargo test -p reth-discv4
```
@@ -373,7 +374,7 @@ Let's say you want to fix a bug where external IP resolution fails on startup:
cargo +nightly fmt --all
# Run lints
cargo +nightly clippy --workspace --all-features
RUSTFLAGS="-D warnings" cargo +nightly clippy --workspace --all-features --locked
# Run tests
cargo nextest run --workspace

813
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -480,7 +480,7 @@ revm-primitives = { version = "22.0.0", default-features = false }
revm-interpreter = { version = "32.0.0", default-features = false }
revm-database-interface = { version = "9.0.0", default-features = false }
op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
revm-inspectors = "0.34.1"
# eth
alloy-dyn-abi = "1.5.4"
@@ -490,42 +490,42 @@ alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-evm = { version = "0.27.0", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.6.1", default-features = false }
alloy-contract = { version = "1.6.1", default-features = false }
alloy-eips = { version = "1.6.1", default-features = false }
alloy-genesis = { version = "1.6.1", default-features = false }
alloy-json-rpc = { version = "1.6.1", default-features = false }
alloy-network = { version = "1.6.1", default-features = false }
alloy-network-primitives = { version = "1.6.1", default-features = false }
alloy-provider = { version = "1.6.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.6.1", default-features = false }
alloy-rpc-client = { version = "1.6.1", default-features = false }
alloy-rpc-types = { version = "1.6.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.6.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.1", default-features = false }
alloy-rpc-types-debug = { version = "1.6.1", default-features = false }
alloy-rpc-types-engine = { version = "1.6.1", default-features = false }
alloy-rpc-types-eth = { version = "1.6.1", default-features = false }
alloy-rpc-types-mev = { version = "1.6.1", default-features = false }
alloy-rpc-types-trace = { version = "1.6.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.1", default-features = false }
alloy-serde = { version = "1.6.1", default-features = false }
alloy-signer = { version = "1.6.1", default-features = false }
alloy-signer-local = { version = "1.6.1", default-features = false }
alloy-transport = { version = "1.6.1" }
alloy-transport-http = { version = "1.6.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.6.1", default-features = false }
alloy-transport-ws = { version = "1.6.1", default-features = false }
alloy-consensus = { version = "1.5.2", default-features = false }
alloy-contract = { version = "1.5.2", default-features = false }
alloy-eips = { version = "1.5.2", default-features = false }
alloy-genesis = { version = "1.5.2", default-features = false }
alloy-json-rpc = { version = "1.5.2", default-features = false }
alloy-network = { version = "1.5.2", default-features = false }
alloy-network-primitives = { version = "1.5.2", default-features = false }
alloy-provider = { version = "1.5.2", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.5.2", default-features = false }
alloy-rpc-client = { version = "1.5.2", default-features = false }
alloy-rpc-types = { version = "1.5.2", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.5.2", default-features = false }
alloy-rpc-types-anvil = { version = "1.5.2", default-features = false }
alloy-rpc-types-beacon = { version = "1.5.2", default-features = false }
alloy-rpc-types-debug = { version = "1.5.2", default-features = false }
alloy-rpc-types-engine = { version = "1.5.2", default-features = false }
alloy-rpc-types-eth = { version = "1.5.2", default-features = false }
alloy-rpc-types-mev = { version = "1.5.2", default-features = false }
alloy-rpc-types-trace = { version = "1.5.2", default-features = false }
alloy-rpc-types-txpool = { version = "1.5.2", default-features = false }
alloy-serde = { version = "1.5.2", default-features = false }
alloy-signer = { version = "1.5.2", default-features = false }
alloy-signer-local = { version = "1.5.2", default-features = false }
alloy-transport = { version = "1.5.2" }
alloy-transport-http = { version = "1.5.2", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.5.2", default-features = false }
alloy-transport-ws = { version = "1.5.2", default-features = false }
# op
alloy-op-evm = { version = "0.27.2", default-features = false }
alloy-op-evm = { version = "0.27.0", default-features = false }
alloy-op-hardforks = "0.4.4"
op-alloy-rpc-types = { version = "0.23.1", default-features = false }
op-alloy-rpc-types-engine = { version = "0.23.1", default-features = false }
@@ -543,7 +543,7 @@ backon = { version = "1.2", default-features = false, features = ["std-blocking-
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
bytes = { version = "1.11.1", default-features = false }
bytes = { version = "1.5", default-features = false }
brotli = "8"
cfg-if = "1.0"
clap = "4"
@@ -560,9 +560,9 @@ humantime-serde = "1.1"
itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
modular-bitfield = "0.11.2"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
nybbles = { version = "0.4.8", default-features = false }
nybbles = { version = "0.4.2", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
parking_lot = "0.12"
paste = "1.0"
@@ -589,13 +589,13 @@ zstd = "0.13"
byteorder = "1"
fixed-cache = { version = "0.1.7", features = ["stats"] }
moka = "0.12"
tar-no-std = { version = "0.4.2", default-features = false }
miniz_oxide = { version = "0.9.0", default-features = false }
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
chrono = "0.4.41"
# metrics
metrics = "0.24.0"
metrics-derive = "0.1.1"
metrics-derive = "0.1"
metrics-exporter-prometheus = { version = "0.18.0", default-features = false }
metrics-process = "2.1.0"
metrics-util = { default-features = false, version = "0.20.0" }
@@ -607,7 +607,7 @@ quote = "1.0"
# tokio
tokio = { version = "1.44.2", default-features = false }
tokio-stream = "0.1.11"
tokio-tungstenite = "0.28.0"
tokio-tungstenite = "0.26.2"
tokio-util = { version = "0.7.4", features = ["codec"] }
# async
@@ -620,7 +620,7 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -640,6 +640,7 @@ jsonrpsee-types = "0.26.0"
http = "1.0"
http-body = "1.0"
http-body-util = "0.1.2"
jsonwebtoken = "9"
proptest-arbitrary-interop = "0.1.0"
# crypto
@@ -653,7 +654,7 @@ rand_08 = { package = "rand", version = "0.8" }
c-kzg = "2.1.5"
# config
toml = "0.9"
toml = "0.8"
# rocksdb
rocksdb = { version = "0.24" }
@@ -672,16 +673,16 @@ assert_matches = "1.5.0"
criterion = { package = "codspeed-criterion-compat", version = "4.3" }
insta = "1.41"
proptest = "1.7"
proptest-derive = "0.7"
proptest-derive = "0.5"
similar-asserts = { version = "1.5.0", features = ["serde"] }
tempfile = "3.20"
test-fuzz = "7"
rstest = "0.26.1"
rstest = "0.24.0"
test-case = "3"
# ssz encoding
ethereum_ssz = "0.10.1"
ethereum_ssz_derive = "0.10.1"
ethereum_ssz = "0.9.0"
ethereum_ssz_derive = "0.9.0"
# allocators
jemalloc_pprof = { version = "0.8", default-features = false }
@@ -693,14 +694,14 @@ snmalloc-rs = { version = "0.3.7", features = ["build_cc"] }
aes = "0.8.1"
ahash = "0.8"
anyhow = "1.0"
bindgen = { version = "0.72", default-features = false }
block-padding = "0.3"
bindgen = { version = "0.71", default-features = false }
block-padding = "0.3.2"
cc = "1.2.15"
cipher = "0.4.3"
comfy-table = "7.0"
concat-kdf = "0.1.0"
crossbeam-channel = "0.5.13"
crossterm = "0.29.0"
crossterm = "0.28.0"
csv = "1.3.0"
ctrlc = "3.4"
ctr = "0.9.2"
@@ -713,7 +714,7 @@ hmac = "0.12.1"
human_bytes = "0.4.1"
indexmap = "2"
interprocess = "2.2.0"
lz4_flex = { version = "0.12", default-features = false }
lz4_flex = { version = "0.11", default-features = false }
memmap2 = "0.9.4"
mev-share-sse = { version = "0.5.0", default-features = false }
num-traits = "0.2.15"
@@ -721,15 +722,15 @@ page_size = "0.6.0"
parity-scale-codec = "3.2.1"
plain_hasher = "0.2"
pretty_assertions = "1.4"
ratatui = { version = "0.30", default-features = false }
ringbuffer = "0.16.0"
ratatui = { version = "0.29", default-features = false }
ringbuffer = "0.15.0"
rmp-serde = "1.3"
roaring = "0.11.3"
roaring = "0.10.2"
rolling-file = "0.2.0"
sha3 = "0.10.5"
snap = "1.1.1"
socket2 = { version = "0.6", default-features = false }
sysinfo = { version = "0.38", default-features = false }
socket2 = { version = "0.5", default-features = false }
sysinfo = { version = "0.33", default-features = false }
tracing-journald = "0.3"
tracing-logfmt = "=0.3.5"
tracing-samply = "0.1"

View File

@@ -29,8 +29,9 @@ ARG MANIFEST_PATH=bin/reth
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
# Extra Cargo flags (can be overridden, otherwise set per-platform below)
# Extra Cargo flags
ARG RUSTFLAGS=""
ENV RUSTFLAGS="$RUSTFLAGS"
# Extra Cargo features
ARG FEATURES=""
@@ -45,18 +46,11 @@ ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Build application
# Platform-specific RUSTFLAGS: amd64 uses x86-64-v3 (Haswell+) with pclmulqdq for rocksdb
ARG TARGETPLATFORM
COPY --exclude=.git . .
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
fi && \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true

View File

@@ -56,7 +56,7 @@ ctrlc.workspace = true
shlex.workspace = true
[target.'cfg(unix)'.dependencies]
nix = { version = "0.31", features = ["signal", "process"] }
nix = { version = "0.29", features = ["signal", "process"] }
[features]
default = ["jemalloc"]

View File

@@ -186,12 +186,10 @@ impl BenchmarkRunner {
&output_dir.to_string_lossy(),
]);
// Configure wait mode: both can be used together
// When both are set: wait at least wait_time, and also wait for persistence if needed
// Configure wait mode: wait-time takes precedence over persistence-based flow
if let Some(ref wait_time) = self.wait_time {
cmd.args(["--wait-time", wait_time]);
}
if self.wait_for_persistence {
} else if self.wait_for_persistence {
cmd.arg("--wait-for-persistence");
// Add persistence threshold if specified

View File

@@ -116,9 +116,9 @@ pub(crate) struct Args {
/// Optional fixed delay between engine API calls (passed to reth-bench).
///
/// Can be combined with `--wait-for-persistence`: when both are set,
/// waits at least this duration, and also waits for persistence if needed.
#[arg(long, value_name = "DURATION")]
/// When set, reth-bench uses wait-time mode and disables persistence-based flow.
/// This flag remains for compatibility with older scripts.
#[arg(long, value_name = "DURATION", hide = true)]
pub wait_time: Option<String>,
/// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
@@ -126,9 +126,6 @@ pub(crate) struct Args {
/// When enabled, waits for every Nth block to be persisted using the
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
/// doesn't outpace persistence.
///
/// Can be combined with `--wait-time`: when both are set, waits at least
/// wait-time, and also waits for persistence if the block hasn't been persisted yet.
#[arg(long)]
pub wait_for_persistence: bool,

View File

@@ -45,7 +45,7 @@ op-alloy-consensus = { workspace = true, features = ["alloy-compat"] }
op-alloy-rpc-types-engine = { workspace = true, features = ["serde"] }
# reqwest
reqwest.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls-native-roots"] }
# tower
tower.workspace = true

View File

@@ -572,22 +572,13 @@ impl Command {
for i in 0..self.count {
// Get initial batch of transactions for this payload
let Some(mut result) = tx_buffer.take_batch().await else {
info!(
payloads_built = i,
payloads_requested = self.count,
"Transaction source exhausted, stopping"
);
break;
};
let mut result = tx_buffer
.take_batch()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if result.transactions.is_empty() {
info!(
payloads_built = i,
payloads_requested = self.count,
"No more transactions available, stopping"
);
break;
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
// Build with retry - may need to request more transactions

View File

@@ -96,23 +96,9 @@ impl Command {
);
}
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
// Set up waiter based on configured options (duration takes precedence)
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(
self.benchmark.ws_rpc_url.as_deref(),

View File

@@ -4,8 +4,6 @@
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
//! - **Persistence-based waits**: Wait for blocks to be persisted using
//! `reth_subscribePersistedBlock` subscription
//! - **Combined mode**: Wait at least the fixed duration, and also wait for persistence if the
//! block hasn't been persisted yet (whichever takes longer)
use alloy_eips::BlockNumHash;
use alloy_network::Ethereum;
@@ -221,39 +219,14 @@ impl PersistenceWaiter {
}
}
/// Creates a waiter that combines both duration and persistence waiting.
///
/// Waits at least `wait_time` between blocks, and also waits for persistence
/// if the block hasn't been persisted yet (whichever takes longer).
pub(crate) const fn with_duration_and_subscription(
wait_time: Duration,
subscription: PersistenceSubscription,
threshold: u64,
timeout: Duration,
) -> Self {
Self {
wait_time: Some(wait_time),
subscription: Some(subscription),
blocks_sent: 0,
last_persisted: 0,
threshold,
timeout,
}
}
/// Called once per block. Waits based on the configured mode.
///
/// When both `wait_time` and `subscription` are set (combined mode):
/// - Always waits at least `wait_time`
/// - Additionally waits for persistence if we're at a persistence checkpoint
#[allow(clippy::manual_is_multiple_of)]
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
// Always wait for the fixed duration if configured
if let Some(wait_time) = self.wait_time {
tokio::time::sleep(wait_time).await;
return Ok(());
}
// Check persistence if subscription is configured
let Some(ref mut subscription) = self.subscription else {
return Ok(());
};

View File

@@ -148,20 +148,9 @@ impl Command {
);
}
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
// Set up waiter based on configured options (duration takes precedence)
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;

View File

@@ -6,7 +6,7 @@ use crate::{
};
use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
use alloy_primitives::{map::B256Map, BlockNumber, TxHash, B256};
use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_ethereum_primitives::EthPrimitives;
@@ -57,7 +57,7 @@ pub(crate) struct InMemoryStateMetrics {
#[derive(Debug, Default)]
pub(crate) struct InMemoryState<N: NodePrimitives = EthPrimitives> {
/// All canonical blocks that are not on disk yet.
blocks: RwLock<B256Map<Arc<BlockState<N>>>>,
blocks: RwLock<HashMap<B256, Arc<BlockState<N>>>>,
/// Mapping of block numbers to block hashes.
numbers: RwLock<BTreeMap<u64, B256>>,
/// The pending block that has not yet been made canonical.
@@ -68,7 +68,7 @@ pub(crate) struct InMemoryState<N: NodePrimitives = EthPrimitives> {
impl<N: NodePrimitives> InMemoryState<N> {
pub(crate) fn new(
blocks: B256Map<Arc<BlockState<N>>>,
blocks: HashMap<B256, Arc<BlockState<N>>>,
numbers: BTreeMap<u64, B256>,
pending: Option<BlockState<N>>,
) -> Self {
@@ -184,7 +184,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// Create a new in-memory state with the given blocks, numbers, pending state, and optional
/// finalized header.
pub fn new(
blocks: B256Map<Arc<BlockState<N>>>,
blocks: HashMap<B256, Arc<BlockState<N>>>,
numbers: BTreeMap<u64, B256>,
pending: Option<BlockState<N>>,
finalized: Option<SealedHeader<N::BlockHeader>>,
@@ -209,7 +209,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// Create an empty state.
pub fn empty() -> Self {
Self::new(B256Map::default(), BTreeMap::new(), None, None, None)
Self::new(HashMap::default(), BTreeMap::new(), None, None, None)
}
/// Create a new in memory state with the given local head and finalized header
@@ -1176,7 +1176,7 @@ mod tests {
#[test]
fn test_in_memory_state_impl_state_by_hash() {
let mut state_by_hash = B256Map::default();
let mut state_by_hash = HashMap::default();
let number = rand::rng().random::<u64>();
let mut test_block_builder: TestBlockBuilder = TestBlockBuilder::default();
let state = Arc::new(create_mock_state(&mut test_block_builder, number, B256::random()));
@@ -1190,7 +1190,7 @@ mod tests {
#[test]
fn test_in_memory_state_impl_state_by_number() {
let mut state_by_hash = B256Map::default();
let mut state_by_hash = HashMap::default();
let mut hash_by_number = BTreeMap::new();
let number = rand::rng().random::<u64>();
@@ -1209,7 +1209,7 @@ mod tests {
#[test]
fn test_in_memory_state_impl_head_state() {
let mut state_by_hash = B256Map::default();
let mut state_by_hash = HashMap::default();
let mut hash_by_number = BTreeMap::new();
let mut test_block_builder: TestBlockBuilder = TestBlockBuilder::default();
let state1 = Arc::new(create_mock_state(&mut test_block_builder, 1, B256::random()));
@@ -1237,7 +1237,7 @@ mod tests {
let pending_hash = pending_state.hash();
let in_memory_state =
InMemoryState::new(B256Map::default(), BTreeMap::new(), Some(pending_state));
InMemoryState::new(HashMap::default(), BTreeMap::new(), Some(pending_state));
let result = in_memory_state.pending_state();
assert!(result.is_some());
@@ -1249,7 +1249,7 @@ mod tests {
#[test]
fn test_in_memory_state_impl_no_pending_state() {
let in_memory_state: InMemoryState =
InMemoryState::new(B256Map::default(), BTreeMap::new(), None);
InMemoryState::new(HashMap::default(), BTreeMap::new(), None);
assert_eq!(in_memory_state.pending_state(), None);
}
@@ -1380,7 +1380,7 @@ mod tests {
let state2 = Arc::new(BlockState::with_parent(block2.clone(), Some(state1.clone())));
let state3 = Arc::new(BlockState::with_parent(block3.clone(), Some(state2.clone())));
let mut blocks = B256Map::default();
let mut blocks = HashMap::default();
blocks.insert(block1.recovered_block().hash(), state1);
blocks.insert(block2.recovered_block().hash(), state2);
blocks.insert(block3.recovered_block().hash(), state3);
@@ -1427,7 +1427,7 @@ mod tests {
fn test_canonical_in_memory_state_canonical_chain_single_block() {
let block = TestBlockBuilder::eth().get_executed_block_with_number(1, B256::random());
let hash = block.recovered_block().hash();
let mut blocks = B256Map::default();
let mut blocks = HashMap::default();
blocks.insert(hash, Arc::new(BlockState::new(block)));
let mut numbers = BTreeMap::new();
numbers.insert(1, hash);

View File

@@ -541,7 +541,7 @@ impl<H: BlockHeader> ChainSpec<H> {
}
}
bf_params.first().map(|(_, params)| *params).unwrap_or_else(BaseFeeParams::ethereum)
bf_params.first().map(|(_, params)| *params).unwrap_or(BaseFeeParams::ethereum())
}
}
}

View File

@@ -133,4 +133,4 @@ arbitrary = [
"reth-ethereum-primitives/arbitrary",
]
edge = ["reth-db-common/edge", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
edge = ["reth-db-common/edge", "reth-stages/rocksdb", "reth-provider/rocksdb"]

View File

@@ -121,16 +121,14 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
let (db, sfp) = match access {
AccessRights::RW => (
init_db(db_path, self.db.database_args())?,
Arc::new(init_db(db_path, self.db.database_args())?),
StaticFileProviderBuilder::read_write(sf_path)
.with_metrics()
.with_genesis_block_number(genesis_block_number)
.build()?,
),
AccessRights::RO | AccessRights::RoInconsistent => {
(open_db_read_only(&db_path, self.db.database_args())?, {
(Arc::new(open_db_read_only(&db_path, self.db.database_args())?), {
let provider = StaticFileProviderBuilder::read_only(sf_path)
.with_metrics()
.with_genesis_block_number(genesis_block_number)
.build()?;
provider.watch_directory();
@@ -162,16 +160,16 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
fn create_provider_factory<N: CliNodeTypes>(
&self,
config: &Config,
db: DatabaseEnv,
db: Arc<DatabaseEnv>,
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let prune_modes = config.prune.segments.clone();
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, DatabaseEnv>>::new(
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::new(
db,
self.chain.clone(),
static_file_provider,
@@ -202,7 +200,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
// Builds and executes an unwind-only pipeline
let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, DatabaseEnv>>::builder()
let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder()
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,
@@ -231,7 +229,7 @@ pub struct Environment<N: NodeTypes> {
/// Configuration for reth node
pub config: Config,
/// Provider factory.
pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
/// Datadir path.
pub data_dir: ChainPath<DataDirPath>,
}
@@ -263,8 +261,8 @@ impl AccessRights {
/// Helper alias to satisfy `FullNodeTypes` bound on [`Node`] trait generic.
type FullTypesAdapter<T> = FullNodeTypesAdapter<
T,
DatabaseEnv,
BlockchainProvider<NodeTypesWithDBAdapter<T, DatabaseEnv>>,
Arc<DatabaseEnv>,
BlockchainProvider<NodeTypesWithDBAdapter<T, Arc<DatabaseEnv>>>,
>;
/// Helper trait with a common set of requirements for the

View File

@@ -17,6 +17,7 @@ use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProvider
use reth_static_file_types::StaticFileSegment;
use std::{
hash::{BuildHasher, Hasher},
sync::Arc,
time::{Duration, Instant},
};
use tracing::{info, warn};
@@ -89,7 +90,7 @@ impl Command {
/// Execute `db checksum` command
pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
self,
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
) -> eyre::Result<()> {
warn!("This command should be run without the node running!");
@@ -116,7 +117,7 @@ fn checksum_hasher() -> impl Hasher {
}
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
segment: StaticFileSegment,
start_block: Option<u64>,
end_block: Option<u64>,

View File

@@ -9,7 +9,7 @@ use reth_db_api::table::Table;
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDBAdapter;
use reth_provider::RocksDBProviderFactory;
use std::{hash::Hasher, time::Instant};
use std::{hash::Hasher, sync::Arc, time::Instant};
use tracing::info;
/// RocksDB tables that can be checksummed.
@@ -36,7 +36,7 @@ impl RocksDbTable {
/// Computes a checksum for a RocksDB table.
pub fn checksum_rocksdb<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
table: RocksDbTable,
limit: Option<usize>,
) -> eyre::Result<()> {

View File

@@ -16,6 +16,7 @@ use std::{
hash::Hash,
io::Write,
path::{Path, PathBuf},
sync::Arc,
};
use tracing::{info, warn};
@@ -55,7 +56,7 @@ impl Command {
/// then written to a file in the output directory.
pub fn execute<T: NodeTypes>(
self,
tool: &DbTool<NodeTypesWithDBAdapter<T, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<T, Arc<DatabaseEnv>>>,
) -> eyre::Result<()> {
warn!("Make sure the node is not running when running `reth db diff`!");
// open second db

View File

@@ -7,7 +7,7 @@ use reth_db::{transaction::DbTx, DatabaseEnv};
use reth_db_api::{database::Database, table::Table, RawValue, TableViewer, Tables};
use reth_db_common::{DbTool, ListFilter};
use reth_node_builder::{NodeTypes, NodeTypesWithDBAdapter};
use std::cell::RefCell;
use std::{cell::RefCell, sync::Arc};
use tracing::error;
#[derive(Parser, Debug)]
@@ -55,7 +55,7 @@ impl Command {
/// Execute `db list` command
pub fn execute<N: NodeTypes<ChainSpec: EthereumHardforks>>(
self,
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
) -> eyre::Result<()> {
self.table.view(&ListTableViewer { tool, args: &self })
}
@@ -89,7 +89,7 @@ impl Command {
}
struct ListTableViewer<'a, N: NodeTypes> {
tool: &'a DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &'a DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
args: &'a Command,
}

View File

@@ -11,7 +11,6 @@ use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_provider::providers::ProviderNodeTypes;
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
use reth_tasks::spawn_scoped_os_thread;
use std::{
collections::BTreeSet,
thread,
@@ -231,7 +230,7 @@ impl Command {
thread::scope(|s| {
let handles: Vec<_> = (0..num_threads)
.map(|thread_id| {
spawn_scoped_os_thread(s, "db-state-worker", move || {
s.spawn(move || {
loop {
// Get next chunk to process
let chunk_idx = {

View File

@@ -16,7 +16,7 @@ use reth_provider::{
RocksDBProviderFactory,
};
use reth_static_file_types::SegmentRangeInclusive;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
#[derive(Parser, Debug)]
/// The arguments for the `reth db stats` command
@@ -48,7 +48,7 @@ impl Command {
pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
self,
data_dir: ChainPath<DataDirPath>,
tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
) -> eyre::Result<()> {
if self.checksum {
let checksum_report = self.checksum_report(tool)?;
@@ -72,7 +72,7 @@ impl Command {
Ok(())
}
fn db_stats_table<N: NodeTypesWithDB<DB = DatabaseEnv>>(
fn db_stats_table<N: NodeTypesWithDB<DB = Arc<DatabaseEnv>>>(
&self,
tool: &DbTool<N>,
) -> eyre::Result<ComfyTable> {

View File

@@ -227,9 +227,8 @@ where
// Handle errors
if let Err(err) = res {
error!("{err}");
error!("{:?}", err)
}
Ok(())
}
}
@@ -242,7 +241,6 @@ fn event_loop<B: Backend, F, T: Table>(
) -> io::Result<()>
where
F: FnMut(usize, usize) -> Vec<TableRow<T>>,
io::Error: From<B::Error>,
{
let mut last_tick = Instant::now();
let mut running = true;

View File

@@ -2,7 +2,7 @@ use futures::Future;
use reth_cli::chainspec::ChainSpecParser;
use reth_db::DatabaseEnv;
use reth_node_builder::{NodeBuilder, WithLaunchContext};
use std::fmt;
use std::{fmt, sync::Arc};
/// A trait for launching a reth node with custom configuration strategies.
///
@@ -30,7 +30,7 @@ where
/// * `builder_args` - Extension arguments for configuration
fn entrypoint(
self,
builder: WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
builder: WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
builder_args: Ext,
) -> impl Future<Output = eyre::Result<()>>;
}
@@ -58,7 +58,7 @@ impl<F> FnLauncher<F> {
where
C: ChainSpecParser,
F: AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> eyre::Result<()>,
{
@@ -77,13 +77,13 @@ where
C: ChainSpecParser,
Ext: clap::Args + fmt::Debug,
F: AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> eyre::Result<()>,
{
fn entrypoint(
self,
builder: WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
builder: WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
builder_args: Ext,
) -> impl Future<Output = eyre::Result<()>> {
(self.func)(builder, builder_args)

View File

@@ -206,7 +206,7 @@ where
let db_path = data_dir.db();
tracing::info!(target: "reth::cli", path = ?db_path, "Opening database");
let database = init_db(db_path.clone(), self.db.database_args())?.with_metrics();
let database = Arc::new(init_db(db_path.clone(), self.db.database_args())?.with_metrics());
if with_unused_ports {
node_config = node_config.with_unused_ports();

View File

@@ -1,10 +1,9 @@
//! Command that runs pruning.
//! Command that runs pruning without any limits.
use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
use clap::Parser;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::cancellation::CancellationToken;
use reth_node_builder::common::metrics_hooks;
use reth_node_core::{args::MetricArgs, version::version_metadata};
use reth_node_metrics::{
@@ -12,14 +11,12 @@ use reth_node_metrics::{
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
#[cfg(all(unix, feature = "edge"))]
use reth_provider::RocksDBProviderFactory;
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use std::sync::Arc;
use tracing::info;
/// Prunes according to the configuration
/// Prunes according to the configuration without any limits
#[derive(Debug, Parser)]
pub struct PruneCommand<C: ChainSpecParser> {
#[command(flatten)]
@@ -53,7 +50,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
build_profile: version_metadata().build_profile_name.as_ref(),
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor.clone(),
ctx.task_executor,
metrics_hooks(&provider_factory),
data_dir.pprof_dumps(),
);
@@ -72,66 +69,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
// Delete data which has been copied to static files.
if let Some(prune_tip) = lowest_static_file_height {
info!(target: "reth::cli", ?prune_tip, ?config, "Pruning data from database...");
// Set up cancellation token for graceful shutdown on Ctrl+C
let cancellation = CancellationToken::new();
let cancellation_clone = cancellation.clone();
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
cancellation_clone.cancel();
});
// Use batched pruning with a limit to bound memory, running in a loop until complete.
//
// A limit of 20_000_000 results in a max memory usage of ~5G.
const DELETE_LIMIT: usize = 20_000_000;
// Run the pruner according to the configuration, and don't enforce any limits on it
let mut pruner = PrunerBuilder::new(config)
.delete_limit(DELETE_LIMIT)
.build_with_provider_factory(provider_factory.clone());
.delete_limit(usize::MAX)
.build_with_provider_factory(provider_factory);
let mut total_pruned = 0usize;
loop {
if cancellation.is_cancelled() {
info!(target: "reth::cli", total_pruned, "Pruning interrupted by user");
break;
}
let output = pruner.run(prune_tip)?;
let batch_pruned: usize = output.segments.iter().map(|(_, seg)| seg.pruned).sum();
total_pruned = total_pruned.saturating_add(batch_pruned);
// Check if all segments are finished (not just the overall progress,
// since the pruner sets overall progress from the last segment only)
let all_segments_finished =
output.segments.iter().all(|(_, seg)| seg.progress.is_finished());
if all_segments_finished {
info!(target: "reth::cli", total_pruned, "Pruned data from database");
break;
}
if batch_pruned == 0 {
return Err(eyre::eyre!(
"pruner made no progress but reported more data remaining; \
aborting to prevent infinite loop"
));
}
info!(
target: "reth::cli",
batch_pruned,
total_pruned,
"Pruning batch complete, continuing..."
);
}
}
// Flush and compact RocksDB to reclaim disk space after pruning
#[cfg(all(unix, feature = "edge"))]
{
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
provider_factory.rocksdb_provider().flush_and_compact()?;
info!(target: "reth::cli", "RocksDB compaction complete");
pruner.run(prune_tip)?;
info!(target: "reth::cli", "Pruned data from database");
}
Ok(())

View File

@@ -26,7 +26,7 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
consensus: C,
) -> eyre::Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
C: FullConsensus<E::Primitives> + 'static,
{
@@ -39,7 +39,7 @@ where
if should_run {
dry_run(
ProviderFactory::<N>::new(
output_db,
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,

View File

@@ -10,9 +10,10 @@ use reth_provider::{
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
use std::sync::Arc;
use tracing::info;
pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = DatabaseEnv>>(
pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
db_tool: &DbTool<N>,
from: BlockNumber,
to: BlockNumber,
@@ -35,7 +36,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
if should_run {
dry_run(
ProviderFactory::<N>::new(
output_db,
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,

View File

@@ -9,9 +9,10 @@ use reth_provider::{
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
use std::sync::Arc;
use tracing::info;
pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = DatabaseEnv>>(
pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
db_tool: &DbTool<N>,
from: u64,
to: u64,
@@ -25,7 +26,7 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
if should_run {
dry_run(
ProviderFactory::<N>::new(
output_db,
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,

View File

@@ -34,7 +34,7 @@ pub(crate) async fn dump_merkle_stage<N>(
consensus: impl FullConsensus<N::Primitives> + 'static,
) -> Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
{
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
@@ -59,7 +59,7 @@ where
if should_run {
dry_run(
ProviderFactory::<N>::new(
output_db,
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,

View File

@@ -158,7 +158,7 @@ enum Subcommands {
impl Subcommands {
/// Returns the block to unwind to. The returned block will stay in database.
fn unwind_target<N: ProviderNodeTypes<DB = DatabaseEnv>>(
fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
&self,
factory: ProviderFactory<N>,
) -> eyre::Result<u64> {

View File

@@ -83,7 +83,22 @@ impl CliRunner {
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
// `drop(tokio_runtime)` would block the current thread until its pools
// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
// it on a separate thread and wait for up to 5 seconds for this operation to
// complete.
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || {
drop(tokio_runtime);
let _ = tx.send(());
})
.unwrap();
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
});
command_res
}
@@ -122,7 +137,19 @@ impl CliRunner {
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
// Shutdown the runtime on a separate thread
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || {
drop(tokio_runtime);
let _ = tx.send(());
})
.unwrap();
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
});
command_res
}
@@ -152,7 +179,13 @@ impl CliRunner {
tokio_runtime
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
tokio_shutdown(tokio_runtime, false);
// drop the tokio runtime on a separate thread because drop blocks until its pools
// (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
// the current thread but we want to exit right away.
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || drop(tokio_runtime))
.unwrap();
Ok(())
}
@@ -219,14 +252,7 @@ impl CliRunnerConfig {
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name("tokio-rt")
.build()
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the given future to completion or until a critical task panicked.
@@ -295,27 +321,3 @@ where
Ok(())
}
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
///
/// `drop(tokio_runtime)` would block the current thread until its pools
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
/// it on a separate thread and wait for up to 5 seconds for this operation to
/// complete.
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
// Shutdown the runtime on a separate thread
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-shutdown".to_string())
.spawn(move || {
drop(rt);
let _ = tx.send(());
})
.unwrap();
if wait {
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
});
}
}

View File

@@ -29,7 +29,7 @@ auto_impl.workspace = true
derive_more.workspace = true
futures.workspace = true
eyre.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["time"] }
serde_json.workspace = true

View File

@@ -95,7 +95,7 @@ where
let block_hash = payload.block_hash();
let block_number = payload.block_number();
previous_block_hashes.enqueue(block_hash);
previous_block_hashes.push(block_hash);
// Send new events to execution client
let _ = self.engine_handle.new_payload(payload).await;
@@ -160,7 +160,7 @@ mod tests {
// Push hashes 0..65
for i in 0..65u8 {
buffer.enqueue(B256::with_last_byte(i));
buffer.push(B256::with_last_byte(i));
}
// offset=0 should return the most recent (64)
@@ -181,7 +181,7 @@ mod tests {
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
// With only 1 entry, only offset=0 works
buffer.enqueue(B256::with_last_byte(1));
buffer.push(B256::with_last_byte(1));
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 1), None);
assert_eq!(get_hash_at_offset(&buffer, 32), None);
@@ -189,7 +189,7 @@ mod tests {
// With 33 entries, offset=32 works but offset=64 doesn't
for i in 2..=33u8 {
buffer.enqueue(B256::with_last_byte(i));
buffer.push(B256::with_last_byte(i));
}
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 64), None);

View File

@@ -114,22 +114,22 @@ pub async fn setup_engine_with_chain_import(
// Initialize the database using init_db (same as CLI import command)
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
let db = reth_db::init_db(&db_path, db_args)?;
let db_env = reth_db::init_db(&db_path, db_args)?;
let db = Arc::new(db_env);
// Create a provider factory with the initialized database (use regular DB, not
// TempDatabase) We need to specify the node types properly for the adapter
let provider_factory =
ProviderFactory::<NodeTypesWithDBAdapter<EthereumNode, DatabaseEnv>>::new(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(
static_files_path.clone(),
)?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
)?;
let provider_factory = ProviderFactory::<
NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>,
>::new(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
)?;
// Initialize genesis if needed
reth_db_common::init::init_genesis(&provider_factory)?;
@@ -320,10 +320,11 @@ mod tests {
// Import the chain
{
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
let db = reth_db::init_db(&db_path, db_args).unwrap();
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
let db = Arc::new(db_env);
let provider_factory: ProviderFactory<
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
> = ProviderFactory::new(
db.clone(),
chain_spec.clone(),
@@ -384,10 +385,11 @@ mod tests {
// Now reopen the database and verify checkpoints are still there
{
let db = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
let db = Arc::new(db_env);
let provider_factory: ProviderFactory<
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, DatabaseEnv>,
NodeTypesWithDBAdapter<reth_node_ethereum::EthereumNode, Arc<DatabaseEnv>>,
> = ProviderFactory::new(
db,
chain_spec.clone(),

View File

@@ -528,12 +528,8 @@ impl TreeConfig {
}
/// Setter for the number of storage proof worker threads.
///
/// No-op if it's [`None`].
pub fn with_storage_worker_count_opt(mut self, storage_worker_count: Option<usize>) -> Self {
if let Some(count) = storage_worker_count {
self.storage_worker_count = count.max(MIN_WORKER_COUNT);
}
pub fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
self.storage_worker_count = storage_worker_count.max(MIN_WORKER_COUNT);
self
}
@@ -543,12 +539,8 @@ impl TreeConfig {
}
/// Setter for the number of account proof worker threads.
///
/// No-op if it's [`None`].
pub fn with_account_worker_count_opt(mut self, account_worker_count: Option<usize>) -> Self {
if let Some(count) = account_worker_count {
self.account_worker_count = count.max(MIN_WORKER_COUNT);
}
pub fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
self
}

View File

@@ -17,6 +17,7 @@ reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true

View File

@@ -14,6 +14,7 @@ pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_node_types::{BlockTy, NodeTypes};
@@ -96,7 +97,7 @@ where
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

View File

@@ -23,7 +23,7 @@ reth-evm = { workspace = true, features = ["metrics"] }
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-primitives-traits = { workspace = true, features = ["rayon", "dashmap"] }
reth-primitives-traits.workspace = true
reth-ethereum-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
@@ -62,18 +62,19 @@ metrics.workspace = true
reth-metrics = { workspace = true, features = ["common"] }
# misc
dashmap.workspace = true
schnellru.workspace = true
rayon.workspace = true
tracing.workspace = true
derive_more.workspace = true
parking_lot.workspace = true
crossbeam-channel.workspace = true
reth-tracing.workspace = true
# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
reth-stages = { workspace = true, optional = true }
reth-static-file = { workspace = true, optional = true }
reth-tracing = { workspace = true, optional = true }
[dev-dependencies]
# reth
@@ -132,6 +133,7 @@ test-utils = [
"reth-stages-api/test-utils",
"reth-stages/test-utils",
"reth-static-file",
"reth-tracing",
"reth-trie/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",

View File

@@ -2,7 +2,7 @@
use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::{map::B256Set, B256};
use alloy_primitives::B256;
use futures::FutureExt;
use reth_consensus::Consensus;
use reth_network_p2p::{
@@ -12,7 +12,7 @@ use reth_network_p2p::{
use reth_primitives_traits::{Block, SealedBlock};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, VecDeque},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
fmt::Debug,
sync::Arc,
task::{Context, Poll},
@@ -109,7 +109,7 @@ where
}
/// Processes a block set download request.
fn download_block_set(&mut self, hashes: B256Set) {
fn download_block_set(&mut self, hashes: HashSet<B256>) {
for hash in hashes {
self.download_full_block(hash);
}
@@ -397,7 +397,7 @@ mod tests {
// send block set download request
block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(
B256Set::from_iter([tip.hash(), tip.parent_hash]),
HashSet::from([tip.hash(), tip.parent_hash]),
)));
// ensure we have TOTAL_BLOCKS in flight full block request
@@ -440,7 +440,7 @@ mod tests {
)));
// send block set download request
let download_set = B256Set::from_iter([tip.hash(), tip.parent_hash]);
let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
block_downloader
.on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));

View File

@@ -5,7 +5,7 @@ use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use alloy_primitives::{map::B256Set, B256};
use alloy_primitives::B256;
use crossbeam_channel::Sender;
use futures::{Stream, StreamExt};
use reth_chain_state::ExecutedBlock;
@@ -14,6 +14,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_payload_primitives::PayloadTypes;
use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
use std::{
collections::HashSet,
fmt::Display,
task::{ready, Context, Poll},
};
@@ -340,7 +341,7 @@ pub enum RequestHandlerEvent<T> {
#[derive(Debug)]
pub enum DownloadRequest {
/// Download the given set of blocks.
BlockSet(B256Set),
BlockSet(HashSet<B256>),
/// Download the given range of blocks.
BlockRange(B256, u64),
}
@@ -348,6 +349,6 @@ pub enum DownloadRequest {
impl DownloadRequest {
/// Returns a [`DownloadRequest`] for a single block.
pub fn single_block(hash: B256) -> Self {
Self::BlockSet(B256Set::from_iter([hash]))
Self::BlockSet(HashSet::from([hash]))
}
}

View File

@@ -20,7 +20,7 @@ pub(crate) struct PersistenceMetrics {
/// How long it took for blocks to be saved
pub(crate) save_blocks_duration_seconds: Histogram,
/// How many blocks we persist at once.
pub(crate) save_blocks_batch_size: Histogram,
pub(crate) save_blocks_block_count: Histogram,
/// How long it took for blocks to be pruned
pub(crate) prune_before_duration_seconds: Histogram,
}

View File

@@ -11,13 +11,8 @@ use reth_provider::{
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
sync::{
mpsc::{Receiver, SendError, Sender},
Arc,
},
thread::JoinHandle,
sync::mpsc::{Receiver, SendError, Sender},
time::Instant,
};
use thiserror::Error;
@@ -45,12 +40,6 @@ where
metrics: PersistenceMetrics,
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
sync_metrics_tx: MetricEventsSender,
/// Pending finalized block number to be committed with the next block save.
/// This avoids triggering a separate fsync for each finalized block update.
pending_finalized_block: Option<u64>,
/// Pending safe block number to be committed with the next block save.
/// This avoids triggering a separate fsync for each safe block update.
pending_safe_block: Option<u64>,
}
impl<N> PersistenceService<N>
@@ -64,15 +53,7 @@ where
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
) -> Self {
Self {
provider,
incoming,
pruner,
metrics: PersistenceMetrics::default(),
sync_metrics_tx,
pending_finalized_block: None,
pending_safe_block: None,
}
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
}
/// Prunes block data before the given block number according to the configured prune
@@ -125,10 +106,14 @@ where
}
}
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
self.pending_finalized_block = Some(finalized_block);
let provider = self.provider.database_provider_rw()?;
provider.save_finalized_block_number(finalized_block)?;
provider.commit()?;
}
PersistenceAction::SaveSafeBlock(safe_block) => {
self.pending_safe_block = Some(safe_block);
let provider = self.provider.database_provider_rw()?;
provider.save_safe_block_number(safe_block)?;
provider.commit()?;
}
}
}
@@ -153,39 +138,26 @@ where
}
fn on_save_blocks(
&mut self,
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
let block_count = blocks.len();
// Take any pending finalized/safe block updates to commit together
let pending_finalized = self.pending_finalized_block.take();
let pending_safe = self.pending_safe_block.take();
debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
let start_time = Instant::now();
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
// Commit pending finalized/safe block updates in the same transaction
if let Some(finalized) = pending_finalized {
provider_rw.save_finalized_block_number(finalized)?;
}
if let Some(safe) = pending_safe {
provider_rw.save_safe_block_number(safe)?;
}
provider_rw.commit()?;
}
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
self.metrics.save_blocks_batch_size.record(block_count as f64);
self.metrics.save_blocks_block_count.record(block_count as f64);
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block)
@@ -232,25 +204,15 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction<N>>,
/// Guard that joins the service thread when all handles are dropped.
/// Uses `Arc` so the handle remains `Clone`.
_service_guard: Arc<ServiceGuard>,
}
impl<T: NodePrimitives> PersistenceHandle<T> {
/// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
///
/// This is intended for testing purposes where you want to mock the persistence service.
/// For production use, prefer [`spawn_service`](Self::spawn_service).
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender }
}
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
///
/// The returned handle can be cloned and shared. When all clones are dropped, the service
/// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are
/// released.
pub fn spawn_service<N>(
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
@@ -262,19 +224,22 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
// construct persistence handle
let persistence_handle = PersistenceHandle::new(db_service_tx);
// spawn the persistence service
let db_service =
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
let join_handle = spawn_os_thread("persistence", || {
if let Err(err) = db_service.run() {
error!(target: "engine::persistence", ?err, "Persistence service failed");
}
});
std::thread::Builder::new()
.name("Persistence Service".to_string())
.spawn(|| {
if let Err(err) = db_service.run() {
error!(target: "engine::persistence", ?err, "Persistence service failed");
}
})
.unwrap();
PersistenceHandle {
sender: db_service_tx,
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
}
persistence_handle
}
/// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
@@ -302,10 +267,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
/// Queues the finalized block number to be persisted on disk.
///
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
/// call to avoid triggering a separate fsync for each update.
/// Persists the finalized block number on disk.
pub fn save_finalized_block_number(
&self,
finalized_block: u64,
@@ -313,10 +275,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
}
/// Queues the safe block number to be persisted on disk.
///
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
/// call to avoid triggering a separate fsync for each update.
/// Persists the safe block number on disk.
pub fn save_safe_block_number(
&self,
safe_block: u64,
@@ -338,27 +297,6 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
}
}
/// Guard that joins the persistence service thread when dropped.
///
/// This ensures graceful shutdown - the service thread completes before resources like
/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle
/// can be cloned while sharing the same guard.
struct ServiceGuard(Option<JoinHandle<()>>);
impl std::fmt::Debug for ServiceGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
}
}
impl Drop for ServiceGuard {
fn drop(&mut self) {
if let Some(join_handle) = self.0.take() {
let _ = join_handle.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -385,12 +323,12 @@ mod tests {
#[test]
fn test_save_blocks_empty() {
reth_tracing::init_test_tracing();
let handle = default_persistence_handle();
let persistence_handle = default_persistence_handle();
let blocks = vec![];
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
persistence_handle.save_blocks(blocks, tx).unwrap();
let hash = rx.recv().unwrap();
assert_eq!(hash, None);
@@ -399,7 +337,7 @@ mod tests {
#[test]
fn test_save_blocks_single_block() {
reth_tracing::init_test_tracing();
let handle = default_persistence_handle();
let persistence_handle = default_persistence_handle();
let block_number = 0;
let mut test_block_builder = TestBlockBuilder::eth();
let executed =
@@ -409,7 +347,7 @@ mod tests {
let blocks = vec![executed];
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx
.recv_timeout(std::time::Duration::from_secs(10))
@@ -422,14 +360,14 @@ mod tests {
#[test]
fn test_save_blocks_multiple_blocks() {
reth_tracing::init_test_tracing();
let handle = default_persistence_handle();
let persistence_handle = default_persistence_handle();
let mut test_block_builder = TestBlockBuilder::eth();
let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
@@ -437,7 +375,7 @@ mod tests {
#[test]
fn test_save_blocks_multiple_calls() {
reth_tracing::init_test_tracing();
let handle = default_persistence_handle();
let persistence_handle = default_persistence_handle();
let ranges = [0..1, 1..2, 2..4, 4..5];
let mut test_block_builder = TestBlockBuilder::eth();
@@ -446,7 +384,7 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);

View File

@@ -76,8 +76,7 @@ impl CacheConfig for EpochCacheConfig {
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
/// A wrapper of a state provider and a shared cache.
#[derive(Debug)]
pub struct CachedStateProvider<S> {
pub(crate) struct CachedStateProvider<S> {
/// The state provider
state_provider: S,
@@ -97,7 +96,7 @@ where
{
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub const fn new(
pub(crate) const fn new(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
@@ -115,7 +114,7 @@ impl<S> CachedStateProvider<S> {
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub const fn prewarm(mut self) -> Self {
pub(crate) const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
@@ -132,7 +131,7 @@ impl<S> CachedStateProvider<S> {
/// and the fixed-cache internal stats (collisions, size, capacity).
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
pub struct CachedStateMetrics {
pub(crate) struct CachedStateMetrics {
/// Number of times a new execution cache was created
execution_cache_created_total: Counter,
@@ -187,7 +186,7 @@ pub struct CachedStateMetrics {
impl CachedStateMetrics {
/// Sets all values to zero, indicating that a new block is being executed.
pub fn reset(&self) {
pub(crate) fn reset(&self) {
// code cache
self.code_cache_hits.set(0);
self.code_cache_misses.set(0);
@@ -205,7 +204,7 @@ impl CachedStateMetrics {
}
/// Returns a new zeroed-out instance of [`CachedStateMetrics`].
pub fn zeroed() -> Self {
pub(crate) fn zeroed() -> Self {
let zeroed = Self::default();
zeroed.reset();
zeroed
@@ -327,7 +326,7 @@ impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
/// Represents the status of a key in the cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CachedStatus<T> {
pub(crate) enum CachedStatus<T> {
/// The key is not in the cache (or was invalidated). The value was recalculated.
NotCached(T),
/// The key exists in cache and has a specific value.
@@ -488,7 +487,7 @@ impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider
/// Since EIP-6780, SELFDESTRUCT only works within the same transaction where the
/// contract was created, so we don't need to handle clearing the storage.
#[derive(Debug, Clone)]
pub struct ExecutionCache {
pub(crate) struct ExecutionCache {
/// Cache for contract bytecode, keyed by code hash.
code_cache: Arc<FixedCache<B256, Option<Bytecode>, FbBuildHasher<32>>>,
@@ -520,7 +519,7 @@ impl ExecutionCache {
///
/// Fixed-cache requires power-of-two sizes for efficient indexing.
/// With epochs enabled, the minimum size is 4096 entries.
pub const fn bytes_to_entries(size_bytes: usize, entry_size: usize) -> usize {
pub(crate) const fn bytes_to_entries(size_bytes: usize, entry_size: usize) -> usize {
let entries = size_bytes / entry_size;
// Round down to nearest power of two
let rounded = if entries == 0 { 1 } else { (entries + 1).next_power_of_two() >> 1 };
@@ -533,10 +532,10 @@ impl ExecutionCache {
}
/// Build an [`ExecutionCache`] struct, so that execution caches can be easily cloned.
pub fn new(total_cache_size: usize) -> Self {
let code_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
pub(crate) fn new(total_cache_size: usize) -> Self {
let storage_cache_size = (total_cache_size * 8888) / 10000; // 88.88% of total
let account_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
let code_cache_size = (total_cache_size * 556) / 10000; // 5.56% of total
let code_capacity = Self::bytes_to_entries(code_cache_size, CODE_CACHE_ENTRY_SIZE);
let storage_capacity = Self::bytes_to_entries(storage_cache_size, STORAGE_CACHE_ENTRY_SIZE);
@@ -567,7 +566,7 @@ impl ExecutionCache {
}
/// Gets code from cache, or inserts using the provided function.
pub fn get_or_try_insert_code_with<E>(
pub(crate) fn get_or_try_insert_code_with<E>(
&self,
hash: B256,
f: impl FnOnce() -> Result<Option<Bytecode>, E>,
@@ -586,7 +585,7 @@ impl ExecutionCache {
}
/// Gets storage from cache, or inserts using the provided function.
pub fn get_or_try_insert_storage_with<E>(
pub(crate) fn get_or_try_insert_storage_with<E>(
&self,
address: Address,
key: StorageKey,
@@ -606,7 +605,7 @@ impl ExecutionCache {
}
/// Gets account from cache, or inserts using the provided function.
pub fn get_or_try_insert_account_with<E>(
pub(crate) fn get_or_try_insert_account_with<E>(
&self,
address: Address,
f: impl FnOnce() -> Result<Option<Account>, E>,
@@ -625,7 +624,12 @@ impl ExecutionCache {
}
/// Insert storage value into cache.
pub fn insert_storage(&self, address: Address, key: StorageKey, value: Option<StorageValue>) {
pub(crate) fn insert_storage(
&self,
address: Address,
key: StorageKey,
value: Option<StorageValue>,
) {
self.storage_cache.insert((address, key), value.unwrap_or_default());
}
@@ -658,8 +662,7 @@ impl ExecutionCache {
///
/// Returns an error if the state updates are inconsistent and should be discarded.
#[instrument(level = "debug", target = "engine::caching", skip_all)]
#[expect(clippy::result_unit_err)]
pub fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
let _enter =
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
.entered();
@@ -768,7 +771,7 @@ impl ExecutionCache {
/// A saved cache that has been used for executing a specific block, which has been updated for its
/// execution.
#[derive(Debug, Clone)]
pub struct SavedCache {
pub(crate) struct SavedCache {
/// The hash of the block these caches were used to execute.
hash: B256,
@@ -788,43 +791,43 @@ pub struct SavedCache {
impl SavedCache {
/// Creates a new instance with the internals
pub fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
pub(super) fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
Self { hash, caches, metrics, usage_guard: Arc::new(()), disable_cache_metrics: false }
}
/// Sets whether to disable cache metrics recording.
pub const fn with_disable_cache_metrics(mut self, disable: bool) -> Self {
pub(super) const fn with_disable_cache_metrics(mut self, disable: bool) -> Self {
self.disable_cache_metrics = disable;
self
}
/// Returns the hash for this cache
pub const fn executed_block_hash(&self) -> B256 {
pub(crate) const fn executed_block_hash(&self) -> B256 {
self.hash
}
/// Splits the cache into its caches, metrics, and `disable_cache_metrics` flag, consuming it.
pub fn split(self) -> (ExecutionCache, CachedStateMetrics, bool) {
pub(crate) fn split(self) -> (ExecutionCache, CachedStateMetrics, bool) {
(self.caches, self.metrics, self.disable_cache_metrics)
}
/// Returns true if the cache is available for use (no other tasks are currently using it).
pub fn is_available(&self) -> bool {
pub(crate) fn is_available(&self) -> bool {
Arc::strong_count(&self.usage_guard) == 1
}
/// Returns the current strong count of the usage guard.
pub fn usage_count(&self) -> usize {
pub(crate) fn usage_count(&self) -> usize {
Arc::strong_count(&self.usage_guard)
}
/// Returns the [`ExecutionCache`] belonging to the tracked hash.
pub const fn cache(&self) -> &ExecutionCache {
pub(crate) const fn cache(&self) -> &ExecutionCache {
&self.caches
}
/// Returns the metrics associated with this cache.
pub const fn metrics(&self) -> &CachedStateMetrics {
pub(crate) const fn metrics(&self) -> &CachedStateMetrics {
&self.metrics
}

View File

@@ -13,13 +13,13 @@ use std::time::{Duration, Instant};
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub struct EngineApiMetrics {
pub(crate) struct EngineApiMetrics {
/// Engine API-specific metrics.
pub engine: EngineMetrics,
pub(crate) engine: EngineMetrics,
/// Block executor metrics.
pub executor: ExecutorMetrics,
pub(crate) executor: ExecutorMetrics,
/// Metrics for block validation
pub block_validation: BlockValidationMetrics,
pub(crate) block_validation: BlockValidationMetrics,
/// Canonical chain and reorg related metrics
pub tree: TreeMetrics,
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
@@ -32,7 +32,7 @@ impl EngineApiMetrics {
///
/// This method updates metrics for execution time, gas usage, and the number
/// of accounts, storage slots and bytecodes updated.
pub fn record_block_execution<R>(
pub(crate) fn record_block_execution<R>(
&self,
output: &BlockExecutionOutput<R>,
execution_duration: Duration,
@@ -59,27 +59,27 @@ impl EngineApiMetrics {
}
/// Returns a reference to the executor metrics for use in state hooks.
pub const fn executor_metrics(&self) -> &ExecutorMetrics {
pub(crate) const fn executor_metrics(&self) -> &ExecutorMetrics {
&self.executor
}
/// Records the duration of block pre-execution changes (e.g., beacon root update).
pub fn record_pre_execution(&self, elapsed: Duration) {
pub(crate) fn record_pre_execution(&self, elapsed: Duration) {
self.executor.pre_execution_histogram.record(elapsed);
}
/// Records the duration of block post-execution changes (e.g., finalization).
pub fn record_post_execution(&self, elapsed: Duration) {
pub(crate) fn record_post_execution(&self, elapsed: Duration) {
self.executor.post_execution_histogram.record(elapsed);
}
/// Records the time spent waiting for the next transaction from the iterator.
pub fn record_transaction_wait(&self, elapsed: Duration) {
pub(crate) fn record_transaction_wait(&self, elapsed: Duration) {
self.executor.transaction_wait_histogram.record(elapsed);
}
/// Records the duration of a single transaction execution.
pub fn record_transaction_execution(&self, elapsed: Duration) {
pub(crate) fn record_transaction_execution(&self, elapsed: Duration) {
self.executor.transaction_execution_histogram.record(elapsed);
}
}
@@ -87,7 +87,7 @@ impl EngineApiMetrics {
/// Metrics for the entire blockchain tree
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub struct TreeMetrics {
pub(crate) struct TreeMetrics {
/// The highest block number in the canonical chain
pub canonical_chain_height: Gauge,
/// The number of reorgs
@@ -103,7 +103,7 @@ pub struct TreeMetrics {
/// Metrics for the `EngineApi`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub struct EngineMetrics {
pub(crate) struct EngineMetrics {
/// Engine API forkchoiceUpdated response type metrics
#[metric(skip)]
pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
@@ -336,42 +336,42 @@ pub(crate) struct BalMetrics {
/// Metrics for non-execution related block validation.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.block_validation")]
pub struct BlockValidationMetrics {
pub(crate) struct BlockValidationMetrics {
/// Total number of storage tries updated in the state root calculation
pub state_root_storage_tries_updated_total: Counter,
pub(crate) state_root_storage_tries_updated_total: Counter,
/// Total number of times the parallel state root computation fell back to regular.
pub state_root_parallel_fallback_total: Counter,
pub(crate) state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub state_root_task_fallback_success_total: Counter,
pub(crate) state_root_task_fallback_success_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub state_root_duration: Gauge,
pub(crate) state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root
pub state_root_histogram: Histogram,
pub(crate) state_root_histogram: Histogram,
/// Histogram of deferred trie computation duration.
pub deferred_trie_compute_duration: Histogram,
pub(crate) deferred_trie_compute_duration: Histogram,
/// Payload conversion and validation latency
pub payload_validation_duration: Gauge,
pub(crate) payload_validation_duration: Gauge,
/// Histogram of payload validation latency
pub payload_validation_histogram: Histogram,
pub(crate) payload_validation_histogram: Histogram,
/// Payload processor spawning duration
pub spawn_payload_processor: Histogram,
pub(crate) spawn_payload_processor: Histogram,
/// Post-execution validation duration
pub post_execution_validation_duration: Histogram,
pub(crate) post_execution_validation_duration: Histogram,
/// Total duration of the new payload call
pub total_duration: Histogram,
pub(crate) total_duration: Histogram,
/// Size of `HashedPostStateSorted` (`total_len`)
pub hashed_post_state_size: Histogram,
pub(crate) hashed_post_state_size: Histogram,
/// Size of `TrieUpdatesSorted` (`total_len`)
pub trie_updates_sorted_size: Histogram,
pub(crate) trie_updates_sorted_size: Histogram,
/// Size of `AnchoredTrieInput` overlay `TrieUpdatesSorted` (`total_len`)
pub anchored_overlay_trie_updates_size: Histogram,
pub(crate) anchored_overlay_trie_updates_size: Histogram,
/// Size of `AnchoredTrieInput` overlay `HashedPostStateSorted` (`total_len`)
pub anchored_overlay_hashed_state_size: Histogram,
pub(crate) anchored_overlay_hashed_state_size: Histogram,
}
impl BlockValidationMetrics {
/// Records a new state root time, updating both the histogram and state root gauge
pub fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
self.state_root_storage_tries_updated_total
.increment(trie_output.storage_tries_ref().len() as u64);
self.state_root_duration.set(elapsed_as_secs);
@@ -380,7 +380,7 @@ impl BlockValidationMetrics {
/// Records a new payload validation time, updating both the histogram and the payload
/// validation gauge
pub fn record_payload_validation(&self, elapsed_as_secs: f64) {
pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
self.payload_validation_duration.set(elapsed_as_secs);
self.payload_validation_histogram.record(elapsed_as_secs);
}

View File

@@ -3,7 +3,7 @@ use crate::{
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
persistence::PersistenceHandle,
tree::{error::InsertPayloadError, payload_validator::TreeCtx},
tree::{error::InsertPayloadError, metrics::EngineApiMetrics, payload_validator::TreeCtx},
};
use alloy_consensus::BlockHeader;
use alloy_eips::{eip1898::BlockWithParent, merge::EPOCH_SLOTS, BlockNumHash, NumHash};
@@ -37,7 +37,6 @@ use reth_provider::{
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_tasks::spawn_os_thread;
use reth_trie_db::ChangesetCache;
use revm::state::EvmState;
use state::TreeState;
@@ -56,19 +55,18 @@ pub mod error;
pub mod instrumented_state;
mod invalid_headers;
mod metrics;
pub mod payload_processor;
mod payload_processor;
pub mod payload_validator;
mod persistence_state;
pub mod precompile_cache;
#[cfg(test)]
mod tests;
#[expect(unused)]
mod trie_updates;
use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache};
pub use invalid_headers::InvalidHeaderCache;
pub use metrics::EngineApiMetrics;
pub use payload_processor::*;
pub use payload_validator::{BasicEngineValidator, EngineValidator};
pub use persistence_state::PersistenceState;
@@ -160,16 +158,6 @@ impl<N: NodePrimitives> EngineApiTreeState<N> {
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
}
}
/// Returns a reference to the tree state.
pub const fn tree_state(&self) -> &TreeState<N> {
&self.tree_state
}
/// Returns true if the block has been marked as invalid.
pub fn has_invalid_header(&mut self, hash: &B256) -> bool {
self.invalid_headers.get(hash).is_some()
}
}
/// The outcome of a tree operation.
@@ -432,7 +420,7 @@ where
changeset_cache,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || task.run());
std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
(incoming, outgoing)
}
@@ -581,9 +569,6 @@ where
&mut self,
payload: T::ExecutionData,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
let _trace_guard =
reth_tracing::runtime::maybe_trace_newpayload_block(payload.block_number());
trace!(target: "engine::tree", "invoked new payload");
// start timing for the new payload process
@@ -972,13 +957,14 @@ where
&self,
canonical_header: &SealedHeader<N::BlockHeader>,
) -> ProviderResult<()> {
// Load the block into memory if it's not already present
self.ensure_block_in_memory(canonical_header.number(), canonical_header.hash())?;
let new_head_number = canonical_header.number();
let new_head_hash = canonical_header.hash();
// Update the canonical head header
self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
Ok(())
// Load the block into memory if it's not already present
self.ensure_block_in_memory(new_head_number, new_head_hash)
}
/// Ensures a block is loaded into memory if not already present.
@@ -2615,27 +2601,19 @@ where
let block_num_hash = block_id.block;
debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
// Check if block already exists - first in memory, then DB only if it could be persisted
if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
convert_to_block(self, input)?;
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
}
// Only query DB if block could be persisted (number <= last persisted block).
// New blocks from CL always have number > last persisted, so skip DB lookup for them.
if block_num_hash.number <= self.persistence_state.last_persisted_block.number {
match self.provider.sealed_header_by_hash(block_num_hash.hash) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
}
Ok(Some(_)) => {
convert_to_block(self, input)?;
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
}
Ok(None) => {}
match self.sealed_header_by_hash(block_num_hash.hash) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
}
}
Ok(Some(_)) => {
// We now assume that we already have this block in the tree. However, we need to
// run the conversion to ensure that the block hash is valid.
convert_to_block(self, input)?;
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
}
_ => {}
};
// Ensure that the parent state is available.
match self.state_provider_builder(block_id.parent) {

View File

@@ -21,7 +21,7 @@ pub fn total_slots(bal: &BlockAccessList) -> usize {
/// first, followed by read-only slots. The iterator intelligently skips accounts and slots
/// outside the specified range for efficient traversal.
#[derive(Debug)]
pub struct BALSlotIter<'a> {
pub(crate) struct BALSlotIter<'a> {
bal: &'a BlockAccessList,
range: Range<usize>,
current_index: usize,
@@ -34,7 +34,7 @@ pub struct BALSlotIter<'a> {
impl<'a> BALSlotIter<'a> {
/// Creates a new iterator over storage slots within the specified range.
pub fn new(bal: &'a BlockAccessList, range: Range<usize>) -> Self {
pub(crate) fn new(bal: &'a BlockAccessList, range: Range<usize>) -> Self {
let mut iter = Self { bal, range, current_index: 0, account_idx: 0, slot_idx: 0 };
iter.skip_to_range_start();
iter

View File

@@ -1,7 +1,10 @@
//! Executor for mixed I/O and CPU workloads.
use reth_trie_parallel::root::get_tokio_runtime_handle;
use tokio::{runtime::Handle, task::JoinHandle};
use std::{sync::OnceLock, time::Duration};
use tokio::{
runtime::{Builder, Handle, Runtime},
task::JoinHandle,
};
/// An executor for mixed I/O and CPU workloads.
///
@@ -24,7 +27,7 @@ impl WorkloadExecutor {
&self.inner.handle
}
/// Runs the provided function on an executor dedicated to blocking operations.
/// Shorthand for [`Runtime::spawn_blocking`]
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
@@ -42,6 +45,29 @@ struct WorkloadExecutorInner {
impl WorkloadExecutorInner {
fn new() -> Self {
Self { handle: get_tokio_runtime_handle() }
fn get_runtime_handle() -> Handle {
Handle::try_current().unwrap_or_else(|_| {
// Create a new runtime if no runtime is available
static RT: OnceLock<Runtime> = OnceLock::new();
let rt = RT.get_or_init(|| {
Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time, which is 12 seconds
// at the time of writing, plus a little extra.
//
// This is to prevent the costly process of spawning new threads on every
// new block, and instead reuse the existing
// threads.
.thread_keep_alive(Duration::from_secs(15))
.build()
.unwrap()
});
rt.handle().clone()
})
}
Self { handle: get_runtime_handle() }
}
}

View File

@@ -16,7 +16,7 @@ use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::{Counter, Histogram};
use metrics::Counter;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
@@ -145,7 +145,7 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// Returns a reference to the workload executor driving payload tasks.
pub const fn executor(&self) -> &WorkloadExecutor {
pub(super) const fn executor(&self) -> &WorkloadExecutor {
&self.executor
}
@@ -235,7 +235,8 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx, transaction_count_hint) =
self.spawn_tx_iterator(transactions);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -259,6 +260,7 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
@@ -269,6 +271,7 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
@@ -342,7 +345,7 @@ where
///
/// Returns a [`PayloadHandle`] to communicate with the task.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
&self,
env: ExecutionEnv<Evm>,
transactions: I,
@@ -352,10 +355,10 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -373,15 +376,19 @@ where
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_par_iter();
let transaction_count_hint = transactions.len();
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
self.executor.spawn_blocking(move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
@@ -417,14 +424,16 @@ where
}
});
(prewarm_rx, execute_rx)
(prewarm_rx, execute_rx, transaction_count_hint)
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[expect(clippy::too_many_arguments)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -459,6 +468,7 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
transaction_count_hint,
self.prewarm_max_concurrency,
);
@@ -513,27 +523,20 @@ where
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor", "sparse_trie_task")
.entered();
let _enter = span.entered();
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let start = Instant::now();
let preserved = preserved_sparse_trie.take();
trie_metrics
.sparse_trie_cache_wait_duration_histogram
.record(start.elapsed().as_secs_f64());
let sparse_state_trie = preserved
let sparse_state_trie = preserved_sparse_trie
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
@@ -559,16 +562,18 @@ where
sparse_state_trie,
))
} else {
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new(
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie,
chunk_size,
))
};
let result = task.run();
if let Err(e) = &result {
tracing::error!(target: "engine::tree::payload_processor", "State root computation failed: {e:?}");
}
// Capture the computed state_root before sending the result
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
@@ -587,14 +592,11 @@ where
target: "engine::tree::payload_processor",
"State root receiver dropped, clearing trie"
);
let (trie, deferred) = task.into_cleared_trie(
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
// Drop guard before deferred to release lock before expensive deallocations
drop(guard);
drop(deferred);
return;
}
@@ -602,9 +604,9 @@ where
// A failed computation may have left the trie in a partially updated state.
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
let deferred = if let Some(state_root) = computed_state_root {
if let Some(state_root) = computed_state_root {
let start = std::time::Instant::now();
let (trie, deferred) = task.into_trie_for_reuse(
let trie = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
@@ -614,22 +616,17 @@ where
.into_trie_for_reuse_duration_histogram
.record(start.elapsed().as_secs_f64());
guard.store(PreservedSparseTrie::anchored(trie, state_root));
deferred
} else {
debug!(
target: "engine::tree::payload_processor",
"State root computation failed, clearing trie"
);
let (trie, deferred) = task.into_cleared_trie(
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
deferred
};
// Drop guard before deferred to release lock before expensive deallocations
drop(guard);
drop(deferred);
}
});
}
@@ -640,7 +637,7 @@ where
///
/// The cache enables subsequent blocks to reuse account, storage, and bytecode data without
/// hitting the database, maintaining performance consistency.
pub fn on_inserted_executed_block(
pub(crate) fn on_inserted_executed_block(
&self,
block_with_parent: BlockWithParent,
bundle_state: &BundleState,
@@ -737,19 +734,19 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
}
/// Returns a clone of the caches used by prewarming
pub fn caches(&self) -> Option<ExecutionCache> {
pub(super) fn caches(&self) -> Option<ExecutionCache> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
}
/// Returns a clone of the cache metrics used by prewarming
pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
}
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
pub fn stop_prewarming_execution(&self) {
pub(super) fn stop_prewarming_execution(&self) {
self.prewarm_handle.stop_prewarming_execution()
}
@@ -760,7 +757,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// path without cloning the expensive `BundleState`.
///
/// Returns a sender for the channel that should be notified on block validation success.
pub fn terminate_caching(
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
) -> Option<mpsc::Sender<()>> {
@@ -780,7 +777,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct CacheTaskHandle<R> {
pub(crate) struct CacheTaskHandle<R> {
/// The shared cache the task operates with.
saved_cache: Option<SavedCache>,
/// Channel to the spawned prewarm task if any
@@ -791,7 +788,7 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
pub fn stop_prewarming_execution(&self) {
pub(super) fn stop_prewarming_execution(&self) {
self.to_prewarm_task
.as_ref()
.map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
@@ -802,7 +799,7 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
#[must_use = "sender must be used and notified on block validation success"]
pub fn terminate_caching(
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
) -> Option<mpsc::Sender<()>> {
@@ -856,7 +853,7 @@ impl<R> Drop for CacheTaskHandle<R> {
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
struct PayloadExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
@@ -875,7 +872,6 @@ impl PayloadExecutionCache {
let cache = self.inner.read();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
if elapsed.as_millis() > 5 {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
@@ -937,7 +933,7 @@ impl PayloadExecutionCache {
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
pub fn update_with_guard<F>(&self, update_fn: F)
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
{
@@ -953,8 +949,6 @@ pub(crate) struct ExecutionCacheMetrics {
/// Counter for when the execution cache was unavailable because other threads
/// (e.g., prewarming) are still using it.
pub(crate) execution_cache_in_use: Counter,
/// Time spent waiting for execution cache mutex to become available.
pub(crate) execution_cache_wait_duration: Histogram,
}
/// EVM context required to execute a block.
@@ -970,10 +964,6 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
/// Number of transactions in the block.
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -986,7 +976,6 @@ where
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
}
}
}

View File

@@ -22,7 +22,7 @@ use reth_trie_parallel::{
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
ProofWorkerHandle,
},
targets_v2::MultiProofTargetsV2,
targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
@@ -63,7 +63,7 @@ const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
/// The default max targets, for limiting the number of account and storage proof targets to be
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
@@ -100,7 +100,7 @@ impl SparseTrieUpdate {
/// Messages used internally by the multi proof task.
#[derive(Debug)]
pub enum MultiProofMessage {
pub(super) enum MultiProofMessage {
/// Prefetch proof targets
PrefetchProofs(VersionedMultiProofTargets),
/// New state update from transaction execution with its source
@@ -257,7 +257,7 @@ fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiP
/// A set of multiproof targets which can be either in the legacy or V2 representations.
#[derive(Debug)]
pub enum VersionedMultiProofTargets {
pub(super) enum VersionedMultiProofTargets {
/// Legacy targets
Legacy(MultiProofTargets),
/// V2 targets
@@ -363,7 +363,9 @@ impl VersionedMultiProofTargets {
Self::Legacy(targets) => {
Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
}
Self::V2(targets) => Box::new(targets.chunks(chunk_size).map(Self::V2)),
Self::V2(targets) => {
Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2))
}
}
}
}
@@ -583,8 +585,6 @@ pub(crate) struct MultiProofTaskMetrics {
pub last_proof_wait_time_histogram: Histogram,
/// Time spent preparing the sparse trie for reuse after state root computation.
pub into_trie_for_reuse_duration_histogram: Histogram,
/// Time spent waiting for preserved sparse trie cache to become available.
pub sparse_trie_cache_wait_duration_histogram: Histogram,
}
/// Standalone task that receives a transaction state stream and updates relevant

View File

@@ -49,8 +49,7 @@ use std::{
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
pub(super) enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
@@ -70,8 +69,7 @@ struct IndexedTransaction<Tx> {
/// individually in parallel.
///
/// Note: This task runs until cancelled externally.
#[derive(Debug)]
pub struct PrewarmCacheTask<N, P, Evm>
pub(super) struct PrewarmCacheTask<N, P, Evm>
where
N: NodePrimitives,
Evm: ConfigureEvm<Primitives = N>,
@@ -84,6 +82,8 @@ where
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// The number of transactions to be processed
transaction_count_hint: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -99,11 +99,12 @@ where
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Initializes the task with the given transactions pending execution
pub fn new(
pub(super) fn new(
executor: WorkloadExecutor,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
@@ -111,7 +112,7 @@ where
trace!(
target: "engine::tree::payload_processor::prewarm",
max_concurrency,
transaction_count = ctx.env.transaction_count,
transaction_count_hint,
"Initialized prewarm task"
);
@@ -121,6 +122,7 @@ where
execution_cache,
ctx,
max_concurrency,
transaction_count_hint,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
@@ -144,6 +146,7 @@ where
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
let span = Span::current();
self.executor.spawn_blocking(move || {
@@ -151,14 +154,13 @@ where
let (done_tx, done_rx) = mpsc::channel();
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let transaction_count = ctx.env.transaction_count;
let workers_needed = if transaction_count == 0 {
let workers_needed = if transaction_count_hint == 0 {
max_concurrency
} else {
transaction_count.min(max_concurrency)
transaction_count_hint.min(max_concurrency)
};
// Spawn workers
@@ -368,8 +370,11 @@ where
name = "prewarm and caching",
skip_all
)]
pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
where
pub(super) fn run<Tx>(
self,
mode: PrewarmMode<Tx>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
// Spawn execution tasks based on mode
@@ -431,29 +436,23 @@ where
/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
pub struct PrewarmContext<N, P, Evm>
pub(super) struct PrewarmContext<N, P, Evm>
where
N: NodePrimitives,
Evm: ConfigureEvm<Primitives = N>,
{
/// The execution environment.
pub env: ExecutionEnv<Evm>,
/// The EVM configuration.
pub evm_config: Evm,
/// The saved cache.
pub saved_cache: Option<SavedCache>,
pub(super) env: ExecutionEnv<Evm>,
pub(super) evm_config: Evm,
pub(super) saved_cache: Option<SavedCache>,
/// Provider to obtain the state
pub provider: StateProviderBuilder<N, P>,
/// The metrics for the prewarm task.
pub metrics: PrewarmMetrics,
pub(super) provider: StateProviderBuilder<N, P>,
pub(super) metrics: PrewarmMetrics,
/// An atomic bool that tells prewarm tasks to not start any more execution.
pub terminate_execution: Arc<AtomicBool>,
/// Whether the precompile cache is disabled.
pub precompile_cache_disabled: bool,
/// The precompile cache map.
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
pub(super) terminate_execution: Arc<AtomicBool>,
pub(super) precompile_cache_disabled: bool,
pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// Whether V2 proof calculation is enabled.
pub v2_proofs_enabled: bool,
pub(super) v2_proofs_enabled: bool,
}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
@@ -853,8 +852,7 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
/// execution path without cloning the expensive `BundleState`.
#[derive(Debug)]
pub enum PrewarmTaskEvent<R> {
pub(super) enum PrewarmTaskEvent<R> {
/// Forcefully terminate all remaining transaction execution.
TerminateTransactionExecution,
/// Forcefully terminate the task on demand and update the shared cache with the given output
@@ -884,7 +882,7 @@ pub enum PrewarmTaskEvent<R> {
/// Metrics for transactions prewarming.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub struct PrewarmMetrics {
pub(crate) struct PrewarmMetrics {
/// The number of transactions to prewarm
pub(crate) transactions: Gauge,
/// A histogram of the number of transactions to prewarm

View File

@@ -3,19 +3,19 @@
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
VersionedMultiProofTargets,
},
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use alloy_rlp::Decodable;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_primitives_traits::Account;
use reth_revm::state::EvmState;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
TrieAccount, EMPTY_ROOT_HASH,
};
use reth_trie_parallel::{
proof_task::{
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
@@ -72,7 +72,7 @@ where
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
@@ -88,7 +88,7 @@ where
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
@@ -135,7 +135,6 @@ where
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
name = "SparseTrieTask::run",
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
@@ -199,17 +198,13 @@ where
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
let deferred = self.trie.take_deferred_drops();
(self.trie, deferred)
self.trie
}
}
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
/// Sender for proof results.
@@ -222,15 +217,6 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
trie: SparseStateTrie<A, S>,
/// Handle to the proof worker pools (storage and account).
proof_worker_handle: ProofWorkerHandle,
/// The size of proof targets chunk to spawn in one calculation.
/// If None, chunking is disabled and all targets are processed in a single proof.
chunk_size: Option<usize>,
/// If this number is exceeded and chunking is enabled, then this will override whether or not
/// there are any active workers and force chunking across workers. This is to prevent tasks
/// which are very long from hitting a single worker.
max_targets_for_chunking: usize,
/// Account trie updates.
account_updates: B256Map<LeafUpdate>,
/// Storage trie updates. hashed address -> slot -> update.
@@ -255,16 +241,12 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
/// Cache of storage proof targets that have already been fetched/requested from the proof
/// workers. account -> slot -> lowest `min_len` requested.
fetched_storage_targets: B256Map<B256Map<u8>>,
/// Reusable buffer for RLP encoding of accounts.
account_rlp_buf: Vec<u8>,
/// Whether the last state update has been received.
finished_state_updates: bool,
/// Pending targets to be dispatched to the proof workers.
pending_targets: MultiProofTargetsV2,
/// Number of pending execution/prewarming updates received but not yet passed to
/// `update_leaves`.
/// Number of pending updates that were received but not yet processed.
pending_updates: usize,
/// Metrics for the sparse trie.
metrics: MultiProofTaskMetrics,
}
@@ -274,13 +256,12 @@ where
A: SparseTrieExt + Default,
S: SparseTrieExt + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new(
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
chunk_size: Option<usize>,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
Self {
@@ -289,14 +270,11 @@ where
updates,
proof_worker_handle,
trie,
chunk_size,
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
account_updates: Default::default(),
storage_updates: Default::default(),
pending_account_updates: Default::default(),
fetched_account_targets: Default::default(),
fetched_storage_targets: Default::default(),
account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
finished_state_updates: Default::default(),
pending_targets: Default::default(),
pending_updates: Default::default(),
@@ -313,11 +291,10 @@ where
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> SparseStateTrie<A, S> {
self.trie.prune(prune_depth, max_storage_tries);
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
let deferred = self.trie.take_deferred_drops();
(self.trie, deferred)
self.trie
}
/// Clears and shrinks the trie, discarding all state.
@@ -328,11 +305,10 @@ where
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
let deferred = self.trie.take_deferred_drops();
(self.trie, deferred)
self.trie
}
/// Runs the sparse trie task to completion.
@@ -342,7 +318,6 @@ where
///
/// This concludes once the last state update has been received and processed.
#[instrument(
name = "SparseTrieCacheTask::run",
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
@@ -371,33 +346,24 @@ where
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
while let Ok(next) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = next.result? else {
while let Ok(res) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = res.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
result.extend(res);
}
self.on_proof_result(result)?;
},
}
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
// If we don't have any pending messages, we can spend some time on computing
// storage roots and promoting account updates.
self.dispatch_pending_targets();
self.promote_pending_account_updates()?;
self.process_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
// If we don't have any pending updates OR we've accumulated a lot already, apply
// them to the trie,
} else if self.updates.is_empty() || self.pending_updates > 100 {
self.process_leaf_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() ||
self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default()
{
// Make sure to dispatch targets if we don't have any updates or if we've
// accumulated a lot of them.
} else if self.updates.is_empty() || self.pending_targets.chunking_length() > 100 {
self.dispatch_pending_targets();
}
@@ -424,7 +390,6 @@ where
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
/// Processes a [`MultiProofMessage`].
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
match message {
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
@@ -520,8 +485,6 @@ where
})
}
/// Applies all account and storage leaf updates to corresponding tries and collects any new
/// multiproof targets.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
@@ -530,21 +493,26 @@ where
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
self.pending_updates = 0;
// Start with processing all storage updates in parallel.
let storage_results = self
.storage_updates
// Make sure that tries exist for all addresses that have updates.
for address in self.storage_updates.keys() {
self.trie.get_or_create_storage_trie_mut(*address);
}
let storage_results: Vec<_> = self
.trie
.storage_tries()
.iter_mut()
.map(|(address, updates)| {
let trie = self.trie.take_or_create_storage_trie(address);
.filter_map(|(address, trie)| {
let updates = self.storage_updates.remove(address)?;
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
(address, updates, fetched, trie)
Some((address, updates, fetched, trie))
})
.par_bridge()
.map(|(address, updates, mut fetched, mut trie)| {
.map(|(address, mut updates, mut fetched, trie)| {
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
trie.update_leaves(&mut updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
@@ -557,13 +525,13 @@ where
}
})?;
SparseTrieResult::Ok((address, targets, fetched, trie))
SparseTrieResult::Ok((address, targets, fetched, updates))
})
.collect::<Result<Vec<_>, _>>()?;
for (address, targets, fetched, trie) in storage_results {
for (address, targets, fetched, updates) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.trie.insert_storage_trie(*address, trie);
self.storage_updates.insert(*address, updates);
if !targets.is_empty() {
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
@@ -571,17 +539,6 @@ where
}
// Process account trie updates and fill the account targets.
self.process_account_leaf_updates()?;
Ok(())
}
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
///
/// Returns whether any updates were drained (applied to the trie).
fn process_account_leaf_updates(&mut self) -> SparseTrieResult<bool> {
let updates_len_before = self.account_updates.len();
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
@@ -602,18 +559,16 @@ where
},
)?;
Ok(self.account_updates.len() < updates_len_before)
Ok(())
}
/// Iterates through all storage tries for which all updates were processed, computes their
/// storage roots, and promotes corresponding pending account updates into proper leaf updates
/// for accounts trie.
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
fn process_updates(&mut self) -> SparseTrieResult<()> {
self.process_leaf_updates()?;
if self.pending_account_updates.is_empty() {
@@ -622,7 +577,7 @@ where
let roots = self
.trie
.storage_tries_mut()
.storage_tries()
.par_iter_mut()
.filter(|(address, _)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
@@ -635,10 +590,10 @@ where
})
.collect::<Vec<_>>();
for (addr, storage_root) in roots {
for (address, storage_root) in roots {
// If the storage root is known and we have a pending update for this account, encode it
// into a proper update.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*address) &&
entry.get().is_some()
{
let account = entry.remove().expect("just checked, should be Some");
@@ -647,70 +602,77 @@ where
{
Vec::new()
} else {
self.account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(&mut self.account_rlp_buf);
self.account_rlp_buf.clone()
// TODO: optimize allocation
alloy_rlp::encode(account.unwrap_or_default().into_trie_account(storage_root))
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
self.account_updates.insert(*address, LeafUpdate::Changed(encoded));
}
}
loop {
// Now handle pending account updates that can be upgraded to a proper update.
let account_rlp_buf = &mut self.account_rlp_buf;
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
return true;
// Now promote pending account updates if possible.
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
return true;
}
// Get the current account state either from the trie or from latest account update.
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
Some(encoded).filter(|encoded| !encoded.is_empty())
} else if !self.account_updates.contains_key(addr) {
self.trie.get_account_value(addr)
} else {
// Needs to be revealed first
return true;
};
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
let (account, storage_root) = if let Some(account) = account.take() {
// If account is Some(_) here it means it didn't have any storage updates
// and we can fetch the storage root directly from the account trie.
//
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
(account, storage_root)
} else {
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
};
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
Vec::new()
} else {
let account = account.unwrap_or_default().into_trie_account(storage_root);
// TODO: optimize allocation
alloy_rlp::encode(account)
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
false
});
// Process account trie updates and fill the account targets.
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
// Get the current account state either from the trie or from latest account update.
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
Some(encoded).filter(|encoded| !encoded.is_empty())
} else if !self.account_updates.contains_key(addr) {
self.trie.get_account_value(addr)
} else {
// Needs to be revealed first
return true;
};
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
let (account, storage_root) = if let Some(account) = account.take() {
// If account is Some(_) here it means it didn't have any storage updates
// and we can fetch the storage root directly from the account trie.
//
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
(account, storage_root)
} else {
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
};
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
Vec::new()
} else {
account_rlp_buf.clear();
account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
false
});
// Only exit when no new updates are processed.
//
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if !self.process_account_leaf_updates()? {
break
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
},
)?;
Ok(())
}
@@ -726,8 +688,8 @@ where
dispatch_with_chunking(
std::mem::take(&mut self.pending_targets),
chunking_length,
self.chunk_size,
self.max_targets_for_chunking,
Some(60),
300,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
MultiProofTargetsV2::chunks,
@@ -800,7 +762,7 @@ where
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge_buffered()
.par_bridge()
.map(|(address, storage, storage_trie)| {
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)

View File

@@ -407,7 +407,6 @@ where
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
};
// Plan the strategy used for state root computation.
@@ -520,14 +519,6 @@ where
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
// we double check the state root here for good measure
if state_root == block.header().state_root() {
// Compare trie updates with serial computation if configured
if self.config.always_compare_trie_updates() {
self.compare_trie_updates_with_serial(
overlay_factory.clone(),
&hashed_state,
trie_updates.clone(),
);
}
maybe_state_root = Some((state_root, trie_updates, elapsed))
} else {
warn!(
@@ -903,62 +894,6 @@ where
.root_with_updates()?)
}
/// Compares trie updates from the state root task with serial state root computation.
///
/// This is used for debugging and validating the correctness of the parallel state root
/// task implementation. When enabled via `--engine.state-root-task-compare-updates`, this
/// method runs a separate serial state root computation and compares the resulting trie
/// updates.
fn compare_trie_updates_with_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
task_trie_updates: TrieUpdates,
) {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
Ok((serial_root, serial_trie_updates)) => {
debug!(
target: "engine::tree::payload_validator",
?serial_root,
"Serial state root computation finished for comparison"
);
// Get a database provider to use as trie cursor factory
match overlay_factory.database_provider_ro() {
Ok(provider) => {
if let Err(err) = super::trie_updates::compare_trie_updates(
&provider,
task_trie_updates,
serial_trie_updates,
) {
warn!(
target: "engine::tree::payload_validator",
%err,
"Error comparing trie updates"
);
}
}
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Failed to get database provider for trie update comparison"
);
}
}
}
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Failed to compute serial state root for comparison"
);
}
}
}
/// Validates the block after execution.
///
/// This performs:

View File

@@ -1,9 +1,9 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use dashmap::DashMap;
use moka::policy::EvictionPolicy;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use reth_primitives_traits::dashmap::DashMap;
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
use revm_primitives::Address;
use std::{hash::Hash, sync::Arc};
@@ -21,8 +21,7 @@ impl<S> PrecompileCacheMap<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
/// Get the precompile cache for the given address.
pub fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
// Try just using `.get` first to avoid acquiring a write lock.
if let Some(cache) = self.0.get(&address) {
return cache.clone();
@@ -91,7 +90,7 @@ impl<S> CacheEntry<S> {
/// A cache for precompile inputs / outputs.
#[derive(Debug)]
pub struct CachedPrecompile<S>
pub(crate) struct CachedPrecompile<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
@@ -110,7 +109,7 @@ where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
/// `CachedPrecompile` constructor.
pub const fn new(
pub(crate) const fn new(
precompile: DynPrecompile,
cache: PrecompileCache<S>,
spec_id: S,
@@ -119,8 +118,7 @@ where
Self { precompile, cache, spec_id, metrics }
}
/// Wrap the given precompile in a cached precompile.
pub fn wrap(
pub(crate) fn wrap(
precompile: DynPrecompile,
cache: PrecompileCache<S>,
spec_id: S,
@@ -198,18 +196,18 @@ where
/// Metrics for the cached precompile.
#[derive(reth_metrics::Metrics, Clone)]
#[metrics(scope = "sync.caching")]
pub struct CachedPrecompileMetrics {
pub(crate) struct CachedPrecompileMetrics {
/// Precompile cache hits
pub precompile_cache_hits: metrics::Counter,
precompile_cache_hits: metrics::Counter,
/// Precompile cache misses
pub precompile_cache_misses: metrics::Counter,
precompile_cache_misses: metrics::Counter,
/// Precompile cache size. Uses the LRU cache length as the size metric.
pub precompile_cache_size: metrics::Gauge,
precompile_cache_size: metrics::Gauge,
/// Precompile execution errors.
pub precompile_errors: metrics::Counter,
precompile_errors: metrics::Counter,
}
impl CachedPrecompileMetrics {
@@ -217,7 +215,7 @@ impl CachedPrecompileMetrics {
///
/// Adds address as an `address` label padded with zeros to at least two hex symbols, prefixed
/// by `0x`.
pub fn new_with_address(address: Address) -> Self {
pub(crate) fn new_with_address(address: Address) -> Self {
Self::new_with_labels(&[("address", format!("0x{address:02x}"))])
}
}

View File

@@ -3,7 +3,7 @@
use crate::engine::EngineApiKind;
use alloy_eips::BlockNumHash;
use alloy_primitives::{
map::{B256Map, B256Set},
map::{HashMap, HashSet},
BlockNumber, B256,
};
use reth_chain_state::{DeferredTrieData, EthPrimitives, ExecutedBlock, LazyOverlay};
@@ -25,7 +25,7 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
/// __All__ unique executed blocks by block hash that are connected to the canonical chain.
///
/// This includes blocks of all forks.
pub(crate) blocks_by_hash: B256Map<ExecutedBlock<N>>,
pub(crate) blocks_by_hash: HashMap<B256, ExecutedBlock<N>>,
/// Executed blocks grouped by their respective block number.
///
/// This maps unique block number to all known blocks for that height.
@@ -33,7 +33,7 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
/// Note: there can be multiple blocks at the same height due to forks.
pub(crate) blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock<N>>>,
/// Map of any parent block hash to its children.
pub(crate) parent_to_child: B256Map<B256Set>,
pub(crate) parent_to_child: HashMap<B256, HashSet<B256>>,
/// Currently tracked canonical head of the chain.
pub(crate) current_canonical_head: BlockNumHash,
/// The engine API variant of this handler
@@ -48,34 +48,37 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
impl<N: NodePrimitives> TreeState<N> {
/// Returns a new, empty tree state that points to the given canonical head.
pub fn new(current_canonical_head: BlockNumHash, engine_kind: EngineApiKind) -> Self {
pub(crate) fn new(current_canonical_head: BlockNumHash, engine_kind: EngineApiKind) -> Self {
Self {
blocks_by_hash: B256Map::default(),
blocks_by_hash: HashMap::default(),
blocks_by_number: BTreeMap::new(),
current_canonical_head,
parent_to_child: B256Map::default(),
parent_to_child: HashMap::default(),
engine_kind,
cached_canonical_overlay: None,
}
}
/// Resets the state and points to the given canonical head.
pub fn reset(&mut self, current_canonical_head: BlockNumHash) {
pub(crate) fn reset(&mut self, current_canonical_head: BlockNumHash) {
*self = Self::new(current_canonical_head, self.engine_kind);
}
/// Returns the number of executed blocks stored.
pub fn block_count(&self) -> usize {
pub(crate) fn block_count(&self) -> usize {
self.blocks_by_hash.len()
}
/// Returns the [`ExecutedBlock`] by hash.
pub fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
pub(crate) fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
self.blocks_by_hash.get(&hash)
}
/// Returns the sealed block header by hash.
pub fn sealed_header_by_hash(&self, hash: &B256) -> Option<SealedHeader<N::BlockHeader>> {
pub(crate) fn sealed_header_by_hash(
&self,
hash: &B256,
) -> Option<SealedHeader<N::BlockHeader>> {
self.blocks_by_hash.get(hash).map(|b| b.sealed_block().sealed_header().clone())
}
@@ -84,7 +87,7 @@ impl<N: NodePrimitives> TreeState<N> {
/// highest persisted block connected to this chain.
///
/// Returns `None` if the block for the given hash is not found.
pub fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
pub(crate) fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
let block = self.blocks_by_hash.get(&hash).cloned()?;
let mut parent_hash = block.recovered_block().parent_hash();
let mut blocks = vec![block];
@@ -157,7 +160,7 @@ impl<N: NodePrimitives> TreeState<N> {
}
/// Insert executed block into the state.
pub fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
pub(crate) fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
let hash = executed.recovered_block().hash();
let parent_hash = executed.recovered_block().parent_hash();
let block_number = executed.recovered_block().number();
@@ -178,7 +181,7 @@ impl<N: NodePrimitives> TreeState<N> {
/// ## Returns
///
/// The removed block and the block hashes of its children.
fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, B256Set)> {
fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, HashSet<B256>)> {
let executed = self.blocks_by_hash.remove(&hash)?;
// Remove this block from collection of children of its parent block.
@@ -213,7 +216,7 @@ impl<N: NodePrimitives> TreeState<N> {
}
/// Returns whether or not the hash is part of the canonical chain.
pub fn is_canonical(&self, hash: B256) -> bool {
pub(crate) fn is_canonical(&self, hash: B256) -> bool {
let mut current_block = self.current_canonical_head.hash;
if current_block == hash {
return true
@@ -231,7 +234,11 @@ impl<N: NodePrimitives> TreeState<N> {
/// Removes canonical blocks below the upper bound, only if the last persisted hash is
/// part of the canonical chain.
pub fn remove_canonical_until(&mut self, upper_bound: BlockNumber, last_persisted_hash: B256) {
pub(crate) fn remove_canonical_until(
&mut self,
upper_bound: BlockNumber,
last_persisted_hash: B256,
) {
debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
// If the last persisted hash is not canonical, then we don't want to remove any canonical
@@ -256,7 +263,7 @@ impl<N: NodePrimitives> TreeState<N> {
/// Removes all blocks that are below the finalized block, as well as removing non-canonical
/// sidechains that fork from below the finalized block.
pub fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
// We remove disconnected sidechains in three steps:
@@ -316,7 +323,7 @@ impl<N: NodePrimitives> TreeState<N> {
/// NOTE: if the finalized block is greater than the upper bound, the only blocks that will be
/// removed are canonical blocks and sidechains that fork below the `upper_bound`. This is the
/// same behavior as if the `finalized_num` were `Some(upper_bound)`.
pub fn remove_until(
pub(crate) fn remove_until(
&mut self,
upper_bound: BlockNumHash,
last_persisted_hash: B256,
@@ -354,22 +361,22 @@ impl<N: NodePrimitives> TreeState<N> {
}
/// Updates the canonical head to the given block.
pub const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
pub(crate) const fn set_canonical_head(&mut self, new_head: BlockNumHash) {
self.current_canonical_head = new_head;
}
/// Returns the tracked canonical head.
pub const fn canonical_head(&self) -> &BlockNumHash {
pub(crate) const fn canonical_head(&self) -> &BlockNumHash {
&self.current_canonical_head
}
/// Returns the block hash of the canonical head.
pub const fn canonical_block_hash(&self) -> B256 {
pub(crate) const fn canonical_block_hash(&self) -> B256 {
self.canonical_head().hash
}
/// Returns the block number of the canonical head.
pub const fn canonical_block_number(&self) -> BlockNumber {
pub(crate) const fn canonical_block_number(&self) -> BlockNumber {
self.canonical_head().number
}
}
@@ -379,7 +386,7 @@ impl<N: NodePrimitives> TreeState<N> {
/// Determines if the second block is a descendant of the first block.
///
/// If the two blocks are the same, this returns `false`.
pub fn is_descendant(
pub(crate) fn is_descendant(
&self,
first: BlockNumHash,
second: alloy_eips::eip1898::BlockWithParent,
@@ -489,7 +496,7 @@ mod tests {
assert_eq!(
tree_state.parent_to_child.get(&blocks[0].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[1].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[1].recovered_block().hash()]))
);
assert!(!tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
@@ -498,7 +505,7 @@ mod tests {
assert_eq!(
tree_state.parent_to_child.get(&blocks[1].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[2].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[2].recovered_block().hash()]))
);
assert!(tree_state.parent_to_child.contains_key(&blocks[1].recovered_block().hash()));
@@ -586,11 +593,11 @@ mod tests {
assert_eq!(
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
);
assert_eq!(
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
);
}
@@ -636,11 +643,11 @@ mod tests {
assert_eq!(
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
);
assert_eq!(
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
);
}
@@ -686,11 +693,11 @@ mod tests {
assert_eq!(
tree_state.parent_to_child.get(&blocks[2].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[3].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[3].recovered_block().hash()]))
);
assert_eq!(
tree_state.parent_to_child.get(&blocks[3].recovered_block().hash()),
Some(&B256Set::from_iter([blocks[4].recovered_block().hash()]))
Some(&HashSet::from_iter([blocks[4].recovered_block().hash()]))
);
}
}

View File

@@ -11,7 +11,7 @@ use reth_trie_db::ChangesetCache;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{
map::{B256Map, B256Set},
map::{HashMap, HashSet},
Bytes, B256,
};
use alloy_rlp::Decodable;
@@ -28,7 +28,6 @@ use reth_ethereum_primitives::{Block, EthPrimitives};
use reth_evm_ethereum::MockEvmConfig;
use reth_primitives_traits::Block as _;
use reth_provider::test_utils::MockEthProvider;
use reth_tasks::spawn_os_thread;
use std::{
collections::BTreeMap,
str::FromStr,
@@ -235,11 +234,11 @@ impl TestHarness {
}
fn with_blocks(mut self, blocks: Vec<ExecutedBlock>) -> Self {
let mut blocks_by_hash = B256Map::default();
let mut blocks_by_hash = HashMap::default();
let mut blocks_by_number = BTreeMap::new();
let mut state_by_hash = B256Map::default();
let mut state_by_hash = HashMap::default();
let mut hash_by_number = BTreeMap::new();
let mut parent_to_child: B256Map<B256Set> = B256Map::default();
let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
let mut parent_hash = B256::ZERO;
for block in &blocks {
@@ -539,7 +538,10 @@ async fn test_tree_persist_blocks() {
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
.collect();
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
spawn_os_thread("engine", || test_harness.tree.run());
std::thread::Builder::new()
.name("Engine Task".to_string())
.spawn(|| test_harness.tree.run())
.unwrap();
// send a message to the tree to enter the main loop.
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
@@ -957,7 +959,7 @@ async fn test_engine_tree_fcu_missing_head() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
let expected_block_set = B256Set::from_iter([missing_block.hash()]);
let expected_block_set = HashSet::from_iter([missing_block.hash()]);
assert_eq!(actual_block_set, expected_block_set);
}
_ => panic!("Unexpected event: {event:#?}"),
@@ -1002,7 +1004,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, B256Set::from_iter([main_chain_last_hash]));
assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {event:#?}"),
}
@@ -1011,7 +1013,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, B256Set::from_iter([main_chain_last_hash]));
assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {event:#?}"),
}
@@ -1987,7 +1989,10 @@ mod forkchoice_updated_tests {
let action_rx = test_harness.action_rx;
// Spawn tree in background thread
spawn_os_thread("engine", || test_harness.tree.run());
std::thread::Builder::new()
.name("Engine Task".to_string())
.spawn(|| test_harness.tree.run())
.unwrap();
// Send terminate request
to_tree_tx

View File

@@ -1,7 +1,4 @@
use alloy_primitives::{
map::{B256Map, HashMap},
B256,
};
use alloy_primitives::{map::HashMap, B256};
use reth_db::DatabaseError;
use reth_trie::{
trie_cursor::{TrieCursor, TrieCursorFactory},
@@ -22,7 +19,7 @@ struct EntryDiff<T> {
struct TrieUpdatesDiff {
account_nodes: HashMap<Nibbles, EntryDiff<Option<BranchNodeCompact>>>,
removed_nodes: HashMap<Nibbles, EntryDiff<bool>>,
storage_tries: B256Map<StorageTrieUpdatesDiff>,
storage_tries: HashMap<B256, StorageTrieUpdatesDiff>,
}
impl TrieUpdatesDiff {
@@ -101,7 +98,7 @@ impl StorageTrieUpdatesDiff {
/// Compares the trie updates from state root task, regular state root calculation and database,
/// and logs the differences if there's any.
pub(crate) fn compare_trie_updates(
pub(super) fn compare_trie_updates(
trie_cursor_factory: impl TrieCursorFactory,
task: TrieUpdates,
regular: TrieUpdates,
@@ -189,8 +186,7 @@ fn compare_storage_trie_updates<C: TrieCursor>(
task: &mut StorageTrieUpdates,
regular: &mut StorageTrieUpdates,
) -> Result<StorageTrieUpdatesDiff, DatabaseError> {
// Check if the storage trie exists by seeking to the first entry
let database_not_exists = trie_cursor()?.seek(Nibbles::default())?.is_none();
let database_not_exists = trie_cursor()?.next()?.is_none();
let mut diff = StorageTrieUpdatesDiff {
// If the deletion is a no-op, meaning that the entry is not in the
// database, do not add it to the diff.

View File

@@ -20,6 +20,8 @@ reth-era.workspace = true
# http
bytes.workspace = true
reqwest.workspace = true
reqwest.default-features = false
reqwest.features = ["stream", "rustls-tls-native-roots"]
# async
tokio.workspace = true

View File

@@ -86,7 +86,7 @@ where
mut self,
components: impl CliComponentsBuilder<N>,
launcher: impl AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> Result<()>,
) -> Result<()>
@@ -132,7 +132,7 @@ pub(crate) fn run_commands_with<C, Ext, Rpc, N, SubCmd>(
runner: CliRunner,
components: impl CliComponentsBuilder<N>,
launcher: impl AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> Result<()>,
) -> Result<()>

View File

@@ -131,7 +131,7 @@ impl<
/// ````
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
where
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
Fut: Future<Output = eyre::Result<()>>,
C: ChainSpecParser<ChainSpec = ChainSpec>,
{
@@ -148,7 +148,7 @@ impl<
self,
components: impl CliComponentsBuilder<N>,
launcher: impl AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> eyre::Result<()>,
) -> eyre::Result<()>
@@ -180,7 +180,7 @@ impl<
/// ```
pub fn with_runner<L, Fut>(self, runner: CliRunner, launcher: L) -> eyre::Result<()>
where
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
Fut: Future<Output = eyre::Result<()>>,
C: ChainSpecParser<ChainSpec = ChainSpec>,
{
@@ -196,7 +196,7 @@ impl<
runner: CliRunner,
components: impl CliComponentsBuilder<N>,
launcher: impl AsyncFnOnce(
WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>,
WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
Ext,
) -> eyre::Result<()>,
) -> eyre::Result<()>

View File

@@ -119,9 +119,10 @@ impl EthereumNode {
/// use reth_db::open_db_read_only;
/// use reth_node_ethereum::EthereumNode;
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
/// use std::sync::Arc;
///
/// let factory = EthereumNode::provider_factory_builder()
/// .db(open_db_read_only("db", Default::default()).unwrap())
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())

View File

@@ -100,12 +100,10 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
ChainSpecBuilder::default().chain(MAINNET.chain).genesis(genesis).osaka_activated().build(),
);
let genesis_hash = chain_spec.genesis_hash();
let node_config = NodeConfig::test().with_chain(chain_spec).with_unused_ports().with_rpc(
RpcServerArgs::default()
.with_unused_ports()
.with_http()
.with_force_blob_sidecar_upcasting(),
);
let node_config = NodeConfig::test()
.with_chain(chain_spec)
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.node(EthereumNode::default())
@@ -127,7 +125,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
let blob_tx_hash = node.rpc.inject_tx(blob_tx).await?;
// fetch it from rpc
let envelope = node.rpc.envelope_by_hash(blob_tx_hash).await?;
// assert that sidecar was converted to eip7594 (force upcasting is enabled)
// assert that sidecar was converted to eip7594
assert!(envelope.as_eip4844().unwrap().tx().sidecar().unwrap().is_eip7594());
// validate sidecar
TransactionTestContext::validate_sidecar(envelope);
@@ -163,12 +161,10 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
.build(),
);
let genesis_hash = chain_spec.genesis_hash();
let node_config = NodeConfig::test().with_chain(chain_spec).with_unused_ports().with_rpc(
RpcServerArgs::default()
.with_unused_ports()
.with_http()
.with_force_blob_sidecar_upcasting(),
);
let node_config = NodeConfig::test()
.with_chain(chain_spec)
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.node(EthereumNode::default())

View File

@@ -511,8 +511,9 @@ mod compact {
total_length += flags.len() + buffer.len();
buf.put_slice(&flags);
if zstd {
reth_zstd_compressors::with_receipt_compressor(|compressor| {
let compressed = compressor.compress(&buffer).expect("Failed to compress.");
reth_zstd_compressors::RECEIPT_COMPRESSOR.with(|compressor| {
let compressed =
compressor.borrow_mut().compress(&buffer).expect("Failed to compress.");
buf.put(compressed.as_slice());
});
} else {
@@ -524,7 +525,8 @@ mod compact {
fn from_compact(buf: &[u8], _len: usize) -> (Self, &[u8]) {
let (flags, mut buf) = ReceiptFlags::from(buf);
if flags.__zstd() != 0 {
reth_zstd_compressors::with_receipt_decompressor(|decompressor| {
reth_zstd_compressors::RECEIPT_DECOMPRESSOR.with(|decompressor| {
let decompressor = &mut decompressor.borrow_mut();
let decompressed = decompressor.decompress(buf);
let original_buf = buf;
let mut buf: &[u8] = decompressed;

View File

@@ -577,11 +577,19 @@ impl reth_codecs::Compact for TransactionSigned {
let tx_bits = if zstd_bit {
let mut tmp = Vec::with_capacity(256);
reth_zstd_compressors::with_tx_compressor(|compressor| {
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
} else {
let mut compressor = reth_zstd_compressors::create_tx_compressor();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
}
} else {
self.transaction.to_compact(buf) as u8
};
@@ -603,13 +611,26 @@ impl reth_codecs::Compact for TransactionSigned {
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
reth_zstd_compressors::with_tx_decompressor(|decompressor| {
// TODO: enforce that zstd is only present at a "top" level type
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
(transaction, buf)
})
} else {
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
(transaction, buf)
})
}
} else {
let transaction_type = bitflags >> 1;
Transaction::from_compact(buf, transaction_type)

View File

@@ -36,6 +36,7 @@ rayon = { workspace = true, optional = true }
[dev-dependencies]
reth-ethereum-primitives.workspace = true
reth-ethereum-forks.workspace = true
[features]
default = ["std"]
@@ -46,6 +47,7 @@ std = [
"alloy-primitives/std",
"alloy-consensus/std",
"revm/std",
"reth-ethereum-forks/std",
"alloy-evm/std",
"reth-execution-errors/std",
"reth-execution-types/std",

View File

@@ -57,12 +57,6 @@ pub enum StateProofError {
/// RLP decoding error.
#[error(transparent)]
Rlp(#[from] alloy_rlp::Error),
/// Trie inconsistency detected during proof calculation.
///
/// This occurs when cached trie nodes disagree with the leaf data, causing
/// proof calculation to be unable to make forward progress.
#[error("trie inconsistency: {0}")]
TrieInconsistency(alloc::string::String),
}
impl From<StateProofError> for ProviderError {
@@ -70,7 +64,6 @@ impl From<StateProofError> for ProviderError {
match value {
StateProofError::Database(error) => Self::Database(error),
StateProofError::Rlp(error) => Self::Rlp(error),
StateProofError::TrieInconsistency(msg) => Self::Database(DatabaseError::Other(msg)),
}
}
}

View File

@@ -1,11 +1,7 @@
use crate::{BlockExecutionOutput, BlockExecutionResult};
use alloc::{vec, vec::Vec};
use alloy_eips::eip7685::Requests;
use alloy_primitives::{
logs_bloom,
map::{AddressMap, B256Map, HashMap},
Address, BlockNumber, Bloom, Log, B256, U256,
};
use alloy_primitives::{logs_bloom, map::HashMap, Address, BlockNumber, Bloom, Log, B256, U256};
use reth_primitives_traits::{Account, Bytecode, Receipt, StorageEntry};
use reth_trie_common::{HashedPostState, KeyHasher};
use revm::{
@@ -14,13 +10,14 @@ use revm::{
};
/// Type used to initialize revms bundle state.
pub type BundleStateInit = AddressMap<(Option<Account>, Option<Account>, B256Map<(U256, U256)>)>;
pub type BundleStateInit =
HashMap<Address, (Option<Account>, Option<Account>, HashMap<B256, (U256, U256)>)>;
/// Types used inside `RevertsInit` to initialize revms reverts.
pub type AccountRevertInit = (Option<Option<Account>>, Vec<StorageEntry>);
/// Type used to initialize revms reverts.
pub type RevertsInit = HashMap<BlockNumber, AddressMap<AccountRevertInit>>;
pub type RevertsInit = HashMap<BlockNumber, HashMap<Address, AccountRevertInit>>;
/// Represents a changed account
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -617,12 +614,12 @@ mod tests {
);
// Create a BundleStateInit object and insert initial data
let mut state_init: BundleStateInit = AddressMap::default();
let mut state_init: BundleStateInit = HashMap::default();
state_init
.insert(Address::new([2; 20]), (None, Some(Account::default()), B256Map::default()));
.insert(Address::new([2; 20]), (None, Some(Account::default()), HashMap::default()));
// Create an AddressMap for account reverts and insert initial data
let mut revert_inner: AddressMap<AccountRevertInit> = AddressMap::default();
// Create a HashMap for account reverts and insert initial data
let mut revert_inner: HashMap<Address, AccountRevertInit> = HashMap::default();
revert_inner.insert(Address::new([2; 20]), (None, vec![]));
// Create a RevertsInit object and insert the revert_inner data

View File

@@ -41,7 +41,6 @@ metrics.workspace = true
[dev-dependencies]
reth-tracing.workspace = true
alloy-primitives = { workspace = true, features = ["rand"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
secp256k1 = { workspace = true, features = ["std", "rand"] }
rand_08.workspace = true

View File

@@ -30,12 +30,12 @@ tokio-stream.workspace = true
hickory-resolver = { workspace = true, features = ["tokio"] }
# misc
dashmap = { workspace = true, features = ["inline"] }
data-encoding.workspace = true
linked_hash_set.workspace = true
schnellru.workspace = true
thiserror.workspace = true
tracing.workspace = true
parking_lot.workspace = true
serde = { workspace = true, optional = true }
serde_with = { workspace = true, optional = true }
@@ -56,9 +56,9 @@ serde = [
"alloy-primitives/serde",
"enr/serde",
"linked_hash_set/serde",
"parking_lot/serde",
"rand/serde",
"secp256k1/serde",
"hickory-resolver/serde",
"reth-ethereum-forks/serde",
"dashmap/serde",
]

View File

@@ -1,9 +1,9 @@
//! Perform DNS lookups
use dashmap::DashMap;
use hickory_resolver::name_server::ConnectionProvider;
pub use hickory_resolver::{ResolveError, TokioResolver};
use std::future::Future;
use parking_lot::RwLock;
use std::{collections::HashMap, future::Future};
use tracing::trace;
/// A type that can lookup DNS entries
@@ -72,25 +72,25 @@ impl Resolver for DnsResolver {
/// A [Resolver] that uses an in memory map to lookup entries
#[derive(Debug, Default)]
pub struct MapResolver(DashMap<String, String>);
pub struct MapResolver(RwLock<HashMap<String, String>>);
// === impl MapResolver ===
impl MapResolver {
/// Inserts a key-value pair into the map.
pub fn insert(&self, k: String, v: String) -> Option<String> {
self.0.insert(k, v)
self.0.write().insert(k, v)
}
/// Returns the value corresponding to the key
pub fn get(&self, k: &str) -> Option<String> {
self.0.get(k).map(|entry| entry.value().clone())
self.0.read().get(k).cloned()
}
/// Removes a key from the map, returning the value at the key if the key was previously in the
/// map.
pub fn remove(&self, k: &str) -> Option<String> {
self.0.remove(k).map(|(_, v)| v)
self.0.write().remove(k)
}
}

View File

@@ -621,11 +621,12 @@ mod tests {
bodies::test_utils::{insert_headers, zip_blocks},
test_utils::{generate_bodies, TestBodiesClient},
};
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::B256;
use assert_matches::assert_matches;
use reth_consensus::test_utils::TestConsensus;
use reth_provider::test_utils::create_test_provider_factory;
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
use std::collections::HashMap;
// Check that the blocks are emitted in order of block number, not in order of
// first-downloaded
@@ -673,7 +674,7 @@ mod tests {
let bodies = blocks
.into_iter()
.map(|block| (block.hash(), block.into_body()))
.collect::<B256Map<_>>();
.collect::<HashMap<_, _>>();
insert_headers(&factory, &headers);

View File

@@ -3,7 +3,7 @@
#![allow(dead_code)]
use alloy_consensus::BlockHeader;
use alloy_primitives::map::B256Map;
use alloy_primitives::B256;
use reth_ethereum_primitives::BlockBody;
use reth_network_p2p::bodies::response::BlockResponse;
use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
@@ -11,10 +11,11 @@ use reth_provider::{
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderFactory, StaticFileSegment,
StaticFileWriter,
};
use std::collections::HashMap;
pub(crate) fn zip_blocks<'a, B: Block>(
headers: impl Iterator<Item = &'a SealedHeader<B::Header>>,
bodies: &mut B256Map<B::Body>,
bodies: &mut HashMap<B256, B::Body>,
) -> Vec<BlockResponse<B>> {
headers
.into_iter()
@@ -31,7 +32,7 @@ pub(crate) fn zip_blocks<'a, B: Block>(
pub(crate) fn create_raw_bodies(
headers: impl IntoIterator<Item = SealedHeader>,
bodies: &mut B256Map<BlockBody>,
bodies: &mut HashMap<B256, BlockBody>,
) -> Vec<reth_ethereum_primitives::Block> {
headers
.into_iter()

View File

@@ -704,7 +704,7 @@ mod tests {
FileClient::from_file(file.into(), NoopConsensus::arc())
.await
.unwrap()
.with_bodies(bodies.clone().into_iter().collect()),
.with_bodies(bodies.clone()),
);
let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
client.clone(),

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::B256;
use reth_ethereum_primitives::BlockBody;
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
@@ -7,6 +7,7 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use std::{
collections::HashMap,
fmt::Debug,
ops::RangeInclusive,
sync::{
@@ -20,7 +21,7 @@ use tokio::sync::Mutex;
/// A [`BodiesClient`] for testing.
#[derive(Debug, Default)]
pub struct TestBodiesClient {
bodies: Arc<Mutex<B256Map<BlockBody>>>,
bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
should_delay: bool,
max_batch_size: Option<usize>,
times_requested: AtomicU64,
@@ -28,7 +29,7 @@ pub struct TestBodiesClient {
}
impl TestBodiesClient {
pub(crate) fn with_bodies(mut self, bodies: B256Map<BlockBody>) -> Self {
pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
self.bodies = Arc::new(Mutex::new(bodies));
self
}

View File

@@ -4,10 +4,10 @@
#[cfg(any(test, feature = "file-client"))]
use crate::{bodies::test_utils::create_raw_bodies, file_codec::BlockFileCodec};
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::B256;
use reth_ethereum_primitives::BlockBody;
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
use std::ops::RangeInclusive;
use std::{collections::HashMap, ops::RangeInclusive};
mod bodies_client;
pub use bodies_client::TestBodiesClient;
@@ -19,7 +19,7 @@ pub(crate) const TEST_SCOPE: &str = "downloaders.test";
/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
range: RangeInclusive<u64>,
) -> (Vec<SealedHeader>, B256Map<BlockBody>) {
) -> (Vec<SealedHeader>, HashMap<B256, BlockBody>) {
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
@@ -38,7 +38,7 @@ pub(crate) fn generate_bodies(
#[cfg(any(test, feature = "file-client"))]
pub(crate) async fn generate_bodies_file(
range: RangeInclusive<u64>,
) -> (tokio::fs::File, Vec<SealedHeader>, B256Map<BlockBody>) {
) -> (tokio::fs::File, Vec<SealedHeader>, HashMap<B256, BlockBody>) {
use futures::SinkExt;
use std::io::SeekFrom;
use tokio::{fs::File, io::AsyncSeekExt};

View File

@@ -8,13 +8,13 @@ use crate::{
};
use alloy_consensus::Header;
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_eth_wire_types::HeadersDirection;
use reth_ethereum_primitives::{Block, BlockBody};
use reth_network_peers::{PeerId, WithPeerId};
use reth_primitives_traits::{SealedBlock, SealedHeader};
use std::{ops::RangeInclusive, sync::Arc};
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
/// A headers+bodies client that stores the headers and bodies in memory, with an artificial soft
/// bodies response limit that is set to 20 by default.
@@ -22,8 +22,8 @@ use std::{ops::RangeInclusive, sync::Arc};
/// This full block client can be [Clone]d and shared between multiple tasks.
#[derive(Clone, Debug)]
pub struct TestFullBlockClient {
headers: Arc<Mutex<B256Map<Header>>>,
bodies: Arc<Mutex<B256Map<BlockBody>>>,
headers: Arc<Mutex<HashMap<B256, Header>>>,
bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
// soft response limit, max number of bodies to respond with
soft_limit: usize,
}
@@ -31,8 +31,8 @@ pub struct TestFullBlockClient {
impl Default for TestFullBlockClient {
fn default() -> Self {
Self {
headers: Arc::new(Mutex::new(B256Map::default())),
bodies: Arc::new(Mutex::new(B256Map::default())),
headers: Arc::new(Mutex::new(HashMap::default())),
bodies: Arc::new(Mutex::new(HashMap::default())),
soft_limit: 20,
}
}

View File

@@ -251,8 +251,6 @@ impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
}
/// Creates a preconfigured node for testing purposes with a specific datadir.
///
/// The entire `datadir` will be cleaned up when the node is dropped.
#[cfg(feature = "test-utils")]
pub fn testing_node_with_datadir(
mut self,
@@ -270,7 +268,7 @@ impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
let data_dir =
path.unwrap_or_chain_default(self.config.chain.chain(), self.config.datadir.clone());
let db = reth_db::test_utils::create_test_rw_db_with_datadir(data_dir.data_dir());
let db = reth_db::test_utils::create_test_rw_db_with_path(data_dir.db());
WithLaunchContext { builder: self.with_database(db), task_executor }
}

View File

@@ -1,7 +1,7 @@
//! Pool component for the node builder.
use crate::{BuilderContext, FullNodeTypes};
use alloy_primitives::map::AddressSet;
use alloy_primitives::Address;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::EthereumHardforks;
use reth_node_api::{BlockTy, NodeTypes, TxTy};
@@ -9,7 +9,7 @@ use reth_transaction_pool::{
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
};
use std::future::Future;
use std::{collections::HashSet, future::Future};
/// A type that knows how to build the transaction pool.
pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
@@ -62,7 +62,7 @@ pub struct PoolBuilderConfigOverrides {
/// Minimum base fee required by the protocol.
pub minimal_protocol_basefee: Option<u64>,
/// Addresses that will be considered as local. Above exemptions apply.
pub local_addresses: AddressSet,
pub local_addresses: HashSet<Address>,
/// Additional tasks to validate new transactions.
pub additional_validation_tasks: Option<usize>,
}

View File

@@ -66,8 +66,8 @@ use reth_node_metrics::{
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
@@ -236,7 +236,7 @@ impl LaunchContext {
.map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
if let Err(err) = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("rayon-{i}"))
.thread_name(|i| format!("reth-rayon-{i}"))
.build_global()
{
warn!(%err, "Failed to build global thread pool")
@@ -507,10 +507,32 @@ where
.with_prune_modes(self.prune_modes())
.with_changeset_cache(changeset_cache);
// Check consistency between the database and static files, returning
// the unwind targets for each storage layer if inconsistencies are
// found.
let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
// Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
// earliest consistent block.
//
// Order matters:
// 1) heal static files (no pruning)
// 2) check RocksDB (needs static-file tx data)
// 3) check static-file checkpoints vs MDBX (may prune)
//
// Compute one unwind target and run a single unwind.
let provider_ro = factory.database_provider_ro()?;
// Step 1: heal file-level inconsistencies (no pruning)
factory.static_file_provider().check_file_consistency(&provider_ro)?;
// Step 2: RocksDB consistency check (needs static files tx data)
let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;
// Step 3: Static file checkpoint consistency (may prune)
let static_file_unwind = factory
.static_file_provider()
.check_consistency(&provider_ro)?
.map(|target| match target {
PipelineTarget::Unwind(block) => block,
PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
});
// Take the minimum block number to ensure all storage layers are consistent.
let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();

View File

@@ -134,12 +134,6 @@ impl EngineNodeLauncher {
let node_config = ctx.node_config();
// Configure trace-level logging for specific block if requested
if let Some(block_num) = node_config.debug.trace_block {
info!(target: "reth::cli", block_num, "Trace-level logging enabled for newPayload block");
reth_tracing::runtime::set_trace_block(Some(block_num));
}
// We always assume that node is syncing after a restart
network_handle.update_sync_state(SyncState::Syncing);

View File

@@ -218,9 +218,9 @@ impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> DerefMut for FullNode<N
}
/// Helper type alias to define [`FullNode`] for a given [`Node`].
pub type FullNodeFor<N, DB = DatabaseEnv> =
pub type FullNodeFor<N, DB = Arc<DatabaseEnv>> =
FullNode<NodeAdapter<RethFullAdapter<DB, N>>, <N as Node<RethFullAdapter<DB, N>>>::AddOns>;
/// Helper type alias to define [`NodeHandle`] for a given [`Node`].
pub type NodeHandleFor<N, DB = DatabaseEnv> =
pub type NodeHandleFor<N, DB = Arc<DatabaseEnv>> =
NodeHandle<NodeAdapter<RethFullAdapter<DB, N>>, <N as Node<RethFullAdapter<DB, N>>>::AddOns>;

View File

@@ -1192,7 +1192,6 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
.pending_block_kind(self.config.pending_block_kind)
.raw_tx_forwarder(self.config.raw_tx_forwarder)
.evm_memory_limit(self.config.rpc_evm_memory_limit)
.force_blob_sidecar_upcasting(self.config.force_blob_sidecar_upcasting)
}
}

View File

@@ -108,13 +108,6 @@ pub struct DebugArgs {
/// the backfill, but did not yet receive any new blocks.
#[arg(long = "debug.startup-sync-state-idle", help_heading = "Debug")]
pub startup_sync_state_idle: bool,
/// Enable trace-level logging for a specific block during `engine_newPayload` processing.
///
/// This is useful for debugging block execution issues. Once the block is processed,
/// trace logging is automatically disabled (one-shot behavior).
#[arg(long = "debug.trace-block", help_heading = "Debug", value_name = "BLOCK_NUMBER")]
pub trace_block: Option<u64>,
}
impl Default for DebugArgs {
@@ -134,7 +127,6 @@ impl Default for DebugArgs {
healthy_node_rpc_url: None,
ethstats: None,
startup_sync_state_idle: false,
trace_block: None,
}
}
}

View File

@@ -1,10 +1,7 @@
//! clap [Args](clap::Args) for engine purposes
use clap::{builder::Resettable, Args};
use reth_engine_primitives::{
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
};
use reth_engine_primitives::{TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE};
use std::sync::OnceLock;
use crate::node_config::{
@@ -41,8 +38,6 @@ pub struct DefaultEngineValues {
disable_proof_v2: bool,
cache_metrics_disabled: bool,
enable_sparse_trie_as_cache: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
}
impl DefaultEngineValues {
@@ -184,18 +179,6 @@ impl DefaultEngineValues {
self.enable_sparse_trie_as_cache = v;
self
}
/// Set the sparse trie prune depth by default
pub const fn with_sparse_trie_prune_depth(mut self, v: usize) -> Self {
self.sparse_trie_prune_depth = v;
self
}
/// Set the maximum number of storage tries to retain after sparse trie pruning by default
pub const fn with_sparse_trie_max_storage_tries(mut self, v: usize) -> Self {
self.sparse_trie_max_storage_tries = v;
self
}
}
impl Default for DefaultEngineValues {
@@ -222,8 +205,6 @@ impl Default for DefaultEngineValues {
disable_proof_v2: false,
cache_metrics_disabled: false,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
}
}
}
@@ -355,14 +336,6 @@ pub struct EngineArgs {
/// Enable sparse trie as cache.
#[arg(long = "engine.enable-sparse-trie-as-cache", default_value_t = DefaultEngineValues::get_global().enable_sparse_trie_as_cache, conflicts_with = "disable_proof_v2")]
pub enable_sparse_trie_as_cache: bool,
/// Sparse trie prune depth.
#[arg(long = "engine.sparse-trie-prune-depth", default_value_t = DefaultEngineValues::get_global().sparse_trie_prune_depth, requires = "enable_sparse_trie_as_cache")]
pub sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after sparse trie pruning.
#[arg(long = "engine.sparse-trie-max-storage-tries", default_value_t = DefaultEngineValues::get_global().sparse_trie_max_storage_tries, requires = "enable_sparse_trie_as_cache")]
pub sparse_trie_max_storage_tries: usize,
}
#[allow(deprecated)]
@@ -390,8 +363,6 @@ impl Default for EngineArgs {
disable_proof_v2,
cache_metrics_disabled,
enable_sparse_trie_as_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
} = DefaultEngineValues::get_global().clone();
Self {
persistence_threshold,
@@ -419,8 +390,6 @@ impl Default for EngineArgs {
disable_proof_v2,
cache_metrics_disabled,
enable_sparse_trie_as_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
}
}
}
@@ -428,7 +397,7 @@ impl Default for EngineArgs {
impl EngineArgs {
/// Creates a [`TreeConfig`] from the engine arguments.
pub fn tree_config(&self) -> TreeConfig {
TreeConfig::default()
let mut config = TreeConfig::default()
.with_persistence_threshold(self.persistence_threshold)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
@@ -445,14 +414,21 @@ impl EngineArgs {
.with_always_process_payload_attributes_on_canonical_head(
self.always_process_payload_attributes_on_canonical_head,
)
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
.with_storage_worker_count_opt(self.storage_worker_count)
.with_account_worker_count_opt(self.account_worker_count)
.with_disable_proof_v2(self.disable_proof_v2)
.without_cache_metrics(self.cache_metrics_disabled)
.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache)
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
.with_unwind_canonical_header(self.allow_unwind_canonical_header);
if let Some(count) = self.storage_worker_count {
config = config.with_storage_worker_count(count);
}
if let Some(count) = self.account_worker_count {
config = config.with_account_worker_count(count);
}
config = config.with_disable_proof_v2(self.disable_proof_v2);
config = config.without_cache_metrics(self.cache_metrics_disabled);
config = config.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache);
config
}
}
@@ -503,9 +479,7 @@ mod tests {
account_worker_count: Some(8),
disable_proof_v2: false,
cache_metrics_disabled: true,
enable_sparse_trie_as_cache: true,
sparse_trie_prune_depth: 10,
sparse_trie_max_storage_tries: 100,
enable_sparse_trie_as_cache: false,
};
let parsed_args = CommandParser::<EngineArgs>::parse_from([
@@ -536,11 +510,6 @@ mod tests {
"--engine.account-worker-count",
"8",
"--engine.disable-cache-metrics",
"--engine.enable-sparse-trie-as-cache",
"--engine.sparse-trie-prune-depth",
"10",
"--engine.sparse-trie-max-storage-tries",
"100",
])
.args;

View File

@@ -8,13 +8,9 @@ use reth_tracing::{
};
use std::{fmt, fmt::Display};
use tracing::{level_filters::LevelFilter, Level};
/// Constant to convert megabytes to bytes
const MB_TO_BYTES: u64 = 1024 * 1024;
const PROFILER_TRACING_FILTER: &str =
"info,engine=debug,trie=debug,providers=debug,rpc=debug,sync=debug,pruner=debug";
/// The log configuration.
#[derive(Debug, Args)]
#[command(next_help_heading = "Logging")]
@@ -74,7 +70,7 @@ pub struct LogArgs {
long = "log.samply.filter",
value_name = "FILTER",
global = true,
default_value = PROFILER_TRACING_FILTER,
default_value = "debug",
hide = true
)]
pub samply_filter: String,
@@ -88,7 +84,7 @@ pub struct LogArgs {
long = "log.tracy.filter",
value_name = "FILTER",
global = true,
default_value = PROFILER_TRACING_FILTER,
default_value = "debug",
hide = true
)]
pub tracy_filter: String,

View File

@@ -6,7 +6,7 @@ use clap::{builder::RangedU64ValueParser, Args};
use reth_chainspec::EthereumHardforks;
use reth_config::config::PruneConfig;
use reth_prune_types::{
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE,
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_UNWIND_SAFE_DISTANCE,
};
use std::{collections::BTreeMap, ops::Not, sync::OnceLock};
@@ -81,7 +81,7 @@ impl Default for DefaultPruningValues {
minimal_prune_modes: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: Some(PruneMode::Full),
receipts: Some(PruneMode::Distance(MINIMUM_DISTANCE)),
receipts: Some(PruneMode::Full),
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
bodies_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),

View File

@@ -4,7 +4,7 @@ use crate::args::{
types::{MaxU32, ZeroAsNoneU64},
GasPriceOracleArgs, RpcStateCacheArgs,
};
use alloy_primitives::map::AddressSet;
use alloy_primitives::Address;
use alloy_rpc_types_engine::JwtSecret;
use clap::{
builder::{PossibleValue, RangedU64ValueParser, Resettable, TypedValueParser},
@@ -15,6 +15,7 @@ use reth_cli_util::{parse_duration_from_secs_or_ms, parse_ether_value};
use reth_rpc_eth_types::builder::config::PendingBlockKind;
use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
use std::{
collections::HashSet,
ffi::OsStr,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
@@ -88,7 +89,7 @@ pub struct DefaultRpcServerArgs {
rpc_proof_permits: usize,
rpc_pending_block: PendingBlockKind,
rpc_forwarder: Option<Url>,
builder_disallow: Option<AddressSet>,
builder_disallow: Option<HashSet<Address>>,
rpc_state_cache: RpcStateCacheArgs,
gas_price_oracle: GasPriceOracleArgs,
rpc_send_raw_transaction_sync_timeout: Duration,
@@ -334,7 +335,7 @@ impl DefaultRpcServerArgs {
}
/// Set the default builder disallow addresses
pub fn with_builder_disallow(mut self, v: Option<AddressSet>) -> Self {
pub fn with_builder_disallow(mut self, v: Option<HashSet<Address>>) -> Self {
self.builder_disallow = v;
self
}
@@ -620,8 +621,8 @@ pub struct RpcServerArgs {
/// Path to file containing disallowed addresses, json-encoded list of strings. Block
/// validation API will reject blocks containing transactions from these addresses.
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<AddressSet>, default_value = Resettable::from(DefaultRpcServerArgs::get_global().builder_disallow.as_ref().map(|v| format!("{:?}", v).into())))]
pub builder_disallow: Option<AddressSet>,
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<HashSet<Address>>, default_value = Resettable::from(DefaultRpcServerArgs::get_global().builder_disallow.as_ref().map(|v| format!("{:?}", v).into())))]
pub builder_disallow: Option<HashSet<Address>>,
/// State cache configuration.
#[command(flatten)]
@@ -646,14 +647,6 @@ pub struct RpcServerArgs {
/// transactions from the same sender will also be skipped.
#[arg(long = "testing.skip-invalid-transactions", default_value_t = true)]
pub testing_skip_invalid_transactions: bool,
/// Force upcasting EIP-4844 blob sidecars to EIP-7594 format when Osaka is active.
///
/// When enabled, blob transactions submitted via `eth_sendRawTransaction` with EIP-4844
/// sidecars will be automatically converted to EIP-7594 format if the next block is Osaka.
/// By default this is disabled, meaning transactions are submitted as-is.
#[arg(long = "rpc.force-blob-sidecar-upcasting", default_value_t = false)]
pub rpc_force_blob_sidecar_upcasting: bool,
}
impl RpcServerArgs {
@@ -775,12 +768,6 @@ impl RpcServerArgs {
self.rpc_send_raw_transaction_sync_timeout = timeout;
self
}
/// Enables forced blob sidecar upcasting from EIP-4844 to EIP-7594 format.
pub const fn with_force_blob_sidecar_upcasting(mut self) -> Self {
self.rpc_force_blob_sidecar_upcasting = true;
self
}
}
impl Default for RpcServerArgs {
@@ -873,7 +860,6 @@ impl Default for RpcServerArgs {
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
testing_skip_invalid_transactions: true,
rpc_force_blob_sidecar_upcasting: false,
}
}
}
@@ -1050,7 +1036,6 @@ mod tests {
},
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
testing_skip_invalid_transactions: true,
rpc_force_blob_sidecar_upcasting: false,
};
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([

View File

@@ -21,7 +21,7 @@ alloy-primitives.workspace = true
alloy-consensus.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
futures-util.workspace = true
tokio-stream.workspace = true

View File

@@ -37,7 +37,7 @@ tempfile = { workspace = true, optional = true }
tikv-jemalloc-ctl = { workspace = true, optional = true, features = ["stats"] }
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.18.0"
procfs = "0.17.0"
[dev-dependencies]
reqwest.workspace = true

View File

@@ -37,7 +37,7 @@ pub use commands::{import::ImportOpCommand, import_receipts::ImportReceiptsOpCom
use reth_optimism_chainspec::OpChainSpec;
use reth_rpc_server_types::{DefaultRpcModuleValidator, RpcModuleValidator};
use std::{ffi::OsString, fmt, marker::PhantomData};
use std::{ffi::OsString, fmt, marker::PhantomData, sync::Arc};
use chainspec::OpChainSpecParser;
use clap::Parser;
@@ -121,7 +121,7 @@ where
/// [`NodeCommand`](reth_cli_commands::node::NodeCommand).
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
where
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
Fut: Future<Output = eyre::Result<()>>,
{
self.with_runner(CliRunner::try_default_runtime()?, launcher)
@@ -130,7 +130,7 @@ where
/// Execute the configured cli command with the provided [`CliRunner`].
pub fn with_runner<L, Fut>(self, runner: CliRunner, launcher: L) -> eyre::Result<()>
where
L: FnOnce(WithLaunchContext<NodeBuilder<DatabaseEnv, C::ChainSpec>>, Ext) -> Fut,
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
Fut: Future<Output = eyre::Result<()>>,
{
let mut this = self.configure();

View File

@@ -293,11 +293,7 @@ mod tests {
use alloy_consensus::{Header, Receipt};
use alloy_eips::eip7685::Requests;
use alloy_genesis::Genesis;
use alloy_primitives::{
bytes,
map::{AddressMap, B256Map, HashMap},
Address, LogData, B256,
};
use alloy_primitives::{bytes, map::HashMap, Address, LogData, B256};
use op_revm::OpSpecId;
use reth_chainspec::ChainSpec;
use reth_evm::execute::ProviderError;
@@ -592,12 +588,12 @@ mod tests {
);
// Create a BundleStateInit object and insert initial data
let mut state_init: BundleStateInit = AddressMap::default();
let mut state_init: BundleStateInit = HashMap::default();
state_init
.insert(Address::new([2; 20]), (None, Some(Account::default()), B256Map::default()));
.insert(Address::new([2; 20]), (None, Some(Account::default()), HashMap::default()));
// Create an AddressMap for account reverts and insert initial data
let mut revert_inner: AddressMap<AccountRevertInit> = AddressMap::default();
// Create a HashMap for account reverts and insert initial data
let mut revert_inner: HashMap<Address, AccountRevertInit> = HashMap::default();
revert_inner.insert(Address::new([2; 20]), (None, vec![]));
// Create a RevertsInit object and insert the revert_inner data

View File

@@ -38,7 +38,7 @@ op-alloy-rpc-types-engine = { workspace = true, features = ["k256"] }
# io
tokio.workspace = true
tokio-tungstenite.workspace = true
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
serde_json.workspace = true
url.workspace = true
futures-util.workspace = true

Some files were not shown because too many files have changed in this diff Show More