Merge branch 'new-approach3' of https://github.com/Rimeeeeee/reth into new-approach3

Author: Soubhik Singha Mahapatra
Date: 2025-11-05 14:42:16 +05:30
38 changed files with 3749 additions and 409 deletions


@@ -11,6 +11,7 @@ exclude_crates=(
# The following require investigation if they can be fixed
reth-basic-payload-builder
reth-bench
reth-bench-compare
reth-cli
reth-cli-commands
reth-cli-runner

Cargo.lock (generated)

@@ -1667,6 +1667,15 @@ dependencies = [
 "generic-array",
]

[[package]]
name = "block2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
dependencies = [
 "objc2",
]

[[package]]
name = "blst"
version = "0.3.16"

@@ -2660,6 +2669,17 @@ dependencies = [
 "cipher",
]

[[package]]
name = "ctrlc"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
dependencies = [
 "dispatch2",
 "nix 0.30.1",
 "windows-sys 0.61.2",
]

[[package]]
name = "curve25519-dalek"
version = "4.1.3"

@@ -2687,6 +2707,19 @@ dependencies = [
 "syn 2.0.108",
]

[[package]]
name = "custom-hardforks"
version = "0.1.0"
dependencies = [
 "alloy-consensus",
 "alloy-eips",
 "alloy-genesis",
 "alloy-primitives",
 "reth-chainspec",
 "reth-network-peers",
 "serde",
]

[[package]]
name = "darling"
version = "0.20.11"

@@ -3042,6 +3075,18 @@ dependencies = [
 "zeroize",
]

[[package]]
name = "dispatch2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec"
dependencies = [
 "bitflags 2.10.0",
 "block2",
 "libc",
 "objc2",
]

[[package]]
name = "displaydoc"
version = "0.2.5"

@@ -5833,6 +5878,30 @@ dependencies = [
 "unsigned-varint",
]

[[package]]
name = "nix"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
 "bitflags 2.10.0",
 "cfg-if",
 "cfg_aliases",
 "libc",
]

[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
 "bitflags 2.10.0",
 "cfg-if",
 "cfg_aliases",
 "libc",
]

[[package]]
name = "nom"
version = "7.1.3"

@@ -6022,6 +6091,21 @@ dependencies = [
 "smallvec",
]

[[package]]
name = "objc2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05"
dependencies = [
 "objc2-encode",
]

[[package]]
name = "objc2-encode"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"

[[package]]
name = "once_cell"
version = "1.21.3"

@@ -7324,6 +7408,32 @@ dependencies = [
 "tracing",
]

[[package]]
name = "reth-bench-compare"
version = "1.8.3"
dependencies = [
 "alloy-primitives",
 "alloy-provider",
 "alloy-rpc-types-eth",
 "chrono",
 "clap",
 "csv",
 "ctrlc",
 "eyre",
 "nix 0.29.0",
 "reth-chainspec",
 "reth-cli-runner",
 "reth-cli-util",
 "reth-node-core",
 "reth-tracing",
 "serde",
 "serde_json",
 "shellexpand",
 "shlex",
 "tokio",
 "tracing",
]

[[package]]
name = "reth-chain-state"
version = "1.8.3"

@@ -9534,7 +9644,6 @@ dependencies = [
 "reth-primitives-traits",
 "reth-rpc",
 "reth-rpc-api",
 "reth-rpc-convert",
 "reth-rpc-engine-api",
 "reth-rpc-eth-api",
 "reth-rpc-eth-types",

Cargo.toml

@@ -10,6 +10,7 @@ exclude = [".github/"]
[workspace]
members = [
    "bin/reth-bench/",
    "bin/reth-bench-compare/",
    "bin/reth/",
    "crates/storage/rpc-provider/",
    "crates/chain-state/",

@@ -147,6 +148,7 @@ members = [
    "examples/custom-node/",
    "examples/custom-engine-types/",
    "examples/custom-evm/",
    "examples/custom-hardforks/",
    "examples/custom-inspector/",
    "examples/custom-node-components/",
    "examples/custom-payload-builder/",

@@ -333,6 +335,7 @@ reth = { path = "bin/reth" }
reth-storage-rpc-provider = { path = "crates/storage/rpc-provider" }
reth-basic-payload-builder = { path = "crates/payload/basic" }
reth-bench = { path = "bin/reth-bench" }
reth-bench-compare = { path = "bin/reth-bench-compare" }
reth-chain-state = { path = "crates/chain-state" }
reth-chainspec = { path = "crates/chainspec", default-features = false }
reth-cli = { path = "crates/cli/cli" }

@@ -585,6 +588,7 @@ serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_with = { version = "3", default-features = false, features = ["macros"] }
sha2 = { version = "0.10", default-features = false }
shellexpand = "3.0.0"
shlex = "1.3"
smallvec = "1"
strum = { version = "0.27", default-features = false }
strum_macros = "0.27"

@@ -708,6 +712,7 @@ concat-kdf = "0.1.0"
crossbeam-channel = "0.5.13"
crossterm = "0.28.0"
csv = "1.3.0"
ctrlc = "3.4"
ctr = "0.9.2"
data-encoding = "2"
delegate = "0.13"

bin/reth-bench-compare/Cargo.toml

@@ -0,0 +1,96 @@
[package]
name = "reth-bench-compare"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Automated reth benchmark comparison between git references"

[lints]
workspace = true

[[bin]]
name = "reth-bench-compare"
path = "src/main.rs"

[dependencies]
# reth
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-node-core.workspace = true
reth-tracing.workspace = true
reth-chainspec.workspace = true

# alloy
alloy-provider = { workspace = true, features = ["reqwest-rustls-tls"], default-features = false }
alloy-rpc-types-eth.workspace = true
alloy-primitives.workspace = true

# CLI and argument parsing
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true

# Async runtime
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true

# Serialization
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true

# Time handling
chrono = { workspace = true, features = ["serde"] }

# Path manipulation
shellexpand.workspace = true

# CSV handling
csv.workspace = true

# Process management
ctrlc.workspace = true
shlex.workspace = true

[target.'cfg(unix)'.dependencies]
nix = { version = "0.29", features = ["signal", "process"] }

[features]
default = ["jemalloc"]
asm-keccak = [
    "reth-node-core/asm-keccak",
    "alloy-primitives/asm-keccak",
]
jemalloc = [
    "reth-cli-util/jemalloc",
    "reth-node-core/jemalloc",
]
jemalloc-prof = ["reth-cli-util/jemalloc-prof"]
tracy-allocator = ["reth-cli-util/tracy-allocator"]
min-error-logs = [
    "tracing/release_max_level_error",
    "reth-node-core/min-error-logs",
]
min-warn-logs = [
    "tracing/release_max_level_warn",
    "reth-node-core/min-warn-logs",
]
min-info-logs = [
    "tracing/release_max_level_info",
    "reth-node-core/min-info-logs",
]
min-debug-logs = [
    "tracing/release_max_level_debug",
    "reth-node-core/min-debug-logs",
]
min-trace-logs = [
    "tracing/release_max_level_trace",
    "reth-node-core/min-trace-logs",
]

# no-op feature flag for switching between the `optimism` and default functionality in CI matrices
ethereum = []

bin/reth-bench-compare/src/benchmark.rs

@@ -0,0 +1,296 @@
//! Benchmark execution using reth-bench.

use crate::cli::Args;
use eyre::{eyre, Result, WrapErr};
use std::{
    path::Path,
    sync::{Arc, Mutex},
};
use tokio::{
    fs::File as AsyncFile,
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    process::Command,
};
use tracing::{debug, error, info, warn};

/// Manages benchmark execution using reth-bench
pub(crate) struct BenchmarkRunner {
    rpc_url: String,
    jwt_secret: String,
    wait_time: Option<String>,
    warmup_blocks: u64,
}

impl BenchmarkRunner {
    /// Create a new `BenchmarkRunner` from CLI arguments
    pub(crate) fn new(args: &Args) -> Self {
        Self {
            rpc_url: args.get_rpc_url(),
            jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
            wait_time: args.wait_time.clone(),
            warmup_blocks: args.get_warmup_blocks(),
        }
    }

    /// Clear filesystem caches (page cache, dentries, and inodes)
    pub(crate) async fn clear_fs_caches() -> Result<()> {
        info!("Clearing filesystem caches...");

        // First sync to ensure all pending writes are flushed
        let sync_output =
            Command::new("sync").output().await.wrap_err("Failed to execute sync command")?;
        if !sync_output.status.success() {
            return Err(eyre!("sync command failed"));
        }

        // Drop caches - requires sudo/root permissions
        // 3 = drop pagecache, dentries, and inodes
        let drop_caches_cmd = Command::new("sudo")
            .args(["-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
            .output()
            .await;

        match drop_caches_cmd {
            Ok(output) if output.status.success() => {
                info!("Successfully cleared filesystem caches");
                Ok(())
            }
            Ok(output) => {
                let stderr = String::from_utf8_lossy(&output.stderr);
                if stderr.contains("sudo: a password is required") {
                    warn!("Unable to clear filesystem caches: sudo password required");
                    warn!(
                        "For optimal benchmarking, configure passwordless sudo for cache clearing:"
                    );
                    warn!("    echo '$USER ALL=(ALL) NOPASSWD: /bin/sh -c echo\\\\ [0-9]\\\\ \\\\>\\\\ /proc/sys/vm/drop_caches' | sudo tee /etc/sudoers.d/drop_caches");
                    Ok(())
                } else {
                    Err(eyre!("Failed to clear filesystem caches: {}", stderr))
                }
            }
            Err(e) => {
                warn!("Unable to clear filesystem caches: {}", e);
                Ok(())
            }
        }
    }
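
For reference, when the harness itself already runs as root, the same effect needs no sudo at all; a minimal sketch of the direct sysctl write (Linux-only; `drop_caches_direct` is a hypothetical helper, not part of this crate):

    // Sketch: direct equivalent of `echo 3 > /proc/sys/vm/drop_caches`,
    // assuming the process is already root. 1 = page cache, 2 = reclaimable
    // slab objects (dentries and inodes), 3 = both.
    fn drop_caches_direct() -> std::io::Result<()> {
        std::fs::write("/proc/sys/vm/drop_caches", "3")
    }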
    /// Run a warmup benchmark for cache warming
    pub(crate) async fn run_warmup(&self, from_block: u64) -> Result<()> {
        let to_block = from_block + self.warmup_blocks;
        info!(
            "Running warmup benchmark from block {} to {} ({} blocks)",
            from_block, to_block, self.warmup_blocks
        );

        // Build the reth-bench command for warmup (no output flag)
        let mut cmd = Command::new("reth-bench");
        cmd.args([
            "new-payload-fcu",
            "--rpc-url",
            &self.rpc_url,
            "--jwt-secret",
            &self.jwt_secret,
            "--from",
            &from_block.to_string(),
            "--to",
            &to_block.to_string(),
        ]);

        // Add wait-time argument if provided
        if let Some(ref wait_time) = self.wait_time {
            cmd.args(["--wait-time", wait_time]);
        }

        cmd.stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true);

        // Set process group for consistent signal handling
        #[cfg(unix)]
        {
            cmd.process_group(0);
        }

        debug!("Executing warmup reth-bench command: {:?}", cmd);

        // Execute the warmup benchmark
        let mut child = cmd.spawn().wrap_err("Failed to start warmup reth-bench process")?;

        // Stream output at debug level
        if let Some(stdout) = child.stdout.take() {
            tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[WARMUP] {}", line);
                }
            });
        }
        if let Some(stderr) = child.stderr.take() {
            tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[WARMUP] {}", line);
                }
            });
        }

        let status = child.wait().await.wrap_err("Failed to wait for warmup reth-bench")?;
        if !status.success() {
            return Err(eyre!("Warmup reth-bench failed with exit code: {:?}", status.code()));
        }

        info!("Warmup completed successfully");
        Ok(())
    }

    /// Run a benchmark for the specified block range
    pub(crate) async fn run_benchmark(
        &self,
        from_block: u64,
        to_block: u64,
        output_dir: &Path,
    ) -> Result<()> {
        info!(
            "Running benchmark from block {} to {} (output: {:?})",
            from_block, to_block, output_dir
        );

        // Ensure output directory exists
        std::fs::create_dir_all(output_dir)
            .wrap_err_with(|| format!("Failed to create output directory: {output_dir:?}"))?;

        // Create log file path for reth-bench output
        let log_file_path = output_dir.join("reth_bench.log");
        info!("reth-bench logs will be saved to: {:?}", log_file_path);

        // Build the reth-bench command
        let mut cmd = Command::new("reth-bench");
        cmd.args([
            "new-payload-fcu",
            "--rpc-url",
            &self.rpc_url,
            "--jwt-secret",
            &self.jwt_secret,
            "--from",
            &from_block.to_string(),
            "--to",
            &to_block.to_string(),
            "--output",
            &output_dir.to_string_lossy(),
        ]);

        // Add wait-time argument if provided
        if let Some(ref wait_time) = self.wait_time {
            cmd.args(["--wait-time", wait_time]);
        }

        cmd.stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true);

        // Set process group for consistent signal handling
        #[cfg(unix)]
        {
            cmd.process_group(0);
        }

        // Debug log the command
        debug!("Executing reth-bench command: {:?}", cmd);

        // Execute the benchmark
        let mut child = cmd.spawn().wrap_err("Failed to start reth-bench process")?;

        // Capture stdout and stderr for error reporting
        let stdout_lines = Arc::new(Mutex::new(Vec::new()));
        let stderr_lines = Arc::new(Mutex::new(Vec::new()));

        // Stream stdout with prefix at debug level, capture for error reporting, and write to log
        // file
        if let Some(stdout) = child.stdout.take() {
            let stdout_lines_clone = stdout_lines.clone();
            let log_file = AsyncFile::create(&log_file_path)
                .await
                .wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
            tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                let mut log_file = log_file;
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[RETH-BENCH] {}", line);
                    if let Ok(mut captured) = stdout_lines_clone.lock() {
                        captured.push(line.clone());
                    }
                    // Write to log file (reth-bench output already has timestamps if needed)
                    let log_line = format!("{}\n", line);
                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
                        debug!("Failed to write to log file: {}", e);
                    }
                }
            });
        }

        // Stream stderr with prefix at debug level, capture for error reporting, and write to log
        // file
        if let Some(stderr) = child.stderr.take() {
            let stderr_lines_clone = stderr_lines.clone();
            let log_file = AsyncFile::options()
                .create(true)
                .append(true)
                .open(&log_file_path)
                .await
                .wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
            tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                let mut log_file = log_file;
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[RETH-BENCH] {}", line);
                    if let Ok(mut captured) = stderr_lines_clone.lock() {
                        captured.push(line.clone());
                    }
                    // Write to log file (reth-bench output already has timestamps if needed)
                    let log_line = format!("{}\n", line);
                    if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
                        debug!("Failed to write to log file: {}", e);
                    }
                }
            });
        }

        let status = child.wait().await.wrap_err("Failed to wait for reth-bench")?;
        if !status.success() {
            // Print all captured output when command fails
            error!("reth-bench failed with exit code: {:?}", status.code());
            if let Ok(stdout) = stdout_lines.lock() &&
                !stdout.is_empty()
            {
                error!("reth-bench stdout:");
                for line in stdout.iter() {
                    error!("  {}", line);
                }
            }
            if let Ok(stderr) = stderr_lines.lock() &&
                !stderr.is_empty()
            {
                error!("reth-bench stderr:");
                for line in stderr.iter() {
                    error!("  {}", line);
                }
            }
            return Err(eyre!("reth-bench failed with exit code: {:?}", status.code()));
        }

        info!("Benchmark completed");
        Ok(())
    }
}

bin/reth-bench-compare/src/cli.rs

@@ -0,0 +1,931 @@
//! CLI argument parsing and main command orchestration.

use alloy_provider::{Provider, ProviderBuilder};
use clap::Parser;
use eyre::{eyre, Result, WrapErr};
use reth_chainspec::Chain;
use reth_cli_runner::CliContext;
use reth_node_core::args::{DatadirArgs, LogArgs};
use reth_tracing::FileWorkerGuard;
use std::{net::TcpListener, path::PathBuf, str::FromStr};
use tokio::process::Command;
use tracing::{debug, info, warn};

use crate::{
    benchmark::BenchmarkRunner, comparison::ComparisonGenerator, compilation::CompilationManager,
    git::GitManager, node::NodeManager,
};

/// Target for disabling the --debug.startup-sync-state-idle flag
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DisableStartupSyncStateIdle {
    /// Disable for baseline and warmup runs
    Baseline,
    /// Disable for feature runs only
    Feature,
    /// Disable for all runs
    All,
}

impl FromStr for DisableStartupSyncStateIdle {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "baseline" => Ok(Self::Baseline),
            "feature" => Ok(Self::Feature),
            "all" => Ok(Self::All),
            _ => Err(format!("Invalid value '{}'. Expected 'baseline', 'feature', or 'all'", s)),
        }
    }
}

impl std::fmt::Display for DisableStartupSyncStateIdle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Baseline => write!(f, "baseline"),
            Self::Feature => write!(f, "feature"),
            Self::All => write!(f, "all"),
        }
    }
}

/// Automated reth benchmark comparison between git references
#[derive(Debug, Parser)]
#[command(
    name = "reth-bench-compare",
    about = "Compare reth performance between two git references (branches or tags)",
    version
)]
pub(crate) struct Args {
    /// Git reference (branch or tag) to use as baseline for comparison
    #[arg(long, value_name = "REF")]
    pub baseline_ref: String,

    /// Git reference (branch or tag) to compare against the baseline
    #[arg(long, value_name = "REF")]
    pub feature_ref: String,

    #[command(flatten)]
    pub datadir: DatadirArgs,

    /// Number of blocks to benchmark
    #[arg(long, value_name = "N", default_value = "100")]
    pub blocks: u64,

    /// RPC endpoint for fetching block data
    #[arg(long, value_name = "URL")]
    pub rpc_url: Option<String>,

    /// JWT secret file path
    ///
    /// If not provided, defaults to `<datadir>/<chain>/jwt.hex`.
    /// If the file doesn't exist, it will be created automatically.
    #[arg(long, value_name = "PATH")]
    pub jwt_secret: Option<PathBuf>,

    /// Output directory for benchmark results
    #[arg(long, value_name = "PATH", default_value = "./reth-bench-compare")]
    pub output_dir: String,

    /// Skip git branch validation (useful for testing)
    #[arg(long)]
    pub skip_git_validation: bool,

    /// Port for reth metrics endpoint
    #[arg(long, value_name = "PORT", default_value = "5005")]
    pub metrics_port: u16,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain name or numeric chain ID.
    #[arg(long, value_name = "CHAIN", default_value = "mainnet", required = false)]
    pub chain: Chain,

    /// Run reth binary with sudo (for elevated privileges)
    #[arg(long)]
    pub sudo: bool,

    /// Generate comparison charts using Python script
    #[arg(long)]
    pub draw: bool,

    /// Enable CPU profiling with samply during benchmark runs
    #[arg(long)]
    pub profile: bool,

    /// Wait time between engine API calls (passed to reth-bench)
    #[arg(long, value_name = "DURATION")]
    pub wait_time: Option<String>,

    /// Number of blocks to run for cache warmup after clearing caches.
    /// If not specified, defaults to the same as --blocks
    #[arg(long, value_name = "N")]
    pub warmup_blocks: Option<u64>,

    /// Disable filesystem cache clearing before warmup phase.
    /// By default, filesystem caches are cleared before warmup to ensure consistent benchmarks.
    #[arg(long)]
    pub no_clear_cache: bool,

    #[command(flatten)]
    pub logs: LogArgs,

    /// Additional arguments to pass to baseline reth node command
    ///
    /// Example: `--baseline-args "--debug.tip 0xabc..."`
    #[arg(long, value_name = "ARGS")]
    pub baseline_args: Option<String>,

    /// Additional arguments to pass to feature reth node command
    ///
    /// Example: `--feature-args "--debug.tip 0xdef..."`
    #[arg(long, value_name = "ARGS")]
    pub feature_args: Option<String>,

    /// Additional arguments to pass to reth node command (applied to both baseline and feature)
    ///
    /// All arguments after `--` will be passed directly to the reth node command.
    /// Example: `reth-bench-compare --baseline-ref main --feature-ref pr/123 -- --debug.tip
    /// 0xabc...`
    #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
    pub reth_args: Vec<String>,

    /// Comma-separated list of features to enable during reth compilation
    ///
    /// Example: `jemalloc,asm-keccak`
    #[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
    pub features: String,

    /// Disable automatic --debug.startup-sync-state-idle flag for specific runs.
    /// Can be "baseline", "feature", or "all".
    /// By default, the flag is passed to warmup, baseline, and feature runs.
    /// When "baseline" is specified, the flag is NOT passed to warmup OR baseline.
    /// When "feature" is specified, the flag is NOT passed to feature.
    /// When "all" is specified, the flag is NOT passed to any run.
    #[arg(long, value_name = "TARGET")]
    pub disable_startup_sync_state_idle: Option<DisableStartupSyncStateIdle>,
}

impl Args {
    /// Initializes tracing with the configured options.
    pub(crate) fn init_tracing(&self) -> Result<Option<FileWorkerGuard>> {
        let guard = self.logs.init_tracing()?;
        Ok(guard)
    }

    /// Build additional arguments for a specific ref type, conditionally including
    /// --debug.startup-sync-state-idle based on the configuration
    pub(crate) fn build_additional_args(
        &self,
        ref_type: &str,
        base_args_str: Option<&String>,
    ) -> Vec<String> {
        // Parse the base arguments string if provided
        let mut args = base_args_str.map(|s| parse_args_string(s)).unwrap_or_default();

        // Determine if we should add the --debug.startup-sync-state-idle flag
        let should_add_flag = match self.disable_startup_sync_state_idle {
            None => true, // By default, add the flag
            Some(DisableStartupSyncStateIdle::All) => false,
            Some(DisableStartupSyncStateIdle::Baseline) => {
                ref_type != "baseline" && ref_type != "warmup"
            }
            Some(DisableStartupSyncStateIdle::Feature) => ref_type != "feature",
        };

        if should_add_flag {
            args.push("--debug.startup-sync-state-idle".to_string());
            debug!("Adding --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
        } else {
            debug!("Skipping --debug.startup-sync-state-idle flag for ref_type: {}", ref_type);
        }

        args
    }
    /// Get the default RPC URL for a given chain
    const fn get_default_rpc_url(chain: &Chain) -> &'static str {
        match chain.id() {
            8453 => "https://base-mainnet.rpc.ithaca.xyz",  // base
            84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
            560048 => "https://rpc.hoodi.ethpandaops.io",   // hoodi
            _ => "https://reth-ethereum.ithaca.xyz/rpc",    // mainnet and fallback
        }
    }
    /// Get the RPC URL, using chain-specific default if not provided
    pub(crate) fn get_rpc_url(&self) -> String {
        self.rpc_url.clone().unwrap_or_else(|| Self::get_default_rpc_url(&self.chain).to_string())
    }

    /// Get the JWT secret path - either provided or derived from datadir
    pub(crate) fn jwt_secret_path(&self) -> PathBuf {
        match &self.jwt_secret {
            Some(path) => {
                let jwt_secret_str = path.to_string_lossy();
                let expanded = shellexpand::tilde(&jwt_secret_str);
                PathBuf::from(expanded.as_ref())
            }
            None => {
                // Use the same logic as reth: <datadir>/<chain>/jwt.hex
                let chain_path = self.datadir.clone().resolve_datadir(self.chain);
                chain_path.jwt()
            }
        }
    }

    /// Get the resolved datadir path using the chain
    pub(crate) fn datadir_path(&self) -> PathBuf {
        let chain_path = self.datadir.clone().resolve_datadir(self.chain);
        chain_path.data_dir().to_path_buf()
    }

    /// Get the expanded output directory path
    pub(crate) fn output_dir_path(&self) -> PathBuf {
        let expanded = shellexpand::tilde(&self.output_dir);
        PathBuf::from(expanded.as_ref())
    }

    /// Get the effective warmup blocks value - either specified or defaults to blocks
    pub(crate) fn get_warmup_blocks(&self) -> u64 {
        self.warmup_blocks.unwrap_or(self.blocks)
    }
}
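
To make the flag gating in `build_additional_args` concrete, here is a small test sketch (ref names and values invented) that drives it through `Args::parse_from`:

#[cfg(test)]
mod idle_flag_gating {
    use super::*;

    #[test]
    fn baseline_target_also_disables_warmup() {
        // Hypothetical invocation; mirrors the `Baseline` match arm above.
        let args = Args::parse_from([
            "reth-bench-compare",
            "--baseline-ref", "main",
            "--feature-ref", "pr/123",
            "--disable-startup-sync-state-idle", "baseline",
        ]);
        let idle = "--debug.startup-sync-state-idle".to_string();
        assert!(!args.build_additional_args("warmup", None).contains(&idle));
        assert!(!args.build_additional_args("baseline", None).contains(&idle));
        assert!(args.build_additional_args("feature", None).contains(&idle));
    }
}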
/// Validate that the RPC endpoint chain ID matches the specified chain
async fn validate_rpc_chain_id(rpc_url: &str, expected_chain: &Chain) -> Result<()> {
    // Create Alloy provider
    let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
    let provider = ProviderBuilder::new().connect_http(url);

    // Query chain ID using Alloy
    let rpc_chain_id = provider
        .get_chain_id()
        .await
        .map_err(|e| eyre!("Failed to get chain ID from RPC endpoint {}: {:?}", rpc_url, e))?;

    let expected_chain_id = expected_chain.id();
    if rpc_chain_id != expected_chain_id {
        return Err(eyre!(
            "RPC endpoint chain ID mismatch!\n\
             Expected: {} (chain: {})\n\
             Found: {} at RPC endpoint: {}\n\n\
             Please use an RPC endpoint for the correct network or change the --chain argument.",
            expected_chain_id,
            expected_chain,
            rpc_chain_id,
            rpc_url
        ));
    }

    info!("Validated RPC endpoint chain ID");
    Ok(())
}

/// Main comparison workflow execution
pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
    // Create a new process group for this process and all its children
    #[cfg(unix)]
    {
        use nix::unistd::{getpid, setpgid};
        if let Err(e) = setpgid(getpid(), getpid()) {
            warn!("Failed to create process group: {e}");
        }
    }

    info!(
        "Starting benchmark comparison between '{}' and '{}'",
        args.baseline_ref, args.feature_ref
    );

    if args.sudo {
        info!("Running in sudo mode - reth commands will use elevated privileges");
    }

    // Initialize Git manager
    let git_manager = GitManager::new()?;

    // Fetch all branches, tags, and commits
    git_manager.fetch_all()?;

    // Initialize compilation manager
    let output_dir = args.output_dir_path();
    let compilation_manager = CompilationManager::new(
        git_manager.repo_root().to_string(),
        output_dir.clone(),
        git_manager.clone(),
        args.features.clone(),
    )?;

    // Initialize node manager
    let mut node_manager = NodeManager::new(&args);
    let benchmark_runner = BenchmarkRunner::new(&args);
    let mut comparison_generator = ComparisonGenerator::new(&args);

    // Set the comparison directory in node manager to align with results directory
    node_manager.set_comparison_dir(comparison_generator.get_output_dir());

    // Store original git state for restoration
    let original_ref = git_manager.get_current_ref()?;
    info!("Current git reference: {}", original_ref);

    // Validate git state
    if !args.skip_git_validation {
        git_manager.validate_clean_state()?;
        git_manager.validate_refs(&[&args.baseline_ref, &args.feature_ref])?;
    }

    // Validate RPC endpoint chain ID matches the specified chain
    let rpc_url = args.get_rpc_url();
    validate_rpc_chain_id(&rpc_url, &args.chain).await?;

    // Setup signal handling for cleanup
    let git_manager_cleanup = git_manager.clone();
    let original_ref_cleanup = original_ref.clone();
    ctrlc::set_handler(move || {
        eprintln!("Received interrupt signal, cleaning up...");

        // Send SIGTERM to entire process group to ensure all children exit
        #[cfg(unix)]
        {
            use nix::{
                sys::signal::{kill, Signal},
                unistd::Pid,
            };
            // Send SIGTERM to our process group (negative PID = process group)
            let current_pid = std::process::id() as i32;
            let pgid = Pid::from_raw(-current_pid);
            if let Err(e) = kill(pgid, Signal::SIGTERM) {
                eprintln!("Failed to send SIGTERM to process group: {e}");
            }
        }

        // Give a moment for any ongoing git operations to complete
        std::thread::sleep(std::time::Duration::from_millis(200));

        if let Err(e) = git_manager_cleanup.switch_ref(&original_ref_cleanup) {
            eprintln!("Failed to restore original git reference: {e}");
            eprintln!("You may need to manually run: git checkout {original_ref_cleanup}");
        }
        std::process::exit(1);
    })?;

    let result = run_benchmark_workflow(
        &git_manager,
        &compilation_manager,
        &mut node_manager,
        &benchmark_runner,
        &mut comparison_generator,
        &args,
    )
    .await;

    // Always restore original git reference
    info!("Restoring original git reference: {}", original_ref);
    git_manager.switch_ref(&original_ref)?;

    // Handle any errors from the workflow
    result?;
    Ok(())
}

/// Parse a string of arguments into a vector of strings
fn parse_args_string(args_str: &str) -> Vec<String> {
    shlex::split(args_str).unwrap_or_else(|| {
        // Fallback to simple whitespace splitting if shlex fails
        args_str.split_whitespace().map(|s| s.to_string()).collect()
    })
}
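
A quick illustration of the two parsing paths (argument values invented): `shlex::split` keeps quoted arguments together, and the whitespace fallback only fires when shlex rejects the input, e.g. on an unterminated quote:

#[cfg(test)]
mod parse_args_tests {
    use super::*;

    #[test]
    fn shlex_honors_quoting() {
        assert_eq!(
            parse_args_string(r#"--debug.tip 0xabc --dir "/tmp/my logs""#),
            vec!["--debug.tip", "0xabc", "--dir", "/tmp/my logs"],
        );
        // Unterminated quote: shlex::split returns None, so the naive
        // whitespace fallback takes over.
        assert_eq!(parse_args_string(r#"--a "b"#), vec!["--a", "\"b"]);
    }
}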
/// Run compilation phase for both baseline and feature binaries
async fn run_compilation_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    args: &Args,
    is_optimism: bool,
) -> Result<(String, String)> {
    info!("=== Running compilation phase ===");

    // Ensure required tools are available (only need to check once)
    compilation_manager.ensure_reth_bench_available()?;
    if args.profile {
        compilation_manager.ensure_samply_available()?;
    }

    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];

    // First, resolve all refs to commits using a HashMap to avoid race conditions where a ref is
    // pushed to mid-run.
    let mut ref_commits = std::collections::HashMap::new();
    for &git_ref in &refs {
        if !ref_commits.contains_key(git_ref) {
            git_manager.switch_ref(git_ref)?;
            let commit = git_manager.get_current_commit()?;
            ref_commits.insert(git_ref.clone(), commit);
            info!("Reference {} resolves to commit: {}", git_ref, &ref_commits[git_ref][..8]);
        }
    }

    // Now compile each ref using the resolved commits
    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = &ref_commits[git_ref];
        info!(
            "Compiling {} binary for reference: {} (commit: {})",
            ref_type,
            git_ref,
            &commit[..8]
        );

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Compile reth (with caching)
        compilation_manager.compile_reth(commit, is_optimism)?;

        info!("Completed compilation for {} reference", ref_type);
    }

    let baseline_commit = ref_commits[&args.baseline_ref].clone();
    let feature_commit = ref_commits[&args.feature_ref].clone();

    info!("Compilation phase completed");
    Ok((baseline_commit, feature_commit))
}

/// Run warmup phase to warm up caches before benchmarking
async fn run_warmup_phase(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    args: &Args,
    is_optimism: bool,
    baseline_commit: &str,
) -> Result<()> {
    info!("=== Running warmup phase ===");

    // Use baseline for warmup
    let warmup_ref = &args.baseline_ref;

    // Switch to baseline reference
    git_manager.switch_ref(warmup_ref)?;

    // Get the cached binary path for baseline (should already be compiled)
    let binary_path =
        compilation_manager.get_cached_binary_path_for_commit(baseline_commit, is_optimism);

    // Verify the cached binary exists
    if !binary_path.exists() {
        return Err(eyre!(
            "Cached baseline binary not found at {:?}. Compilation phase should have created it.",
            binary_path
        ));
    }
    info!("Using cached baseline binary for warmup (commit: {})", &baseline_commit[..8]);

    // Build additional args with conditional --debug.startup-sync-state-idle flag
    let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());

    // Start reth node for warmup
    let mut node_process =
        node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;

    // Wait for node to be ready and get its current tip
    let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
    info!("Warmup node is ready at tip: {}", current_tip);

    // Store the tip we'll unwind back to
    let original_tip = current_tip;

    // Clear filesystem caches before warmup run only (unless disabled)
    if args.no_clear_cache {
        info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
    } else {
        BenchmarkRunner::clear_fs_caches().await?;
    }

    // Run warmup to warm up caches
    benchmark_runner.run_warmup(current_tip).await?;

    // Stop node before unwinding (node must be stopped to release database lock)
    node_manager.stop_node(&mut node_process).await?;

    // Unwind back to starting block after warmup
    node_manager.unwind_to_block(original_tip).await?;

    info!("Warmup phase completed");
    Ok(())
}

/// Execute the complete benchmark workflow for both branches
async fn run_benchmark_workflow(
    git_manager: &GitManager,
    compilation_manager: &CompilationManager,
    node_manager: &mut NodeManager,
    benchmark_runner: &BenchmarkRunner,
    comparison_generator: &mut ComparisonGenerator,
    args: &Args,
) -> Result<()> {
    // Detect if this is an Optimism chain once at the beginning
    let rpc_url = args.get_rpc_url();
    let is_optimism = compilation_manager.detect_optimism_chain(&rpc_url).await?;

    // Run compilation phase for both binaries
    let (baseline_commit, feature_commit) =
        run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;

    // Run warmup phase before benchmarking (skip if warmup_blocks is 0)
    if args.get_warmup_blocks() > 0 {
        run_warmup_phase(
            git_manager,
            compilation_manager,
            node_manager,
            benchmark_runner,
            args,
            is_optimism,
            &baseline_commit,
        )
        .await?;
    } else {
        info!("Skipping warmup phase (warmup_blocks is 0)");
    }

    let refs = [&args.baseline_ref, &args.feature_ref];
    let ref_types = ["baseline", "feature"];
    let commits = [&baseline_commit, &feature_commit];

    for (i, &git_ref) in refs.iter().enumerate() {
        let ref_type = ref_types[i];
        let commit = commits[i];
        info!("=== Processing {} reference: {} ===", ref_type, git_ref);

        // Switch to target reference
        git_manager.switch_ref(git_ref)?;

        // Get the cached binary path for this git reference (should already be compiled)
        let binary_path =
            compilation_manager.get_cached_binary_path_for_commit(commit, is_optimism);

        // Verify the cached binary exists
        if !binary_path.exists() {
            return Err(eyre!(
                "Cached {} binary not found at {:?}. Compilation phase should have created it.",
                ref_type,
                binary_path
            ));
        }
        info!("Using cached {} binary (commit: {})", ref_type, &commit[..8]);

        // Get reference-specific base arguments string
        let base_args_str = match ref_type {
            "baseline" => args.baseline_args.as_ref(),
            "feature" => args.feature_args.as_ref(),
            _ => None,
        };

        // Build additional args with conditional --debug.startup-sync-state-idle flag
        let additional_args = args.build_additional_args(ref_type, base_args_str);

        // Start reth node
        let mut node_process =
            node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;

        // Wait for node to be ready and get its current tip (wherever it is)
        let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
        info!("Node is ready at tip: {}", current_tip);

        // Store the tip we'll unwind back to
        let original_tip = current_tip;
        // Calculate the benchmark range.
        // Note: reth-bench has an off-by-one where it consumes the first block of the
        // range, so the range spans one extra block ([tip, tip + blocks]) to yield
        // exactly `args.blocks` benchmarked blocks.
        let from_block = original_tip;
        let to_block = original_tip + args.blocks;
        // Run benchmark
        let output_dir = comparison_generator.get_ref_output_dir(ref_type);

        // Capture start timestamp for the benchmark run
        let benchmark_start = chrono::Utc::now();

        // Run benchmark (comparison logic is handled separately by ComparisonGenerator)
        benchmark_runner.run_benchmark(from_block, to_block, &output_dir).await?;

        // Capture end timestamp for the benchmark run
        let benchmark_end = chrono::Utc::now();

        // Stop node
        node_manager.stop_node(&mut node_process).await?;

        // Unwind back to original tip
        node_manager.unwind_to_block(original_tip).await?;

        // Store results for comparison
        comparison_generator.add_ref_results(ref_type, &output_dir)?;

        // Set the benchmark run timestamps
        comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;

        info!("Completed {} reference benchmark", ref_type);
    }

    // Generate comparison report
    comparison_generator.generate_comparison_report().await?;

    // Generate charts if requested
    if args.draw {
        generate_comparison_charts(comparison_generator).await?;
    }

    // Start samply servers if profiling was enabled
    if args.profile {
        start_samply_servers(args).await?;
    }

    Ok(())
}

/// Generate comparison charts using the Python script
async fn generate_comparison_charts(comparison_generator: &ComparisonGenerator) -> Result<()> {
    info!("Generating comparison charts with Python script...");

    let baseline_output_dir = comparison_generator.get_ref_output_dir("baseline");
    let feature_output_dir = comparison_generator.get_ref_output_dir("feature");
    let baseline_csv = baseline_output_dir.join("combined_latency.csv");
    let feature_csv = feature_output_dir.join("combined_latency.csv");

    // Check if CSV files exist
    if !baseline_csv.exists() {
        return Err(eyre!("Baseline CSV not found: {:?}", baseline_csv));
    }
    if !feature_csv.exists() {
        return Err(eyre!("Feature CSV not found: {:?}", feature_csv));
    }

    let output_dir = comparison_generator.get_output_dir();
    let chart_output = output_dir.join("latency_comparison.png");
    let script_path = "bin/reth-bench/scripts/compare_newpayload_latency.py";

    info!("Running Python comparison script with uv...");
    let mut cmd = Command::new("uv");
    cmd.args([
        "run",
        script_path,
        &baseline_csv.to_string_lossy(),
        &feature_csv.to_string_lossy(),
        "-o",
        &chart_output.to_string_lossy(),
    ]);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        cmd.process_group(0);
    }

    let output = cmd.output().await.map_err(|e| {
        eyre!("Failed to execute Python script with uv: {}. Make sure uv is installed.", e)
    })?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = String::from_utf8_lossy(&output.stdout);
        return Err(eyre!(
            "Python script failed with exit code {:?}:\nstdout: {}\nstderr: {}",
            output.status.code(),
            stdout,
            stderr
        ));
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    if !stdout.trim().is_empty() {
        info!("Python script output:\n{}", stdout);
    }

    info!("Comparison chart generated: {:?}", chart_output);
    Ok(())
}

/// Start samply servers for viewing profiles
async fn start_samply_servers(args: &Args) -> Result<()> {
    info!("Starting samply servers for profile viewing...");

    let output_dir = args.output_dir_path();
    let profiles_dir = output_dir.join("profiles");

    // Build profile paths
    let baseline_profile = profiles_dir.join("baseline.json.gz");
    let feature_profile = profiles_dir.join("feature.json.gz");

    // Check if profiles exist
    if !baseline_profile.exists() {
        warn!("Baseline profile not found: {:?}", baseline_profile);
        return Ok(());
    }
    if !feature_profile.exists() {
        warn!("Feature profile not found: {:?}", feature_profile);
        return Ok(());
    }

    // Find two consecutive available ports starting from 3000
    let (baseline_port, feature_port) = find_consecutive_ports(3000)?;
    info!("Found available ports: {} and {}", baseline_port, feature_port);

    // Get samply path
    let samply_path = get_samply_path().await?;

    // Start baseline server
    info!("Starting samply server for baseline '{}' on port {}", args.baseline_ref, baseline_port);
    let mut baseline_cmd = Command::new(&samply_path);
    baseline_cmd
        .args(["load", "--port", &baseline_port.to_string(), &baseline_profile.to_string_lossy()])
        .kill_on_drop(true);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        baseline_cmd.process_group(0);
    }

    // Conditionally pipe output based on log level
    if tracing::enabled!(tracing::Level::DEBUG) {
        baseline_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
    } else {
        baseline_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
    }

    // Debug log the command
    debug!("Executing samply load command: {:?}", baseline_cmd);

    let mut baseline_child =
        baseline_cmd.spawn().wrap_err("Failed to start samply server for baseline")?;

    // Stream baseline samply output if debug logging is enabled
    if tracing::enabled!(tracing::Level::DEBUG) {
        if let Some(stdout) = baseline_child.stdout.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-BASELINE] {}", line);
                }
            });
        }
        if let Some(stderr) = baseline_child.stderr.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-BASELINE] {}", line);
                }
            });
        }
    }

    // Start feature server
    info!("Starting samply server for feature '{}' on port {}", args.feature_ref, feature_port);
    let mut feature_cmd = Command::new(&samply_path);
    feature_cmd
        .args(["load", "--port", &feature_port.to_string(), &feature_profile.to_string_lossy()])
        .kill_on_drop(true);

    // Set process group for consistent signal handling
    #[cfg(unix)]
    {
        feature_cmd.process_group(0);
    }

    // Conditionally pipe output based on log level
    if tracing::enabled!(tracing::Level::DEBUG) {
        feature_cmd.stdout(std::process::Stdio::piped()).stderr(std::process::Stdio::piped());
    } else {
        feature_cmd.stdout(std::process::Stdio::null()).stderr(std::process::Stdio::null());
    }

    // Debug log the command
    debug!("Executing samply load command: {:?}", feature_cmd);

    let mut feature_child =
        feature_cmd.spawn().wrap_err("Failed to start samply server for feature")?;

    // Stream feature samply output if debug logging is enabled
    if tracing::enabled!(tracing::Level::DEBUG) {
        if let Some(stdout) = feature_child.stdout.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-FEATURE] {}", line);
                }
            });
        }
        if let Some(stderr) = feature_child.stderr.take() {
            tokio::spawn(async move {
                use tokio::io::{AsyncBufReadExt, BufReader};
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    debug!("[SAMPLY-FEATURE] {}", line);
                }
            });
        }
    }

    // Give servers time to start
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Print access information
    println!("\n=== SAMPLY PROFILE SERVERS STARTED ===");
    println!("Baseline '{}': http://127.0.0.1:{}", args.baseline_ref, baseline_port);
    println!("Feature '{}': http://127.0.0.1:{}", args.feature_ref, feature_port);
    println!("\nOpen the URLs in your browser to view the profiles.");
    println!("Press Ctrl+C to stop the servers and exit.");
    println!("=========================================\n");

    // Wait for Ctrl+C or process termination
    let ctrl_c = tokio::signal::ctrl_c();
    let baseline_wait = baseline_child.wait();
    let feature_wait = feature_child.wait();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received Ctrl+C, shutting down samply servers...");
        }
        result = baseline_wait => {
            match result {
                Ok(status) => info!("Baseline samply server exited with status: {}", status),
                Err(e) => warn!("Baseline samply server error: {}", e),
            }
        }
        result = feature_wait => {
            match result {
                Ok(status) => info!("Feature samply server exited with status: {}", status),
                Err(e) => warn!("Feature samply server error: {}", e),
            }
        }
    }

    // Ensure both processes are terminated
    let _ = baseline_child.kill().await;
    let _ = feature_child.kill().await;

    info!("Samply servers stopped.");
    Ok(())
}

/// Find two consecutive available ports starting from the given port
fn find_consecutive_ports(start_port: u16) -> Result<(u16, u16)> {
    for port in start_port..=65533 {
        // Check if both port and port+1 are available
        if is_port_available(port) && is_port_available(port + 1) {
            return Ok((port, port + 1));
        }
    }
    Err(eyre!("Could not find two consecutive available ports starting from {}", start_port))
}

/// Check if a port is available by attempting to bind to it
fn is_port_available(port: u16) -> bool {
    TcpListener::bind(("127.0.0.1", port)).is_ok()
}
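
Usage sketch for the port probe; it is inherently racy, since another process can claim a port between this check and samply's own bind, in which case samply simply fails to start:

#[cfg(test)]
mod port_probe_example {
    use super::*;

    #[test]
    fn finds_adjacent_ports() {
        // Probes upward from 3000 until two neighboring ports both bind.
        let (a, b) = find_consecutive_ports(3000).expect("two free ports");
        assert_eq!(b, a + 1);
    }
}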
/// Get the absolute path to samply using 'which' command
async fn get_samply_path() -> Result<String> {
    let output = Command::new("which")
        .arg("samply")
        .output()
        .await
        .wrap_err("Failed to execute 'which samply' command")?;

    if !output.status.success() {
        return Err(eyre!("samply not found in PATH"));
    }

    let samply_path = String::from_utf8(output.stdout)
        .wrap_err("samply path is not valid UTF-8")?
        .trim()
        .to_string();

    if samply_path.is_empty() {
        return Err(eyre!("which samply returned empty path"));
    }

    Ok(samply_path)
}

bin/reth-bench-compare/src/comparison.rs

@@ -0,0 +1,484 @@
//! Results comparison and report generation.

use crate::cli::Args;
use chrono::{DateTime, Utc};
use csv::Reader;
use eyre::{eyre, Result, WrapErr};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};
use tracing::{info, warn};

/// Manages comparison between baseline and feature reference results
pub(crate) struct ComparisonGenerator {
    output_dir: PathBuf,
    timestamp: String,
    baseline_ref_name: String,
    feature_ref_name: String,
    baseline_results: Option<BenchmarkResults>,
    feature_results: Option<BenchmarkResults>,
}

/// Represents the results from a single benchmark run
#[derive(Debug, Clone)]
pub(crate) struct BenchmarkResults {
    pub ref_name: String,
    pub combined_latency_data: Vec<CombinedLatencyRow>,
    pub summary: BenchmarkSummary,
    pub start_timestamp: Option<DateTime<Utc>>,
    pub end_timestamp: Option<DateTime<Utc>>,
}

/// Combined latency CSV row structure
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct CombinedLatencyRow {
    pub block_number: u64,
    pub gas_used: u64,
    pub new_payload_latency: u128,
}

/// Total gas CSV row structure
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct TotalGasRow {
    pub block_number: u64,
    pub gas_used: u64,
    pub time: u128,
}

/// Summary statistics for a benchmark run
#[derive(Debug, Clone, Serialize)]
pub(crate) struct BenchmarkSummary {
    pub total_blocks: u64,
    pub total_gas_used: u64,
    pub total_duration_ms: u128,
    pub avg_new_payload_latency_ms: f64,
    pub gas_per_second: f64,
    pub blocks_per_second: f64,
}

/// Comparison report between two benchmark runs
#[derive(Debug, Serialize)]
pub(crate) struct ComparisonReport {
    pub timestamp: String,
    pub baseline: RefInfo,
    pub feature: RefInfo,
    pub comparison_summary: ComparisonSummary,
    pub per_block_comparisons: Vec<BlockComparison>,
}

/// Information about a reference in the comparison
#[derive(Debug, Serialize)]
pub(crate) struct RefInfo {
    pub ref_name: String,
    pub summary: BenchmarkSummary,
    pub start_timestamp: Option<DateTime<Utc>>,
    pub end_timestamp: Option<DateTime<Utc>>,
}

/// Summary of the comparison between references
#[derive(Debug, Serialize)]
pub(crate) struct ComparisonSummary {
    pub new_payload_latency_change_percent: f64,
    pub gas_per_second_change_percent: f64,
    pub blocks_per_second_change_percent: f64,
}

/// Per-block comparison data
#[derive(Debug, Serialize)]
pub(crate) struct BlockComparison {
    pub block_number: u64,
    pub baseline_new_payload_latency: u128,
    pub feature_new_payload_latency: u128,
    pub new_payload_latency_change_percent: f64,
}

impl ComparisonGenerator {
    /// Create a new comparison generator
    pub(crate) fn new(args: &Args) -> Self {
        let now: DateTime<Utc> = Utc::now();
        let timestamp = now.format("%Y%m%d_%H%M%S").to_string();
        Self {
            output_dir: args.output_dir_path(),
            timestamp,
            baseline_ref_name: args.baseline_ref.clone(),
            feature_ref_name: args.feature_ref.clone(),
            baseline_results: None,
            feature_results: None,
        }
    }

    /// Get the output directory for a specific reference
    pub(crate) fn get_ref_output_dir(&self, ref_type: &str) -> PathBuf {
        self.output_dir.join("results").join(&self.timestamp).join(ref_type)
    }

    /// Get the main output directory for this comparison run
    pub(crate) fn get_output_dir(&self) -> PathBuf {
        self.output_dir.join("results").join(&self.timestamp)
    }

    /// Add benchmark results for a reference
    pub(crate) fn add_ref_results(&mut self, ref_type: &str, output_path: &Path) -> Result<()> {
        let ref_name = match ref_type {
            "baseline" => &self.baseline_ref_name,
            "feature" => &self.feature_ref_name,
            _ => return Err(eyre!("Unknown reference type: {}", ref_type)),
        };

        let results = self.load_benchmark_results(ref_name, output_path)?;
        match ref_type {
            "baseline" => self.baseline_results = Some(results),
            "feature" => self.feature_results = Some(results),
            _ => return Err(eyre!("Unknown reference type: {}", ref_type)),
        }

        info!("Loaded benchmark results for {} reference", ref_type);
        Ok(())
    }

    /// Set the benchmark run timestamps for a reference
    pub(crate) fn set_ref_timestamps(
        &mut self,
        ref_type: &str,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Result<()> {
        match ref_type {
            "baseline" => {
                if let Some(ref mut results) = self.baseline_results {
                    results.start_timestamp = Some(start);
                    results.end_timestamp = Some(end);
                } else {
                    return Err(eyre!("Baseline results not loaded yet"));
                }
            }
            "feature" => {
                if let Some(ref mut results) = self.feature_results {
                    results.start_timestamp = Some(start);
                    results.end_timestamp = Some(end);
                } else {
                    return Err(eyre!("Feature results not loaded yet"));
                }
            }
            _ => return Err(eyre!("Unknown reference type: {}", ref_type)),
        }
        Ok(())
    }

    /// Generate the final comparison report
    pub(crate) async fn generate_comparison_report(&self) -> Result<()> {
        info!("Generating comparison report...");

        let baseline =
            self.baseline_results.as_ref().ok_or_else(|| eyre!("Baseline results not loaded"))?;
        let feature =
            self.feature_results.as_ref().ok_or_else(|| eyre!("Feature results not loaded"))?;

        // Generate comparison
        let comparison_summary =
            self.calculate_comparison_summary(&baseline.summary, &feature.summary)?;
        let per_block_comparisons = self.calculate_per_block_comparisons(baseline, feature)?;

        let report = ComparisonReport {
            timestamp: self.timestamp.clone(),
            baseline: RefInfo {
                ref_name: baseline.ref_name.clone(),
                summary: baseline.summary.clone(),
                start_timestamp: baseline.start_timestamp,
                end_timestamp: baseline.end_timestamp,
            },
            feature: RefInfo {
                ref_name: feature.ref_name.clone(),
                summary: feature.summary.clone(),
                start_timestamp: feature.start_timestamp,
                end_timestamp: feature.end_timestamp,
            },
            comparison_summary,
            per_block_comparisons,
        };

        // Write reports
        self.write_comparison_reports(&report).await?;

        // Print summary to console
        self.print_comparison_summary(&report);

        Ok(())
    }

    /// Load benchmark results from CSV files
    fn load_benchmark_results(
        &self,
        ref_name: &str,
        output_path: &Path,
    ) -> Result<BenchmarkResults> {
        let combined_latency_path = output_path.join("combined_latency.csv");
        let total_gas_path = output_path.join("total_gas.csv");

        let combined_latency_data = self.load_combined_latency_csv(&combined_latency_path)?;
        let total_gas_data = self.load_total_gas_csv(&total_gas_path)?;
        let summary = self.calculate_summary(&combined_latency_data, &total_gas_data)?;

        Ok(BenchmarkResults {
            ref_name: ref_name.to_string(),
            combined_latency_data,
            summary,
            start_timestamp: None,
            end_timestamp: None,
        })
    }

    /// Load combined latency CSV data
    fn load_combined_latency_csv(&self, path: &Path) -> Result<Vec<CombinedLatencyRow>> {
        let mut reader = Reader::from_path(path)
            .wrap_err_with(|| format!("Failed to open combined latency CSV: {path:?}"))?;
        let mut rows = Vec::new();
        for result in reader.deserialize() {
            let row: CombinedLatencyRow = result
                .wrap_err_with(|| format!("Failed to parse combined latency row in {path:?}"))?;
            rows.push(row);
        }
        if rows.is_empty() {
            return Err(eyre!("No data found in combined latency CSV: {:?}", path));
        }
        Ok(rows)
    }
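
The serde derive on `CombinedLatencyRow` implies a header row matching the field names, with latencies in microseconds; a minimal in-memory sketch of the expected shape (numbers synthetic, `combined_latency_example` is illustrative only):

    fn combined_latency_example() -> Result<()> {
        // Synthetic data in the shape `load_combined_latency_csv` expects.
        let data = "block_number,gas_used,new_payload_latency\n\
                    18000000,12500000,85000\n\
                    18000001,13100000,91000\n";
        let mut reader = csv::Reader::from_reader(data.as_bytes());
        for row in reader.deserialize::<CombinedLatencyRow>() {
            let row = row?;
            println!("block {}: {} us", row.block_number, row.new_payload_latency);
        }
        Ok(())
    }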
    /// Load total gas CSV data
    fn load_total_gas_csv(&self, path: &Path) -> Result<Vec<TotalGasRow>> {
        let mut reader = Reader::from_path(path)
            .wrap_err_with(|| format!("Failed to open total gas CSV: {path:?}"))?;
        let mut rows = Vec::new();
        for result in reader.deserialize() {
            let row: TotalGasRow =
                result.wrap_err_with(|| format!("Failed to parse total gas row in {path:?}"))?;
            rows.push(row);
        }
        if rows.is_empty() {
            return Err(eyre!("No data found in total gas CSV: {:?}", path));
        }
        Ok(rows)
    }

    /// Calculate summary statistics for a benchmark run
    fn calculate_summary(
        &self,
        combined_data: &[CombinedLatencyRow],
        total_gas_data: &[TotalGasRow],
    ) -> Result<BenchmarkSummary> {
        if combined_data.is_empty() || total_gas_data.is_empty() {
            return Err(eyre!("Cannot calculate summary for empty data"));
        }

        let total_blocks = combined_data.len() as u64;
        let total_gas_used: u64 = combined_data.iter().map(|r| r.gas_used).sum();
        let total_duration_ms = total_gas_data.last().unwrap().time / 1000; // Convert microseconds to milliseconds
        let avg_new_payload_latency_ms: f64 =
            combined_data.iter().map(|r| r.new_payload_latency as f64 / 1000.0).sum::<f64>() /
                total_blocks as f64;

        let total_duration_seconds = total_duration_ms as f64 / 1000.0;
        let gas_per_second = if total_duration_seconds > f64::EPSILON {
            total_gas_used as f64 / total_duration_seconds
        } else {
            0.0
        };
        let blocks_per_second = if total_duration_seconds > f64::EPSILON {
            total_blocks as f64 / total_duration_seconds
        } else {
            0.0
        };

        Ok(BenchmarkSummary {
            total_blocks,
            total_gas_used,
            total_duration_ms,
            avg_new_payload_latency_ms,
            gas_per_second,
            blocks_per_second,
        })
    }
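
A worked example of the unit flow above (values invented): a final `time` column of 30,000,000 µs is 30,000 ms, i.e. 30 s, so 3.6 billion total gas works out to 120 Mgas/s:

    fn summary_units_example() {
        // Illustrative only: mirrors the conversions in `calculate_summary`.
        let last_time_us: u128 = 30_000_000; // final `time` value, microseconds
        let total_duration_ms = last_time_us / 1000; // 30_000 ms
        let total_duration_s = total_duration_ms as f64 / 1000.0; // 30.0 s
        let gas_per_second = 3_600_000_000u64 as f64 / total_duration_s;
        assert_eq!(total_duration_ms, 30_000);
        assert_eq!(gas_per_second, 120_000_000.0);
    }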
    /// Calculate comparison summary between baseline and feature
    fn calculate_comparison_summary(
        &self,
        baseline: &BenchmarkSummary,
        feature: &BenchmarkSummary,
    ) -> Result<ComparisonSummary> {
        let calc_percent_change = |baseline: f64, feature: f64| -> f64 {
            if baseline.abs() > f64::EPSILON {
                ((feature - baseline) / baseline) * 100.0
            } else {
                0.0
            }
        };

        Ok(ComparisonSummary {
            new_payload_latency_change_percent: calc_percent_change(
                baseline.avg_new_payload_latency_ms,
                feature.avg_new_payload_latency_ms,
            ),
            gas_per_second_change_percent: calc_percent_change(
                baseline.gas_per_second,
                feature.gas_per_second,
            ),
            blocks_per_second_change_percent: calc_percent_change(
                baseline.blocks_per_second,
                feature.blocks_per_second,
            ),
        })
    }
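
For the sign convention (values invented): a baseline latency of 100 ms against a feature latency of 75 ms gives (75 - 100) / 100 * 100 = -25%, i.e. negative means the feature run is faster:

    fn percent_change_example() {
        // Illustrative only: same formula as `calc_percent_change` above.
        let calc = |baseline: f64, feature: f64| ((feature - baseline) / baseline) * 100.0;
        assert_eq!(calc(100.0, 75.0), -25.0); // latency dropped by a quarter
        assert_eq!(calc(80.0, 100.0), 25.0); // throughput rose by a quarter
    }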
    /// Calculate per-block comparisons
    fn calculate_per_block_comparisons(
        &self,
        baseline: &BenchmarkResults,
        feature: &BenchmarkResults,
    ) -> Result<Vec<BlockComparison>> {
        let mut baseline_map: HashMap<u64, &CombinedLatencyRow> = HashMap::new();
        for row in &baseline.combined_latency_data {
            baseline_map.insert(row.block_number, row);
        }

        let mut comparisons = Vec::new();
        for feature_row in &feature.combined_latency_data {
            if let Some(baseline_row) = baseline_map.get(&feature_row.block_number) {
                let calc_percent_change = |baseline: u128, feature: u128| -> f64 {
                    if baseline > 0 {
                        ((feature as f64 - baseline as f64) / baseline as f64) * 100.0
                    } else {
                        0.0
                    }
                };

                let comparison = BlockComparison {
                    block_number: feature_row.block_number,
                    baseline_new_payload_latency: baseline_row.new_payload_latency,
                    feature_new_payload_latency: feature_row.new_payload_latency,
                    new_payload_latency_change_percent: calc_percent_change(
                        baseline_row.new_payload_latency,
                        feature_row.new_payload_latency,
                    ),
                };
                comparisons.push(comparison);
            } else {
                warn!("Block {} not found in baseline data", feature_row.block_number);
            }
        }

        Ok(comparisons)
    }

    /// Write comparison reports to files
    async fn write_comparison_reports(&self, report: &ComparisonReport) -> Result<()> {
        let report_dir = self.output_dir.join("results").join(&self.timestamp);
        fs::create_dir_all(&report_dir)
            .wrap_err_with(|| format!("Failed to create report directory: {report_dir:?}"))?;

        // Write JSON report
        let json_path = report_dir.join("comparison_report.json");
        let json_content = serde_json::to_string_pretty(report)
            .wrap_err("Failed to serialize comparison report to JSON")?;
        fs::write(&json_path, json_content)
            .wrap_err_with(|| format!("Failed to write JSON report: {json_path:?}"))?;

        // Write CSV report for per-block comparisons
        let csv_path = report_dir.join("per_block_comparison.csv");
        let mut writer = csv::Writer::from_path(&csv_path)
            .wrap_err_with(|| format!("Failed to create CSV writer: {csv_path:?}"))?;
        for comparison in &report.per_block_comparisons {
            writer.serialize(comparison).wrap_err("Failed to write comparison row to CSV")?;
        }
        writer.flush().wrap_err("Failed to flush CSV writer")?;

        info!("Comparison reports written to: {:?}", report_dir);
        Ok(())
    }

    /// Print comparison summary to console
    fn print_comparison_summary(&self, report: &ComparisonReport) {
        // Parse and format timestamp nicely
        let formatted_timestamp = if let Ok(dt) = chrono::DateTime::parse_from_str(
            &format!("{} +0000", report.timestamp.replace('_', " ")),
            "%Y%m%d %H%M%S %z",
        ) {
            dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
        } else {
            // Fallback to original if parsing fails
            report.timestamp.clone()
        };

        println!("\n=== BENCHMARK COMPARISON SUMMARY ===");
        println!("Timestamp: {formatted_timestamp}");
        println!("Baseline: {}", report.baseline.ref_name);
        println!("Feature: {}", report.feature.ref_name);
        println!();

        let summary = &report.comparison_summary;
        println!("Performance Changes:");
        println!("  NewPayload Latency: {:+.2}%", summary.new_payload_latency_change_percent);
        println!("  Gas/Second: {:+.2}%", summary.gas_per_second_change_percent);
        println!("  Blocks/Second: {:+.2}%", summary.blocks_per_second_change_percent);
        println!();

        println!("Baseline Summary:");
        let baseline = &report.baseline.summary;
        println!(
            "  Blocks: {}, Gas: {}, Duration: {:.2}s",
            baseline.total_blocks,
            baseline.total_gas_used,
            baseline.total_duration_ms as f64 / 1000.0
        );
        println!("  Avg NewPayload: {:.2}ms", baseline.avg_new_payload_latency_ms);
        if let (Some(start), Some(end)) =
            (&report.baseline.start_timestamp, &report.baseline.end_timestamp)
        {
            println!(
                "  Started: {}, Ended: {}",
                start.format("%Y-%m-%d %H:%M:%S UTC"),
                end.format("%Y-%m-%d %H:%M:%S UTC")
            );
        }
        println!();

        println!("Feature Summary:");
        let feature = &report.feature.summary;
        println!(
            "  Blocks: {}, Gas: {}, Duration: {:.2}s",
            feature.total_blocks,
            feature.total_gas_used,
            feature.total_duration_ms as f64 / 1000.0
        );
        println!("  Avg NewPayload: {:.2}ms", feature.avg_new_payload_latency_ms);
        if let (Some(start), Some(end)) =
            (&report.feature.start_timestamp, &report.feature.end_timestamp)
        {
            println!(
                "  Started: {}, Ended: {}",
                start.format("%Y-%m-%d %H:%M:%S UTC"),
                end.format("%Y-%m-%d %H:%M:%S UTC")
            );
        }
        println!();
    }
}

bin/reth-bench-compare/src/compilation.rs

@@ -0,0 +1,354 @@
//! Compilation operations for reth and reth-bench.
use crate::git::GitManager;
use alloy_primitives::address;
use alloy_provider::{Provider, ProviderBuilder};
use eyre::{eyre, Result, WrapErr};
use std::{fs, path::PathBuf, process::Command};
use tracing::{debug, error, info, warn};
/// Manages compilation operations for reth components
#[derive(Debug)]
pub(crate) struct CompilationManager {
repo_root: String,
output_dir: PathBuf,
git_manager: GitManager,
features: String,
}
impl CompilationManager {
/// Create a new `CompilationManager`
pub(crate) const fn new(
repo_root: String,
output_dir: PathBuf,
git_manager: GitManager,
features: String,
) -> Result<Self> {
Ok(Self { repo_root, output_dir, git_manager, features })
}
/// Detect if the RPC endpoint is an Optimism chain
pub(crate) async fn detect_optimism_chain(&self, rpc_url: &str) -> Result<bool> {
info!("Detecting chain type from RPC endpoint...");
// Create Alloy provider
let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
let provider = ProviderBuilder::new().connect_http(url);
// Check for Optimism predeploy at address 0x420000000000000000000000000000000000000F
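// (0x42...000F is the OP Stack `GasPriceOracle` predeploy; deployed code at
// this address is a reliable signal that the endpoint serves an OP chain)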
let is_optimism = !provider
.get_code_at(address!("0x420000000000000000000000000000000000000F"))
.await?
.is_empty();
if is_optimism {
info!("Detected Optimism chain");
} else {
info!("Detected Ethereum chain");
}
Ok(is_optimism)
}
/// Get the path to the cached binary using explicit commit hash
pub(crate) fn get_cached_binary_path_for_commit(
&self,
commit: &str,
is_optimism: bool,
) -> PathBuf {
let identifier = &commit[..8]; // Use first 8 chars of commit
let binary_name = if is_optimism {
format!("op-reth_{}", identifier)
} else {
format!("reth_{}", identifier)
};
self.output_dir.join("bin").join(binary_name)
}
/// Compile reth using cargo build and cache the binary
pub(crate) fn compile_reth(&self, commit: &str, is_optimism: bool) -> Result<()> {
// Validate that current git commit matches the expected commit
let current_commit = self.git_manager.get_current_commit()?;
if current_commit != commit {
return Err(eyre!(
"Git commit mismatch! Expected: {}, but currently at: {}",
&commit[..8],
&current_commit[..8]
));
}
let cached_path = self.get_cached_binary_path_for_commit(commit, is_optimism);
// Check if a cached binary already exists (the path embeds the commit hash, so an existing file is valid for this commit)
if cached_path.exists() {
info!("Using cached binary (commit: {})", &commit[..8]);
return Ok(());
}
info!("No cached binary found, compiling (commit: {})...", &commit[..8]);
let binary_name = if is_optimism { "op-reth" } else { "reth" };
info!(
"Compiling {} with profiling configuration (commit: {})...",
binary_name,
&commit[..8]
);
let mut cmd = Command::new("cargo");
cmd.arg("build").arg("--profile").arg("profiling");
// Add features
cmd.arg("--features").arg(&self.features);
info!("Using features: {}", self.features);
// Add bin-specific arguments for optimism
if is_optimism {
cmd.arg("--bin")
.arg("op-reth")
.arg("--manifest-path")
.arg("crates/optimism/bin/Cargo.toml");
}
cmd.current_dir(&self.repo_root);
// Set RUSTFLAGS for native CPU optimization
cmd.env("RUSTFLAGS", "-C target-cpu=native");
// Debug log the command
debug!("Executing cargo command: {:?}", cmd);
let output = cmd.output().wrap_err("Failed to execute cargo build command")?;
// Print stdout and stderr with prefixes at debug level
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stdout.lines() {
if !line.trim().is_empty() {
debug!("[CARGO] {}", line);
}
}
for line in stderr.lines() {
if !line.trim().is_empty() {
debug!("[CARGO] {}", line);
}
}
if !output.status.success() {
// Print all output when compilation fails
error!("Cargo build failed with exit code: {:?}", output.status.code());
if !stdout.trim().is_empty() {
error!("Cargo stdout:");
for line in stdout.lines() {
error!(" {}", line);
}
}
if !stderr.trim().is_empty() {
error!("Cargo stderr:");
for line in stderr.lines() {
error!(" {}", line);
}
}
return Err(eyre!("Compilation failed with exit code: {:?}", output.status.code()));
}
info!("{} compilation completed", binary_name);
// Copy the compiled binary to cache
let source_path =
PathBuf::from(&self.repo_root).join(format!("target/profiling/{}", binary_name));
if !source_path.exists() {
return Err(eyre!("Compiled binary not found at {:?}", source_path));
}
// Create bin directory if it doesn't exist
let bin_dir = self.output_dir.join("bin");
fs::create_dir_all(&bin_dir).wrap_err("Failed to create bin directory")?;
// Copy binary to cache
fs::copy(&source_path, &cached_path).wrap_err("Failed to copy binary to cache")?;
// Make the cached binary executable
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&cached_path)?.permissions();
perms.set_mode(0o755);
fs::set_permissions(&cached_path, perms)?;
}
info!("Cached compiled binary at: {:?}", cached_path);
Ok(())
}
/// Check if reth-bench is available in PATH
pub(crate) fn is_reth_bench_available(&self) -> bool {
match Command::new("which").arg("reth-bench").output() {
Ok(output) => {
if output.status.success() {
let path = String::from_utf8_lossy(&output.stdout);
info!("Found reth-bench: {}", path.trim());
true
} else {
false
}
}
Err(_) => false,
}
}
/// Check if samply is available in PATH
pub(crate) fn is_samply_available(&self) -> bool {
match Command::new("which").arg("samply").output() {
Ok(output) => {
if output.status.success() {
let path = String::from_utf8_lossy(&output.stdout);
info!("Found samply: {}", path.trim());
true
} else {
false
}
}
Err(_) => false,
}
}
/// Install samply using cargo
pub(crate) fn install_samply(&self) -> Result<()> {
info!("Installing samply via cargo...");
let mut cmd = Command::new("cargo");
cmd.args(["install", "--locked", "samply"]);
// Debug log the command
debug!("Executing cargo command: {:?}", cmd);
let output = cmd.output().wrap_err("Failed to execute cargo install samply command")?;
// Print stdout and stderr with prefixes at debug level
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stdout.lines() {
if !line.trim().is_empty() {
debug!("[CARGO-SAMPLY] {}", line);
}
}
for line in stderr.lines() {
if !line.trim().is_empty() {
debug!("[CARGO-SAMPLY] {}", line);
}
}
if !output.status.success() {
// Print all output when installation fails
error!("Cargo install samply failed with exit code: {:?}", output.status.code());
if !stdout.trim().is_empty() {
error!("Cargo stdout:");
for line in stdout.lines() {
error!(" {}", line);
}
}
if !stderr.trim().is_empty() {
error!("Cargo stderr:");
for line in stderr.lines() {
error!(" {}", line);
}
}
return Err(eyre!(
"samply installation failed with exit code: {:?}",
output.status.code()
));
}
info!("Samply installation completed");
Ok(())
}
/// Ensure samply is available, installing if necessary
pub(crate) fn ensure_samply_available(&self) -> Result<()> {
if self.is_samply_available() {
Ok(())
} else {
warn!("samply not found in PATH, installing...");
self.install_samply()
}
}
/// Ensure reth-bench is available, compiling if necessary
pub(crate) fn ensure_reth_bench_available(&self) -> Result<()> {
if self.is_reth_bench_available() {
Ok(())
} else {
warn!("reth-bench not found in PATH, compiling and installing...");
self.compile_reth_bench()
}
}
/// Compile and install reth-bench using `make install-reth-bench`
pub(crate) fn compile_reth_bench(&self) -> Result<()> {
info!("Compiling and installing reth-bench...");
let mut cmd = Command::new("make");
cmd.arg("install-reth-bench").current_dir(&self.repo_root);
// Debug log the command
debug!("Executing make command: {:?}", cmd);
let output = cmd.output().wrap_err("Failed to execute make install-reth-bench command")?;
// Print stdout and stderr with prefixes at debug level
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stdout.lines() {
if !line.trim().is_empty() {
debug!("[MAKE-BENCH] {}", line);
}
}
for line in stderr.lines() {
if !line.trim().is_empty() {
debug!("[MAKE-BENCH] {}", line);
}
}
if !output.status.success() {
// Print all output when compilation fails
error!("Make install-reth-bench failed with exit code: {:?}", output.status.code());
if !stdout.trim().is_empty() {
error!("Make stdout:");
for line in stdout.lines() {
error!(" {}", line);
}
}
if !stderr.trim().is_empty() {
error!("Make stderr:");
for line in stderr.lines() {
error!(" {}", line);
}
}
return Err(eyre!(
"reth-bench compilation failed with exit code: {:?}",
output.status.code()
));
}
info!("Reth-bench compilation completed");
Ok(())
}
}
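// Minimal usage sketch (hypothetical caller, not part of this commit):
// binaries are cached under `<output_dir>/bin`, keyed by the first 8
// characters of the commit hash, so a re-run on the same commit skips
// compilation entirely.
//
// let commit = git_manager.get_current_commit()?;
// manager.compile_reth(&commit, /* is_optimism */ false)?;
// let binary = manager.get_cached_binary_path_for_commit(&commit, false);
// // e.g. `<output_dir>/bin/reth_abc12345`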

View File

@@ -0,0 +1,330 @@
//! Git operations for branch management.
use eyre::{eyre, Result, WrapErr};
use std::process::Command;
use tracing::{info, warn};
/// Manages git operations for branch switching
#[derive(Debug, Clone)]
pub(crate) struct GitManager {
repo_root: String,
}
impl GitManager {
/// Create a new `GitManager`, detecting the repository root
pub(crate) fn new() -> Result<Self> {
let output = Command::new("git")
.args(["rev-parse", "--show-toplevel"])
.output()
.wrap_err("Failed to execute git command - is git installed?")?;
if !output.status.success() {
return Err(eyre!("Not in a git repository or git command failed"));
}
let repo_root = String::from_utf8(output.stdout)
.wrap_err("Git output is not valid UTF-8")?
.trim()
.to_string();
let manager = Self { repo_root };
info!(
"Detected git repository at: {}, current reference: {}",
manager.repo_root(),
manager.get_current_ref()?
);
Ok(manager)
}
/// Get the current git branch name
pub(crate) fn get_current_branch(&self) -> Result<String> {
let output = Command::new("git")
.args(["branch", "--show-current"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to get current branch")?;
if !output.status.success() {
return Err(eyre!("Failed to determine current branch"));
}
let branch = String::from_utf8(output.stdout)
.wrap_err("Branch name is not valid UTF-8")?
.trim()
.to_string();
if branch.is_empty() {
return Err(eyre!("Not on a named branch (detached HEAD?)"));
}
Ok(branch)
}
/// Get the current git reference (branch name, tag, or commit hash)
pub(crate) fn get_current_ref(&self) -> Result<String> {
// First try to get branch name
if let Ok(branch) = self.get_current_branch() {
return Ok(branch);
}
// If not on a branch, check if we're on a tag
let tag_output = Command::new("git")
.args(["describe", "--exact-match", "--tags", "HEAD"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to check for tag")?;
if tag_output.status.success() {
let tag = String::from_utf8(tag_output.stdout)
.wrap_err("Tag name is not valid UTF-8")?
.trim()
.to_string();
return Ok(tag);
}
// If not on a branch or tag, return the commit hash
let commit_output = Command::new("git")
.args(["rev-parse", "HEAD"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to get current commit")?;
if !commit_output.status.success() {
return Err(eyre!("Failed to get current commit hash"));
}
let commit_hash = String::from_utf8(commit_output.stdout)
.wrap_err("Commit hash is not valid UTF-8")?
.trim()
.to_string();
Ok(commit_hash)
}
/// Check if the git working directory has uncommitted changes to tracked files
pub(crate) fn validate_clean_state(&self) -> Result<()> {
let output = Command::new("git")
.args(["status", "--porcelain"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to check git status")?;
if !output.status.success() {
return Err(eyre!("Git status command failed"));
}
let status_output =
String::from_utf8(output.stdout).wrap_err("Git status output is not valid UTF-8")?;
// Check for uncommitted changes to tracked files
// Status codes: M = modified, A = added, D = deleted, R = renamed, C = copied, U = updated but unmerged
// ?? = untracked files (we want to ignore these)
let has_uncommitted_changes = status_output.lines().any(|line| {
if line.len() >= 2 {
let status = &line[0..2];
// Ignore untracked files (??) and ignored files (!!)
!matches!(status, "??" | "!!")
} else {
false
}
});
if has_uncommitted_changes {
warn!("Git working directory has uncommitted changes to tracked files:");
for line in status_output.lines() {
if line.len() >= 2 && !matches!(&line[0..2], "??" | "!!") {
warn!(" {}", line);
}
}
return Err(eyre!(
"Git working directory has uncommitted changes to tracked files. Please commit or stash changes before running benchmark comparison."
));
}
// Check if there are untracked files and log them as info
let untracked_files: Vec<&str> =
status_output.lines().filter(|line| line.starts_with("??")).collect();
if !untracked_files.is_empty() {
info!(
"Git working directory has {} untracked files (this is OK)",
untracked_files.len()
);
}
info!("Git working directory is clean (no uncommitted changes to tracked files)");
Ok(())
}
/// Fetch all refs from remote to ensure we have latest branches and tags
pub(crate) fn fetch_all(&self) -> Result<()> {
let output = Command::new("git")
.args(["fetch", "--all", "--tags", "--quiet", "--force"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to fetch latest refs")?;
if output.status.success() {
info!("Fetched latest refs");
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
// Only warn if there's actual error content, not just fetch progress
if !stderr.trim().is_empty() && !stderr.contains("-> origin/") {
warn!("Git fetch encountered issues (continuing anyway): {}", stderr);
}
}
Ok(())
}
/// Validate that the specified git references exist (branches, tags, or commits)
pub(crate) fn validate_refs(&self, refs: &[&str]) -> Result<()> {
for &git_ref in refs {
// Try branch first, then tag, then commit
let branch_check = Command::new("git")
.args(["rev-parse", "--verify", &format!("refs/heads/{git_ref}")])
.current_dir(&self.repo_root)
.output();
let tag_check = Command::new("git")
.args(["rev-parse", "--verify", &format!("refs/tags/{git_ref}")])
.current_dir(&self.repo_root)
.output();
let commit_check = Command::new("git")
.args(["rev-parse", "--verify", &format!("{git_ref}^{{commit}}")])
.current_dir(&self.repo_root)
.output();
let found = if let Ok(output) = branch_check &&
output.status.success()
{
info!("Validated branch exists: {}", git_ref);
true
} else if let Ok(output) = tag_check &&
output.status.success()
{
info!("Validated tag exists: {}", git_ref);
true
} else if let Ok(output) = commit_check &&
output.status.success()
{
info!("Validated commit exists: {}", git_ref);
true
} else {
false
};
if !found {
return Err(eyre!(
"Git reference '{}' does not exist as branch, tag, or commit",
git_ref
));
}
}
Ok(())
}
/// Switch to the specified git reference (branch, tag, or commit)
pub(crate) fn switch_ref(&self, git_ref: &str) -> Result<()> {
// First checkout the reference
let output = Command::new("git")
.args(["checkout", git_ref])
.current_dir(&self.repo_root)
.output()
.wrap_err_with(|| format!("Failed to switch to reference '{git_ref}'"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(eyre!("Failed to switch to reference '{}': {}", git_ref, stderr));
}
// Check if this is a branch that tracks a remote and pull latest changes
let is_branch = Command::new("git")
.args(["show-ref", "--verify", "--quiet", &format!("refs/heads/{git_ref}")])
.current_dir(&self.repo_root)
.status()
.map(|s| s.success())
.unwrap_or(false);
if is_branch {
// Check if the branch tracks a remote
let tracking_output = Command::new("git")
.args([
"rev-parse",
"--abbrev-ref",
"--symbolic-full-name",
&format!("{git_ref}@{{upstream}}"),
])
.current_dir(&self.repo_root)
.output();
if let Ok(output) = tracking_output &&
output.status.success()
{
let upstream = String::from_utf8_lossy(&output.stdout).trim().to_string();
if !upstream.is_empty() && upstream != format!("{git_ref}@{{upstream}}") {
// Branch tracks a remote, pull latest changes
info!("Pulling latest changes for branch: {}", git_ref);
let pull_output = Command::new("git")
.args(["pull", "--ff-only"])
.current_dir(&self.repo_root)
.output()
.wrap_err_with(|| {
format!("Failed to pull latest changes for branch '{git_ref}'")
})?;
if pull_output.status.success() {
info!("Successfully pulled latest changes for branch: {}", git_ref);
} else {
let stderr = String::from_utf8_lossy(&pull_output.stderr);
warn!("Failed to pull latest changes for branch '{}': {}", git_ref, stderr);
// Continue anyway, we'll use whatever version we have
}
}
}
}
// Verify the checkout succeeded by checking the current commit
let current_commit_output = Command::new("git")
.args(["rev-parse", "HEAD"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to get current commit")?;
if !current_commit_output.status.success() {
return Err(eyre!("Failed to verify git checkout"));
}
info!("Switched to reference: {}", git_ref);
Ok(())
}
/// Get the current commit hash
pub(crate) fn get_current_commit(&self) -> Result<String> {
let output = Command::new("git")
.args(["rev-parse", "HEAD"])
.current_dir(&self.repo_root)
.output()
.wrap_err("Failed to get current commit")?;
if !output.status.success() {
return Err(eyre!("Failed to get current commit hash"));
}
let commit_hash = String::from_utf8(output.stdout)
.wrap_err("Commit hash is not valid UTF-8")?
.trim()
.to_string();
Ok(commit_hash)
}
/// Get the repository root path
pub(crate) fn repo_root(&self) -> &str {
&self.repo_root
}
}
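// Illustrative workflow sketch (hypothetical caller), combining the
// operations above in the order the comparison tool needs them:
//
// let git = GitManager::new()?;
// git.validate_clean_state()?;            // refuse to run with dirty tracked files
// git.fetch_all()?;                       // best-effort refresh of remote refs
// git.validate_refs(&["main", "feat"])?;  // branches, tags, and commits all accepted
// git.switch_ref("feat")?;                // checkout, plus ff-only pull if tracking
// let commit = git.get_current_commit()?; // pin the exact commit for binary caching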

View File

@@ -0,0 +1,45 @@
//! # reth-bench-compare
//!
//! Automated tool for comparing reth performance between two git branches.
//! This tool automates the complete workflow of compiling, running, and benchmarking
//! reth on different branches to provide meaningful performance comparisons.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
mod benchmark;
mod cli;
mod comparison;
mod compilation;
mod git;
mod node;
use clap::Parser;
use cli::{run_comparison, Args};
use eyre::Result;
use reth_cli_runner::CliRunner;
fn main() -> Result<()> {
// Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
if std::env::var_os("RUST_BACKTRACE").is_none() {
unsafe {
std::env::set_var("RUST_BACKTRACE", "1");
}
}
let args = Args::parse();
// Initialize tracing
let _guard = args.init_tracing()?;
// Run until either exit or sigint or sigterm
let runner = CliRunner::try_default_runtime()?;
runner.run_command_until_exit(|ctx| run_comparison(args, ctx))
}

View File

@@ -0,0 +1,511 @@
//! Node management for starting, stopping, and controlling reth instances.
use crate::cli::Args;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::SyncStatus;
use eyre::{eyre, OptionExt, Result, WrapErr};
#[cfg(unix)]
use nix::sys::signal::{killpg, Signal};
#[cfg(unix)]
use nix::unistd::Pid;
use reth_chainspec::Chain;
use std::{fs, path::PathBuf, time::Duration};
use tokio::{
fs::File as AsyncFile,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader},
process::Command,
time::{sleep, timeout},
};
use tracing::{debug, info, warn};
/// Manages reth node lifecycle and operations
pub(crate) struct NodeManager {
datadir: Option<String>,
metrics_port: u16,
chain: Chain,
use_sudo: bool,
binary_path: Option<std::path::PathBuf>,
enable_profiling: bool,
output_dir: PathBuf,
additional_reth_args: Vec<String>,
comparison_dir: Option<PathBuf>,
}
impl NodeManager {
/// Create a new `NodeManager` with configuration from CLI args
pub(crate) fn new(args: &Args) -> Self {
Self {
datadir: Some(args.datadir_path().to_string_lossy().to_string()),
metrics_port: args.metrics_port,
chain: args.chain,
use_sudo: args.sudo,
binary_path: None,
enable_profiling: args.profile,
output_dir: args.output_dir_path(),
additional_reth_args: args.reth_args.clone(),
comparison_dir: None,
}
}
/// Set the comparison directory path for logging
pub(crate) fn set_comparison_dir(&mut self, dir: PathBuf) {
self.comparison_dir = Some(dir);
}
/// Get the log file path for a given reference type
fn get_log_file_path(&self, ref_type: &str) -> Result<PathBuf> {
let comparison_dir = self
.comparison_dir
.as_ref()
.ok_or_eyre("Comparison directory not set. Call set_comparison_dir first.")?;
// The comparison directory already contains the full path to results/<timestamp>
let log_dir = comparison_dir.join(ref_type);
// Create the directory if it doesn't exist
fs::create_dir_all(&log_dir)
.wrap_err(format!("Failed to create log directory: {:?}", log_dir))?;
let log_file = log_dir.join("reth_node.log");
Ok(log_file)
}
/// Get the perf event max sample rate from the system, capped at 10000
fn get_perf_sample_rate(&self) -> Option<String> {
let perf_rate_file = "/proc/sys/kernel/perf_event_max_sample_rate";
if let Ok(content) = fs::read_to_string(perf_rate_file) {
let rate_str = content.trim();
if !rate_str.is_empty() {
if let Ok(system_rate) = rate_str.parse::<u32>() {
let capped_rate = std::cmp::min(system_rate, 10000);
info!(
"Detected perf_event_max_sample_rate: {}, using: {}",
system_rate, capped_rate
);
return Some(capped_rate.to_string());
}
warn!("Failed to parse perf_event_max_sample_rate: {}", rate_str);
}
}
None
}
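// Example: a system value of 100000 in
// /proc/sys/kernel/perf_event_max_sample_rate is capped to 10000, while a
// smaller value such as 4000 is passed through unchanged.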
/// Get the absolute path to samply using the `which` command
async fn get_samply_path(&self) -> Result<String> {
let output = Command::new("which")
.arg("samply")
.output()
.await
.wrap_err("Failed to execute 'which samply' command")?;
if !output.status.success() {
return Err(eyre!("samply not found in PATH"));
}
let samply_path = String::from_utf8(output.stdout)
.wrap_err("samply path is not valid UTF-8")?
.trim()
.to_string();
if samply_path.is_empty() {
return Err(eyre!("which samply returned empty path"));
}
Ok(samply_path)
}
/// Build reth arguments as a vector of strings
fn build_reth_args(
&self,
binary_path_str: &str,
additional_args: &[String],
) -> (Vec<String>, String) {
let mut reth_args = vec![binary_path_str.to_string(), "node".to_string()];
// Add chain argument (skip for mainnet as it's the default)
let chain_str = self.chain.to_string();
if chain_str != "mainnet" {
reth_args.extend_from_slice(&["--chain".to_string(), chain_str.clone()]);
}
// Add datadir if specified
if let Some(ref datadir) = self.datadir {
reth_args.extend_from_slice(&["--datadir".to_string(), datadir.clone()]);
}
// Add reth-specific arguments
let metrics_arg = format!("0.0.0.0:{}", self.metrics_port);
reth_args.extend_from_slice(&[
"--engine.accept-execution-requests-hash".to_string(),
"--metrics".to_string(),
metrics_arg,
"--http".to_string(),
"--http.api".to_string(),
"eth".to_string(),
"--disable-discovery".to_string(),
"--trusted-only".to_string(),
]);
// Add any additional arguments passed via command line (common to both baseline and
// feature)
reth_args.extend_from_slice(&self.additional_reth_args);
// Add reference-specific additional arguments
reth_args.extend_from_slice(additional_args);
(reth_args, chain_str)
}
/// Create a command for profiling mode
async fn create_profiling_command(
&self,
ref_type: &str,
reth_args: &[String],
) -> Result<Command> {
// Create profiles directory if it doesn't exist
let profile_dir = self.output_dir.join("profiles");
fs::create_dir_all(&profile_dir).wrap_err("Failed to create profiles directory")?;
let profile_path = profile_dir.join(format!("{}.json.gz", ref_type));
info!("Starting reth node with samply profiling...");
info!("Profile output: {:?}", profile_path);
// Get absolute path to samply
let samply_path = self.get_samply_path().await?;
let mut cmd = if self.use_sudo {
let mut sudo_cmd = Command::new("sudo");
sudo_cmd.arg(&samply_path);
sudo_cmd
} else {
Command::new(&samply_path)
};
// Add samply arguments
cmd.args(["record", "--save-only", "-o", &profile_path.to_string_lossy()]);
// Add rate argument if available
if let Some(rate) = self.get_perf_sample_rate() {
cmd.args(["--rate", &rate]);
}
// Add separator and complete reth command
cmd.arg("--");
cmd.args(reth_args);
Ok(cmd)
}
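// Resulting invocation shape (illustrative):
//   [sudo] samply record --save-only -o <output_dir>/profiles/<ref>.json.gz \
//       [--rate <capped_rate>] -- <binary> node <reth args...>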
/// Create a command for direct reth execution
fn create_direct_command(&self, reth_args: &[String]) -> Command {
let binary_path = &reth_args[0];
if self.use_sudo {
info!("Starting reth node with sudo...");
let mut cmd = Command::new("sudo");
cmd.args(reth_args);
cmd
} else {
info!("Starting reth node...");
let mut cmd = Command::new(binary_path);
cmd.args(&reth_args[1..]); // Skip the binary path since it's the command
cmd
}
}
/// Start a reth node using the specified binary path and return the process handle
pub(crate) async fn start_node(
&mut self,
binary_path: &std::path::Path,
_git_ref: &str,
ref_type: &str,
additional_args: &[String],
) -> Result<tokio::process::Child> {
// Store the binary path for later use (e.g., in unwind_to_block)
self.binary_path = Some(binary_path.to_path_buf());
let binary_path_str = binary_path.to_string_lossy();
let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args);
// Log additional arguments if any
if !self.additional_reth_args.is_empty() {
info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
}
if !additional_args.is_empty() {
info!("Using reference-specific additional reth arguments: {:?}", additional_args);
}
let mut cmd = if self.enable_profiling {
self.create_profiling_command(ref_type, &reth_args).await?
} else {
self.create_direct_command(&reth_args)
};
// Set process group for better signal handling
#[cfg(unix)]
{
cmd.process_group(0);
}
debug!("Executing reth command: {cmd:?}");
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true) // Kill on drop so that Ctrl-C on the parent process also stops all child processes
.spawn()
.wrap_err("Failed to start reth node")?;
info!(
"Reth node started with PID: {:?} (binary: {})",
child.id().ok_or_eyre("Reth node is not running")?,
binary_path_str
);
// Prepare log file path
let log_file_path = self.get_log_file_path(ref_type)?;
info!("Reth node logs will be saved to: {:?}", log_file_path);
// Stream stdout and stderr with prefixes at debug level and mirror them to the log file
if let Some(stdout) = child.stdout.take() {
let log_file = AsyncFile::create(&log_file_path)
.await
.wrap_err(format!("Failed to create log file: {:?}", log_file_path))?;
tokio::spawn(async move {
let reader = AsyncBufReader::new(stdout);
let mut lines = reader.lines();
let mut log_file = log_file;
while let Ok(Some(line)) = lines.next_line().await {
debug!("[RETH] {}", line);
// Write to log file (reth already includes timestamps)
let log_line = format!("{}\n", line);
if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
debug!("Failed to write to log file: {}", e);
}
}
});
}
if let Some(stderr) = child.stderr.take() {
let log_file = AsyncFile::options()
.create(true)
.append(true)
.open(&log_file_path)
.await
.wrap_err(format!("Failed to open log file for stderr: {:?}", log_file_path))?;
tokio::spawn(async move {
let reader = AsyncBufReader::new(stderr);
let mut lines = reader.lines();
let mut log_file = log_file;
while let Ok(Some(line)) = lines.next_line().await {
debug!("[RETH] {}", line);
// Write to log file (reth already includes timestamps)
let log_line = format!("{}\n", line);
if let Err(e) = log_file.write_all(log_line.as_bytes()).await {
debug!("Failed to write to log file: {}", e);
}
}
});
}
// Give the node a moment to start up
sleep(Duration::from_secs(5)).await;
Ok(child)
}
/// Wait for the node to be ready and return its current tip
pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
info!("Waiting for node to be ready and synced...");
let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
let check_interval = Duration::from_secs(2);
let rpc_url = "http://localhost:8545";
// Create Alloy provider
let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
let provider = ProviderBuilder::new().connect_http(url);
timeout(max_wait, async {
loop {
// First check if RPC is up and node is not syncing
match provider.syncing().await {
Ok(sync_result) => {
match sync_result {
SyncStatus::Info(sync_info) => {
debug!("Node is still syncing {sync_info:?}, waiting...");
}
_ => {
// Node is not syncing, now get the tip
match provider.get_block_number().await {
Ok(tip) => {
info!("Node is ready and not syncing at block: {}", tip);
return Ok(tip);
}
Err(e) => {
debug!("Failed to get block number: {}", e);
}
}
}
}
}
Err(e) => {
debug!("Node RPC not ready yet or failed to check sync status: {}", e);
}
}
sleep(check_interval).await;
}
})
.await
.wrap_err("Timed out waiting for node to be ready and synced")?
}
/// Stop the reth node gracefully
pub(crate) async fn stop_node(&self, child: &mut tokio::process::Child) -> Result<()> {
let pid = child.id().expect("Child process ID should be available");
// Check if the process has already exited
match child.try_wait() {
Ok(Some(status)) => {
info!("Reth node (PID: {}) has already exited with status: {:?}", pid, status);
return Ok(());
}
Ok(None) => {
// Process is still running, proceed to stop it
info!("Stopping process gracefully with SIGINT (PID: {})...", pid);
}
Err(e) => {
return Err(eyre!("Failed to check process status: {}", e));
}
}
#[cfg(unix)]
{
// Send SIGINT to process group to mimic Ctrl-C behavior
let nix_pgid = Pid::from_raw(pid as i32);
match killpg(nix_pgid, Signal::SIGINT) {
Ok(()) => {}
Err(nix::errno::Errno::ESRCH) => {
info!("Process group {} has already exited", pid);
}
Err(e) => {
return Err(eyre!("Failed to send SIGINT to process group {}: {}", pid, e));
}
}
}
#[cfg(not(unix))]
{
// On non-Unix systems, fall back to the external taskkill command
let output = Command::new("taskkill")
.args(["/PID", &pid.to_string(), "/F"])
.output()
.await
.wrap_err("Failed to execute taskkill command")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
// Check if the error is because the process doesn't exist
if stderr.contains("not found") || stderr.contains("not exist") {
info!("Process {} has already exited", pid);
} else {
return Err(eyre!("Failed to kill process {}: {}", pid, stderr));
}
}
}
// Wait for the process to exit
match child.wait().await {
Ok(status) => {
info!("Reth node (PID: {}) exited with status: {:?}", pid, status);
}
Err(e) => {
// If we get an error here, it might be because the process already exited
debug!("Error waiting for process exit (may have already exited): {}", e);
}
}
Ok(())
}
/// Unwind the node to a specific block
pub(crate) async fn unwind_to_block(&self, block_number: u64) -> Result<()> {
if self.use_sudo {
info!("Unwinding node to block: {} (with sudo)", block_number);
} else {
info!("Unwinding node to block: {}", block_number);
}
// Use the binary path from the last start_node call, or fallback to default
let binary_path = self
.binary_path
.as_ref()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|| "./target/profiling/reth".to_string());
let mut cmd = if self.use_sudo {
let mut sudo_cmd = Command::new("sudo");
sudo_cmd.args([&binary_path, "stage", "unwind"]);
sudo_cmd
} else {
let mut reth_cmd = Command::new(&binary_path);
reth_cmd.args(["stage", "unwind"]);
reth_cmd
};
// Add chain argument (skip for mainnet as it's the default)
let chain_str = self.chain.to_string();
if chain_str != "mainnet" {
cmd.args(["--chain", &chain_str]);
}
// Add datadir if specified
if let Some(ref datadir) = self.datadir {
cmd.args(["--datadir", datadir]);
}
cmd.args(["to-block", &block_number.to_string()]);
// Debug log the command
debug!("Executing reth unwind command: {:?}", cmd);
let mut child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.wrap_err("Failed to start unwind command")?;
// Stream stdout and stderr with prefixes in real-time
if let Some(stdout) = child.stdout.take() {
tokio::spawn(async move {
let reader = AsyncBufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
debug!("[RETH-UNWIND] {}", line);
}
});
}
if let Some(stderr) = child.stderr.take() {
tokio::spawn(async move {
let reader = AsyncBufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
debug!("[RETH-UNWIND] {}", line);
}
});
}
// Wait for the command to complete
let status = child.wait().await.wrap_err("Failed to wait for unwind command")?;
if !status.success() {
return Err(eyre!("Unwind command failed with exit code: {:?}", status.code()));
}
info!("Unwound to block: {}", block_number);
Ok(())
}
}
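// Lifecycle sketch (hypothetical caller): one comparison iteration drives the
// node through start -> ready -> stop -> unwind so that both refs replay the
// same block range. `binary`, `results_dir`, and `block_count` are assumed
// to come from the surrounding CLI code.
//
// let mut node = NodeManager::new(&args);
// node.set_comparison_dir(results_dir.clone());
// let mut child = node.start_node(&binary, "feat", "feature", &[]).await?;
// let tip = node.wait_for_node_ready_and_get_tip().await?;
// // ... run reth-bench against the node ...
// node.stop_node(&mut child).await?;
// node.unwind_to_block(tip.saturating_sub(block_count)).await?;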

View File

@@ -230,21 +230,15 @@ fn bench_state_root(c: &mut Criterion) {
},
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -28,10 +28,7 @@ use reth_evm::{
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::{
hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut,
trie_cursor::TrieCursorFactory,
};
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
@@ -204,10 +201,7 @@ where
provider_builder: StateProviderBuilder<N, P>,
multiproof_provider_factory: F,
config: &TreeConfig,
) -> Result<
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
(ParallelStateRootError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
>
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
@@ -222,10 +216,9 @@ where
// consistent view of the database, including the trie tables. Because of this there is no
// need for an overarching prefix set to invalidate any section of the trie tables, and so
// we use an empty prefix set.
let prefix_sets = Arc::new(TriePrefixSetsMut::default());
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory, prefix_sets);
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
@@ -267,12 +260,12 @@ where
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
Ok(PayloadHandle {
PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
})
}
}
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
@@ -897,19 +890,13 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -1317,7 +1317,7 @@ mod tests {
{
let rt_handle = get_test_runtime_handle();
let overlay_factory = OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(overlay_factory, Default::default());
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();

View File

@@ -370,8 +370,7 @@ where
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
// Plan the strategy used for state root computation.
let state_root_plan = self.plan_state_root_computation();
let strategy = state_root_plan.strategy;
let strategy = self.plan_state_root_computation();
debug!(
target: "engine::tree::payload_validator",
@@ -383,7 +382,7 @@ where
let txs = self.tx_iterator_for(&input)?;
// Spawn the appropriate processor based on strategy
let (mut handle, strategy) = ensure_ok!(self.spawn_payload_processor(
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
@@ -751,13 +750,10 @@ where
state: &EngineApiTreeState<N>,
strategy: StateRootStrategy,
) -> Result<
(
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
>,
StateRootStrategy,
),
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
>,
InsertBlockErrorKind,
> {
match strategy {
@@ -791,34 +787,13 @@ where
// Use state root task only if prefix sets are empty, otherwise proof generation is
// too expensive because it requires walking all paths in every proof.
let spawn_start = Instant::now();
let (handle, strategy) = match self.payload_processor.spawn(
let handle = self.payload_processor.spawn(
env,
txs,
provider_builder,
multiproof_provider_factory,
&self.config,
) {
Ok(handle) => {
// Successfully spawned with state root task support
(handle, StateRootStrategy::StateRootTask)
}
Err((error, txs, env, provider_builder)) => {
// Failed to spawn proof workers, fallback to parallel state root
error!(
target: "engine::tree::payload_validator",
?error,
"Failed to spawn proof workers, falling back to parallel state root"
);
(
self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
),
StateRootStrategy::Parallel,
)
}
};
);
// record prewarming initialization duration
self.metrics
@@ -826,9 +801,9 @@ where
.spawn_payload_processor
.record(spawn_start.elapsed().as_secs_f64());
Ok((handle, strategy))
Ok(handle)
}
strategy @ (StateRootStrategy::Parallel | StateRootStrategy::Synchronous) => {
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
let start = Instant::now();
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
@@ -839,7 +814,7 @@ where
.spawn_payload_processor
.record(start.elapsed().as_secs_f64());
Ok((handle, strategy))
Ok(handle)
}
}
}
@@ -877,7 +852,7 @@ where
/// Determines the state root computation strategy based on configuration.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn plan_state_root_computation(&self) -> StateRootPlan {
fn plan_state_root_computation(&self) -> StateRootStrategy {
let strategy = if self.config.state_root_fallback() {
StateRootStrategy::Synchronous
} else if self.config.use_state_root_task() {
@@ -892,7 +867,7 @@ where
"Planned state root computation strategy"
);
StateRootPlan { strategy }
strategy
}
/// Called when an invalid block is encountered during validation.
@@ -971,12 +946,6 @@ enum StateRootStrategy {
Synchronous,
}
/// State root computation plan that captures strategy and required data.
struct StateRootPlan {
/// Strategy that should be attempted for computing the state root.
strategy: StateRootStrategy,
}
/// Type that validates the payloads processed by the engine.
///
/// This provides the necessary functions for validating/executing payloads/blocks.

View File

@@ -125,11 +125,14 @@ impl BanList {
/// Bans the IP until the timestamp.
///
/// This does not ban non-global IPs.
/// If the IP is already banned, the timeout will be updated to the new value.
pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
self.ban_ip_with(ip, Some(until));
}
/// Bans the peer until the timestamp
/// Bans the peer until the timestamp.
///
/// If the peer is already banned, the timeout will be updated to the new value.
pub fn ban_peer_until(&mut self, node_id: PeerId, until: Instant) {
self.ban_peer_with(node_id, Some(until));
}
@@ -147,6 +150,8 @@ impl BanList {
}
/// Bans the peer indefinitely or until the given timeout.
///
/// If the peer is already banned, the timeout will be updated to the new value.
pub fn ban_peer_with(&mut self, node_id: PeerId, until: Option<Instant>) {
self.banned_peers.insert(node_id, until);
}
@@ -154,6 +159,7 @@ impl BanList {
/// Bans the ip indefinitely or until the given timeout.
///
/// This does not ban non-global IPs.
/// If the IP is already banned, the timeout will be updated to the new value.
pub fn ban_ip_with(&mut self, ip: IpAddr, until: Option<Instant>) {
if is_global(&ip) {
self.banned_ips.insert(ip, until);
@@ -167,7 +173,7 @@ mod tests {
#[test]
fn can_ban_unban_peer() {
let peer = PeerId::random();
let peer = PeerId::new([1; 64]);
let mut banlist = BanList::default();
banlist.ban_peer(peer);
assert!(banlist.is_banned_peer(&peer));

View File

@@ -21,7 +21,6 @@ use std::{
cmp::Ordering,
collections::BinaryHeap,
fmt::Debug,
mem,
ops::RangeInclusive,
pin::Pin,
sync::Arc,
@@ -215,9 +214,7 @@ where
/// Adds a new response to the internal buffer
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B>>) {
// take into account capacity
let size = response.iter().map(BlockResponse::size).sum::<usize>() +
response.capacity() * mem::size_of::<BlockResponse<B>>();
let size = response.iter().map(BlockResponse::size).sum::<usize>();
let response = OrderedBodiesResponse { resp: response, size };
let response_len = response.len();

View File

@@ -427,6 +427,12 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
self
}
/// Disables all discovery services for the node.
pub const fn with_disabled_discovery(mut self) -> Self {
self.network.discovery.disable_discovery = true;
self
}
/// Effectively disables the RPC state cache by setting the cache sizes to `0`.
///
/// By setting the cache sizes to 0, caching of newly executed or fetched blocks will be

View File

@@ -1 +1 @@
c9881d543174ff00b8f3a9ad3f31bf4630b9743b
9e3f71cee0e4e2acb4864cb00f5fbee3555d8e9f

View File

@@ -28,7 +28,6 @@ reth-node-builder.workspace = true
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-rpc-engine-api.workspace = true
reth-rpc-convert.workspace = true
# op-reth
reth-optimism-evm.workspace = true

View File

@@ -35,10 +35,8 @@ use reth_rpc_eth_api::{
EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
RpcNodeCoreExt, RpcTypes,
};
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, PendingBlockEnvOrigin,
};
use reth_storage_api::ProviderHeader;
use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock};
use reth_storage_api::{BlockReaderIdExt, ProviderHeader};
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner,
@@ -96,6 +94,11 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
Self { inner }
}
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
pub const fn builder() -> OpEthApiBuilder<Rpc> {
OpEthApiBuilder::new()
}
/// Returns a reference to the [`EthApiNodeBackend`].
pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
self.inner.eth_api()
@@ -134,11 +137,6 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
}
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
pub const fn builder() -> OpEthApiBuilder<Rpc> {
OpEthApiBuilder::new()
}
/// Awaits a fresh flashblock if one is being built, otherwise returns current.
async fn flashblock(
&self,
@@ -177,13 +175,11 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
OpEthApiError: FromEvmError<N::Evm>,
Rpc: RpcConvert<Primitives = N::Primitives>,
{
let pending = self.pending_block_env_and_cfg()?;
let parent = match pending.origin {
PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
let Some(latest) = self.provider().latest_header()? else {
return Ok(None);
};
self.flashblock(parent.hash()).await
self.flashblock(latest.hash()).await
}
}

View File

@@ -6,16 +6,13 @@ use alloy_eips::BlockNumberOrTag;
use reth_chain_state::BlockState;
use reth_rpc_eth_api::{
helpers::{pending_block::PendingEnvBuilder, LoadPendingBlock, SpawnBlocking},
FromEvmError, RpcConvert, RpcNodeCore,
FromEvmError, RpcConvert, RpcNodeCore, RpcNodeCoreExt,
};
use reth_rpc_eth_types::{
block::BlockAndReceipts, builder::config::PendingBlockKind, error::FromEthApiError,
EthApiError, PendingBlock,
};
use reth_storage_api::{
BlockReader, BlockReaderIdExt, ReceiptProvider, StateProviderBox, StateProviderFactory,
};
use std::sync::Arc;
use reth_storage_api::{BlockReaderIdExt, StateProviderBox, StateProviderFactory};
impl<N, Rpc> LoadPendingBlock for OpEthApi<N, Rpc>
where
@@ -38,33 +35,6 @@ where
self.inner.eth_api.pending_block_kind()
}
/// Returns the locally built pending block
async fn local_pending_block(
&self,
) -> Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error> {
if let Ok(Some(pending)) = self.pending_flashblock().await {
return Ok(Some(pending.into_block_and_receipts()));
}
// See: <https://github.com/ethereum-optimism/op-geth/blob/f2e69450c6eec9c35d56af91389a1c47737206ca/miner/worker.go#L367-L375>
let latest = self
.provider()
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let block_id = latest.hash().into();
let block = self
.provider()
.recovered_block(block_id, Default::default())?
.ok_or(EthApiError::HeaderNotFound(block_id.into()))?;
let receipts = self
.provider()
.receipts_by_block(block_id)?
.ok_or(EthApiError::ReceiptsNotFound(block_id.into()))?;
Ok(Some(BlockAndReceipts { block: Arc::new(block), receipts: Arc::new(receipts) }))
}
/// Returns a [`StateProviderBox`] on a mem-pool built pending block overlaying latest.
async fn local_pending_state(&self) -> Result<Option<StateProviderBox>, Self::Error>
where
@@ -83,4 +53,27 @@ where
Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
}
/// Returns the locally built pending block
async fn local_pending_block(
&self,
) -> Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error> {
if let Ok(Some(pending)) = self.pending_flashblock().await {
return Ok(Some(pending.into_block_and_receipts()));
}
// See: <https://github.com/ethereum-optimism/op-geth/blob/f2e69450c6eec9c35d56af91389a1c47737206ca/miner/worker.go#L367-L375>
let latest = self
.provider()
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest = self
.cache()
.get_block_and_receipts(latest.hash())
.await
.map_err(Self::Error::from_eth_err)?
.map(|(block, receipts)| BlockAndReceipts { block, receipts });
Ok(latest)
}
}

View File

@@ -1,20 +1,15 @@
//! Loads and formats OP transaction RPC response.
use crate::{OpEthApi, OpEthApiError, SequencerClient};
use alloy_consensus::TxReceipt as _;
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_eth::TransactionInfo;
use futures::StreamExt;
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_primitives::DepositReceipt;
use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_primitives_traits::{BlockBody, SignedTransaction};
use reth_rpc_eth_api::{
helpers::{
receipt::calculate_gas_used_and_next_log_index, spec::SignersForRpc, EthTransactions,
LoadReceipt, LoadTransaction,
},
helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction},
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
};
@@ -88,21 +83,35 @@ where
fn send_raw_transaction_sync(
&self,
tx: Bytes,
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
where
Self: LoadReceipt + 'static,
{
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
let this = self.clone();
let timeout_duration = self.send_raw_transaction_sync_timeout();
async move {
let mut canonical_stream = this.provider().canonical_state_stream();
let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
let flashblock_rx = this.pending_block_rx();
let mut flashblock_stream = flashblock_rx.map(WatchStream::new);
let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
tokio::time::timeout(timeout_duration, async {
loop {
tokio::select! {
biased;
// check if the tx was preconfirmed in a new flashblock
flashblock = async {
if let Some(stream) = &mut flashblock_stream {
stream.next().await
} else {
futures::future::pending().await
}
} => {
if let Some(flashblock) = flashblock.flatten() {
// if flashblocks are supported, attempt to find id from the pending block
if let Some(receipt) = flashblock
.find_and_convert_transaction_receipt(hash, this.tx_resp_builder())
{
return receipt;
}
}
}
// Listen for regular canonical block updates for inclusion
canonical_notification = canonical_stream.next() => {
if let Some(notification) = canonical_notification {
@@ -118,23 +127,6 @@ where
break;
}
}
// check if the tx was preconfirmed in a new flashblock
_flashblock_update = async {
if let Some(ref mut stream) = flashblock_stream {
stream.next().await
} else {
futures::future::pending().await
}
} => {
// Check flashblocks for faster confirmation (Optimism-specific)
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
let block_and_receipts = pending_block.into_block_and_receipts();
if block_and_receipts.block.body().contains_transaction(&hash)
&& let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
}
}
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
@@ -168,42 +160,11 @@ where
if tx_receipt.is_none() {
// if flashblocks are supported, attempt to find id from the pending block
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
let block_and_receipts = pending_block.into_block_and_receipts();
if let Some((tx, receipt)) =
block_and_receipts.find_transaction_and_receipt_by_hash(hash)
{
// Build tx receipt from pending block and receipts directly inline.
// This avoids canonical cache lookup that would be done by the
// `build_transaction_receipt` which would result in a block not found
// issue. See: https://github.com/paradigmxyz/reth/issues/18529
let meta = tx.meta();
let all_receipts = &block_and_receipts.receipts;
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
return Ok(Some(
this.tx_resp_builder()
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx
.tx()
.clone()
.try_into_recovered_unchecked()
.map_err(Self::Error::from_eth_err)?
.as_recovered_ref(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
block_and_receipts.sealed_block(),
)?
.pop()
.unwrap(),
))
}
if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
let Some(Ok(receipt)) = pending_block
.find_and_convert_transaction_receipt(hash, this.tx_resp_builder())
{
return Ok(Some(receipt));
}
}
let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };

View File

@@ -6,27 +6,11 @@ use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
use futures::Future;
use reth_primitives_traits::SignerRecoverable;
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert};
use reth_rpc_eth_types::{error::FromEthApiError, EthApiError};
use reth_rpc_eth_types::{
error::FromEthApiError, utils::calculate_gas_used_and_next_log_index, EthApiError,
};
use reth_storage_api::{ProviderReceipt, ProviderTx};
/// Calculates the gas used and next log index for a transaction at the given index
pub fn calculate_gas_used_and_next_log_index(
tx_index: u64,
all_receipts: &[impl TxReceipt],
) -> (u64, usize) {
let mut gas_used = 0;
let mut next_log_index = 0;
if tx_index > 0 {
for receipt in all_receipts.iter().take(tx_index as usize) {
gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();
}
}
(gas_used, next_log_index)
}
/// Assembles transaction receipt data w.r.t to network.
///
/// Behaviour shared by several `eth_` RPC methods, not exclusive to `eth_` receipts RPC methods.

View File

@@ -4,17 +4,18 @@
use std::{sync::Arc, time::Instant};
use crate::block::BlockAndReceipts;
use alloy_consensus::BlockHeader;
use crate::{block::BlockAndReceipts, utils::calculate_gas_used_and_next_log_index};
use alloy_consensus::{BlockHeader, TxReceipt};
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::{BlockHash, B256};
use alloy_primitives::{BlockHash, TxHash, B256};
use derive_more::Constructor;
use reth_chain_state::{BlockState, ExecutedBlock};
use reth_ethereum_primitives::Receipt;
use reth_evm::{ConfigureEvm, EvmEnvFor};
use reth_primitives_traits::{
Block, BlockTy, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader,
Block, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
/// Configured [`reth_evm::EvmEnv`] for a pending block.
#[derive(Debug, Clone, Constructor)]
@@ -129,6 +130,52 @@ impl<N: NodePrimitives> PendingBlock<N> {
pub fn parent_hash(&self) -> BlockHash {
self.executed_block.recovered_block().parent_hash()
}
/// Finds a transaction by hash and returns it along with its corresponding receipt.
///
/// Returns `None` if the transaction is not found in this block.
pub fn find_transaction_and_receipt_by_hash(
&self,
tx_hash: TxHash,
) -> Option<(IndexedTx<'_, N::Block>, &N::Receipt)> {
let indexed_tx = self.executed_block.recovered_block().find_indexed(tx_hash)?;
let receipt = self.receipts.get(indexed_tx.index())?;
Some((indexed_tx, receipt))
}
/// Returns the rpc transaction receipt for the given transaction hash if it exists.
///
/// This uses the given converter to turn [`Self::find_transaction_and_receipt_by_hash`] into
/// the rpc format.
pub fn find_and_convert_transaction_receipt<C>(
&self,
tx_hash: TxHash,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
C: RpcConvert<Primitives = N>,
{
let (tx, receipt) = self.find_transaction_and_receipt_by_hash(tx_hash)?;
let meta = tx.meta();
let all_receipts = &self.receipts;
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
converter
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx.recovered_tx(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
self.executed_block.sealed_block(),
)
.map(|mut receipts| receipts.pop())
.transpose()
}
}
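// Note on the conversion above: the `gas_used` handed to the converter is
// per-transaction gas, i.e. this receipt's cumulative gas minus the
// cumulative gas of the preceding receipt in the block.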
impl<N: NodePrimitives> From<PendingBlock<N>> for BlockState<N> {

View File

@@ -1,9 +1,28 @@
//! Commonly used code snippets
use super::{EthApiError, EthResult};
use alloy_consensus::TxReceipt;
use reth_primitives_traits::{Recovered, SignedTransaction};
use std::future::Future;
/// Calculates the gas used and next log index for a transaction at the given index
pub fn calculate_gas_used_and_next_log_index(
tx_index: u64,
all_receipts: &[impl TxReceipt],
) -> (u64, usize) {
let mut gas_used = 0;
let mut next_log_index = 0;
if tx_index > 0 {
for receipt in all_receipts.iter().take(tx_index as usize) {
gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();
}
}
(gas_used, next_log_index)
}
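// Worked example: for receipts with cumulative gas [21_000, 63_000, 90_000]
// and tx_index = 2, this returns gas_used = 63_000 (cumulative gas before
// tx 2, letting the caller derive 90_000 - 63_000 = 27_000 for the tx itself)
// and next_log_index equal to the log count of receipts 0 and 1 combined.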
/// Recovers a [`SignedTransaction`] from an enveloped encoded byte stream.
///
/// This is a helper function that returns the appropriate RPC-specific error if the input data is

View File

@@ -1,6 +1,7 @@
//! `eth_` `Filter` RPC handler implementation
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{Sealable, TxHash};
use alloy_rpc_types_eth::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
@@ -17,6 +18,7 @@ use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_errors::ProviderError;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
use reth_rpc_eth_api::{
helpers::{EthBlocks, LoadReceipt},
EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
RpcNodeCoreExt, RpcTransaction,
};
@@ -48,7 +50,11 @@ use tracing::{debug, error, trace};
impl<Eth> EngineEthFilter for EthFilter<Eth>
where
Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
Eth: FullEthApiTypes
+ RpcNodeCoreExt<Provider: BlockIdReader>
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Returns logs matching given filter object, no query limits
fn logs(
@@ -193,7 +199,11 @@ where
impl<Eth> EthFilter<Eth>
where
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
+ RpcNodeCoreExt
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
@@ -315,7 +325,7 @@ where
#[async_trait]
impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
where
Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
{
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
@@ -356,8 +366,6 @@ where
}
};
//let filter = FilterKind::PendingTransaction(transaction_kind);
// Install the filter and propagate any errors
self.inner.install_filter(transaction_kind).await
}
@@ -434,6 +442,8 @@ impl<Eth> EthFilterInner<Eth>
where
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Access the underlying provider.
@@ -487,10 +497,43 @@ where
Ok(all_logs)
}
FilterBlockOption::Range { from_block, to_block } => {
// compute the range
let info = self.provider().chain_info()?;
// Handle special case where from block is pending
if from_block.is_some_and(|b| b.is_pending()) {
let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
if !(to_block.is_pending() || to_block.is_number()) {
// always empty range
return Ok(Vec::new());
}
// Try to get pending block and receipts
if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
if let BlockNumberOrTag::Number(to_block) = to_block &&
to_block < pending_block.block.number()
{
// this block range is empty based on the user input
return Ok(Vec::new());
}
// fetch the current chain info to compare the pending block against the chain tip
let info = self.provider().chain_info()?;
if pending_block.block.number() > info.best_number {
// only consider the pending block if it is ahead of the chain
let mut all_logs = Vec::new();
let timestamp = pending_block.block.timestamp();
let block_num_hash = pending_block.block.num_hash();
append_matching_block_logs(
&mut all_logs,
ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
&filter,
block_num_hash,
&pending_block.receipts,
false, // removed = false for pending blocks
timestamp,
)?;
return Ok(all_logs);
}
}
}
let info = self.provider().chain_info()?;
let start_block = info.best_number;
let from = from_block
.map(|num| self.provider().convert_block_number(num))
@@ -912,7 +955,11 @@ where
/// Represents different modes for processing block ranges when filtering logs
enum RangeMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
/// Use cache-based processing for recent blocks
Cached(CachedMode<Eth>),
@@ -921,7 +968,11 @@ enum RangeMode<
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> RangeMode<Eth>
{
/// Creates a new `RangeMode`.
@@ -993,14 +1044,22 @@ impl<
/// Mode for processing blocks using cache optimization for recent blocks
struct CachedMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> CachedMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
@@ -1027,7 +1086,11 @@ type ReceiptFetchFuture<P> =
/// Mode for processing blocks using range queries for older blocks
struct RangeBlockMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
@@ -1038,7 +1101,11 @@ struct RangeBlockMode<
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> RangeBlockMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {

View File

@@ -367,7 +367,7 @@ where
) -> Result<Vec<LocalizedTransactionTrace>, Eth::Error> {
// We'll reuse the matcher across multiple blocks that are traced in parallel
let matcher = Arc::new(filter.matcher());
let TraceFilter { from_block, to_block, after, count, .. } = filter;
let TraceFilter { from_block, to_block, mut after, count, .. } = filter;
let start = from_block.unwrap_or(0);
let latest_block = self.provider().best_block_number().map_err(Eth::Error::from_eth_err)?;
@@ -393,81 +393,98 @@ where
.into())
}
// fetch all blocks in that range
let blocks = self
.provider()
.recovered_block_range(start..=end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let mut all_traces = Vec::new();
let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests);
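// process the range in chunks of at most `max_tracing_requests` blocks so only a bounded
// number of blocks is fetched and traced concurrently at a time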
for chunk_start in (start..=end).step_by(self.inner.eth_config.max_tracing_requests) {
// last block of this chunk, inclusive and capped at `end` so chunks don't overlap
let chunk_end = std::cmp::min(
chunk_start + self.inner.eth_config.max_tracing_requests as u64 - 1,
end,
);
// trace all blocks
let mut block_traces = Vec::with_capacity(blocks.len());
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
block_traces.push(traces);
}
// fetch all blocks in that chunk
let blocks = self
.eth_api()
.spawn_blocking_io(move |this| {
Ok(this
.provider()
.recovered_block_range(chunk_start..=chunk_end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>())
})
.await?;
let block_traces = futures::future::try_join_all(block_traces).await?;
let mut all_traces = block_traces
.into_iter()
.flatten()
.flat_map(|traces| traces.into_iter().flatten().flat_map(|traces| traces.into_iter()))
.collect::<Vec<_>>();
// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
// trace all blocks
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
} else {
// no block reward means we're past the Paris hardfork and don't expect any rewards
// because the blocks are in ascending order
break
block_traces.push(traces);
}
#[allow(clippy::iter_with_drain)]
let block_traces = futures::future::try_join_all(block_traces.drain(..)).await?;
all_traces.extend(block_traces.into_iter().flatten().flat_map(|traces| {
traces.into_iter().flatten().flat_map(|traces| traces.into_iter())
}));
// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
);
} else {
// no block reward means we're past the Paris hardfork and don't expect any
// rewards because the blocks are in ascending order
break
}
}
// Skips the first `after` number of matching traces.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff < all_traces.len()
{
all_traces.drain(..cutoff);
// we removed the first `after` traces
after = None;
}
// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
return Ok(all_traces)
}
};
}
// Skips the first `after` number of matching traces.
// If `after` is greater than or equal to the number of matched traces, it returns an empty
// array.
if let Some(after) = after.map(|a| a as usize) {
if after < all_traces.len() {
all_traces.drain(..after);
} else {
return Ok(vec![])
}
// If `after` is greater than or equal to the number of matched traces, it returns an
// empty array.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff >= all_traces.len()
{
return Ok(vec![])
}
// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
}
};
Ok(all_traces)
}
@@ -696,6 +713,7 @@ where
/// # Limitations
/// This currently requires block filter fields, since reth does not have address indices yet.
async fn trace_filter(&self, filter: TraceFilter) -> RpcResult<Vec<LocalizedTransactionTrace>> {
let _permit = self.inner.blocking_task_guard.clone().acquire_many_owned(2).await;
Ok(Self::trace_filter(self, filter).await.map_err(Into::into)?)
}

View File

@@ -396,15 +396,12 @@ where
match self.tx_fee_cap {
Some(0) | None => {} // Skip if cap is 0 or None
Some(tx_fee_cap_wei) => {
// max possible tx fee is (gas_price * gas_limit)
// (if EIP1559) max possible tx fee is (max_fee_per_gas * gas_limit)
let gas_price = transaction.max_fee_per_gas();
let max_tx_fee_wei = gas_price.saturating_mul(transaction_gas_limit as u128);
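// max possible tx fee is the full transaction cost minus the transferred value
// (for EIP-4844 transactions this also covers the max blob fee)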
let max_tx_fee_wei = transaction.cost().saturating_sub(transaction.value());
if max_tx_fee_wei > tx_fee_cap_wei {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidPoolTransactionError::ExceedsFeeCap {
max_tx_fee_wei,
max_tx_fee_wei: max_tx_fee_wei.saturating_to(),
tx_fee_cap_wei,
},
))

View File

@@ -71,16 +71,18 @@ pub struct TriePrefixSets {
/// This data structure stores a set of `Nibbles` and provides methods to insert
/// new elements and check whether any existing element has a given prefix.
///
/// Internally, this implementation uses a `Vec` and aims to act like a `BTreeSet` in being both
/// sorted and deduplicated. It does this by keeping a `sorted` flag. The `sorted` flag represents
/// whether or not the `Vec` is definitely sorted. When a new element is added, it is set to
/// `false.`. The `Vec` is sorted and deduplicated when `sorted` is `true` and:
/// * An element is being checked for inclusion (`contains`), or
/// * The set is being converted into an immutable `PrefixSet` (`freeze`)
/// Internally, this implementation stores keys in an unsorted `Vec<Nibbles>` together with an
/// `all` flag. The `all` flag indicates that every entry should be considered changed and that
/// individual keys can be ignored.
///
/// This means that a `PrefixSet` will always be sorted and deduplicated when constructed from a
/// `PrefixSetMut`.
/// Sorting and deduplication do not happen during insertion or membership checks on this mutable
/// structure. Instead, keys are sorted and deduplicated when converting into the immutable
/// `PrefixSet` via `freeze()`. The immutable `PrefixSet` provides `contains` and relies on the
/// sorted and unique keys produced by `freeze()`; it does not perform additional sorting or
/// deduplication.
///
/// This guarantees that a `PrefixSet` constructed from a `PrefixSetMut` is always sorted and
/// deduplicated.
/// # Examples
///
/// ```
@@ -165,8 +167,7 @@ impl PrefixSetMut {
} else {
self.keys.sort_unstable();
self.keys.dedup();
// We need to shrink in both the sorted and non-sorted cases because deduping may have
// occurred either on `freeze`, or during `contains`.
// Shrink after deduplication to release unused capacity.
self.keys.shrink_to_fit();
PrefixSet { index: 0, all: false, keys: Arc::new(self.keys) }
}

View File

@@ -329,7 +329,7 @@ mod tests {
let rt = Runtime::new().unwrap();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(factory, Default::default());
let task_ctx = ProofTaskCtx::new(factory);
let proof_worker_handle = ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1);
let parallel_result =

View File

@@ -42,12 +42,12 @@ use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
hashed_cursor::HashedCursorFactory,
node_iter::{TrieElement, TrieNodeIter},
prefix_set::{TriePrefixSets, TriePrefixSetsMut},
prefix_set::TriePrefixSets,
proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
trie_cursor::TrieCursorFactory,
walker::TrieWalker,
@@ -161,7 +161,14 @@ impl ProofWorkerHandle {
#[cfg(feature = "metrics")]
metrics,
);
worker.run()
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
drop(parent_span);
@@ -191,7 +198,14 @@ impl ProofWorkerHandle {
#[cfg(feature = "metrics")]
metrics,
);
worker.run()
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
drop(parent_span);
@@ -358,16 +372,12 @@ impl ProofWorkerHandle {
pub struct ProofTaskCtx<Factory> {
/// The factory for creating state providers.
factory: Factory,
/// The collection of prefix sets for the computation. Since the prefix sets _always_
/// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
/// if we have cached nodes for them.
prefix_sets: Arc<TriePrefixSetsMut>,
}
impl<Factory> ProofTaskCtx<Factory> {
/// Creates a new [`ProofTaskCtx`] with the given factory and prefix sets.
pub const fn new(factory: Factory, prefix_sets: Arc<TriePrefixSetsMut>) -> Self {
Self { factory, prefix_sets }
/// Creates a new [`ProofTaskCtx`] with the given factory.
pub const fn new(factory: Factory) -> Self {
Self { factory }
}
}
@@ -377,17 +387,14 @@ pub struct ProofTaskTx<Provider> {
/// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
provider: Provider,
/// The prefix sets for the computation.
prefix_sets: Arc<TriePrefixSetsMut>,
/// Identifier for the worker within the worker pool, used only for tracing.
id: usize,
}
impl<Provider> ProofTaskTx<Provider> {
/// Initializes a [`ProofTaskTx`] with the given provider, prefix sets, and ID.
const fn new(provider: Provider, prefix_sets: Arc<TriePrefixSetsMut>, id: usize) -> Self {
Self { provider, prefix_sets, id }
/// Initializes a [`ProofTaskTx`] with the given provider and ID.
const fn new(provider: Provider, id: usize) -> Self {
Self { provider, id }
}
}
@@ -462,12 +469,8 @@ where
account: B256,
path: &Nibbles,
) -> TrieNodeProviderResult {
let storage_node_provider = ProofBlindedStorageProvider::new(
&self.provider,
&self.provider,
self.prefix_sets.clone(),
account,
);
let storage_node_provider =
ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
storage_node_provider.trie_node(path)
}
@@ -475,11 +478,8 @@ where
///
/// Used by account workers to retrieve blinded account trie nodes for proof construction.
fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
let account_node_provider = ProofBlindedAccountProvider::new(
&self.provider,
&self.provider,
self.prefix_sets.clone(),
);
let account_node_provider =
ProofBlindedAccountProvider::new(&self.provider, &self.provider);
account_node_provider.trie_node(path)
}
}
@@ -691,7 +691,7 @@ where
///
/// If this function panics, the worker thread terminates but other workers
/// continue operating and the system degrades gracefully.
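///
/// Returns an error if the read-only database provider cannot be created from the factory.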
fn run(self) {
fn run(self) -> ProviderResult<()> {
let Self {
task_ctx,
work_rx,
@@ -702,11 +702,8 @@ where
} = self;
// Create provider from factory
let provider = task_ctx
.factory
.database_provider_ro()
.expect("Storage worker failed to initialize: unable to create provider");
let proof_tx = ProofTaskTx::new(provider, task_ctx.prefix_sets, worker_id);
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
trace!(
target: "trie::proof_task",
@@ -761,6 +758,8 @@ where
#[cfg(feature = "metrics")]
metrics.record_storage_nodes(storage_nodes_processed as usize);
Ok(())
}
/// Processes a storage proof request.
@@ -934,7 +933,7 @@ where
///
/// If this function panics, the worker thread terminates but other workers
/// continue operating and the system degrades gracefully.
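///
/// Returns an error if the read-only database provider cannot be created from the factory.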
fn run(self) {
fn run(self) -> ProviderResult<()> {
let Self {
task_ctx,
work_rx,
@@ -946,11 +945,8 @@ where
} = self;
// Create provider from factory
let provider = task_ctx
.factory
.database_provider_ro()
.expect("Account worker failed to initialize: unable to create provider");
let proof_tx = ProofTaskTx::new(provider, task_ctx.prefix_sets, worker_id);
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
trace!(
target: "trie::proof_task",
@@ -1004,6 +1000,8 @@ where
#[cfg(feature = "metrics")]
metrics.record_account_nodes(account_nodes_processed as usize);
Ok(())
}
/// Processes an account multiproof request.
@@ -1476,12 +1474,10 @@ enum AccountWorkerJob {
mod tests {
use super::*;
use reth_provider::test_utils::create_test_provider_factory;
use reth_trie_common::prefix_set::TriePrefixSetsMut;
use std::sync::Arc;
use tokio::{runtime::Builder, task};
fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
ProofTaskCtx::new(factory, Arc::new(TriePrefixSetsMut::default()))
ProofTaskCtx::new(factory)
}
/// Ensures `ProofWorkerHandle::new` spawns workers correctly.

View File

@@ -2,11 +2,11 @@ use super::{Proof, StorageProof};
use crate::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use alloy_primitives::{map::HashSet, B256};
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_trie_common::{prefix_set::TriePrefixSetsMut, MultiProofTargets, Nibbles};
use reth_trie_common::{MultiProofTargets, Nibbles};
use reth_trie_sparse::provider::{
pad_path_to_key, RevealedNode, TrieNodeProvider, TrieNodeProviderFactory,
};
use std::{sync::Arc, time::Instant};
use std::time::Instant;
use tracing::{enabled, trace, Level};
/// Factory for instantiating providers capable of retrieving blinded trie nodes via proofs.
@@ -16,18 +16,12 @@ pub struct ProofTrieNodeProviderFactory<T, H> {
trie_cursor_factory: T,
/// The factory for hashed cursors.
hashed_cursor_factory: H,
/// A set of prefix sets that have changes.
prefix_sets: Arc<TriePrefixSetsMut>,
}
impl<T, H> ProofTrieNodeProviderFactory<T, H> {
/// Create new proof-based blinded provider factory.
pub const fn new(
trie_cursor_factory: T,
hashed_cursor_factory: H,
prefix_sets: Arc<TriePrefixSetsMut>,
) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory, prefix_sets }
pub const fn new(trie_cursor_factory: T, hashed_cursor_factory: H) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory }
}
}
@@ -43,7 +37,6 @@ where
ProofBlindedAccountProvider {
trie_cursor_factory: self.trie_cursor_factory.clone(),
hashed_cursor_factory: self.hashed_cursor_factory.clone(),
prefix_sets: self.prefix_sets.clone(),
}
}
@@ -51,7 +44,6 @@ where
ProofBlindedStorageProvider {
trie_cursor_factory: self.trie_cursor_factory.clone(),
hashed_cursor_factory: self.hashed_cursor_factory.clone(),
prefix_sets: self.prefix_sets.clone(),
account,
}
}
@@ -64,18 +56,12 @@ pub struct ProofBlindedAccountProvider<T, H> {
trie_cursor_factory: T,
/// The factory for hashed cursors.
hashed_cursor_factory: H,
/// A set of prefix sets that have changes.
prefix_sets: Arc<TriePrefixSetsMut>,
}
impl<T, H> ProofBlindedAccountProvider<T, H> {
/// Create new proof-based blinded account node provider.
pub const fn new(
trie_cursor_factory: T,
hashed_cursor_factory: H,
prefix_sets: Arc<TriePrefixSetsMut>,
) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory, prefix_sets }
pub const fn new(trie_cursor_factory: T, hashed_cursor_factory: H) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory }
}
}
@@ -89,7 +75,6 @@ where
let targets = MultiProofTargets::from_iter([(pad_path_to_key(path), HashSet::default())]);
let mut proof = Proof::new(&self.trie_cursor_factory, &self.hashed_cursor_factory)
.with_prefix_sets_mut(self.prefix_sets.as_ref().clone())
.with_branch_node_masks(true)
.multiproof(targets)
.map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
@@ -117,21 +102,14 @@ pub struct ProofBlindedStorageProvider<T, H> {
trie_cursor_factory: T,
/// The factory for hashed cursors.
hashed_cursor_factory: H,
/// A set of prefix sets that have changes.
prefix_sets: Arc<TriePrefixSetsMut>,
/// Target account.
account: B256,
}
impl<T, H> ProofBlindedStorageProvider<T, H> {
/// Create new proof-based blinded storage node provider.
pub const fn new(
trie_cursor_factory: T,
hashed_cursor_factory: H,
prefix_sets: Arc<TriePrefixSetsMut>,
account: B256,
) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory, prefix_sets, account }
pub const fn new(trie_cursor_factory: T, hashed_cursor_factory: H, account: B256) -> Self {
Self { trie_cursor_factory, hashed_cursor_factory, account }
}
}
@@ -144,14 +122,11 @@ where
let start = enabled!(target: "trie::proof::blinded", Level::TRACE).then(Instant::now);
let targets = HashSet::from_iter([pad_path_to_key(path)]);
let storage_prefix_set =
self.prefix_sets.storage_prefix_sets.get(&self.account).cloned().unwrap_or_default();
let mut proof = StorageProof::new_hashed(
&self.trie_cursor_factory,
&self.hashed_cursor_factory,
self.account,
)
.with_prefix_set_mut(storage_prefix_set)
.with_branch_node_masks(true)
.storage_multiproof(targets)
.map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;

View File

@@ -24,7 +24,7 @@ use reth_trie_sparse::{
provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory},
SerialSparseTrie, SparseStateTrie,
};
use std::sync::{mpsc, Arc};
use std::sync::mpsc;
/// State transition witness for the trie.
#[derive(Debug)]
@@ -147,11 +147,7 @@ where
let (tx, rx) = mpsc::channel();
let blinded_provider_factory = WitnessTrieNodeProviderFactory::new(
ProofTrieNodeProviderFactory::new(
self.trie_cursor_factory,
self.hashed_cursor_factory,
Arc::new(self.prefix_sets),
),
ProofTrieNodeProviderFactory::new(self.trie_cursor_factory, self.hashed_cursor_factory),
tx,
);
let mut sparse_trie = SparseStateTrie::<SerialSparseTrie>::new();

View File

@@ -0,0 +1,16 @@
[package]
name = "custom-hardforks"
license.workspace = true
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
# Core Reth dependencies for chain specs and hardforks
reth-chainspec.workspace = true
reth-network-peers.workspace = true
alloy-genesis.workspace = true
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
serde = { version = "1.0", features = ["derive"] }

View File

@@ -0,0 +1,149 @@
//! Custom chain specification integrating hardforks.
//!
//! This demonstrates how to build a `ChainSpec` with custom hardforks,
//! implementing required traits for integration with Reth's chain management.
use alloy_eips::eip7840::BlobParams;
use alloy_genesis::Genesis;
use alloy_primitives::{B256, U256};
use reth_chainspec::{
hardfork, BaseFeeParams, Chain, ChainSpec, DepositContract, EthChainSpec, EthereumHardfork,
EthereumHardforks, ForkCondition, Hardfork, Hardforks,
};
use reth_network_peers::NodeRecord;
use serde::{Deserialize, Serialize};
// Define custom hardfork variants using Reth's `hardfork!` macro.
// Each variant represents a protocol upgrade (e.g., enabling new features).
hardfork!(
/// Custom hardforks for the example chain.
///
/// These are inspired by Ethereum's upgrades but customized for demonstration.
/// Add new variants here to extend the chain's hardfork set.
CustomHardfork {
/// Enables basic custom features (e.g., a new precompile).
BasicUpgrade,
/// Enables advanced features (e.g., state modifications).
AdvancedUpgrade,
}
);
// Note: the `hardfork!` macro already implements the `Hardfork` trait (including the fork
// name) for each variant, so no manual implementation is needed.
/// Configuration for hardfork activation.
///
/// This struct holds the activation timestamps and is serializable for config files.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct CustomHardforkConfig {
/// Timestamp at which `BasicUpgrade` activates (applied as a timestamp-based fork condition).
pub basic_upgrade_block: u64,
/// Timestamp at which `AdvancedUpgrade` activates (applied as a timestamp-based fork condition).
pub advanced_upgrade_block: u64,
}
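//
// With `rename_all = "camelCase"`, this config is read from the extra fields of the genesis
// `config` object, e.g. (illustrative activation timestamps):
//
// "config": {
//     ...,
//     "basicUpgradeBlock": 0,
//     "advancedUpgradeBlock": 1700000000
// }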
/// Custom chain spec wrapping Reth's [`ChainSpec`] with the custom hardforks installed.
#[derive(Debug, Clone)]
pub struct CustomChainSpec {
pub inner: ChainSpec,
}
impl CustomChainSpec {
/// Creates a custom chain spec from a genesis file.
///
/// This builds the base [`ChainSpec`] from the genesis and installs the custom hardforks.
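///
/// # Example
///
/// A minimal sketch, assuming a `genesis.json` whose `config` object carries
/// `basicUpgradeBlock` and `advancedUpgradeBlock` entries:
///
/// ```ignore
/// use reth_chainspec::Hardforks;
///
/// let genesis: Genesis = serde_json::from_str(include_str!("genesis.json")).unwrap();
/// let spec = CustomChainSpec::from_genesis(genesis);
/// assert!(spec.fork(CustomHardfork::BasicUpgrade).active_at_timestamp(u64::MAX));
/// ```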
pub fn from_genesis(genesis: Genesis) -> Self {
let extra = genesis
.config
.extra_fields
.deserialize_as::<CustomHardforkConfig>()
.expect("genesis extra fields must contain the custom hardfork config");
let mut inner = ChainSpec::from_genesis(genesis);
inner.hardforks.insert(
CustomHardfork::BasicUpgrade,
ForkCondition::Timestamp(extra.basic_upgrade_block),
);
inner.hardforks.insert(
CustomHardfork::AdvancedUpgrade,
ForkCondition::Timestamp(extra.advanced_upgrade_block),
);
Self { inner }
}
}
// Implement `Hardforks` to integrate custom hardforks with Reth's system.
impl Hardforks for CustomChainSpec {
fn fork<H: Hardfork>(&self, fork: H) -> ForkCondition {
self.inner.fork(fork)
}
fn forks_iter(&self) -> impl Iterator<Item = (&dyn Hardfork, ForkCondition)> {
self.inner.forks_iter()
}
fn fork_id(&self, head: &reth_chainspec::Head) -> reth_chainspec::ForkId {
self.inner.fork_id(head)
}
fn latest_fork_id(&self) -> reth_chainspec::ForkId {
self.inner.latest_fork_id()
}
fn fork_filter(&self, head: reth_chainspec::Head) -> reth_chainspec::ForkFilter {
self.inner.fork_filter(head)
}
}
// Implement `EthChainSpec` for compatibility with Ethereum-based nodes.
impl EthChainSpec for CustomChainSpec {
type Header = alloy_consensus::Header;
fn chain(&self) -> Chain {
self.inner.chain()
}
fn base_fee_params_at_timestamp(&self, timestamp: u64) -> BaseFeeParams {
self.inner.base_fee_params_at_timestamp(timestamp)
}
fn blob_params_at_timestamp(&self, timestamp: u64) -> Option<BlobParams> {
self.inner.blob_params_at_timestamp(timestamp)
}
fn deposit_contract(&self) -> Option<&DepositContract> {
self.inner.deposit_contract()
}
fn genesis_hash(&self) -> B256 {
self.inner.genesis_hash()
}
fn prune_delete_limit(&self) -> usize {
self.inner.prune_delete_limit()
}
fn display_hardforks(&self) -> Box<dyn core::fmt::Display> {
Box::new(self.inner.display_hardforks())
}
fn genesis_header(&self) -> &Self::Header {
self.inner.genesis_header()
}
fn genesis(&self) -> &Genesis {
self.inner.genesis()
}
fn bootnodes(&self) -> Option<Vec<NodeRecord>> {
self.inner.bootnodes()
}
fn final_paris_total_difficulty(&self) -> Option<U256> {
self.inner.final_paris_total_difficulty()
}
}
// Implement `EthereumHardforks` to support Ethereum hardfork queries.
impl EthereumHardforks for CustomChainSpec {
fn ethereum_fork_activation(&self, fork: EthereumHardfork) -> ForkCondition {
self.inner.ethereum_fork_activation(fork)
}
}

View File

@@ -0,0 +1,5 @@
//! Example that showcases how to inject custom hardforks.
pub mod chainspec;
fn main() {}