Compare commits

...

25 Commits

Author SHA1 Message Date
Dan Cline
9775e2f682 wip: parallel save_blocks, dashmap keys, cache, big block bench 2026-01-14 17:27:10 +00:00
Dan Cline
c7faab0867 feat(reth-bench): add generate-big-block command
Adds a new reth-bench subcommand that generates large blocks by:
1. Fetching transactions from existing blocks via RPC
2. Packing them until target gas is reached
3. Calling engine_getPayloadV3Hacked or V4 to build the payload
4. Optionally executing via newPayload + forkchoiceUpdated

Usage:
  reth-bench generate-big-block \
    --rpc-url http://localhost:8545 \
    --engine-rpc-url http://localhost:8551 \
    --jwt-secret ~/.local/share/reth/mainnet/jwt.hex \
    --target-gas 30000000 \
    --execute
2026-01-14 17:27:10 +00:00
Arsenii Kulikov
edf67c1375 clone 2026-01-14 11:39:21 +00:00
Arsenii Kulikov
8e595694a8 wip 2026-01-14 10:25:07 +00:00
joshieDo
40ea5938fc fix 2026-01-13 17:47:12 +00:00
joshieDo
55e0f3d0db touch-ups 2026-01-13 16:48:46 +00:00
joshieDo
ad4998d473 rm Tx: Sync bound 2026-01-13 16:48:46 +00:00
joshieDo
cf89d57395 parallelize save_blocks 2026-01-13 16:48:46 +00:00
Dan Cline
3be8671028 feat(reth-bench): create gas ramp-up bench command 2026-01-13 14:10:08 +00:00
Alexey Shekhirin
9e3720d105 perf(engine): clear cache on hash mismatch instead of creating new 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
8c3c753208 fix split usage 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
6bf64ba793 Merge remote-tracking branch 'origin/main' into alexey/execution-cache-fixed-cache 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
701301724f use sizes in bytes for cache size 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
d7edca95c8 nits 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
b36d588779 fix imports 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
a1341ec875 Merge remote-tracking branch 'origin/main' into alexey/execution-cache-fixed-cache 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
7e93256b01 fixed cache metrics 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
34f0e138f5 remove selfdestruct handling 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
90b51e960e use 64k sizes for caches 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
33236d7bdf check is_prewarm like before 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
64f9ca4d7b handle wipe with dashmap (bad) 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
fa149fcd8c more get_or_try_insert methods 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
50a777e6eb Merge remote-tracking branch 'origin/main' into alexey/execution-cache-fixed-cache 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
bb034ee406 timestamped storage slots for invalidation 2026-01-13 00:12:01 +00:00
Alexey Shekhirin
bf4a697bbb use fixed-cache for accounts and storages 2026-01-13 00:12:01 +00:00
47 changed files with 3076 additions and 1207 deletions

709
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -497,33 +497,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.4.1", default-features = false }
alloy-contract = { version = "1.4.1", default-features = false }
alloy-eips = { version = "1.4.1", default-features = false }
alloy-genesis = { version = "1.4.1", default-features = false }
alloy-json-rpc = { version = "1.4.1", default-features = false }
alloy-network = { version = "1.4.1", default-features = false }
alloy-network-primitives = { version = "1.4.1", default-features = false }
alloy-provider = { version = "1.4.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.1", default-features = false }
alloy-rpc-client = { version = "1.4.1", default-features = false }
alloy-rpc-types = { version = "1.4.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.1", default-features = false }
alloy-rpc-types-debug = { version = "1.4.1", default-features = false }
alloy-rpc-types-engine = { version = "1.4.1", default-features = false }
alloy-rpc-types-eth = { version = "1.4.1", default-features = false }
alloy-rpc-types-mev = { version = "1.4.1", default-features = false }
alloy-rpc-types-trace = { version = "1.4.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.1", default-features = false }
alloy-serde = { version = "1.4.1", default-features = false }
alloy-signer = { version = "1.4.1", default-features = false }
alloy-signer-local = { version = "1.4.1", default-features = false }
alloy-transport = { version = "1.4.1" }
alloy-transport-http = { version = "1.4.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.1", default-features = false }
alloy-transport-ws = { version = "1.4.1", default-features = false }
alloy-consensus = { version = "1.4.0", default-features = false }
alloy-contract = { version = "1.4.0", default-features = false }
alloy-eips = { version = "1.4.0", default-features = false }
alloy-genesis = { version = "1.4.0", default-features = false }
alloy-json-rpc = { version = "1.4.0", default-features = false }
alloy-network = { version = "1.4.0", default-features = false }
alloy-network-primitives = { version = "1.4.0", default-features = false }
alloy-provider = { version = "1.4.0", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.0", default-features = false }
alloy-rpc-client = { version = "1.4.0", default-features = false }
alloy-rpc-types = { version = "1.4.0", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.0", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.0", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.0", default-features = false }
alloy-rpc-types-debug = { version = "1.4.0", default-features = false }
alloy-rpc-types-engine = { version = "1.4.0", default-features = false }
alloy-rpc-types-eth = { version = "1.4.0", default-features = false }
alloy-rpc-types-mev = { version = "1.4.0", default-features = false }
alloy-rpc-types-trace = { version = "1.4.0", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.0", default-features = false }
alloy-serde = { version = "1.4.0", default-features = false }
alloy-signer = { version = "1.4.0", default-features = false }
alloy-signer-local = { version = "1.4.0", default-features = false }
alloy-transport = { version = "1.4.0" }
alloy-transport-http = { version = "1.4.0", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.0", default-features = false }
alloy-transport-ws = { version = "1.4.0", default-features = false }
# op
alloy-op-evm = { version = "0.25.0", default-features = false }
@@ -588,7 +588,7 @@ tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
fixed-cache = { version = "0.1.4", features = ["stats"] }
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
@@ -746,34 +746,37 @@ vergen-git2 = "1.0.5"
# networking
ipnet = "2.11"
# [patch.crates-io]
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-network-primitives = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-admin = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-debug = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-eth = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-mev = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-rpc-types-txpool = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-serde = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-signer-local = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-transport = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
[patch.crates-io]
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-consensus-any = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-network-primitives = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-admin = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-any = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-debug = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-eth = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-mev = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-rpc-types-txpool = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-serde = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-signer-local = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-transport = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
alloy-tx-macros = { git = "https://github.com/alloy-rs/alloy", rev = "82055ca44d660508a3614a1fb9acfd9ece2c24d7" }
# op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }
# op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", rev = "a79d6fc" }

27
analyze_pinned_thread.py Normal file
View File

@@ -0,0 +1,27 @@
from playwright.sync_api import sync_playwright
import time
# The pinned thread URL - switch to call tree view
BASE_URL = "https://profiler.firefox.com/public/q0h10v5sjtqstm2sj90azpzxpmhjd8r3agnx058"
CALL_TREE_URL = BASE_URL + "/calltree/?globalTrackOrder=201&hiddenGlobalTracks=01&hiddenLocalTracksByPid=1153071.2-0wh~1153071.1-xgwxiz7z9wA5A7wAp&localTrackOrderByPid=1153071.1-xhxiz7z9wA5A7wApz8xjwz1Di0wxfz2wz6A6AqAswDfArDgDhxg&range=19841m4475&symbolServer=http%3A%2F%2F127.0.0.1%3A3001%2F24w2lg79pyph7f2pbcpqnhsfk1rrhp6yl4pr074&thread=E5&v=12"
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
print(f"Loading: {CALL_TREE_URL}")
page.goto(CALL_TREE_URL, timeout=60000)
page.wait_for_load_state('networkidle')
time.sleep(8) # Extra time for profile to load
page.screenshot(path='/tmp/pinned_thread_calltree.png', full_page=True)
print("Screenshot saved to /tmp/pinned_thread_calltree.png")
# Get the full page text
body_text = page.locator('body').inner_text()
print("\n" + "="*80)
print("FULL PAGE TEXT:")
print("="*80)
print(body_text[:8000])
browser.close()

59
analyze_profiler.py Normal file
View File

@@ -0,0 +1,59 @@
from playwright.sync_api import sync_playwright
import time
URLS = {
"slow_block_main": "https://share.firefox.dev/45a9S1G",
"normal_blocks": "https://share.firefox.dev/4j7tadO",
"pinned_thread": "https://share.firefox.dev/3LylcxY",
"proof_fetch_tasks": "https://share.firefox.dev/4pAZyac",
}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
for name, url in URLS.items():
print(f"\n{'='*60}")
print(f"PROFILE: {name}")
print(f"URL: {url}")
print('='*60)
page = browser.new_page()
page.goto(url, timeout=60000)
# Wait for the profiler to fully load
page.wait_for_load_state('networkidle')
time.sleep(5) # Extra time for JS rendering
# Get the final URL (profiler redirects)
final_url = page.url
print(f"Final URL: {final_url}")
# Take a screenshot
page.screenshot(path=f'/tmp/{name}.png', full_page=True)
print(f"Screenshot saved to /tmp/{name}.png")
# Try to extract call tree content
try:
# Look for the call tree table
call_tree = page.locator('.treeView, .call-tree, [class*="CallTree"], [class*="callTree"]').first
if call_tree:
text = call_tree.inner_text()
print(f"\nCall Tree Content (first 3000 chars):\n{text[:3000]}")
except Exception as e:
print(f"Could not extract call tree: {e}")
# Get visible text content
try:
body_text = page.locator('body').inner_text()
# Filter for relevant content
lines = body_text.split('\n')
relevant = [l for l in lines if any(kw in l.lower() for kw in
['%', 'reth', 'proof', 'trie', 'hash', 'fetch', 'state', 'root',
'rayon', 'tokio', 'engine', 'execute', 'sload', 'journal'])]
print(f"\nRelevant lines:\n" + '\n'.join(relevant[:100]))
except Exception as e:
print(f"Could not extract body: {e}")
page.close()
browser.close()

View File

@@ -17,21 +17,26 @@ workspace = true
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-engine-primitives.workspace = true
reth-ethereum-primitives.workspace = true
reth-fs-util.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-primitives-traits.workspace = true
reth-rpc-api.workspace = true
reth-tracing.workspace = true
reth-chainspec.workspace = true
# alloy
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-consensus.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-engine.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["kzg"] }
alloy-transport-http.workspace = true
alloy-transport-ipc.workspace = true
alloy-transport-ws.workspace = true

View File

@@ -0,0 +1,220 @@
//! Benchmarks empty block processing by ramping the block gas limit.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState, JwtSecret};
use clap::Parser;
use reqwest::Url;
use reth_chainspec::{ChainSpec, NamedChain, DEV, HOLESKY, HOODI, MAINNET, SEPOLIA};
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use std::{path::PathBuf, sync::Arc, time::Instant};
use tracing::{debug, info};
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
/// Number of blocks to generate.
#[arg(long, value_name = "BLOCKS")]
blocks: u64,
/// The Engine API RPC URL.
#[arg(long = "engine-rpc-url", value_name = "ENGINE_RPC_URL")]
engine_rpc_url: String,
/// Path to the JWT secret for Engine API authentication.
#[arg(long = "jwt-secret", value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Optional output directory for benchmark results.
#[arg(long, value_name = "OUTPUT")]
output: Option<PathBuf>,
}
/// Map a chain ID to a known chain spec.
fn chain_spec_from_id(chain_id: u64) -> Option<Arc<ChainSpec>> {
match NamedChain::try_from(chain_id).ok()? {
NamedChain::Mainnet => Some(MAINNET.clone()),
NamedChain::Sepolia => Some(SEPOLIA.clone()),
NamedChain::Holesky => Some(HOLESKY.clone()),
NamedChain::Hoodi => Some(HOODI.clone()),
NamedChain::Dev => Some(DEV.clone()),
_ => None,
}
}
impl Command {
/// Execute `benchmark gas-limit-ramp` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
if self.blocks == 0 {
return Err(eyre::eyre!("--blocks must be greater than 0"));
}
// Ensure output directory exists
if let Some(ref output) = self.output {
if output.is_file() {
return Err(eyre::eyre!("Output path must be a directory"));
}
if !output.exists() {
std::fs::create_dir_all(output)?;
info!("Created output directory: {:?}", output);
}
}
// Set up authenticated provider (used for both Engine API and eth_ methods)
let jwt = std::fs::read_to_string(&self.jwt_secret)?;
let jwt = JwtSecret::from_hex(jwt)?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_with(auth_transport).await?;
let provider = RootProvider::<AnyNetwork>::new(client);
// Get chain spec - required for fork detection
let chain_id = provider.get_chain_id().await?;
let chain_spec = chain_spec_from_id(chain_id)
.ok_or_else(|| eyre::eyre!("Unsupported chain id: {chain_id}"))?;
// Fetch the current head block as parent
let parent_block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
debug!(?parent_block, "Raw RPC parent block");
let (mut parent_header, mut parent_hash) = rpc_block_to_header(parent_block);
let start_block = parent_header.number + 1;
let end_block = start_block + self.blocks - 1;
debug!(?parent_header, "Converted parent header");
info!(start_block, end_block, "Starting gas limit ramp benchmark");
let mut next_block_number = start_block;
let mut results = Vec::new();
let total_benchmark_duration = Instant::now();
while next_block_number <= end_block {
let timestamp = parent_header.timestamp.saturating_add(1);
let request = prepare_payload_request(&chain_spec, timestamp, parent_hash);
let new_payload_version = request.new_payload_version;
let (payload, sidecar) = build_payload(&provider, request).await?;
let mut block =
payload.clone().try_into_block_with_sidecar::<TransactionSigned>(&sidecar)?;
let max_increase = max_gas_limit_increase(parent_header.gas_limit);
let gas_limit = next_gas_limit(parent_header.gas_limit, max_increase);
block.header.gas_limit = gas_limit;
let block_hash = block.header.hash_slow();
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
block.header.withdrawals_root,
Some(new_payload_version),
)?;
debug!(
target: "reth-bench",
block_number = block.header.number,
gas_limit,
max_increase,
"Sending empty payload"
);
let start = Instant::now();
call_new_payload(&provider, version, params).await?;
let new_payload_result =
NewPayloadResult { gas_used: block.header.gas_used, latency: start.elapsed() };
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let transaction_count = block.body.transactions.len() as u64;
let combined_result = CombinedResult {
block_number: block.header.number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
total_latency,
};
info!(%combined_result);
let gas_row = TotalGasRow {
block_number: block.header.number,
transaction_count,
gas_used: block.header.gas_used,
time: total_benchmark_duration.elapsed(),
};
results.push((gas_row, combined_result));
parent_header = block.header;
parent_hash = block_hash;
debug!(?parent_header, "RPC parent header");
next_block_number += 1;
}
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
if let Some(ref path) = self.output {
write_benchmark_results(path, &gas_output_results, combined_results)?;
}
let final_gas_limit = parent_header.gas_limit;
let gas_output = TotalGasOutput::new(gas_output_results)?;
info!(
total_duration=?gas_output.total_duration,
total_gas_used=?gas_output.total_gas_used,
blocks_processed=?gas_output.blocks_processed,
final_gas_limit,
"Total Ggas/s: {:.4}",
gas_output.total_gigagas_per_second()
);
Ok(())
}
}
const fn max_gas_limit_increase(parent_gas_limit: u64) -> u64 {
(parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR).saturating_sub(1)
}
fn next_gas_limit(parent_gas_limit: u64, max_increase: u64) -> u64 {
parent_gas_limit.saturating_add(max_increase).min(MAXIMUM_GAS_LIMIT_BLOCK)
}

