Compare commits

..

42 Commits

Author SHA1 Message Date
yongkangc
a8be40c029 perf(trie): optimize extend_sorted_vec with mem::take and fast paths
- Use mem::take to move ownership, avoiding clones of target elements
- Add fast path for non-overlapping ranges (just append)
- Use extend_from_slice for empty target case
- Reuse key from target on equal keys, only clone value
2026-01-15 19:32:47 +00:00
yongkangc
9c07dca43b perf(trie): fix extend_sorted_vec O(n log n) → O(n+m) merge
Replace the previous algorithm that appended elements then re-sorted
with a proper two-pointer merge that maintains O(n+m) complexity.

The old implementation would append new elements and call sort_by(),
resulting in O(n log n) complexity per merge. The new implementation
uses a classic merge algorithm that processes both sorted inputs
in a single pass.
2026-01-15 19:32:47 +00:00
joshieDo
b9e15dbd30 feat: add rocksdb to save_blocks (#21003)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-01-15 19:32:47 +00:00
yongkangc
8c07ee2be4 perf(trie): implement 3-way adaptive merge strategy for merge_batch
- Add prefer_sorted_merge() helper with thresholds: KWAY_MIN_SOURCES=30, PAIRWISE_MIN_AVG_ITEMS=2000
- k >= 30 sources: use k-way merge (avoids O(k) copying amplification)
- k < 30, avg >= 2000 items/source: use pairwise extend_ref (2-3x faster, better cache locality)
- Otherwise: use HashMap merge then sort (lower overhead for small data)

Benchmarks show extend_ref beats kway_merge by 1.3-3.2x for k < 30 sources.
2026-01-15 19:02:15 +00:00
Dan Cline
b1f107b171 feat(reth-bench): add generate-big-block command (#21082) 2026-01-15 15:30:04 +00:00
YK
7d0e7e72de perf(trie): add k-way merge batch optimization for merge_overlay_trie_input (#21080) 2026-01-15 15:22:15 +00:00
joshieDo
f012b3391e feat: parallelize save_blocks (#20993)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-15 14:58:06 +00:00
joshieDo
d225fc1d7f feat: add get/set db settings for rocksdb (#21095) 2026-01-15 14:48:05 +00:00
Dan Cline
d469b7f1d0 feat(rpc): add flag to skip invalid transactions in testing_buildBlockV1 (#21094)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-15 12:05:30 +00:00
YK
9bcd3712c8 test(storage): add parametrized MDBX/RocksDB history lookup equivalence tests (#20871) 2026-01-15 11:16:40 +00:00
Emma Jamieson-Hoare
b25f32a977 chore(release): set version v1.10.0 (#21091)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 10:50:35 +00:00
Emma Jamieson-Hoare
905de96944 chore: release 1.9.4 (#21048)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 09:41:54 +00:00
Sergei Shulepov
27fbd9a7de fix(db): change commit return type from Result<bool> to Result<()> (#21077)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 23:56:27 +00:00
DaniPopes
26a99ac5a3 perf: small improvement to extend_sorted_vec (#21032) 2026-01-14 23:46:58 +00:00
James Prestwich
1265a89c21 refactor: make use of dbi consistent across mdbx interface (#21079) 2026-01-14 23:42:42 +00:00
Matthias Seitz
b9ff5941eb feat(primitives): add SealedBlock::decode_sealed for efficient RLP decoding (#21030) 2026-01-14 22:49:55 +00:00
Sergei Shulepov
a75a0a5db7 feat(cli): support file:// URLs in reth download (#21026)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 22:30:42 +00:00
Matthias Seitz
0a4bac77d0 feat(primitives): add From<Sealed<B>> for SealedBlock<B> (#21078) 2026-01-14 22:19:09 +00:00
Kamil Szczygieł
1fbd5a95f8 feat: Support for sending logs through OTLP (#21039)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 21:29:00 +00:00
Arsenii Kulikov
1bc07fad8e perf: use binary search in ForwardInMemoryCursor (#21049) 2026-01-14 19:31:11 +00:00
Arsenii Kulikov
8cb506c4d3 perf: don't clone entire keys set (#21042) 2026-01-14 19:26:23 +00:00
ethfanWilliam
15f16a5a2e fix: propagate keccak-cache-global feature to reth-optimism-cli (#21051)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 19:22:22 +00:00
Brian Picciano
5cf1d2a0b0 fix(trie): Update branch masks when revealing blinded nodes (#20937) 2026-01-14 19:12:15 +00:00
Matthias Seitz
59fb25d892 feat(bench-compare): add --skip-wait-syncing flag (#21035) 2026-01-14 16:24:19 +01:00
Alexey Shekhirin
665a0a8553 feat(cli): parse URL path and display ETA in reth download (#21014) 2026-01-14 10:01:01 +00:00
DaniPopes
54735ce0f4 perf: use fixed-map for StaticFileSegment maps (#21001)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-14 00:52:54 +00:00
joshieDo
a73e73adef feat(storage): split static file commit into sync_all and finalize (#20984) 2026-01-13 16:27:55 +00:00
github-actions[bot]
4f3bd3eac1 chore(deps): weekly cargo update (#20924)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-13 14:42:54 +00:00
YK
ae41823be6 fix: propagate edge feature to reth-node-core for version output (#20998) 2026-01-13 14:35:24 +00:00
Matthias Seitz
1fa71f893c test: add testing_buildBlockV1 RPC method and Osaka test (#20990) 2026-01-13 15:18:52 +01:00
ANtutov
c6b17848dd fix(trie): remove redundant storage trie root calculation in witness (#20965) 2026-01-13 13:12:39 +00:00
Alexey Shekhirin
a5dd7d0106 feat(node): --minimal flag (#20960) 2026-01-13 12:54:26 +00:00
Emilia Hane
61354e6c21 chore(test): use reth_optimism_chainspec::BASE_SEPOLIA in tests (#20988) 2026-01-13 12:07:47 +00:00
DaniPopes
2444533a04 perf: use in-memory length for static files metrics (#20987) 2026-01-13 11:37:00 +00:00
kurahin
8fa01eb62e fix: use global default for rpc_proof_permits CLI flag (#20967) 2026-01-12 23:03:51 +00:00
DaniPopes
c5e00e4aeb perf(db): throttle metrics reporting (#20974)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 22:44:24 +00:00
joshieDo
98a35cc870 fix: propagate FEATURES to sub-makes (#20975) 2026-01-12 20:03:34 +00:00
YK
46d670eca5 fix(stages): use static files for unwind in SenderRecovery stage (#20972)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-01-12 19:22:49 +00:00
DaniPopes
25906b7b3e fix(libmdbx): use correct size for freelist u32 values (#20970) 2026-01-12 18:52:03 +00:00
Matthias Seitz
1b3d815cb8 fix(rpc): validate eth_feeHistory newest_block against chain head (#20969)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 18:48:46 +00:00
DaniPopes
23f3f8e820 feat: add tracing-tracy (#20958)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 18:37:37 +00:00
DaniPopes
2663942b50 chore(deps): bump metrics (#20968) 2026-01-12 18:13:38 +00:00
240 changed files with 9234 additions and 1950 deletions

View File

@@ -12,7 +12,7 @@ workflows:
# Check that `A` activates the features of `B`.
"propagate-feature",
# These are the features to check:
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,js-tracer,portable,keccak-cache-global",
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,tracy,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,otlp-logs,js-tracer,portable,keccak-cache-global",
# Do not try to add a new section to `[features]` of `A` only because `B` exposes that feature. There are edge-cases where this is still needed, but we can add them manually.
"--left-side-feature-missing=ignore",
# Ignore the case that `A` it outside of the workspace. Otherwise it will report errors in external dependencies that we have no influence on.

931
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.9.3"
version = "1.10.0"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -485,7 +485,7 @@ revm-inspectors = "0.33.2"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.4.1"
alloy-dyn-abi = "1.4.3"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.1.0", default-features = false }
alloy-evm = { version = "0.25.1", default-features = false }
@@ -497,33 +497,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.2.1", default-features = false }
alloy-contract = { version = "1.2.1", default-features = false }
alloy-eips = { version = "1.2.1", default-features = false }
alloy-genesis = { version = "1.2.1", default-features = false }
alloy-json-rpc = { version = "1.2.1", default-features = false }
alloy-network = { version = "1.2.1", default-features = false }
alloy-network-primitives = { version = "1.2.1", default-features = false }
alloy-provider = { version = "1.2.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.2.1", default-features = false }
alloy-rpc-client = { version = "1.2.1", default-features = false }
alloy-rpc-types = { version = "1.2.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.2.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.2.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.2.1", default-features = false }
alloy-rpc-types-debug = { version = "1.2.1", default-features = false }
alloy-rpc-types-engine = { version = "1.2.1", default-features = false }
alloy-rpc-types-eth = { version = "1.2.1", default-features = false }
alloy-rpc-types-mev = { version = "1.2.1", default-features = false }
alloy-rpc-types-trace = { version = "1.2.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.2.1", default-features = false }
alloy-serde = { version = "1.2.1", default-features = false }
alloy-signer = { version = "1.2.1", default-features = false }
alloy-signer-local = { version = "1.2.1", default-features = false }
alloy-transport = { version = "1.2.1" }
alloy-transport-http = { version = "1.2.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.2.1", default-features = false }
alloy-transport-ws = { version = "1.2.1", default-features = false }
alloy-consensus = { version = "1.4.3", default-features = false }
alloy-contract = { version = "1.4.3", default-features = false }
alloy-eips = { version = "1.4.3", default-features = false }
alloy-genesis = { version = "1.4.3", default-features = false }
alloy-json-rpc = { version = "1.4.3", default-features = false }
alloy-network = { version = "1.4.3", default-features = false }
alloy-network-primitives = { version = "1.4.3", default-features = false }
alloy-provider = { version = "1.4.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.3", default-features = false }
alloy-rpc-client = { version = "1.4.3", default-features = false }
alloy-rpc-types = { version = "1.4.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.3", default-features = false }
alloy-rpc-types-debug = { version = "1.4.3", default-features = false }
alloy-rpc-types-engine = { version = "1.4.3", default-features = false }
alloy-rpc-types-eth = { version = "1.4.3", default-features = false }
alloy-rpc-types-mev = { version = "1.4.3", default-features = false }
alloy-rpc-types-trace = { version = "1.4.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.3", default-features = false }
alloy-serde = { version = "1.4.3", default-features = false }
alloy-signer = { version = "1.4.3", default-features = false }
alloy-signer-local = { version = "1.4.3", default-features = false }
alloy-transport = { version = "1.4.3" }
alloy-transport-http = { version = "1.4.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.3", default-features = false }
alloy-transport-ws = { version = "1.4.3", default-features = false }
# op
alloy-op-evm = { version = "0.25.0", default-features = false }
@@ -555,6 +555,7 @@ dirs-next = "2.0.0"
dyn-clone = "1.0.17"
eyre = "0.6"
fdlimit = "0.3.0"
fixed-map = { version = "0.9", default-features = false }
humantime = "2.1"
humantime-serde = "1.1"
itertools = { version = "0.14", default-features = false }
@@ -596,9 +597,9 @@ chrono = "0.4.41"
# metrics
metrics = "0.24.0"
metrics-derive = "0.1"
metrics-exporter-prometheus = { version = "0.16.0", default-features = false }
metrics-exporter-prometheus = { version = "0.18.0", default-features = false }
metrics-process = "2.1.0"
metrics-util = { default-features = false, version = "0.19.0" }
metrics-util = { default-features = false, version = "0.20.0" }
# proc-macros
proc-macro2 = "1.0"
@@ -664,6 +665,7 @@ opentelemetry_sdk = "0.31"
opentelemetry = "0.31"
opentelemetry-otlp = "0.31"
opentelemetry-semantic-conventions = "0.31"
opentelemetry-appender-tracing = "0.31"
tracing-opentelemetry = "0.32"
# misc-testing
@@ -734,6 +736,7 @@ tracing-journald = "0.3"
tracing-logfmt = "0.3.3"
tracing-samply = "0.1"
tracing-subscriber = { version = "0.3", default-features = false }
tracing-tracy = "0.11"
triehash = "0.8"
typenum = "1.15.0"
vergen = "9.0.4"

View File

@@ -283,11 +283,11 @@ docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker im
# Create a cross-arch Docker image with the given tags and push it
define docker_build_push
$(MAKE) build-x86_64-unknown-linux-gnu
$(MAKE) FEATURES="$(FEATURES)" build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/amd64/reth
$(MAKE) build-aarch64-unknown-linux-gnu
$(MAKE) FEATURES="$(FEATURES)" build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/arm64/reth
@@ -357,11 +357,11 @@ op-docker-build-push-nightly-profiling: ## Build and push cross-arch Docker imag
# Create a cross-arch Docker image with the given tags and push it
define op_docker_build_push
$(MAKE) op-build-x86_64-unknown-linux-gnu
$(MAKE) FEATURES="$(FEATURES)" op-build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/amd64/op-reth
$(MAKE) op-build-aarch64-unknown-linux-gnu
$(MAKE) FEATURES="$(FEATURES)" op-build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/arm64/op-reth

View File

@@ -71,7 +71,11 @@ jemalloc = [
"reth-node-core/jemalloc",
]
jemalloc-prof = ["reth-cli-util/jemalloc-prof"]
tracy-allocator = ["reth-cli-util/tracy-allocator"]
tracy-allocator = ["reth-cli-util/tracy-allocator", "tracy"]
tracy = [
"reth-node-core/tracy",
"reth-tracing/tracy",
]
min-error-logs = [
"tracing/release_max_level_error",

View File

@@ -147,6 +147,11 @@ pub(crate) struct Args {
#[arg(long)]
pub no_clear_cache: bool,
/// Skip waiting for the node to sync before starting benchmarks.
/// When enabled, assumes the node is already synced and skips the initial tip check.
#[arg(long)]
pub skip_wait_syncing: bool,
#[command(flatten)]
pub logs: LogArgs,
@@ -578,7 +583,11 @@ async fn run_warmup_phase(
node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
// Wait for node to be ready and get its current tip
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
let current_tip = if args.skip_wait_syncing {
node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
} else {
node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
};
info!("Warmup node is ready at tip: {}", current_tip);
// Clear filesystem caches before warmup run only (unless disabled)
@@ -632,7 +641,11 @@ async fn run_benchmark_workflow(
let (mut node_process, _) = node_manager
.start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
.await?;
let starting_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
let starting_tip = if args.skip_wait_syncing {
node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
} else {
node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
};
info!("Node starting tip: {}", starting_tip);
node_manager.stop_node(&mut node_process).await?;
@@ -699,7 +712,11 @@ async fn run_benchmark_workflow(
node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
// Wait for node to be ready and get its current tip (wherever it is)
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
let current_tip = if args.skip_wait_syncing {
node_manager.wait_for_rpc_and_get_tip(&mut node_process).await?
} else {
node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?
};
info!("Node is ready at tip: {}", current_tip);
// Calculate benchmark range

View File

@@ -458,6 +458,76 @@ impl NodeManager {
.wrap_err("Timed out waiting for node to be ready and synced")?
}
/// Wait for the node RPC to be ready and return its current tip, without waiting for sync.
///
/// This is faster than `wait_for_node_ready_and_get_tip` but may return a tip while
/// the node is still syncing.
pub(crate) async fn wait_for_rpc_and_get_tip(
&self,
child: &mut tokio::process::Child,
) -> Result<u64> {
info!("Waiting for node RPC to be ready (skipping sync wait)...");
let max_wait = Duration::from_secs(60);
let check_interval = Duration::from_secs(2);
let rpc_url = "http://localhost:8545";
let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
let provider = ProviderBuilder::new().connect_http(url);
let start_time = tokio::time::Instant::now();
let mut iteration = 0;
timeout(max_wait, async {
loop {
iteration += 1;
debug!(
"RPC readiness check iteration {} (elapsed: {:?})",
iteration,
start_time.elapsed()
);
if let Some(status) = child.try_wait()? {
return Err(eyre!("Node process exited unexpectedly with {status}"));
}
match provider.get_block_number().await {
Ok(tip) => {
debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
let ws_connect = WsConnect::new(&ws_url);
match RpcClient::connect_pubsub(ws_connect).await {
Ok(_) => {
info!(
"Node RPC is ready at block: {} (took {:?}, {} iterations)",
tip,
start_time.elapsed(),
iteration
);
return Ok(tip);
}
Err(e) => {
debug!(
"HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
iteration, e
);
}
}
}
Err(e) => {
debug!("RPC not ready yet (iteration {}): {:?}", iteration, e);
}
}
sleep(check_interval).await;
}
})
.await
.wrap_err("Timed out waiting for node RPC to be ready")?
}
/// Stop the reth node gracefully
pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
let pid = child.id().ok_or_eyre("Child process ID should be available")?;

View File

@@ -17,21 +17,26 @@ workspace = true
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-engine-primitives.workspace = true
reth-ethereum-primitives.workspace = true
reth-fs-util.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-primitives-traits.workspace = true
reth-rpc-api.workspace = true
reth-tracing.workspace = true
reth-chainspec.workspace = true
# alloy
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-consensus.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-engine.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["kzg"] }
alloy-transport-http.workspace = true
alloy-transport-ipc.workspace = true
alloy-transport-ws.workspace = true
@@ -85,7 +90,11 @@ jemalloc = [
"reth-node-core/jemalloc",
]
jemalloc-prof = ["reth-cli-util/jemalloc-prof"]
tracy-allocator = ["reth-cli-util/tracy-allocator"]
tracy-allocator = ["reth-cli-util/tracy-allocator", "tracy"]
tracy = [
"reth-node-core/tracy",
"reth-tracing/tracy",
]
min-error-logs = [
"tracing/release_max_level_error",

View File

@@ -0,0 +1,160 @@
//! Benchmarks empty block processing by ramping the block gas limit.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState, JwtSecret};
use clap::Parser;
use reqwest::Url;
use reth_chainspec::ChainSpec;
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use std::{path::PathBuf, time::Instant};
use tracing::info;
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
/// Number of blocks to generate.
#[arg(long, value_name = "BLOCKS")]
blocks: u64,
/// The Engine API RPC URL.
#[arg(long = "engine-rpc-url", value_name = "ENGINE_RPC_URL")]
engine_rpc_url: String,
/// Path to the JWT secret for Engine API authentication.
#[arg(long = "jwt-secret", value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
}
impl Command {
/// Execute `benchmark gas-limit-ramp` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
if self.blocks == 0 {
return Err(eyre::eyre!("--blocks must be greater than 0"));
}
// Ensure output directory exists
if self.output.is_file() {
return Err(eyre::eyre!("Output path must be a directory"));
}
if !self.output.exists() {
std::fs::create_dir_all(&self.output)?;
info!("Created output directory: {:?}", self.output);
}
// Set up authenticated provider (used for both Engine API and eth_ methods)
let jwt = std::fs::read_to_string(&self.jwt_secret)?;
let jwt = JwtSecret::from_hex(jwt)?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_with(auth_transport).await?;
let provider = RootProvider::<AnyNetwork>::new(client);
// Get chain spec - required for fork detection
let chain_id = provider.get_chain_id().await?;
let chain_spec = ChainSpec::from_chain_id(chain_id)
.ok_or_else(|| eyre::eyre!("Unsupported chain id: {chain_id}"))?;
// Fetch the current head block as parent
let parent_block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let (mut parent_header, mut parent_hash) = rpc_block_to_header(parent_block);
let canonical_parent = parent_header.number;
let start_block = canonical_parent + 1;
let end_block = start_block + self.blocks - 1;
info!(canonical_parent, start_block, end_block, "Starting gas limit ramp benchmark");
let mut next_block_number = start_block;
let total_benchmark_duration = Instant::now();
while next_block_number <= end_block {
let timestamp = parent_header.timestamp.saturating_add(1);
let request = prepare_payload_request(&chain_spec, timestamp, parent_hash);
let new_payload_version = request.new_payload_version;
let (payload, sidecar) = build_payload(&provider, request).await?;
let mut block =
payload.clone().try_into_block_with_sidecar::<TransactionSigned>(&sidecar)?;
let max_increase = max_gas_limit_increase(parent_header.gas_limit);
let gas_limit =
parent_header.gas_limit.saturating_add(max_increase).min(MAXIMUM_GAS_LIMIT_BLOCK);
block.header.gas_limit = gas_limit;
let block_hash = block.header.hash_slow();
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
block.header.withdrawals_root,
Some(new_payload_version),
)?;
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(block_number = block.header.number, path = %payload_path.display(), "Saved payload");
call_new_payload(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
parent_header = block.header;
parent_hash = block_hash;
next_block_number += 1;
}
let final_gas_limit = parent_header.gas_limit;
info!(
total_duration=?total_benchmark_duration.elapsed(),
blocks_processed = self.blocks,
final_gas_limit,
"Benchmark complete"
);
Ok(())
}
}
const fn max_gas_limit_increase(parent_gas_limit: u64) -> u64 {
(parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR).saturating_sub(1)
}

View File

@@ -0,0 +1,617 @@
//! Command for generating large blocks by packing transactions from real blocks.
//!
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_buildBlockV1` RPC endpoint.
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
PayloadAttributes,
};
use alloy_transport::layers::RetryBackoffLayer;
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_rpc_api::TestingBuildBlockRequestV1;
use std::future::Future;
use tokio::sync::mpsc;
use tracing::{info, warn};
/// A single transaction with its gas used and raw encoded bytes.
#[derive(Debug, Clone)]
pub struct RawTransaction {
/// The actual gas used by the transaction (from receipt).
pub gas_used: u64,
/// The transaction type (e.g., 3 for EIP-4844 blob txs).
pub tx_type: u8,
/// The raw RLP-encoded transaction bytes.
pub raw: Bytes,
}
/// Abstraction over sources of transactions for big block generation.
///
/// Implementors provide transactions from different sources (RPC, database, files, etc.)
pub trait TransactionSource {
/// Fetch transactions from a specific block number.
///
/// Returns `Ok(None)` if the block doesn't exist.
/// Returns `Ok(Some((transactions, gas_used)))` with the block's transactions and total gas.
fn fetch_block_transactions(
&self,
block_number: u64,
) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
}
/// RPC-based transaction source that fetches from a remote node.
#[derive(Debug)]
pub struct RpcTransactionSource {
provider: RootProvider<AnyNetwork>,
}
impl RpcTransactionSource {
/// Create a new RPC transaction source.
pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
Self { provider }
}
/// Create from an RPC URL with retry backoff.
pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let provider = RootProvider::<AnyNetwork>::new(client);
Ok(Self { provider })
}
}
impl TransactionSource for RpcTransactionSource {
async fn fetch_block_transactions(
&self,
block_number: u64,
) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
// Fetch block and receipts in parallel
let (block, receipts) = tokio::try_join!(
self.provider.get_block_by_number(block_number.into()).full(),
self.provider.get_block_receipts(block_number.into())
)?;
let Some(block) = block else {
return Ok(None);
};
let Some(receipts) = receipts else {
return Err(eyre::eyre!("Receipts not found for block {}", block_number));
};
let block_gas_used = block.header.gas_used;
// Convert cumulative gas from receipts to per-tx gas_used
let mut prev_cumulative = 0u64;
let transactions: Vec<RawTransaction> = block
.transactions
.txns()
.zip(receipts.iter())
.map(|(tx, receipt)| {
let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
let gas_used = cumulative - prev_cumulative;
prev_cumulative = cumulative;
let with_encoded = tx.inner.inner.clone().into_encoded();
RawTransaction {
gas_used,
tx_type: tx.inner.ty(),
raw: with_encoded.encoded_bytes().clone(),
}
})
.collect();
Ok(Some((transactions, block_gas_used)))
}
}
/// Collects transactions from a source up to a target gas usage.
#[derive(Debug)]
pub struct TransactionCollector<S> {
source: S,
target_gas: u64,
}
impl<S: TransactionSource> TransactionCollector<S> {
/// Create a new transaction collector.
pub const fn new(source: S, target_gas: u64) -> Self {
Self { source, target_gas }
}
/// Collect transactions starting from the given block number.
///
/// Skips blob transactions (type 3) and collects until target gas is reached.
/// Returns the collected raw transaction bytes, total gas used, and the next block number.
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
let mut transactions: Vec<Bytes> = Vec::new();
let mut total_gas: u64 = 0;
let mut current_block = start_block;
while total_gas < self.target_gas {
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
else {
warn!(block = current_block, "Block not found, stopping");
break;
};
for tx in block_txs {
// Skip blob transactions (EIP-4844, type 3)
if tx.tx_type == 3 {
continue;
}
if total_gas + tx.gas_used <= self.target_gas {
transactions.push(tx.raw);
total_gas += tx.gas_used;
}
if total_gas >= self.target_gas {
break;
}
}
current_block += 1;
// Stop early if remaining gas is under 1M (close enough to target)
let remaining_gas = self.target_gas.saturating_sub(total_gas);
if remaining_gas < 1_000_000 {
break;
}
}
info!(
total_txs = transactions.len(),
total_gas,
next_block = current_block,
"Finished collecting transactions"
);
Ok((transactions, total_gas, current_block))
}
}
/// `reth bench generate-big-block` command
///
/// Generates a large block by fetching transactions from existing blocks and packing them
/// into a single block using the `testing_buildBlockV1` RPC endpoint.
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC URL to use for fetching blocks (can be an external archive node).
#[arg(long, value_name = "RPC_URL")]
rpc_url: String,
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// The RPC URL for `testing_buildBlockV1` calls (same node as engine, regular RPC port).
#[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
testing_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
#[arg(long, default_value = "false")]
execute: bool,
/// Number of payloads to generate. Each payload uses the previous as parent.
/// When count == 1, the payload is only generated and saved, not executed.
/// When count > 1, each payload is executed before building the next.
#[arg(long, default_value = "1")]
count: u64,
/// Number of transaction batches to prefetch in background when count > 1.
/// Higher values reduce latency but use more memory.
#[arg(long, default_value = "4")]
prefetch_buffer: usize,
/// Output directory for generated payloads. Each payload is saved as `payload_block_N.json`.
#[arg(long, value_name = "OUTPUT_DIR")]
output_dir: std::path::PathBuf,
}
/// A built payload ready for execution.
struct BuiltPayload {
block_number: u64,
envelope: ExecutionPayloadEnvelopeV4,
block_hash: B256,
timestamp: u64,
}
impl Command {
/// Execute the `generate-big-block` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Set up testing RPC provider (for testing_buildBlockV1)
info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
let testing_client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(self.testing_rpc_url.parse()?);
let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
// Get the parent block (latest canonical block)
info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
let parent_block = auth_provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let parent_hash = parent_block.header.hash;
let parent_number = parent_block.header.number;
let parent_timestamp = parent_block.header.timestamp;
info!(
parent_hash = %parent_hash,
parent_number = parent_number,
"Using initial parent block"
);
// Create output directory
std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {
self.execute_pipelined(
&auth_provider,
&testing_provider,
start_block,
parent_hash,
parent_timestamp,
)
.await?;
} else {
// Single payload - collect transactions and build
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
let collector = TransactionCollector::new(tx_source, self.target_gas);
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected"));
}
self.execute_sequential(
&auth_provider,
&testing_provider,
transactions,
parent_hash,
parent_timestamp,
)
.await?;
}
info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
Ok(())
}
/// Sequential execution path for single payload or no-execute mode.
async fn execute_sequential(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
transactions: Vec<Bytes>,
mut parent_hash: B256,
mut parent_timestamp: u64,
) -> eyre::Result<()> {
for i in 0..self.count {
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
"Building payload via testing_buildBlockV1"
);
let built = self
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
.await?;
self.save_payload(&built)?;
if self.execute || self.count > 1 {
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
}
parent_hash = built.block_hash;
parent_timestamp = built.timestamp;
}
Ok(())
}
/// Pipelined execution - fetches transactions and builds payloads in background.
async fn execute_pipelined(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
start_block: u64,
initial_parent_hash: B256,
initial_parent_timestamp: u64,
) -> eyre::Result<()> {
// Create channel for transaction batches (one batch per payload)
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
// Spawn background task to continuously fetch transaction batches
let rpc_url = self.rpc_url.clone();
let target_gas = self.target_gas;
let count = self.count;
let fetcher_handle = tokio::spawn(async move {
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
Ok(source) => source,
Err(e) => {
warn!(error = %e, "Failed to create transaction source");
return;
}
};
let collector = TransactionCollector::new(tx_source, target_gas);
let mut current_block = start_block;
for payload_idx in 0..count {
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});
let mut parent_hash = initial_parent_hash;
let mut parent_timestamp = initial_parent_timestamp;
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
for i in 0..self.count {
let is_last = i == self.count - 1;
// Get current payload (either from pending build or build now)
let current_payload = if let Some(handle) = pending_build.take() {
handle.await??
} else {
// First payload - wait for transactions and build synchronously
let transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
tx_count = transactions.len(),
"Building payload via testing_buildBlockV1"
);
self.build_payload(
testing_provider,
&transactions,
i,
parent_hash,
parent_timestamp,
)
.await?
};
self.save_payload(&current_payload)?;
let current_block_hash = current_payload.block_hash;
let current_timestamp = current_payload.timestamp;
// Execute current payload first
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Start building next payload in background (if not last) - AFTER execution
if !is_last {
// Get transactions for next payload (should already be fetched or fetching)
let next_transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if next_transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
}
let testing_provider = testing_provider.clone();
let next_index = i + 1;
let total = self.count;
pending_build = Some(tokio::spawn(async move {
info!(
payload = next_index + 1,
total = total,
parent_hash = %current_block_hash,
parent_timestamp = current_timestamp,
tx_count = next_transactions.len(),
"Building payload via testing_buildBlockV1"
);
Self::build_payload_static(
&testing_provider,
&next_transactions,
next_index,
current_block_hash,
current_timestamp,
)
.await
}));
}
parent_hash = current_block_hash;
parent_timestamp = current_timestamp;
}
// Clean up the fetcher task
drop(tx_receiver);
let _ = fetcher_handle.await;
Ok(())
}
/// Build a single payload via `testing_buildBlockV1`.
async fn build_payload(
&self,
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
Self::build_payload_static(
testing_provider,
transactions,
index,
parent_hash,
parent_timestamp,
)
.await
}
/// Static version for use in spawned tasks.
async fn build_payload_static(
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
let request = TestingBuildBlockRequestV1 {
parent_block_hash: parent_hash,
payload_attributes: PayloadAttributes {
timestamp: parent_timestamp + 12,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
},
transactions: transactions.to_vec(),
extra_data: None,
};
let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
info!(
payload = index + 1,
tx_count = transactions.len(),
total_tx_bytes = total_tx_bytes,
parent_hash = %parent_hash,
"Sending to testing_buildBlockV1"
);
let envelope: ExecutionPayloadEnvelopeV5 =
testing_provider.client().request("testing_buildBlockV1", [request]).await?;
let v4_envelope = envelope.try_into_v4()?;
let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
let block_hash = inner.block_hash;
let block_number = inner.block_number;
let timestamp = inner.timestamp;
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
}
/// Save a payload to disk.
fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
let filename = format!("payload_block_{}.json", payload.block_number);
let filepath = self.output_dir.join(&filename);
let json = serde_json::to_string_pretty(&payload.envelope)?;
std::fs::write(&filepath, &json)
.wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
Ok(())
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload,
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
if !fcu_result.is_valid() {
return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
}
Ok(())
}
}

View File

@@ -0,0 +1,196 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, RootProvider};
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState,
PayloadAttributes, PayloadId, PraguePayloadFields,
};
use eyre::OptionExt;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_node_api::EngineApiMessageVersion;
use tracing::debug;
/// Prepared payload request data for triggering block building.
pub(crate) struct PayloadRequest {
/// The payload attributes for the new block.
pub(crate) attributes: PayloadAttributes,
/// The forkchoice state pointing to the parent block.
pub(crate) forkchoice_state: ForkchoiceState,
/// The engine API version for FCU calls.
pub(crate) fcu_version: EngineApiMessageVersion,
/// The getPayload version to use (1-5).
pub(crate) get_payload_version: u8,
/// The newPayload version to use.
pub(crate) new_payload_version: EngineApiMessageVersion,
}
/// Prepare payload attributes and forkchoice state for a new block.
pub(crate) fn prepare_payload_request(
chain_spec: &ChainSpec,
timestamp: u64,
parent_hash: B256,
) -> PayloadRequest {
let shanghai_active = chain_spec.is_shanghai_active_at_timestamp(timestamp);
let cancun_active = chain_spec.is_cancun_active_at_timestamp(timestamp);
let prague_active = chain_spec.is_prague_active_at_timestamp(timestamp);
let osaka_active = chain_spec.is_osaka_active_at_timestamp(timestamp);
// FCU version: V3 for Cancun+Prague+Osaka, V2 for Shanghai, V1 otherwise
let fcu_version = if cancun_active {
EngineApiMessageVersion::V3
} else if shanghai_active {
EngineApiMessageVersion::V2
} else {
EngineApiMessageVersion::V1
};
// getPayload version: 5 for Osaka, 4 for Prague, 3 for Cancun, 2 for Shanghai, 1 otherwise
// newPayload version: 4 for Prague+Osaka (no V5), 3 for Cancun, 2 for Shanghai, 1 otherwise
let (get_payload_version, new_payload_version) = if osaka_active {
(5, EngineApiMessageVersion::V4) // Osaka uses getPayloadV5 but newPayloadV4
} else if prague_active {
(4, EngineApiMessageVersion::V4)
} else if cancun_active {
(3, EngineApiMessageVersion::V3)
} else if shanghai_active {
(2, EngineApiMessageVersion::V2)
} else {
(1, EngineApiMessageVersion::V1)
};
PayloadRequest {
attributes: PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: shanghai_active.then(Vec::new),
parent_beacon_block_root: cancun_active.then_some(B256::ZERO),
},
forkchoice_state: ForkchoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
},
fcu_version,
get_payload_version,
new_payload_version,
}
}
/// Trigger payload building via FCU and retrieve the built payload.
///
/// This sends a forkchoiceUpdated with payload attributes to start building,
/// then calls getPayload to retrieve the result.
pub(crate) async fn build_payload(
provider: &RootProvider<AnyNetwork>,
request: PayloadRequest,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
let fcu_result = call_forkchoice_updated(
provider,
request.fcu_version,
request.forkchoice_state,
Some(request.attributes.clone()),
)
.await?;
let payload_id =
fcu_result.payload_id.ok_or_eyre("Payload builder did not return a payload id")?;
get_payload_with_sidecar(
provider,
request.get_payload_version,
payload_id,
request.attributes.parent_beacon_block_root,
)
.await
}
/// Convert an RPC block to a consensus header and block hash.
pub(crate) fn rpc_block_to_header(block: alloy_provider::network::AnyRpcBlock) -> (Header, B256) {
let block_hash = block.header.hash;
let header = block.header.inner.clone().into_header_with_defaults();
(header, block_hash)
}
/// Compute versioned hashes from KZG commitments.
fn versioned_hashes_from_commitments(
commitments: &[alloy_primitives::FixedBytes<48>],
) -> Vec<B256> {
commitments.iter().map(|c| kzg_to_versioned_hash(c.as_ref())).collect()
}
/// Fetch an execution payload using the appropriate engine API version.
pub(crate) async fn get_payload_with_sidecar(
provider: &RootProvider<AnyNetwork>,
version: u8,
payload_id: PayloadId,
parent_beacon_block_root: Option<B256>,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
debug!(get_payload_version = ?version, ?payload_id, "Sending getPayload");
match version {
1 => {
let payload = provider.get_payload_v1(payload_id).await?;
Ok((ExecutionPayload::V1(payload), ExecutionPayloadSidecar::none()))
}
2 => {
let envelope = provider.get_payload_v2(payload_id).await?;
let payload = match envelope.execution_payload {
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V1(p) => ExecutionPayload::V1(p),
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V2(p) => ExecutionPayload::V2(p),
};
Ok((payload, ExecutionPayloadSidecar::none()))
}
3 => {
let envelope = provider.get_payload_v3(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V3")?,
versioned_hashes,
};
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v3(cancun_fields),
))
}
4 => {
let envelope = provider.get_payload_v4(payload_id).await?;
let versioned_hashes = versioned_hashes_from_commitments(
&envelope.envelope_inner.blobs_bundle.commitments,
);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V4")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.envelope_inner.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
5 => {
// V5 (Osaka) - use raw request since alloy doesn't have get_payload_v5 yet
let envelope = provider.get_payload_v5(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V5")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
_ => panic!("This tool does not support getPayload versions past v5"),
}
}

View File

@@ -6,9 +6,16 @@ use reth_node_core::args::LogArgs;
use reth_tracing::FileWorkerGuard;
mod context;
mod gas_limit_ramp;
mod generate_big_block;
pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
mod new_payload_fcu;
mod new_payload_only;
mod output;
mod replay_payloads;
mod send_payload;
/// `reth bench` command
@@ -27,6 +34,9 @@ pub enum Subcommands {
/// Benchmark which calls `newPayload`, then `forkchoiceUpdated`.
NewPayloadFcu(new_payload_fcu::Command),
/// Benchmark which builds empty blocks with a ramped gas limit.
GasLimitRamp(gas_limit_ramp::Command),
/// Benchmark which only calls subsequent `newPayload` calls.
NewPayloadOnly(new_payload_only::Command),
@@ -41,6 +51,29 @@ pub enum Subcommands {
/// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000
/// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)`
SendPayload(send_payload::Command),
/// Generate a large block by packing transactions from existing blocks.
///
/// This command fetches transactions from real blocks and packs them into a single
/// block using the `testing_buildBlockV1` RPC endpoint.
///
/// Example:
///
/// `reth-bench generate-big-block --rpc-url http://localhost:8545 --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex --target-gas
/// 30000000`
GenerateBigBlock(generate_big_block::Command),
/// Replay pre-generated payloads from a directory.
///
/// This command reads payload files from a previous `generate-big-block` run and replays
/// them in sequence using `newPayload` followed by `forkchoiceUpdated`.
///
/// Example:
///
/// `reth-bench replay-payloads --payload-dir ./payloads --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex`
ReplayPayloads(replay_payloads::Command),
}
impl BenchmarkCommand {
@@ -51,8 +84,11 @@ impl BenchmarkCommand {
match self.command {
Subcommands::NewPayloadFcu(command) => command.execute(ctx).await,
Subcommands::GasLimitRamp(command) => command.execute(ctx).await,
Subcommands::NewPayloadOnly(command) => command.execute(ctx).await,
Subcommands::SendPayload(command) => command.execute(ctx).await,
Subcommands::GenerateBigBlock(command) => command.execute(ctx).await,
Subcommands::ReplayPayloads(command) => command.execute(ctx).await,
}
}

View File

@@ -13,8 +13,7 @@ use crate::{
bench::{
context::BenchContext,
output::{
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
GAS_OUTPUT_SUFFIX,
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
@@ -27,7 +26,6 @@ use alloy_rpc_client::RpcClient;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
@@ -123,6 +121,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -188,6 +187,7 @@ impl Command {
result
} {
let gas_used = block.header.gas_used;
let gas_limit = block.header.gas_limit;
let block_number = block.header.number;
let transaction_count = block.transactions.len() as u64;
@@ -211,6 +211,7 @@ impl Command {
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
@@ -240,28 +241,11 @@ impl Command {
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
// Write CSV output files
if let Some(ref path) = self.benchmark.output {
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
write_benchmark_results(path, &gas_output_results, combined_results)?;
}
let gas_output = TotalGasOutput::new(gas_output_results)?;

View File

@@ -49,6 +49,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -96,11 +97,7 @@ impl Command {
let transaction_count = block.transactions.len() as u64;
let gas_used = block.header.gas_used;
debug!(
target: "reth-bench",
number=?block.header.number,
"Sending payload to engine",
);
debug!(number=?block.header.number, "Sending payload to engine");
let (version, params) = block_to_new_payload(block, is_optimism)?;

View File

@@ -1,10 +1,13 @@
//! Contains various benchmark output formats, either for logging or for
//! serialization to / from files.
use alloy_primitives::B256;
use csv::Writer;
use eyre::OptionExt;
use reth_primitives_traits::constants::GIGAGAS;
use serde::{ser::SerializeStruct, Serialize};
use std::time::Duration;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use std::{path::Path, time::Duration};
use tracing::info;
/// This is the suffix for gas output csv files.
pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
@@ -15,6 +18,17 @@ pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv";
/// This is the suffix for new payload output csv files.
pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
/// Serialized format for gas ramp payloads on disk.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GasRampPayloadFile {
/// Engine API version (1-5).
pub(crate) version: u8,
/// The block hash for FCU.
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
/// used and the `newPayload` latency.
#[derive(Debug)]
@@ -67,6 +81,8 @@ impl Serialize for NewPayloadResult {
pub(crate) struct CombinedResult {
/// The block number of the block being processed.
pub(crate) block_number: u64,
/// The gas limit of the block.
pub(crate) gas_limit: u64,
/// The number of transactions in the block.
pub(crate) transaction_count: u64,
/// The `newPayload` result.
@@ -88,7 +104,7 @@ impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload {} processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
"Block {} processed at {:.4} Ggas/s, used {} total gas. Combined: {:.4} Ggas/s. fcu: {:?}, newPayload: {:?}",
self.block_number,
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
@@ -110,10 +126,11 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 6)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_limit", &self.gas_limit)?;
state.serialize_field("transaction_count", &self.transaction_count)?;
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
@@ -167,6 +184,36 @@ impl TotalGasOutput {
}
}
/// Write benchmark results to CSV files.
///
/// Writes two files to the output directory:
/// - `combined_latency.csv`: Per-block latency results
/// - `total_gas.csv`: Per-block gas usage over time
pub(crate) fn write_benchmark_results(
output_dir: &Path,
gas_results: &[TotalGasRow],
combined_results: Vec<CombinedResult>,
) -> eyre::Result<()> {
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in gas_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", output_dir);
Ok(())
}
/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
///
/// This is essentially just for the csv writer, which would have headers

View File

@@ -0,0 +1,332 @@
//! Command for replaying pre-generated payloads from disk.
//!
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::output::GasRampPayloadFile,
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_node_api::EngineApiMessageVersion;
use std::path::PathBuf;
use tracing::{debug, info};
/// `reth bench replay-payloads` command
///
/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
/// `forkchoiceUpdated` for each payload in sequence.
#[derive(Debug, Parser)]
pub struct Command {
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Directory containing payload files (`payload_block_N.json`).
#[arg(long, value_name = "PAYLOAD_DIR")]
payload_dir: PathBuf,
/// Optional limit on the number of payloads to replay.
/// If not specified, replays all payloads in the directory.
#[arg(long, value_name = "COUNT")]
count: Option<usize>,
/// Skip the first N payloads.
#[arg(long, value_name = "SKIP", default_value = "0")]
skip: usize,
/// Optional directory containing gas ramp payloads to replay first.
/// These are replayed before the main payloads to warm up the gas limit.
#[arg(long, value_name = "GAS_RAMP_DIR")]
gas_ramp_dir: Option<PathBuf>,
}
/// A loaded payload ready for execution.
struct LoadedPayload {
/// The index (from filename).
index: u64,
/// The payload envelope.
envelope: ExecutionPayloadEnvelopeV4,
/// The block hash.
block_hash: B256,
}
/// A gas ramp payload loaded from disk.
struct GasRampPayload {
/// Block number from filename.
block_number: u64,
/// Engine API version for newPayload.
version: EngineApiMessageVersion,
/// The file contents.
file: GasRampPayloadFile,
}
impl Command {
/// Execute the `replay-payloads` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Get parent block (latest canonical block) - we need this for the first FCU
let parent_block = auth_provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let initial_parent_hash = parent_block.header.hash;
let initial_parent_number = parent_block.header.number;
info!(
parent_hash = %initial_parent_hash,
parent_number = initial_parent_number,
"Using initial parent block"
);
// Load all payloads upfront to avoid I/O delays between phases
let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
if payloads.is_empty() {
return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
}
info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
payloads
} else {
Vec::new()
};
let payloads = self.load_payloads()?;
if payloads.is_empty() {
return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
}
info!(count = payloads.len(), "Loaded main payloads from disk");
let mut parent_hash = initial_parent_hash;
// Replay gas ramp payloads first
for (i, payload) in gas_ramp_payloads.iter().enumerate() {
info!(
gas_ramp_payload = i + 1,
total = gas_ramp_payloads.len(),
block_number = payload.block_number,
block_hash = %payload.file.block_hash,
"Executing gas ramp payload (newPayload + FCU)"
);
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
parent_hash = payload.file.block_hash;
}
if !gas_ramp_payloads.is_empty() {
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
}
for (i, payload) in payloads.iter().enumerate() {
info!(
payload = i + 1,
total = payloads.len(),
index = payload.index,
block_hash = %payload.block_hash,
"Executing payload (newPayload + FCU)"
);
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
parent_hash = payload.block_hash;
}
info!(count = payloads.len(), "All payloads replayed successfully");
Ok(())
}
/// Load and parse all payload files from the directory.
fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
let mut payloads = Vec::new();
// Read directory entries
let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_")
})
.collect();
// Parse filenames to get indices and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract index from "payload_NNN.json"
let index_str = name_str.strip_prefix("payload_")?.strip_suffix(".json")?;
let index: u64 = index_str.parse().ok()?;
Some((index, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(idx, _)| *idx);
// Apply skip and count
let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
let indexed_paths: Vec<_> = match self.count {
Some(count) => indexed_paths.into_iter().take(count).collect(),
None => indexed_paths,
};
// Load each payload
for (index, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
info!(
index = index,
block_hash = %block_hash,
path = %path.display(),
"Loaded payload"
);
payloads.push(LoadedPayload { index, envelope, block_hash });
}
Ok(payloads)
}
/// Load and parse gas ramp payload files from a directory.
fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
let mut payloads = Vec::new();
let entries: Vec<_> = std::fs::read_dir(dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_block_")
})
.collect();
// Parse filenames to get block numbers and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract block number from "payload_block_NNN.json"
let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
let block_number: u64 = block_str.parse().ok()?;
Some((block_number, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(num, _)| *num);
for (block_number, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let file: GasRampPayloadFile = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let version = match file.version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
};
info!(
block_number,
block_hash = %file.block_hash,
path = %path.display(),
"Loaded gas ramp payload"
);
payloads.push(GasRampPayload { block_number, version, file });
}
Ok(payloads)
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: &ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(?status, "newPayloadV4 response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(?fcu_result, "forkchoiceUpdatedV3 response");
Ok(())
}
}

View File

@@ -3,15 +3,16 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated,
PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use tracing::error;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
#[async_trait::async_trait]
@@ -52,6 +53,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV1",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
@@ -82,6 +90,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV2",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
@@ -112,6 +127,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV3",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
@@ -148,33 +170,51 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
if let Some(prague) = sidecar.prague() {
// Use target version if provided (for Osaka), otherwise default to V4
let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
if is_optimism {
let withdrawals_root = withdrawals_root.ok_or_else(|| {
eyre::eyre!("Missing withdrawals root for Optimism payload")
})?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
OpExecutionPayloadV4 {
payload_inner: payload,
withdrawals_root: block.withdrawals_root.unwrap(),
},
OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
Requests::default(),
))?,
)
} else {
// Extract actual Requests from RequestsOrHash
let requests = prague
.requests
.requests()
.cloned()
.ok_or_else(|| eyre::eyre!("Prague sidecar has hash, not requests"))?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
payload,
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
prague.requests.requests_hash(),
requests,
))?,
)
}
@@ -217,6 +257,8 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
) -> TransportResult<()> {
let method = version.method_name();
debug!(method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
@@ -237,12 +279,15 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
/// actual engine api message call.
///
/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
// FCU V3 is used for both Cancun and Prague (there is no FCU V4)
match message_version {
EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await

View File

@@ -81,12 +81,16 @@ backon.workspace = true
tempfile.workspace = true
[features]
default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
default = ["jemalloc", "otlp", "otlp-logs", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
otlp = [
"reth-ethereum-cli/otlp",
"reth-node-core/otlp",
]
otlp-logs = [
"reth-ethereum-cli/otlp-logs",
"reth-node-core/otlp-logs",
]
js-tracer = [
"reth-node-builder/js-tracer",
"reth-node-ethereum/js-tracer",
@@ -131,6 +135,11 @@ jemalloc-unprefixed = [
tracy-allocator = [
"reth-cli-util/tracy-allocator",
"reth-ethereum-cli/tracy-allocator",
"tracy",
]
tracy = [
"reth-ethereum-cli/tracy",
"reth-node-core/tracy",
]
# Because jemalloc is default and preferred over snmalloc when both features are
@@ -171,7 +180,7 @@ min-trace-logs = [
"reth-node-core/min-trace-logs",
]
edge = ["reth-ethereum-cli/edge"]
edge = ["reth-ethereum-cli/edge", "reth-node-core/edge"]
[[bin]]
name = "reth"

View File

@@ -460,6 +460,18 @@ impl ChainSpec {
pub fn builder() -> ChainSpecBuilder {
ChainSpecBuilder::default()
}
/// Map a chain ID to a known chain spec, if available.
pub fn from_chain_id(chain_id: u64) -> Option<Arc<Self>> {
match NamedChain::try_from(chain_id).ok()? {
NamedChain::Mainnet => Some(MAINNET.clone()),
NamedChain::Sepolia => Some(SEPOLIA.clone()),
NamedChain::Holesky => Some(HOLESKY.clone()),
NamedChain::Hoodi => Some(HOODI.clone()),
NamedChain::Dev => Some(DEV.clone()),
_ => None,
}
}
}
impl<H: BlockHeader> ChainSpec<H> {

View File

@@ -83,6 +83,7 @@ backon.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] }
tokio-stream.workspace = true
reqwest.workspace = true
url.workspace = true
metrics.workspace = true
# io

View File

@@ -29,7 +29,7 @@ impl Command {
let static_file_provider = tool.provider_factory.static_file_provider();
let static_files = iter_static_files(static_file_provider.directory())?;
if let Some(segment_static_files) = static_files.get(&segment) {
if let Some(segment_static_files) = static_files.get(segment) {
for (block_range, _) in segment_static_files {
static_file_provider.delete_jar(segment, block_range.start())?;
}

View File

@@ -100,7 +100,7 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
tx.disable_long_read_transaction_safety();
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(&table_db).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let total_entries = stats.entries();
let final_entry_idx = total_entries.saturating_sub(1);
if self.args.skip > final_entry_idx {

View File

@@ -54,6 +54,21 @@ pub enum SetCommand {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
}
impl Command {
@@ -128,6 +143,30 @@ impl Command {
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
}
// Write updated settings

View File

@@ -88,7 +88,7 @@ impl Command {
let stats = tx
.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {db_table}"))?;
// Defaults to 16KB right now but we should
@@ -129,7 +129,8 @@ impl Command {
table.add_row(row);
let freelist = tx.inner.env().freelist()?;
let pagesize = tx.inner.db_stat(&mdbx::Database::freelist_db())?.page_size() as usize;
let pagesize =
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
let freelist_size = freelist * pagesize;
let mut row = Row::new();

View File

@@ -16,6 +16,7 @@ use std::{
use tar::Archive;
use tokio::task;
use tracing::info;
use url::Url;
use zstd::stream::read::Decoder as ZstdDecoder;
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
@@ -85,6 +86,9 @@ impl DownloadDefaults {
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
help
}
@@ -170,12 +174,14 @@ struct DownloadProgress {
downloaded: u64,
total_size: u64,
last_displayed: Instant,
started_at: Instant,
}
impl DownloadProgress {
/// Creates new progress tracker with given total size
fn new(total_size: u64) -> Self {
Self { downloaded: 0, total_size, last_displayed: Instant::now() }
let now = Instant::now();
Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
}
/// Converts bytes to human readable format (B, KB, MB, GB)
@@ -191,6 +197,18 @@ impl DownloadProgress {
format!("{:.2} {}", size, BYTE_UNITS[unit_index])
}
/// Format duration as human readable string
fn format_duration(duration: Duration) -> String {
let secs = duration.as_secs();
if secs < 60 {
format!("{secs}s")
} else if secs < 3600 {
format!("{}m {}s", secs / 60, secs % 60)
} else {
format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
}
}
/// Updates progress bar
fn update(&mut self, chunk_size: u64) -> Result<()> {
self.downloaded += chunk_size;
@@ -201,8 +219,24 @@ impl DownloadProgress {
let formatted_total = Self::format_size(self.total_size);
let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
// Calculate ETA based on current speed
let elapsed = self.started_at.elapsed();
let eta = if self.downloaded > 0 {
let remaining = self.total_size.saturating_sub(self.downloaded);
let speed = self.downloaded as f64 / elapsed.as_secs_f64();
if speed > 0.0 {
Duration::from_secs_f64(remaining as f64 / speed)
} else {
Duration::ZERO
}
} else {
Duration::ZERO
};
let eta_str = Self::format_duration(eta);
// Pad with spaces to clear any previous longer line
print!(
"\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
"\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str} ",
);
io::stdout().flush()?;
self.last_displayed = Instant::now();
@@ -246,29 +280,30 @@ enum CompressionFormat {
impl CompressionFormat {
/// Detect compression format from file extension
fn from_url(url: &str) -> Result<Self> {
if url.ends_with(EXTENSION_TAR_LZ4) {
let path =
Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
if path.ends_with(EXTENSION_TAR_LZ4) {
Ok(Self::Lz4)
} else if url.ends_with(EXTENSION_TAR_ZSTD) {
} else if path.ends_with(EXTENSION_TAR_ZSTD) {
Ok(Self::Zstd)
} else {
Err(eyre::eyre!("Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}", url))
Err(eyre::eyre!(
"Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
path
))
}
}
}
/// Downloads and extracts a snapshot, blocking until finished.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
let progress_reader = ProgressReader::new(response, total_size);
let format = CompressionFormat::from_url(url)?;
/// Extracts a compressed tar archive to the target directory with progress tracking.
fn extract_archive<R: Read>(
reader: R,
total_size: u64,
format: CompressionFormat,
target_dir: &Path,
) -> Result<()> {
let progress_reader = ProgressReader::new(reader, total_size);
match format {
CompressionFormat::Lz4 => {
@@ -285,6 +320,45 @@ fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
Ok(())
}
/// Extracts a snapshot from a local file.
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let file = std::fs::File::open(path)?;
let total_size = file.metadata()?.len();
extract_archive(file, total_size, format, target_dir)
}
/// Fetches the snapshot from a remote URL, uncompressing it in a streaming fashion.
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
extract_archive(response, total_size, format, target_dir)
}
/// Downloads and extracts a snapshot, blocking until finished.
///
/// Supports both `file://` URLs for local files and HTTP(S) URLs for remote downloads.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let format = CompressionFormat::from_url(url)?;
if let Ok(parsed_url) = Url::parse(url) &&
parsed_url.scheme() == "file"
{
let file_path = parsed_url
.to_file_path()
.map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
extract_from_file(&file_path, format, target_dir)
} else {
download_and_extract(url, format, target_dir)
}
}
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
@@ -343,6 +417,7 @@ mod tests {
assert!(help.contains("Available snapshot sources:"));
assert!(help.contains("merkle.io"));
assert!(help.contains("publicnode.com"));
assert!(help.contains("file://"));
}
#[test]
@@ -367,4 +442,25 @@ mod tests {
assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
}
#[test]
fn test_compression_format_detection() {
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
}
}

View File

@@ -11,7 +11,6 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::get_secret_key;
use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
@@ -19,19 +18,19 @@ use reth_downloaders::{
use reth_exex::ExExManagerHandle;
use reth_network::BlockDownloaderProvider;
use reth_network_p2p::HeadersClient;
use reth_node_builder::common::metrics_hooks;
use reth_node_core::{
args::{NetworkArgs, StageEnum},
version::version_metadata,
};
use reth_node_metrics::{
chain::ChainSpecInfo,
hooks::Hooks,
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
use reth_provider::{
ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
StageCheckpointWriter,
};
use reth_stages::{
stages::{
@@ -139,20 +138,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor,
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
}
})
.build(),
metrics_hooks(&provider_factory),
data_dir.pprof_dumps(),
);

View File

@@ -26,7 +26,8 @@ rand_08.workspace = true
thiserror.workspace = true
serde.workspace = true
tracy-client = { workspace = true, optional = true, features = ["demangle"] }
tracy-client = { workspace = true, optional = true }
reth-tracing = { workspace = true, optional = true }
[dev-dependencies]
rand.workspace = true
@@ -46,7 +47,7 @@ jemalloc-prof = ["jemalloc", "tikv-jemallocator?/profiling"]
jemalloc-unprefixed = ["jemalloc", "tikv-jemallocator?/unprefixed_malloc_on_supported_platforms"]
# Wraps the selected allocator in the tracy profiling allocator
tracy-allocator = ["dep:tracy-client"]
tracy-allocator = ["dep:tracy-client", "dep:reth-tracing"]
snmalloc = ["dep:snmalloc-rs"]

View File

@@ -25,7 +25,6 @@ cfg_if::cfg_if! {
cfg_if::cfg_if! {
if #[cfg(feature = "tracy-allocator")] {
type AllocatorWrapper = tracy_client::ProfiledAllocator<AllocatorInner>;
tracy_client::register_demangler!();
const fn new_allocator_wrapper() -> AllocatorWrapper {
AllocatorWrapper::new(AllocatorInner {}, 100)
}

View File

@@ -8,6 +8,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "tracy-allocator")]
use reth_tracing as _;
pub mod allocator;
pub mod cancellation;

View File

@@ -2,9 +2,8 @@
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_prune_types::PruneModes;
use reth_stages_types::ExecutionStageThresholds;
use reth_static_file_types::StaticFileSegment;
use reth_static_file_types::{StaticFileMap, StaticFileSegment};
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::Duration,
};
@@ -473,8 +472,8 @@ impl StaticFilesConfig {
Ok(())
}
/// Converts the blocks per file configuration into a [`HashMap`] per segment.
pub fn as_blocks_per_file_map(&self) -> HashMap<StaticFileSegment, u64> {
/// Converts the blocks per file configuration into a [`StaticFileMap`].
pub fn as_blocks_per_file_map(&self) -> StaticFileMap<u64> {
let BlocksPerFileConfig {
headers,
transactions,
@@ -483,7 +482,7 @@ impl StaticFilesConfig {
account_change_sets,
} = self.blocks_per_file;
let mut map = HashMap::new();
let mut map = StaticFileMap::default();
// Iterating over all possible segments allows us to do an exhaustive match here,
// to not forget to configure new segments in the future.
for segment in StaticFileSegment::iter() {
@@ -1079,18 +1078,6 @@ transaction_lookup = 'full'
receipts = { distance = 16384 }
#";
let _conf: Config = toml::from_str(s).unwrap();
let s = r"#
[prune]
block_interval = 5
[prune.segments]
sender_recovery = { distance = 16384 }
transaction_lookup = 'full'
receipts = 'full'
#";
let err = toml::from_str::<Config>(s).unwrap_err().to_string();
assert!(err.contains("invalid value: string \"full\""), "{}", err);
}
#[test]

View File

@@ -2,11 +2,11 @@ use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTe
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockId;
use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV5, ForkchoiceState};
use alloy_rpc_types_eth::BlockNumberOrTag;
use eyre::Ok;
use futures_util::Future;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::{core::client::ClientT, http_client::HttpClient};
use reth_chainspec::EthereumHardforks;
use reth_network_api::test_utils::PeersHandleProvider;
use reth_node_api::{
@@ -20,6 +20,7 @@ use reth_provider::{
BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
HeaderProvider, StageCheckpointReader,
};
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_rpc_builder::auth::AuthServerHandle;
use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
use reth_stages_types::StageId;
@@ -319,4 +320,20 @@ where
Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
}
/// Calls the `testing_buildBlockV1` RPC on this node.
///
/// This endpoint builds a block using the provided parent, payload attributes, and
/// transactions. Requires the `Testing` RPC module to be enabled.
pub async fn testing_build_block_v1(
&self,
request: TestingBuildBlockRequestV1,
) -> eyre::Result<ExecutionPayloadEnvelopeV5> {
let client =
self.rpc_client().ok_or_else(|| eyre::eyre!("HTTP RPC client not available"))?;
let res: ExecutionPayloadEnvelopeV5 =
client.request("testing_buildBlockV1", [request]).await?;
eyre::Ok(res)
}
}

View File

@@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
@@ -151,7 +151,7 @@ where
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.commit()?;
}

View File

@@ -20,6 +20,7 @@ use reth_trie_parallel::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
@@ -609,7 +610,19 @@ impl MultiProofTask {
self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: targets
.keys()
.filter_map(|account| {
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.map(|keys| (*account, keys))
})
.collect(),
});
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
@@ -705,7 +718,33 @@ impl MultiProofTask {
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: {
let mut storages = B256Map::with_capacity_and_hasher(
not_fetched_state_update.storages.len(),
Default::default(),
);
for account in not_fetched_state_update
.storages
.keys()
.chain(not_fetched_state_update.accounts.keys())
{
if let hash_map::Entry::Vacant(entry) = storages.entry(*account) {
entry.insert(
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.unwrap_or_default(),
);
}
}
storages
},
});
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();

View File

@@ -1,5 +1,10 @@
//! Types and traits for validating blocks and payloads.
/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
///
/// Benchmarked crossover: `extend_ref` wins up to ~30 blocks, `merge_batch` wins beyond.
const MERGE_BATCH_THRESHOLD: usize = 30;
use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
@@ -40,7 +45,10 @@ use reth_provider::{
StateProvider, StateProviderFactory, StateReader, TrieReader,
};
use reth_revm::db::State;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie::{
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, StateRoot, TrieInputSorted,
};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -1012,34 +1020,63 @@ where
Ok((input, block_hash))
}
/// Aggregates multiple in-memory blocks into a single [`TrieInputSorted`] by combining their
/// Aggregates in-memory blocks into a single [`TrieInputSorted`] by combining their
/// state changes.
///
/// The input `blocks` vector is ordered newest -> oldest (see `TreeState::blocks_by_hash`).
/// We iterate it in reverse so we start with the oldest block's trie data and extend forward
/// toward the newest, ensuring newer state takes precedence.
///
/// Uses `extend_ref` loop for small k, k-way `merge_batch` for large k.
/// See [`MERGE_BATCH_THRESHOLD`] for crossover point.
fn merge_overlay_trie_input(blocks: &[ExecutedBlock<N>]) -> TrieInputSorted {
let mut input = TrieInputSorted::default();
let mut blocks_iter = blocks.iter().rev().peekable();
if let Some(first) = blocks_iter.next() {
let data = first.trie_data();
input.state = data.hashed_state;
input.nodes = data.trie_updates;
// Only clone and mutate if there are more in-memory blocks.
if blocks_iter.peek().is_some() {
let state_mut = Arc::make_mut(&mut input.state);
let nodes_mut = Arc::make_mut(&mut input.nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
}
if blocks.is_empty() {
return TrieInputSorted::default();
}
input
// Single block: return Arc directly without cloning
if blocks.len() == 1 {
let data = blocks[0].trie_data();
return TrieInputSorted {
state: Arc::clone(&data.hashed_state),
nodes: Arc::clone(&data.trie_updates),
prefix_sets: Default::default(),
};
}
if blocks.len() < MERGE_BATCH_THRESHOLD {
// Small k: extend_ref loop is faster
// Iterate oldest->newest so newer values override older ones
let mut blocks_iter = blocks.iter().rev();
let first = blocks_iter.next().expect("blocks is non-empty");
let data = first.trie_data();
let mut state = Arc::clone(&data.hashed_state);
let mut nodes = Arc::clone(&data.trie_updates);
let state_mut = Arc::make_mut(&mut state);
let nodes_mut = Arc::make_mut(&mut nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
} else {
// Large k: merge_batch is faster (O(n log k) via k-way merge)
let trie_data: Vec<_> = blocks.iter().map(|b| b.trie_data()).collect();
let merged_state = HashedPostStateSorted::merge_batch(
trie_data.iter().map(|d| d.hashed_state.as_ref()),
);
let merged_nodes =
TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
TrieInputSorted {
state: Arc::new(merged_state),
nodes: Arc::new(merged_nodes),
prefix_sets: Default::default(),
}
}
}
/// Spawns a background task to compute and sort trie data for the executed block.

View File

@@ -38,6 +38,7 @@ tempfile.workspace = true
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
dev = ["reth-cli-commands/arbitrary"]
@@ -58,7 +59,8 @@ jemalloc-symbols = [
"jemalloc-prof",
"reth-node-metrics/jemalloc-symbols",
]
tracy-allocator = []
tracy-allocator = ["tracy"]
tracy = ["reth-tracing/tracy", "reth-node-core/tracy"]
# Because jemalloc is default and preferred over snmalloc when both features are
# enabled, `--no-default-features` should be used when enabling snmalloc or

View File

@@ -19,7 +19,7 @@ use reth_db::DatabaseEnv;
use reth_node_api::NodePrimitives;
use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_core::{
args::{LogArgs, OtlpInitStatus, TraceArgs},
args::{LogArgs, OtlpInitStatus, OtlpLogsStatus, TraceArgs},
version::version_metadata,
};
use reth_node_metrics::recorder::install_prometheus_recorder;
@@ -223,16 +223,19 @@ impl<
/// If file logging is enabled, this function returns a guard that must be kept alive to ensure
/// that all logs are flushed to disk.
///
/// If an OTLP endpoint is specified, it will export metrics to the configured collector.
/// If an OTLP endpoint is specified, it will export traces and logs to the configured
/// collector.
pub fn init_tracing(
&mut self,
runner: &CliRunner,
mut layers: Layers,
) -> eyre::Result<Option<FileWorkerGuard>> {
let otlp_status = runner.block_on(self.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.traces.init_otlp_logs(&mut layers))?;
let guard = self.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.traces.protocol);
@@ -243,6 +246,16 @@ impl<
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
Ok(guard)
}
}

View File

@@ -303,6 +303,8 @@ where
let eth_config =
EthConfigHandler::new(ctx.node.provider().clone(), ctx.node.evm_config().clone());
let testing_skip_invalid_transactions = ctx.config.rpc.testing_skip_invalid_transactions;
self.inner
.launch_add_ons_with(ctx, move |container| {
container.modules.merge_if_module_configured(
@@ -316,14 +318,16 @@ where
// testing_buildBlockV1: only wire when the hidden testing module is explicitly
// requested on any transport. Default stays disabled to honor security guidance.
let testing_api = TestingApi::new(
let mut testing_api = TestingApi::new(
container.registry.eth_api().clone(),
container.registry.evm_config().clone(),
)
.into_rpc();
);
if testing_skip_invalid_transactions {
testing_api = testing_api.with_skip_invalid_transactions();
}
container
.modules
.merge_if_module_configured(RethRpcModule::Testing, testing_api)?;
.merge_if_module_configured(RethRpcModule::Testing, testing_api.into_rpc())?;
Ok(())
})

View File

@@ -1,6 +1,10 @@
use crate::utils::eth_payload_attributes;
use alloy_eips::eip7685::RequestsOrHash;
use alloy_genesis::Genesis;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use alloy_primitives::{Address, B256};
use alloy_rpc_types_engine::{PayloadAttributes, PayloadStatusEnum};
use jsonrpsee_core::client::ClientT;
use reth_chainspec::{ChainSpecBuilder, EthChainSpec, MAINNET};
use reth_e2e_test_utils::{
node::NodeTestContext, setup, transaction::TransactionTestContext, wallet::Wallet,
};
@@ -8,6 +12,7 @@ use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_provider::BlockNumReader;
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_tasks::TaskManager;
use std::sync::Arc;
@@ -180,3 +185,74 @@ async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
Ok(())
}
#[tokio::test]
async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
ChainSpecBuilder::default().chain(MAINNET.chain).genesis(genesis).osaka_activated().build(),
);
let genesis_hash = chain_spec.genesis_hash();
let node_config =
NodeConfig::test().with_chain(chain_spec.clone()).with_unused_ports().with_rpc(
RpcServerArgs::default()
.with_unused_ports()
.with_http()
.with_http_api(reth_rpc_server_types::RpcModuleSelection::All),
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.node(EthereumNode::default())
.launch()
.await?;
let node = NodeTestContext::new(node, eth_payload_attributes).await?;
let wallet = Wallet::default();
let raw_tx = TransactionTestContext::transfer_tx_bytes(1, wallet.inner).await;
let payload_attributes = PayloadAttributes {
timestamp: chain_spec.genesis().timestamp + 1,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
let request = TestingBuildBlockRequestV1 {
parent_block_hash: genesis_hash,
payload_attributes,
transactions: vec![raw_tx],
extra_data: None,
};
let envelope = node.testing_build_block_v1(request).await?;
let engine_client = node.auth_server_handle().http_client();
let payload = envelope.execution_payload.clone();
let block_hash = payload.payload_inner.payload_inner.block_hash;
let versioned_hashes: Vec<B256> = Vec::new();
let parent_beacon_block_root = B256::ZERO;
let execution_requests = RequestsOrHash::Requests(envelope.execution_requests);
let status: alloy_rpc_types_engine::PayloadStatus = engine_client
.request(
"engine_newPayloadV4",
(payload, versioned_hashes, parent_beacon_block_root, execution_requests),
)
.await?;
assert_eq!(status.status, PayloadStatusEnum::Valid);
node.update_forkchoice(genesis_hash, block_hash).await?;
node.wait_block(1, block_hash, false).await?;
Ok(())
}

View File

@@ -31,8 +31,8 @@ reth-payload-validator.workspace = true
# ethereum
alloy-rlp.workspace = true
revm.workspace = true
alloy-rpc-types-engine.workspace = true
revm.workspace = true
# alloy
alloy-eips.workspace = true

View File

@@ -79,9 +79,12 @@ use reth_stages::{
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_tracing::{
throttle,
tracing::{debug, error, info, warn},
};
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, thread::available_parallelism};
use std::{sync::Arc, thread::available_parallelism, time::Duration};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, watch,
@@ -167,7 +170,8 @@ impl LaunchContext {
toml_config.peers.trusted_nodes_only = config.network.trusted_only;
// Merge static file CLI arguments with config file, giving priority to CLI
toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
toml_config.static_files =
config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
Ok(toml_config)
}
@@ -479,7 +483,7 @@ where
let static_file_provider =
StaticFileProviderBuilder::read_write(self.data_dir().static_files())
.with_metrics()
.with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
.with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
.build()?;
@@ -650,23 +654,13 @@ where
},
ChainSpecInfo { name: self.chain_id().to_string() },
self.task_executor().clone(),
Hooks::builder()
.with_hook({
let db = self.database().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = self.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build(),
metrics_hooks(self.provider_factory()),
self.data_dir().pprof_dumps(),
)
.with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
.with_push_gateway(
self.node_config().metrics.push_gateway_url.clone(),
self.node_config().metrics.push_gateway_interval,
);
MetricServer::new(config).serve().await?;
}
@@ -952,7 +946,7 @@ where
error!(
"Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
);
return Err(ProviderError::BestBlockNotFound)
return Err(ProviderError::BestBlockNotFound);
}
}
@@ -1266,6 +1260,26 @@ where
head: Head,
}
/// Returns the metrics hooks for the node.
pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
throttle!(Duration::from_secs(5 * 60), || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
})
}
})
.build()
}
#[cfg(test)]
mod tests {
use super::{LaunchContext, NodeConfig};
@@ -1288,6 +1302,7 @@ mod tests {
let node_config = NodeConfig {
pruning: PruningArgs {
full: true,
minimal: false,
block_interval: None,
sender_recovery_full: false,
sender_recovery_distance: None,

View File

@@ -81,7 +81,9 @@ tokio.workspace = true
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
otlp = ["reth-tracing/otlp"]
otlp = ["reth-tracing/otlp", "reth-tracing-otlp/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-tracing-otlp/otlp-logs"]
tracy = ["reth-tracing/tracy"]
min-error-logs = ["tracing/release_max_level_error"]
min-warn-logs = ["tracing/release_max_level_warn"]
@@ -89,6 +91,9 @@ min-info-logs = ["tracing/release_max_level_info"]
min-debug-logs = ["tracing/release_max_level_debug"]
min-trace-logs = ["tracing/release_max_level_trace"]
# Marker feature for edge/unstable builds - captured by vergen in build.rs
edge = []
[build-dependencies]
vergen = { workspace = true, features = ["build", "cargo", "emit_and_set"] }
vergen-git2.workspace = true

View File

@@ -75,6 +75,20 @@ pub struct LogArgs {
)]
pub samply_filter: String,
/// Emit traces to tracy. Only useful when profiling.
#[arg(long = "log.tracy", global = true, hide = true)]
pub tracy: bool,
/// The filter to use for traces emitted to tracy.
#[arg(
long = "log.tracy.filter",
value_name = "FILTER",
global = true,
default_value = "debug",
hide = true
)]
pub tracy_filter: String,
/// Sets whether or not the formatter emits ANSI terminal escape codes for colors and other
/// text formatting.
#[arg(
@@ -148,6 +162,12 @@ impl LogArgs {
tracer = tracer.with_samply(config);
}
#[cfg(feature = "tracy")]
if self.tracy {
let config = self.layer_info(LogFormat::Terminal, self.tracy_filter.clone(), false);
tracer = tracer.with_tracy(config);
}
let guard = tracer.init_with_layers(layers)?;
Ok(guard)
}

View File

@@ -26,7 +26,7 @@ pub use log::{ColorMode, LogArgs, Verbosity};
/// `TraceArgs` for tracing and spans support
mod trace;
pub use trace::{OtlpInitStatus, TraceArgs};
pub use trace::{OtlpInitStatus, OtlpLogsStatus, TraceArgs};
/// `MetricArgs` to configure metrics.
mod metric;
@@ -78,7 +78,7 @@ pub use era::{DefaultEraHost, EraArgs, EraSourceArgs};
/// `StaticFilesArgs` for configuring static files.
mod static_files;
pub use static_files::StaticFilesArgs;
pub use static_files::{StaticFilesArgs, MINIMAL_BLOCKS_PER_FILE};
mod error;
pub mod types;

View File

@@ -16,9 +16,18 @@ use std::{collections::BTreeMap, ops::Not};
#[command(next_help_heading = "Pruning")]
pub struct PruningArgs {
/// Run full node. Only the most recent [`MINIMUM_PRUNING_DISTANCE`] block states are stored.
#[arg(long, default_value_t = false)]
#[arg(long, default_value_t = false, conflicts_with = "minimal")]
pub full: bool,
/// Run minimal storage mode with maximum pruning and smaller static files.
///
/// This mode configures the node to use minimal disk space by:
/// - Fully pruning sender recovery, transaction lookup, receipts
/// - Leaving 10,064 blocks for account, storage history and block bodies
/// - Using 10,000 blocks per static file segment
#[arg(long, default_value_t = false, conflicts_with = "full")]
pub minimal: bool,
/// Minimum pruning interval measured in blocks.
#[arg(long = "prune.block-interval", alias = "block-interval", value_parser = RangedU64ValueParser::<u64>::new().range(1..))]
pub block_interval: Option<u64>,
@@ -140,6 +149,23 @@ impl PruningArgs {
}
}
// If --minimal is set, use minimal storage mode with aggressive pruning.
if self.minimal {
config = PruneConfig {
block_interval: config.block_interval,
segments: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: Some(PruneMode::Full),
receipts: Some(PruneMode::Full),
account_history: Some(PruneMode::Distance(10064)),
storage_history: Some(PruneMode::Distance(10064)),
bodies_history: Some(PruneMode::Distance(10064)),
merkle_changesets: PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS),
receipts_log_filter: Default::default(),
},
}
}
// Override with any explicitly set prune.* flags.
if let Some(block_interval) = self.block_interval {
config.block_interval = block_interval as usize;

View File

@@ -605,7 +605,7 @@ pub struct RpcServerArgs {
pub rpc_eth_proof_window: u64,
/// Maximum number of concurrent getproof requests.
#[arg(long = "rpc.proof-permits", alias = "rpc-proof-permits", value_name = "COUNT", default_value_t = constants::DEFAULT_PROOF_PERMITS)]
#[arg(long = "rpc.proof-permits", alias = "rpc-proof-permits", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_proof_permits)]
pub rpc_proof_permits: usize,
/// Configures the pending block behavior for RPC responses.
@@ -640,6 +640,13 @@ pub struct RpcServerArgs {
value_parser = parse_duration_from_secs_or_ms,
)]
pub rpc_send_raw_transaction_sync_timeout: Duration,
/// Skip invalid transactions in `testing_buildBlockV1` instead of failing.
///
/// When enabled, transactions that fail execution will be skipped, and all subsequent
/// transactions from the same sender will also be skipped.
#[arg(long = "testing.skip-invalid-transactions", default_value_t = false)]
pub testing_skip_invalid_transactions: bool,
}
impl RpcServerArgs {
@@ -852,6 +859,7 @@ impl Default for RpcServerArgs {
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
testing_skip_invalid_transactions: false,
}
}
}
@@ -1026,6 +1034,7 @@ mod tests {
default_suggested_fee: None,
},
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
testing_skip_invalid_transactions: true,
};
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([
@@ -1114,6 +1123,7 @@ mod tests {
"60",
"--rpc.send-raw-transaction-sync-timeout",
"30s",
"--testing.skip-invalid-transactions",
])
.args;

View File

@@ -4,6 +4,11 @@ use clap::Args;
use reth_config::config::{BlocksPerFileConfig, StaticFilesConfig};
use reth_provider::StorageSettings;
/// Blocks per static file when running in `--minimal` node.
///
/// 10000 blocks per static file allows us to prune all history every 10k blocks.
pub const MINIMAL_BLOCKS_PER_FILE: u64 = 10000;
/// Parameters for static files configuration
#[derive(Debug, Args, PartialEq, Eq, Default, Clone, Copy)]
#[command(next_help_heading = "Static Files")]
@@ -61,14 +66,25 @@ pub struct StaticFilesArgs {
impl StaticFilesArgs {
/// Merges the CLI arguments with an existing [`StaticFilesConfig`], giving priority to CLI
/// args.
pub fn merge_with_config(&self, config: StaticFilesConfig) -> StaticFilesConfig {
///
/// If `minimal` is true, uses [`MINIMAL_BLOCKS_PER_FILE`] blocks per file as the default for
/// headers, transactions, and receipts segments.
pub fn merge_with_config(&self, config: StaticFilesConfig, minimal: bool) -> StaticFilesConfig {
let minimal_blocks_per_file = minimal.then_some(MINIMAL_BLOCKS_PER_FILE);
StaticFilesConfig {
blocks_per_file: BlocksPerFileConfig {
headers: self.blocks_per_file_headers.or(config.blocks_per_file.headers),
headers: self
.blocks_per_file_headers
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.headers),
transactions: self
.blocks_per_file_transactions
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.transactions),
receipts: self.blocks_per_file_receipts.or(config.blocks_per_file.receipts),
receipts: self
.blocks_per_file_receipts
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.receipts),
transaction_senders: self
.blocks_per_file_transaction_senders
.or(config.blocks_per_file.transaction_senders),

View File

@@ -1,4 +1,4 @@
//! Opentelemetry tracing configuration through CLI args.
//! Opentelemetry tracing and logging configuration through CLI args.
use clap::Parser;
use eyre::WrapErr;
@@ -6,7 +6,7 @@ use reth_tracing::{tracing_subscriber::EnvFilter, Layers};
use reth_tracing_otlp::OtlpProtocol;
use url::Url;
/// CLI arguments for configuring `Opentelemetry` trace and span export.
/// CLI arguments for configuring `Opentelemetry` trace and logs export.
#[derive(Debug, Clone, Parser)]
pub struct TraceArgs {
/// Enable `Opentelemetry` tracing export to an OTLP endpoint.
@@ -30,9 +30,29 @@ pub struct TraceArgs {
)]
pub otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces.
/// Enable `Opentelemetry` logs export to an OTLP endpoint.
///
/// - `http`: expects endpoint path to end with `/v1/traces`
/// If no value provided, defaults based on protocol:
/// - HTTP: `http://localhost:4318/v1/logs`
/// - gRPC: `http://localhost:4317`
///
/// Example: --logs-otlp=http://collector:4318/v1/logs
#[arg(
long = "logs-otlp",
env = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
global = true,
value_name = "URL",
num_args = 0..=1,
default_missing_value = "http://localhost:4318/v1/logs",
require_equals = true,
value_parser = parse_otlp_endpoint,
help_heading = "Logging"
)]
pub logs_otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces and logs.
///
/// - `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs`
/// - `grpc`: expects endpoint without a path
///
/// Defaults to HTTP if not specified.
@@ -62,6 +82,22 @@ pub struct TraceArgs {
)]
pub otlp_filter: EnvFilter,
/// Set a filter directive for the OTLP logs exporter. This controls the verbosity
/// of logs sent to the OTLP endpoint. It follows the same syntax as the
/// `RUST_LOG` environment variable.
///
/// Example: --logs-otlp.filter=info,reth=debug
///
/// Defaults to INFO if not specified.
#[arg(
long = "logs-otlp.filter",
global = true,
value_name = "FILTER",
default_value = "info",
help_heading = "Logging"
)]
pub logs_otlp_filter: EnvFilter,
/// Service name to use for OTLP tracing export.
///
/// This name will be used to identify the service in distributed tracing systems
@@ -101,8 +137,10 @@ impl Default for TraceArgs {
fn default() -> Self {
Self {
otlp: None,
logs_otlp: None,
protocol: OtlpProtocol::Http,
otlp_filter: EnvFilter::from_default_env(),
logs_otlp_filter: EnvFilter::try_new("info").expect("valid filter"),
sample_ratio: None,
service_name: "reth".to_string(),
}
@@ -150,6 +188,37 @@ impl TraceArgs {
Ok(OtlpInitStatus::Disabled)
}
}
/// Initialize OTLP logs export with the given layers.
///
/// This method handles OTLP logs initialization based on the configured options,
/// including validation and protocol selection.
///
/// Returns the initialization status to allow callers to log appropriate messages.
pub async fn init_otlp_logs(&mut self, _layers: &mut Layers) -> eyre::Result<OtlpLogsStatus> {
if let Some(endpoint) = self.logs_otlp.as_mut() {
self.protocol.validate_logs_endpoint(endpoint)?;
#[cfg(feature = "otlp-logs")]
{
let config = reth_tracing_otlp::OtlpLogsConfig::new(
self.service_name.clone(),
endpoint.clone(),
self.protocol,
)?;
_layers.with_log_layer(config.clone(), self.logs_otlp_filter.clone())?;
Ok(OtlpLogsStatus::Started(config.endpoint().clone()))
}
#[cfg(not(feature = "otlp-logs"))]
{
Ok(OtlpLogsStatus::NoFeature)
}
} else {
Ok(OtlpLogsStatus::Disabled)
}
}
}
/// Status of OTLP tracing initialization.
@@ -163,6 +232,17 @@ pub enum OtlpInitStatus {
NoFeature,
}
/// Status of OTLP logs initialization.
#[derive(Debug)]
pub enum OtlpLogsStatus {
/// OTLP logs export was successfully started with the given endpoint.
Started(Url),
/// OTLP logs export is disabled (no endpoint configured).
Disabled,
/// OTLP logs arguments provided but feature is not compiled.
NoFeature,
}
// Parses an OTLP endpoint url.
fn parse_otlp_endpoint(arg: &str) -> eyre::Result<Url> {
Url::parse(arg).wrap_err("Invalid URL for OTLP trace output")

View File

@@ -38,10 +38,12 @@ js-tracer = [
jemalloc = ["reth-cli-util/jemalloc", "reth-optimism-cli/jemalloc"]
jemalloc-prof = ["jemalloc", "reth-cli-util/jemalloc-prof", "reth-optimism-cli/jemalloc-prof"]
jemalloc-symbols = ["jemalloc-prof", "reth-optimism-cli/jemalloc-symbols"]
tracy-allocator = ["reth-cli-util/tracy-allocator"]
tracy-allocator = ["reth-cli-util/tracy-allocator", "tracy"]
tracy = ["reth-optimism-cli/tracy"]
asm-keccak = ["reth-optimism-cli/asm-keccak", "reth-optimism-node/asm-keccak"]
keccak-cache-global = [
"reth-optimism-cli/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
dev = [

View File

@@ -76,8 +76,9 @@ reth-optimism-chainspec = { workspace = true, features = ["std", "superchain-con
[features]
default = []
# Opentelemtry feature to activate metrics export
# Opentelemetry feature to activate tracing and logs export
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
asm-keccak = [
"alloy-primitives/asm-keccak",
@@ -85,6 +86,12 @@ asm-keccak = [
"reth-optimism-node/asm-keccak",
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
# Jemalloc feature for vergen to generate correct env vars
jemalloc = [
"reth-node-core/jemalloc",
@@ -99,6 +106,8 @@ jemalloc-symbols = [
"reth-node-metrics/jemalloc-symbols",
]
tracy = ["reth-tracing/tracy", "reth-node-core/tracy"]
dev = [
"dep:proptest",
"reth-cli-commands/arbitrary",
@@ -115,4 +124,4 @@ serde = [
"reth-optimism-chainspec/serde",
]
edge = ["reth-cli-commands/edge"]
edge = ["reth-cli-commands/edge", "reth-node-core/edge"]

View File

@@ -3,7 +3,7 @@ use eyre::{eyre, Result};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::launcher::Launcher;
use reth_cli_runner::CliRunner;
use reth_node_core::args::OtlpInitStatus;
use reth_node_core::args::{OtlpInitStatus, OtlpLogsStatus};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_consensus::OpBeaconConsensus;
@@ -124,9 +124,11 @@ where
let mut layers = self.layers.take().unwrap_or_default();
let otlp_status = runner.block_on(self.cli.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.cli.traces.init_otlp_logs(&mut layers))?;
self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.cli.traces.protocol);
@@ -136,6 +138,16 @@ where
}
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.cli.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
}
Ok(())
}

View File

@@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader,
};
use reth_stages::{StageCheckpoint, StageId};
@@ -228,7 +228,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
// finally, write the receipts
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;
}
// Only commit if we have imported as many receipts as the number of transactions.

View File

@@ -306,8 +306,7 @@ mod test {
use alloy_op_hardforks::BASE_SEPOLIA_JOVIAN_TIMESTAMP;
use alloy_primitives::{b64, Address, B256, B64};
use alloy_rpc_types_engine::PayloadAttributes;
use reth_chainspec::ChainSpec;
use reth_optimism_chainspec::{OpChainSpec, BASE_SEPOLIA};
use reth_optimism_chainspec::BASE_SEPOLIA;
use reth_provider::noop::NoopProvider;
use reth_trie_common::KeccakKeyHasher;
@@ -323,24 +322,6 @@ mod test {
}};
}
fn get_chainspec() -> Arc<OpChainSpec> {
let base_sepolia_spec = BASE_SEPOLIA.inner.clone();
Arc::new(OpChainSpec {
inner: ChainSpec {
chain: base_sepolia_spec.chain,
genesis: base_sepolia_spec.genesis,
genesis_header: base_sepolia_spec.genesis_header,
paris_block_and_final_difficulty: base_sepolia_spec
.paris_block_and_final_difficulty,
hardforks: base_sepolia_spec.hardforks,
base_fee_params: base_sepolia_spec.base_fee_params,
prune_delete_limit: 10000,
..Default::default()
},
})
}
const fn get_attributes(
eip_1559_params: Option<B64>,
min_base_fee: Option<u64>,
@@ -364,8 +345,10 @@ mod test {
#[test]
fn test_well_formed_attributes_pre_holocene() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(None, None, 1732633199);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -378,8 +361,10 @@ mod test {
#[test]
fn test_well_formed_attributes_holocene_no_eip1559_params() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(None, None, 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -392,8 +377,10 @@ mod test {
#[test]
fn test_well_formed_attributes_holocene_eip1559_params_zero_denominator() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(Some(b64!("0000000000000008")), None, 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -406,8 +393,10 @@ mod test {
#[test]
fn test_well_formed_attributes_holocene_eip1559_params_zero_elasticity() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(Some(b64!("0000000800000000")), None, 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -420,8 +409,10 @@ mod test {
#[test]
fn test_well_formed_attributes_holocene_valid() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(Some(b64!("0000000800000008")), None, 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -434,8 +425,10 @@ mod test {
#[test]
fn test_well_formed_attributes_holocene_valid_all_zero() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(Some(b64!("0000000000000000")), None, 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -448,8 +441,10 @@ mod test {
#[test]
fn test_well_formed_attributes_jovian_valid() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes =
get_attributes(Some(b64!("0000000000000000")), Some(1), BASE_SEPOLIA_JOVIAN_TIMESTAMP);
@@ -464,8 +459,10 @@ mod test {
/// After Jovian (and holocene), eip1559 params must be Some
#[test]
fn test_malformed_attributes_jovian_with_eip_1559_params_none() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(None, Some(1), BASE_SEPOLIA_JOVIAN_TIMESTAMP);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -479,8 +476,10 @@ mod test {
/// Before Jovian, min base fee must be None
#[test]
fn test_malformed_attributes_pre_jovian_with_min_base_fee() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes = get_attributes(Some(b64!("0000000000000000")), Some(1), 1732633200);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
@@ -494,8 +493,10 @@ mod test {
/// After Jovian, min base fee must be Some
#[test]
fn test_malformed_attributes_post_jovian_with_min_base_fee_none() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let validator = OpEngineValidator::new::<KeccakKeyHasher>(
BASE_SEPOLIA.clone(),
NoopProvider::default(),
);
let attributes =
get_attributes(Some(b64!("0000000000000000")), None, BASE_SEPOLIA_JOVIAN_TIMESTAMP);

View File

@@ -77,6 +77,7 @@ arbitrary = [
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
"reth-optimism-cli?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -1,12 +1,12 @@
//! Sealed block types
use crate::{
block::{error::BlockRecoveryError, RecoveredBlock},
transaction::signed::RecoveryError,
block::{error::BlockRecoveryError, header::BlockHeader, RecoveredBlock},
transaction::signed::{RecoveryError, SignedTransaction},
Block, BlockBody, GotExpected, InMemorySize, SealedHeader,
};
use alloc::vec::Vec;
use alloy_consensus::BlockHeader;
use alloy_consensus::BlockHeader as _;
use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
use alloy_primitives::{Address, BlockHash, Sealable, Sealed, B256};
use alloy_rlp::{Decodable, Encodable};
@@ -327,6 +327,31 @@ impl<B: Block> From<SealedBlock<B>> for Sealed<B> {
}
}
impl<B: Block> From<Sealed<B>> for SealedBlock<B> {
fn from(value: Sealed<B>) -> Self {
let (block, hash) = value.into_parts();
Self::new_unchecked(block, hash)
}
}
impl<T, H> SealedBlock<alloy_consensus::Block<T, H>>
where
T: Decodable + SignedTransaction,
H: BlockHeader,
{
/// Decodes the block from RLP, computing the header hash directly from the RLP bytes.
///
/// This is more efficient than decoding and then sealing, as the header hash is computed
/// from the raw RLP bytes without re-encoding.
///
/// This leverages [`alloy_consensus::Block::decode_sealed`].
pub fn decode_sealed(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let sealed = alloy_consensus::Block::<T, H>::decode_sealed(buf)?;
let (block, hash) = sealed.into_parts();
Ok(Self::new_unchecked(block, hash))
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl<'a, B> arbitrary::Arbitrary<'a> for SealedBlock<B>
where
@@ -555,4 +580,96 @@ mod tests {
assert_eq!(sealed_block.header().state_root, decoded.header().state_root);
assert_eq!(sealed_block.body().transactions.len(), decoded.body().transactions.len());
}
#[test]
fn test_decode_sealed_produces_correct_hash() {
// Create a sample block using alloy_consensus::Block
let header = alloy_consensus::Header {
parent_hash: B256::ZERO,
ommers_hash: B256::ZERO,
beneficiary: Address::ZERO,
state_root: B256::ZERO,
transactions_root: B256::ZERO,
receipts_root: B256::ZERO,
logs_bloom: Default::default(),
difficulty: Default::default(),
number: 42,
gas_limit: 30_000_000,
gas_used: 21_000,
timestamp: 1_000_000,
extra_data: Default::default(),
mix_hash: B256::ZERO,
nonce: Default::default(),
base_fee_per_gas: Some(1_000_000_000),
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
};
// Create a simple transaction
let tx = alloy_consensus::TxLegacy {
chain_id: Some(1),
nonce: 0,
gas_price: 21_000_000_000,
gas_limit: 21_000,
to: alloy_primitives::TxKind::Call(Address::ZERO),
value: alloy_primitives::U256::from(100),
input: alloy_primitives::Bytes::default(),
};
let tx_signed =
alloy_consensus::TxEnvelope::Legacy(alloy_consensus::Signed::new_unchecked(
tx,
alloy_primitives::Signature::test_signature(),
B256::ZERO,
));
// Create block body with the transaction
let body = alloy_consensus::BlockBody {
transactions: vec![tx_signed],
ommers: vec![],
withdrawals: Some(Default::default()),
};
// Create the block
let block = alloy_consensus::Block::new(header, body);
let expected_hash = block.header.hash_slow();
// Encode the block
let mut encoded = Vec::new();
block.encode(&mut encoded);
// Decode using decode_sealed - this should compute hash from raw RLP
let decoded =
SealedBlock::<alloy_consensus::Block<alloy_consensus::TxEnvelope>>::decode_sealed(
&mut encoded.as_slice(),
)
.expect("Failed to decode sealed block");
// Verify the hash matches
assert_eq!(decoded.hash(), expected_hash);
assert_eq!(decoded.header().number, 42);
assert_eq!(decoded.body().transactions.len(), 1);
}
#[test]
fn test_sealed_block_from_sealed() {
let header = alloy_consensus::Header::default();
let body = alloy_consensus::BlockBody::<alloy_consensus::TxEnvelope>::default();
let block = alloy_consensus::Block::new(header, body);
let hash = block.header.hash_slow();
// Create Sealed<Block>
let sealed: Sealed<alloy_consensus::Block<alloy_consensus::TxEnvelope>> =
Sealed::new_unchecked(block.clone(), hash);
// Convert to SealedBlock
let sealed_block: SealedBlock<alloy_consensus::Block<alloy_consensus::TxEnvelope>> =
SealedBlock::from(sealed);
assert_eq!(sealed_block.hash(), hash);
assert_eq!(sealed_block.header().number, block.header.number);
}
}

View File

@@ -42,15 +42,15 @@ impl PruneMode {
purpose: PrunePurpose,
) -> Result<Option<(BlockNumber, Self)>, PruneSegmentError> {
let result = match self {
Self::Full if segment.min_blocks(purpose) == 0 => Some((tip, *self)),
Self::Full if segment.min_blocks() == 0 => Some((tip, *self)),
Self::Distance(distance) if *distance > tip => None, // Nothing to prune yet
Self::Distance(distance) if *distance >= segment.min_blocks(purpose) => {
Self::Distance(distance) if *distance >= segment.min_blocks() => {
Some((tip - distance, *self))
}
Self::Before(n) if *n == tip + 1 && purpose.is_static_file() => Some((tip, *self)),
Self::Before(n) if *n > tip => None, // Nothing to prune yet
Self::Before(n) => {
(tip - n >= segment.min_blocks(purpose)).then(|| ((*n).saturating_sub(1), *self))
(tip - n >= segment.min_blocks()).then(|| ((*n).saturating_sub(1), *self))
}
_ => return Err(PruneSegmentError::Configuration(segment)),
};
@@ -93,7 +93,7 @@ mod tests {
#[test]
fn test_prune_target_block() {
let tip = 20000;
let segment = PruneSegment::Receipts;
let segment = PruneSegment::AccountHistory;
let tests = vec![
// MINIMUM_PRUNING_DISTANCE makes this impossible
@@ -101,8 +101,8 @@ mod tests {
// Nothing to prune
(PruneMode::Distance(tip + 1), Ok(None)),
(
PruneMode::Distance(segment.min_blocks(PrunePurpose::User) + 1),
Ok(Some(tip - (segment.min_blocks(PrunePurpose::User) + 1))),
PruneMode::Distance(segment.min_blocks() + 1),
Ok(Some(tip - (segment.min_blocks() + 1))),
),
// Nothing to prune
(PruneMode::Before(tip + 1), Ok(None)),

View File

@@ -61,15 +61,12 @@ impl PruneSegment {
}
/// Returns minimum number of blocks to keep in the database for this segment.
pub const fn min_blocks(&self, purpose: PrunePurpose) -> u64 {
pub const fn min_blocks(&self) -> u64 {
match self {
Self::SenderRecovery | Self::TransactionLookup => 0,
Self::Receipts if purpose.is_static_file() => 0,
Self::ContractLogs |
Self::AccountHistory |
Self::StorageHistory |
Self::Bodies |
Self::Receipts => MINIMUM_PRUNING_DISTANCE,
Self::SenderRecovery | Self::TransactionLookup | Self::Receipts | Self::Bodies => 0,
Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => {
MINIMUM_PRUNING_DISTANCE
}
Self::MerkleChangeSets => MERKLE_CHANGESETS_RETENTION_BLOCKS,
#[expect(deprecated)]
#[expect(clippy::match_same_arms)]

View File

@@ -58,13 +58,7 @@ pub struct PruneModes {
pub transaction_lookup: Option<PruneMode>,
/// Receipts pruning configuration. This setting overrides `receipts_log_filter`
/// and offers improved performance.
#[cfg_attr(
any(test, feature = "serde"),
serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
)
)]
#[cfg_attr(any(test, feature = "serde"), serde(skip_serializing_if = "Option::is_none",))]
pub receipts: Option<PruneMode>,
/// Account History pruning configuration.
#[cfg_attr(
@@ -85,13 +79,7 @@ pub struct PruneModes {
)]
pub storage_history: Option<PruneMode>,
/// Bodies History pruning configuration.
#[cfg_attr(
any(test, feature = "serde"),
serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
)
)]
#[cfg_attr(any(test, feature = "serde"), serde(skip_serializing_if = "Option::is_none",))]
pub bodies_history: Option<PruneMode>,
/// Merkle Changesets pruning configuration for `AccountsTrieChangeSets` and
/// `StoragesTrieChangeSets`.

View File

@@ -36,7 +36,6 @@ alloy-serde.workspace = true
alloy-rpc-types-beacon.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-genesis.workspace = true
serde = { workspace = true, features = ["derive"] }
# misc
jsonrpsee = { workspace = true, features = ["server", "macros"] }

View File

@@ -5,32 +5,24 @@
//! disabled by default and never be exposed on public-facing RPC without an
//! explicit operator flag.
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV5, PayloadAttributes as EthPayloadAttributes,
};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};
/// Capability string for `testing_buildBlockV1`.
pub const TESTING_BUILD_BLOCK_V1: &str = "testing_buildBlockV1";
/// Request payload for `testing_buildBlockV1`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TestingBuildBlockRequestV1 {
/// Parent block hash of the block to build.
pub parent_block_hash: B256,
/// Payload attributes (Cancun version).
pub payload_attributes: EthPayloadAttributes,
/// Raw signed transactions to force-include in order.
pub transactions: Vec<Bytes>,
/// Optional extra data for the block header.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub extra_data: Option<Bytes>,
}
pub use alloy_rpc_types_engine::{TestingBuildBlockRequestV1, TESTING_BUILD_BLOCK_V1};
/// Testing RPC interface for building a block in a single call.
///
/// # Enabling
///
/// This namespace is disabled by default for security reasons. To enable it,
/// add `testing` to the `--http.api` flag:
///
/// ```sh
/// reth node --http --http.api eth,testing
/// ```
///
/// **Warning:** Never expose this on public-facing RPC endpoints without proper
/// authentication.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "testing"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "testing"))]
pub trait TestingApi {

View File

@@ -13,7 +13,9 @@ use reth_rpc_eth_types::{
fee_history::calculate_reward_percentiles_for_block, utils::checked_blob_gas_used_ratio,
EthApiError, FeeHistoryCache, FeeHistoryEntry, GasPriceOracle, RpcInvalidTransactionError,
};
use reth_storage_api::{BlockIdReader, BlockReaderIdExt, HeaderProvider, ProviderHeader};
use reth_storage_api::{
BlockIdReader, BlockNumReader, BlockReaderIdExt, HeaderProvider, ProviderHeader,
};
use tracing::debug;
/// Fee related functions for the [`EthApiServer`](crate::EthApiServer) trait in the
@@ -92,6 +94,17 @@ pub trait EthFees:
newest_block = BlockNumberOrTag::Latest;
}
// For explicit block numbers, validate against chain head before resolution
if let BlockNumberOrTag::Number(requested) = newest_block {
let latest_block =
self.provider().best_block_number().map_err(Self::Error::from_eth_err)?;
if requested > latest_block {
return Err(
EthApiError::RequestBeyondHead { requested, head: latest_block }.into()
)
}
}
let end_block = self
.provider()
.block_number_for_id(newest_block.into())

View File

@@ -56,6 +56,7 @@ metrics.workspace = true
# misc
serde = { workspace = true, features = ["derive"] }
url = { workspace = true, features = ["serde"] }
thiserror.workspace = true
derive_more.workspace = true
schnellru.workspace = true

View File

@@ -92,6 +92,14 @@ pub enum EthApiError {
/// When an invalid block range is provided
#[error("invalid block range")]
InvalidBlockRange,
/// Requested block number is beyond the head block
#[error("request beyond head block: requested {requested}, head {head}")]
RequestBeyondHead {
/// The requested block number
requested: u64,
/// The current head block number
head: u64,
},
/// Thrown when the target block for proof computation exceeds the maximum configured window.
#[error("distance to target block exceeds maximum proof window")]
ExceedsMaxProofWindow,
@@ -268,6 +276,7 @@ impl From<EthApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EthApiError::InvalidTransactionSignature |
EthApiError::EmptyRawTransactionData |
EthApiError::InvalidBlockRange |
EthApiError::RequestBeyondHead { .. } |
EthApiError::ExceedsMaxProofWindow |
EthApiError::ConflictingFeeFieldsInRequest |
EthApiError::Signing(_) |

View File

@@ -8,6 +8,9 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
// `url` is needed for serde support on `reqwest::Url`
use url as _;
pub mod block;
pub mod builder;
pub mod cache;

View File

@@ -134,7 +134,9 @@ where
mod tests {
use super::*;
use crate::eth::helpers::types::EthRpcConverter;
use alloy_consensus::{Block, Header, SidecarBuilder, SimpleCoder, Transaction};
use alloy_consensus::{
BlobTransactionSidecar, Block, Header, SidecarBuilder, SimpleCoder, Transaction,
};
use alloy_primitives::{Address, U256};
use alloy_rpc_types_eth::request::TransactionRequest;
use reth_chainspec::{ChainSpec, ChainSpecBuilder};
@@ -332,7 +334,9 @@ mod tests {
let tx_req = TransactionRequest {
from: Some(address),
to: Some(Address::random().into()),
sidecar: Some(builder.build().unwrap().into()),
sidecar: Some(BlobTransactionSidecarVariant::from(
builder.build::<BlobTransactionSidecar>().unwrap(),
)),
..Default::default()
};
@@ -370,7 +374,9 @@ mod tests {
from: Some(address),
to: Some(Address::random().into()),
transaction_type: Some(3), // EIP-4844
sidecar: Some(builder.build().unwrap().into()),
sidecar: Some(BlobTransactionSidecarVariant::from(
builder.build::<BlobTransactionSidecar>().unwrap(),
)),
max_fee_per_blob_gas: Some(provided_blob_fee), // Already set
..Default::default()
};

View File

@@ -1,10 +1,22 @@
//! Implementation of the `testing` namespace.
//!
//! This exposes `testing_buildBlockV1`, intended for non-production/debug use.
//!
//! # Enabling the testing namespace
//!
//! The `testing_` namespace is disabled by default for security reasons.
//! To enable it, add `testing` to the `--http.api` flag when starting the node:
//!
//! ```sh
//! reth node --http --http.api eth,testing
//! ```
//!
//! **Warning:** This namespace allows building arbitrary blocks. Never expose it
//! on public-facing RPC endpoints without proper authentication.
use alloy_consensus::{Header, Transaction};
use alloy_evm::Evm;
use alloy_primitives::U256;
use alloy_primitives::{map::HashSet, Address, U256};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
@@ -19,19 +31,31 @@ use reth_rpc_eth_api::{helpers::Call, FromEthApiError};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_storage_api::{BlockReader, HeaderProvider};
use revm::context::Block;
use revm_primitives::map::DefaultHashBuilder;
use std::sync::Arc;
use tracing::debug;
/// Testing API handler.
#[derive(Debug, Clone)]
pub struct TestingApi<Eth, Evm> {
eth_api: Eth,
evm_config: Evm,
/// If true, skip invalid transactions instead of failing.
skip_invalid_transactions: bool,
}
impl<Eth, Evm> TestingApi<Eth, Evm> {
/// Create a new testing API handler.
pub const fn new(eth_api: Eth, evm_config: Evm) -> Self {
Self { eth_api, evm_config }
Self { eth_api, evm_config, skip_invalid_transactions: false }
}
/// Enable skipping invalid transactions instead of failing.
/// When a transaction fails, all subsequent transactions from the same sender are also
/// skipped.
pub const fn with_skip_invalid_transactions(mut self) -> Self {
self.skip_invalid_transactions = true;
self
}
}
@@ -46,6 +70,7 @@ where
request: TestingBuildBlockRequestV1,
) -> Result<ExecutionPayloadEnvelopeV5, Eth::Error> {
let evm_config = self.evm_config.clone();
let skip_invalid_transactions = self.skip_invalid_transactions;
self.eth_api
.spawn_with_state_at_block(request.parent_block_hash, move |eth_api, state| {
let state = state.database.0;
@@ -79,11 +104,41 @@ where
let mut total_fees = U256::ZERO;
let base_fee = builder.evm_mut().block().basefee();
for tx in request.transactions {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(&tx)?;
let mut invalid_senders: HashSet<Address, DefaultHashBuilder> = HashSet::default();
for (idx, tx) in request.transactions.iter().enumerate() {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(tx)?;
let sender = tx.signer();
if skip_invalid_transactions && invalid_senders.contains(&sender) {
continue;
}
let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default();
let gas_used =
builder.execute_transaction(tx).map_err(Eth::Error::from_eth_err)?;
let gas_used = match builder.execute_transaction(tx) {
Ok(gas_used) => gas_used,
Err(err) => {
if skip_invalid_transactions {
debug!(
target: "rpc::testing",
tx_idx = idx,
?sender,
error = ?err,
"Skipping invalid transaction"
);
invalid_senders.insert(sender);
continue;
}
debug!(
target: "rpc::testing",
tx_idx = idx,
?sender,
error = ?err,
"Transaction execution failed"
);
return Err(Eth::Error::from_eth_err(err));
}
};
total_fees += U256::from(tip) * U256::from(gas_used);
}

View File

@@ -12,7 +12,7 @@ use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockReader, DBProvider, EitherWriter, ExecutionOutcome, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
@@ -463,7 +463,7 @@ where
}
// write output
provider.write_state(&state, OriginalValuesKnown::Yes)?;
provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?;
let db_write_duration = time.elapsed();
debug!(

View File

@@ -7,7 +7,7 @@ use reth_db_api::{
table::Value,
tables,
transaction::{DbTx, DbTxMut},
DbTxUnwindExt, RawValue,
RawValue,
};
use reth_primitives_traits::{GotExpected, NodePrimitives, SignedTransaction};
use reth_provider::{
@@ -158,12 +158,13 @@ where
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
// Lookup latest tx id that we should unwind to
let latest_tx_id = provider
// Lookup the next tx id after unwind_to block (first tx to remove)
let unwind_tx_from = provider
.block_body_indices(unwind_to)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?
.last_tx_num();
provider.tx_ref().unwind_table_by_num::<tables::TransactionSenders>(latest_tx_id)?;
.next_tx_num();
EitherWriter::new_senders(provider, unwind_to)?.prune_senders(unwind_tx_from, unwind_to)?;
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)

View File

@@ -15,6 +15,7 @@ workspace = true
alloy-primitives.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
fixed-map.workspace = true
derive_more.workspace = true
serde = { workspace = true, features = ["alloc", "derive"] }
strum = { workspace = true, features = ["derive"] }
@@ -32,5 +33,6 @@ std = [
"serde/std",
"strum/std",
"serde_json/std",
"fixed-map/std",
]
clap = ["dep:clap"]

View File

@@ -21,6 +21,9 @@ use core::ops::RangeInclusive;
pub use event::StaticFileProducerEvent;
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
/// Map keyed by [`StaticFileSegment`].
pub type StaticFileMap<T> = alloc::boxed::Box<fixed_map::Map<StaticFileSegment, T>>;
/// Default static file block count.
pub const DEFAULT_BLOCKS_PER_STATIC_FILE: u64 = 500_000;

View File

@@ -22,6 +22,7 @@ use strum::{EnumIs, EnumString};
EnumIs,
Serialize,
Deserialize,
fixed_map::Key,
)]
#[strum(serialize_all = "kebab-case")]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]

View File

@@ -101,4 +101,11 @@ impl StorageSettings {
self.account_changesets_in_static_files = value;
self
}
/// Returns `true` if any tables are configured to be stored in `RocksDB`.
pub const fn any_in_rocksdb(&self) -> bool {
self.transaction_hash_numbers_in_rocksdb ||
self.account_history_in_rocksdb ||
self.storages_history_in_rocksdb
}
}

View File

@@ -16,8 +16,8 @@ use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
HashingWriter, HeaderProvider, HistoryWriter, MetadataWriter, OriginalValuesKnown,
ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriter,
StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
@@ -334,7 +334,11 @@ where
Vec::new(),
);
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;
trace!(target: "reth::cli", "Inserted state");

View File

@@ -248,7 +248,7 @@ where
println!(
"{:?}\n",
tx.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.map_err(|_| format!("Could not find table: {}", T::NAME))
.map(|stats| {
let num_pages =

View File

@@ -278,7 +278,7 @@ impl DatabaseMetrics for DatabaseEnv {
let stats = tx
.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {table}"))?;
let page_size = stats.page_size() as usize;

View File

@@ -67,18 +67,25 @@ impl<K: TransactionKind> Tx<K> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
}
/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
if let Some(dbi) = self.dbis.get(T::NAME) {
/// Gets a table database handle by name if it exists, otherwise, check the
/// database, opening the DB if it exists.
pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
if let Some(dbi) = self.dbis.get(name) {
Ok(*dbi)
} else {
self.inner
.open_db(Some(T::NAME))
.open_db(Some(name))
.map(|db| db.dbi())
.map_err(|e| DatabaseError::Open(e.into()))
}
}
/// Gets a table database handle by name if it exists, otherwise, check the
/// database, opening the DB if it exists.
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
self.get_dbi_raw(T::NAME)
}
/// Create db Cursor
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
let inner = self
@@ -298,15 +305,7 @@ impl<K: TransactionKind> DbTx for Tx<K> {
fn commit(self) -> Result<(), DatabaseError> {
self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
Ok((true, _)) => (
Err(DatabaseError::Commit(
"transaction was aborted due to a previous error (MDBX_RESULT_TRUE)"
.to_string()
.into(),
)),
None,
),
Ok((false, latency)) => (Ok(()), Some(latency)),
Ok(latency) => (Ok(()), Some(latency)),
Err(e) => (Err(e), None),
}
})

View File

@@ -1,20 +1,22 @@
//! reth's static file database table import and access
use std::{collections::HashMap, path::Path};
use reth_nippy_jar::{NippyJar, NippyJarError};
use reth_static_file_types::{
SegmentHeader, SegmentRangeInclusive, StaticFileMap, StaticFileSegment,
};
use std::path::Path;
mod cursor;
pub use cursor::StaticFileCursor;
mod mask;
pub use mask::*;
use reth_nippy_jar::{NippyJar, NippyJarError};
mod masks;
pub use masks::*;
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
/// Alias type for a map of [`StaticFileSegment`] and sorted lists of existing static file ranges.
type SortedStaticFiles = HashMap<StaticFileSegment, Vec<(SegmentRangeInclusive, SegmentHeader)>>;
type SortedStaticFiles = StaticFileMap<Vec<(SegmentRangeInclusive, SegmentHeader)>>;
/// Given the `static_files` directory path, it returns a list over the existing `static_files`
/// organized by [`StaticFileSegment`]. Each segment has a sorted list of block ranges and
@@ -44,8 +46,8 @@ pub fn iter_static_files(path: &Path) -> Result<SortedStaticFiles, NippyJarError
}
}
// Sort by block end range.
for range_list in static_files.values_mut() {
// Sort by block end range.
range_list.sort_by_key(|(block_range, _)| block_range.end());
}

View File

@@ -20,6 +20,9 @@ pub enum ProviderError {
/// Pruning error.
#[error(transparent)]
Pruning(#[from] PruneSegmentError),
/// Static file writer error.
#[error(transparent)]
StaticFileWriter(#[from] StaticFileWriterError),
/// RLP error.
#[error("{_0}")]
Rlp(alloy_rlp::Error),
@@ -216,18 +219,24 @@ pub struct RootMismatch {
pub block_hash: BlockHash,
}
/// A Static File Write Error.
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
pub struct StaticFileWriterError {
/// The error message.
pub message: String,
/// A Static File Writer Error.
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum StaticFileWriterError {
/// Cannot call `sync_all` or `finalize` when prune is queued.
#[error("cannot call sync_all or finalize when prune is queued, use commit() instead")]
FinalizeWithPruneQueued,
/// Thread panicked during execution.
#[error("thread panicked: {_0}")]
ThreadPanic(&'static str),
/// Other error with message.
#[error("{0}")]
Other(String),
}
impl StaticFileWriterError {
/// Creates a new [`StaticFileWriterError`] with the given message.
/// Creates a new [`StaticFileWriterError::Other`] with the given message.
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
Self::Other(message.into())
}
}
/// Consistent database view error.

View File

@@ -12,10 +12,10 @@ fn bench_get_seq_iter(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = db.dbi();
c.bench_function("bench_get_seq_iter", |b| {
b.iter(|| {
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
let mut i = 0;
let mut count = 0u32;
@@ -54,11 +54,11 @@ fn bench_get_seq_cursor(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = db.dbi();
c.bench_function("bench_get_seq_cursor", |b| {
b.iter(|| {
let (i, count) = txn
.cursor(&db)
.cursor(dbi)
.unwrap()
.iter::<ObjectLength, ObjectLength>()
.map(Result::unwrap)

View File

@@ -42,7 +42,9 @@ impl TableObject for Cow<'_, [u8]> {
#[cfg(not(feature = "return-borrowed"))]
{
let is_dirty = (!K::IS_READ_ONLY) &&
crate::error::mdbx_result(ffi::mdbx_is_dirty(_txn, data_val.iov_base))?;
crate::error::mdbx_result(unsafe {
ffi::mdbx_is_dirty(_txn, data_val.iov_base)
})?;
Ok(if is_dirty { Cow::Owned(s.to_vec()) } else { Cow::Borrowed(s) })
}

View File

@@ -211,15 +211,14 @@ impl Environment {
let mut freelist: usize = 0;
let txn = self.begin_ro_txn()?;
let db = Database::freelist_db();
let cursor = txn.cursor(&db)?;
let cursor = txn.cursor(db.dbi())?;
for result in cursor.iter_slices() {
let (_key, value) = result?;
if value.len() < size_of::<usize>() {
if value.len() < size_of::<u32>() {
return Err(Error::Corrupted)
}
let s = &value[..size_of::<usize>()];
let s = &value[..size_of::<u32>()];
freelist += NativeEndian::read_u32(s) as usize;
}
@@ -990,7 +989,10 @@ mod tests {
result @ Err(_) => result.unwrap(),
}
}
tx.commit().unwrap();
// The transaction may be in an error state after hitting MapFull,
// so commit could fail. We don't care about the result here since
// the purpose of this test is to verify the HSR callback was called.
let _ = tx.commit();
}
// Expect the HSR to be called

View File

@@ -123,6 +123,12 @@ pub enum Error {
/// Read transaction has been timed out.
#[error("read transaction has been timed out")]
ReadTransactionTimeout,
/// The transaction commit was aborted due to previous errors.
///
/// This can happen in exceptionally rare cases and it signals the problem coming from inside
/// of mdbx.
#[error("botched transaction")]
BotchedTransaction,
/// Permission defined
#[error("permission denied to setup database")]
Permission,
@@ -204,6 +210,7 @@ impl Error {
Self::WriteTransactionUnsupportedInReadOnlyMode |
Self::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Self::ReadTransactionTimeout => -96000, // Custom non-MDBX error code
Self::BotchedTransaction => -96001,
Self::Permission => ffi::MDBX_EPERM,
Self::Other(err_code) => *err_code,
}
@@ -216,6 +223,14 @@ impl From<Error> for i32 {
}
}
/// Parses an MDBX error code into a result type.
///
/// Note that this function returns `Ok(false)` on `MDBX_SUCCESS` and
/// `Ok(true)` on `MDBX_RESULT_TRUE`. The return value requires extra
/// care since its interpretation depends on the callee being called.
///
/// The most unintuitive case is `mdbx_txn_commit` which returns `Ok(true)`
/// when the commit has been aborted.
#[inline]
pub(crate) const fn mdbx_result(err_code: c_int) -> Result<bool> {
match err_code {

View File

@@ -170,8 +170,8 @@ where
/// Commits the transaction.
///
/// Any pending operations will be saved.
pub fn commit(self) -> Result<(bool, CommitLatency)> {
let result = self.txn_execute(|txn| {
pub fn commit(self) -> Result<CommitLatency> {
match self.txn_execute(|txn| {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env().txn_manager().remove_active_read_transaction(txn);
@@ -186,10 +186,21 @@ where
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
rx.recv().unwrap()
}
})?;
self.inner.set_committed();
result
})? {
//
Ok((false, lat)) => {
self.inner.set_committed();
Ok(lat)
}
Ok((true, _)) => {
// MDBX_RESULT_TRUE means the transaction was aborted due to prior errors.
// The transaction is still finished/freed by MDBX, so we must mark it as
// committed to prevent the Drop impl from trying to abort it again.
self.inner.set_committed();
Err(Error::BotchedTransaction)
}
Err(e) => Err(e),
}
}
/// Opens a handle to an MDBX database.
@@ -208,11 +219,11 @@ where
}
/// Gets the option flags for the given database in the transaction.
pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
pub fn db_flags(&self, dbi: ffi::MDBX_dbi) -> Result<DatabaseFlags> {
let mut flags: c_uint = 0;
unsafe {
self.txn_execute(|txn| {
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, dbi, &mut flags, ptr::null_mut()))
})??;
}
@@ -222,8 +233,8 @@ where
}
/// Retrieves database statistics.
pub fn db_stat(&self, db: &Database) -> Result<Stat> {
self.db_stat_with_dbi(db.dbi())
pub fn db_stat(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
self.db_stat_with_dbi(dbi)
}
/// Retrieves database statistics by the given dbi.
@@ -238,8 +249,8 @@ where
}
/// Open a new cursor on the given database.
pub fn cursor(&self, db: &Database) -> Result<Cursor<K>> {
Cursor::new(self.clone(), db.dbi())
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
Cursor::new(self.clone(), dbi)
}
/// Open a new cursor on the given dbi.
@@ -400,7 +411,7 @@ impl Transaction<RW> {
#[allow(clippy::mut_from_ref)]
pub fn reserve(
&self,
db: &Database,
dbi: ffi::MDBX_dbi,
key: impl AsRef<[u8]>,
len: usize,
flags: WriteFlags,
@@ -412,13 +423,7 @@ impl Transaction<RW> {
ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
unsafe {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_put(
txn,
db.dbi(),
&key_val,
&mut data_val,
flags.bits() | ffi::MDBX_RESERVE,
)
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits() | ffi::MDBX_RESERVE)
})?)?;
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
}
@@ -473,10 +478,10 @@ impl Transaction<RW> {
/// Drops the database from the environment.
///
/// # Safety
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, db.dbi(), true) })?)?;
/// Caller must close ALL other [Database] and [Cursor] instances pointing
/// to the same dbi BEFORE calling this function.
pub unsafe fn drop_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, true) })?)?;
Ok(())
}
@@ -488,8 +493,8 @@ impl Transaction<RO> {
/// # Safety
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()) })?;
pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), dbi) })?;
Ok(())
}

View File

@@ -58,6 +58,9 @@ impl TxnManager {
match rx.recv() {
Ok(msg) => match msg {
TxnManagerMessage::Begin { parent, flags, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
.entered();
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
let res = mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(
@@ -72,9 +75,13 @@ impl TxnManager {
sender.send(res).unwrap();
}
TxnManagerMessage::Abort { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
sender
.send({
let mut latency = CommitLatency::new();

View File

@@ -9,15 +9,15 @@ fn test_get() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
assert_eq!(None, txn.cursor(&db).unwrap().first::<(), ()>().unwrap());
assert_eq!(None, txn.cursor(dbi).unwrap().first::<(), ()>().unwrap());
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val3", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.next().unwrap(), Some((*b"key2", *b"val2")));
@@ -34,15 +34,15 @@ fn test_get_dup() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
let dbi = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap().dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val3", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.first_dup().unwrap(), Some(*b"val1"));
assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1")));
@@ -78,15 +78,16 @@ fn test_get_dupfixed() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val4", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val5", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val6", WriteFlags::empty()).unwrap();
let dbi =
txn.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap().dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val4", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val5", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val6", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.get_multiple().unwrap(), Some(*b"val1val2val3"));
assert_eq!(cursor.next_multiple::<(), ()>().unwrap(), None);
@@ -110,12 +111,12 @@ fn test_iter() {
for (key, data) in &items {
txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
}
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
// Because Result implements FromIterator, we can collect the iterator
// of items of type Result<_, E> into a Result<Vec<_, E>> by specifying
@@ -155,8 +156,8 @@ fn test_iter_empty_database() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert!(cursor.iter::<(), ()>().next().is_none());
assert!(cursor.iter_start::<(), ()>().next().is_none());
@@ -173,8 +174,8 @@ fn test_iter_empty_dup_database() {
txn.commit().unwrap();
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert!(cursor.iter::<(), ()>().next().is_none());
assert!(cursor.iter_start::<(), ()>().next().is_none());
@@ -223,8 +224,8 @@ fn test_iter_dup() {
}
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(items, cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap());
cursor.set::<()>(b"b").unwrap();
@@ -271,9 +272,9 @@ fn test_iter_del_get() {
let items = vec![(*b"a", *b"1"), (*b"b", *b"2")];
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
let dbi = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap().dbi();
assert_eq!(
txn.cursor(&db)
txn.cursor(dbi)
.unwrap()
.iter_dup_of::<(), ()>(b"a")
.collect::<Result<Vec<_>>>()
@@ -294,8 +295,8 @@ fn test_iter_del_get() {
}
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(items, cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap());
assert_eq!(
@@ -316,8 +317,8 @@ fn test_put_del() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
cursor.put(b"key1", b"val1", WriteFlags::empty()).unwrap();
cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap();

View File

@@ -50,9 +50,9 @@ fn test_put_get_del_multi() {
txn.commit().unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut cur = txn.cursor(&db).unwrap();
let mut cur = txn.cursor(dbi).unwrap();
let iter = cur.iter_dup_of::<(), [u8; 4]>(b"key1");
let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
assert_eq!(vals, vec![*b"val1", *b"val2", *b"val3"]);
@@ -66,9 +66,9 @@ fn test_put_get_del_multi() {
txn.commit().unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut cur = txn.cursor(&db).unwrap();
let mut cur = txn.cursor(dbi).unwrap();
let iter = cur.iter_dup_of::<(), [u8; 4]>(b"key1");
let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
assert_eq!(vals, vec![*b"val1", *b"val3"]);
@@ -103,9 +103,9 @@ fn test_reserve() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut writer = txn.reserve(&db, b"key1", 4, WriteFlags::empty()).unwrap();
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
}
txn.commit().unwrap();
@@ -148,13 +148,13 @@ fn test_clear_db() {
{
let txn = env.begin_rw_txn().unwrap();
txn.put(txn.open_db(None).unwrap().dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
{
let txn = env.begin_rw_txn().unwrap();
txn.clear_db(txn.open_db(None).unwrap().dbi()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
let txn = env.begin_ro_txn().unwrap();
@@ -178,16 +178,16 @@ fn test_drop_db() {
.unwrap();
// Workaround for MDBX dbi drop issue
txn.create_db(Some("canary"), DatabaseFlags::empty()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(Some("test")).unwrap();
let dbi = txn.open_db(Some("test")).unwrap().dbi();
unsafe {
txn.drop_db(db).unwrap();
txn.drop_db(dbi).unwrap();
}
assert!(matches!(txn.open_db(Some("test")).unwrap_err(), Error::NotFound));
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
}
@@ -291,8 +291,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 3);
}
@@ -304,8 +304,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 1);
}
@@ -318,8 +318,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 4);
}
}
@@ -331,20 +331,22 @@ fn test_stat_dupsort() {
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
let dbi = db.dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val3", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 9);
}
@@ -356,7 +358,8 @@ fn test_stat_dupsort() {
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 5);
}
@@ -369,7 +372,8 @@ fn test_stat_dupsort() {
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 8);
}
}

View File

@@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fs::File,
io::{Read, Write},
io::{self, Read, Write},
ops::Range,
path::{Path, PathBuf},
};
@@ -201,11 +201,11 @@ impl<H: NippyJarHeader> NippyJar<H> {
let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
let config_file = File::open(&config_path)
.inspect_err(|e| {
warn!( ?path, %e, "Failed to load static file jar");
warn!(?path, %e, "Failed to load static file jar");
})
.map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;
let mut obj = Self::load_from_reader(config_file)?;
let mut obj = Self::load_from_reader(io::BufReader::new(config_file))?;
obj.path = path.to_path_buf();
Ok(obj)
}
@@ -418,10 +418,15 @@ impl DataReader {
&self.data_mmap[range]
}
/// Returns total size of data
/// Returns total size of data file.
pub fn size(&self) -> usize {
self.data_mmap.len()
}
/// Returns total size of offsets file.
pub fn offsets_size(&self) -> usize {
self.offset_mmap.len()
}
}
#[cfg(test)]

View File

@@ -347,11 +347,27 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
/// Commits configuration and offsets to disk. It drains the internal offset list.
pub fn commit(&mut self) -> Result<(), NippyJarError> {
self.sync_all()?;
self.finalize()?;
Ok(())
}
/// Syncs data and offsets to disk.
///
/// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
/// configuration and mark the writer as clean.
pub fn sync_all(&mut self) -> Result<(), NippyJarError> {
self.data_file.flush()?;
self.data_file.get_ref().sync_all()?;
self.commit_offsets()?;
Ok(())
}
/// Commits configuration to disk and marks the writer as clean.
///
/// Must be called after [`Self::sync_all`] to complete the commit.
pub fn finalize(&mut self) -> Result<(), NippyJarError> {
// Flushes `max_row_size` and total `rows` to disk.
self.jar.freeze_config()?;
self.dirty = false;

View File

@@ -17,7 +17,6 @@ reth-chainspec.workspace = true
reth-execution-types.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["reth-codec"] }
reth-primitives-traits = { workspace = true, features = ["reth-codec", "secp256k1"] }
reth-fs-util.workspace = true
reth-errors.workspace = true
reth-storage-errors.workspace = true
reth-storage-api = { workspace = true, features = ["std", "db-api"] }

View File

@@ -10,7 +10,7 @@ use std::{
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
@@ -429,6 +429,41 @@ where
}
}
/// Puts multiple transaction hash number mappings in a batch.
///
/// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor.
/// This is more efficient than calling `put_transaction_hash_number` repeatedly.
///
/// When `append_only` is true, uses `cursor.append()` which requires entries to be
/// pre-sorted and the table to be empty or have only lower keys.
/// When false, uses `cursor.upsert()` which handles arbitrary insertion order.
pub fn put_transaction_hash_numbers_batch(
&mut self,
entries: Vec<(TxHash, TxNumber)>,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => {
for (hash, tx_num) in entries {
if append_only {
cursor.append(hash, &tx_num)?;
} else {
cursor.upsert(hash, &tx_num)?;
}
}
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => {
for (hash, tx_num) in entries {
batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
}
Ok(())
}
}
}
/// Deletes a transaction hash number mapping.
pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
match self {
@@ -708,7 +743,7 @@ impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
CURSOR: DbCursorRO<tables::StoragesHistory>,
{
/// Gets a storage history entry.
/// Gets a storage history shard entry for the given [`StorageShardedKey`], if present.
pub fn get_storage_history(
&mut self,
key: StorageShardedKey,
@@ -720,13 +755,43 @@ where
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
}
}
/// Lookup storage history and return [`HistoryInfo`].
pub fn storage_history_info(
&mut self,
address: Address,
storage_key: alloy_primitives::B256,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
let key = StorageShardedKey::new(address, storage_key, block_number);
history_info::<tables::StoragesHistory, _, _>(
cursor,
key,
block_number,
|k| k.address == address && k.sharded_key.key == storage_key,
lowest_available_block_number,
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.storage_history_info(
address,
storage_key,
block_number,
lowest_available_block_number,
),
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
CURSOR: DbCursorRO<tables::AccountsHistory>,
{
/// Gets an account history entry.
/// Gets an account history shard entry for the given [`ShardedKey`], if present.
pub fn get_account_history(
&mut self,
key: ShardedKey<Address>,
@@ -738,6 +803,32 @@ where
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
}
}
/// Lookup account history and return [`HistoryInfo`].
pub fn account_history_info(
&mut self,
address: Address,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
let key = ShardedKey::new(address, block_number);
history_info::<tables::AccountsHistory, _, _>(
cursor,
key,
block_number,
|k| k.key == address,
lowest_available_block_number,
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => {
tx.account_history_info(address, block_number, lowest_available_block_number)
}
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
@@ -894,8 +985,11 @@ mod rocksdb_tests {
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
transaction::DbTxMut,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use std::marker::PhantomData;
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -1125,10 +1219,391 @@ mod rocksdb_tests {
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
}
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
// ==================== Parametrized Backend Equivalence Tests ====================
//
// These tests verify that MDBX and RocksDB produce identical results for history lookups.
// Each scenario sets up the same data in both backends and asserts identical HistoryInfo.
/// Query parameters for a history lookup test case.
struct HistoryQuery {
block_number: BlockNumber,
lowest_available: Option<BlockNumber>,
expected: HistoryInfo,
}
// Type aliases for cursor types (needed for EitherWriter/EitherReader type inference)
type AccountsHistoryWriteCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
type StoragesHistoryWriteCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
type AccountsHistoryReadCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
type StoragesHistoryReadCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
/// Runs the same account history queries against both MDBX and `RocksDB` backends,
/// asserting they produce identical results.
fn run_account_history_scenario(
scenario_name: &str,
address: Address,
shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
queries: &[HistoryQuery],
) {
// Setup MDBX and RocksDB with identical data using EitherWriter
let factory = create_test_provider_factory();
let mdbx_provider = factory.database_provider_rw().unwrap();
let (temp_dir, rocks_provider) = create_rocksdb_provider();
// Create writers for both backends
let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
EitherWriter::Database(
mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
);
let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
EitherWriter::RocksDB(rocks_provider.batch());
// Write identical data to both backends in a single loop
for (highest_block, blocks) in shards {
let key = ShardedKey::new(address, *highest_block);
let value = IntegerList::new(blocks.clone()).unwrap();
mdbx_writer.put_account_history(key.clone(), &value).unwrap();
rocks_writer.put_account_history(key, &value).unwrap();
}
// Commit both backends
drop(mdbx_writer);
mdbx_provider.commit().unwrap();
if let EitherWriter::RocksDB(batch) = rocks_writer {
batch.commit().unwrap();
}
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
EitherReader::Database(
mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
PhantomData,
);
let mdbx_result = mdbx_reader
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
// RocksDB query via EitherReader
let mut rocks_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
// Assert both backends produce identical results
assert_eq!(
mdbx_result,
rocks_result,
"Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
MDBX: {:?}, RocksDB: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
rocks_result
);
// Also verify against expected result
assert_eq!(
mdbx_result,
query.expected,
"Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
Got: {:?}, Expected: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
query.expected
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
/// Runs the same storage history queries against both MDBX and `RocksDB` backends,
/// asserting they produce identical results.
fn run_storage_history_scenario(
scenario_name: &str,
address: Address,
storage_key: B256,
shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
queries: &[HistoryQuery],
) {
// Setup MDBX and RocksDB with identical data using EitherWriter
let factory = create_test_provider_factory();
let mdbx_provider = factory.database_provider_rw().unwrap();
let (temp_dir, rocks_provider) = create_rocksdb_provider();
// Create writers for both backends
let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
EitherWriter::Database(
mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
);
let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
EitherWriter::RocksDB(rocks_provider.batch());
// Write identical data to both backends in a single loop
for (highest_block, blocks) in shards {
let key = StorageShardedKey::new(address, storage_key, *highest_block);
let value = IntegerList::new(blocks.clone()).unwrap();
mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
rocks_writer.put_storage_history(key, &value).unwrap();
}
// Commit both backends
drop(mdbx_writer);
mdbx_provider.commit().unwrap();
if let EitherWriter::RocksDB(batch) = rocks_writer {
batch.commit().unwrap();
}
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
EitherReader::Database(
mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
PhantomData,
);
let mdbx_result = mdbx_reader
.storage_history_info(
address,
storage_key,
query.block_number,
query.lowest_available,
)
.unwrap();
// RocksDB query via EitherReader
let mut rocks_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
.storage_history_info(
address,
storage_key,
query.block_number,
query.lowest_available,
)
.unwrap();
// Assert both backends produce identical results
assert_eq!(
mdbx_result,
rocks_result,
"Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
MDBX: {:?}, RocksDB: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
rocks_result
);
// Also verify against expected result
assert_eq!(
mdbx_result,
query.expected,
"Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
Got: {:?}, Expected: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
query.expected
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
/// Tests account history lookups across both MDBX and `RocksDB` backends.
///
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
/// in a single place, making it easier to reason about commit ordering and consistency.
/// Covers the following scenarios from PR2's `RocksDB`-only tests:
/// 1. Single shard - basic lookups within one shard
/// 2. Multiple shards - `prev()` shard detection and transitions
/// 3. No history - query address with no entries
/// 4. Pruning boundary - `lowest_available` boundary behavior (block at/after boundary)
#[test]
fn test_account_history_info_both_backends() {
let address = Address::from([0x42; 20]);
// Scenario 1: Single shard with blocks [100, 200, 300]
run_account_history_scenario(
"single_shard",
address,
&[(u64::MAX, vec![100, 200, 300])],
&[
// Before first entry -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Between entries -> InChangeset(next_write)
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// Exact match on entry -> InChangeset(same_block)
HistoryQuery {
block_number: 300,
lowest_available: None,
expected: HistoryInfo::InChangeset(300),
},
// After last entry in last shard -> InPlainState
HistoryQuery {
block_number: 500,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// Scenario 2: Multiple shards - tests prev() shard detection
run_account_history_scenario(
"multiple_shards",
address,
&[
(500, vec![100, 200, 300, 400, 500]), // First shard ends at 500
(u64::MAX, vec![600, 700, 800]), // Last shard
],
&[
// Before first shard, no prev -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Within first shard
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// Between shards - prev() should find first shard
HistoryQuery {
block_number: 550,
lowest_available: None,
expected: HistoryInfo::InChangeset(600),
},
// After all entries
HistoryQuery {
block_number: 900,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// Scenario 3: No history for address
let address_without_history = Address::from([0x43; 20]);
run_account_history_scenario(
"no_history",
address_without_history,
&[], // No shards for this address
&[HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
}],
);
// Scenario 4: Query at pruning boundary
// Note: We test block >= lowest_available because HistoricalStateProviderRef
// errors on blocks below the pruning boundary before doing the lookup.
// The RocksDB implementation doesn't have this check at the same level.
// This tests that when pruning IS available, both backends agree.
run_account_history_scenario(
"with_pruning_boundary",
address,
&[(u64::MAX, vec![100, 200, 300])],
&[
// At pruning boundary -> InChangeset(first entry after block)
HistoryQuery {
block_number: 100,
lowest_available: Some(100),
expected: HistoryInfo::InChangeset(100),
},
// After pruning boundary, between entries
HistoryQuery {
block_number: 150,
lowest_available: Some(100),
expected: HistoryInfo::InChangeset(200),
},
],
);
}
/// Tests storage history lookups across both MDBX and `RocksDB` backends.
#[test]
fn test_storage_history_info_both_backends() {
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
let other_storage_key = B256::from([0x02; 32]);
// Single shard with blocks [100, 200, 300]
run_storage_history_scenario(
"storage_single_shard",
address,
storage_key,
&[(u64::MAX, vec![100, 200, 300])],
&[
// Before first entry -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Between entries -> InChangeset(next_write)
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// After last entry -> InPlainState
HistoryQuery {
block_number: 500,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// No history for different storage key
run_storage_history_scenario(
"storage_no_history",
address,
other_storage_key,
&[], // No shards for this storage key
&[HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
}],
);
}
/// Test that `RocksDB` batches created via `EitherWriter` are only made visible when
/// `provider.commit()` is called, not when the writer is dropped.
#[test]
fn test_rocksdb_commits_at_provider_level() {
let factory = create_test_provider_factory();

View File

@@ -21,7 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
StaticFileAccess, StaticFileProviderBuilder, StaticFileWriter,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
};
pub mod changeset_walker;
@@ -44,8 +45,8 @@ pub use revm_database::states::OriginalValuesKnown;
// reexport traits to avoid breaking changes
pub use reth_static_file_types as static_file;
pub use reth_storage_api::{
HistoryWriter, MetadataProvider, MetadataWriter, StatsReader, StorageSettings,
StorageSettingsCache,
HistoryWriter, MetadataProvider, MetadataWriter, StateWriteConfig, StatsReader,
StorageSettings, StorageSettingsCache,
};
/// Re-export provider error.
pub use reth_storage_errors::provider::{ProviderError, ProviderResult};

View File

@@ -789,7 +789,7 @@ mod tests {
create_test_provider_factory, create_test_provider_factory_with_chain_spec,
MockNodeTypesWithDB,
},
BlockWriter, CanonChainTracker, ProviderFactory,
BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode,
};
use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, TxNumber, B256};
@@ -808,8 +808,8 @@ mod tests {
use reth_storage_api::{
BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
BlockReaderIdExt, BlockSource, ChangeSetReader, DBProvider, DatabaseProviderFactory,
HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, StateWriter,
TransactionVariant, TransactionsProvider,
HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory,
StateWriteConfig, StateWriter, TransactionVariant, TransactionsProvider,
};
use reth_testing_utils::generators::{
self, random_block, random_block_range, random_changeset_range, random_eoa_accounts,
@@ -907,6 +907,7 @@ mod tests {
..Default::default()
},
OriginalValuesKnown::No,
StateWriteConfig::default(),
)?;
}
@@ -997,7 +998,7 @@ mod tests {
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![lowest_memory_block]).unwrap();
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
provider_rw.commit().unwrap();
// Remove from memory

View File

@@ -40,16 +40,8 @@ pub(crate) enum Action {
InsertHeaderNumbers,
InsertBlockBodyIndices,
InsertTransactionBlocks,
GetNextTxNum,
InsertTransactionSenders,
InsertTransactionHashNumbers,
SaveBlocksInsertBlock,
SaveBlocksWriteState,
SaveBlocksWriteHashedState,
SaveBlocksWriteTrieChangesets,
SaveBlocksWriteTrieUpdates,
SaveBlocksUpdateHistoryIndices,
SaveBlocksUpdatePipelineStages,
}
/// Database provider metrics
@@ -66,19 +58,24 @@ pub(crate) struct DatabaseProviderMetrics {
insert_history_indices: Histogram,
/// Duration of update pipeline stages
update_pipeline_stages: Histogram,
/// Duration of insert canonical headers
/// Duration of insert header numbers
insert_header_numbers: Histogram,
/// Duration of insert block body indices
insert_block_body_indices: Histogram,
/// Duration of insert transaction blocks
insert_tx_blocks: Histogram,
/// Duration of get next tx num
get_next_tx_num: Histogram,
/// Duration of insert transaction senders
insert_transaction_senders: Histogram,
/// Duration of insert transaction hash numbers
insert_transaction_hash_numbers: Histogram,
/// Duration of `save_blocks`
save_blocks_total: Histogram,
/// Duration of MDBX work in `save_blocks`
save_blocks_mdbx: Histogram,
/// Duration of static file work in `save_blocks`
save_blocks_sf: Histogram,
/// Duration of `RocksDB` work in `save_blocks`
save_blocks_rocksdb: Histogram,
/// Duration of `insert_block` in `save_blocks`
save_blocks_insert_block: Histogram,
/// Duration of `write_state` in `save_blocks`
@@ -93,6 +90,39 @@ pub(crate) struct DatabaseProviderMetrics {
save_blocks_update_history_indices: Histogram,
/// Duration of `update_pipeline_stages` in `save_blocks`
save_blocks_update_pipeline_stages: Histogram,
/// Number of blocks per `save_blocks` call
save_blocks_block_count: Histogram,
/// Duration of MDBX commit in `save_blocks`
save_blocks_commit_mdbx: Histogram,
/// Duration of static file commit in `save_blocks`
save_blocks_commit_sf: Histogram,
/// Duration of `RocksDB` commit in `save_blocks`
save_blocks_commit_rocksdb: Histogram,
}
/// Timings collected during a `save_blocks` call.
#[derive(Debug, Default)]
pub(crate) struct SaveBlocksTimings {
pub total: Duration,
pub mdbx: Duration,
pub sf: Duration,
pub rocksdb: Duration,
pub insert_block: Duration,
pub write_state: Duration,
pub write_hashed_state: Duration,
pub write_trie_changesets: Duration,
pub write_trie_updates: Duration,
pub update_history_indices: Duration,
pub update_pipeline_stages: Duration,
pub block_count: u64,
}
/// Timings collected during a `commit` call.
#[derive(Debug, Default)]
pub(crate) struct CommitTimings {
pub mdbx: Duration,
pub sf: Duration,
pub rocksdb: Duration,
}
impl DatabaseProviderMetrics {
@@ -107,28 +137,33 @@ impl DatabaseProviderMetrics {
Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration),
Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration),
Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration),
Action::GetNextTxNum => self.get_next_tx_num.record(duration),
Action::InsertTransactionSenders => self.insert_transaction_senders.record(duration),
Action::InsertTransactionHashNumbers => {
self.insert_transaction_hash_numbers.record(duration)
}
Action::SaveBlocksInsertBlock => self.save_blocks_insert_block.record(duration),
Action::SaveBlocksWriteState => self.save_blocks_write_state.record(duration),
Action::SaveBlocksWriteHashedState => {
self.save_blocks_write_hashed_state.record(duration)
}
Action::SaveBlocksWriteTrieChangesets => {
self.save_blocks_write_trie_changesets.record(duration)
}
Action::SaveBlocksWriteTrieUpdates => {
self.save_blocks_write_trie_updates.record(duration)
}
Action::SaveBlocksUpdateHistoryIndices => {
self.save_blocks_update_history_indices.record(duration)
}
Action::SaveBlocksUpdatePipelineStages => {
self.save_blocks_update_pipeline_stages.record(duration)
}
}
}
/// Records all `save_blocks` timings.
pub(crate) fn record_save_blocks(&self, timings: &SaveBlocksTimings) {
self.save_blocks_total.record(timings.total);
self.save_blocks_mdbx.record(timings.mdbx);
self.save_blocks_sf.record(timings.sf);
self.save_blocks_rocksdb.record(timings.rocksdb);
self.save_blocks_insert_block.record(timings.insert_block);
self.save_blocks_write_state.record(timings.write_state);
self.save_blocks_write_hashed_state.record(timings.write_hashed_state);
self.save_blocks_write_trie_changesets.record(timings.write_trie_changesets);
self.save_blocks_write_trie_updates.record(timings.write_trie_updates);
self.save_blocks_update_history_indices.record(timings.update_history_indices);
self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages);
self.save_blocks_block_count.record(timings.block_count as f64);
}
/// Records all commit timings.
pub(crate) fn record_commit(&self, timings: &CommitTimings) {
self.save_blocks_commit_mdbx.record(timings.mdbx);
self.save_blocks_commit_sf.record(timings.sf);
self.save_blocks_commit_rocksdb.record(timings.rocksdb);
}
}

View File

@@ -43,7 +43,7 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;

View File

@@ -4,8 +4,8 @@ use crate::{
},
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::RocksDBProvider,
static_file::StaticFileWriter,
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
to_range,
@@ -35,7 +35,7 @@ use alloy_primitives::{
use itertools::Itertools;
use parking_lot::RwLock;
use rayon::slice::ParallelSliceMut;
use reth_chain_state::ExecutedBlock;
use reth_chain_state::{ComputedTrieData, ExecutedBlock};
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
@@ -61,10 +61,10 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
TryIntoHistoricalStateProvider,
NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
StorageSettingsCache, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
use reth_trie::{
trie_cursor::{
InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
@@ -85,9 +85,10 @@ use std::{
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
sync::Arc,
time::{Duration, Instant},
thread,
time::Instant,
};
use tracing::{debug, trace};
use tracing::{debug, instrument, trace};
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -150,6 +151,25 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
}
}
/// Mode for [`DatabaseProvider::save_blocks`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
/// Full mode: write block structure + receipts + state + trie.
/// Used by engine/production code.
Full,
/// Blocks only: write block structure (headers, txs, senders, indices).
/// Receipts/state/trie are skipped - they may come later via separate calls.
/// Used by `insert_block`.
BlocksOnly,
}
impl SaveBlocksMode {
/// Returns `true` if this is [`SaveBlocksMode::Full`].
pub const fn with_state(self) -> bool {
matches!(self, Self::Full)
}
}
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
pub struct DatabaseProvider<TX, N: NodeTypes> {
@@ -168,8 +188,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -185,10 +205,10 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
.field("rocksdb_provider", &self.rocksdb_provider)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
}
@@ -316,8 +336,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -356,98 +375,288 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok(result)
}
/// Creates the context for static file writes.
fn static_file_write_ctx(
&self,
save_mode: SaveBlocksMode,
first_block: BlockNumber,
last_block: BlockNumber,
) -> ProviderResult<StaticFileWriteCtx> {
let tip = self.last_block_number()?.max(last_block);
Ok(StaticFileWriteCtx {
write_senders: EitherWriterDestination::senders(self).is_static_file() &&
self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
write_receipts: save_mode.with_state() &&
EitherWriter::receipts_destination(self).is_static_file(),
write_account_changesets: save_mode.with_state() &&
EitherWriterDestination::account_changesets(self).is_static_file(),
tip,
receipts_prune_mode: self.prune_modes.receipts,
// Receipts are prunable if no receipts exist in SF yet and within pruning distance
receipts_prunable: self
.static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Receipts)
.is_none() &&
PruneMode::Distance(self.minimum_pruning_distance)
.should_prune(first_block, tip),
})
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
}
}
/// Writes executed blocks and state to storage.
pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
///
/// This method parallelizes static file (SF) writes with MDBX writes.
/// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
/// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
///
/// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
/// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
#[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
save_mode: SaveBlocksMode,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "providers::db", "Attempted to write empty block range");
return Ok(())
}
// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().recovered_block();
let total_start = Instant::now();
let block_count = blocks.len() as u64;
let first_number = blocks.first().unwrap().recovered_block().number();
let last_block_number = blocks.last().unwrap().recovered_block().number();
let last_block = blocks.last().unwrap().recovered_block();
let first_number = first_block.number();
let last_block_number = last_block.number();
debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
// Compute tx_nums upfront (both threads need these)
let first_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
// Accumulate durations for each step
let mut total_insert_block = Duration::ZERO;
let mut total_write_state = Duration::ZERO;
let mut total_write_hashed_state = Duration::ZERO;
let mut total_write_trie_changesets = Duration::ZERO;
let mut total_write_trie_updates = Duration::ZERO;
let tx_nums: Vec<TxNumber> = {
let mut nums = Vec::with_capacity(blocks.len());
let mut current = first_tx_num;
for block in &blocks {
nums.push(current);
current += block.recovered_block().body().transaction_count() as u64;
}
nums
};
// TODO: Do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * indices (already done basically)
// Insert the blocks
for block in blocks {
let trie_data = block.trie_data();
let ExecutedBlock { recovered_block, execution_output, .. } = block;
let block_number = recovered_block.number();
let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
thread::scope(|s| {
// SF writes
let sf_handle = s.spawn(|| {
let start = Instant::now();
sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
Ok::<_, ProviderError>(start.elapsed())
});
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
s.spawn(|| {
let start = Instant::now();
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
Ok::<_, ProviderError>(start.elapsed())
})
});
// MDBX writes
let mdbx_start = Instant::now();
// Collect all transaction hashes across all blocks, sort them, and write in batch
if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
{
let start = Instant::now();
let mut all_tx_hashes = Vec::new();
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
let mut tx_num = tx_nums[i];
for transaction in recovered_block.body().transactions_iter() {
all_tx_hashes.push((*transaction.tx_hash(), tx_num));
tx_num += 1;
}
}
// Sort by hash for optimal MDBX insertion performance
all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
// Write all transaction hash numbers in a single batch
self.with_rocksdb_batch(|batch| {
let mut tx_hash_writer =
EitherWriter::new_transaction_hash_numbers(self, batch)?;
tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
Ok(((), raw_batch))
})?;
self.metrics.record_duration(
metrics::Action::InsertTransactionHashNumbers,
start.elapsed(),
);
}
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
let start = Instant::now();
self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
timings.insert_block += start.elapsed();
if save_mode.with_state() {
let execution_output = block.execution_outcome();
let block_number = recovered_block.number();
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
// Skip receipts/account changesets if they're being written to static files.
let start = Instant::now();
self.write_state(
execution_output,
OriginalValuesKnown::No,
StateWriteConfig {
write_receipts: !sf_ctx.write_receipts,
write_account_changesets: !sf_ctx.write_account_changesets,
},
)?;
timings.write_state += start.elapsed();
let trie_data = block.trie_data();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
timings.write_hashed_state += start.elapsed();
let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
timings.write_trie_changesets += start.elapsed();
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
timings.write_trie_updates += start.elapsed();
}
}
// Full mode: update history indices
if save_mode.with_state() {
let start = Instant::now();
self.update_history_indices(first_number..=last_block_number)?;
timings.update_history_indices = start.elapsed();
}
// Update pipeline progress
let start = Instant::now();
self.insert_block(&recovered_block)?;
total_insert_block += start.elapsed();
self.update_pipeline_stages(last_block_number, false)?;
timings.update_pipeline_stages = start.elapsed();
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let start = Instant::now();
self.write_state(&execution_output, OriginalValuesKnown::No)?;
total_write_state += start.elapsed();
timings.mdbx = mdbx_start.elapsed();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
total_write_hashed_state += start.elapsed();
// Wait for SF thread
timings.sf = sf_handle
.join()
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
total_write_trie_changesets += start.elapsed();
// Wait for RocksDB thread
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(handle) = rocksdb_handle {
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
}
timings.total = total_start.elapsed();
self.metrics.record_save_blocks(&timings);
debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
Ok(())
})
}
/// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
///
/// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn insert_block_mdbx_only(
&self,
block: &RecoveredBlock<BlockTy<N>>,
first_tx_num: TxNumber,
) -> ProviderResult<StoredBlockBodyIndices> {
if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
EitherWriterDestination::senders(self).is_database()
{
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
total_write_trie_updates += start.elapsed();
let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
cursor.append(tx_num, &sender)?;
}
self.metrics
.record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
}
// update history indices
let block_number = block.number();
let tx_count = block.body().transaction_count() as u64;
let start = Instant::now();
self.update_history_indices(first_number..=last_block_number)?;
let duration_update_history_indices = start.elapsed();
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
// Update pipeline progress
self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
}
/// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
/// `Ommers`/`Withdrawals`).
fn write_block_body_indices(
&self,
block_number: BlockNumber,
body: &BodyTy<N>,
first_tx_num: TxNumber,
tx_count: u64,
) -> ProviderResult<()> {
// MDBX: BlockBodyIndices
let start = Instant::now();
self.update_pipeline_stages(last_block_number, false)?;
let duration_update_pipeline_stages = start.elapsed();
self.tx
.cursor_write::<tables::BlockBodyIndices>()?
.append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
// Record all metrics at the end
self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
self.metrics
.record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state);
self.metrics.record_duration(
metrics::Action::SaveBlocksWriteTrieChangesets,
total_write_trie_changesets,
);
self.metrics
.record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdateHistoryIndices,
duration_update_history_indices,
);
self.metrics.record_duration(
metrics::Action::SaveBlocksUpdatePipelineStages,
duration_update_pipeline_stages,
);
// MDBX: TransactionBlocks (last tx -> block mapping)
if tx_count > 0 {
let start = Instant::now();
self.tx
.cursor_write::<tables::TransactionBlocks>()?
.append(first_tx_num + tx_count - 1, &block_number)?;
self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
}
debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
// MDBX: Ommers/Withdrawals
self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
Ok(())
}
@@ -642,8 +851,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -1727,6 +1935,7 @@ impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N
Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
}
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn update_pipeline_stages(
&self,
block_number: BlockNumber,
@@ -1817,24 +2026,31 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
{
type Receipt = ReceiptTy<N>;
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_state(
&self,
execution_outcome: &ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown,
config: StateWriteConfig,
) -> ProviderResult<()> {
let first_block = execution_outcome.first_block();
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, first_block, config)?;
self.write_state_changes(plain_state)?;
if !config.write_receipts {
return Ok(());
}
let block_count = execution_outcome.len() as u64;
let last_block = execution_outcome.last_block();
let block_range = first_block..=last_block;
let tip = self.last_block_number()?.max(last_block);
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, first_block)?;
self.write_state_changes(plain_state)?;
// Fetch the first transaction number for each block in the range
let block_indices: Vec<_> = self
.block_body_indices_range(block_range)?
@@ -1918,6 +2134,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
&self,
reverts: PlainStateReverts,
first_block: BlockNumber,
config: StateWriteConfig,
) -> ProviderResult<()> {
// Write storage changes
tracing::trace!("Writing storage changes");
@@ -1965,7 +2182,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
}
// Write account changes to static files
if !config.write_account_changesets {
return Ok(());
}
// Write account changes
tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes");
for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
@@ -2043,6 +2264,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
Ok(())
}
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
// Write hashed account updates.
let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
@@ -2336,6 +2558,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
/// Writes trie updates to the database with already sorted updates.
///
/// Returns the number of entries modified.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
if trie_updates.is_empty() {
return Ok(0)
@@ -2379,6 +2602,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
/// the same `TrieUpdates`.
///
/// Returns the number of keys written.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_trie_changesets(
&self,
block_number: BlockNumber,
@@ -2970,15 +3194,15 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
)
}
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
// account history stage
{
let storage_settings = self.cached_storage_settings();
if !storage_settings.account_history_in_rocksdb {
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
// storage history stage
{
if !storage_settings.storages_history_in_rocksdb {
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}
@@ -2987,7 +3211,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
for DatabaseProvider<TX, N>
{
fn take_block_and_execution_above(
@@ -3030,89 +3254,40 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
for DatabaseProvider<TX, N>
{
type Block = BlockTy<N>;
type Receipt = ReceiptTy<N>;
/// Inserts the block into the database, always modifying the following static file segments and
/// tables:
/// * [`StaticFileSegment::Headers`]
/// * [`tables::HeaderNumbers`]
/// * [`tables::BlockBodyIndices`]
/// Inserts the block into the database, writing to both static files and MDBX.
///
/// If there are transactions in the block, the following static file segments and tables will
/// be modified:
/// * [`StaticFileSegment::Transactions`]
/// * [`tables::TransactionBlocks`]
///
/// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
/// If withdrawals are not empty, this will modify
/// [`BlockWithdrawals`](tables::BlockWithdrawals).
///
/// If the provider has __not__ configured full sender pruning, this will modify either:
/// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
/// * [`tables::TransactionSenders`] if senders are written to the database
///
/// If the provider has __not__ configured full transaction lookup pruning, this will modify
/// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
/// This is a convenience method primarily used in tests. For production use,
/// prefer [`Self::save_blocks`] which handles execution output and trie data.
fn insert_block(
&self,
block: &RecoveredBlock<Self::Block>,
) -> ProviderResult<StoredBlockBodyIndices> {
let block_number = block.number();
let tx_count = block.body().transaction_count() as u64;
let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
self.static_file_provider
.get_writer(block_number, StaticFileSegment::Headers)?
.append_header(block.header(), &block.hash())?;
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
let first_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetNextTxNum);
let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
senders_writer.increment_block(block.number())?;
senders_writer
.append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
}
if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
self.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
let hash = transaction.tx_hash();
writer.put_transaction_hash_number(*hash, tx_num, false)?;
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
}
self.append_block_bodies(vec![(block_number, Some(block.body()))])?;
debug!(
target: "providers::db",
?block_number,
actions = ?durations_recorder.actions,
"Inserted block"
// Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
let executed_block = ExecutedBlock::new(
Arc::new(block.clone()),
Arc::new(ExecutionOutcome::new(
Default::default(),
Vec::<Vec<ReceiptTy<N>>>::new(),
block_number,
vec![],
)),
ComputedTrieData::default(),
);
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
// Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
// Return the body indices
self.block_body_indices(block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
}
fn append_block_bodies(
@@ -3298,7 +3473,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
durations_recorder.record_relative(metrics::Action::InsertBlock);
}
self.write_state(execution_outcome, OriginalValuesKnown::No)?;
self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
durations_recorder.record_relative(metrics::Action::InsertState);
// insert hashes and intermediate merkle nodes
@@ -3440,17 +3615,28 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
// Normal path: finalize() will call sync_all() if not already synced
let mut timings = metrics::CommitTimings::default();
let start = Instant::now();
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}
Ok(())
@@ -3523,10 +3709,17 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
provider_rw.insert_block(&data.blocks[0].0).unwrap();
provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(
&data.blocks[0].1,
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();
@@ -3549,11 +3742,18 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(
&data.blocks[i].1,
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3579,13 +3779,20 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
// insert blocks 1-3 with receipts
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(
&data.blocks[i].1,
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3610,11 +3817,18 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(
&data.blocks[i].1,
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -3673,11 +3887,18 @@ mod tests {
.write_state(
&ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
for i in 0..3 {
provider_rw.insert_block(&data.blocks[i].0).unwrap();
provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(
&data.blocks[i].1,
crate::OriginalValuesKnown::No,
StateWriteConfig::default(),
)
.unwrap();
}
provider_rw.commit().unwrap();
@@ -4991,7 +5212,9 @@ mod tests {
}]],
..Default::default()
};
provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
provider_rw
.write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
.unwrap();
provider_rw.commit().unwrap();
};

View File

@@ -10,14 +10,14 @@ pub use database::*;
mod static_file;
pub use static_file::{
StaticFileAccess, StaticFileJarProvider, StaticFileProvider, StaticFileProviderBuilder,
StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriter,
StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriteCtx, StaticFileWriter,
};
mod state;
pub use state::{
historical::{
needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef, HistoryInfo,
LowestAvailableBlocks,
history_info, needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef,
HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},

View File

@@ -6,7 +6,11 @@ use reth_db::Tables;
use reth_metrics::Metrics;
use strum::{EnumIter, IntoEnumIterator};
const ROCKSDB_TABLES: &[&str] = &[Tables::TransactionHashNumbers.name()];
const ROCKSDB_TABLES: &[&str] = &[
Tables::TransactionHashNumbers.name(),
Tables::StoragesHistory.name(),
Tables::AccountsHistory.name(),
];
/// Metrics for the `RocksDB` provider.
#[derive(Debug)]

View File

@@ -4,4 +4,5 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -1,11 +1,16 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use alloy_primitives::{Address, BlockNumber, B256};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
provider::{ProviderError, ProviderResult},
@@ -16,11 +21,41 @@ use rocksdb::{
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
collections::BTreeMap,
fmt,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Instant,
};
use tracing::instrument;
/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
}
impl fmt::Debug for RocksDBWriteCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBWriteCtx")
.field("first_block_number", &self.first_block_number)
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.finish()
}
}
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
@@ -474,6 +509,125 @@ impl RocksDBProvider {
}))
})
}
/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on
/// the provided storage settings. Each operation runs in parallel with its own batch,
/// pushing to `ctx.pending_batches` for later commit.
#[instrument(level = "debug", target = "providers::db", skip_all)]
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: RocksDBWriteCtx,
) -> ProviderResult<()> {
if !ctx.storage_settings.any_in_rocksdb() {
return Ok(());
}
thread::scope(|s| {
let handles: Vec<_> = [
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
ctx.storage_settings
.account_history_in_rocksdb
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
ctx.storage_settings
.storages_history_in_rocksdb
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
]
.into_iter()
.enumerate()
.filter_map(|(i, h)| h.map(|h| (i, h)))
.collect();
for (i, handle) in handles {
handle.join().map_err(|_| {
ProviderError::Database(DatabaseError::Other(format!(
"rocksdb write thread {i} panicked"
)))
})??;
}
Ok(())
})
}
/// Writes transaction hash to number mappings for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
/// Handle for building a batch of operations atomically.
@@ -1272,101 +1426,9 @@ mod tests {
assert_eq!(last, Some((20, b"value_20".to_vec())));
}
#[test]
fn test_account_history_info_single_shard() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create a single shard with blocks [100, 200, 300] and highest_block = u64::MAX
// This is the "last shard" invariant
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten (before first entry, no prev shard)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 300: should return InChangeset(300) - exact match means look at
// changeset at that block for the previous value
let result = tx.account_history_info(address, 300, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(300));
// Query for block 500: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_multiple_shards() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create two shards: first shard ends at block 500, second is the last shard
let chunk1 = IntegerList::new([100, 200, 300, 400, 500]).unwrap();
let shard_key1 = ShardedKey::new(address, 500);
provider.put::<tables::AccountsHistory>(shard_key1, &chunk1).unwrap();
let chunk2 = IntegerList::new([600, 700, 800]).unwrap();
let shard_key2 = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
let tx = provider.tx();
// Query for block 50: should return NotYetWritten (before first shard, no prev)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 150: should find block 200 in first shard's changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 550: should find block 600 in second shard's changeset
// prev() should detect first shard exists
let result = tx.account_history_info(address, 550, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(600));
// Query for block 900: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 900, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_no_history() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address1 = Address::from([0x42; 20]);
let address2 = Address::from([0x43; 20]);
// Only add history for address1
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address1, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for address2 (no history exists): should return NotYetWritten
let result = tx.account_history_info(address2, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this RocksDB-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();
@@ -1390,39 +1452,4 @@ mod tests {
tx.rollback().unwrap();
}
#[test]
fn test_storage_history_info() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
// Create a single shard for this storage slot
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
provider.put::<tables::StoragesHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.storage_history_info(address, storage_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten
let result = tx.storage_history_info(address, storage_key, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 500: should return InPlainState
let result = tx.storage_history_info(address, storage_key, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
// Query for different storage key (no history): should return NotYetWritten
let other_key = B256::from([0x02; 32]);
let result = tx.storage_history_info(address, other_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
}

View File

@@ -2,28 +2,42 @@
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
//! Operations will produce errors if actually attempted.
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
use reth_db_api::table::{Encode, Table};
use reth_storage_errors::{
db::LogLevel,
provider::{ProviderError::UnsupportedProvider, ProviderResult},
};
use std::path::Path;
use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use reth_db_api::models::StorageSettings;
use reth_prune_types::PruneMode;
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
use std::{path::Path, sync::Arc};
/// Pending `RocksDB` batches type alias (stub - uses unit type).
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
}
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
/// route to MDBX instead.
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
///
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
@@ -33,130 +47,22 @@ impl RocksDBProvider {
RocksDBBuilder::new(path)
}
/// Get a value from `RocksDB` (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Delete a value from `RocksDB` (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Write a batch of operations (stub implementation).
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
where
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
{
Err(UnsupportedProvider)
}
/// Creates a new transaction (stub implementation).
pub const fn tx(&self) -> RocksTx {
RocksTx
}
/// Creates a new batch for atomic writes (stub implementation).
pub const fn batch(&self) -> RocksDBBatch {
RocksDBBatch
}
/// Gets the first key-value pair from a table (stub implementation).
pub const fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Gets the last key-value pair from a table (stub implementation).
pub const fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Creates an iterator for the specified table (stub implementation).
///
/// Returns an empty iterator. This is consistent with `first()` and `last()` returning
/// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable.
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
Ok(RocksDBIter { _marker: std::marker::PhantomData })
}
/// Check consistency of `RocksDB` tables (stub implementation).
///
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
pub const fn check_consistency<Provider>(
&self,
_provider: &Provider,
) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
}
/// A stub batch writer for `RocksDB` on non-Unix platforms.
/// A stub batch writer for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBatch;
impl RocksDBBatch {
/// Puts a value into the batch (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value into the batch using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the batch (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Commits the batch (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` (non-transactional).
#[derive(Debug)]
pub struct RocksDBIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksDBIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// A stub builder for `RocksDB` on non-Unix platforms.
/// A stub builder for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBuilder;
@@ -167,7 +73,7 @@ impl RocksDBBuilder {
}
/// Adds a column family for a specific table type (stub implementation).
pub const fn with_table<T: Table>(self) -> Self {
pub const fn with_table<T>(self) -> Self {
self
}
@@ -205,71 +111,3 @@ impl RocksDBBuilder {
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
impl RocksTx {
/// Gets a value from the specified table (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Gets a value using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Puts a value into the specified table (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the specified table (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Creates an iterator for the specified table (stub implementation).
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Creates an iterator starting from the given key (stub implementation).
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Commits the transaction (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Rolls back the transaction (stub implementation).
pub const fn rollback(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` transactions.
#[derive(Debug)]
pub struct RocksTxIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksTxIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}

View File

@@ -135,7 +135,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountsHistory, _>(
self.history_info_lookup::<tables::AccountsHistory, _>(
history_key,
|key| key.key == address,
self.lowest_available_blocks.account_history_block_number,
@@ -154,7 +154,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StoragesHistory, _>(
self.history_info_lookup::<tables::StoragesHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_available_blocks.storage_history_block_number,
@@ -204,7 +204,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info<T, K>(
fn history_info_lookup<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
@@ -214,45 +214,13 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
T: Table<Key = K, Value = BlockNumberList>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(key, _)| key_filter(key)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(self.block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(self.block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, self.block_number) &&
!cursor.prev()?.is_some_and(|(key, _)| key_filter(&key));
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
history_info::<T, K, _>(
&mut cursor,
key,
self.block_number,
key_filter,
lowest_available_block_number,
)
}
/// Set the lowest block number at which the account history is available.
@@ -570,6 +538,60 @@ pub fn needs_prev_shard_check(
rank == 0 && found_block != Some(block_number)
}
/// Generic history lookup for sharded history tables.
///
/// Seeks to the shard containing `block_number`, verifies the key via `key_filter`,
/// and checks previous shard to detect if we're before the first write.
pub fn history_info<T, K, C>(
cursor: &mut C,
key: K,
block_number: BlockNumber,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
C: DbCursorRO<T>,
{
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(k, _)| key_filter(k)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
let is_before_first_write = needs_prev_shard_check(rank, found_block, block_number) &&
!cursor.prev()?.is_some_and(|(k, _)| key_filter(&k));
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
#[cfg(test)]
mod tests {
use super::needs_prev_shard_check;

Some files were not shown because too many files have changed in this diff Show More