Compare commits

..

23 Commits

Author SHA1 Message Date
yongkangc
a8be40c029 perf(trie): optimize extend_sorted_vec with mem::take and fast paths
- Use mem::take to move ownership, avoiding clones of target elements
- Add fast path for non-overlapping ranges (just append)
- Use extend_from_slice for empty target case
- Reuse key from target on equal keys, only clone value
2026-01-15 19:32:47 +00:00
yongkangc
9c07dca43b perf(trie): fix extend_sorted_vec O(n log n) → O(n+m) merge
Replace the previous algorithm that appended elements then re-sorted
with a proper two-pointer merge that maintains O(n+m) complexity.

The old implementation would append new elements and call sort_by(),
resulting in O(n log n) complexity per merge. The new implementation
uses a classic merge algorithm that processes both sorted inputs
in a single pass.
2026-01-15 19:32:47 +00:00
joshieDo
b9e15dbd30 feat: add rocksdb to save_blocks (#21003)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-01-15 19:32:47 +00:00
yongkangc
8c07ee2be4 perf(trie): implement 3-way adaptive merge strategy for merge_batch
- Add prefer_sorted_merge() helper with thresholds: KWAY_MIN_SOURCES=30, PAIRWISE_MIN_AVG_ITEMS=2000
- k >= 30 sources: use k-way merge (avoids O(k) copying amplification)
- k < 30, avg >= 2000 items/source: use pairwise extend_ref (2-3x faster, better cache locality)
- Otherwise: use HashMap merge then sort (lower overhead for small data)

Benchmarks show extend_ref beats kway_merge by 1.3-3.2x for k < 30 sources.
2026-01-15 19:02:15 +00:00
Dan Cline
b1f107b171 feat(reth-bench): add generate-big-block command (#21082) 2026-01-15 15:30:04 +00:00
YK
7d0e7e72de perf(trie): add k-way merge batch optimization for merge_overlay_trie_input (#21080) 2026-01-15 15:22:15 +00:00
joshieDo
f012b3391e feat: parallelize save_blocks (#20993)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-15 14:58:06 +00:00
joshieDo
d225fc1d7f feat: add get/set db settings for rocksdb (#21095) 2026-01-15 14:48:05 +00:00
Dan Cline
d469b7f1d0 feat(rpc): add flag to skip invalid transactions in testing_buildBlockV1 (#21094)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-15 12:05:30 +00:00
YK
9bcd3712c8 test(storage): add parametrized MDBX/RocksDB history lookup equivalence tests (#20871) 2026-01-15 11:16:40 +00:00
Emma Jamieson-Hoare
b25f32a977 chore(release): set version v1.10.0 (#21091)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 10:50:35 +00:00
Emma Jamieson-Hoare
905de96944 chore: release 1.9.4 (#21048)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 09:41:54 +00:00
Sergei Shulepov
27fbd9a7de fix(db): change commit return type from Result<bool> to Result<()> (#21077)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 23:56:27 +00:00
DaniPopes
26a99ac5a3 perf: small improvement to extend_sorted_vec (#21032) 2026-01-14 23:46:58 +00:00
James Prestwich
1265a89c21 refactor: make use of dbi consistent across mdbx interface (#21079) 2026-01-14 23:42:42 +00:00
Matthias Seitz
b9ff5941eb feat(primitives): add SealedBlock::decode_sealed for efficient RLP decoding (#21030) 2026-01-14 22:49:55 +00:00
Sergei Shulepov
a75a0a5db7 feat(cli): support file:// URLs in reth download (#21026)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 22:30:42 +00:00
Matthias Seitz
0a4bac77d0 feat(primitives): add From<Sealed<B>> for SealedBlock<B> (#21078) 2026-01-14 22:19:09 +00:00
Kamil Szczygieł
1fbd5a95f8 feat: Support for sending logs through OTLP (#21039)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 21:29:00 +00:00
Arsenii Kulikov
1bc07fad8e perf: use binary search in ForwardInMemoryCursor (#21049) 2026-01-14 19:31:11 +00:00
Arsenii Kulikov
8cb506c4d3 perf: don't clone entire keys set (#21042) 2026-01-14 19:26:23 +00:00
ethfanWilliam
15f16a5a2e fix: propagate keccak-cache-global feature to reth-optimism-cli (#21051)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 19:22:22 +00:00
Brian Picciano
5cf1d2a0b0 fix(trie): Update branch masks when revealing blinded nodes (#20937) 2026-01-14 19:12:15 +00:00
211 changed files with 6680 additions and 3312 deletions

View File

@@ -12,7 +12,7 @@ workflows:
# Check that `A` activates the features of `B`.
"propagate-feature",
# These are the features to check:
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,tracy,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,js-tracer,portable,keccak-cache-global",
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,tracy,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,otlp-logs,js-tracer,portable,keccak-cache-global",
# Do not try to add a new section to `[features]` of `A` only because `B` exposes that feature. There are edge-cases where this is still needed, but we can add them manually.
"--left-side-feature-missing=ignore",
# Ignore the case that `A` it outside of the workspace. Otherwise it will report errors in external dependencies that we have no influence on.

868
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.9.3"
version = "1.10.0"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -485,7 +485,7 @@ revm-inspectors = "0.33.2"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.4.1"
alloy-dyn-abi = "1.4.3"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.1.0", default-features = false }
alloy-evm = { version = "0.25.1", default-features = false }
@@ -497,33 +497,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.4.1", default-features = false }
alloy-contract = { version = "1.4.1", default-features = false }
alloy-eips = { version = "1.4.1", default-features = false }
alloy-genesis = { version = "1.4.1", default-features = false }
alloy-json-rpc = { version = "1.4.1", default-features = false }
alloy-network = { version = "1.4.1", default-features = false }
alloy-network-primitives = { version = "1.4.1", default-features = false }
alloy-provider = { version = "1.4.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.1", default-features = false }
alloy-rpc-client = { version = "1.4.1", default-features = false }
alloy-rpc-types = { version = "1.4.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.1", default-features = false }
alloy-rpc-types-debug = { version = "1.4.1", default-features = false }
alloy-rpc-types-engine = { version = "1.4.1", default-features = false }
alloy-rpc-types-eth = { version = "1.4.1", default-features = false }
alloy-rpc-types-mev = { version = "1.4.1", default-features = false }
alloy-rpc-types-trace = { version = "1.4.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.1", default-features = false }
alloy-serde = { version = "1.4.1", default-features = false }
alloy-signer = { version = "1.4.1", default-features = false }
alloy-signer-local = { version = "1.4.1", default-features = false }
alloy-transport = { version = "1.4.1" }
alloy-transport-http = { version = "1.4.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.1", default-features = false }
alloy-transport-ws = { version = "1.4.1", default-features = false }
alloy-consensus = { version = "1.4.3", default-features = false }
alloy-contract = { version = "1.4.3", default-features = false }
alloy-eips = { version = "1.4.3", default-features = false }
alloy-genesis = { version = "1.4.3", default-features = false }
alloy-json-rpc = { version = "1.4.3", default-features = false }
alloy-network = { version = "1.4.3", default-features = false }
alloy-network-primitives = { version = "1.4.3", default-features = false }
alloy-provider = { version = "1.4.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.3", default-features = false }
alloy-rpc-client = { version = "1.4.3", default-features = false }
alloy-rpc-types = { version = "1.4.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.3", default-features = false }
alloy-rpc-types-debug = { version = "1.4.3", default-features = false }
alloy-rpc-types-engine = { version = "1.4.3", default-features = false }
alloy-rpc-types-eth = { version = "1.4.3", default-features = false }
alloy-rpc-types-mev = { version = "1.4.3", default-features = false }
alloy-rpc-types-trace = { version = "1.4.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.3", default-features = false }
alloy-serde = { version = "1.4.3", default-features = false }
alloy-signer = { version = "1.4.3", default-features = false }
alloy-signer-local = { version = "1.4.3", default-features = false }
alloy-transport = { version = "1.4.3" }
alloy-transport-http = { version = "1.4.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.3", default-features = false }
alloy-transport-ws = { version = "1.4.3", default-features = false }
# op
alloy-op-evm = { version = "0.25.0", default-features = false }
@@ -665,6 +665,7 @@ opentelemetry_sdk = "0.31"
opentelemetry = "0.31"
opentelemetry-otlp = "0.31"
opentelemetry-semantic-conventions = "0.31"
opentelemetry-appender-tracing = "0.31"
tracing-opentelemetry = "0.32"
# misc-testing

View File

@@ -17,21 +17,26 @@ workspace = true
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-engine-primitives.workspace = true
reth-ethereum-primitives.workspace = true
reth-fs-util.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-primitives-traits.workspace = true
reth-rpc-api.workspace = true
reth-tracing.workspace = true
reth-chainspec.workspace = true
# alloy
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-consensus.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-engine.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["kzg"] }
alloy-transport-http.workspace = true
alloy-transport-ipc.workspace = true
alloy-transport-ws.workspace = true

View File

@@ -0,0 +1,160 @@
//! Benchmarks empty block processing by ramping the block gas limit.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState, JwtSecret};
use clap::Parser;
use reqwest::Url;
use reth_chainspec::ChainSpec;
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use std::{path::PathBuf, time::Instant};
use tracing::info;
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
/// Number of blocks to generate.
#[arg(long, value_name = "BLOCKS")]
blocks: u64,
/// The Engine API RPC URL.
#[arg(long = "engine-rpc-url", value_name = "ENGINE_RPC_URL")]
engine_rpc_url: String,
/// Path to the JWT secret for Engine API authentication.
#[arg(long = "jwt-secret", value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
}
impl Command {
/// Execute `benchmark gas-limit-ramp` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
if self.blocks == 0 {
return Err(eyre::eyre!("--blocks must be greater than 0"));
}
// Ensure output directory exists
if self.output.is_file() {
return Err(eyre::eyre!("Output path must be a directory"));
}
if !self.output.exists() {
std::fs::create_dir_all(&self.output)?;
info!("Created output directory: {:?}", self.output);
}
// Set up authenticated provider (used for both Engine API and eth_ methods)
let jwt = std::fs::read_to_string(&self.jwt_secret)?;
let jwt = JwtSecret::from_hex(jwt)?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_with(auth_transport).await?;
let provider = RootProvider::<AnyNetwork>::new(client);
// Get chain spec - required for fork detection
let chain_id = provider.get_chain_id().await?;
let chain_spec = ChainSpec::from_chain_id(chain_id)
.ok_or_else(|| eyre::eyre!("Unsupported chain id: {chain_id}"))?;
// Fetch the current head block as parent
let parent_block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let (mut parent_header, mut parent_hash) = rpc_block_to_header(parent_block);
let canonical_parent = parent_header.number;
let start_block = canonical_parent + 1;
let end_block = start_block + self.blocks - 1;
info!(canonical_parent, start_block, end_block, "Starting gas limit ramp benchmark");
let mut next_block_number = start_block;
let total_benchmark_duration = Instant::now();
while next_block_number <= end_block {
let timestamp = parent_header.timestamp.saturating_add(1);
let request = prepare_payload_request(&chain_spec, timestamp, parent_hash);
let new_payload_version = request.new_payload_version;
let (payload, sidecar) = build_payload(&provider, request).await?;
let mut block =
payload.clone().try_into_block_with_sidecar::<TransactionSigned>(&sidecar)?;
let max_increase = max_gas_limit_increase(parent_header.gas_limit);
let gas_limit =
parent_header.gas_limit.saturating_add(max_increase).min(MAXIMUM_GAS_LIMIT_BLOCK);
block.header.gas_limit = gas_limit;
let block_hash = block.header.hash_slow();
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
block.header.withdrawals_root,
Some(new_payload_version),
)?;
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(block_number = block.header.number, path = %payload_path.display(), "Saved payload");
call_new_payload(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
parent_header = block.header;
parent_hash = block_hash;
next_block_number += 1;
}
let final_gas_limit = parent_header.gas_limit;
info!(
total_duration=?total_benchmark_duration.elapsed(),
blocks_processed = self.blocks,
final_gas_limit,
"Benchmark complete"
);
Ok(())
}
}
const fn max_gas_limit_increase(parent_gas_limit: u64) -> u64 {
(parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR).saturating_sub(1)
}

View File

@@ -0,0 +1,617 @@
//! Command for generating large blocks by packing transactions from real blocks.
//!
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_buildBlockV1` RPC endpoint.
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
PayloadAttributes,
};
use alloy_transport::layers::RetryBackoffLayer;
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_rpc_api::TestingBuildBlockRequestV1;
use std::future::Future;
use tokio::sync::mpsc;
use tracing::{info, warn};
/// A single transaction with its gas used and raw encoded bytes.
#[derive(Debug, Clone)]
pub struct RawTransaction {
/// The actual gas used by the transaction (from receipt).
pub gas_used: u64,
/// The transaction type (e.g., 3 for EIP-4844 blob txs).
pub tx_type: u8,
/// The raw RLP-encoded transaction bytes.
pub raw: Bytes,
}
/// Abstraction over sources of transactions for big block generation.
///
/// Implementors provide transactions from different sources (RPC, database, files, etc.)
pub trait TransactionSource {
/// Fetch transactions from a specific block number.
///
/// Returns `Ok(None)` if the block doesn't exist.
/// Returns `Ok(Some((transactions, gas_used)))` with the block's transactions and total gas.
fn fetch_block_transactions(
&self,
block_number: u64,
) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
}
/// RPC-based transaction source that fetches from a remote node.
#[derive(Debug)]
pub struct RpcTransactionSource {
provider: RootProvider<AnyNetwork>,
}
impl RpcTransactionSource {
/// Create a new RPC transaction source.
pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
Self { provider }
}
/// Create from an RPC URL with retry backoff.
pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let provider = RootProvider::<AnyNetwork>::new(client);
Ok(Self { provider })
}
}
impl TransactionSource for RpcTransactionSource {
async fn fetch_block_transactions(
&self,
block_number: u64,
) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
// Fetch block and receipts in parallel
let (block, receipts) = tokio::try_join!(
self.provider.get_block_by_number(block_number.into()).full(),
self.provider.get_block_receipts(block_number.into())
)?;
let Some(block) = block else {
return Ok(None);
};
let Some(receipts) = receipts else {
return Err(eyre::eyre!("Receipts not found for block {}", block_number));
};
let block_gas_used = block.header.gas_used;
// Convert cumulative gas from receipts to per-tx gas_used
let mut prev_cumulative = 0u64;
let transactions: Vec<RawTransaction> = block
.transactions
.txns()
.zip(receipts.iter())
.map(|(tx, receipt)| {
let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
let gas_used = cumulative - prev_cumulative;
prev_cumulative = cumulative;
let with_encoded = tx.inner.inner.clone().into_encoded();
RawTransaction {
gas_used,
tx_type: tx.inner.ty(),
raw: with_encoded.encoded_bytes().clone(),
}
})
.collect();
Ok(Some((transactions, block_gas_used)))
}
}
/// Collects transactions from a source up to a target gas usage.
#[derive(Debug)]
pub struct TransactionCollector<S> {
source: S,
target_gas: u64,
}
impl<S: TransactionSource> TransactionCollector<S> {
/// Create a new transaction collector.
pub const fn new(source: S, target_gas: u64) -> Self {
Self { source, target_gas }
}
/// Collect transactions starting from the given block number.
///
/// Skips blob transactions (type 3) and collects until target gas is reached.
/// Returns the collected raw transaction bytes, total gas used, and the next block number.
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
let mut transactions: Vec<Bytes> = Vec::new();
let mut total_gas: u64 = 0;
let mut current_block = start_block;
while total_gas < self.target_gas {
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
else {
warn!(block = current_block, "Block not found, stopping");
break;
};
for tx in block_txs {
// Skip blob transactions (EIP-4844, type 3)
if tx.tx_type == 3 {
continue;
}
if total_gas + tx.gas_used <= self.target_gas {
transactions.push(tx.raw);
total_gas += tx.gas_used;
}
if total_gas >= self.target_gas {
break;
}
}
current_block += 1;
// Stop early if remaining gas is under 1M (close enough to target)
let remaining_gas = self.target_gas.saturating_sub(total_gas);
if remaining_gas < 1_000_000 {
break;
}
}
info!(
total_txs = transactions.len(),
total_gas,
next_block = current_block,
"Finished collecting transactions"
);
Ok((transactions, total_gas, current_block))
}
}
/// `reth bench generate-big-block` command
///
/// Generates a large block by fetching transactions from existing blocks and packing them
/// into a single block using the `testing_buildBlockV1` RPC endpoint.
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC URL to use for fetching blocks (can be an external archive node).
#[arg(long, value_name = "RPC_URL")]
rpc_url: String,
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// The RPC URL for `testing_buildBlockV1` calls (same node as engine, regular RPC port).
#[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
testing_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
#[arg(long, default_value = "false")]
execute: bool,
/// Number of payloads to generate. Each payload uses the previous as parent.
/// When count == 1, the payload is only generated and saved, not executed.
/// When count > 1, each payload is executed before building the next.
#[arg(long, default_value = "1")]
count: u64,
/// Number of transaction batches to prefetch in background when count > 1.
/// Higher values reduce latency but use more memory.
#[arg(long, default_value = "4")]
prefetch_buffer: usize,
/// Output directory for generated payloads. Each payload is saved as `payload_block_N.json`.
#[arg(long, value_name = "OUTPUT_DIR")]
output_dir: std::path::PathBuf,
}
/// A built payload ready for execution.
struct BuiltPayload {
block_number: u64,
envelope: ExecutionPayloadEnvelopeV4,
block_hash: B256,
timestamp: u64,
}
impl Command {
/// Execute the `generate-big-block` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Set up testing RPC provider (for testing_buildBlockV1)
info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
let testing_client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(self.testing_rpc_url.parse()?);
let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
// Get the parent block (latest canonical block)
info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
let parent_block = auth_provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let parent_hash = parent_block.header.hash;
let parent_number = parent_block.header.number;
let parent_timestamp = parent_block.header.timestamp;
info!(
parent_hash = %parent_hash,
parent_number = parent_number,
"Using initial parent block"
);
// Create output directory
std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {
self.execute_pipelined(
&auth_provider,
&testing_provider,
start_block,
parent_hash,
parent_timestamp,
)
.await?;
} else {
// Single payload - collect transactions and build
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
let collector = TransactionCollector::new(tx_source, self.target_gas);
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected"));
}
self.execute_sequential(
&auth_provider,
&testing_provider,
transactions,
parent_hash,
parent_timestamp,
)
.await?;
}
info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
Ok(())
}
/// Sequential execution path for single payload or no-execute mode.
async fn execute_sequential(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
transactions: Vec<Bytes>,
mut parent_hash: B256,
mut parent_timestamp: u64,
) -> eyre::Result<()> {
for i in 0..self.count {
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
"Building payload via testing_buildBlockV1"
);
let built = self
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
.await?;
self.save_payload(&built)?;
if self.execute || self.count > 1 {
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
}
parent_hash = built.block_hash;
parent_timestamp = built.timestamp;
}
Ok(())
}
/// Pipelined execution - fetches transactions and builds payloads in background.
async fn execute_pipelined(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
start_block: u64,
initial_parent_hash: B256,
initial_parent_timestamp: u64,
) -> eyre::Result<()> {
// Create channel for transaction batches (one batch per payload)
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
// Spawn background task to continuously fetch transaction batches
let rpc_url = self.rpc_url.clone();
let target_gas = self.target_gas;
let count = self.count;
let fetcher_handle = tokio::spawn(async move {
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
Ok(source) => source,
Err(e) => {
warn!(error = %e, "Failed to create transaction source");
return;
}
};
let collector = TransactionCollector::new(tx_source, target_gas);
let mut current_block = start_block;
for payload_idx in 0..count {
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});
let mut parent_hash = initial_parent_hash;
let mut parent_timestamp = initial_parent_timestamp;
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
for i in 0..self.count {
let is_last = i == self.count - 1;
// Get current payload (either from pending build or build now)
let current_payload = if let Some(handle) = pending_build.take() {
handle.await??
} else {
// First payload - wait for transactions and build synchronously
let transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
tx_count = transactions.len(),
"Building payload via testing_buildBlockV1"
);
self.build_payload(
testing_provider,
&transactions,
i,
parent_hash,
parent_timestamp,
)
.await?
};
self.save_payload(&current_payload)?;
let current_block_hash = current_payload.block_hash;
let current_timestamp = current_payload.timestamp;
// Execute current payload first
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Start building next payload in background (if not last) - AFTER execution
if !is_last {
// Get transactions for next payload (should already be fetched or fetching)
let next_transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if next_transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
}
let testing_provider = testing_provider.clone();
let next_index = i + 1;
let total = self.count;
pending_build = Some(tokio::spawn(async move {
info!(
payload = next_index + 1,
total = total,
parent_hash = %current_block_hash,
parent_timestamp = current_timestamp,
tx_count = next_transactions.len(),
"Building payload via testing_buildBlockV1"
);
Self::build_payload_static(
&testing_provider,
&next_transactions,
next_index,
current_block_hash,
current_timestamp,
)
.await
}));
}
parent_hash = current_block_hash;
parent_timestamp = current_timestamp;
}
// Clean up the fetcher task
drop(tx_receiver);
let _ = fetcher_handle.await;
Ok(())
}
/// Build a single payload via `testing_buildBlockV1`.
async fn build_payload(
&self,
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
Self::build_payload_static(
testing_provider,
transactions,
index,
parent_hash,
parent_timestamp,
)
.await
}
/// Static version for use in spawned tasks.
async fn build_payload_static(
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
let request = TestingBuildBlockRequestV1 {
parent_block_hash: parent_hash,
payload_attributes: PayloadAttributes {
timestamp: parent_timestamp + 12,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
},
transactions: transactions.to_vec(),
extra_data: None,
};
let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
info!(
payload = index + 1,
tx_count = transactions.len(),
total_tx_bytes = total_tx_bytes,
parent_hash = %parent_hash,
"Sending to testing_buildBlockV1"
);
let envelope: ExecutionPayloadEnvelopeV5 =
testing_provider.client().request("testing_buildBlockV1", [request]).await?;
let v4_envelope = envelope.try_into_v4()?;
let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
let block_hash = inner.block_hash;
let block_number = inner.block_number;
let timestamp = inner.timestamp;
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
}
/// Save a payload to disk.
fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
let filename = format!("payload_block_{}.json", payload.block_number);
let filepath = self.output_dir.join(&filename);
let json = serde_json::to_string_pretty(&payload.envelope)?;
std::fs::write(&filepath, &json)
.wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
Ok(())
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload,
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
if !fcu_result.is_valid() {
return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
}
Ok(())
}
}

View File

@@ -0,0 +1,196 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, RootProvider};
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState,
PayloadAttributes, PayloadId, PraguePayloadFields,
};
use eyre::OptionExt;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_node_api::EngineApiMessageVersion;
use tracing::debug;
/// Prepared payload request data for triggering block building.
pub(crate) struct PayloadRequest {
/// The payload attributes for the new block.
pub(crate) attributes: PayloadAttributes,
/// The forkchoice state pointing to the parent block.
pub(crate) forkchoice_state: ForkchoiceState,
/// The engine API version for FCU calls.
pub(crate) fcu_version: EngineApiMessageVersion,
/// The getPayload version to use (1-5).
pub(crate) get_payload_version: u8,
/// The newPayload version to use.
pub(crate) new_payload_version: EngineApiMessageVersion,
}
/// Prepare payload attributes and forkchoice state for a new block.
pub(crate) fn prepare_payload_request(
chain_spec: &ChainSpec,
timestamp: u64,
parent_hash: B256,
) -> PayloadRequest {
let shanghai_active = chain_spec.is_shanghai_active_at_timestamp(timestamp);
let cancun_active = chain_spec.is_cancun_active_at_timestamp(timestamp);
let prague_active = chain_spec.is_prague_active_at_timestamp(timestamp);
let osaka_active = chain_spec.is_osaka_active_at_timestamp(timestamp);
// FCU version: V3 for Cancun+Prague+Osaka, V2 for Shanghai, V1 otherwise
let fcu_version = if cancun_active {
EngineApiMessageVersion::V3
} else if shanghai_active {
EngineApiMessageVersion::V2
} else {
EngineApiMessageVersion::V1
};
// getPayload version: 5 for Osaka, 4 for Prague, 3 for Cancun, 2 for Shanghai, 1 otherwise
// newPayload version: 4 for Prague+Osaka (no V5), 3 for Cancun, 2 for Shanghai, 1 otherwise
let (get_payload_version, new_payload_version) = if osaka_active {
(5, EngineApiMessageVersion::V4) // Osaka uses getPayloadV5 but newPayloadV4
} else if prague_active {
(4, EngineApiMessageVersion::V4)
} else if cancun_active {
(3, EngineApiMessageVersion::V3)
} else if shanghai_active {
(2, EngineApiMessageVersion::V2)
} else {
(1, EngineApiMessageVersion::V1)
};
PayloadRequest {
attributes: PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: shanghai_active.then(Vec::new),
parent_beacon_block_root: cancun_active.then_some(B256::ZERO),
},
forkchoice_state: ForkchoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
},
fcu_version,
get_payload_version,
new_payload_version,
}
}
/// Trigger payload building via FCU and retrieve the built payload.
///
/// This sends a forkchoiceUpdated with payload attributes to start building,
/// then calls getPayload to retrieve the result.
pub(crate) async fn build_payload(
provider: &RootProvider<AnyNetwork>,
request: PayloadRequest,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
let fcu_result = call_forkchoice_updated(
provider,
request.fcu_version,
request.forkchoice_state,
Some(request.attributes.clone()),
)
.await?;
let payload_id =
fcu_result.payload_id.ok_or_eyre("Payload builder did not return a payload id")?;
get_payload_with_sidecar(
provider,
request.get_payload_version,
payload_id,
request.attributes.parent_beacon_block_root,
)
.await
}
/// Convert an RPC block to a consensus header and block hash.
pub(crate) fn rpc_block_to_header(block: alloy_provider::network::AnyRpcBlock) -> (Header, B256) {
let block_hash = block.header.hash;
let header = block.header.inner.clone().into_header_with_defaults();
(header, block_hash)
}
/// Compute versioned hashes from KZG commitments.
fn versioned_hashes_from_commitments(
commitments: &[alloy_primitives::FixedBytes<48>],
) -> Vec<B256> {
commitments.iter().map(|c| kzg_to_versioned_hash(c.as_ref())).collect()
}
/// Fetch an execution payload using the appropriate engine API version.
pub(crate) async fn get_payload_with_sidecar(
provider: &RootProvider<AnyNetwork>,
version: u8,
payload_id: PayloadId,
parent_beacon_block_root: Option<B256>,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
debug!(get_payload_version = ?version, ?payload_id, "Sending getPayload");
match version {
1 => {
let payload = provider.get_payload_v1(payload_id).await?;
Ok((ExecutionPayload::V1(payload), ExecutionPayloadSidecar::none()))
}
2 => {
let envelope = provider.get_payload_v2(payload_id).await?;
let payload = match envelope.execution_payload {
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V1(p) => ExecutionPayload::V1(p),
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V2(p) => ExecutionPayload::V2(p),
};
Ok((payload, ExecutionPayloadSidecar::none()))
}
3 => {
let envelope = provider.get_payload_v3(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V3")?,
versioned_hashes,
};
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v3(cancun_fields),
))
}
4 => {
let envelope = provider.get_payload_v4(payload_id).await?;
let versioned_hashes = versioned_hashes_from_commitments(
&envelope.envelope_inner.blobs_bundle.commitments,
);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V4")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.envelope_inner.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
5 => {
// V5 (Osaka) - use raw request since alloy doesn't have get_payload_v5 yet
let envelope = provider.get_payload_v5(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V5")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
_ => panic!("This tool does not support getPayload versions past v5"),
}
}

View File

@@ -6,9 +6,16 @@ use reth_node_core::args::LogArgs;
use reth_tracing::FileWorkerGuard;
mod context;
mod gas_limit_ramp;
mod generate_big_block;
pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
mod new_payload_fcu;
mod new_payload_only;
mod output;
mod replay_payloads;
mod send_payload;
/// `reth bench` command
@@ -27,6 +34,9 @@ pub enum Subcommands {
/// Benchmark which calls `newPayload`, then `forkchoiceUpdated`.
NewPayloadFcu(new_payload_fcu::Command),
/// Benchmark which builds empty blocks with a ramped gas limit.
GasLimitRamp(gas_limit_ramp::Command),
/// Benchmark which only calls subsequent `newPayload` calls.
NewPayloadOnly(new_payload_only::Command),
@@ -41,6 +51,29 @@ pub enum Subcommands {
/// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000
/// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)`
SendPayload(send_payload::Command),
/// Generate a large block by packing transactions from existing blocks.
///
/// This command fetches transactions from real blocks and packs them into a single
/// block using the `testing_buildBlockV1` RPC endpoint.
///
/// Example:
///
/// `reth-bench generate-big-block --rpc-url http://localhost:8545 --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex --target-gas
/// 30000000`
GenerateBigBlock(generate_big_block::Command),
/// Replay pre-generated payloads from a directory.
///
/// This command reads payload files from a previous `generate-big-block` run and replays
/// them in sequence using `newPayload` followed by `forkchoiceUpdated`.
///
/// Example:
///
/// `reth-bench replay-payloads --payload-dir ./payloads --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex`
ReplayPayloads(replay_payloads::Command),
}
impl BenchmarkCommand {
@@ -51,8 +84,11 @@ impl BenchmarkCommand {
match self.command {
Subcommands::NewPayloadFcu(command) => command.execute(ctx).await,
Subcommands::GasLimitRamp(command) => command.execute(ctx).await,
Subcommands::NewPayloadOnly(command) => command.execute(ctx).await,
Subcommands::SendPayload(command) => command.execute(ctx).await,
Subcommands::GenerateBigBlock(command) => command.execute(ctx).await,
Subcommands::ReplayPayloads(command) => command.execute(ctx).await,
}
}

View File

@@ -13,8 +13,7 @@ use crate::{
bench::{
context::BenchContext,
output::{
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
GAS_OUTPUT_SUFFIX,
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
@@ -27,7 +26,6 @@ use alloy_rpc_client::RpcClient;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
@@ -123,6 +121,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -188,6 +187,7 @@ impl Command {
result
} {
let gas_used = block.header.gas_used;
let gas_limit = block.header.gas_limit;
let block_number = block.header.number;
let transaction_count = block.transactions.len() as u64;
@@ -211,6 +211,7 @@ impl Command {
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
@@ -240,28 +241,11 @@ impl Command {
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
// Write CSV output files
if let Some(ref path) = self.benchmark.output {
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
write_benchmark_results(path, &gas_output_results, combined_results)?;
}
let gas_output = TotalGasOutput::new(gas_output_results)?;

View File

@@ -49,6 +49,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -96,11 +97,7 @@ impl Command {
let transaction_count = block.transactions.len() as u64;
let gas_used = block.header.gas_used;
debug!(
target: "reth-bench",
number=?block.header.number,
"Sending payload to engine",
);
debug!(number=?block.header.number, "Sending payload to engine");
let (version, params) = block_to_new_payload(block, is_optimism)?;

View File

@@ -1,10 +1,13 @@
//! Contains various benchmark output formats, either for logging or for
//! serialization to / from files.
use alloy_primitives::B256;
use csv::Writer;
use eyre::OptionExt;
use reth_primitives_traits::constants::GIGAGAS;
use serde::{ser::SerializeStruct, Serialize};
use std::time::Duration;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use std::{path::Path, time::Duration};
use tracing::info;
/// This is the suffix for gas output csv files.
pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
@@ -15,6 +18,17 @@ pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv";
/// This is the suffix for new payload output csv files.
pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
/// Serialized format for gas ramp payloads on disk.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GasRampPayloadFile {
/// Engine API version (1-5).
pub(crate) version: u8,
/// The block hash for FCU.
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
/// used and the `newPayload` latency.
#[derive(Debug)]
@@ -67,6 +81,8 @@ impl Serialize for NewPayloadResult {
pub(crate) struct CombinedResult {
/// The block number of the block being processed.
pub(crate) block_number: u64,
/// The gas limit of the block.
pub(crate) gas_limit: u64,
/// The number of transactions in the block.
pub(crate) transaction_count: u64,
/// The `newPayload` result.
@@ -88,7 +104,7 @@ impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload {} processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
"Block {} processed at {:.4} Ggas/s, used {} total gas. Combined: {:.4} Ggas/s. fcu: {:?}, newPayload: {:?}",
self.block_number,
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
@@ -110,10 +126,11 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 6)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_limit", &self.gas_limit)?;
state.serialize_field("transaction_count", &self.transaction_count)?;
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
@@ -167,6 +184,36 @@ impl TotalGasOutput {
}
}
/// Write benchmark results to CSV files.
///
/// Writes two files to the output directory:
/// - `combined_latency.csv`: Per-block latency results
/// - `total_gas.csv`: Per-block gas usage over time
pub(crate) fn write_benchmark_results(
output_dir: &Path,
gas_results: &[TotalGasRow],
combined_results: Vec<CombinedResult>,
) -> eyre::Result<()> {
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in gas_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", output_dir);
Ok(())
}
/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
///
/// This is essentially just for the csv writer, which would have headers

View File

@@ -0,0 +1,332 @@
//! Command for replaying pre-generated payloads from disk.
//!
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::output::GasRampPayloadFile,
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_node_api::EngineApiMessageVersion;
use std::path::PathBuf;
use tracing::{debug, info};
/// `reth bench replay-payloads` command
///
/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
/// `forkchoiceUpdated` for each payload in sequence.
#[derive(Debug, Parser)]
pub struct Command {
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Directory containing payload files (`payload_block_N.json`).
#[arg(long, value_name = "PAYLOAD_DIR")]
payload_dir: PathBuf,
/// Optional limit on the number of payloads to replay.
/// If not specified, replays all payloads in the directory.
#[arg(long, value_name = "COUNT")]
count: Option<usize>,
/// Skip the first N payloads.
#[arg(long, value_name = "SKIP", default_value = "0")]
skip: usize,
/// Optional directory containing gas ramp payloads to replay first.
/// These are replayed before the main payloads to warm up the gas limit.
#[arg(long, value_name = "GAS_RAMP_DIR")]
gas_ramp_dir: Option<PathBuf>,
}
/// A loaded payload ready for execution.
struct LoadedPayload {
/// The index (from filename).
index: u64,
/// The payload envelope.
envelope: ExecutionPayloadEnvelopeV4,
/// The block hash.
block_hash: B256,
}
/// A gas ramp payload loaded from disk.
struct GasRampPayload {
/// Block number from filename.
block_number: u64,
/// Engine API version for newPayload.
version: EngineApiMessageVersion,
/// The file contents.
file: GasRampPayloadFile,
}
impl Command {
/// Execute the `replay-payloads` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Get parent block (latest canonical block) - we need this for the first FCU
let parent_block = auth_provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let initial_parent_hash = parent_block.header.hash;
let initial_parent_number = parent_block.header.number;
info!(
parent_hash = %initial_parent_hash,
parent_number = initial_parent_number,
"Using initial parent block"
);
// Load all payloads upfront to avoid I/O delays between phases
let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
if payloads.is_empty() {
return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
}
info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
payloads
} else {
Vec::new()
};
let payloads = self.load_payloads()?;
if payloads.is_empty() {
return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
}
info!(count = payloads.len(), "Loaded main payloads from disk");
let mut parent_hash = initial_parent_hash;
// Replay gas ramp payloads first
for (i, payload) in gas_ramp_payloads.iter().enumerate() {
info!(
gas_ramp_payload = i + 1,
total = gas_ramp_payloads.len(),
block_number = payload.block_number,
block_hash = %payload.file.block_hash,
"Executing gas ramp payload (newPayload + FCU)"
);
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
parent_hash = payload.file.block_hash;
}
if !gas_ramp_payloads.is_empty() {
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
}
for (i, payload) in payloads.iter().enumerate() {
info!(
payload = i + 1,
total = payloads.len(),
index = payload.index,
block_hash = %payload.block_hash,
"Executing payload (newPayload + FCU)"
);
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
parent_hash = payload.block_hash;
}
info!(count = payloads.len(), "All payloads replayed successfully");
Ok(())
}
/// Load and parse all payload files from the directory.
fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
let mut payloads = Vec::new();
// Read directory entries
let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_")
})
.collect();
// Parse filenames to get indices and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract index from "payload_NNN.json"
let index_str = name_str.strip_prefix("payload_")?.strip_suffix(".json")?;
let index: u64 = index_str.parse().ok()?;
Some((index, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(idx, _)| *idx);
// Apply skip and count
let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
let indexed_paths: Vec<_> = match self.count {
Some(count) => indexed_paths.into_iter().take(count).collect(),
None => indexed_paths,
};
// Load each payload
for (index, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
info!(
index = index,
block_hash = %block_hash,
path = %path.display(),
"Loaded payload"
);
payloads.push(LoadedPayload { index, envelope, block_hash });
}
Ok(payloads)
}
/// Load and parse gas ramp payload files from a directory.
fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
let mut payloads = Vec::new();
let entries: Vec<_> = std::fs::read_dir(dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_block_")
})
.collect();
// Parse filenames to get block numbers and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract block number from "payload_block_NNN.json"
let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
let block_number: u64 = block_str.parse().ok()?;
Some((block_number, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(num, _)| *num);
for (block_number, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let file: GasRampPayloadFile = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let version = match file.version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
};
info!(
block_number,
block_hash = %file.block_hash,
path = %path.display(),
"Loaded gas ramp payload"
);
payloads.push(GasRampPayload { block_number, version, file });
}
Ok(payloads)
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: &ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(?status, "newPayloadV4 response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(?fcu_result, "forkchoiceUpdatedV3 response");
Ok(())
}
}

View File

@@ -3,15 +3,16 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated,
PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use tracing::error;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
#[async_trait::async_trait]
@@ -52,6 +53,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV1",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
@@ -82,6 +90,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV2",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
@@ -112,6 +127,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV3",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
@@ -148,33 +170,51 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
if let Some(prague) = sidecar.prague() {
// Use target version if provided (for Osaka), otherwise default to V4
let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
if is_optimism {
let withdrawals_root = withdrawals_root.ok_or_else(|| {
eyre::eyre!("Missing withdrawals root for Optimism payload")
})?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
OpExecutionPayloadV4 {
payload_inner: payload,
withdrawals_root: block.withdrawals_root.unwrap(),
},
OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
Requests::default(),
))?,
)
} else {
// Extract actual Requests from RequestsOrHash
let requests = prague
.requests
.requests()
.cloned()
.ok_or_else(|| eyre::eyre!("Prague sidecar has hash, not requests"))?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
payload,
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
prague.requests.requests_hash(),
requests,
))?,
)
}
@@ -217,6 +257,8 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
) -> TransportResult<()> {
let method = version.method_name();
debug!(method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
@@ -237,12 +279,15 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
/// actual engine api message call.
///
/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
// FCU V3 is used for both Cancun and Prague (there is no FCU V4)
match message_version {
EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await

View File

@@ -81,12 +81,16 @@ backon.workspace = true
tempfile.workspace = true
[features]
default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
default = ["jemalloc", "otlp", "otlp-logs", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
otlp = [
"reth-ethereum-cli/otlp",
"reth-node-core/otlp",
]
otlp-logs = [
"reth-ethereum-cli/otlp-logs",
"reth-node-core/otlp-logs",
]
js-tracer = [
"reth-node-builder/js-tracer",
"reth-node-ethereum/js-tracer",

View File

@@ -460,6 +460,18 @@ impl ChainSpec {
pub fn builder() -> ChainSpecBuilder {
ChainSpecBuilder::default()
}
/// Map a chain ID to a known chain spec, if available.
pub fn from_chain_id(chain_id: u64) -> Option<Arc<Self>> {
match NamedChain::try_from(chain_id).ok()? {
NamedChain::Mainnet => Some(MAINNET.clone()),
NamedChain::Sepolia => Some(SEPOLIA.clone()),
NamedChain::Holesky => Some(HOLESKY.clone()),
NamedChain::Hoodi => Some(HOODI.clone()),
NamedChain::Dev => Some(DEV.clone()),
_ => None,
}
}
}
impl<H: BlockHeader> ChainSpec<H> {

View File

@@ -100,7 +100,7 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
tx.disable_long_read_transaction_safety();
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(&table_db).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let total_entries = stats.entries();
let final_entry_idx = total_entries.saturating_sub(1);
if self.args.skip > final_entry_idx {

View File

@@ -54,18 +54,18 @@ pub enum SetCommand {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storages history in RocksDB instead of MDBX
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in RocksDB instead of MDBX
AccountHistory {
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash numbers in RocksDB instead of MDBX
TxHashNumbers {
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -151,6 +151,14 @@ impl Command {
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
@@ -159,14 +167,6 @@ impl Command {
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
SetCommand::TxHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
}
// Write updated settings

View File

@@ -88,7 +88,7 @@ impl Command {
let stats = tx
.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {db_table}"))?;
// Defaults to 16KB right now but we should
@@ -129,7 +129,8 @@ impl Command {
table.add_row(row);
let freelist = tx.inner.env().freelist()?;
let pagesize = tx.inner.db_stat(&mdbx::Database::freelist_db())?.page_size() as usize;
let pagesize =
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
let freelist_size = freelist * pagesize;
let mut row = Row::new();

View File

@@ -86,6 +86,9 @@ impl DownloadDefaults {
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
help
}
@@ -293,19 +296,14 @@ impl CompressionFormat {
}
}
/// Downloads and extracts a snapshot, blocking until finished.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
let progress_reader = ProgressReader::new(response, total_size);
let format = CompressionFormat::from_url(url)?;
/// Extracts a compressed tar archive to the target directory with progress tracking.
fn extract_archive<R: Read>(
reader: R,
total_size: u64,
format: CompressionFormat,
target_dir: &Path,
) -> Result<()> {
let progress_reader = ProgressReader::new(reader, total_size);
match format {
CompressionFormat::Lz4 => {
@@ -322,6 +320,45 @@ fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
Ok(())
}
/// Extracts a snapshot from a local file.
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let file = std::fs::File::open(path)?;
let total_size = file.metadata()?.len();
extract_archive(file, total_size, format, target_dir)
}
/// Fetches the snapshot from a remote URL, uncompressing it in a streaming fashion.
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
extract_archive(response, total_size, format, target_dir)
}
/// Downloads and extracts a snapshot, blocking until finished.
///
/// Supports both `file://` URLs for local files and HTTP(S) URLs for remote downloads.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let format = CompressionFormat::from_url(url)?;
if let Ok(parsed_url) = Url::parse(url) &&
parsed_url.scheme() == "file"
{
let file_path = parsed_url
.to_file_path()
.map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
extract_from_file(&file_path, format, target_dir)
} else {
download_and_extract(url, format, target_dir)
}
}
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
@@ -380,6 +417,7 @@ mod tests {
assert!(help.contains("Available snapshot sources:"));
assert!(help.contains("merkle.io"));
assert!(help.contains("publicnode.com"));
assert!(help.contains("file://"));
}
#[test]
@@ -404,4 +442,25 @@ mod tests {
assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
}
#[test]
fn test_compression_format_detection() {
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
}
}

View File

@@ -182,7 +182,6 @@ impl<C: ChainSpecParser> Command<C> {
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;

View File

@@ -42,7 +42,7 @@ where
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).with_default_tables().build()?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -39,7 +39,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Arc<Dat
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).with_default_tables().build()?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -29,7 +29,7 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Arc<Dat
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).with_default_tables().build()?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -62,7 +62,7 @@ where
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).with_default_tables().build()?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -125,10 +125,7 @@ pub async fn setup_engine_with_chain_import(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)?;
// Initialize genesis if needed
@@ -331,7 +328,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
.with_default_tables()
.build()
.unwrap(),
)
@@ -396,7 +392,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
)
@@ -495,10 +490,7 @@ mod tests {
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create provider factory");

View File

@@ -20,6 +20,7 @@ use reth_trie_parallel::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
@@ -609,7 +610,19 @@ impl MultiProofTask {
self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: targets
.keys()
.filter_map(|account| {
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.map(|keys| (*account, keys))
})
.collect(),
});
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
@@ -705,7 +718,33 @@ impl MultiProofTask {
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: {
let mut storages = B256Map::with_capacity_and_hasher(
not_fetched_state_update.storages.len(),
Default::default(),
);
for account in not_fetched_state_update
.storages
.keys()
.chain(not_fetched_state_update.accounts.keys())
{
if let hash_map::Entry::Vacant(entry) = storages.entry(*account) {
entry.insert(
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.unwrap_or_default(),
);
}
}
storages
},
});
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();

View File

@@ -1,5 +1,10 @@
//! Types and traits for validating blocks and payloads.
/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
///
/// Benchmarked crossover: `extend_ref` wins up to ~30 blocks, `merge_batch` wins beyond.
const MERGE_BATCH_THRESHOLD: usize = 30;
use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
@@ -40,7 +45,10 @@ use reth_provider::{
StateProvider, StateProviderFactory, StateReader, TrieReader,
};
use reth_revm::db::State;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie::{
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, StateRoot, TrieInputSorted,
};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -1012,34 +1020,63 @@ where
Ok((input, block_hash))
}
/// Aggregates multiple in-memory blocks into a single [`TrieInputSorted`] by combining their
/// Aggregates in-memory blocks into a single [`TrieInputSorted`] by combining their
/// state changes.
///
/// The input `blocks` vector is ordered newest -> oldest (see `TreeState::blocks_by_hash`).
/// We iterate it in reverse so we start with the oldest block's trie data and extend forward
/// toward the newest, ensuring newer state takes precedence.
///
/// Uses `extend_ref` loop for small k, k-way `merge_batch` for large k.
/// See [`MERGE_BATCH_THRESHOLD`] for crossover point.
fn merge_overlay_trie_input(blocks: &[ExecutedBlock<N>]) -> TrieInputSorted {
let mut input = TrieInputSorted::default();
let mut blocks_iter = blocks.iter().rev().peekable();
if let Some(first) = blocks_iter.next() {
let data = first.trie_data();
input.state = data.hashed_state;
input.nodes = data.trie_updates;
// Only clone and mutate if there are more in-memory blocks.
if blocks_iter.peek().is_some() {
let state_mut = Arc::make_mut(&mut input.state);
let nodes_mut = Arc::make_mut(&mut input.nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
}
if blocks.is_empty() {
return TrieInputSorted::default();
}
input
// Single block: return Arc directly without cloning
if blocks.len() == 1 {
let data = blocks[0].trie_data();
return TrieInputSorted {
state: Arc::clone(&data.hashed_state),
nodes: Arc::clone(&data.trie_updates),
prefix_sets: Default::default(),
};
}
if blocks.len() < MERGE_BATCH_THRESHOLD {
// Small k: extend_ref loop is faster
// Iterate oldest->newest so newer values override older ones
let mut blocks_iter = blocks.iter().rev();
let first = blocks_iter.next().expect("blocks is non-empty");
let data = first.trie_data();
let mut state = Arc::clone(&data.hashed_state);
let mut nodes = Arc::clone(&data.trie_updates);
let state_mut = Arc::make_mut(&mut state);
let nodes_mut = Arc::make_mut(&mut nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
} else {
// Large k: merge_batch is faster (O(n log k) via k-way merge)
let trie_data: Vec<_> = blocks.iter().map(|b| b.trie_data()).collect();
let merged_state = HashedPostStateSorted::merge_batch(
trie_data.iter().map(|d| d.hashed_state.as_ref()),
);
let merged_nodes =
TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
TrieInputSorted {
state: Arc::new(merged_state),
nodes: Arc::new(merged_nodes),
prefix_sets: Default::default(),
}
}
}
/// Spawns a background task to compute and sort trie data for the executed block.

View File

@@ -38,6 +38,7 @@ tempfile.workspace = true
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
dev = ["reth-cli-commands/arbitrary"]

View File

@@ -19,7 +19,7 @@ use reth_db::DatabaseEnv;
use reth_node_api::NodePrimitives;
use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_core::{
args::{LogArgs, OtlpInitStatus, TraceArgs},
args::{LogArgs, OtlpInitStatus, OtlpLogsStatus, TraceArgs},
version::version_metadata,
};
use reth_node_metrics::recorder::install_prometheus_recorder;
@@ -223,16 +223,19 @@ impl<
/// If file logging is enabled, this function returns a guard that must be kept alive to ensure
/// that all logs are flushed to disk.
///
/// If an OTLP endpoint is specified, it will export metrics to the configured collector.
/// If an OTLP endpoint is specified, it will export traces and logs to the configured
/// collector.
pub fn init_tracing(
&mut self,
runner: &CliRunner,
mut layers: Layers,
) -> eyre::Result<Option<FileWorkerGuard>> {
let otlp_status = runner.block_on(self.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.traces.init_otlp_logs(&mut layers))?;
let guard = self.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.traces.protocol);
@@ -243,6 +246,16 @@ impl<
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
Ok(guard)
}
}

View File

@@ -303,6 +303,8 @@ where
let eth_config =
EthConfigHandler::new(ctx.node.provider().clone(), ctx.node.evm_config().clone());
let testing_skip_invalid_transactions = ctx.config.rpc.testing_skip_invalid_transactions;
self.inner
.launch_add_ons_with(ctx, move |container| {
container.modules.merge_if_module_configured(
@@ -316,14 +318,16 @@ where
// testing_buildBlockV1: only wire when the hidden testing module is explicitly
// requested on any transport. Default stays disabled to honor security guidance.
let testing_api = TestingApi::new(
let mut testing_api = TestingApi::new(
container.registry.eth_api().clone(),
container.registry.evm_config().clone(),
)
.into_rpc();
);
if testing_skip_invalid_transactions {
testing_api = testing_api.with_skip_invalid_transactions();
}
container
.modules
.merge_if_module_configured(RethRpcModule::Testing, testing_api)?;
.merge_if_module_configured(RethRpcModule::Testing, testing_api.into_rpc())?;
Ok(())
})

View File

@@ -31,8 +31,8 @@ reth-payload-validator.workspace = true
# ethereum
alloy-rlp.workspace = true
revm.workspace = true
alloy-rpc-types-engine.workspace = true
revm.workspace = true
# alloy
alloy-eips.workspace = true

View File

@@ -247,7 +247,7 @@ pub async fn test_exex_context_with_chain_spec(
db,
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
)?;
let genesis_hash = init_genesis(&provider_factory)?;

View File

@@ -81,7 +81,8 @@ tokio.workspace = true
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
otlp = ["reth-tracing/otlp"]
otlp = ["reth-tracing/otlp", "reth-tracing-otlp/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-tracing-otlp/otlp-logs"]
tracy = ["reth-tracing/tracy"]
min-error-logs = ["tracing/release_max_level_error"]

View File

@@ -26,7 +26,7 @@ pub use log::{ColorMode, LogArgs, Verbosity};
/// `TraceArgs` for tracing and spans support
mod trace;
pub use trace::{OtlpInitStatus, TraceArgs};
pub use trace::{OtlpInitStatus, OtlpLogsStatus, TraceArgs};
/// `MetricArgs` to configure metrics.
mod metric;

View File

@@ -640,6 +640,13 @@ pub struct RpcServerArgs {
value_parser = parse_duration_from_secs_or_ms,
)]
pub rpc_send_raw_transaction_sync_timeout: Duration,
/// Skip invalid transactions in `testing_buildBlockV1` instead of failing.
///
/// When enabled, transactions that fail execution will be skipped, and all subsequent
/// transactions from the same sender will also be skipped.
#[arg(long = "testing.skip-invalid-transactions", default_value_t = false)]
pub testing_skip_invalid_transactions: bool,
}
impl RpcServerArgs {
@@ -852,6 +859,7 @@ impl Default for RpcServerArgs {
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
testing_skip_invalid_transactions: false,
}
}
}
@@ -1026,6 +1034,7 @@ mod tests {
default_suggested_fee: None,
},
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
testing_skip_invalid_transactions: true,
};
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([
@@ -1114,6 +1123,7 @@ mod tests {
"60",
"--rpc.send-raw-transaction-sync-timeout",
"30s",
"--testing.skip-invalid-transactions",
])
.args;

View File

@@ -61,16 +61,6 @@ pub struct StaticFilesArgs {
/// the node has been initialized, changing this flag requires re-syncing from scratch.
#[arg(long = "static-files.account-change-sets")]
pub account_changesets: bool,
/// Use `RocksDB` for history indices instead of MDBX.
///
/// When enabled, `AccountsHistory`, `StoragesHistory`, and `TransactionHashNumbers`
/// tables will be stored in `RocksDB` for better write performance.
///
/// Note: This setting can only be configured at genesis initialization. Once
/// the node has been initialized, changing this flag requires re-syncing from scratch.
#[arg(long = "storage.rocksdb")]
pub rocksdb: bool,
}
impl StaticFilesArgs {
@@ -111,8 +101,5 @@ impl StaticFilesArgs {
.with_receipts_in_static_files(self.receipts)
.with_transaction_senders_in_static_files(self.transaction_senders)
.with_account_changesets_in_static_files(self.account_changesets)
.with_account_history_in_rocksdb(self.rocksdb)
.with_storages_history_in_rocksdb(self.rocksdb)
.with_transaction_hash_numbers_in_rocksdb(self.rocksdb)
}
}

View File

@@ -1,4 +1,4 @@
//! Opentelemetry tracing configuration through CLI args.
//! Opentelemetry tracing and logging configuration through CLI args.
use clap::Parser;
use eyre::WrapErr;
@@ -6,7 +6,7 @@ use reth_tracing::{tracing_subscriber::EnvFilter, Layers};
use reth_tracing_otlp::OtlpProtocol;
use url::Url;
/// CLI arguments for configuring `Opentelemetry` trace and span export.
/// CLI arguments for configuring `Opentelemetry` trace and logs export.
#[derive(Debug, Clone, Parser)]
pub struct TraceArgs {
/// Enable `Opentelemetry` tracing export to an OTLP endpoint.
@@ -30,9 +30,29 @@ pub struct TraceArgs {
)]
pub otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces.
/// Enable `Opentelemetry` logs export to an OTLP endpoint.
///
/// - `http`: expects endpoint path to end with `/v1/traces`
/// If no value provided, defaults based on protocol:
/// - HTTP: `http://localhost:4318/v1/logs`
/// - gRPC: `http://localhost:4317`
///
/// Example: --logs-otlp=http://collector:4318/v1/logs
#[arg(
long = "logs-otlp",
env = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
global = true,
value_name = "URL",
num_args = 0..=1,
default_missing_value = "http://localhost:4318/v1/logs",
require_equals = true,
value_parser = parse_otlp_endpoint,
help_heading = "Logging"
)]
pub logs_otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces and logs.
///
/// - `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs`
/// - `grpc`: expects endpoint without a path
///
/// Defaults to HTTP if not specified.
@@ -62,6 +82,22 @@ pub struct TraceArgs {
)]
pub otlp_filter: EnvFilter,
/// Set a filter directive for the OTLP logs exporter. This controls the verbosity
/// of logs sent to the OTLP endpoint. It follows the same syntax as the
/// `RUST_LOG` environment variable.
///
/// Example: --logs-otlp.filter=info,reth=debug
///
/// Defaults to INFO if not specified.
#[arg(
long = "logs-otlp.filter",
global = true,
value_name = "FILTER",
default_value = "info",
help_heading = "Logging"
)]
pub logs_otlp_filter: EnvFilter,
/// Service name to use for OTLP tracing export.
///
/// This name will be used to identify the service in distributed tracing systems
@@ -101,8 +137,10 @@ impl Default for TraceArgs {
fn default() -> Self {
Self {
otlp: None,
logs_otlp: None,
protocol: OtlpProtocol::Http,
otlp_filter: EnvFilter::from_default_env(),
logs_otlp_filter: EnvFilter::try_new("info").expect("valid filter"),
sample_ratio: None,
service_name: "reth".to_string(),
}
@@ -150,6 +188,37 @@ impl TraceArgs {
Ok(OtlpInitStatus::Disabled)
}
}
/// Initialize OTLP logs export with the given layers.
///
/// This method handles OTLP logs initialization based on the configured options,
/// including validation and protocol selection.
///
/// Returns the initialization status to allow callers to log appropriate messages.
pub async fn init_otlp_logs(&mut self, _layers: &mut Layers) -> eyre::Result<OtlpLogsStatus> {
if let Some(endpoint) = self.logs_otlp.as_mut() {
self.protocol.validate_logs_endpoint(endpoint)?;
#[cfg(feature = "otlp-logs")]
{
let config = reth_tracing_otlp::OtlpLogsConfig::new(
self.service_name.clone(),
endpoint.clone(),
self.protocol,
)?;
_layers.with_log_layer(config.clone(), self.logs_otlp_filter.clone())?;
Ok(OtlpLogsStatus::Started(config.endpoint().clone()))
}
#[cfg(not(feature = "otlp-logs"))]
{
Ok(OtlpLogsStatus::NoFeature)
}
} else {
Ok(OtlpLogsStatus::Disabled)
}
}
}
/// Status of OTLP tracing initialization.
@@ -163,6 +232,17 @@ pub enum OtlpInitStatus {
NoFeature,
}
/// Status of OTLP logs initialization.
#[derive(Debug)]
pub enum OtlpLogsStatus {
/// OTLP logs export was successfully started with the given endpoint.
Started(Url),
/// OTLP logs export is disabled (no endpoint configured).
Disabled,
/// OTLP logs arguments provided but feature is not compiled.
NoFeature,
}
// Parses an OTLP endpoint url.
fn parse_otlp_endpoint(arg: &str) -> eyre::Result<Url> {
Url::parse(arg).wrap_err("Invalid URL for OTLP trace output")

View File

@@ -342,14 +342,6 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
self
}
/// Converts the node configuration to [`StorageSettings`].
///
/// This returns storage settings configured via CLI arguments including
/// static file settings and `RocksDB` settings.
pub const fn to_storage_settings(&self) -> reth_provider::StorageSettings {
self.static_files.to_settings()
}
/// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig>
where

View File

@@ -43,6 +43,7 @@ tracy = ["reth-optimism-cli/tracy"]
asm-keccak = ["reth-optimism-cli/asm-keccak", "reth-optimism-node/asm-keccak"]
keccak-cache-global = [
"reth-optimism-cli/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
dev = [

View File

@@ -76,8 +76,9 @@ reth-optimism-chainspec = { workspace = true, features = ["std", "superchain-con
[features]
default = []
# Opentelemtry feature to activate metrics export
# Opentelemetry feature to activate tracing and logs export
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
asm-keccak = [
"alloy-primitives/asm-keccak",
@@ -85,6 +86,12 @@ asm-keccak = [
"reth-optimism-node/asm-keccak",
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
# Jemalloc feature for vergen to generate correct env vars
jemalloc = [
"reth-node-core/jemalloc",

View File

@@ -3,7 +3,7 @@ use eyre::{eyre, Result};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::launcher::Launcher;
use reth_cli_runner::CliRunner;
use reth_node_core::args::OtlpInitStatus;
use reth_node_core::args::{OtlpInitStatus, OtlpLogsStatus};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_consensus::OpBeaconConsensus;
@@ -124,9 +124,11 @@ where
let mut layers = self.layers.take().unwrap_or_default();
let otlp_status = runner.block_on(self.cli.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.cli.traces.init_otlp_logs(&mut layers))?;
self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.cli.traces.protocol);
@@ -136,6 +138,16 @@ where
}
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.cli.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
}
Ok(())
}

View File

@@ -77,6 +77,7 @@ arbitrary = [
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
"reth-optimism-cli?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -1,12 +1,12 @@
//! Sealed block types
use crate::{
block::{error::BlockRecoveryError, RecoveredBlock},
transaction::signed::RecoveryError,
block::{error::BlockRecoveryError, header::BlockHeader, RecoveredBlock},
transaction::signed::{RecoveryError, SignedTransaction},
Block, BlockBody, GotExpected, InMemorySize, SealedHeader,
};
use alloc::vec::Vec;
use alloy_consensus::BlockHeader;
use alloy_consensus::BlockHeader as _;
use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
use alloy_primitives::{Address, BlockHash, Sealable, Sealed, B256};
use alloy_rlp::{Decodable, Encodable};
@@ -327,6 +327,31 @@ impl<B: Block> From<SealedBlock<B>> for Sealed<B> {
}
}
impl<B: Block> From<Sealed<B>> for SealedBlock<B> {
fn from(value: Sealed<B>) -> Self {
let (block, hash) = value.into_parts();
Self::new_unchecked(block, hash)
}
}
impl<T, H> SealedBlock<alloy_consensus::Block<T, H>>
where
T: Decodable + SignedTransaction,
H: BlockHeader,
{
/// Decodes the block from RLP, computing the header hash directly from the RLP bytes.
///
/// This is more efficient than decoding and then sealing, as the header hash is computed
/// from the raw RLP bytes without re-encoding.
///
/// This leverages [`alloy_consensus::Block::decode_sealed`].
pub fn decode_sealed(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let sealed = alloy_consensus::Block::<T, H>::decode_sealed(buf)?;
let (block, hash) = sealed.into_parts();
Ok(Self::new_unchecked(block, hash))
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl<'a, B> arbitrary::Arbitrary<'a> for SealedBlock<B>
where
@@ -555,4 +580,96 @@ mod tests {
assert_eq!(sealed_block.header().state_root, decoded.header().state_root);
assert_eq!(sealed_block.body().transactions.len(), decoded.body().transactions.len());
}
#[test]
fn test_decode_sealed_produces_correct_hash() {
// Create a sample block using alloy_consensus::Block
let header = alloy_consensus::Header {
parent_hash: B256::ZERO,
ommers_hash: B256::ZERO,
beneficiary: Address::ZERO,
state_root: B256::ZERO,
transactions_root: B256::ZERO,
receipts_root: B256::ZERO,
logs_bloom: Default::default(),
difficulty: Default::default(),
number: 42,
gas_limit: 30_000_000,
gas_used: 21_000,
timestamp: 1_000_000,
extra_data: Default::default(),
mix_hash: B256::ZERO,
nonce: Default::default(),
base_fee_per_gas: Some(1_000_000_000),
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
};
// Create a simple transaction
let tx = alloy_consensus::TxLegacy {
chain_id: Some(1),
nonce: 0,
gas_price: 21_000_000_000,
gas_limit: 21_000,
to: alloy_primitives::TxKind::Call(Address::ZERO),
value: alloy_primitives::U256::from(100),
input: alloy_primitives::Bytes::default(),
};
let tx_signed =
alloy_consensus::TxEnvelope::Legacy(alloy_consensus::Signed::new_unchecked(
tx,
alloy_primitives::Signature::test_signature(),
B256::ZERO,
));
// Create block body with the transaction
let body = alloy_consensus::BlockBody {
transactions: vec![tx_signed],
ommers: vec![],
withdrawals: Some(Default::default()),
};
// Create the block
let block = alloy_consensus::Block::new(header, body);
let expected_hash = block.header.hash_slow();
// Encode the block
let mut encoded = Vec::new();
block.encode(&mut encoded);
// Decode using decode_sealed - this should compute hash from raw RLP
let decoded =
SealedBlock::<alloy_consensus::Block<alloy_consensus::TxEnvelope>>::decode_sealed(
&mut encoded.as_slice(),
)
.expect("Failed to decode sealed block");
// Verify the hash matches
assert_eq!(decoded.hash(), expected_hash);
assert_eq!(decoded.header().number, 42);
assert_eq!(decoded.body().transactions.len(), 1);
}
#[test]
fn test_sealed_block_from_sealed() {
let header = alloy_consensus::Header::default();
let body = alloy_consensus::BlockBody::<alloy_consensus::TxEnvelope>::default();
let block = alloy_consensus::Block::new(header, body);
let hash = block.header.hash_slow();
// Create Sealed<Block>
let sealed: Sealed<alloy_consensus::Block<alloy_consensus::TxEnvelope>> =
Sealed::new_unchecked(block.clone(), hash);
// Convert to SealedBlock
let sealed_block: SealedBlock<alloy_consensus::Block<alloy_consensus::TxEnvelope>> =
SealedBlock::from(sealed);
assert_eq!(sealed_block.hash(), hash);
assert_eq!(sealed_block.header().number, block.header.number);
}
}

View File

@@ -36,7 +36,6 @@ alloy-serde.workspace = true
alloy-rpc-types-beacon.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-genesis.workspace = true
serde = { workspace = true, features = ["derive"] }
# misc
jsonrpsee = { workspace = true, features = ["server", "macros"] }

View File

@@ -5,32 +5,24 @@
//! disabled by default and never be exposed on public-facing RPC without an
//! explicit operator flag.
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV5, PayloadAttributes as EthPayloadAttributes,
};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};
/// Capability string for `testing_buildBlockV1`.
pub const TESTING_BUILD_BLOCK_V1: &str = "testing_buildBlockV1";
/// Request payload for `testing_buildBlockV1`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TestingBuildBlockRequestV1 {
/// Parent block hash of the block to build.
pub parent_block_hash: B256,
/// Payload attributes (Cancun version).
pub payload_attributes: EthPayloadAttributes,
/// Raw signed transactions to force-include in order.
pub transactions: Vec<Bytes>,
/// Optional extra data for the block header.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub extra_data: Option<Bytes>,
}
pub use alloy_rpc_types_engine::{TestingBuildBlockRequestV1, TESTING_BUILD_BLOCK_V1};
/// Testing RPC interface for building a block in a single call.
///
/// # Enabling
///
/// This namespace is disabled by default for security reasons. To enable it,
/// add `testing` to the `--http.api` flag:
///
/// ```sh
/// reth node --http --http.api eth,testing
/// ```
///
/// **Warning:** Never expose this on public-facing RPC endpoints without proper
/// authentication.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "testing"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "testing"))]
pub trait TestingApi {

View File

@@ -1,10 +1,22 @@
//! Implementation of the `testing` namespace.
//!
//! This exposes `testing_buildBlockV1`, intended for non-production/debug use.
//!
//! # Enabling the testing namespace
//!
//! The `testing_` namespace is disabled by default for security reasons.
//! To enable it, add `testing` to the `--http.api` flag when starting the node:
//!
//! ```sh
//! reth node --http --http.api eth,testing
//! ```
//!
//! **Warning:** This namespace allows building arbitrary blocks. Never expose it
//! on public-facing RPC endpoints without proper authentication.
use alloy_consensus::{Header, Transaction};
use alloy_evm::Evm;
use alloy_primitives::U256;
use alloy_primitives::{map::HashSet, Address, U256};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
@@ -19,19 +31,31 @@ use reth_rpc_eth_api::{helpers::Call, FromEthApiError};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_storage_api::{BlockReader, HeaderProvider};
use revm::context::Block;
use revm_primitives::map::DefaultHashBuilder;
use std::sync::Arc;
use tracing::debug;
/// Testing API handler.
#[derive(Debug, Clone)]
pub struct TestingApi<Eth, Evm> {
eth_api: Eth,
evm_config: Evm,
/// If true, skip invalid transactions instead of failing.
skip_invalid_transactions: bool,
}
impl<Eth, Evm> TestingApi<Eth, Evm> {
/// Create a new testing API handler.
pub const fn new(eth_api: Eth, evm_config: Evm) -> Self {
Self { eth_api, evm_config }
Self { eth_api, evm_config, skip_invalid_transactions: false }
}
/// Enable skipping invalid transactions instead of failing.
/// When a transaction fails, all subsequent transactions from the same sender are also
/// skipped.
pub const fn with_skip_invalid_transactions(mut self) -> Self {
self.skip_invalid_transactions = true;
self
}
}
@@ -46,6 +70,7 @@ where
request: TestingBuildBlockRequestV1,
) -> Result<ExecutionPayloadEnvelopeV5, Eth::Error> {
let evm_config = self.evm_config.clone();
let skip_invalid_transactions = self.skip_invalid_transactions;
self.eth_api
.spawn_with_state_at_block(request.parent_block_hash, move |eth_api, state| {
let state = state.database.0;
@@ -79,11 +104,41 @@ where
let mut total_fees = U256::ZERO;
let base_fee = builder.evm_mut().block().basefee();
for tx in request.transactions {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(&tx)?;
let mut invalid_senders: HashSet<Address, DefaultHashBuilder> = HashSet::default();
for (idx, tx) in request.transactions.iter().enumerate() {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(tx)?;
let sender = tx.signer();
if skip_invalid_transactions && invalid_senders.contains(&sender) {
continue;
}
let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default();
let gas_used =
builder.execute_transaction(tx).map_err(Eth::Error::from_eth_err)?;
let gas_used = match builder.execute_transaction(tx) {
Ok(gas_used) => gas_used,
Err(err) => {
if skip_invalid_transactions {
debug!(
target: "rpc::testing",
tx_idx = idx,
?sender,
error = ?err,
"Skipping invalid transaction"
);
invalid_senders.insert(sender);
continue;
}
debug!(
target: "rpc::testing",
tx_idx = idx,
?sender,
error = ?err,
"Transaction execution failed"
);
return Err(Eth::Error::from_eth_err(err));
}
};
total_fees += U256::from(tip) * U256::from(gas_used);
}

View File

@@ -347,10 +347,7 @@ mod tests {
.with_blocks_per_file(1)
.build()
.unwrap(),
RocksDBProvider::builder(create_test_rocksdb_dir().0.keep())
.with_default_tables()
.build()
.unwrap(),
RocksDBProvider::builder(create_test_rocksdb_dir().0.keep()).build().unwrap(),
)
.unwrap();

View File

@@ -1,25 +1,16 @@
use crate::stages::utils::collect_history_indices;
use super::{collect_account_history_indices, load_accounts_history_indices};
use alloy_primitives::{Address, BlockNumber};
use super::{collect_account_history_indices, load_history_indices};
use alloy_primitives::Address;
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{
cursor::DbCursorRO,
models::ShardedKey,
table::Decode,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_provider::{
make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, DBProvider,
EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::NodePrimitivesProvider;
use std::fmt::Debug;
use tracing::info;
@@ -62,9 +53,7 @@ where
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache
+ NodePrimitivesProvider
+ RocksDBProviderFactory,
+ StorageSettingsCache,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -136,7 +125,7 @@ where
};
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_accounts_history_indices(
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
@@ -157,40 +146,9 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Create EitherWriter for account history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
// Read changesets to identify what to unwind
let changesets = provider
.tx_ref()
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
// Group by address and find minimum block for each
// We only need to unwind once per address using the LOWEST block number
// since unwind removes all indices >= that block
let mut account_keys: std::collections::HashMap<Address, BlockNumber> =
std::collections::HashMap::new();
for (block_number, account) in changesets {
account_keys
.entry(account.address)
.and_modify(|min_bn| *min_bn = (*min_bn).min(block_number))
.or_insert(block_number);
}
// Unwind each account's history shards (once per unique address)
for (address, min_block) in account_keys {
super::utils::unwind_accounts_history_shards(&mut writer, address, min_block)?;
}
// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);
provider.unwind_account_history_indices_range(range)?;
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@@ -689,127 +647,3 @@ mod tests {
}
}
}
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_stage_tests {
use super::*;
use crate::test_utils::TestStageDB;
use reth_db_api::tables;
use reth_provider::{DatabaseProviderFactory, RocksDBProviderFactory};
use reth_storage_api::StorageSettings;
/// Test that `IndexAccountHistoryStage` writes to `RocksDB` when enabled.
#[test]
fn test_index_account_history_writes_to_rocksdb() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Setup changesets (blocks 1-10, skip 0 to avoid genesis edge case)
db.commit(|tx| {
for block in 1..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
reth_db_api::models::StoredBlockBodyIndices {
tx_count: 3,
..Default::default()
},
)?;
tx.put::<tables::AccountChangeSets>(
block,
reth_db_api::models::AccountBeforeTx {
address: alloy_primitives::address!(
"0x0000000000000000000000000000000000000001"
),
info: None,
},
)?;
}
Ok(())
})
.unwrap();
// Execute stage from checkpoint 0 (will process blocks 1-10)
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(0)) };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify data is in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let count =
rocksdb.iter::<tables::AccountsHistory>().unwrap().filter_map(|r| r.ok()).count();
assert!(count > 0, "Expected data in RocksDB, found {count} entries");
// Verify MDBX AccountsHistory is empty (data went to RocksDB)
let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
assert!(mdbx_table.is_empty(), "MDBX should be empty when RocksDB is enabled");
}
/// Test that `IndexAccountHistoryStage` unwind clears `RocksDB` data.
#[test]
fn test_index_account_history_unwind_clears_rocksdb() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Setup changesets (blocks 1-10, skip 0 to avoid genesis edge case)
db.commit(|tx| {
for block in 1..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
reth_db_api::models::StoredBlockBodyIndices {
tx_count: 3,
..Default::default()
},
)?;
tx.put::<tables::AccountChangeSets>(
block,
reth_db_api::models::AccountBeforeTx {
address: alloy_primitives::address!(
"0x0000000000000000000000000000000000000001"
),
info: None,
},
)?;
}
Ok(())
})
.unwrap();
// Execute stage from checkpoint 0 (will process blocks 1-10)
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(0)) };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify data exists in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let before_count =
rocksdb.iter::<tables::AccountsHistory>().unwrap().filter_map(|r| r.ok()).count();
assert!(before_count > 0, "Expected data in RocksDB before unwind");
// Unwind to block 0 (removes blocks 1-10, leaving nothing)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 0,
..Default::default()
};
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, unwind_input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
provider.commit().unwrap();
// Verify RocksDB is cleared (no block 0 data exists)
let rocksdb = db.factory.rocksdb_provider();
let after_count =
rocksdb.iter::<tables::AccountsHistory>().unwrap().filter_map(|r| r.ok()).count();
assert_eq!(after_count, 0, "RocksDB should be empty after unwind to 0");
}
}

View File

@@ -1,22 +1,15 @@
use super::{collect_history_indices, load_storages_history_indices};
use super::{collect_history_indices, load_history_indices};
use crate::{StageCheckpoint, StageId};
use alloy_primitives::{Address, BlockNumber, B256};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{
cursor::DbCursorRO,
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
table::Decode,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_provider::{
make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, DBProvider,
EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_storage_api::NodePrimitivesProvider;
use std::fmt::Debug;
use tracing::info;
@@ -53,13 +46,8 @@ impl Default for IndexStorageHistoryStage {
impl<Provider> Stage<Provider> for IndexStorageHistoryStage
where
Provider: DBProvider<Tx: DbTxMut>
+ PruneCheckpointWriter
+ HistoryWriter
+ PruneCheckpointReader
+ NodePrimitivesProvider
+ StorageSettingsCache
+ RocksDBProviderFactory,
Provider:
DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -128,7 +116,7 @@ where
)?;
info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
load_storages_history_indices(
load_history_indices::<_, tables::StoragesHistory, _>(
provider,
collector,
first_sync,
@@ -151,44 +139,7 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Create EitherWriter for storage history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
// Read changesets to identify what to unwind
let changesets = provider
.tx_ref()
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(BlockNumberAddress::range(range))?
.collect::<Result<Vec<_>, _>>()?;
// Group by (address, storage_key) and find minimum block for each
// We only need to unwind once per (address, key) using the LOWEST block number
// since unwind removes all indices >= that block
let mut storage_keys: std::collections::HashMap<(Address, B256), BlockNumber> =
std::collections::HashMap::new();
for (BlockNumberAddress((bn, address)), storage) in changesets {
storage_keys
.entry((address, storage.key))
.and_modify(|min_bn| *min_bn = (*min_bn).min(bn))
.or_insert(bn);
}
// Unwind each storage slot's history shards (once per unique key)
for ((address, storage_key), min_block) in storage_keys {
super::utils::unwind_storages_history_shards(
&mut writer,
address,
storage_key,
min_block,
)?;
}
// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);
provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
@@ -713,117 +664,3 @@ mod tests {
}
}
}
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_stage_tests {
use super::*;
use crate::test_utils::TestStageDB;
use alloy_primitives::{address, b256, U256};
use reth_db_api::{models::StoredBlockBodyIndices, tables};
use reth_primitives_traits::StorageEntry;
use reth_provider::{DatabaseProviderFactory, RocksDBProviderFactory};
use reth_storage_api::StorageSettings;
const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
const STORAGE_KEY: alloy_primitives::B256 =
b256!("0x0000000000000000000000000000000000000000000000000000000000000001");
/// Test that `IndexStorageHistoryStage` writes to `RocksDB` when enabled.
#[test]
fn test_index_storage_history_writes_to_rocksdb() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Setup storage changesets (blocks 1-10, skip 0 to avoid genesis edge case)
db.commit(|tx| {
for block in 1..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, ADDRESS)),
StorageEntry { key: STORAGE_KEY, value: U256::ZERO },
)?;
}
Ok(())
})
.unwrap();
// Execute stage from checkpoint 0 (will process blocks 1-10)
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(0)) };
let mut stage = IndexStorageHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify data is in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let count =
rocksdb.iter::<tables::StoragesHistory>().unwrap().filter_map(|r| r.ok()).count();
assert!(count > 0, "Expected data in RocksDB, found {count} entries");
// Verify MDBX StoragesHistory is empty (data went to RocksDB)
let mdbx_table = db.table::<tables::StoragesHistory>().unwrap();
assert!(mdbx_table.is_empty(), "MDBX should be empty when RocksDB is enabled");
}
/// Test that `IndexStorageHistoryStage` unwind clears `RocksDB` data.
#[test]
fn test_index_storage_history_unwind_clears_rocksdb() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Setup storage changesets (blocks 1-10, skip 0 to avoid genesis edge case)
db.commit(|tx| {
for block in 1..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, ADDRESS)),
StorageEntry { key: STORAGE_KEY, value: U256::ZERO },
)?;
}
Ok(())
})
.unwrap();
// Execute stage from checkpoint 0 (will process blocks 1-10)
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(0)) };
let mut stage = IndexStorageHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify data exists in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let before_count =
rocksdb.iter::<tables::StoragesHistory>().unwrap().filter_map(|r| r.ok()).count();
assert!(before_count > 0, "Expected data in RocksDB before unwind");
// Unwind to block 0 (removes blocks 1-10, leaving nothing)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 0,
..Default::default()
};
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, unwind_input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
provider.commit().unwrap();
// Verify RocksDB is cleared (no block 0 data exists)
let rocksdb = db.factory.rocksdb_provider();
let after_count =
rocksdb.iter::<tables::StoragesHistory>().unwrap().filter_map(|r| r.ok()).count();
assert_eq!(after_count, 0, "RocksDB should be empty after unwind to 0");
}
}