View File

@@ -0,0 +1,673 @@
//! Command for generating large blocks by packing transactions from real blocks.
//!
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_packBlock` RPC endpoint.
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockId, BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
PayloadAttributes,
};
use alloy_transport::layers::RetryBackoffLayer;
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_rpc_api::TestingBuildBlockRequestV1;
use std::future::Future;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// A single transaction with its gas usage and raw encoded bytes.
#[derive(Debug, Clone)]
pub struct RawTransaction {
/// The gas used by the transaction (from receipt).
pub gas_used: u64,
/// The transaction type (e.g., 3 for EIP-4844 blob txs).
pub tx_type: u8,
/// The raw RLP-encoded transaction bytes.
pub raw: Bytes,
}
/// Abstraction over sources of transactions for big block generation.
///
/// Implementors provide transactions from different sources (RPC, database, files, etc.)
pub trait TransactionSource {
/// Fetch transactions from a specific block number.
///
/// Returns `Ok(None)` if the block doesn't exist.
/// Returns `Ok(Some((transactions, gas_used)))` with the block's transactions and total gas.
fn fetch_block_transactions(
&self,
block_number: u64,
) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
}
/// RPC-based transaction source that fetches from a remote node.
#[derive(Debug)]
pub struct RpcTransactionSource {
provider: RootProvider<AnyNetwork>,
}
impl RpcTransactionSource {
/// Create a new RPC transaction source.
pub fn new(provider: RootProvider<AnyNetwork>) -> Self {
Self { provider }
}
/// Create from an RPC URL with retry backoff.
pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let provider = RootProvider::<AnyNetwork>::new(client);
Ok(Self { provider })
}
}
impl TransactionSource for RpcTransactionSource {
async fn fetch_block_transactions(
&self,
block_number: u64,
) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
let block = self.provider.get_block_by_number(block_number.into()).full().await?;
let Some(block) = block else {
return Ok(None);
};
let receipts = self
.provider
.get_block_receipts(BlockId::Number(BlockNumberOrTag::Number(block_number)))
.await?
.ok_or_else(|| eyre::eyre!("No receipts found for block {}", block_number))?;
if receipts.len() != block.transactions.len() {
return Err(eyre::eyre!(
"Receipt count ({}) does not match transaction count ({}) for block {}",
receipts.len(),
block.transactions.len(),
block_number
));
}
let gas_used = block.header.gas_used;
let transactions = block
.transactions
.txns()
.zip(receipts.iter())
.map(|(tx, receipt)| {
let with_encoded = tx.inner.inner.clone().into_encoded();
RawTransaction {
gas_used: receipt.gas_used,
tx_type: tx.inner.ty(),
raw: with_encoded.encoded_bytes().clone(),
}
})
.collect();
Ok(Some((transactions, gas_used)))
}
}
/// Collects transactions from a source up to a target gas limit.
#[derive(Debug)]
pub struct TransactionCollector<S> {
source: S,
target_gas: u64,
}
impl<S: TransactionSource> TransactionCollector<S> {
/// Create a new transaction collector.
pub fn new(source: S, target_gas: u64) -> Self {
Self { source, target_gas }
}
/// Collect transactions starting from the given block number.
///
/// Skips blob transactions (type 3) and collects until target gas is reached.
/// Returns the collected raw transaction bytes, total gas collected, and the next block number.
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
let mut transactions: Vec<Bytes> = Vec::new();
let mut total_gas: u64 = 0;
let mut current_block = start_block;
info!(start_block = start_block, "Fetching transactions from blocks");
while total_gas < self.target_gas {
info!(
block = current_block,
total_gas = total_gas,
target_gas = self.target_gas,
progress_pct = (total_gas as f64 / self.target_gas as f64 * 100.0) as u32,
"Fetching block"
);
let Some((block_txs, block_gas_used)) =
self.source.fetch_block_transactions(current_block).await?
else {
warn!(block = current_block, "Block not found, stopping");
break;
};
let block_tx_count = block_txs.len();
let mut block_gas_added: u64 = 0;
let mut block_txs_added: usize = 0;
for tx in block_txs {
// Skip blob transactions (EIP-4844, type 3)
if tx.tx_type == 3 {
debug!("Skipping blob transaction");
continue;
}
if total_gas + tx.gas_used <= self.target_gas {
transactions.push(tx.raw);
total_gas += tx.gas_used;
block_gas_added += tx.gas_used;
block_txs_added += 1;
}
if total_gas >= self.target_gas {
break;
}
}
info!(
block = current_block,
block_txs = block_tx_count,
block_gas_used = block_gas_used,
added_txs = block_txs_added,
added_gas = block_gas_added,
total_txs = transactions.len(),
total_gas = total_gas,
"Processed block"
);
current_block += 1;
// Stop early if remaining gas is under 1M (close enough to target)
let remaining_gas = self.target_gas.saturating_sub(total_gas);
if remaining_gas < 1_000_000 {
info!(remaining_gas = remaining_gas, "Stopping: within 1M of target gas");
break;
}
}
info!(
total_txs = transactions.len(),
total_gas = total_gas,
next_block = current_block,
"Finished collecting transactions"
);
Ok((transactions, total_gas, current_block))
}
}
/// `reth bench generate-big-block` command
///
/// Generates a large block by fetching transactions from existing blocks and packing them
/// into a single block using the `testing_packBlock` RPC endpoint.
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC URL to use for fetching blocks (can be an external archive node).
#[arg(long, value_name = "RPC_URL")]
rpc_url: String,
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// The RPC URL for testing_packBlock calls (same node as engine, regular RPC port).
#[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
testing_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
#[arg(long, default_value = "false")]
execute: bool,
/// Number of payloads to generate. Each payload uses the previous as parent.
#[arg(long, default_value = "1")]
count: u64,
/// Number of transaction batches to prefetch in background when count > 1.
/// Higher values reduce latency but use more memory.
#[arg(long, default_value = "4")]
prefetch_buffer: usize,
/// Output directory for generated payloads. Each payload is saved as payload_NNN.json.
#[arg(long, value_name = "OUTPUT_DIR")]
output_dir: std::path::PathBuf,
}
/// A built payload ready for execution.
struct BuiltPayload {
index: u64,
envelope: ExecutionPayloadEnvelopeV4,
block_hash: B256,
timestamp: u64,
}
impl Command {
/// Execute the `generate-big-block` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Set up testing RPC provider (for testing_packBlock)
info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
let testing_client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(self.testing_rpc_url.parse()?);
let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
// Get the parent block (latest canonical block)
info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
let parent_block = auth_provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let parent_hash = parent_block.header.hash;
let parent_number = parent_block.header.number;
let parent_timestamp = parent_block.header.timestamp;
info!(
parent_hash = %parent_hash,
parent_number = parent_number,
"Using initial parent block"
);
// Create output directory
std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {
self.execute_pipelined(
&auth_provider,
&testing_provider,
start_block,
parent_hash,
parent_timestamp,
)
.await?;
} else {
// Single payload - collect transactions and build
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
let collector = TransactionCollector::new(tx_source, self.target_gas);
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected"));
}
self.execute_sequential(
&auth_provider,
&testing_provider,
transactions,
parent_hash,
parent_timestamp,
)
.await?;
}
info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
Ok(())
}
/// Sequential execution path for single payload or no-execute mode.
async fn execute_sequential(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
transactions: Vec<Bytes>,
mut parent_hash: B256,
mut parent_timestamp: u64,
) -> eyre::Result<()> {
for i in 0..self.count {
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
"Building payload via testing_packBlock"
);
let built = self
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
.await?;
self.save_payload(&built)?;
if self.execute || self.count > 1 {
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Small delay to allow in-memory state to propagate before building next payload
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
parent_hash = built.block_hash;
parent_timestamp = built.timestamp;
}
Ok(())
}
/// Pipelined execution - fetches transactions and builds payloads in background.
async fn execute_pipelined(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
start_block: u64,
initial_parent_hash: B256,
initial_parent_timestamp: u64,
) -> eyre::Result<()> {
// Create channel for transaction batches (one batch per payload)
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
// Spawn background task to continuously fetch transaction batches
let rpc_url = self.rpc_url.clone();
let target_gas = self.target_gas;
let count = self.count;
let fetcher_handle = tokio::spawn(async move {
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
Ok(source) => source,
Err(e) => {
warn!(error = %e, "Failed to create transaction source");
return;
}
};
let collector = TransactionCollector::new(tx_source, target_gas);
let mut current_block = start_block;
for payload_idx in 0..count {
info!(
payload = payload_idx + 1,
start_block = current_block,
"Fetching transactions for payload"
);
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas = total_gas,
next_block = next_block,
"Fetched transactions for payload"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});
let mut parent_hash = initial_parent_hash;
let mut parent_timestamp = initial_parent_timestamp;
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
for i in 0..self.count {
let is_last = i == self.count - 1;
// Get current payload (either from pending build or build now)
let current_payload = if let Some(handle) = pending_build.take() {
handle.await??
} else {
// First payload - wait for transactions and build synchronously
let transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
tx_count = transactions.len(),
"Building payload via testing_packBlock"
);
self.build_payload(
testing_provider,
&transactions,
i,
parent_hash,
parent_timestamp,
)
.await?
};
self.save_payload(&current_payload)?;
let current_block_hash = current_payload.block_hash;
let current_timestamp = current_payload.timestamp;
// Execute current payload first
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Small delay to allow in-memory state to propagate before building next payload
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Start building next payload in background (if not last) - AFTER execution
if !is_last {
// Get transactions for next payload (should already be fetched or fetching)
let next_transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if next_transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
}
let testing_provider = testing_provider.clone();
let next_index = i + 1;
let total = self.count;
pending_build = Some(tokio::spawn(async move {
info!(
payload = next_index + 1,
total = total,
parent_hash = %current_block_hash,
parent_timestamp = current_timestamp,
tx_count = next_transactions.len(),
"Building payload via testing_packBlock"
);
Self::build_payload_static(
&testing_provider,
&next_transactions,
next_index,
current_block_hash,
current_timestamp,
)
.await
}));
}
parent_hash = current_block_hash;
parent_timestamp = current_timestamp;
}
// Clean up the fetcher task
drop(tx_receiver);
let _ = fetcher_handle.await;
Ok(())
}
/// Build a single payload via testing_packBlock.
async fn build_payload(
&self,
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
Self::build_payload_static(
testing_provider,
transactions,
index,
parent_hash,
parent_timestamp,
)
.await
}
/// Static version for use in spawned tasks.
async fn build_payload_static(
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
let request = TestingBuildBlockRequestV1 {
parent_block_hash: parent_hash,
payload_attributes: PayloadAttributes {
timestamp: parent_timestamp + 12,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
},
transactions: transactions.to_vec(),
extra_data: None,
};
info!(
endpoint = "testing",
method = "testing_packBlock",
payload = index + 1,
parent_hash = %parent_hash,
"RPC call"
);
let envelope: ExecutionPayloadEnvelopeV5 =
testing_provider.client().request("testing_packBlock", [request]).await?;
info!(payload = index + 1, "testing_packBlock response received, converting to V4 envelope");
let v4_envelope = envelope.try_into_v4()?;
let block_hash =
v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
let timestamp =
v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner.timestamp;
Ok(BuiltPayload { index, envelope: v4_envelope, block_hash, timestamp })
}
/// Save a payload to disk.
fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
let filename = format!("payload_{:03}.json", payload.index + 1);
let filepath = self.output_dir.join(&filename);
let json = serde_json::to_string_pretty(&payload.envelope)?;
std::fs::write(&filepath, &json)
.wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
info!(payload = payload.index + 1, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
Ok(())
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
info!(
endpoint = "engine",
method = "engine_newPayloadV4",
block_hash = %block_hash,
"RPC call"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload,
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(endpoint = "engine", method = "engine_newPayloadV4", ?status, "RPC response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
info!(
endpoint = "engine",
method = "engine_forkchoiceUpdatedV3",
head = %block_hash,
safe = %parent_hash,
finalized = %parent_hash,
"RPC call"
);
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(endpoint = "engine", method = "engine_forkchoiceUpdatedV3", ?fcu_result, "RPC response");
Ok(())
}
}

View File

@@ -0,0 +1,214 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, RootProvider};
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState,
PayloadAttributes, PayloadId, PraguePayloadFields,
};
use eyre::OptionExt;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_node_api::EngineApiMessageVersion;
use tracing::debug;
/// Prepared payload request data for triggering block building.
pub(crate) struct PayloadRequest {
/// The payload attributes for the new block.
pub(crate) attributes: PayloadAttributes,
/// The forkchoice state pointing to the parent block.
pub(crate) forkchoice_state: ForkchoiceState,
/// The engine API version for FCU calls.
pub(crate) fcu_version: EngineApiMessageVersion,
/// The getPayload version to use (1-5).
pub(crate) get_payload_version: u8,
/// The newPayload version to use.
pub(crate) new_payload_version: EngineApiMessageVersion,
}
/// Prepare payload attributes and forkchoice state for a new block.
pub(crate) fn prepare_payload_request(
chain_spec: &ChainSpec,
timestamp: u64,
parent_hash: B256,
) -> PayloadRequest {
let shanghai_active = chain_spec.is_shanghai_active_at_timestamp(timestamp);
let cancun_active = chain_spec.is_cancun_active_at_timestamp(timestamp);
let prague_active = chain_spec.is_prague_active_at_timestamp(timestamp);
let osaka_active = chain_spec.is_osaka_active_at_timestamp(timestamp);
// FCU version: V3 for Cancun+Prague+Osaka, V2 for Shanghai, V1 otherwise
let fcu_version = if cancun_active {
EngineApiMessageVersion::V3
} else if shanghai_active {
EngineApiMessageVersion::V2
} else {
EngineApiMessageVersion::V1
};
// getPayload version: 5 for Osaka, 4 for Prague, 3 for Cancun, 2 for Shanghai, 1 otherwise
// newPayload version: 4 for Prague+Osaka (no V5), 3 for Cancun, 2 for Shanghai, 1 otherwise
let (get_payload_version, new_payload_version) = if osaka_active {
(5, EngineApiMessageVersion::V4) // Osaka uses getPayloadV5 but newPayloadV4
} else if prague_active {
(4, EngineApiMessageVersion::V4)
} else if cancun_active {
(3, EngineApiMessageVersion::V3)
} else if shanghai_active {
(2, EngineApiMessageVersion::V2)
} else {
(1, EngineApiMessageVersion::V1)
};
PayloadRequest {
attributes: PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: shanghai_active.then(Vec::new),
parent_beacon_block_root: cancun_active.then_some(B256::ZERO),
},
forkchoice_state: ForkchoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
},
fcu_version,
get_payload_version,
new_payload_version,
}
}
/// Trigger payload building via FCU and retrieve the built payload.
///
/// This sends a forkchoiceUpdated with payload attributes to start building,
/// then calls getPayload to retrieve the result.
pub(crate) async fn build_payload(
provider: &RootProvider<AnyNetwork>,
request: PayloadRequest,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
let fcu_result = call_forkchoice_updated(
provider,
request.fcu_version,
request.forkchoice_state,
Some(request.attributes.clone()),
)
.await?;
let payload_id =
fcu_result.payload_id.ok_or_eyre("Payload builder did not return a payload id")?;
get_payload_with_sidecar(
provider,
request.get_payload_version,
payload_id,
request.attributes.parent_beacon_block_root,
)
.await
}
/// Convert an RPC block to a consensus header and block hash.
pub(crate) fn rpc_block_to_header(block: alloy_provider::network::AnyRpcBlock) -> (Header, B256) {
let block_hash = block.header.hash;
let header = block.header.inner.clone().into_header_with_defaults();
(header, block_hash)
}
/// Compute versioned hashes from KZG commitments.
fn versioned_hashes_from_commitments(
commitments: &[alloy_primitives::FixedBytes<48>],
) -> Vec<B256> {
commitments.iter().map(|c| kzg_to_versioned_hash(c.as_ref())).collect()
}
/// Fetch an execution payload using the appropriate engine API version.
pub(crate) async fn get_payload_with_sidecar(
provider: &RootProvider<AnyNetwork>,
version: u8,
payload_id: PayloadId,
parent_beacon_block_root: Option<B256>,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
let method = match version {
1 => "engine_getPayloadV1",
2 => "engine_getPayloadV2",
3 => "engine_getPayloadV3",
4 => "engine_getPayloadV4",
5 => "engine_getPayloadV5",
_ => return Err(eyre::eyre!("Unsupported getPayload version: {}", version)),
};
debug!(
target: "reth-bench",
method,
?payload_id,
"Sending getPayload"
);
match version {
1 => {
let payload = provider.get_payload_v1(payload_id).await?;
Ok((ExecutionPayload::V1(payload), ExecutionPayloadSidecar::none()))
}
2 => {
let envelope = provider.get_payload_v2(payload_id).await?;
let payload = match envelope.execution_payload {
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V1(p) => ExecutionPayload::V1(p),
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V2(p) => ExecutionPayload::V2(p),
};
Ok((payload, ExecutionPayloadSidecar::none()))
}
3 => {
let envelope = provider.get_payload_v3(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V3")?,
versioned_hashes,
};
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v3(cancun_fields),
))
}
4 => {
let envelope = provider.get_payload_v4(payload_id).await?;
let versioned_hashes = versioned_hashes_from_commitments(
&envelope.envelope_inner.blobs_bundle.commitments,
);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V4")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.envelope_inner.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
5 => {
// V5 (Osaka) - use raw request since alloy doesn't have get_payload_v5 yet
use alloy_provider::Provider;
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
let envelope: ExecutionPayloadEnvelopeV5 =
provider.client().request(method, (payload_id,)).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V5")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
_ => unreachable!(),
}
}

View File

@@ -6,9 +6,16 @@ use reth_node_core::args::LogArgs;
use reth_tracing::FileWorkerGuard;
mod context;
mod gas_limit_ramp;
mod generate_big_block;
pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
mod new_payload_fcu;
mod new_payload_only;
mod output;
mod replay_payloads;
mod send_payload;
/// `reth bench` command
@@ -27,6 +34,9 @@ pub enum Subcommands {
/// Benchmark which calls `newPayload`, then `forkchoiceUpdated`.
NewPayloadFcu(new_payload_fcu::Command),
/// Benchmark which builds empty blocks with a ramped gas limit.
GasLimitRamp(gas_limit_ramp::Command),
/// Benchmark which only calls subsequent `newPayload` calls.
NewPayloadOnly(new_payload_only::Command),
@@ -41,6 +51,29 @@ pub enum Subcommands {
/// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000
/// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)`
SendPayload(send_payload::Command),
/// Generate a large block by packing transactions from existing blocks.
///
/// This command fetches transactions from real blocks and packs them into a single
/// block using the `testing_packBlock` RPC endpoint.
///
/// Example:
///
/// `reth-bench generate-big-block --rpc-url http://localhost:8545 --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex --target-gas
/// 30000000`
GenerateBigBlock(generate_big_block::Command),
/// Replay pre-generated payloads from a directory.
///
/// This command reads payload files from a previous `generate-big-block` run and replays
/// them in sequence using `newPayload` followed by `forkchoiceUpdated`.
///
/// Example:
///
/// `reth-bench replay-payloads --payload-dir ./payloads --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex`
ReplayPayloads(replay_payloads::Command),
}
impl BenchmarkCommand {
@@ -51,8 +84,11 @@ impl BenchmarkCommand {
match self.command {
Subcommands::NewPayloadFcu(command) => command.execute(ctx).await,
Subcommands::GasLimitRamp(command) => command.execute(ctx).await,
Subcommands::NewPayloadOnly(command) => command.execute(ctx).await,
Subcommands::SendPayload(command) => command.execute(ctx).await,
Subcommands::GenerateBigBlock(command) => command.execute(ctx).await,
Subcommands::ReplayPayloads(command) => command.execute(ctx).await,
}
}

View File

@@ -13,8 +13,7 @@ use crate::{
bench::{
context::BenchContext,
output::{
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
GAS_OUTPUT_SUFFIX,
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
@@ -27,7 +26,6 @@ use alloy_rpc_client::RpcClient;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
@@ -123,6 +121,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -188,6 +187,7 @@ impl Command {
result
} {
let gas_used = block.header.gas_used;
let gas_limit = block.header.gas_limit;
let block_number = block.header.number;
let transaction_count = block.transactions.len() as u64;
@@ -211,6 +211,7 @@ impl Command {
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
@@ -240,28 +241,11 @@ impl Command {
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
// Write CSV output files
if let Some(ref path) = self.benchmark.output {
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
write_benchmark_results(path, &gas_output_results, combined_results)?;
}
let gas_output = TotalGasOutput::new(gas_output_results)?;

View File

@@ -49,6 +49,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;

View File

@@ -1,10 +1,12 @@
//! Contains various benchmark output formats, either for logging or for
//! serialization to / from files.
use csv::Writer;
use eyre::OptionExt;
use reth_primitives_traits::constants::GIGAGAS;
use serde::{ser::SerializeStruct, Serialize};
use std::time::Duration;
use std::{path::Path, time::Duration};
use tracing::info;
/// This is the suffix for gas output csv files.
pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
@@ -67,6 +69,8 @@ impl Serialize for NewPayloadResult {
pub(crate) struct CombinedResult {
/// The block number of the block being processed.
pub(crate) block_number: u64,
/// The gas limit of the block.
pub(crate) gas_limit: u64,
/// The number of transactions in the block.
pub(crate) transaction_count: u64,
/// The `newPayload` result.
@@ -88,8 +92,9 @@ impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload {} processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
"Block {} (gas_limit: {}) at {:.4} Ggas/s, used {} gas. Combined: {:.4} Ggas/s. fcu: {:?}, newPayload: {:?}",
self.block_number,
self.gas_limit,
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
self.combined_gas_per_second() / GIGAGAS as f64,
@@ -110,10 +115,11 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 6)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_limit", &self.gas_limit)?;
state.serialize_field("transaction_count", &self.transaction_count)?;
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
@@ -167,6 +173,36 @@ impl TotalGasOutput {
}
}
/// Write benchmark results to CSV files.
///
/// Writes two files to the output directory:
/// - `combined_latency.csv`: Per-block latency results
/// - `total_gas.csv`: Per-block gas usage over time
pub(crate) fn write_benchmark_results(
output_dir: &Path,
gas_results: &[TotalGasRow],
combined_results: Vec<CombinedResult>,
) -> eyre::Result<()> {
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in gas_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", output_dir);
Ok(())
}
/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
///
/// This is essentially just for the csv writer, which would have headers

View File

@@ -0,0 +1,221 @@
//! Command for replaying pre-generated payloads from disk.
//!
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use std::path::PathBuf;
use tracing::{debug, info};
/// `reth bench replay-payloads` command
///
/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
/// `forkchoiceUpdated` for each payload in sequence.
#[derive(Debug, Parser)]
pub struct Command {
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Directory containing payload files (payload_001.json, payload_002.json, etc.).
#[arg(long, value_name = "PAYLOAD_DIR")]
payload_dir: PathBuf,
/// Optional limit on the number of payloads to replay.
/// If not specified, replays all payloads in the directory.
#[arg(long, value_name = "COUNT")]
count: Option<usize>,
/// Skip the first N payloads.
#[arg(long, value_name = "SKIP", default_value = "0")]
skip: usize,
}
/// A loaded payload ready for execution.
struct LoadedPayload {
/// The index (from filename).
index: u64,
/// The payload envelope.
envelope: ExecutionPayloadEnvelopeV4,
/// The block hash.
block_hash: B256,
}
impl Command {
/// Execute the `replay-payloads` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Get parent block (latest canonical block) - we need this for the first FCU
let parent_block = auth_provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let initial_parent_hash = parent_block.header.hash;
let initial_parent_number = parent_block.header.number;
info!(
parent_hash = %initial_parent_hash,
parent_number = initial_parent_number,
"Using initial parent block"
);
// Discover and load payloads
let payloads = self.load_payloads()?;
if payloads.is_empty() {
return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
}
info!(count = payloads.len(), "Loaded payloads from disk");
// Execute payloads in sequence
let mut parent_hash = initial_parent_hash;
for (i, payload) in payloads.iter().enumerate() {
info!(
payload = i + 1,
total = payloads.len(),
index = payload.index,
block_hash = %payload.block_hash,
"Executing payload (newPayload + FCU)"
);
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
parent_hash = payload.block_hash;
}
info!(count = payloads.len(), "All payloads replayed successfully");
Ok(())
}
/// Load and parse all payload files from the directory.
fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
let mut payloads = Vec::new();
// Read directory entries
let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_")
})
.collect();
// Parse filenames to get indices and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract index from "payload_NNN.json"
let index_str = name_str.strip_prefix("payload_")?.strip_suffix(".json")?;
let index: u64 = index_str.parse().ok()?;
Some((index, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(idx, _)| *idx);
// Apply skip and count
let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
let indexed_paths: Vec<_> = match self.count {
Some(count) => indexed_paths.into_iter().take(count).collect(),
None => indexed_paths,
};
// Load each payload
for (index, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
info!(
index = index,
block_hash = %block_hash,
path = %path.display(),
"Loaded payload"
);
payloads.push(LoadedPayload { index, envelope, block_hash });
}
Ok(payloads)
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: &ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(?status, "newPayloadV4 response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(?fcu_result, "forkchoiceUpdatedV3 response");
Ok(())
}
}

View File

@@ -3,15 +3,16 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated,
PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use tracing::error;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
#[async_trait::async_trait]
@@ -52,6 +53,14 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
target: "reth-bench",
method = "engine_forkchoiceUpdatedV1",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
@@ -82,6 +91,14 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
target: "reth-bench",
method = "engine_forkchoiceUpdatedV2",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
@@ -112,6 +129,14 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
target: "reth-bench",
method = "engine_forkchoiceUpdatedV3",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
@@ -148,33 +173,51 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
if let Some(prague) = sidecar.prague() {
// Use target version if provided (for Osaka), otherwise default to V4
let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
if is_optimism {
let withdrawals_root = withdrawals_root.ok_or_else(|| {
eyre::eyre!("Missing withdrawals root for Optimism payload")
})?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
OpExecutionPayloadV4 {
payload_inner: payload,
withdrawals_root: block.withdrawals_root.unwrap(),
},
OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
Requests::default(),
))?,
)
} else {
// Extract actual Requests from RequestsOrHash
let requests = prague
.requests
.requests()
.cloned()
.ok_or_else(|| eyre::eyre!("Prague sidecar has hash, not requests"))?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
payload,
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
prague.requests.requests_hash(),
requests,
))?,
)
}
@@ -217,6 +260,13 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
) -> TransportResult<()> {
let method = version.method_name();
debug!(
target: "reth-bench",
method,
params = %serde_json::to_string_pretty(&params).unwrap_or_default(),
"Sending newPayload"
);
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
@@ -237,12 +287,15 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
/// actual engine api message call.
///
/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
// FCU V3 is used for both Cancun and Prague (there is no FCU V4)
match message_version {
EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await

View File

@@ -460,6 +460,18 @@ impl ChainSpec {
pub fn builder() -> ChainSpecBuilder {
ChainSpecBuilder::default()
}
/// Map a chain ID to a known chain spec, if available.
pub fn from_chain_id(chain_id: u64) -> Option<Arc<Self>> {
match NamedChain::try_from(chain_id).ok()? {
NamedChain::Mainnet => Some(MAINNET.clone()),
NamedChain::Sepolia => Some(SEPOLIA.clone()),
NamedChain::Holesky => Some(HOLESKY.clone()),
NamedChain::Hoodi => Some(HOODI.clone()),
NamedChain::Dev => Some(DEV.clone()),
_ => None,
}
}
}
impl<H: BlockHeader> ChainSpec<H> {

View File

@@ -45,7 +45,7 @@ pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = 4 * 1024 * 1024 * 1024;
/// Determines if the host has enough parallelism to run the payload processor.
///
@@ -100,7 +100,7 @@ pub struct TreeConfig {
/// Whether to enable state provider metrics.
state_provider_metrics: bool,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
cross_block_cache_size: usize,
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
@@ -185,7 +185,7 @@ impl TreeConfig {
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
cross_block_cache_size: u64,
cross_block_cache_size: usize,
has_enough_parallelism: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
@@ -300,7 +300,7 @@ impl TreeConfig {
}
/// Returns the cross-block cache size.
pub const fn cross_block_cache_size(&self) -> u64 {
pub const fn cross_block_cache_size(&self) -> usize {
self.cross_block_cache_size
}
@@ -403,7 +403,7 @@ impl TreeConfig {
}
/// Setter for cross block cache size.
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: u64) -> Self {
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: usize) -> Self {
self.cross_block_cache_size = cross_block_cache_size;
self
}

View File

@@ -44,6 +44,8 @@ alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-rpc-types-engine.workspace = true
parking_lot.workspace = true
revm.workspace = true
revm-primitives.workspace = true
@@ -51,7 +53,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
fixed-cache.workspace = true
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
@@ -65,7 +67,6 @@ schnellru.workspace = true
rayon.workspace = true
tracing.workspace = true
derive_more.workspace = true
parking_lot.workspace = true
crossbeam-channel.workspace = true
# optional deps for test-utils

View File

@@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
@@ -151,7 +151,7 @@ where
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.commit()?;
}

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
ExecutionCacheBuilder, SavedCache,
FixedCacheMetrics, SavedCache,
},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
@@ -116,7 +116,7 @@ where
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
cross_block_cache_size: usize,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
@@ -299,7 +299,7 @@ where
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics) = saved_cache.split();
let (cache, metrics, _) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
@@ -474,8 +474,13 @@ where
cache
} else {
debug!("creating new execution cache on cache miss");
let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
let cache = crate::tree::cached_state::ExecutionCache::new(self.cross_block_cache_size);
SavedCache::new(
parent_hash,
cache,
CachedStateMetrics::zeroed(),
FixedCacheMetrics::zeroed(),
)
}
}
@@ -568,18 +573,22 @@ where
}
// Take existing cache (if any) or create fresh caches
let (caches, cache_metrics) = match cached.take() {
Some(existing) => {
existing.split()
}
let (caches, cache_metrics, fixed_cache_metrics) = match cached.take() {
Some(existing) => existing.split(),
None => (
ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
crate::tree::cached_state::ExecutionCache::new(self.cross_block_cache_size),
CachedStateMetrics::zeroed(),
FixedCacheMetrics::zeroed(),
),
};
// Insert the block's bundle state into cache
let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics);
let new_cache = SavedCache::new(
block_with_parent.block.hash,
caches,
cache_metrics,
fixed_cache_metrics,
);
if new_cache.cache().insert_state(bundle_state).is_err() {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
@@ -796,7 +805,10 @@ impl ExecutionCache {
"Existing cache found"
);
if hash_matches && available {
if available {
if !hash_matches {
c.clear();
}
return Some(c.clone());
}
} else {
@@ -862,7 +874,7 @@ where
mod tests {
use super::ExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
cached_state::{CachedStateMetrics, FixedCacheMetrics, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
@@ -891,8 +903,13 @@ mod tests {
use std::sync::Arc;
fn make_saved_cache(hash: B256) -> SavedCache {
let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
let execution_cache = crate::tree::cached_state::ExecutionCache::new(1_000);
SavedCache::new(
hash,
execution_cache,
CachedStateMetrics::zeroed(),
FixedCacheMetrics::zeroed(),
)
}
#[test]

View File

@@ -225,7 +225,7 @@ struct MultiproofInput {
proof_targets: MultiProofTargets,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
}
impl MultiproofInput {
@@ -608,9 +608,6 @@ impl MultiProofTask {
// we still want to optimistically fetch extension children for the leaf addition case.
self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
.prefetch_proof_targets_storages_histogram
@@ -636,7 +633,7 @@ impl MultiProofTask {
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
multi_added_removed_keys: Some(self.multi_added_removed_keys.clone()),
});
},
);
@@ -704,9 +701,6 @@ impl MultiProofTask {
state_updates += 1;
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();
let available_account_workers =
@@ -725,7 +719,7 @@ impl MultiProofTask {
let proof_targets = get_proof_targets(
&hashed_state_update,
&self.fetched_proof_targets,
&multi_added_removed_keys,
&self.multi_added_removed_keys,
);
spawned_proof_targets.extend_ref(&proof_targets);
@@ -735,7 +729,7 @@ impl MultiProofTask {
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
multi_added_removed_keys: Some(self.multi_added_removed_keys.clone()),
});
},
);
@@ -1217,7 +1211,7 @@ fn get_proof_targets(
.keys()
.filter(|slot| {
!fetched.is_some_and(|f| f.contains(*slot)) ||
storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
storage_added_removed_keys.as_ref().is_some_and(|k| k.is_removed(slot))
})
.peekable();
@@ -1272,8 +1266,9 @@ where
#[cfg(test)]
mod tests {
use crate::tree::cached_state::CachedStateProvider;
use super::*;
use crate::tree::cached_state::{CachedStateProvider, ExecutionCacheBuilder};
use alloy_eip7928::{AccountChanges, BalanceChange};
use alloy_primitives::Address;
use reth_provider::{
@@ -1331,7 +1326,7 @@ mod tests {
{
let db_provider = factory.database_provider_ro().unwrap();
let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
let cache = ExecutionCacheBuilder::default().build_caches(1000);
let cache = crate::tree::cached_state::ExecutionCache::new(1000);
CachedStateProvider::new(state_provider, cache, Default::default())
}

View File

@@ -272,8 +272,8 @@ where
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics);
let (caches, cache_metrics, fixed_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics, fixed_cache_metrics);
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome

View File

@@ -31,8 +31,8 @@ reth-payload-validator.workspace = true
# ethereum
alloy-rlp.workspace = true
revm.workspace = true
alloy-rpc-types-engine.workspace = true
revm.workspace = true
# alloy
alloy-eips.workspace = true

View File

@@ -24,7 +24,7 @@ pub struct DefaultEngineValues {
prewarming_disabled: bool,
parallel_sparse_trie_disabled: bool,
state_provider_metrics: bool,
cross_block_cache_size: u64,
cross_block_cache_size: usize,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
@@ -93,7 +93,7 @@ impl DefaultEngineValues {
}
/// Set the default cross-block cache size in MB
pub const fn with_cross_block_cache_size(mut self, v: u64) -> Self {
pub const fn with_cross_block_cache_size(mut self, v: usize) -> Self {
self.cross_block_cache_size = v;
self
}
@@ -254,7 +254,7 @@ pub struct EngineArgs {
/// Configure the size of cross-block cache in megabytes
#[arg(long = "engine.cross-block-cache-size", default_value_t = DefaultEngineValues::get_global().cross_block_cache_size)]
pub cross_block_cache_size: u64,
pub cross_block_cache_size: usize,
/// Enable comparing trie updates from the state root task to the trie updates from the regular
/// state root calculation.

View File

@@ -38,7 +38,7 @@ pub use reth_engine_primitives::{
};
/// Default size of cross-block cache in megabytes.
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: u64 = 4 * 1024;
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: usize = 4 * 1024;
/// This includes all necessary configuration to launch the node.
/// The individual configuration options can be overwritten before launching the node.

View File

@@ -228,7 +228,7 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
// finally, write the receipts
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes, true)?;
}
// Only commit if we have imported as many receipts as the number of transactions.

View File

@@ -42,4 +42,13 @@ pub trait TestingApi {
&self,
request: TestingBuildBlockRequestV1,
) -> jsonrpsee::core::RpcResult<ExecutionPayloadEnvelopeV5>;
/// Builds a block using the provided parent, payload attributes, and transactions.
///
/// Like testing_buildBlockV1 but skips invalid transactions.
#[method(name = "packBlock")]
async fn pack_block(
&self,
request: TestingBuildBlockRequestV1,
) -> jsonrpsee::core::RpcResult<ExecutionPayloadEnvelopeV5>;
}

View File

@@ -4,7 +4,7 @@
use alloy_consensus::{Header, Transaction};
use alloy_evm::Evm;
use alloy_primitives::U256;
use alloy_primitives::{map::HashSet, U256};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
@@ -17,9 +17,11 @@ use reth_revm::{database::StateProviderDatabase, db::State};
use reth_rpc_api::{TestingApiServer, TestingBuildBlockRequestV1};
use reth_rpc_eth_api::{helpers::Call, FromEthApiError};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_storage_api::{BlockReader, HeaderProvider};
use reth_storage_api::{BlockReader, HeaderProvider, StateProvider};
use revm::context::Block;
use revm_primitives::map::DefaultHashBuilder;
use std::sync::Arc;
use tracing::{debug, info};
/// Testing API handler.
#[derive(Debug, Clone)]
@@ -79,11 +81,40 @@ where
let mut total_fees = U256::ZERO;
let base_fee = builder.evm_mut().block().basefee();
for tx in request.transactions {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(&tx)?;
debug!(
target: "rpc::testing",
parent_hash = ?request.parent_block_hash,
parent_number = parent.number(),
num_transactions = request.transactions.len(),
?base_fee,
"Starting block build"
);
for (idx, tx) in request.transactions.iter().enumerate() {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(tx)?;
let sender = tx.signer();
let tx_nonce = tx.nonce();
let tx_hash = *tx.tx_hash();
let account_nonce = state
.account_nonce(&sender)
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default();
let gas_used =
builder.execute_transaction(tx).map_err(Eth::Error::from_eth_err)?;
let gas_used = builder.execute_transaction(tx).map_err(|e| {
debug!(
target: "rpc::testing",
tx_idx = idx,
?tx_hash,
?sender,
tx_nonce,
account_nonce = ?account_nonce,
error = ?e,
"Transaction execution failed"
);
Eth::Error::from_eth_err(e)
})?;
total_fees += U256::from(tip) * U256::from(gas_used);
}
@@ -107,6 +138,132 @@ where
})
.await
}
async fn pack_block(
&self,
request: TestingBuildBlockRequestV1,
) -> Result<ExecutionPayloadEnvelopeV5, Eth::Error> {
let evm_config = self.evm_config.clone();
self.eth_api
.spawn_with_state_at_block(request.parent_block_hash, move |eth_api, state| {
let state = state.database.0;
let mut db = State::builder()
.with_bundle_update()
.with_database(StateProviderDatabase::new(&state))
.build();
let parent = eth_api
.provider()
.sealed_header_by_hash(request.parent_block_hash)?
.ok_or_else(|| {
EthApiError::HeaderNotFound(request.parent_block_hash.into())
})?;
let env_attrs = NextBlockEnvAttributes {
timestamp: request.payload_attributes.timestamp,
suggested_fee_recipient: request.payload_attributes.suggested_fee_recipient,
prev_randao: request.payload_attributes.prev_randao,
gas_limit: parent.gas_limit(),
parent_beacon_block_root: request.payload_attributes.parent_beacon_block_root,
withdrawals: request.payload_attributes.withdrawals.map(Into::into),
extra_data: request.extra_data.unwrap_or_default(),
};
let mut builder = evm_config
.builder_for_next_block(&mut db, &parent, env_attrs)
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
builder.apply_pre_execution_changes().map_err(Eth::Error::from_eth_err)?;
let mut total_fees = U256::ZERO;
let base_fee = builder.evm_mut().block().basefee();
debug!(
target: "rpc::testing",
parent_hash = ?request.parent_block_hash,
parent_number = parent.number(),
num_transactions = request.transactions.len(),
?base_fee,
"Starting block build"
);
// Keeps track of invalid senders - if one tx is invalid then all future txs from
// that sender are invalid as well
let mut invalid_senders_map = HashSet::<_, DefaultHashBuilder>::default();
for (idx, tx) in request.transactions.iter().enumerate() {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(tx)?;
let sender = tx.signer();
let tx_nonce = tx.nonce();
let tx_hash = *tx.tx_hash();
if invalid_senders_map.contains(&sender) {
// skip any senders which we have ignored previously
continue;
}
let account_nonce = state
.account_nonce(&sender)
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default();
let gas_used = match builder.execute_transaction(tx) {
Err(err) => {
invalid_senders_map.insert(sender);
debug!(
target: "rpc::testing",
tx_idx = idx,
?tx_hash,
?sender,
tx_nonce,
account_nonce = ?account_nonce,
error = ?err,
"Transaction execution failed"
);
continue
}
Ok(gas_used) => gas_used,
};
total_fees += U256::from(tip) * U256::from(gas_used);
}
let outcome = builder.finish(&state).map_err(Eth::Error::from_eth_err)?;
let requests = outcome
.block
.requests_hash()
.is_some()
.then_some(outcome.execution_result.requests);
let sealed_block = outcome.block.into_sealed_block();
info!(
target: "rpc::testing",
header = ?sealed_block.header(),
block_hash = ?sealed_block.hash(),
"testing_packBlock built block"
);
let envelope = EthBuiltPayload::new(
alloy_rpc_types_engine::PayloadId::default(),
Arc::new(sealed_block),
total_fees,
requests,
)
.try_into_v5()
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
info!(
target: "rpc::testing",
blobs_bundle = ?envelope.blobs_bundle,
execution_requests = ?envelope.execution_requests,
"testing_packBlock payload envelope sidecar"
);
Ok(envelope)
})
.await
}
}
#[async_trait]
@@ -124,4 +281,13 @@ where
) -> RpcResult<ExecutionPayloadEnvelopeV5> {
self.build_block_v1(request).await.map_err(Into::into)
}
/// Handles `testing_packBlock` by gating concurrency via a semaphore and offloading heavy
/// work to the blocking pool to avoid stalling the async runtime.
async fn pack_block(
&self,
request: TestingBuildBlockRequestV1,
) -> RpcResult<ExecutionPayloadEnvelopeV5> {
self.pack_block(request).await.map_err(Into::into)
}
}

View File

@@ -463,7 +463,7 @@ where
}
// write output
provider.write_state(&state, OriginalValuesKnown::Yes)?;
provider.write_state(&state, OriginalValuesKnown::Yes, true)?;
let db_write_duration = time.elapsed();
debug!(

View File

@@ -334,7 +334,7 @@ where
Vec::new(),
);
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes, true)?;
trace!(target: "reth::cli", "Inserted state");