View File

@@ -10,10 +10,9 @@ use reth_db_api::{
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, BlockReader, DBProvider,
EitherWriter, PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionsProvider,
TransactionsProviderExt,
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -159,11 +158,15 @@ where
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
@@ -184,8 +187,11 @@ where
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Register RocksDB batch for commit at provider level
register_rocksdb_batch(provider, writer);
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
trace!(target: "sync::stages::transaction_lookup",
total_hashes,
@@ -211,11 +217,15 @@ where
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();
@@ -238,8 +248,11 @@ where
}
}
// Register RocksDB batch for commit at provider level
register_rocksdb_batch(provider, writer);
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)

View File

@@ -1,28 +1,21 @@
//! Utils for `stages`.
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use alloy_primitives::{Address, BlockNumber, TxNumber};
use reth_config::config::EtlConfig;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
AccountBeforeTx, ShardedKey,
},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
table::{Decompress, Table},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
make_rocksdb_batch_arg, make_rocksdb_provider, providers::StaticFileProvider,
register_rocksdb_batch, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache,
providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider};
use reth_storage_errors::provider::ProviderResult;
use reth_storage_api::ChangeSetReader;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;
@@ -119,40 +112,6 @@ where
Ok::<(), StageError>(())
}
/// Generic shard-and-write helper used by both account and storage history loaders.
///
/// Chunks the list into shards, writes each shard via the provided write function,
/// and handles the last shard according to [`LoadMode`].
fn shard_and_write<F>(
list: &mut Vec<BlockNumber>,
mode: LoadMode,
mut write_fn: F,
) -> Result<(), StageError>
where
F: FnMut(Vec<u64>, BlockNumber) -> Result<(), StageError>,
{
if list.len() <= NUM_OF_INDICES_IN_SHARD && !mode.is_flush() {
return Ok(());
}
let chunks: Vec<_> = list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect();
let mut iter = chunks.into_iter().peekable();
while let Some(chunk) = iter.next() {
let highest = *chunk.last().expect("at least one index");
let is_last = iter.peek().is_none();
if !mode.is_flush() && is_last {
*list = chunk;
} else {
let highest = if is_last { u64::MAX } else { highest };
write_fn(chunk, highest)?;
}
}
Ok(())
}
/// Collects account history indices using a provider that implements `ChangeSetReader`.
pub(crate) fn collect_account_history_indices<Provider>(
provider: &Provider,
@@ -220,7 +179,6 @@ where
/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
/// key shard is stored.
#[allow(dead_code)]
pub(crate) fn load_history_indices<Provider, H, P>(
provider: &Provider,
mut collector: Collector<H::Key, H::Value>,
@@ -305,7 +263,6 @@ where
}
/// Shard and insert the indices list according to [`LoadMode`] and its length.
#[allow(dead_code)]
pub(crate) fn load_indices<H, C, P>(
cursor: &mut C,
partial_key: P,
@@ -364,289 +321,6 @@ impl LoadMode {
}
}
/// Loads storage history indices from a collector into the database using `EitherWriter`.
///
/// This is a specialized version of [`load_history_indices`] for `tables::StoragesHistory`
/// that supports writing to either `MDBX` or `RocksDB` based on storage settings.
#[allow(dead_code)]
pub(crate) fn load_storages_history_indices<Provider, P>(
provider: &Provider,
mut collector: Collector<
<tables::StoragesHistory as Table>::Key,
<tables::StoragesHistory as Table>::Value,
>,
append_only: bool,
sharded_key_factory: impl Clone + Fn(P, u64) -> StorageShardedKey,
decode_key: impl Fn(Vec<u8>) -> Result<StorageShardedKey, DatabaseError>,
get_partial: impl Fn(StorageShardedKey) -> P,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ NodePrimitivesProvider
+ StorageSettingsCache
+ RocksDBProviderFactory,
P: Copy + Default + Eq,
{
// Create EitherWriter for storage history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
// Create read cursor for checking existing shards
let mut read_cursor = provider.tx_ref().cursor_read::<tables::StoragesHistory>()?;
let mut current_partial = P::default();
let mut current_list = Vec::<u64>::new();
// observability
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = decode_key(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing storage history indices");
}
let partial_key = get_partial(sharded_key);
if current_partial != partial_key {
// Flush last shard for previous partial key
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;
current_partial = partial_key;
current_list.clear();
// If not first sync, merge with existing shard
if !append_only &&
let Some((_, last_database_shard)) =
read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
{
current_list.extend(last_database_shard.iter());
}
}
current_list.extend(new_list.iter());
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::KeepLast,
)?;
}
// Flush remaining shard
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;
// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);
Ok(())
}
/// Shard and insert storage history indices according to [`LoadMode`] and list length.
#[allow(dead_code)]
fn load_storages_history_shard<P, CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: P,
list: &mut Vec<BlockNumber>,
sharded_key_factory: &impl Fn(P, BlockNumber) -> StorageShardedKey,
_append_only: bool,
mode: LoadMode,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
P: Copy,
{
shard_and_write(list, mode, |chunk, highest| {
let key = sharded_key_factory(partial_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
Ok(writer.put_storage_history(key, &value)?)
})
}
/// Loads account history indices from a collector into the database using `EitherWriter`.
///
/// This is a specialized version of [`load_history_indices`] for `tables::AccountsHistory`
/// that supports writing to either `MDBX` or `RocksDB` based on storage settings.
#[allow(dead_code)]
pub(crate) fn load_accounts_history_indices<Provider, P>(
provider: &Provider,
mut collector: Collector<
<tables::AccountsHistory as Table>::Key,
<tables::AccountsHistory as Table>::Value,
>,
append_only: bool,
sharded_key_factory: impl Clone + Fn(P, u64) -> ShardedKey<Address>,
decode_key: impl Fn(Vec<u8>) -> Result<ShardedKey<Address>, DatabaseError>,
get_partial: impl Fn(ShardedKey<Address>) -> P,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ NodePrimitivesProvider
+ StorageSettingsCache
+ RocksDBProviderFactory,
P: Copy + Default + Eq,
{
// Create EitherWriter for account history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
// Create read cursor for checking existing shards
let mut read_cursor = provider.tx_ref().cursor_read::<tables::AccountsHistory>()?;
let mut current_partial = P::default();
let mut current_list = Vec::<u64>::new();
// observability
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = decode_key(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing account history indices");
}
let partial_key = get_partial(sharded_key);
if current_partial != partial_key {
// Flush last shard for previous partial key
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;
current_partial = partial_key;
current_list.clear();
// If not first sync, merge with existing shard
if !append_only &&
let Some((_, last_database_shard)) =
read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
{
current_list.extend(last_database_shard.iter());
}
}
current_list.extend(new_list.iter());
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::KeepLast,
)?;
}
// Flush remaining shard
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;
// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);
Ok(())
}
/// Shard and insert account history indices according to [`LoadMode`] and list length.
#[allow(dead_code)]
fn load_accounts_history_shard<P, CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: P,
list: &mut Vec<BlockNumber>,
sharded_key_factory: &impl Fn(P, BlockNumber) -> ShardedKey<Address>,
_append_only: bool,
mode: LoadMode,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
P: Copy,
{
shard_and_write(list, mode, |chunk, highest| {
let key = sharded_key_factory(partial_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
Ok(writer.put_account_history(key, &value)?)
})
}
/// Unwinds storage history shards using `EitherWriter` for `RocksDB` support.
///
/// This reimplements the shard unwinding logic with support for both MDBX and `RocksDB`.
/// Walks through shards for a given key, deleting those >= unwind point and preserving
/// indices below the unwind point.
#[allow(dead_code)]
pub(crate) fn unwind_storages_history_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
address: Address,
storage_key: B256,
block_number: BlockNumber,
) -> ProviderResult<()>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
{
writer.unwind_storage_history_shards(address, storage_key, block_number)
}
/// Unwinds account history shards using `EitherWriter` for `RocksDB` support.
///
/// This reimplements the shard unwinding logic with support for both MDBX and `RocksDB`.
/// Walks through shards for a given key, deleting those >= unwind point and preserving
/// indices below the unwind point.
#[allow(dead_code)]
pub(crate) fn unwind_accounts_history_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
address: Address,
block_number: BlockNumber,
) -> ProviderResult<()>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
{
writer.unwind_account_history_shards(address, block_number)
}
/// Called when database is ahead of static files. Attempts to find the first block we are missing
/// transactions for.
pub(crate) fn missing_static_data_error<Provider>(

View File

@@ -92,10 +92,10 @@ impl DbTx for TxMock {
/// Commits the transaction.
///
/// **Mock behavior**: Always returns `Ok(true)`, indicating successful commit.
/// **Mock behavior**: Always returns `Ok(())`, indicating successful commit.
/// No actual data is persisted since this is a mock implementation.
fn commit(self) -> Result<bool, DatabaseError> {
Ok(true)
fn commit(self) -> Result<(), DatabaseError> {
Ok(())
}
/// Aborts the transaction.

View File

@@ -44,9 +44,9 @@ impl StorageSettings {
receipts_in_static_files: true,
transaction_senders_in_static_files: true,
account_changesets_in_static_files: true,
storages_history_in_rocksdb: true,
transaction_hash_numbers_in_rocksdb: true,
account_history_in_rocksdb: true,
storages_history_in_rocksdb: false,
transaction_hash_numbers_in_rocksdb: false,
account_history_in_rocksdb: false,
}
}

View File

@@ -35,7 +35,7 @@ pub trait DbTx: Debug + Send {
) -> Result<Option<T::Value>, DatabaseError>;
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, DatabaseError>;
fn commit(self) -> Result<(), DatabaseError>;
/// Aborts transaction
fn abort(self);
/// Iterate over read only values in table.

View File

@@ -765,9 +765,13 @@ mod tests {
};
use alloy_genesis::Genesis;
use reth_chainspec::{Chain, ChainSpec, HOLESKY, MAINNET, SEPOLIA};
use reth_db::DatabaseEnv;
use reth_db_api::{
cursor::DbCursorRO,
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
table::{Table, TableRow},
transaction::DbTx,
Database,
};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
@@ -775,17 +779,6 @@ mod tests {
};
use std::{collections::BTreeMap, sync::Arc};
#[cfg(not(feature = "edge"))]
use reth_db::DatabaseEnv;
#[cfg(not(feature = "edge"))]
use reth_db_api::{
cursor::DbCursorRO,
table::{Table, TableRow},
transaction::DbTx,
Database,
};
#[cfg(not(feature = "edge"))]
fn collect_table_entries<DB, T>(
tx: &<DB as Database>::TX,
) -> Result<Vec<TableRow<T>>, InitStorageError>
@@ -882,74 +875,26 @@ mod tests {
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
init_genesis(&factory).unwrap();
// In edge mode, history indices are written to RocksDB instead of MDBX
#[cfg(feature = "edge")]
{
let rocksdb = factory.rocksdb_provider();
let provider = factory.provider().unwrap();
let account_history: Vec<_> = rocksdb
.iter::<tables::AccountsHistory>()
.expect("failed to iterate")
.collect::<Result<Vec<_>, _>>()
.expect("failed to collect");
let tx = provider.tx_ref();
assert_eq!(
account_history,
vec![
(
ShardedKey::new(address_with_balance, u64::MAX),
IntegerList::new([0]).unwrap()
),
(
ShardedKey::new(address_with_storage, u64::MAX),
IntegerList::new([0]).unwrap()
)
],
);
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
.expect("failed to collect"),
vec![
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
],
);
let storage_history: Vec<_> = rocksdb
.iter::<tables::StoragesHistory>()
.expect("failed to iterate")
.collect::<Result<Vec<_>, _>>()
.expect("failed to collect");
assert_eq!(
storage_history,
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}
#[cfg(not(feature = "edge"))]
{
let provider = factory.provider().unwrap();
let tx = provider.tx_ref();
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
.expect("failed to collect"),
vec![
(
ShardedKey::new(address_with_balance, u64::MAX),
IntegerList::new([0]).unwrap()
),
(
ShardedKey::new(address_with_storage, u64::MAX),
IntegerList::new([0]).unwrap()
)
],
);
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}
}

View File

@@ -248,7 +248,7 @@ where
println!(
"{:?}\n",
tx.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.map_err(|_| format!("Could not find table: {}", T::NAME))
.map(|stats| {
let num_pages =

View File

@@ -278,7 +278,7 @@ impl DatabaseMetrics for DatabaseEnv {
let stats = tx
.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {table}"))?;
let page_size = stats.page_size() as usize;

View File

@@ -67,18 +67,25 @@ impl<K: TransactionKind> Tx<K> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
}
/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
if let Some(dbi) = self.dbis.get(T::NAME) {
/// Gets a table database handle by name if it exists, otherwise, check the
/// database, opening the DB if it exists.
pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
if let Some(dbi) = self.dbis.get(name) {
Ok(*dbi)
} else {
self.inner
.open_db(Some(T::NAME))
.open_db(Some(name))
.map(|db| db.dbi())
.map_err(|e| DatabaseError::Open(e.into()))
}
}
/// Gets a table database handle by name if it exists, otherwise, check the
/// database, opening the DB if it exists.
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
self.get_dbi_raw(T::NAME)
}
/// Create db Cursor
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
let inner = self
@@ -295,10 +302,10 @@ impl<K: TransactionKind> DbTx for Tx<K> {
})
}
fn commit(self) -> Result<bool, DatabaseError> {
fn commit(self) -> Result<(), DatabaseError> {
self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
Ok((v, latency)) => (Ok(v), Some(latency)),
Ok(latency) => (Ok(()), Some(latency)),
Err(e) => (Err(e), None),
}
})

View File

@@ -12,10 +12,10 @@ fn bench_get_seq_iter(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = db.dbi();
c.bench_function("bench_get_seq_iter", |b| {
b.iter(|| {
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
let mut i = 0;
let mut count = 0u32;
@@ -54,11 +54,11 @@ fn bench_get_seq_cursor(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = db.dbi();
c.bench_function("bench_get_seq_cursor", |b| {
b.iter(|| {
let (i, count) = txn
.cursor(&db)
.cursor(dbi)
.unwrap()
.iter::<ObjectLength, ObjectLength>()
.map(Result::unwrap)

View File

@@ -42,7 +42,9 @@ impl TableObject for Cow<'_, [u8]> {
#[cfg(not(feature = "return-borrowed"))]
{
let is_dirty = (!K::IS_READ_ONLY) &&
crate::error::mdbx_result(ffi::mdbx_is_dirty(_txn, data_val.iov_base))?;
crate::error::mdbx_result(unsafe {
ffi::mdbx_is_dirty(_txn, data_val.iov_base)
})?;
Ok(if is_dirty { Cow::Owned(s.to_vec()) } else { Cow::Borrowed(s) })
}

View File

@@ -211,7 +211,7 @@ impl Environment {
let mut freelist: usize = 0;
let txn = self.begin_ro_txn()?;
let db = Database::freelist_db();
let cursor = txn.cursor(&db)?;
let cursor = txn.cursor(db.dbi())?;
for result in cursor.iter_slices() {
let (_key, value) = result?;
@@ -989,7 +989,10 @@ mod tests {
result @ Err(_) => result.unwrap(),
}
}
tx.commit().unwrap();
// The transaction may be in an error state after hitting MapFull,
// so commit could fail. We don't care about the result here since
// the purpose of this test is to verify the HSR callback was called.
let _ = tx.commit();
}
// Expect the HSR to be called

View File

@@ -123,6 +123,12 @@ pub enum Error {
/// Read transaction has been timed out.
#[error("read transaction has been timed out")]
ReadTransactionTimeout,
/// The transaction commit was aborted due to previous errors.
///
/// This can happen in exceptionally rare cases and it signals the problem coming from inside
/// of mdbx.
#[error("botched transaction")]
BotchedTransaction,
/// Permission defined
#[error("permission denied to setup database")]
Permission,
@@ -204,6 +210,7 @@ impl Error {
Self::WriteTransactionUnsupportedInReadOnlyMode |
Self::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Self::ReadTransactionTimeout => -96000, // Custom non-MDBX error code
Self::BotchedTransaction => -96001,
Self::Permission => ffi::MDBX_EPERM,
Self::Other(err_code) => *err_code,
}
@@ -216,6 +223,14 @@ impl From<Error> for i32 {
}
}
/// Parses an MDBX error code into a result type.
///
/// Note that this function returns `Ok(false)` on `MDBX_SUCCESS` and
/// `Ok(true)` on `MDBX_RESULT_TRUE`. The return value requires extra
/// care since its interpretation depends on the callee being called.
///
/// The most unintuitive case is `mdbx_txn_commit` which returns `Ok(true)`
/// when the commit has been aborted.
#[inline]
pub(crate) const fn mdbx_result(err_code: c_int) -> Result<bool> {
match err_code {

View File

@@ -170,8 +170,8 @@ where
/// Commits the transaction.
///
/// Any pending operations will be saved.
pub fn commit(self) -> Result<(bool, CommitLatency)> {
let result = self.txn_execute(|txn| {
pub fn commit(self) -> Result<CommitLatency> {
match self.txn_execute(|txn| {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env().txn_manager().remove_active_read_transaction(txn);
@@ -186,10 +186,21 @@ where
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
rx.recv().unwrap()
}
})?;
self.inner.set_committed();
result
})? {
//
Ok((false, lat)) => {
self.inner.set_committed();
Ok(lat)
}
Ok((true, _)) => {
// MDBX_RESULT_TRUE means the transaction was aborted due to prior errors.
// The transaction is still finished/freed by MDBX, so we must mark it as
// committed to prevent the Drop impl from trying to abort it again.
self.inner.set_committed();
Err(Error::BotchedTransaction)
}
Err(e) => Err(e),
}
}
/// Opens a handle to an MDBX database.
@@ -208,11 +219,11 @@ where
}
/// Gets the option flags for the given database in the transaction.
pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
pub fn db_flags(&self, dbi: ffi::MDBX_dbi) -> Result<DatabaseFlags> {
let mut flags: c_uint = 0;
unsafe {
self.txn_execute(|txn| {
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, dbi, &mut flags, ptr::null_mut()))
})??;
}
@@ -222,8 +233,8 @@ where
}
/// Retrieves database statistics.
pub fn db_stat(&self, db: &Database) -> Result<Stat> {
self.db_stat_with_dbi(db.dbi())
pub fn db_stat(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
self.db_stat_with_dbi(dbi)
}
/// Retrieves database statistics by the given dbi.
@@ -238,8 +249,8 @@ where
}
/// Open a new cursor on the given database.
pub fn cursor(&self, db: &Database) -> Result<Cursor<K>> {
Cursor::new(self.clone(), db.dbi())
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
Cursor::new(self.clone(), dbi)
}
/// Open a new cursor on the given dbi.
@@ -400,7 +411,7 @@ impl Transaction<RW> {
#[allow(clippy::mut_from_ref)]
pub fn reserve(
&self,
db: &Database,
dbi: ffi::MDBX_dbi,
key: impl AsRef<[u8]>,
len: usize,
flags: WriteFlags,
@@ -412,13 +423,7 @@ impl Transaction<RW> {
ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
unsafe {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_put(
txn,
db.dbi(),
&key_val,
&mut data_val,
flags.bits() | ffi::MDBX_RESERVE,
)
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits() | ffi::MDBX_RESERVE)
})?)?;
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
}
@@ -473,10 +478,10 @@ impl Transaction<RW> {
/// Drops the database from the environment.
///
/// # Safety
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, db.dbi(), true) })?)?;
/// Caller must close ALL other [Database] and [Cursor] instances pointing
/// to the same dbi BEFORE calling this function.
pub unsafe fn drop_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, true) })?)?;
Ok(())
}
@@ -488,8 +493,8 @@ impl Transaction<RO> {
/// # Safety
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()) })?;
pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), dbi) })?;
Ok(())
}