View File

@@ -225,6 +225,9 @@ pub enum StaticFileWriterError {
/// Cannot call `sync_all` or `finalize` when prune is queued.
#[error("cannot call sync_all or finalize when prune is queued, use commit() instead")]
FinalizeWithPruneQueued,
/// Thread panicked during execution.
#[error("thread panicked: {_0}")]
ThreadPanic(&'static str),
/// Other error with message.
#[error("{0}")]
Other(String),

View File

@@ -21,7 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
StaticFileAccess, StaticFileProviderBuilder, StaticFileWriter,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
};
pub mod changeset_walker;

View File

@@ -789,7 +789,7 @@ mod tests {
create_test_provider_factory, create_test_provider_factory_with_chain_spec,
MockNodeTypesWithDB,
},
BlockWriter, CanonChainTracker, ProviderFactory,
BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode,
};
use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, TxNumber, B256};
@@ -907,6 +907,7 @@ mod tests {
..Default::default()
},
OriginalValuesKnown::No,
true,
)?;
}
@@ -997,7 +998,7 @@ mod tests {
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![lowest_memory_block]).unwrap();
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
provider_rw.commit().unwrap();
// Remove from memory

View File

@@ -43,7 +43,7 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;

View File

@@ -5,7 +5,7 @@ use crate::{
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::RocksDBProvider,
static_file::StaticFileWriter,
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
to_range,
@@ -35,7 +35,7 @@ use alloy_primitives::{
use itertools::Itertools;
use parking_lot::RwLock;
use rayon::slice::ParallelSliceMut;
use reth_chain_state::ExecutedBlock;
use reth_chain_state::{ComputedTrieData, ExecutedBlock};
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
@@ -64,7 +64,7 @@ use reth_storage_api::{
NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
use reth_trie::{
trie_cursor::{
InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
@@ -85,6 +85,7 @@ use std::{
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
sync::Arc,
thread,
time::{Duration, Instant},
};
use tracing::{debug, trace};
@@ -150,6 +151,25 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
}
}
/// Mode for [`DatabaseProvider::save_blocks`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
/// Full mode: write block structure + receipts + state + trie.
/// Used by engine/production code.
Full,
/// Blocks only: write block structure (headers, txs, senders, indices).
/// Receipts/state/trie are skipped - they may come later via separate calls.
/// Used by `insert_block`.
BlocksOnly,
}
impl SaveBlocksMode {
/// Returns `true` if this is [`SaveBlocksMode::Full`].
pub const fn with_state(self) -> bool {
matches!(self, Self::Full)
}
}
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
pub struct DatabaseProvider<TX, N: NodeTypes> {
@@ -356,22 +376,74 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok(result)
}
/// Creates the context for static file writes.
fn static_file_write_ctx(
&self,
save_mode: SaveBlocksMode,
first_block: BlockNumber,
last_block: BlockNumber,
) -> ProviderResult<StaticFileWriteCtx> {
let tip = self.last_block_number()?.max(last_block);
Ok(StaticFileWriteCtx {
write_senders: EitherWriterDestination::senders(self).is_static_file() &&
self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
write_receipts: save_mode.with_state() &&
EitherWriter::receipts_destination(self).is_static_file(),
tip,
receipts_prune_mode: self.prune_modes.receipts,
// Receipts are prunable if no receipts exist in SF yet and within pruning distance
receipts_prunable: self
.static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Receipts)
.is_none() &&
PruneMode::Distance(self.minimum_pruning_distance)
.should_prune(first_block, tip),
})
}
/// Writes executed blocks and state to storage.
pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
///
/// This method parallelizes static file (SF) writes with MDBX writes.
/// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
/// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
///
/// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
/// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
save_mode: SaveBlocksMode,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "providers::db", "Attempted to write empty block range");
return Ok(())
}
// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().recovered_block();
let last_block = blocks.last().unwrap().recovered_block();
let first_number = first_block.number();
let last_block_number = last_block.number();
let first_number = blocks.first().unwrap().recovered_block().number();
let last_block_number = blocks.last().unwrap().recovered_block().number();
debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
// Compute tx_nums upfront (both threads need these)
let start = Instant::now();
let first_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
self.metrics.record_duration(metrics::Action::GetNextTxNum, start.elapsed());
let tx_nums: Vec<TxNumber> = {
let mut nums = Vec::with_capacity(blocks.len());
let mut current = first_tx_num;
for block in &blocks {
nums.push(current);
current += block.recovered_block().body().transaction_count() as u64;
}
nums
};
// Accumulate durations for each step
let mut total_insert_block = Duration::ZERO;
let mut total_write_state = Duration::ZERO;
@@ -379,75 +451,176 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let mut total_write_trie_changesets = Duration::ZERO;
let mut total_write_trie_updates = Duration::ZERO;
// TODO: Do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * indices (already done basically)
// Insert the blocks
for block in blocks {
let trie_data = block.trie_data();
let ExecutedBlock { recovered_block, execution_output, .. } = block;
let block_number = recovered_block.number();
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
let start = Instant::now();
self.insert_block(&recovered_block)?;
total_insert_block += start.elapsed();
thread::scope(|s| {
// SF writes
let sf_handle = s.spawn(|| sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx));
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let start = Instant::now();
self.write_state(&execution_output, OriginalValuesKnown::No)?;
total_write_state += start.elapsed();
// MDBX writes
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
total_write_hashed_state += start.elapsed();
let start = Instant::now();
self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
total_insert_block += start.elapsed();
let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
total_write_trie_changesets += start.elapsed();
if save_mode.with_state() {
let execution_output = block.execution_outcome();
let block_number = recovered_block.number();
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let start = Instant::now();
self.write_state(
execution_output,
OriginalValuesKnown::No,
!sf_ctx.write_receipts,
)?;
total_write_state += start.elapsed();
let trie_data = block.trie_data();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
total_write_hashed_state += start.elapsed();
let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
total_write_trie_changesets += start.elapsed();
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
total_write_trie_updates += start.elapsed();
}
}
// Full mode: update history indices
let duration_update_history_indices = if save_mode.with_state() {
let start = Instant::now();
self.update_history_indices(first_number..=last_block_number)?;
start.elapsed()
} else {
Duration::ZERO
};
// Update pipeline progress
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
total_write_trie_updates += start.elapsed();
self.update_pipeline_stages(last_block_number, false)?;
let duration_update_pipeline_stages = start.elapsed();
// Record all metrics
self.metrics
.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
self.metrics.record_duration(
metrics::Action::SaveBlocksWriteHashedState,
total_write_hashed_state,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksWriteTrieChangesets,
total_write_trie_changesets,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksWriteTrieUpdates,
total_write_trie_updates,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdateHistoryIndices,
duration_update_history_indices,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdatePipelineStages,
duration_update_pipeline_stages,
);
// Wait for SF thread
sf_handle.join().map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
Ok(())
})
}
/// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
///
/// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
fn insert_block_mdbx_only(
&self,
block: &RecoveredBlock<BlockTy<N>>,
first_tx_num: TxNumber,
) -> ProviderResult<StoredBlockBodyIndices> {
if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
EitherWriterDestination::senders(self).is_database()
{
let start = Instant::now();
let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
cursor.append(tx_num, &sender)?;
}
self.metrics
.record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
}
// update history indices
let block_number = block.number();
let tx_count = block.body().transaction_count() as u64;
let start = Instant::now();
self.update_history_indices(first_number..=last_block_number)?;
let duration_update_history_indices = start.elapsed();
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
// Update pipeline progress
// Write tx hash numbers to MDBX if not handled by RocksDB and not fully pruned
if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
{
let start = Instant::now();
let mut cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
let mut tx_num = first_tx_num;
for transaction in block.body().transactions_iter() {
cursor.upsert(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
self.metrics
.record_duration(metrics::Action::InsertTransactionHashNumbers, start.elapsed());
}
self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
}
/// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
/// `Ommers`/`Withdrawals`).
fn write_block_body_indices(
&self,
block_number: BlockNumber,
body: &BodyTy<N>,
first_tx_num: TxNumber,
tx_count: u64,
) -> ProviderResult<()> {
// MDBX: BlockBodyIndices
let start = Instant::now();
self.update_pipeline_stages(last_block_number, false)?;
let duration_update_pipeline_stages = start.elapsed();
self.tx
.cursor_write::<tables::BlockBodyIndices>()?
.append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
// Record all metrics at the end
self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
self.metrics
.record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state);
self.metrics.record_duration(
metrics::Action::SaveBlocksWriteTrieChangesets,
total_write_trie_changesets,
);
self.metrics
.record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdateHistoryIndices,
duration_update_history_indices,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdatePipelineStages,
duration_update_pipeline_stages,
);
// MDBX: TransactionBlocks (last tx -> block mapping)
if tx_count > 0 {
let start = Instant::now();
self.tx
.cursor_write::<tables::TransactionBlocks>()?
.append(first_tx_num + tx_count - 1, &block_number)?;
self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
}
debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
// MDBX: Ommers/Withdrawals
self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
Ok(())
}
@@ -1821,13 +1994,9 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
&self,
execution_outcome: &ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown,
write_receipts: bool,
) -> ProviderResult<()> {
let first_block = execution_outcome.first_block();
let block_count = execution_outcome.len() as u64;
let last_block = execution_outcome.last_block();
let block_range = first_block..=last_block;
let tip = self.last_block_number()?.max(last_block);
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
@@ -1835,6 +2004,16 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
self.write_state_reverts(reverts, first_block)?;
self.write_state_changes(plain_state)?;
if !write_receipts {
return Ok(());
}
let block_count = execution_outcome.len() as u64;
let last_block = execution_outcome.last_block();
let block_range = first_block..=last_block;
let tip = self.last_block_number()?.max(last_block);
// Fetch the first transaction number for each block in the range
let block_indices: Vec<_> = self
.block_body_indices_range(block_range)?
@@ -2987,7 +3166,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
for DatabaseProvider<TX, N>
{
fn take_block_and_execution_above(
@@ -3030,89 +3209,40 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
for DatabaseProvider<TX, N>
{
type Block = BlockTy<N>;
type Receipt = ReceiptTy<N>;
/// Inserts the block into the database, always modifying the following static file segments and
/// tables:
/// * [`StaticFileSegment::Headers`]
/// * [`tables::HeaderNumbers`]
/// * [`tables::BlockBodyIndices`]
/// Inserts the block into the database, writing to both static files and MDBX.
///
/// If there are transactions in the block, the following static file segments and tables will
/// be modified:
/// * [`StaticFileSegment::Transactions`]
/// * [`tables::TransactionBlocks`]
///
/// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
/// If withdrawals are not empty, this will modify
/// [`BlockWithdrawals`](tables::BlockWithdrawals).
///
/// If the provider has __not__ configured full sender pruning, this will modify either:
/// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
/// * [`tables::TransactionSenders`] if senders are written to the database
///
/// If the provider has __not__ configured full transaction lookup pruning, this will modify
/// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
/// This is a convenience method primarily used in tests. For production use,
/// prefer [`Self::save_blocks`] which handles execution output and trie data.
fn insert_block(
&self,
block: &RecoveredBlock<Self::Block>,
) -> ProviderResult<StoredBlockBodyIndices> {
let block_number = block.number();
let tx_count = block.body().transaction_count() as u64;
let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
self.static_file_provider
.get_writer(block_number, StaticFileSegment::Headers)?
.append_header(block.header(), &block.hash())?;
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
let first_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetNextTxNum);
let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
senders_writer.increment_block(block.number())?;
senders_writer
.append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
}
if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
self.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
let hash = transaction.tx_hash();
writer.put_transaction_hash_number(*hash, tx_num, false)?;
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
}
self.append_block_bodies(vec![(block_number, Some(block.body()))])?;
debug!(
target: "providers::db",
?block_number,
actions = ?durations_recorder.actions,
"Inserted block"
// Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
let executed_block = ExecutedBlock::new(
Arc::new(block.clone()),
Arc::new(ExecutionOutcome::new(
Default::default(),
Vec::<Vec<ReceiptTy<N>>>::new(),
block_number,
vec![],
)),
ComputedTrieData::default(),
);
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
// Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
// Return the body indices
self.block_body_indices(block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
}
fn append_block_bodies(
@@ -3298,7 +3428,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
durations_recorder.record_relative(metrics::Action::InsertBlock);
}
self.write_state(execution_outcome, OriginalValuesKnown::No)?;
self.write_state(execution_outcome, OriginalValuesKnown::No, true)?;
durations_recorder.record_relative(metrics::Action::InsertState);
// insert hashes and intermediate merkle nodes
@@ -3440,7 +3570,8 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
// Normal path: finalize() will call sync_all() if not already synced
self.static_file_provider.finalize()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
@@ -3523,10 +3654,11 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
true,
)
.unwrap();
provider_rw.insert_block(&data.blocks[0].0).unwrap();
provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No, true).unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();
@@ -3549,11 +3681,14 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
true,
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No, true)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3579,13 +3714,16 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
true,
)
.unwrap();
// insert blocks 1-3 with receipts
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No, true)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3610,11 +3748,14 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
true,
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No, true)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3673,11 +3814,14 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
true,
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No, true)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -4991,7 +5135,7 @@ mod tests {
}]],
..Default::default()
};
provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No, true).unwrap();
provider_rw.commit().unwrap();
};

View File

@@ -10,7 +10,7 @@ pub use database::*;
mod static_file;
pub use static_file::{
StaticFileAccess, StaticFileJarProvider, StaticFileProvider, StaticFileProviderBuilder,
StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriter,
StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriteCtx, StaticFileWriter,
};
mod state;

View File

@@ -14,6 +14,7 @@ use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash,
use dashmap::DashMap;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::RwLock;
use reth_chain_state::ExecutedBlock;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
use reth_db::{
lockfile::StorageLock,
@@ -32,7 +33,9 @@ use reth_db_api::{
use reth_ethereum_primitives::{Receipt, TransactionSigned};
use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
use reth_primitives_traits::{
AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader, SignedTransaction,
};
use reth_stages_types::{PipelineTarget, StageId};
use reth_static_file_types::{
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
@@ -41,13 +44,14 @@ use reth_static_file_types::{
use reth_storage_api::{
BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
collections::BTreeMap,
fmt::Debug,
ops::{Deref, Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
sync::{atomic::AtomicU64, mpsc, Arc},
thread,
};
use tracing::{debug, info, trace, warn};
@@ -77,6 +81,23 @@ impl StaticFileAccess {
}
}
/// Context for static file block writes.
///
/// Contains target segments and pruning configuration.
#[derive(Debug, Clone, Copy, Default)]
pub struct StaticFileWriteCtx {
/// Whether transaction senders should be written to static files.
pub write_senders: bool,
/// Whether receipts should be written to static files.
pub write_receipts: bool,
/// The current chain tip block number (for pruning).
pub tip: BlockNumber,
/// The prune mode for receipts, if any.
pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
/// Whether receipts are prunable (based on storage settings and prune distance).
pub receipts_prunable: bool,
}
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
@@ -504,6 +525,123 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
Ok(())
}
/// Spawns a scoped thread that writes to a static file segment using the provided closure.
///
/// The closure receives a mutable reference to the segment writer. After the closure completes,
/// `sync_all()` is called to flush writes to disk.
fn spawn_segment_writer<'scope, 'env, F>(
&'env self,
scope: &'scope thread::Scope<'scope, 'env>,
segment: StaticFileSegment,
first_block_number: BlockNumber,
f: F,
) -> thread::ScopedJoinHandle<'scope, ProviderResult<()>>
where
F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env,
{
scope.spawn(move || {
let mut w = self.get_writer(first_block_number, segment)?;
f(&mut w)?;
w.sync_all()
})
}
/// Writes all static file data for multiple blocks in parallel per-segment.
///
/// This spawns separate threads for each segment type and each thread calls `sync_all()` on its
/// writer when done.
pub fn write_blocks_data(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: StaticFileWriteCtx,
) -> ProviderResult<()> {
if blocks.is_empty() {
return Ok(());
}
let first_block_number = blocks[0].recovered_block().number();
thread::scope(|s| {
let h_headers =
self.spawn_segment_writer(s, StaticFileSegment::Headers, first_block_number, |w| {
for block in blocks {
let b = block.recovered_block();
w.append_header(b.header(), &b.hash())?;
}
Ok(())
});
let h_txs = self.spawn_segment_writer(
s,
StaticFileSegment::Transactions,
first_block_number,
|w| {
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
let b = block.recovered_block();
w.increment_block(b.number())?;
for (i, tx) in b.body().transactions().iter().enumerate() {
w.append_transaction(first_tx + i as u64, tx)?;
}
}
Ok(())
},
);
let h_senders = ctx.write_senders.then(|| {
self.spawn_segment_writer(
s,
StaticFileSegment::TransactionSenders,
first_block_number,
|w| {
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
let b = block.recovered_block();
w.increment_block(b.number())?;
for (i, sender) in b.senders_iter().enumerate() {
w.append_transaction_sender(first_tx + i as u64, sender)?;
}
}
Ok(())
},
)
});
let h_receipts = ctx.write_receipts.then(|| {
self.spawn_segment_writer(s, StaticFileSegment::Receipts, first_block_number, |w| {
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
let block_number = block.recovered_block().number();
w.increment_block(block_number)?;
// skip writing receipts if pruning configuration requires us to.
if ctx.receipts_prunable &&
ctx.receipts_prune_mode
.is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
{
continue
}
for (i, receipt) in
block.execution_outcome().receipts.iter().flatten().enumerate()
{
w.append_receipt(first_tx + i as u64, receipt)?;
}
}
Ok(())
})
});
h_headers.join().map_err(|_| StaticFileWriterError::ThreadPanic("headers"))??;
h_txs.join().map_err(|_| StaticFileWriterError::ThreadPanic("transactions"))??;
if let Some(h) = h_senders {
h.join().map_err(|_| StaticFileWriterError::ThreadPanic("senders"))??;
}
if let Some(h) = h_receipts {
h.join().map_err(|_| StaticFileWriterError::ThreadPanic("receipts"))??;
}
Ok(())
})
}
/// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
/// either block or transaction.
pub fn get_segment_provider(

View File

@@ -1,6 +1,7 @@
mod manager;
pub use manager::{
StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter,
StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
};
mod jar;

View File

@@ -206,6 +206,8 @@ pub struct StaticFileProviderRW<N> {
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// On commit, contains the pruning strategy to apply for the segment.
prune_on_commit: Option<PruneStrategy>,
/// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
synced: bool,
}
impl<N: NodePrimitives> StaticFileProviderRW<N> {
@@ -227,6 +229,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
reader,
metrics,
prune_on_commit: None,
synced: false,
};
writer.ensure_end_range_consistency()?;
@@ -335,12 +338,13 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
if self.writer.is_dirty() {
self.writer.sync_all().map_err(ProviderError::other)?;
}
self.synced = true;
Ok(())
}
/// Commits configuration to disk and updates the reader index.
///
/// Must be called after [`Self::sync_all`] to complete the commit.
/// If `sync_all()` was not called, this will call it first to ensure data is persisted.
///
/// Returns an error if prune is queued (use [`Self::commit`] instead).
pub fn finalize(&mut self) -> ProviderResult<()> {
@@ -348,9 +352,14 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
}
if self.writer.is_dirty() {
if !self.synced {
self.writer.sync_all().map_err(ProviderError::other)?;
}
self.writer.finalize().map_err(ProviderError::other)?;
self.update_index()?;
}
self.synced = false;
Ok(())
}