View File

@@ -9,15 +9,15 @@ fn test_get() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
assert_eq!(None, txn.cursor(&db).unwrap().first::<(), ()>().unwrap());
assert_eq!(None, txn.cursor(dbi).unwrap().first::<(), ()>().unwrap());
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val3", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.next().unwrap(), Some((*b"key2", *b"val2")));
@@ -34,15 +34,15 @@ fn test_get_dup() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
let dbi = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap().dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val3", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.first_dup().unwrap(), Some(*b"val1"));
assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1")));
@@ -78,15 +78,16 @@ fn test_get_dupfixed() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val4", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val5", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val6", WriteFlags::empty()).unwrap();
let dbi =
txn.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap().dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val4", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val5", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val6", WriteFlags::empty()).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1")));
assert_eq!(cursor.get_multiple().unwrap(), Some(*b"val1val2val3"));
assert_eq!(cursor.next_multiple::<(), ()>().unwrap(), None);
@@ -110,12 +111,12 @@ fn test_iter() {
for (key, data) in &items {
txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
}
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
// Because Result implements FromIterator, we can collect the iterator
// of items of type Result<_, E> into a Result<Vec<_, E>> by specifying
@@ -155,8 +156,8 @@ fn test_iter_empty_database() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert!(cursor.iter::<(), ()>().next().is_none());
assert!(cursor.iter_start::<(), ()>().next().is_none());
@@ -173,8 +174,8 @@ fn test_iter_empty_dup_database() {
txn.commit().unwrap();
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert!(cursor.iter::<(), ()>().next().is_none());
assert!(cursor.iter_start::<(), ()>().next().is_none());
@@ -223,8 +224,8 @@ fn test_iter_dup() {
}
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(items, cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap());
cursor.set::<()>(b"b").unwrap();
@@ -271,9 +272,9 @@ fn test_iter_del_get() {
let items = vec![(*b"a", *b"1"), (*b"b", *b"2")];
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
let dbi = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap().dbi();
assert_eq!(
txn.cursor(&db)
txn.cursor(dbi)
.unwrap()
.iter_dup_of::<(), ()>(b"a")
.collect::<Result<Vec<_>>>()
@@ -294,8 +295,8 @@ fn test_iter_del_get() {
}
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
assert_eq!(items, cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap());
assert_eq!(
@@ -316,8 +317,8 @@ fn test_put_del() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let mut cursor = txn.cursor(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let mut cursor = txn.cursor(dbi).unwrap();
cursor.put(b"key1", b"val1", WriteFlags::empty()).unwrap();
cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap();

View File

@@ -50,9 +50,9 @@ fn test_put_get_del_multi() {
txn.commit().unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut cur = txn.cursor(&db).unwrap();
let mut cur = txn.cursor(dbi).unwrap();
let iter = cur.iter_dup_of::<(), [u8; 4]>(b"key1");
let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
assert_eq!(vals, vec![*b"val1", *b"val2", *b"val3"]);
@@ -66,9 +66,9 @@ fn test_put_get_del_multi() {
txn.commit().unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut cur = txn.cursor(&db).unwrap();
let mut cur = txn.cursor(dbi).unwrap();
let iter = cur.iter_dup_of::<(), [u8; 4]>(b"key1");
let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
assert_eq!(vals, vec![*b"val1", *b"val3"]);
@@ -103,9 +103,9 @@ fn test_reserve() {
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut writer = txn.reserve(&db, b"key1", 4, WriteFlags::empty()).unwrap();
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
}
txn.commit().unwrap();
@@ -148,13 +148,13 @@ fn test_clear_db() {
{
let txn = env.begin_rw_txn().unwrap();
txn.put(txn.open_db(None).unwrap().dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
{
let txn = env.begin_rw_txn().unwrap();
txn.clear_db(txn.open_db(None).unwrap().dbi()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
let txn = env.begin_ro_txn().unwrap();
@@ -178,16 +178,16 @@ fn test_drop_db() {
.unwrap();
// Workaround for MDBX dbi drop issue
txn.create_db(Some("canary"), DatabaseFlags::empty()).unwrap();
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(Some("test")).unwrap();
let dbi = txn.open_db(Some("test")).unwrap().dbi();
unsafe {
txn.drop_db(db).unwrap();
txn.drop_db(dbi).unwrap();
}
assert!(matches!(txn.open_db(Some("test")).unwrap_err(), Error::NotFound));
assert!(!txn.commit().unwrap().0);
txn.commit().unwrap();
}
}
@@ -291,8 +291,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 3);
}
@@ -304,8 +304,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 1);
}
@@ -318,8 +318,8 @@ fn test_stat() {
{
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let stat = txn.db_stat(&db).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 4);
}
}
@@ -331,20 +331,22 @@ fn test_stat_dupsort() {
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val3", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val2", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key3", b"val3", WriteFlags::empty()).unwrap();
let dbi = db.dbi();
txn.put(dbi, b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key1", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key2", b"val3", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val1", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val2", WriteFlags::empty()).unwrap();
txn.put(dbi, b"key3", b"val3", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 9);
}
@@ -356,7 +358,8 @@ fn test_stat_dupsort() {
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 5);
}
@@ -369,7 +372,8 @@ fn test_stat_dupsort() {
{
let txn = env.begin_ro_txn().unwrap();
let stat = txn.db_stat(&txn.open_db(None).unwrap()).unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
let stat = txn.db_stat(dbi).unwrap();
assert_eq!(stat.entries(), 8);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -709,7 +709,7 @@ mod tests {
Arc::new(chain_spec),
DatabaseArguments::new(Default::default()),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(&rocksdb_path).with_default_tables().build().unwrap(),
RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
)
.unwrap();
let provider = factory.provider().unwrap();

View File

@@ -126,7 +126,7 @@ impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut,
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
/// Commit database transaction and static file if it exists.
pub fn commit(self) -> ProviderResult<bool> {
pub fn commit(self) -> ProviderResult<()> {
self.0.commit()
}
@@ -188,6 +188,7 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
@@ -402,6 +403,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
@@ -460,7 +462,9 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
thread::scope(|s| {
@@ -471,7 +475,8 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok::<_, ProviderError>(start.elapsed())
});
// RocksDB writes (batches are pushed to pending_batches inside write_blocks_data)
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
s.spawn(|| {
let start = Instant::now();
@@ -482,6 +487,39 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// MDBX writes
let mdbx_start = Instant::now();
// Collect all transaction hashes across all blocks, sort them, and write in batch
if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
{
let start = Instant::now();
let mut all_tx_hashes = Vec::new();
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
let mut tx_num = tx_nums[i];
for transaction in recovered_block.body().transactions_iter() {
all_tx_hashes.push((*transaction.tx_hash(), tx_num));
tx_num += 1;
}
}
// Sort by hash for optimal MDBX insertion performance
all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
// Write all transaction hash numbers in a single batch
self.with_rocksdb_batch(|batch| {
let mut tx_hash_writer =
EitherWriter::new_transaction_hash_numbers(self, batch)?;
tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
Ok(((), raw_batch))
})?;
self.metrics.record_duration(
metrics::Action::InsertTransactionHashNumbers,
start.elapsed(),
);
}
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
@@ -543,14 +581,11 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
.join()
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
// Wait for RocksDB thread (batches already pushed to pending_batches)
// Wait for RocksDB thread
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(handle) = rocksdb_handle {
let elapsed = handle.join().expect("RocksDB thread panicked")?;
timings.rocksdb = elapsed;
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
}
#[cfg(not(all(unix, feature = "rocksdb")))]
let _ = rocksdb_handle;
timings.total = total_start.elapsed();
@@ -561,7 +596,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
})
}
/// Writes MDBX-only data for a block (eg. indices, lookups, and senders).
/// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
///
/// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
#[instrument(level = "debug", target = "providers::db", skip_all)]
@@ -590,21 +625,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
// Write tx hash numbers to MDBX if not handled by RocksDB and not fully pruned
if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
{
let start = Instant::now();
let mut cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
let mut tx_num = first_tx_num;
for transaction in block.body().transactions_iter() {
cursor.upsert(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
self.metrics
.record_duration(metrics::Action::InsertTransactionHashNumbers, start.elapsed());
}
self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
@@ -3577,7 +3597,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
fn commit(self) -> ProviderResult<bool> {
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
// truncate the static files according to the
@@ -3619,7 +3639,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.metrics.record_commit(&timings);
}
Ok(true)
Ok(())
}
}

View File

@@ -16,8 +16,8 @@ pub use static_file::{
mod state;
pub use state::{
historical::{
compute_history_rank, needs_prev_shard_check, HistoricalStateProvider,
HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
history_info, needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef,
HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},
@@ -38,7 +38,7 @@ pub use consistent::ConsistentProvider;
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
pub(crate) mod rocksdb;
pub use rocksdb::{RocksDBBuilder, RocksDBProvider};
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].

View File

@@ -164,16 +164,15 @@ impl RocksDBProvider {
self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
}
(None, None) => {
// Both MDBX and static files are empty - this is expected on first run.
// Log a warning but don't require unwind to 0, as the pipeline will
// naturally populate the data during sync.
// Both MDBX and static files are empty.
// If checkpoint says we should have data, that's an inconsistency.
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"TransactionHashNumbers: no transaction data exists but checkpoint is set. \
This is expected on first run with RocksDB enabled."
"Checkpoint set but no transaction data exists, unwind needed"
);
return Ok(Some(0));
}
}
}
@@ -264,35 +263,16 @@ impl RocksDBProvider {
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// entries
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// Sentinel entries represent "open" shards that haven't been completed yet,
// so no actual history has been indexed.
if !found_non_sentinel {
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"StoragesHistory has only sentinel entries but checkpoint is set. \
This is expected on first run with RocksDB enabled."
);
}
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
@@ -316,16 +296,10 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table - this is expected on first run / migration.
// Log a warning but don't require unwind to 0, as the pipeline will
// naturally populate the table during sync.
// Empty RocksDB table
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"StoragesHistory is empty but checkpoint is set. \
This is expected on first run with RocksDB enabled."
);
// Stage says we should have data but we don't
return Ok(Some(0));
}
Ok(None)
}
@@ -403,35 +377,16 @@ impl RocksDBProvider {
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// entries
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// Sentinel entries represent "open" shards that haven't been completed yet,
// so no actual history has been indexed.
if !found_non_sentinel {
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"AccountsHistory has only sentinel entries but checkpoint is set. \
This is expected on first run with RocksDB enabled."
);
}
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
@@ -458,16 +413,10 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table - this is expected on first run / migration.
// Log a warning but don't require unwind to 0, as the pipeline will
// naturally populate the table during sync.
// Empty RocksDB table
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"AccountsHistory is empty but checkpoint is set. \
This is expected on first run with RocksDB enabled."
);
// Stage says we should have data but we don't
return Ok(Some(0));
}
Ok(None)
}
@@ -593,7 +542,7 @@ mod tests {
}
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
@@ -617,10 +566,10 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB is empty but checkpoint says block 100 was processed
// This means RocksDB is missing data and we need to unwind to rebuild
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Empty data with checkpoint is treated as first run");
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
}
#[test]
@@ -701,7 +650,7 @@ mod tests {
}
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
@@ -725,10 +674,9 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB is empty but checkpoint says block 100 was processed
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
}
#[test]
@@ -1030,97 +978,6 @@ mod tests {
);
}
#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
fn test_check_consistency_storages_history_behind_checkpoint_single_entry() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
@@ -1278,7 +1135,7 @@ mod tests {
}
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
@@ -1302,10 +1159,9 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB is empty but checkpoint says block 100 was processed
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
}
#[test]