View File

@@ -280,7 +280,7 @@ mod tests {
let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
// Check plain storage state
@@ -380,7 +380,7 @@ mod tests {
state.merge_transitions(BundleRetention::Reverts);
let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 2, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
assert_eq!(
@@ -448,7 +448,7 @@ mod tests {
let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build();
@@ -607,7 +607,7 @@ mod tests {
let outcome: ExecutionOutcome =
ExecutionOutcome::new(bundle, Default::default(), 1, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider
@@ -773,7 +773,7 @@ mod tests {
let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build();
@@ -822,7 +822,7 @@ mod tests {
state.merge_transitions(BundleRetention::Reverts);
let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new());
provider
.write_state(&outcome, OriginalValuesKnown::Yes)
.write_state(&outcome, OriginalValuesKnown::Yes, true)
.expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider

View File

@@ -12,12 +12,15 @@ pub trait StateWriter {
/// Receipt type included into [`ExecutionOutcome`].
type Receipt;
/// Write the state and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
/// Write the state and optionally receipts to the database.
///
/// If `write_receipts` is false, receipts are skipped (useful when receipts are written
/// separately to static files).
fn write_state(
&self,
execution_outcome: &ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown,
write_receipts: bool,
) -> ProviderResult<()>;
/// Write state reverts to the database.

View File

@@ -28,6 +28,7 @@ bytes = { workspace = true, optional = true }
derive_more.workspace = true
itertools = { workspace = true, features = ["use_alloc"] }
nybbles = { workspace = true, features = ["rlp"] }
dashmap.workspace = true
# reth
revm-database.workspace = true

View File

@@ -1,14 +1,17 @@
//! Tracking of keys having been added and removed from the tries.
use std::sync::Arc;
use crate::HashedPostState;
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::B256;
use alloy_trie::proof::AddedRemovedKeys;
use dashmap::{mapref::one::Ref, DashMap};
/// Tracks added and removed keys across account and storage tries.
#[derive(Debug, Clone)]
pub struct MultiAddedRemovedKeys {
account: AddedRemovedKeys,
storages: B256Map<AddedRemovedKeys>,
storages: Arc<DashMap<B256, AddedRemovedKeys, alloy_primitives::map::DefaultHashBuilder>>,
}
/// Returns [`AddedRemovedKeys`] with default parameters. This is necessary while we are not yet
@@ -50,7 +53,7 @@ impl MultiAddedRemovedKeys {
continue
}
let storage_removed_keys =
let mut storage_removed_keys =
self.storages.entry(*hashed_address).or_insert_with(default_added_removed_keys);
for (key, val) in &storage.storage {
@@ -68,7 +71,7 @@ impl MultiAddedRemovedKeys {
}
/// Returns a [`AddedRemovedKeys`] for the storage trie of a particular account, if any.
pub fn get_storage(&self, hashed_address: &B256) -> Option<&AddedRemovedKeys> {
pub fn get_storage(&self, hashed_address: &B256) -> Option<Ref<'_, B256, AddedRemovedKeys>> {
self.storages.get(hashed_address)
}

View File

@@ -215,7 +215,9 @@ impl HashedPostState {
let mut storage_not_in_targets = HashedStorage::default();
storage.storage.retain(|&slot, value| {
if storage_in_targets.contains(&slot) &&
!storage_added_removed_keys.is_some_and(|k| k.is_removed(&slot))
!storage_added_removed_keys
.as_ref()
.is_some_and(|k| k.is_removed(&slot))
{
return true
}

View File

@@ -30,7 +30,7 @@ pub struct ParallelProof {
/// Flag indicating whether to include branch node masks in the proof.
collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
/// Handle to the proof worker pools.
proof_worker_handle: ProofWorkerHandle,
/// Whether to use V2 storage proofs.
@@ -72,7 +72,7 @@ impl ParallelProof {
/// extra proofs needed to add and remove leaf nodes from the tries.
pub fn with_multi_added_removed_keys(
mut self,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
) -> Self {
self.multi_added_removed_keys = multi_added_removed_keys;
self

View File

@@ -447,8 +447,9 @@ where
// Get or create added/removed keys context
let multi_added_removed_keys =
multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);
multi_added_removed_keys.unwrap_or_else(MultiAddedRemovedKeys::new);
let added_removed_keys =
multi_added_removed_keys.get_storage(&hashed_address).map(|k| k.as_ref().clone());
let span = debug_span!(
target: "trie::proof_task",
@@ -1556,7 +1557,7 @@ fn dispatch_storage_proofs(
targets: &MultiProofTargets,
storage_prefix_sets: &mut B256Map<PrefixSet>,
with_branch_node_masks: bool,
multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<&MultiAddedRemovedKeys>,
use_v2_proofs: bool,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
let mut storage_proof_receivers =
@@ -1613,7 +1614,7 @@ pub enum StorageProofInput {
/// Whether or not to collect branch node masks
with_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
},
/// V2 storage proof variant
V2 {
@@ -1632,7 +1633,7 @@ impl StorageProofInput {
prefix_set: PrefixSet,
target_slots: B256Set,
with_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
) -> Self {
Self::Legacy {
hashed_address,
@@ -1668,7 +1669,7 @@ pub struct AccountMultiproofInput {
/// Whether or not to collect branch node masks.
pub collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
pub multi_added_removed_keys: Option<MultiAddedRemovedKeys>,
/// Context for sending the proof result.
pub proof_result_sender: ProofResultContext,
/// Whether to use V2 storage proofs.
@@ -1684,7 +1685,7 @@ struct AccountMultiproofParams<'a> {
/// Whether or not to collect branch node masks.
collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
multi_added_removed_keys: Option<&'a MultiAddedRemovedKeys>,
/// Receivers for storage proofs being computed in parallel.
storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
/// Cached storage roots. This will be used to read storage roots for missed leaves, as well as

View File

@@ -325,7 +325,11 @@ fn run_case(
// Commit the post state/state diff to the database
provider
.write_state(&ExecutionOutcome::single(block.number, output), OriginalValuesKnown::Yes)
.write_state(
&ExecutionOutcome::single(block.number, output),
OriginalValuesKnown::Yes,
true,
)
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
provider