View File

@@ -8,8 +8,8 @@ use strum::{EnumIter, IntoEnumIterator};
const ROCKSDB_TABLES: &[&str] = &[
Tables::TransactionHashNumbers.name(),
Tables::AccountsHistory.name(),
Tables::StoragesHistory.name(),
Tables::AccountsHistory.name(),
];
/// Metrics for the `RocksDB` provider.

View File

@@ -4,5 +4,5 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBBatch, RocksDBWriteCtx, RocksTx};
pub use provider::{RocksDBBuilder, RocksDBProvider};
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -1,15 +1,11 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use itertools::Itertools;
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
StorageSettings,
},
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
@@ -430,32 +426,6 @@ impl RocksDBProvider {
})
}
/// Clears all entries from the specified table.
///
/// This iterates through all entries in the table and deletes them.
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
let cf = this.get_cf_handle::<T>()?;
let iter = this.0.db.iterator_cf(cf, IteratorMode::Start);
for result in iter {
let (key, _) = result.map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
this.0.db.delete_cf(cf, &key).map_err(|e| {
ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
}
Ok(())
})
}
/// Gets the first (smallest key) entry from the specified table.
pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
@@ -540,22 +510,6 @@ impl RocksDBProvider {
})
}
/// Creates a reverse iterator over a table starting from the given key.
///
/// Iterates from the key towards the beginning of the table.
pub fn iter_from_reverse<T: Table>(
&self,
key: T::Key,
) -> ProviderResult<RocksDBIterReverse<'_, T>> {
let cf = self.get_cf_handle::<T>()?;
let encoded_key = key.encode();
let iter = self
.0
.db
.iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Reverse));
Ok(RocksDBIterReverse { inner: iter, _marker: std::marker::PhantomData })
}
/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on
@@ -638,10 +592,10 @@ impl RocksDBProvider {
account_history.entry(address).or_default().push(block_number);
}
}
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
@@ -666,10 +620,10 @@ impl RocksDBProvider {
}
}
}
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
batch.append_storage_history_shard(address, slot, indices)?;
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
@@ -760,91 +714,6 @@ impl<'a> RocksDBBatch<'a> {
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
/// Appends indices to an account history shard with proper shard management.
///
/// Loads the existing shard (if any), appends new indices, and rechunks into
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
pub fn append_account_history_shard(
&mut self,
address: Address,
indices: impl IntoIterator<Item = u64>,
) -> ProviderResult<()> {
let last_key = ShardedKey::new(address, u64::MAX);
let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, highest_block_number),
&shard,
)?;
}
Ok(())
}
/// Appends indices to a storage history shard with proper shard management.
///
/// Loads the existing shard (if any), appends new indices, and rechunks into
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
pub fn append_storage_history_shard(
&mut self,
address: Address,
storage_key: B256,
indices: impl IntoIterator<Item = u64>,
) -> ProviderResult<()> {
let last_key = StorageShardedKey::last(address, storage_key);
let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::StoragesHistory>(
StorageShardedKey::new(address, storage_key, highest_block_number),
&shard,
)?;
}
Ok(())
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -1085,9 +954,21 @@ impl<'db> RocksTx<'db> {
};
};
let chunk = BlockNumberList::decompress(value_bytes)?;
let (rank, found_block) = compute_history_rank(&chunk, block_number);
// Check if this is before the first write by looking at the previous shard.
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// Lazy check for previous shard - only called when needed.
// If we can step to a previous shard for this same key, history already exists,
// so the target block is not before the first write.
let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
iter.prev();
Self::raw_iter_status_ok(&iter)?;
@@ -1161,60 +1042,6 @@ impl<T: Table> Iterator for RocksDBIter<'_, T> {
}
}
/// Result type for raw iterator items.
type RocksDBRawIterResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
/// Decodes an iterator result from `RocksDB` into a table key-value pair.
fn decode_iter_result<T: Table>(
result: RocksDBRawIterResult,
) -> Option<ProviderResult<(T::Key, T::Value)>> {
let (key_bytes, value_bytes) = match result {
Ok(kv) => kv,
Err(e) => {
return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))))
}
};
// Decode key
let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
Ok(k) => k,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
// Decompress value
let value = match T::Value::decompress(&value_bytes) {
Ok(v) => v,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
Some(Ok((key, value)))
}
/// Reverse iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in reverse key order.
pub struct RocksDBIterReverse<'db, T: Table> {
inner: rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>,
_marker: std::marker::PhantomData<T>,
}
impl<T: Table> fmt::Debug for RocksDBIterReverse<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBIterReverse").field("table", &T::NAME).finish_non_exhaustive()
}
}
impl<T: Table> Iterator for RocksDBIterReverse<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
decode_iter_result::<T>(self.inner.next()?)
}
}
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
@@ -1599,101 +1426,9 @@ mod tests {
assert_eq!(last, Some((20, b"value_20".to_vec())));
}
#[test]
fn test_account_history_info_single_shard() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create a single shard with blocks [100, 200, 300] and highest_block = u64::MAX
// This is the "last shard" invariant
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten (before first entry, no prev shard)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 300: should return InChangeset(300) - exact match means look at
// changeset at that block for the previous value
let result = tx.account_history_info(address, 300, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(300));
// Query for block 500: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_multiple_shards() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create two shards: first shard ends at block 500, second is the last shard
let chunk1 = IntegerList::new([100, 200, 300, 400, 500]).unwrap();
let shard_key1 = ShardedKey::new(address, 500);
provider.put::<tables::AccountsHistory>(shard_key1, &chunk1).unwrap();
let chunk2 = IntegerList::new([600, 700, 800]).unwrap();
let shard_key2 = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
let tx = provider.tx();
// Query for block 50: should return NotYetWritten (before first shard, no prev)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 150: should find block 200 in first shard's changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 550: should find block 600 in second shard's changeset
// prev() should detect first shard exists
let result = tx.account_history_info(address, 550, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(600));
// Query for block 900: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 900, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_no_history() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address1 = Address::from([0x42; 20]);
let address2 = Address::from([0x43; 20]);
// Only add history for address1
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address1, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for address2 (no history exists): should return NotYetWritten
let result = tx.account_history_info(address2, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this RocksDB-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();
@@ -1717,39 +1452,4 @@ mod tests {
tx.rollback().unwrap();
}
#[test]
fn test_storage_history_info() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
// Create a single shard for this storage slot
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
provider.put::<tables::StoragesHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.storage_history_info(address, storage_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten
let result = tx.storage_history_info(address, storage_key, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 500: should return InPlainState
let result = tx.storage_history_info(address, storage_key, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
// Query for different storage key (no history): should return NotYetWritten
let other_key = B256::from([0x02; 32]);
let result = tx.storage_history_info(address, other_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
}

View File

@@ -2,19 +2,13 @@
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
//! Operations will produce errors if actually attempted.
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use reth_db_api::{
models::StorageSettings,
table::{Encode, Table},
};
use reth_db_api::models::StorageSettings;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::LogLevel,
provider::{ProviderError::UnsupportedProvider, ProviderResult},
};
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
use std::{path::Path, sync::Arc};
/// Pending `RocksDB` batches type alias (stub - uses unit type).
@@ -22,7 +16,8 @@ pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
pub struct RocksDBWriteCtx {
#[allow(dead_code)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
@@ -36,16 +31,13 @@ pub struct RocksDBWriteCtx {
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
/// route to MDBX instead.
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
///
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
@@ -55,152 +47,22 @@ impl RocksDBProvider {
RocksDBBuilder::new(path)
}
/// Get a value from `RocksDB` (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Delete a value from `RocksDB` (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Clear all entries from a table (stub implementation).
///
/// Returns `Ok(())` since the stub behaves as if the database is empty.
pub const fn clear<T: Table>(&self) -> ProviderResult<()> {
Ok(())
}
/// Write a batch of operations (stub implementation).
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
where
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
{
Err(UnsupportedProvider)
}
/// Creates a new transaction (stub implementation).
pub const fn tx(&self) -> RocksTx {
RocksTx
}
/// Creates a new batch for atomic writes (stub implementation).
pub const fn batch(&self) -> RocksDBBatch {
RocksDBBatch
}
/// Gets the first key-value pair from a table (stub implementation).
pub const fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Gets the last key-value pair from a table (stub implementation).
pub const fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Creates an iterator for the specified table (stub implementation).
///
/// Returns an empty iterator. This is consistent with `first()` and `last()` returning
/// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable.
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
Ok(RocksDBIter { _marker: std::marker::PhantomData })
}
/// Check consistency of `RocksDB` tables (stub implementation).
///
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
pub const fn check_consistency<Provider>(
&self,
_provider: &Provider,
) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
/// Writes all `RocksDB` data for multiple blocks (stub implementation).
///
/// No-op since `RocksDB` is not available on this platform.
pub fn write_blocks_data<N>(
&self,
_blocks: &[reth_chain_state::ExecutedBlock<N>],
_tx_nums: &[alloy_primitives::TxNumber],
_ctx: RocksDBWriteCtx,
) -> ProviderResult<()>
where
N: reth_node_types::NodePrimitives,
{
Ok(())
}
}
/// A stub batch writer for `RocksDB` on non-Unix platforms.
/// A stub batch writer for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBatch;
impl RocksDBBatch {
/// Puts a value into the batch (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value into the batch using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the batch (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Commits the batch (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` (non-transactional).
#[derive(Debug)]
pub struct RocksDBIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksDBIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// A stub builder for `RocksDB` on non-Unix platforms.
/// A stub builder for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBuilder;
@@ -211,7 +73,7 @@ impl RocksDBBuilder {
}
/// Adds a column family for a specific table type (stub implementation).
pub const fn with_table<T: Table>(self) -> Self {
pub const fn with_table<T>(self) -> Self {
self
}
@@ -249,71 +111,3 @@ impl RocksDBBuilder {
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
impl RocksTx {
/// Gets a value from the specified table (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Gets a value using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Puts a value into the specified table (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the specified table (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Creates an iterator for the specified table (stub implementation).
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Creates an iterator starting from the given key (stub implementation).
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Commits the transaction (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Rolls back the transaction (stub implementation).
pub const fn rollback(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` transactions.
#[derive(Debug)]
pub struct RocksTxIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksTxIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}

View File

@@ -1,14 +1,20 @@
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError,
StateProvider, StateRootProvider,
};
use alloy_eips::merge::EPOCH_SLOTS;
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::Table,
tables,
transaction::DbTx,
BlockNumberList,
};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageRootProvider, StorageSettingsCache,
BlockNumReader, BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -121,47 +127,38 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Self { provider, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
/// Lookup an account in the `AccountsHistory` table
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo> {
if !self.lowest_available_blocks.is_account_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
self.provider.with_rocksdb_tx(|rocks_tx_ref| {
let mut reader = EitherReader::new_accounts_history(self.provider, rocks_tx_ref)?;
reader.account_history_info(
address,
self.block_number,
self.lowest_available_blocks.account_history_block_number,
)
})
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info_lookup::<tables::AccountsHistory, _>(
history_key,
|key| key.key == address,
self.lowest_available_blocks.account_history_block_number,
)
}
/// Lookup a storage key in the `StoragesHistory` table using `EitherReader`.
/// Lookup a storage key in the `StoragesHistory` table
pub fn storage_history_lookup(
&self,
address: Address,
storage_key: StorageKey,
) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
) -> ProviderResult<HistoryInfo> {
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
self.provider.with_rocksdb_tx(|rocks_tx_ref| {
let mut reader = EitherReader::new_storages_history(self.provider, rocks_tx_ref)?;
reader.storage_history_info(
address,
storage_key,
self.block_number,
self.lowest_available_blocks.storage_history_block_number,
)
})
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info_lookup::<tables::StoragesHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_available_blocks.storage_history_block_number,
)
}
/// Checks and returns `true` if distance to historical block exceeds the provided limit.
@@ -207,6 +204,25 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info_lookup<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
history_info::<T, K, _>(
&mut cursor,
key,
self.block_number,
key_filter,
lowest_available_block_number,
)
}
/// Set the lowest block number at which the account history is available.
pub const fn with_lowest_available_account_history_block_number(
mut self,
@@ -232,14 +248,8 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'_, Provi
}
}
impl<
Provider: DBProvider
+ BlockNumReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> AccountReader for HistoricalStateProviderRef<'_, Provider>
impl<Provider: DBProvider + BlockNumReader + ChangeSetReader> AccountReader
for HistoricalStateProviderRef<'_, Provider>
{
/// Get basic account information.
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
@@ -394,15 +404,8 @@ impl<Provider> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provid
}
}
impl<
Provider: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> StateProvider for HistoricalStateProviderRef<'_, Provider>
impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader> StateProvider
for HistoricalStateProviderRef<'_, Provider>
{
/// Get storage.
fn storage(
@@ -492,7 +495,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> HistoricalStatePro
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -522,29 +525,6 @@ impl LowestAvailableBlocks {
}
}
/// Computes the rank and selected block from a history shard chunk.
///
/// Given a `BlockNumberList` (history shard) and a target block number, this function:
/// 1. Finds the rank of the first entry at or before `block_number`
/// 2. Adjusts the rank if the found entry equals `block_number` (so we get strictly before)
/// 3. Returns `(rank, found_block)` for use with [`needs_prev_shard_check`] and
/// [`HistoryInfo::from_lookup`]
///
/// This logic is shared between MDBX cursor-based lookups and `RocksDB` iterator lookups.
#[inline]
pub fn compute_history_rank(
chunk: &reth_db_api::BlockNumberList,
block_number: BlockNumber,
) -> (u64, Option<u64>) {
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before
// our block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
(rank, chunk.select(rank))
}
/// Checks if a previous shard lookup is needed to determine if we're before the first write.
///
/// Returns `true` when `rank == 0` (first entry in shard) and the found block doesn't match
@@ -558,14 +538,67 @@ pub fn needs_prev_shard_check(
rank == 0 && found_block != Some(block_number)
}
/// Generic history lookup for sharded history tables.
///
/// Seeks to the shard containing `block_number`, verifies the key via `key_filter`,
/// and checks previous shard to detect if we're before the first write.
pub fn history_info<T, K, C>(
cursor: &mut C,
key: K,
block_number: BlockNumber,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
C: DbCursorRO<T>,
{
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(k, _)| key_filter(k)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
let is_before_first_write = needs_prev_shard_check(rank, found_block, block_number) &&
!cursor.prev()?.is_some_and(|(k, _)| key_filter(&k));
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
#[cfg(test)]
mod tests {
use super::needs_prev_shard_check;
use crate::{
providers::state::historical::{HistoryInfo, LowestAvailableBlocks},
test_utils::create_test_provider_factory,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, RocksDBProviderFactory,
StateProvider,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
};
use alloy_primitives::{address, b256, Address, B256, U256};
use reth_db_api::{
@@ -577,7 +610,6 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
@@ -589,13 +621,7 @@ mod tests {
const fn assert_state_provider<T: StateProvider>() {}
#[expect(dead_code)]
const fn assert_historical_state_provider<
T: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
T: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader,
>() {
assert_state_provider::<HistoricalStateProvider<T>>();
}

View File

@@ -267,7 +267,7 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + 'static> DBProvider
self.tx
}
fn commit(self) -> ProviderResult<bool> {
fn commit(self) -> ProviderResult<()> {
Ok(self.tx.commit()?)
}

View File

@@ -27,7 +27,7 @@ impl<C: Send + Sync, N: NodePrimitives> StaticFileProviderFactory for NoopProvid
impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<C, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).with_default_tables().build().unwrap()
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]

View File

@@ -1,5 +1,4 @@
use crate::{either_writer::RocksTxRefArg, providers::RocksDBProvider};
use reth_storage_errors::provider::ProviderResult;
use crate::providers::RocksDBProvider;
/// `RocksDB` provider factory.
///
@@ -14,21 +13,4 @@ pub trait RocksDBProviderFactory {
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
/// Executes a closure with a `RocksDB` transaction for reading.
///
/// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
where
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
{
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = self.rocksdb_provider();
let tx = rocksdb.tx();
f(&tx)
}
#[cfg(not(all(unix, feature = "rocksdb")))]
f(())
}
}

View File

@@ -1357,7 +1357,7 @@ where
self
}
fn commit(self) -> ProviderResult<bool> {
fn commit(self) -> ProviderResult<()> {
unimplemented!("commit not supported for RPC provider")
}

View File

@@ -37,7 +37,7 @@ pub trait DBProvider: Sized {
}
/// Commit database transaction
fn commit(self) -> ProviderResult<bool>;
fn commit(self) -> ProviderResult<()>;
/// Returns a reference to prune modes.
fn prune_modes_ref(&self) -> &PruneModes;

View File

@@ -639,7 +639,7 @@ impl<ChainSpec: Send + Sync, N: NodePrimitives> DBProvider for NoopProvider<Chai
&self.prune_modes
}
fn commit(self) -> ProviderResult<bool> {
fn commit(self) -> ProviderResult<()> {
use reth_db_api::transaction::DbTx;
Ok(self.tx.commit()?)

View File

@@ -14,6 +14,7 @@ opentelemetry_sdk = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true, features = ["grpc-tonic"] }
opentelemetry-semantic-conventions = { workspace = true, optional = true }
opentelemetry-appender-tracing = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
tracing-subscriber.workspace = true
tracing.workspace = true
@@ -36,3 +37,10 @@ otlp = [
"opentelemetry-semantic-conventions",
"tracing-opentelemetry",
]
otlp-logs = [
"otlp",
"opentelemetry-appender-tracing",
"opentelemetry-otlp/logs",
"opentelemetry_sdk/logs",
]

View File

@@ -1,10 +1,12 @@
#![cfg(feature = "otlp")]
//! Provides a tracing layer for `OpenTelemetry` that exports spans to an OTLP endpoint.
//! Provides tracing layers for `OpenTelemetry` that export spans, logs, and metrics to an OTLP
//! endpoint.
//!
//! This module simplifies the integration of `OpenTelemetry` tracing with OTLP export in Rust
//! applications. It allows for easily capturing and exporting distributed traces to compatible
//! backends like Jaeger, Zipkin, or any other OpenTelemetry-compatible tracing system.
//! This module simplifies the integration of `OpenTelemetry` with OTLP export in Rust
//! applications. It allows for easily capturing and exporting distributed traces, logs,
//! and metrics to compatible backends like Jaeger, Zipkin, or any other
//! OpenTelemetry-compatible system.
use clap::ValueEnum;
use eyre::ensure;
@@ -24,6 +26,7 @@ use url::Url;
// Otlp http endpoint is expected to end with this path.
// See also <https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_traces_endpoint>.
const HTTP_TRACE_ENDPOINT: &str = "/v1/traces";
const HTTP_LOGS_ENDPOINT: &str = "/v1/logs";
/// Creates a tracing [`OpenTelemetryLayer`] that exports spans to an OTLP endpoint.
///
@@ -62,6 +65,42 @@ where
Ok(tracing_opentelemetry::layer().with_tracer(tracer))
}
/// Creates a tracing layer that exports logs to an OTLP endpoint.
///
/// This layer bridges logs emitted via the `tracing` crate to `OpenTelemetry` logs.
#[cfg(feature = "otlp-logs")]
pub fn log_layer(
otlp_config: OtlpLogsConfig,
) -> eyre::Result<
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge<
opentelemetry_sdk::logs::SdkLoggerProvider,
opentelemetry_sdk::logs::SdkLogger,
>,
> {
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::SdkLoggerProvider;
let resource = build_resource(otlp_config.service_name.clone());
let log_builder = LogExporter::builder();
let log_exporter = match otlp_config.protocol {
OtlpProtocol::Http => {
log_builder.with_http().with_endpoint(otlp_config.endpoint.as_str()).build()?
}
OtlpProtocol::Grpc => {
log_builder.with_tonic().with_endpoint(otlp_config.endpoint.as_str()).build()?
}
};
let logger_provider = SdkLoggerProvider::builder()
.with_resource(resource)
.with_batch_exporter(log_exporter)
.build();
Ok(opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&logger_provider))
}
/// Configuration for OTLP trace export.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
@@ -115,6 +154,43 @@ impl OtlpConfig {
}
}
/// Configuration for OTLP logs export.
#[derive(Debug, Clone)]
pub struct OtlpLogsConfig {
/// Service name for log identification
service_name: String,
/// Otlp endpoint URL
endpoint: Url,
/// Transport protocol, HTTP or gRPC
protocol: OtlpProtocol,
}
impl OtlpLogsConfig {
/// Creates a new OTLP logs configuration.
pub fn new(
service_name: impl Into<String>,
endpoint: Url,
protocol: OtlpProtocol,
) -> eyre::Result<Self> {
Ok(Self { service_name: service_name.into(), endpoint, protocol })
}
/// Returns the service name.
pub fn service_name(&self) -> &str {
&self.service_name
}
/// Returns the OTLP endpoint URL.
pub const fn endpoint(&self) -> &Url {
&self.endpoint
}
/// Returns the transport protocol.
pub const fn protocol(&self) -> OtlpProtocol {
self.protocol
}
}
// Builds OTLP resource with service information.
fn build_resource(service_name: impl Into<Value>) -> Resource {
Resource::builder()
@@ -145,23 +221,35 @@ pub enum OtlpProtocol {
}
impl OtlpProtocol {
/// Validate and correct the URL to match protocol requirements.
/// Validate and correct the URL to match protocol requirements for traces.
///
/// For HTTP: Ensures the path ends with `/v1/traces`, appending it if necessary.
/// For gRPC: Ensures the path does NOT include `/v1/traces`.
pub fn validate_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
self.validate_endpoint_with_path(url, HTTP_TRACE_ENDPOINT)
}
/// Validate and correct the URL to match protocol requirements for logs.
///
/// For HTTP: Ensures the path ends with `/v1/logs`, appending it if necessary.
/// For gRPC: Ensures the path does NOT include `/v1/logs`.
pub fn validate_logs_endpoint(&self, url: &mut Url) -> eyre::Result<()> {
self.validate_endpoint_with_path(url, HTTP_LOGS_ENDPOINT)
}
fn validate_endpoint_with_path(&self, url: &mut Url, http_path: &str) -> eyre::Result<()> {
match self {
Self::Http => {
if !url.path().ends_with(HTTP_TRACE_ENDPOINT) {
if !url.path().ends_with(http_path) {
let path = url.path().trim_end_matches('/');
url.set_path(&format!("{}{}", path, HTTP_TRACE_ENDPOINT));
url.set_path(&format!("{}{}", path, http_path));
}
}
Self::Grpc => {
ensure!(
!url.path().ends_with(HTTP_TRACE_ENDPOINT),
!url.path().ends_with(http_path),
"OTLP gRPC endpoint should not include {} path, got: {}",
HTTP_TRACE_ENDPOINT,
http_path,
url
);
}

View File

@@ -33,4 +33,5 @@ rolling-file.workspace = true
[features]
default = ["otlp"]
otlp = ["reth-tracing-otlp"]
otlp-logs = ["reth-tracing-otlp/otlp-logs"]
tracy = ["tracing-tracy", "tracy-client"]

View File

@@ -1,4 +1,6 @@
use crate::{formatter::LogFormat, LayerInfo};
#[cfg(feature = "otlp-logs")]
use reth_tracing_otlp::{log_layer, OtlpLogsConfig};
#[cfg(feature = "otlp")]
use reth_tracing_otlp::{span_layer, OtlpConfig};
use rolling_file::{RollingConditionBasic, RollingFileAppender};
@@ -168,6 +170,22 @@ impl Layers {
Ok(())
}
/// Add OTLP logs layer to the layer collection
#[cfg(feature = "otlp-logs")]
pub fn with_log_layer(
&mut self,
otlp_config: OtlpLogsConfig,
filter: EnvFilter,
) -> eyre::Result<()> {
let log_layer = log_layer(otlp_config)
.map_err(|e| eyre::eyre!("Failed to build OTLP log exporter {}", e))?
.with_filter(filter);
self.add_layer(log_layer);
Ok(())
}
}
/// Holds configuration information for file logging.

View File

@@ -141,3 +141,7 @@ harness = false
name = "hashed_state"
harness = false
required-features = ["rayon"]
[[bench]]
name = "merge_strategies"
harness = false

View File

@@ -0,0 +1,131 @@
#![allow(missing_docs, unreachable_pub)]
use alloy_primitives::{B256, U256};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use reth_primitives_traits::Account;
use reth_trie_common::{HashedPostState, HashedPostStateSorted, HashedStorage, HashedStorageSorted};
use std::collections::HashMap;
fn keccak256_mock(n: u64) -> B256 {
let mut bytes = [0u8; 32];
bytes[24..].copy_from_slice(&n.to_be_bytes());
for (i, item) in bytes.iter_mut().enumerate() {
*item ^= ((n.wrapping_mul(0x9e3779b97f4a7c15) >> (i * 8)) & 0xff) as u8;
}
B256::from(bytes)
}
fn generate_hashed_post_state_sorted(
base_offset: u64,
num_accounts: usize,
storage_slots_per_account: usize,
) -> HashedPostStateSorted {
let mut state = HashedPostState::default();
for i in 0..num_accounts {
let hashed_address = keccak256_mock(base_offset + i as u64);
let account = Some(Account {
nonce: i as u64,
balance: U256::from(i * 1000),
bytecode_hash: None,
});
state.accounts.insert(hashed_address, account);
if storage_slots_per_account > 0 {
let mut storage = HashedStorage::new(false);
for j in 0..storage_slots_per_account {
let slot = keccak256_mock(base_offset * 1000 + (i * 100 + j) as u64);
storage.storage.insert(slot, U256::from(j));
}
state.storages.insert(hashed_address, storage);
}
}
state.into_sorted()
}
fn generate_states(
num_sources: usize,
items_per_source: usize,
) -> Vec<HashedPostStateSorted> {
(0..num_sources)
.map(|i| {
generate_hashed_post_state_sorted(
(i * 1_000_000) as u64,
items_per_source,
5,
)
})
.collect()
}
fn extend_ref_chain(states: &[HashedPostStateSorted]) -> HashedPostStateSorted {
if states.is_empty() {
return HashedPostStateSorted::default();
}
let mut result = states[0].clone();
for state in &states[1..] {
result.extend_ref(state);
}
result
}
fn bench_merge_strategies(c: &mut Criterion) {
let mut group = c.benchmark_group("HashedPostStateSorted_merge");
// Test various source counts and items per source
// We want to find where extend_ref beats kway_merge
let configs = [
// (num_sources, items_per_source)
(5, 100),
(5, 500),
(5, 1000),
(5, 2000),
(5, 5000),
(5, 10000),
(10, 100),
(10, 500),
(10, 1000),
(10, 2000),
(10, 5000),
(20, 100),
(20, 500),
(20, 1000),
(20, 2000),
];
for (num_sources, items_per_source) in configs {
let states = generate_states(num_sources, items_per_source);
let total_items = num_sources * items_per_source;
let label = format!("{}src_{}items", num_sources, items_per_source);
group.throughput(Throughput::Elements(total_items as u64));
group.bench_with_input(
BenchmarkId::new("kway_merge", &label),
&states,
|b, states| {
b.iter(|| {
let result = HashedPostStateSorted::merge_batch(black_box(states.iter()));
black_box(result)
});
},
);
group.bench_with_input(
BenchmarkId::new("extend_ref_new", &label),
&states,
|b, states| {
b.iter(|| {
let result = extend_ref_chain(black_box(states));
black_box(result)
});
},
);
}
group.finish();
}
criterion_group!(benches, bench_merge_strategies);
criterion_main!(benches);

View File

@@ -7,8 +7,10 @@ use alloy_trie::proof::AddedRemovedKeys;
/// Tracks added and removed keys across account and storage tries.
#[derive(Debug, Clone)]
pub struct MultiAddedRemovedKeys {
account: AddedRemovedKeys,
storages: B256Map<AddedRemovedKeys>,
/// Added and removed accounts.
pub account: AddedRemovedKeys,
/// Added and removed storage keys for each account.
pub storages: B256Map<AddedRemovedKeys>,
}
/// Returns [`AddedRemovedKeys`] with default parameters. This is necessary while we are not yet

View File

@@ -3,7 +3,7 @@ use core::ops::Not;
use crate::{
added_removed_keys::MultiAddedRemovedKeys,
prefix_set::{PrefixSetMut, TriePrefixSetsMut},
utils::extend_sorted_vec,
utils::{extend_sorted_vec, kway_merge_sorted},
KeyHasher, MultiProofTargets, Nibbles,
};
use alloc::{borrow::Cow, vec::Vec};
@@ -634,6 +634,87 @@ impl HashedPostStateSorted {
}
}
/// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
let states: Vec<_> = states.into_iter().collect();
match states.len() {
0 => return Self::default(),
1 => return states[0].clone(),
n => {
let total_items: usize = states.iter().map(|s| s.total_len()).sum();
if n >= crate::utils::KWAY_MIN_SOURCES {
Self::merge_batch_kway(&states)
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = states[0].clone();
for state in &states[1..] {
result.extend_ref(state);
}
result
} else {
Self::merge_batch_hashmap(&states)
}
}
}
}
/// K-way merge implementation for many sources.
fn merge_batch_kway<'a>(states: &[&'a Self]) -> Self {
let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
struct StorageAcc<'a> {
wiped: bool,
sealed: bool,
slices: Vec<&'a [(B256, U256)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
for state in states {
for (addr, storage) in &state.storages {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
wiped: false,
sealed: false,
slices: Vec::new(),
});
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_slots.as_slice());
if storage.wiped {
entry.wiped = true;
entry.sealed = true;
}
}
}
let storages = acc
.into_iter()
.map(|(addr, entry)| {
let storage_slots = kway_merge_sorted(entry.slices);
(addr, HashedStorageSorted { wiped: entry.wiped, storage_slots })
})
.collect();
Self { accounts, storages }
}
/// HashMap-based merge for small data with low per-element overhead.
fn merge_batch_hashmap<'a>(states: &[&'a Self]) -> Self {
let mut unsorted = HashedPostState::default();
for state in states.iter().rev() {
unsorted.extend_from_sorted(state);
}
unsorted.into_sorted()
}
/// Clears all accounts and storage data.
pub fn clear(&mut self) {
self.accounts.clear();
@@ -648,7 +729,7 @@ impl AsRef<Self> for HashedPostStateSorted {
}
/// Sorted hashed storage optimized for iterating during state trie calculation.
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HashedStorageSorted {
/// Sorted collection of updated storage slots. [`U256::ZERO`] indicates a deleted value.
@@ -694,6 +775,53 @@ impl HashedStorageSorted {
// Extend the sorted non-zero valued slots
extend_sorted_vec(&mut self.storage_slots, &other.storage_slots);
}
/// Batch-merge sorted hashed storage. Iterator yields **newest to oldest**.
/// If any update is wiped, prior data is discarded.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
let wipe_idx = updates.iter().position(|u| u.wiped);
let relevant = wipe_idx.map_or(&updates[..], |idx| &updates[..=idx]);
match relevant.len() {
0 => return Self::default(),
1 => return Self { wiped: wipe_idx.is_some(), ..relevant[0].clone() },
n => {
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
let storage_slots = if n >= crate::utils::KWAY_MIN_SOURCES {
kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()))
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = relevant[0].storage_slots.clone();
for update in &relevant[1..] {
extend_sorted_vec(&mut result, &update.storage_slots);
}
result
} else {
Self::merge_batch_hashmap(relevant)
};
Self { wiped: wipe_idx.is_some(), storage_slots }
}
}
}
fn merge_batch_hashmap(updates: &[&Self]) -> Vec<(B256, U256)> {
let mut map: B256Map<U256> = B256Map::default();
for update in updates.iter().rev() {
for &(slot, value) in &update.storage_slots {
map.insert(slot, value);
}
}
let mut result: Vec<_> = map.into_iter().collect();
result.sort_unstable_by_key(|(k, _)| *k);
result
}
}
impl From<HashedStorageSorted> for HashedStorage {

View File

@@ -1,4 +1,7 @@
use crate::{utils::extend_sorted_vec, BranchNodeCompact, HashBuilder, Nibbles};
use crate::{
utils::{extend_sorted_vec, kway_merge_sorted},
BranchNodeCompact, HashBuilder, Nibbles,
};
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
vec::Vec,
@@ -23,6 +26,15 @@ pub struct TrieUpdates {
}
impl TrieUpdates {
/// Creates a new `TrieUpdates` with pre-allocated capacity.
pub fn with_capacity(account_nodes: usize, storage_tries: usize) -> Self {
Self {
account_nodes: HashMap::with_capacity_and_hasher(account_nodes, Default::default()),
removed_nodes: HashSet::with_capacity_and_hasher(account_nodes / 4, Default::default()),
storage_tries: B256Map::with_capacity_and_hasher(storage_tries, Default::default()),
}
}
/// Returns `true` if the updates are empty.
pub fn is_empty(&self) -> bool {
self.account_nodes.is_empty() &&
@@ -611,6 +623,86 @@ impl TrieUpdatesSorted {
self.account_nodes.clear();
self.storage_tries.clear();
}
/// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
match updates.len() {
0 => return Self::default(),
1 => return updates[0].clone(),
n => {
let total_items: usize = updates.iter().map(|u| u.total_len()).sum();
if n >= crate::utils::KWAY_MIN_SOURCES {
Self::merge_batch_kway(&updates)
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = updates[0].clone();
for update in &updates[1..] {
result.extend_ref(update);
}
result
} else {
Self::merge_batch_hashmap(&updates)
}
}
}
}
fn merge_batch_kway<'a>(updates: &[&'a Self]) -> Self {
let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
struct StorageAcc<'a> {
is_deleted: bool,
sealed: bool,
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
for update in updates {
for (addr, storage) in &update.storage_tries {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
is_deleted: false,
sealed: false,
slices: Vec::new(),
});
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_nodes.as_slice());
if storage.is_deleted {
entry.is_deleted = true;
entry.sealed = true;
}
}
}
let storage_tries = acc
.into_iter()
.map(|(addr, entry)| {
let storage_nodes = kway_merge_sorted(entry.slices);
(addr, StorageTrieUpdatesSorted { is_deleted: entry.is_deleted, storage_nodes })
})
.collect();
Self { account_nodes, storage_tries }
}
fn merge_batch_hashmap<'a>(updates: &[&'a Self]) -> Self {
let mut unsorted = TrieUpdates::default();
for update in updates.iter().rev() {
unsorted.extend_from_sorted(update);
}
unsorted.into_sorted()
}
}
impl AsRef<Self> for TrieUpdatesSorted {
@@ -702,6 +794,55 @@ impl StorageTrieUpdatesSorted {
extend_sorted_vec(&mut self.storage_nodes, &other.storage_nodes);
self.is_deleted = self.is_deleted || other.is_deleted;
}
/// Batch-merge sorted storage trie updates. Iterator yields **newest to oldest**.
/// If any update is deleted, older data is discarded.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
let del_idx = updates.iter().position(|u| u.is_deleted);
let relevant = del_idx.map_or(&updates[..], |idx| &updates[..=idx]);
match relevant.len() {
0 => return Self::default(),
1 => return Self { is_deleted: del_idx.is_some(), ..relevant[0].clone() },
n => {
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
let storage_nodes = if n >= crate::utils::KWAY_MIN_SOURCES {
kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()))
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = relevant[0].storage_nodes.clone();
for update in &relevant[1..] {
extend_sorted_vec(&mut result, &update.storage_nodes);
}
result
} else {
Self::merge_batch_hashmap(relevant)
};
Self { is_deleted: del_idx.is_some(), storage_nodes }
}
}
}
fn merge_batch_hashmap(
updates: &[&Self],
) -> Vec<(Nibbles, Option<BranchNodeCompact>)> {
let mut map: HashMap<Nibbles, Option<BranchNodeCompact>> = HashMap::default();
for update in updates.iter().rev() {
for (nibbles, node) in &update.storage_nodes {
map.insert(*nibbles, node.clone());
}
}
let mut result: Vec<_> = map.into_iter().collect();
result.sort_unstable_by_key(|(k, _)| *k);
result
}
}
/// Excludes empty nibbles from the given iterator.

View File

@@ -1,15 +1,49 @@
use alloc::vec::Vec;
use core::cmp::Ordering;
use itertools::Itertools;
/// Helper function to extend a sorted vector with another sorted vector.
/// Values from `other` take precedence for duplicate keys.
/// Minimum average items per source to prefer pairwise sorted merge over HashMap merge.
pub(crate) const PAIRWISE_MIN_AVG_ITEMS: usize = 2000;
/// Minimum number of sources that triggers k-way merge instead of pairwise sorted merge.
pub(crate) const KWAY_MIN_SOURCES: usize = 30;
/// Returns true if pairwise sorted merge is preferred over HashMap merge.
/// Returns false if k >= KWAY_MIN_SOURCES (use kway) or avg items < threshold (use HashMap).
#[inline]
pub(crate) fn prefer_sorted_merge(num_sources: usize, total_items: usize) -> bool {
if num_sources >= KWAY_MIN_SOURCES {
return false;
}
total_items >= PAIRWISE_MIN_AVG_ITEMS.saturating_mul(num_sources)
}
/// Merge sorted slices into a sorted `Vec`. First occurrence wins for duplicate keys.
///
/// This function efficiently merges two sorted vectors by:
/// 1. Iterating through the target vector with mutable references
/// 2. Using a peekable iterator for the other vector
/// 3. For each target item, processing other items that come before or equal to it
/// 4. Collecting items from other that need to be inserted
/// 5. Appending and re-sorting only if new items were added
/// Callers pass slices in priority order (index 0 = highest priority), so the first
/// slice's value for a key takes precedence over later slices.
pub(crate) fn kway_merge_sorted<'a, K, V>(
slices: impl IntoIterator<Item = &'a [(K, V)]>,
) -> Vec<(K, V)>
where
K: Ord + Clone + 'a,
V: Clone + 'a,
{
slices
.into_iter()
.filter(|s| !s.is_empty())
.enumerate()
// Merge by reference: (priority, &K, &V) - avoids cloning all elements upfront
.map(|(i, s)| s.iter().map(move |(k, v)| (i, k, v)))
.kmerge_by(|(i1, k1, _), (i2, k2, _)| (k1, i1) < (k2, i2))
.dedup_by(|(_, k1, _), (_, k2, _)| *k1 == *k2)
// Clone only surviving elements after dedup
.map(|(_, k, v)| (k.clone(), v.clone()))
.collect()
}
/// Extend a sorted vector with another sorted vector using O(n+m) merge.
/// Values from `other` take precedence for duplicate keys.
pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
where
K: Clone + Ord,
@@ -19,37 +53,45 @@ where
return;
}
let mut other_iter = other.iter().peekable();
let mut to_insert = Vec::new();
if target.is_empty() {
target.extend_from_slice(other);
return;
}
// Iterate through target and update/collect items from other
for target_item in target.iter_mut() {
while let Some(other_item) = other_iter.peek() {
match other_item.0.cmp(&target_item.0) {
Ordering::Less => {
// Other item comes before current target item, collect it
to_insert.push(other_iter.next().unwrap().clone());
}
Ordering::Equal => {
// Same key, update target with other's value
target_item.1 = other_iter.next().unwrap().1.clone();
break;
}
Ordering::Greater => {
// Other item comes after current target item, keep target unchanged
break;
}
// Fast path: non-overlapping ranges - just append
if target.last().map(|(k, _)| k) < other.first().map(|(k, _)| k) {
target.extend_from_slice(other);
return;
}
// Move ownership of target to avoid cloning owned elements
let left = core::mem::take(target);
let mut out = Vec::with_capacity(left.len() + other.len());
let mut a = left.into_iter().peekable();
let mut b = other.iter().peekable();
while let (Some(aa), Some(bb)) = (a.peek(), b.peek()) {
match aa.0.cmp(&bb.0) {
Ordering::Less => {
out.push(a.next().unwrap());
}
Ordering::Greater => {
out.push(b.next().unwrap().clone());
}
Ordering::Equal => {
// `other` takes precedence for duplicate keys - reuse key from `a`
let (k, _) = a.next().unwrap();
out.push((k, b.next().unwrap().1.clone()));
}
}
}
// Append collected new items, as well as any remaining from `other` which are necessarily also
// new, and sort if needed
if !to_insert.is_empty() || other_iter.peek().is_some() {
target.extend(to_insert);
target.extend(other_iter.cloned());
target.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
// Drain remaining: `a` moves, `b` clones
out.extend(a);
out.extend(b.cloned());
*target = out;
}
#[cfg(test)]
@@ -63,4 +105,118 @@ mod tests {
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c_new")]);
}
#[test]
fn test_extend_sorted_vec_empty_target() {
let mut target: Vec<(i32, &str)> = vec![];
let other = vec![(1, "a"), (2, "b")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b")]);
}
#[test]
fn test_extend_sorted_vec_empty_other() {
let mut target = vec![(1, "a"), (2, "b")];
let other: Vec<(i32, &str)> = vec![];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b")]);
}
#[test]
fn test_extend_sorted_vec_all_duplicates() {
let mut target = vec![(1, "old1"), (2, "old2"), (3, "old3")];
let other = vec![(1, "new1"), (2, "new2"), (3, "new3")];
extend_sorted_vec(&mut target, &other);
// other takes precedence
assert_eq!(target, vec![(1, "new1"), (2, "new2"), (3, "new3")]);
}
#[test]
fn test_extend_sorted_vec_interleaved() {
let mut target = vec![(1, "a"), (3, "c"), (5, "e")];
let other = vec![(2, "b"), (4, "d"), (6, "f")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")]);
}
#[test]
fn test_extend_sorted_vec_other_all_smaller() {
let mut target = vec![(5, "e"), (6, "f")];
let other = vec![(1, "a"), (2, "b")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
#[test]
fn test_extend_sorted_vec_other_all_larger() {
let mut target = vec![(1, "a"), (2, "b")];
let other = vec![(5, "e"), (6, "f")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
#[test]
fn test_kway_merge_sorted_basic() {
let slice1 = vec![(1, "a1"), (3, "c1")];
let slice2 = vec![(2, "b2"), (3, "c2")];
let slice3 = vec![(1, "a3"), (4, "d3")];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
// First occurrence wins: key 1 -> a1 (slice1), key 3 -> c1 (slice1)
assert_eq!(result, vec![(1, "a1"), (2, "b2"), (3, "c1"), (4, "d3")]);
}
#[test]
fn test_kway_merge_sorted_empty_slices() {
let slice1: Vec<(i32, &str)> = vec![];
let slice2 = vec![(1, "a")];
let slice3: Vec<(i32, &str)> = vec![];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
assert_eq!(result, vec![(1, "a")]);
}
#[test]
fn test_kway_merge_sorted_all_same_key() {
let slice1 = vec![(5, "first")];
let slice2 = vec![(5, "middle")];
let slice3 = vec![(5, "last")];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
// First occurrence wins (slice1 has highest priority)
assert_eq!(result, vec![(5, "first")]);
}
#[test]
fn test_kway_merge_sorted_single_slice() {
let slice = vec![(1, "a"), (2, "b"), (3, "c")];
let result = kway_merge_sorted([slice.as_slice()]);
assert_eq!(result, vec![(1, "a"), (2, "b"), (3, "c")]);
}
#[test]
fn test_kway_merge_sorted_no_slices() {
let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
assert!(result.is_empty());
}
#[test]
fn test_prefer_sorted_merge_kway_threshold() {
assert!(!prefer_sorted_merge(30, 100_000));
assert!(!prefer_sorted_merge(50, 200_000));
assert!(prefer_sorted_merge(29, 29 * PAIRWISE_MIN_AVG_ITEMS));
}
#[test]
fn test_prefer_sorted_merge_pairwise_threshold() {
assert!(prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS));
assert!(prefer_sorted_merge(10, 10 * PAIRWISE_MIN_AVG_ITEMS + 1));
assert!(!prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS - 1));
}
#[test]
fn test_prefer_sorted_merge_small_data() {
assert!(!prefer_sorted_merge(2, 100));
assert!(!prefer_sorted_merge(5, 1000));
}
}

View File

@@ -39,6 +39,7 @@ reth-trie-common = { workspace = true, features = ["test-utils", "arbitrary"] }
reth-trie-db.workspace = true
reth-trie-sparse = { workspace = true, features = ["test-utils"] }
reth-trie.workspace = true
reth-tracing.workspace = true
# misc
arbitrary.workspace = true

View File

@@ -339,7 +339,12 @@ impl SparseTrieInterface for ParallelSparseTrie {
if let Some(reveal_path) = reveal_path {
let subtrie = self.subtrie_for_path_mut(&reveal_path);
if subtrie.nodes.get(&reveal_path).expect("node must exist").is_hash() {
let reveal_masks = if subtrie
.nodes
.get(&reveal_path)
.expect("node must exist")
.is_hash()
{
debug!(
target: "trie::parallel_sparse",
child_path = ?reveal_path,
@@ -360,12 +365,19 @@ impl SparseTrieInterface for ParallelSparseTrie {
);
let masks = BranchNodeMasks::from_optional(hash_mask, tree_mask);
subtrie.reveal_node(reveal_path, &decoded, masks)?;
masks
} else {
return Err(SparseTrieErrorKind::NodeNotFoundInProvider {
path: reveal_path,
}
.into())
}
} else {
None
};
if let Some(masks) = reveal_masks {
self.branch_node_masks.insert(reveal_path, masks);
}
}
@@ -436,7 +448,11 @@ impl SparseTrieInterface for ParallelSparseTrie {
// If we didn't update the target leaf, we need to call update_leaf on the subtrie
// to ensure that the leaf is updated correctly.
subtrie.update_leaf(full_path, value, provider, retain_updates)?;
if let Some((revealed_path, revealed_masks)) =
subtrie.update_leaf(full_path, value, provider, retain_updates)?
{
self.branch_node_masks.insert(revealed_path, revealed_masks);
}
}
Ok(())
@@ -1232,7 +1248,7 @@ impl ParallelSparseTrie {
) -> SparseTrieResult<SparseNode> {
let remaining_child_subtrie = self.subtrie_for_path_mut(remaining_child_path);
let remaining_child_node = match remaining_child_subtrie
let (remaining_child_node, remaining_child_masks) = match remaining_child_subtrie
.nodes
.get(remaining_child_path)
.unwrap()
@@ -1258,7 +1274,10 @@ impl ParallelSparseTrie {
);
let masks = BranchNodeMasks::from_optional(hash_mask, tree_mask);
remaining_child_subtrie.reveal_node(*remaining_child_path, &decoded, masks)?;
remaining_child_subtrie.nodes.get(remaining_child_path).unwrap().clone()
(
remaining_child_subtrie.nodes.get(remaining_child_path).unwrap().clone(),
masks,
)
} else {
return Err(SparseTrieErrorKind::NodeNotFoundInProvider {
path: *remaining_child_path,
@@ -1266,9 +1285,15 @@ impl ParallelSparseTrie {
.into())
}
}
node => node.clone(),
// The node is already revealed so we don't need to return its masks here, as they don't
// need to be inserted.
node => (node.clone(), None),
};
if let Some(masks) = remaining_child_masks {
self.branch_node_masks.insert(*remaining_child_path, masks);
}
// If `recurse_into_extension` is true, and the remaining child is an extension node, then
// its child will be ensured to be revealed as well. This is required for generation of
// trie updates; without revealing the grandchild branch it's not always possible to know
@@ -1636,9 +1661,9 @@ impl SparseSubtrie {
///
/// # Returns
///
/// Returns the `Ok` if the update is successful.
/// Returns the path and masks of any blinded node revealed as a result of updating the leaf.
///
/// Note: If an update requires revealing a blinded node, an error is returned if the blinded
/// If an update requires revealing a blinded node, an error is returned if the blinded
/// provider returns an error.
pub fn update_leaf(
&mut self,
@@ -1646,16 +1671,17 @@ impl SparseSubtrie {
value: Vec<u8>,
provider: impl TrieNodeProvider,
retain_updates: bool,
) -> SparseTrieResult<()> {
) -> SparseTrieResult<Option<(Nibbles, BranchNodeMasks)>> {
debug_assert!(full_path.starts_with(&self.path));
let existing = self.inner.values.insert(full_path, value);
if existing.is_some() {
// trie structure unchanged, return immediately
return Ok(())
return Ok(None)
}
// Here we are starting at the root of the subtrie, and traversing from there.
let mut current = Some(self.path);
let mut revealed = None;
while let Some(current_path) = current {
match self.update_next_node(current_path, &full_path, retain_updates)? {
LeafUpdateStep::Continue { next_node } => {
@@ -1685,6 +1711,12 @@ impl SparseSubtrie {
);
let masks = BranchNodeMasks::from_optional(hash_mask, tree_mask);
self.reveal_node(reveal_path, &decoded, masks)?;
debug_assert_eq!(
revealed, None,
"Only a single blinded node should be revealed during update_leaf"
);
revealed = masks.map(|masks| (reveal_path, masks));
} else {
return Err(SparseTrieErrorKind::NodeNotFoundInProvider {
path: reveal_path,
@@ -1701,7 +1733,7 @@ impl SparseSubtrie {
}
}
Ok(())
Ok(revealed)
}
/// Processes the current node, returning what to do next in the leaf update process.
@@ -6803,4 +6835,137 @@ mod tests {
if path == path_to_blind && hash == blinded_hash
);
}
#[test]
fn test_mainnet_block_24185431_storage_0x6ba784ee() {
reth_tracing::init_test_tracing();
// Reveal branch at 0x3 with full state
let mut branch_0x3_hashes = vec![
B256::from(hex!("fc11ba8de4b220b8f19a09f0676c69b8e18bae1350788392640069e59b41733d")),
B256::from(hex!("8afe085cc6685680bd8ba4bac6e65937a4babf737dc5e7413d21cdda958e8f74")),
B256::from(hex!("c7b6f7c0fc601a27aece6ec178fd9be17cdee77c4884ecfbe1ee459731eb57da")),
B256::from(hex!("71c1aec60db78a2deb4e10399b979a2ed5be42b4ee0c0a17c614f9ddc9f9072e")),
B256::from(hex!("e9261302e7c0b77930eaf1851b585210906cd01e015ab6be0f7f3c0cc947c32a")),
B256::from(hex!("38ce8f369c56bd77fabdf679b27265b1f8d0a54b09ef612c8ee8ddfc6b3fab95")),
B256::from(hex!("7b507a8936a28c5776b647d1c4bda0bbbb3d0d227f16c5f5ebba58d02e31918d")),
B256::from(hex!("0f456b9457a824a81e0eb555aa861461acb38674dcf36959b3b26deb24ed0af9")),
B256::from(hex!("2145420289652722ad199ba932622e3003c779d694fa5a2acfb2f77b0782b38a")),
B256::from(hex!("2c1a04dce1a9e2f1cfbf8806edce50a356dfa58e7e7c542c848541502613b796")),
B256::from(hex!("dad7ca55186ac8f40d4450dc874166df8267b44abc07e684d9507260f5712df3")),
B256::from(hex!("3a8c2a1d7d2423e92965ec29014634e7f0307ded60b1a63d28c86c3222b24236")),
B256::from(hex!("4e9929e6728b3a7bf0db6a0750ab376045566b556c9c605e606ecb8ec25200d7")),
B256::from(hex!("1797c36f98922f52292c161590057a1b5582d5503e3370bcfbf6fd939f3ec98b")),
B256::from(hex!("9e514589a9c9210b783c19fa3f0b384bbfaefe98f10ea189a2bfc58c6bf000a1")),
B256::from(hex!("85bdaabbcfa583cbd049650e41d3d19356bd833b3ed585cf225a3548557c7fa3")),
];
let branch_0x3_node = create_branch_node_with_children(
&[0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf],
branch_0x3_hashes.iter().map(RlpNode::word_rlp),
);
// Reveal branch at 0x31
let branch_0x31_hashes = vec![B256::from(hex!(
"3ca994ba59ce70b83fee1f01731c8dac4fdd0f70ade79bf9b0695c4c53531aab"
))];
let branch_0x31_node = create_branch_node_with_children(
&[0xc],
branch_0x31_hashes.into_iter().map(|h| RlpNode::word_rlp(&h)),
);
// Reveal leaf at 0x31b0b645a6c4a0a1bb3d2f0c1d31c39f4aba2e3b015928a8eef7161e28388b81
let leaf_path = hex!("31b0b645a6c4a0a1bb3d2f0c1d31c39f4aba2e3b015928a8eef7161e28388b81");
let leaf_nibbles = Nibbles::unpack(leaf_path.as_slice());
let leaf_value = hex!("0009ae8ce8245bff").to_vec();
// Reveal branch at 0x31c
let branch_0x31c_hashes = vec![
B256::from(hex!("1a68fdb36b77e9332b49a977faf800c22d0199e6cecf44032bb083c78943e540")),
B256::from(hex!("cd4622c6df6fd7172c7fed1b284ef241e0f501b4c77b675ef10c612bd0948a7a")),
B256::from(hex!("abf3603d2f991787e21f1709ee4c7375d85dfc506995c0435839fccf3fe2add4")),
];
let branch_0x31c_node = create_branch_node_with_children(
&[0x3, 0x7, 0xc],
branch_0x31c_hashes.into_iter().map(|h| RlpNode::word_rlp(&h)),
);
let mut branch_0x31c_node_encoded = Vec::new();
branch_0x31c_node.encode(&mut branch_0x31c_node_encoded);
// Create a mock provider and preload 0x31c onto it, it will be revealed during remove_leaf.
let mut provider = MockTrieNodeProvider::new();
provider.add_revealed_node(
Nibbles::from_nibbles([0x3, 0x1, 0xc]),
RevealedNode {
node: branch_0x31c_node_encoded.into(),
tree_mask: Some(0.into()),
hash_mask: Some(4096.into()),
},
);
// Reveal the trie structure using ProofTrieNode
let proof_nodes = vec![
ProofTrieNode {
path: Nibbles::from_nibbles([0x3]),
node: branch_0x3_node,
masks: Some(BranchNodeMasks {
tree_mask: TrieMask::new(26099),
hash_mask: TrieMask::new(65535),
}),
},
ProofTrieNode {
path: Nibbles::from_nibbles([0x3, 0x1]),
node: branch_0x31_node,
masks: Some(BranchNodeMasks {
tree_mask: TrieMask::new(4096),
hash_mask: TrieMask::new(4096),
}),
},
];
// Create a sparse trie and reveal nodes
let mut trie = ParallelSparseTrie::default()
.with_root(
TrieNode::Extension(ExtensionNode {
key: Nibbles::from_nibbles([0x3]),
child: RlpNode::word_rlp(&B256::ZERO),
}),
None,
true,
)
.expect("root revealed");
trie.reveal_nodes(proof_nodes).unwrap();
// Update the leaf in order to reveal it in the trie
trie.update_leaf(leaf_nibbles, leaf_value, &provider).unwrap();
// Now delete the leaf
trie.remove_leaf(&leaf_nibbles, &provider).unwrap();
// Compute the root to trigger updates
let _ = trie.root();
// Assert the resulting branch node updates
let updates = trie.updates_ref();
// Check that the branch at 0x3 was updated with the expected structure
let branch_0x3_update = updates
.updated_nodes
.get(&Nibbles::from_nibbles([0x3]))
.expect("Branch at 0x3 should be in updates");
// We no longer expect to track the hash for child 1
branch_0x3_hashes.remove(1);
// Expected structure from prompt.md
let expected_branch = BranchNodeCompact::new(
0b1111111111111111,
0b0110010111110011,
0b1111111111111101,
branch_0x3_hashes,
None,
);
assert_eq!(branch_0x3_update, &expected_branch);
}
}

View File

@@ -53,9 +53,13 @@ impl<'a, K, V> ForwardInMemoryCursor<'a, K, V> {
}
}
/// Threshold for remaining entries above which binary search is used instead of linear scan.
/// For small slices, linear scan has better cache locality and lower overhead.
const BINARY_SEARCH_THRESHOLD: usize = 64;
impl<K, V> ForwardInMemoryCursor<'_, K, V>
where
K: PartialOrd + Clone,
K: Ord + Clone,
V: Clone,
{
/// Returns the first entry from the current cursor position that's greater or equal to the
@@ -73,19 +77,22 @@ where
/// Advances the cursor forward while `predicate` returns `true` or until the collection is
/// exhausted.
///
/// Uses binary search for large remaining slices (>= 64 entries), linear scan for small ones.
///
/// Returns the first entry for which `predicate` returns `false` or `None`. The cursor will
/// point to the returned entry.
fn advance_while(&mut self, predicate: impl Fn(&K) -> bool) -> Option<(K, V)> {
let mut entry;
loop {
entry = self.current();
if entry.is_some_and(|(k, _)| predicate(k)) {
let remaining = self.entries.len().saturating_sub(self.idx);
if remaining >= BINARY_SEARCH_THRESHOLD {
let slice = &self.entries[self.idx..];
let pos = slice.partition_point(|(k, _)| predicate(k));
self.idx += pos;
} else {
while self.current().is_some_and(|(k, _)| predicate(k)) {
self.next();
} else {
break;
}
}
entry.cloned()
self.current().cloned()
}
}
@@ -94,7 +101,7 @@ mod tests {
use super::*;
#[test]
fn test_cursor() {
fn test_cursor_small() {
let mut cursor = ForwardInMemoryCursor::new(&[(1, ()), (2, ()), (3, ()), (4, ()), (5, ())]);
assert_eq!(cursor.current(), Some(&(1, ())));
@@ -113,4 +120,72 @@ mod tests {
assert_eq!(cursor.seek(&6), None);
assert_eq!(cursor.current(), None);
}
#[test]
fn test_cursor_large_binary_search() {
// Create a large enough collection to trigger binary search
let entries: Vec<(i32, ())> = (0..200).map(|i| (i * 2, ())).collect();
let mut cursor = ForwardInMemoryCursor::new(&entries);
// Seek to beginning
assert_eq!(cursor.seek(&0), Some((0, ())));
assert_eq!(cursor.idx, 0);
// Seek to middle (should use binary search)
assert_eq!(cursor.seek(&100), Some((100, ())));
assert_eq!(cursor.idx, 50);
// Seek to non-existent key (should find next greater)
assert_eq!(cursor.seek(&101), Some((102, ())));
assert_eq!(cursor.idx, 51);
// Seek to end
assert_eq!(cursor.seek(&398), Some((398, ())));
assert_eq!(cursor.idx, 199);
// Seek past end
assert_eq!(cursor.seek(&1000), None);
}
#[test]
fn test_first_after_large() {
let entries: Vec<(i32, ())> = (0..200).map(|i| (i * 2, ())).collect();
let mut cursor = ForwardInMemoryCursor::new(&entries);
// first_after should find strictly greater
assert_eq!(cursor.first_after(&0), Some((2, ())));
assert_eq!(cursor.idx, 1);
// Reset and test from beginning
cursor.reset();
assert_eq!(cursor.first_after(&99), Some((100, ())));
// first_after on exact match
cursor.reset();
assert_eq!(cursor.first_after(&100), Some((102, ())));
}
#[test]
fn test_cursor_consistency() {
// Verify binary search and linear scan produce same results
let entries: Vec<(i32, ())> = (0..200).map(|i| (i * 3, ())).collect();
for search_key in [0, 1, 3, 50, 150, 299, 300, 597, 598, 599, 1000] {
// Test with fresh cursor (binary search path)
let mut cursor1 = ForwardInMemoryCursor::new(&entries);
let result1 = cursor1.seek(&search_key);
// Manually advance to trigger linear path by getting close first
let mut cursor2 = ForwardInMemoryCursor::new(&entries);
if search_key > 100 {
cursor2.seek(&(search_key - 50));
}
let result2 = cursor2.seek(&search_key);
assert_eq!(
result1, result2,
"Mismatch for key {search_key}: binary={result1:?}, linear={result2:?}"
);
}
}
}

View File

@@ -159,7 +159,7 @@ pub trait DbTx: Debug + Send + Sync {
) -> Result<Option<T::Value>, DatabaseError>;
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, DatabaseError>;
fn commit(self) -> Result<(), DatabaseError>;
/// Aborts transaction
fn abort(self);
/// Iterate over read only values in table.

View File

@@ -31,8 +31,8 @@
- [`reth db settings set transaction_senders`](./reth/db/settings/set/transaction_senders.mdx)
- [`reth db settings set account_changesets`](./reth/db/settings/set/account_changesets.mdx)
- [`reth db settings set storages_history`](./reth/db/settings/set/storages_history.mdx)
- [`reth db settings set transaction_hash_numbers`](./reth/db/settings/set/transaction_hash_numbers.mdx)
- [`reth db settings set account_history`](./reth/db/settings/set/account_history.mdx)
- [`reth db settings set tx_hash_numbers`](./reth/db/settings/set/tx_hash_numbers.mdx)
- [`reth db account-storage`](./reth/db/account-storage.mdx)
- [`reth download`](./reth/download.mdx)
- [`reth stage`](./reth/stage.mdx)
@@ -87,8 +87,8 @@
- [`op-reth db settings set transaction_senders`](./op-reth/db/settings/set/transaction_senders.mdx)
- [`op-reth db settings set account_changesets`](./op-reth/db/settings/set/account_changesets.mdx)
- [`op-reth db settings set storages_history`](./op-reth/db/settings/set/storages_history.mdx)
- [`op-reth db settings set transaction_hash_numbers`](./op-reth/db/settings/set/transaction_hash_numbers.mdx)
- [`op-reth db settings set account_history`](./op-reth/db/settings/set/account_history.mdx)
- [`op-reth db settings set tx_hash_numbers`](./op-reth/db/settings/set/tx_hash_numbers.mdx)
- [`op-reth db account-storage`](./op-reth/db/account-storage.mdx)
- [`op-reth stage`](./op-reth/stage.mdx)
- [`op-reth stage run`](./op-reth/stage/run.mdx)

View File

@@ -99,6 +99,24 @@ Logging:
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
@@ -123,9 +141,9 @@ Tracing:
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces.
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` - `grpc`: expects endpoint without a path
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.

View File

@@ -87,6 +87,24 @@ Logging:
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
@@ -111,9 +129,9 @@ Tracing:
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces.
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` - `grpc`: expects endpoint without a path
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.

View File

@@ -145,13 +145,6 @@ Static Files:
Note: This setting can only be configured at genesis initialization. Once the node has been initialized, changing this flag requires re-syncing from scratch.
--storage.rocksdb
Use `RocksDB` for history indices instead of MDBX.
When enabled, `AccountsHistory`, `StoragesHistory`, and `TransactionHashNumbers` tables will be stored in `RocksDB` for better write performance.
Note: This setting can only be configured at genesis initialization. Once the node has been initialized, changing this flag requires re-syncing from scratch.
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
@@ -221,6 +214,24 @@ Logging:
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
@@ -245,9 +256,9 @@ Tracing:
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces.
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` - `grpc`: expects endpoint without a path
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.

View File

@@ -95,6 +95,24 @@ Logging:
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
@@ -119,9 +137,9 @@ Tracing:
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces.
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` - `grpc`: expects endpoint without a path
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.

Some files were not shown because too many files have changed in this diff Show More