Compare commits

...

36 Commits

Author SHA1 Message Date
Derek Cofausper
e191721b43 fix: resolve doc link to BlockExecutor
Amp-Thread-ID: https://ampcode.com/threads/T-019c9f18-597c-75fb-831c-db1da98d2a3f
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 13:34:49 +00:00
Derek Cofausper
e4d886f7f5 refactor: use disable/enable_inspector instead of two-phase executor
Create the BlockExecutor with TracingInspector from the start, disable
it during pre-execution changes and prior tx replay, then enable it
only for the target transaction. This removes the need to drop the
executor and create a separate EVM for tracing.

Amp-Thread-ID: https://ampcode.com/threads/T-019c9f18-597c-75fb-831c-db1da98d2a3f
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 12:55:04 +00:00
Derek Cofausper
40f78bf3e4 refactor: address review - use BlockExecutor abstraction and remove find_and_trace_failing_tx
- Remove find_and_trace_failing_tx: if execute_one errors it's an
  executor-level issue, not a specific tx. Only trace when we know
  the tx index (receipt/gas mismatch).
- Refactor trace_failed_transaction to use executor_for_block +
  apply_pre_execution_changes + execute_transaction instead of
  manually calling SystemCaller and setting state_clear_flag.
  This makes the tracing logic chain-agnostic.
- Remove Spec generic parameter and EthereumHardforks bound from
  trace_failed_transaction.

Amp-Thread-ID: https://ampcode.com/threads/T-019c9f18-597c-75fb-831c-db1da98d2a3f
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 12:43:58 +00:00
Derek Cofausper
5932774eb0 chore: rustfmt
Amp-Thread-ID: https://ampcode.com/threads/T-019c9ec0-4e76-752e-aec2-1b0cd8cb0919
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 12:16:58 +00:00
Derek Cofausper
6c1f4b88f9 fix: set EIP-161 state clear flag before tracing replay
The real executor sets set_state_clear_flag based on Spurious Dragon
activation before executing any transactions. Without this, empty
accounts may not be correctly cleared from state during replay,
causing the traced transaction to see different state than during
actual block execution.

Amp-Thread-ID: https://ampcode.com/threads/T-019c9ec0-4e76-752e-aec2-1b0cd8cb0919
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 12:06:19 +00:00
Derek Cofausper
efaf89f0f7 fix: apply pre-execution system calls before tracing
Apply EIP-4788 (beacon root) and EIP-2935 (blockhashes) system calls
via SystemCaller before replaying transactions. Without this, the
traced transaction would see stale state.

Amp-Thread-ID: https://ampcode.com/threads/T-019c9ec0-4e76-752e-aec2-1b0cd8cb0919
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 11:56:01 +00:00
Derek Cofausper
ec8dd2d618 feat(cli): add opcode-level tracing for failed txs in re-execute command
When re-execute encounters a failing transaction (execute_one error or
receipt/gas mismatch), automatically re-execute the failing tx with a
TracingInspector to produce geth-style structLogs with opcode-level
detail (pc, op, gas, stack, memory, storage) and log them as JSON.

Two helpers:
- find_and_trace_failing_tx: locates failing tx index by replaying
  txs individually, then traces it
- trace_failed_transaction: builds parent state, replays prior txs,
  executes failing tx with inspector, logs DefaultFrame JSON

Zero cost on the happy path — tracing only runs on failure.

Amp-Thread-ID: https://ampcode.com/threads/T-019c9ec0-4e76-752e-aec2-1b0cd8cb0919
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 11:34:43 +00:00
DaniPopes
0df9791bea chore: bump alloy-evm to 0.28.0 (#22636)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-27 10:22:58 +00:00
Delweng
09adb83922 fix(engine/tree): continue sync-target progression for already-seen downloaded blocks (#22628)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 08:12:06 +00:00
Delweng
c12b6d4c90 fix(rpc): return -38003 for FCU beacon-root payloadAttributes mismatches (#22634)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 07:54:20 +00:00
Derek Cofausper
7a78044587 chore(libmdbx): fix MDB_ -> MDBX_ typos (#22630)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 06:06:07 +00:00
figtracer
f88538e033 refactor(net): add peers() accessors on Swarm to flatten accessor chains (#22616)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 05:35:14 +00:00
DaniPopes
63dff64b8a chore: simplify tx iterator (#22365) 2026-02-27 05:09:13 +00:00
DaniPopes
233590cefd chore: use better hasher for precompile cache (#22360) 2026-02-27 05:09:12 +00:00
Derek Cofausper
40962ef6fc chore(hive): remove engine-withdrawals from ignored tests (#22625)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-27 03:57:43 +00:00
github-actions[bot]
2f121b099b chore(deps): weekly cargo update (#22624)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 03:36:42 +00:00
Delweng
0470050c05 fix(engine): continue downloading head block after making non-head sync target canonical (#22613)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 03:15:52 +00:00
MagicJoshh
cbc416b82a fix(rpc-provider): state_root delegates to stub that always returns zero (#22610) 2026-02-27 02:53:57 +00:00
MagicJoshh
3fddefbd38 fix(rpc): prevent u64 underflow when re-executing genesis block (#22532) 2026-02-27 02:48:59 +00:00
Julian Meyer
f97a6530c1 chore: make cached overlay fetch public (#22619) 2026-02-27 02:47:50 +00:00
Derek Cofausper
80e3e1c79d docs: add storage v2 guide (#22620)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-26 20:22:52 +00:00
Arsenii Kulikov
ee37c25a4b perf: use more multiproof workers (#22615) 2026-02-26 19:59:06 +00:00
Derek Cofausper
c01f9688e2 feat: add transaction iterator helpers to Chain (#22618)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 19:39:34 +00:00
bigbear
815a75833e refactor(exex): remove redundant update_capacity call (#22603) 2026-02-26 13:09:41 +00:00
cui
59c4e24296 fix(downloaders): reset metrics on clear (#21858) 2026-02-26 12:38:55 +00:00
Derek Cofausper
d5b5caa439 docs: add PR title and description guidelines to CLAUDE.md (#22602)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 12:20:54 +00:00
Julio
47f1999654 fix(net): abort discv4 and DNS discovery tasks on Discovery drop (#22590)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-26 10:37:57 +00:00
MergeBot
3ac5637bd1 chore(ci): fix collapsible_match clippy lint in chainspec (#22594)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 10:04:19 +00:00
Derek Cofausper
4cec99ed13 chore(bench): include core count in Slack notification when non-default (#22584)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 21:58:39 +00:00
Arsenii Kulikov
2f73835483 feat(reth-bench): support benchmarking via rlp blocks (#22581) 2026-02-25 20:28:47 +00:00
stevencartavia
ed20a40649 refactor(rpc): fetch block before tracing to avoid double lookups (#22503) 2026-02-25 20:17:45 +00:00
MergeBot
080a9cfc10 fix(rpc): add missing apply_pre_execution_changes in spawn_replay_transaction (#22575) 2026-02-25 20:04:02 +00:00
MergeBot
c4cd5c9b7b fix(rpc): add missing apply_pre_execution_changes in debug_traceCallMany (#22577) 2026-02-25 20:00:12 +00:00
Dan Cline
ce2a194fb7 feat(cli): add db stage-checkpoints command (#22579)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 19:58:59 +00:00
Vitalyr
6dcab51c97 fix(rpc): respect pending-block=none for provider blocks (#22556) 2026-02-25 19:45:42 +00:00
Derek Cofausper
4db23809cc fix(storage): return early in RocksDB healing when checkpoint is 0 (#22576)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 19:29:45 +00:00
144 changed files with 2168 additions and 1024 deletions

View File

@@ -118,9 +118,12 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
const countsLine = warmup
? `*Warmup:* ${warmup} | *Blocks:* ${summary.blocks}`
: `*Blocks:* ${summary.blocks}`;
const cores = process.env.BENCH_CORES || '0';
const countsParts = [];
if (warmup) countsParts.push(`*Warmup:* ${warmup}`);
countsParts.push(`*Blocks:* ${summary.blocks}`);
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
const countsLine = countsParts.join(' | ');
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');

View File

@@ -11,17 +11,6 @@
#
# When a test should no longer be ignored, remove it from this list.
# flaky
engine-withdrawals:
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
# P2P sync timing issue in hive Docker environment: secondary client returns SYNCING but
# peer discovery/connection doesn't complete within the timeout when running with
# --sim.parallelism 16. Not a correctness bug, purely a CI timing issue.
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
engine-cancun:
- Transaction Re-Org, New Payload on Revert Back (Cancun) (reth)
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)

View File

@@ -191,6 +191,78 @@ The `book` CI job (`.github/workflows/lint.yml`) enforces this by regenerating t
### Opening PRs against <https://github.com/paradigmxyz/reth>
#### Titles
Use [Conventional Commits](https://www.conventionalcommits.org/) with an optional scope:
```
<type>(<scope>): <short description>
```
**Types**: `feat`, `fix`, `perf`, `refactor`, `docs`, `test`, `chore`
**Scope** (optional): crate or area, e.g. `evm`, `trie`, `rpc`, `engine`, `net`
Examples:
- `fix(rpc): correct gas estimation for ERC-20 transfers`
- `perf: batch trie updates to reduce cursor overhead`
- `feat(engine): add new_payload_interval metric`
#### Descriptions
Keep it short. Say what changed and why — nothing more.
**Do:**
- Write 13 sentences summarizing the change
- Explain _why_ if the diff doesn't make it obvious
- Link related issues or EIPs
- Include benchmark numbers for perf changes
**Don't:**
- List every file changed — that's what the diff is for
- Repeat the title in the body
- Add "Files changed" or "Changes" sections
- Write walls of text that go stale when the diff is updated
- Use filler like "This PR introduces...", "comprehensive", "robust", "enhance", "leverage"
**Template:**
```
Closes #<issue>
<what changed, 1-3 sentences>
<why, if not obvious from the diff>
```
**Good example:**
```
Closes #16800
Adds fallback for external IP resolution so node startup doesn't fail
when STUN is unreachable. Falls back to the configured default.
```
**Bad example:**
```
## Summary
This PR introduces comprehensive improvements to the IP resolution system.
## Changes
- Modified `crates/net/discv4/src/lib.rs` to add fallback
- Modified `crates/net/discv4/src/config.rs` to add default IP
- Added tests in `crates/net/discv4/src/tests/ip.rs`
## Files Changed
- crates/net/discv4/src/lib.rs
- crates/net/discv4/src/config.rs
- crates/net/discv4/src/tests/ip.rs
```
#### Labels and CI
Label PRs appropriately, first check the available labels and then apply the relevant ones:
* when changes are RPC related, add A-rpc label
* when changes are docs related, add C-docs label

466
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -454,7 +454,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-evm = { version = "0.28.0", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }

View File

@@ -31,6 +31,8 @@ pub(crate) struct BenchContext {
pub(crate) is_optimism: bool,
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
pub(crate) use_reth_namespace: bool,
/// Whether to fetch and replay RLP-encoded blocks.
pub(crate) rlp_blocks: bool,
}
impl BenchContext {
@@ -142,7 +144,8 @@ impl BenchContext {
};
let next_block = first_block.header.number + 1;
let use_reth_namespace = bench_args.reth_new_payload;
let rlp_blocks = bench_args.rlp_blocks;
let use_reth_namespace = bench_args.reth_new_payload || rlp_blocks;
Ok(Self {
auth_provider,
block_provider,
@@ -150,6 +153,7 @@ impl BenchContext {
next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
})
}
}

View File

@@ -21,6 +21,7 @@ use reth_chainspec::ChainSpec;
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use reth_rpc_api::RethNewPayloadInput;
use std::{path::PathBuf, time::Instant};
use tracing::info;
@@ -184,34 +185,32 @@ impl Command {
Some(new_payload_version),
)?;
let (version, params) = if self.reth_new_payload {
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?)
} else {
(Some(version), params)
};
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file = GasRampPayloadFile {
version: version as u8,
version: version.map(|v| v as u8),
block_hash,
params: params.clone(),
execution_data: Some(execution_data.clone()),
};
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
let reth_data = self.reth_new_payload.then_some(execution_data);
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
let _ = call_new_payload_with_reth(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated_with_reth(
&provider,
version,
forkchoice_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&provider, version, forkchoice_state).await?;
parent_header = block.header;
parent_hash = block_hash;

View File

@@ -25,7 +25,7 @@ use crate::{
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
},
};
use alloy_provider::Provider;
use alloy_provider::{ext::DebugApi, Provider};
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
@@ -154,6 +154,7 @@ impl Command {
mut next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
@@ -186,6 +187,21 @@ impl Command {
}
};
let rlp = if rlp_blocks {
let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
Ok(rlp) => rlp,
Err(e) => {
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
let _ = error_sender
.send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
break;
}
};
Some(rlp)
} else {
None
};
let head_block_hash = block.header.hash;
let safe_block_hash = block_provider
.get_block_by_number(block.header.number.saturating_sub(32).into());
@@ -207,7 +223,7 @@ impl Command {
next_block += 1;
if let Err(e) = sender
.send((block, head_block_hash, safe_block_hash, finalized_block_hash))
.send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
.await
{
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
@@ -221,7 +237,7 @@ impl Command {
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
while let Some((block, head, safe, finalized)) = {
while let Some((block, head, safe, finalized, rlp)) = {
let wait_start = Instant::now();
let result = receiver.recv().await;
total_wait_time += wait_start.elapsed();
@@ -240,11 +256,11 @@ impl Command {
finalized_block_hash: finalized,
};
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) =
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload_with_reth(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
@@ -263,13 +279,7 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
version,
forkchoice_state,
use_reth_namespace,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = if server_timings.is_some() {

View File

@@ -11,7 +11,7 @@ use crate::{
},
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
};
use alloy_provider::Provider;
use alloy_provider::{ext::DebugApi, Provider};
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
@@ -51,6 +51,7 @@ impl Command {
mut next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
@@ -83,8 +84,21 @@ impl Command {
}
};
let rlp = if rlp_blocks {
let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
else {
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
let _ = error_sender
.send(eyre::eyre!("Failed to fetch raw block {next_block}"));
break;
};
Some(rlp)
} else {
None
};
next_block += 1;
if let Err(e) = sender.send(block).await {
if let Err(e) = sender.send((block, rlp)).await {
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
break;
}
@@ -96,7 +110,7 @@ impl Command {
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
while let Some(block) = {
while let Some((block, rlp)) = {
let wait_start = Instant::now();
let result = receiver.recv().await;
total_wait_time += wait_start.elapsed();
@@ -108,12 +122,12 @@ impl Command {
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) =
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload_with_reth(&auth_provider, version, params).await?;
let latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());

View File

@@ -22,14 +22,14 @@ pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GasRampPayloadFile {
/// Engine API version (1-5).
pub(crate) version: u8,
///
/// `None` indicates that `reth_newPayload` should be used.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) version: Option<u8>,
/// The block hash for FCU.
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
/// The execution data for `reth_newPayload`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas

View File

@@ -38,6 +38,7 @@ use eyre::Context;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
use reth_rpc_api::RethNewPayloadInput;
use std::{
path::PathBuf,
time::{Duration, Instant},
@@ -161,7 +162,9 @@ struct GasRampPayload {
/// Block number from filename.
block_number: u64,
/// Engine API version for newPayload.
version: EngineApiMessageVersion,
///
/// `None` indicates that `reth_newPayload` should be used.
version: Option<EngineApiMessageVersion>,
/// The file contents.
file: GasRampPayloadFile,
}
@@ -273,13 +276,10 @@ impl Command {
"Executing gas ramp payload (newPayload + FCU)"
);
let reth_data =
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
let _ = call_new_payload_with_reth(
&auth_provider,
payload.version,
payload.file.params.clone(),
reth_data,
)
.await?;
@@ -288,13 +288,7 @@ impl Command {
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated_with_reth(
&auth_provider,
payload.version,
fcu_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, payload.version, fcu_state).await?;
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
@@ -342,31 +336,34 @@ impl Command {
"Sending newPayload"
);
let params = serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?;
let (version, params) = if self.reth_new_payload {
let reth_data = ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields {
requests: envelope.execution_requests.clone().into(),
},
),
};
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(reth_data),))?)
} else {
(
Some(EngineApiMessageVersion::V4),
serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?,
)
};
let reth_data = self.reth_new_payload.then(|| ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
),
});
let server_timings = call_new_payload_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
params,
reth_data,
)
.await?;
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
@@ -391,13 +388,7 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
fcu_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency =
@@ -558,13 +549,18 @@ impl Command {
let file: GasRampPayloadFile = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let version = match file.version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
let version = if let Some(version) = file.version {
match version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
}
.into()
} else {
None
};
info!(

View File

@@ -3,7 +3,7 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
@@ -12,6 +12,7 @@ use alloy_rpc_types_engine::{
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use reth_rpc_api::RethNewPayloadInput;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error};
@@ -169,7 +170,15 @@ where
pub(crate) fn block_to_new_payload(
block: AnyRpcBlock,
is_optimism: bool,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
rlp: Option<Bytes>,
reth_new_payload: bool,
) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
if let Some(rlp) = rlp {
return Ok((
None,
serde_json::to_value((RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),))?,
));
}
let block = block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
@@ -181,7 +190,14 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
let (version, params, execution_data) =
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
if reth_new_payload {
Ok((None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?))
} else {
Ok((Some(version), params))
}
}
/// Converts an execution payload and sidecar into versioned engine API params and an
@@ -266,17 +282,15 @@ pub(crate) fn payload_to_new_payload(
#[allow(dead_code)]
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
version: Option<EngineApiMessageVersion>,
params: serde_json::Value,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params, None).await
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params).await
}
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
#[serde(flatten)]
status: PayloadStatus,
latency_us: u64,
#[serde(default)]
persistence_wait_us: Option<u64>,
@@ -300,72 +314,50 @@ pub(crate) struct NewPayloadTimingBreakdown {
}
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
/// `reth_execution_data` is provided.
/// `version` is provided.
///
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
///
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
/// the standard engine namespace.
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
version: Option<EngineApiMessageVersion>,
params: serde_json::Value,
reth_execution_data: Option<ExecutionData>,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
if let Some(execution_data) = reth_execution_data {
let method = "reth_newPayload";
let reth_params = serde_json::to_value((execution_data.clone(),))
.expect("ExecutionData serialization cannot fail");
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
debug!(target: "reth-bench", method, "Sending newPayload");
debug!(target: "reth-bench", method, "Sending newPayload");
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
let resp = loop {
let resp: serde_json::Value = provider.client().request(method, &params).await?;
let status = PayloadStatus::deserialize(&resp)?;
while !resp.status.is_valid() {
if resp.status.is_invalid() {
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
)))
}
if resp.status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
if status.is_valid() {
break resp;
}
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
} else {
let method = version.method_name();
debug!(target: "reth-bench", method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {status:?}")),
)))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
status = provider.client().request(method, &params).await?;
if status.is_invalid() {
return Err(eyre::eyre!("Invalid {method}: {status:?}"));
}
Ok(None)
if status.is_syncing() {
return Err(eyre::eyre!(
"invalid range: no canonical state found for parent of requested block"
));
}
};
if version.is_some() {
return Ok(None);
}
let resp: RethPayloadStatus = serde_json::from_value(resp)?;
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
@@ -403,20 +395,25 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
P: Provider<N> + EngineApiValidWaitExt<N>,
>(
provider: P,
message_version: EngineApiMessageVersion,
message_version: Option<EngineApiMessageVersion>,
forkchoice_state: ForkchoiceState,
use_reth: bool,
) -> TransportResult<ForkchoiceUpdated> {
if use_reth {
if let Some(message_version) = message_version {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
} else {
let method = "reth_forkchoiceUpdated";
let reth_params = serde_json::to_value((forkchoice_state,))
.expect("ForkchoiceState serialization cannot fail");
debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
let mut resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
loop {
let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
if resp.is_valid() {
break Ok(resp)
}
while !resp.is_valid() {
if resp.is_invalid() {
error!(target: "reth-bench", ?resp, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
@@ -428,11 +425,6 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
}
Ok(resp)
} else {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
}
}

View File

@@ -855,15 +855,9 @@ impl From<Genesis> for ChainSpec {
// those networks we use the activation
// blocks of those networks
match genesis.config.chain_id {
1 => {
if ttd == MAINNET_PARIS_TTD {
return Some(MAINNET_PARIS_BLOCK)
}
}
11155111 => {
if ttd == SEPOLIA_PARIS_TTD {
return Some(SEPOLIA_PARIS_BLOCK)
}
1 if ttd == MAINNET_PARIS_TTD => return Some(MAINNET_PARIS_BLOCK),
11155111 if ttd == SEPOLIA_PARIS_TTD => {
return Some(SEPOLIA_PARIS_BLOCK)
}
_ => {}
};

View File

@@ -45,6 +45,7 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
revm-inspectors.workspace = true
reth-stages.workspace = true
reth-stages-types = { workspace = true, optional = true }
reth-static-file-types = { workspace = true, features = ["clap"] }

View File

@@ -19,6 +19,7 @@ mod list;
mod prune_checkpoints;
mod repair_trie;
mod settings;
mod stage_checkpoints;
mod state;
mod static_file_header;
mod stats;
@@ -70,6 +71,8 @@ pub enum Subcommands {
Settings(settings::Command),
/// View or set prune checkpoints
PruneCheckpoints(prune_checkpoints::Command),
// View or set stage checkpoints
StageCheckpoints(stage_checkpoints::Command),
/// Gets storage size information for an account
AccountStorage(account_storage::Command),
/// Gets account state and storage at a specific block
@@ -213,6 +216,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::StageCheckpoints(command) => {
db_exec!(self.env, tool, N, command.access_rights(), {
command.execute(&tool)?;
});
}
Subcommands::AccountStorage(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -0,0 +1,297 @@
//! `reth db stage-checkpoints` command for viewing and setting stage checkpoint values.
use clap::{Args, Parser, Subcommand, ValueEnum};
use reth_db_common::DbTool;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
StageCheckpointWriter,
};
use reth_stages::StageId;
use crate::common::AccessRights;
/// `reth db stage-checkpoints` subcommand
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
impl Command {
/// Returns database access rights required for the command.
pub fn access_rights(&self) -> AccessRights {
match &self.command {
Subcommands::Get { .. } => AccessRights::RO,
Subcommands::Set(_) => AccessRights::RW,
}
}
/// Execute the command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Get { stage } => Self::get(tool, stage),
Subcommands::Set(args) => Self::set(tool, args),
}
}
fn get<N: ProviderNodeTypes>(tool: &DbTool<N>, stage: Option<StageArg>) -> eyre::Result<()> {
let provider = tool.provider_factory.provider()?;
match stage {
Some(stage) => {
let stage_id = stage.into();
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
println!("{stage_id}: {checkpoint:?}");
}
None => {
let mut checkpoints = provider.get_all_checkpoints()?;
checkpoints.sort_by(|a, b| a.0.cmp(&b.0));
for (stage, checkpoint) in checkpoints {
println!("{stage}: {checkpoint:?}");
}
}
}
Ok(())
}
fn set<N: ProviderNodeTypes>(tool: &DbTool<N>, args: SetArgs) -> eyre::Result<()> {
let stage_id: StageId = args.stage.into();
let provider_rw = tool.provider_factory.database_provider_rw()?;
let previous = provider_rw.get_stage_checkpoint(stage_id)?;
let mut checkpoint = previous.unwrap_or_default();
checkpoint.block_number = args.block_number;
if args.clear_stage_unit {
checkpoint.stage_checkpoint = None;
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
provider_rw.commit()?;
println!("Updated checkpoint for {stage_id}: {checkpoint:?}");
Ok(())
}
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Get stage checkpoint(s) from database.
Get {
/// Specific stage to query. If omitted, shows all stages.
#[arg(long, value_enum)]
stage: Option<StageArg>,
},
/// Set a stage checkpoint.
Set(SetArgs),
}
/// Arguments for the `set` subcommand.
#[derive(Debug, Args)]
pub struct SetArgs {
/// Stage to update.
#[arg(long, value_enum)]
stage: StageArg,
/// Block number to set as stage checkpoint.
#[arg(long)]
block_number: u64,
/// Clear stage-specific unit checkpoint payload.
#[arg(long)]
clear_stage_unit: bool,
}
/// CLI-friendly stage names.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum StageArg {
Era,
Headers,
Bodies,
SenderRecovery,
Execution,
PruneSenderRecovery,
MerkleUnwind,
AccountHashing,
StorageHashing,
MerkleExecute,
TransactionLookup,
IndexStorageHistory,
IndexAccountHistory,
Prune,
Finish,
}
impl From<StageArg> for StageId {
fn from(arg: StageArg) -> Self {
match arg {
StageArg::Era => Self::Era,
StageArg::Headers => Self::Headers,
StageArg::Bodies => Self::Bodies,
StageArg::SenderRecovery => Self::SenderRecovery,
StageArg::Execution => Self::Execution,
StageArg::PruneSenderRecovery => Self::PruneSenderRecovery,
StageArg::MerkleUnwind => Self::MerkleUnwind,
StageArg::AccountHashing => Self::AccountHashing,
StageArg::StorageHashing => Self::StorageHashing,
StageArg::MerkleExecute => Self::MerkleExecute,
StageArg::TransactionLookup => Self::TransactionLookup,
StageArg::IndexStorageHistory => Self::IndexStorageHistory,
StageArg::IndexAccountHistory => Self::IndexAccountHistory,
StageArg::Prune => Self::Prune,
StageArg::Finish => Self::Finish,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
use reth_provider::{
test_utils::create_test_provider_factory, DBProvider, DatabaseProviderFactory,
StageCheckpointReader, StageCheckpointWriter,
};
use reth_stages::StageCheckpoint;
#[test]
fn parse_set_args() {
let command = Command::parse_from([
"stage-checkpoints",
"set",
"--stage",
"headers",
"--block-number",
"123",
]);
assert!(matches!(
command.command,
Subcommands::Set(SetArgs {
stage: StageArg::Headers,
block_number: 123,
clear_stage_unit: false,
})
));
}
#[test]
fn set_overwrites_block_number() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
provider_rw
.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(10))
.expect("save checkpoint");
provider_rw.commit().expect("commit initial checkpoint");
}
let command = Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Headers,
block_number: 42,
clear_stage_unit: false,
}),
};
command.execute(&tool).expect("execute command");
let provider = provider_factory.provider().expect("provider");
let checkpoint = provider
.get_stage_checkpoint(StageId::Headers)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert_eq!(checkpoint.block_number, 42);
}
#[test]
fn set_preserves_stage_unit_checkpoint_unless_cleared() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
let checkpoint = StageCheckpoint::new(10).with_block_range(&StageId::Execution, 5, 10);
provider_rw
.save_stage_checkpoint(StageId::Execution, checkpoint)
.expect("save checkpoint");
provider_rw.commit().expect("commit initial checkpoint");
}
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Execution,
block_number: 11,
clear_stage_unit: false,
}),
}
.execute(&tool)
.expect("execute command");
let provider = provider_factory.provider().expect("provider");
let checkpoint = provider
.get_stage_checkpoint(StageId::Execution)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert!(checkpoint.stage_checkpoint.is_some());
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Execution,
block_number: 12,
clear_stage_unit: true,
}),
}
.execute(&tool)
.expect("execute command");
let checkpoint = provider_factory
.provider()
.expect("provider")
.get_stage_checkpoint(StageId::Execution)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert!(checkpoint.stage_checkpoint.is_none());
}
#[test]
fn set_preserves_checkpoint_progress() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
provider_rw
.save_stage_checkpoint(StageId::MerkleExecute, StageCheckpoint::new(10))
.expect("save checkpoint");
provider_rw
.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![1, 2, 3])
.expect("save progress");
provider_rw.commit().expect("commit initial checkpoint");
}
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::MerkleExecute,
block_number: 20,
clear_stage_unit: false,
}),
}
.execute(&tool)
.expect("execute command");
let provider = provider_factory.provider().expect("provider");
let progress = provider
.get_stage_checkpoint_progress(StageId::MerkleExecute)
.expect("get stage checkpoint progress");
assert_eq!(progress, Some(vec![1, 2, 3]));
}
}

View File

@@ -297,21 +297,18 @@ where
}
match event {
Event::Key(key) => {
if key.kind == event::KeyEventKind::Press {
match key.code {
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
KeyCode::Down => app.next(),
KeyCode::Up => app.previous(),
KeyCode::Right => app.next_page(),
KeyCode::Left => app.previous_page(),
KeyCode::Char('G') => {
app.mode = ViewMode::GoToPage;
}
_ => {}
}
Event::Key(key) if key.kind == event::KeyEventKind::Press => match key.code {
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
KeyCode::Down => app.next(),
KeyCode::Up => app.previous(),
KeyCode::Right => app.next_page(),
KeyCode::Left => app.previous_page(),
KeyCode::Char('G') => {
app.mode = ViewMode::GoToPage;
}
}
_ => {}
},
Event::Key(_) => {}
Event::Mouse(e) => match e.kind {
MouseEventKind::ScrollDown => app.next(),
MouseEventKind::ScrollUp => app.previous(),

View File

@@ -11,14 +11,18 @@ use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_util::cancellation::CancellationToken;
use reth_consensus::FullConsensus;
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
use reth_evm::{
execute::{BlockExecutor, Executor},
ConfigureEvm, Evm,
};
use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected, NodePrimitives};
use reth_provider::{
BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
StaticFileProviderFactory, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_revm::{database::StateProviderDatabase, State};
use reth_stages::stages::calculate_gas_used_from_headers;
use revm_inspectors::tracing::{TracingInspector, TracingInspectorConfig};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
@@ -65,6 +69,108 @@ impl<C: ChainSpecParser> Command<C> {
}
}
/// Traces a failed transaction at the given index within a block, producing opcode-level output.
///
/// Creates a [`BlockExecutor`] with a [`TracingInspector`]
/// attached but initially disabled. The inspector is enabled only for the target transaction,
/// keeping pre-execution changes and prior transaction replay uninstrumented.
/// The resulting trace is logged as a JSON-serialized geth-style `DefaultFrame` containing
/// opcode-level `structLogs`.
fn trace_failed_transaction<C, DB>(
evm_config: &C,
db: StateProviderDatabase<DB>,
block: &reth_primitives_traits::RecoveredBlock<<C::Primitives as NodePrimitives>::Block>,
tx_index: usize,
) where
C: ConfigureEvm,
DB: reth_revm::database::EvmStateProvider,
{
let evm_env = match evm_config.evm_env(block.header()) {
Ok(env) => env,
Err(err) => {
error!(%err, "Failed to create EVM env for opcode tracing");
return;
}
};
let mut state = State::builder().with_database(db).with_bundle_update().build();
let ctx = match evm_config.context_for_block(block) {
Ok(ctx) => ctx,
Err(err) => {
error!(%err, "Failed to create execution context for opcode tracing");
return;
}
};
let inspector = TracingInspector::new(TracingInspectorConfig::all());
let evm = evm_config.evm_with_env_and_inspector(&mut state, evm_env, inspector);
let mut executor = evm_config.create_executor(evm, ctx);
// Disable inspector during pre-execution changes and prior tx replay.
executor.evm_mut().disable_inspector();
if let Err(err) = executor.apply_pre_execution_changes() {
error!(%err, "Failed to apply pre-execution changes for opcode tracing");
return;
}
for (i, tx) in block.transactions_recovered().enumerate() {
if i >= tx_index {
break;
}
if let Err(err) = executor.execute_transaction(tx) {
error!(index = i, %err, "Failed to replay transaction before failing tx");
return;
}
}
// Enable inspector for the target transaction.
executor.evm_mut().enable_inspector();
let tx = match block.transactions_recovered().nth(tx_index) {
Some(tx) => tx,
None => {
error!(tx_index, "Transaction index out of bounds for opcode tracing");
return;
}
};
let mut gas_used = 0;
let mut return_value = Default::default();
let mut execution_result = None;
let result = executor.execute_transaction_with_result_closure(tx, |res| {
gas_used = res.gas_used();
return_value = res.output().cloned().unwrap_or_default();
execution_result = Some(format!("{res:?}"));
});
// Build geth-style trace with opcode-level structLogs.
let frame = executor.evm_mut().inspector_mut().geth_builder().geth_traces(
gas_used,
return_value,
Default::default(),
);
match serde_json::to_string(&frame) {
Ok(json) => {
error!(
block_number = block.number(),
block_hash = ?block.hash(),
tx_index,
tx_hash = ?block.body().transactions()[tx_index].tx_hash(),
?result,
execution_result,
opcode_trace = %json,
"Opcode-level trace for failing transaction"
);
}
Err(err) => {
error!(%err, "Failed to serialize opcode trace");
}
}
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `re-execute` command
pub async fn execute<N>(
@@ -223,6 +329,15 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
// Trace the mismatched tx at the opcode level.
trace_failed_transaction(
&evm_config,
db_at(block.number() - 1),
&block,
i,
);
if skip_invalid_blocks {
executor = evm_config
.batch_executor(db_at(block.number()));

View File

@@ -9,26 +9,6 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// Returns the default number of storage worker threads based on available parallelism.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map_or(8, |n| n.get() * 2)
}
#[cfg(not(feature = "std"))]
{
8
}
}
/// Returns the default number of account worker threads.
///
/// Account workers coordinate storage proof collection and account trie traversal.
/// They are set to the same count as storage workers for simplicity.
fn default_account_worker_count() -> usize {
default_storage_worker_count()
}
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
@@ -147,10 +127,6 @@ pub struct TreeConfig {
always_process_payload_attributes_on_canonical_head: bool,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Depth for sparse trie pruning after state root computation.
@@ -187,8 +163,6 @@ impl Default for TreeConfig {
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_cache_metrics: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
@@ -220,8 +194,6 @@ impl TreeConfig {
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
@@ -246,8 +218,6 @@ impl TreeConfig {
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_cache_metrics,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
@@ -479,42 +449,6 @@ impl TreeConfig {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
}
/// Setter for the number of storage proof worker threads.
///
/// No-op if it's [`None`].
pub const fn with_storage_worker_count_opt(
mut self,
storage_worker_count: Option<usize>,
) -> Self {
if let Some(count) = storage_worker_count {
self.storage_worker_count = count;
}
self
}
/// Return the number of account proof worker threads.
pub const fn account_worker_count(&self) -> usize {
self.account_worker_count
}
/// Setter for the number of account proof worker threads.
///
/// No-op if it's [`None`].
pub const fn with_account_worker_count_opt(
mut self,
account_worker_count: Option<usize>,
) -> Self {
if let Some(count) = account_worker_count {
self.account_worker_count = count;
}
self
}
/// Returns whether cache metrics recording is disabled.
pub const fn disable_cache_metrics(&self) -> bool {
self.disable_cache_metrics

View File

@@ -2583,6 +2583,51 @@ where
Some(TreeEvent::Download(request))
}
/// Handles a downloaded block that was successfully inserted as valid.
///
/// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
/// If it matches a non-head sync target (safe or finalized), makes it canonical inline
/// and triggers a download for the remaining blocks towards the actual head.
/// Otherwise, tries to connect buffered blocks.
fn on_valid_downloaded_block(
&mut self,
block_num_hash: BlockNumHash,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
// check if we just inserted a block that's part of sync targets,
// i.e. head, safe, or finalized
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
sync_target.contains(block_num_hash.hash)
{
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
if sync_target.head_block_hash == block_num_hash.hash {
// we just inserted the sync target head block, make it canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
})))
}
// This block is part of the sync target (safe or finalized) but not the
// head. Make it canonical and try to connect any buffered children, then
// continue downloading towards the actual head if needed.
self.make_canonical(block_num_hash.hash)?;
self.try_connect_buffered_blocks(block_num_hash)?;
// Check if we've reached the sync target head after connecting buffered
// blocks (e.g. the head block may have already been buffered).
if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
}
return Ok(None)
}
trace!(target: "engine::tree", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash)?;
Ok(None)
}
/// Invoked with a block downloaded from the network
///
/// Returns an event with the appropriate action to take, such as:
@@ -2605,22 +2650,11 @@ where
// try to append the block
match self.insert_block(block) {
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
// check if we just inserted a block that's part of sync targets,
// i.e. head, safe, or finalized
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
sync_target.contains(block_num_hash.hash)
{
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
// we just inserted a block that we know is part of the canonical chain, so we
// can make it canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
})))
}
trace!(target: "engine::tree", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash)?;
Ok(
InsertPayloadOk::Inserted(BlockStatus::Valid) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid),
) => {
return self.on_valid_downloaded_block(block_num_hash);
}
Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
// block is not connected to the canonical head, we need to download

View File

@@ -822,9 +822,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Returns iterator yielding transactions from the stream.
pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
core::iter::repeat_with(|| self.transactions.recv())
.take_while(|res| res.is_ok())
.map(|res| res.unwrap())
self.transactions.iter()
}
}

View File

@@ -558,7 +558,7 @@ where
if !self.precompile_cache_disabled {
// Only cache pure precompiles to avoid issues with stateful precompiles
evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
evm.precompiles_mut().map_cacheable_precompiles(|address, precompile| {
CachedPrecompile::wrap(
precompile,
self.precompile_cache_map.cache_for_address(*address),

View File

@@ -838,19 +838,21 @@ where
if !self.config.precompile_cache_disabled() {
let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
let metrics = self
.precompile_cache_metrics
.entry(*address)
.or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
.clone();
CachedPrecompile::wrap(
precompile,
self.precompile_cache_map.cache_for_address(*address),
spec_id,
Some(metrics),
)
});
executor.evm_mut().precompiles_mut().map_cacheable_precompiles(
|address, precompile| {
let metrics = self
.precompile_cache_metrics
.entry(*address)
.or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
.clone();
CachedPrecompile::wrap(
precompile,
self.precompile_cache_map.cache_for_address(*address),
spec_id,
Some(metrics),
)
},
);
}
// Spawn background task to compute receipt root and logs bloom incrementally.

View File

@@ -1,6 +1,9 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use alloy_primitives::{
map::{DefaultHashBuilder, FbBuildHasher},
Bytes,
};
use moka::policy::EvictionPolicy;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use reth_primitives_traits::dashmap::DashMap;
@@ -13,7 +16,7 @@ const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>, FbBuildHasher<20>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
@@ -37,9 +40,7 @@ where
/// Cache for precompiles, for each input stores the result.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
pub struct PrecompileCache<S>(moka::sync::Cache<Bytes, CacheEntry<S>, DefaultHashBuilder>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;

View File

@@ -139,7 +139,7 @@ impl<N: NodePrimitives> TreeState<N> {
///
/// Both parent hash and anchor hash must match to ensure the overlay is valid.
/// This prevents using a stale overlay after persistence has advanced the anchor.
pub(crate) fn get_cached_overlay(
pub fn get_cached_overlay(
&self,
parent_hash: B256,
expected_anchor: B256,

View File

@@ -2041,3 +2041,126 @@ mod forkchoice_updated_tests {
assert_eq!(last_persisted_number, canonical_tip);
}
}
/// Tests that `on_valid_downloaded_block` triggers a download for the actual head block when
/// the block matches a non-head sync target (safe or finalized).
///
/// This exercises the exact code path fixed in `on_downloaded_block`: after `insert_block`
/// returns `Inserted(Valid)`, `on_valid_downloaded_block` checks `sync_target.contains()`.
/// If the block is NOT the head, it should make canonical inline and emit a `Download`
/// event for the head — rather than returning `MakeCanonical` which would stop the download
/// pipeline.
///
/// Reproduces the hive test failure:
/// "Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts -
/// No Transactions: Timeout while waiting for secondary client to sync"
#[test]
fn test_on_valid_downloaded_non_head_sync_target_continues_to_head() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
// Build blocks: genesis (0) and safe block (1).
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
let genesis = &blocks[0];
let safe_block = &blocks[1];
// Insert genesis and safe block into the tree. The safe block must be in the tree
// for `make_canonical` to succeed inside `on_valid_downloaded_block`.
test_harness = test_harness.with_blocks(vec![genesis.clone(), safe_block.clone()]);
let genesis_hash = genesis.recovered_block().hash();
let safe_hash = safe_block.recovered_block().hash();
let head_hash = B256::random(); // head block is unknown — hasn't been downloaded yet
// Reset canonical head to genesis so the safe block is in tree but not yet canonical.
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
// Set the forkchoice tracker to SYNCING with head != safe.
let fcu_state = ForkchoiceState {
head_block_hash: head_hash,
safe_block_hash: safe_hash,
finalized_block_hash: genesis_hash,
};
test_harness
.tree
.state
.forkchoice_state_tracker
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
// Call on_valid_downloaded_block — this is called by on_downloaded_block after
// insert_block returns Inserted(Valid).
let safe_num_hash = safe_block.recovered_block().num_hash();
let result = test_harness.tree.on_valid_downloaded_block(safe_num_hash).unwrap();
// With the fix: the engine makes safe canonical inline, then emits Download for head.
// Without the fix: it would return MakeCanonical{safe_hash} and never download head.
match result {
Some(TreeEvent::Download(DownloadRequest::BlockSet(hashes))) => {
assert!(
hashes.contains(&head_hash),
"Expected download for head block {head_hash}, got {hashes:?}"
);
}
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
panic!(
"BUG: returned MakeCanonical for non-head block {sync_target_head} \
instead of downloading the actual head {head_hash}"
);
}
other => panic!("Expected Download event for head block, got: {other:?}"),
}
// Verify the safe block was made canonical.
assert_eq!(
test_harness.tree.state.tree_state.canonical_block_hash(),
safe_hash,
"Safe block should be canonical after on_valid_downloaded_block"
);
}
/// Tests that `on_valid_downloaded_block` returns `MakeCanonical` when the downloaded block
/// IS the sync target head (the normal non-buggy path).
#[test]
fn test_on_valid_downloaded_head_sync_target_returns_make_canonical() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
let genesis = &blocks[0];
let head_block = &blocks[1];
test_harness = test_harness.with_blocks(vec![genesis.clone(), head_block.clone()]);
let genesis_hash = genesis.recovered_block().hash();
let head_hash = head_block.recovered_block().hash();
// Reset canonical head to genesis.
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
// Set the forkchoice tracker: head == the downloaded block.
let fcu_state = ForkchoiceState {
head_block_hash: head_hash,
safe_block_hash: head_hash,
finalized_block_hash: genesis_hash,
};
test_harness
.tree
.state
.forkchoice_state_tracker
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
let head_num_hash = head_block.recovered_block().num_hash();
let result = test_harness.tree.on_valid_downloaded_block(head_num_hash).unwrap();
// When the downloaded block IS the head, should return MakeCanonical.
match result {
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
assert_eq!(sync_target_head, head_hash);
}
other => panic!("Expected MakeCanonical for head block, got: {other:?}"),
}
}

View File

@@ -108,7 +108,7 @@ impl EngineMessageStore {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
Ok(filenames_by_ts.into_values().flatten())
}
}

View File

@@ -242,7 +242,7 @@ mod tests {
#[test]
fn test_valid_gas_limit_increase() {
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
let child = header_with_gas_limit((parent.gas_limit + 5) as u64);
let child = header_with_gas_limit(parent.gas_limit + 5);
assert!(validate_against_parent_gas_limit(
&child,
@@ -260,7 +260,7 @@ mod tests {
assert!(matches!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
ConsensusError::GasLimitInvalidMinimum { child_gas_limit }
if child_gas_limit == child.gas_limit as u64
if child_gas_limit == child.gas_limit
));
}

View File

@@ -28,10 +28,6 @@ alloy-evm.workspace = true
alloy-consensus.workspace = true
alloy-rpc-types-engine.workspace = true
# Misc
parking_lot = { workspace = true, optional = true }
derive_more = { workspace = true, optional = true }
[dev-dependencies]
reth-testing-utils.workspace = true
reth-evm = { workspace = true, features = ["test-utils"] }
@@ -54,14 +50,11 @@ std = [
"reth-primitives-traits/std",
"revm/std",
"reth-ethereum-primitives/std",
"derive_more?/std",
"alloy-rpc-types-engine/std",
"reth-storage-errors/std",
]
test-utils = [
"std",
"dep:parking_lot",
"dep:derive_more",
"reth-chainspec/test-utils",
"reth-ethereum-primitives/test-utils",
"reth-evm/test-utils",

View File

@@ -3,7 +3,7 @@ use alloy_consensus::{
proofs::{self, calculate_receipt_root},
Block, BlockBody, BlockHeader, Header, TxReceipt, EMPTY_OMMER_ROOT_HASH,
};
use alloy_eips::merge::BEACON_NONCE;
use alloy_eips::{eip4895::Withdrawals, merge::BEACON_NONCE};
use alloy_evm::{block::BlockExecutorFactory, eth::EthBlockExecutionCtx};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_evm::execute::{BlockAssembler, BlockAssemblerInput, BlockExecutionError};
@@ -61,7 +61,7 @@ where
let withdrawals = self
.chain_spec
.is_shanghai_active_at_timestamp(timestamp)
.then(|| ctx.withdrawals.map(|w| w.into_owned()).unwrap_or_default());
.then(|| Withdrawals::new(ctx.withdrawals.map(|w| w.into_owned()).unwrap_or_default()));
let withdrawals_root =
withdrawals.as_deref().map(|w| proofs::calculate_withdrawals_root(w));

View File

@@ -192,7 +192,7 @@ where
parent_hash: block.header().parent_hash,
parent_beacon_block_root: block.header().parent_beacon_block_root,
ommers: &block.body().ommers,
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
withdrawals: block.body().withdrawals.as_ref().map(|w| Cow::Borrowed(w.as_slice())),
extra_data: block.header().extra_data.clone(),
})
}
@@ -207,7 +207,7 @@ where
parent_hash: parent.hash(),
parent_beacon_block_root: attributes.parent_beacon_block_root,
ommers: &[],
withdrawals: attributes.withdrawals.map(Cow::Owned),
withdrawals: attributes.withdrawals.map(|w| Cow::Owned(w.into_inner())),
extra_data: attributes.extra_data,
})
}
@@ -287,7 +287,7 @@ where
parent_hash: payload.parent_hash(),
parent_beacon_block_root: payload.sidecar.parent_beacon_block_root(),
ommers: &[],
withdrawals: payload.payload.withdrawals().map(|w| Cow::Owned(w.clone().into())),
withdrawals: payload.payload.withdrawals().map(|w| Cow::Borrowed(w.as_slice())),
extra_data: payload.payload.as_v1().extra_data.clone(),
})
}

View File

@@ -1,221 +1,8 @@
use crate::EthEvmConfig;
use alloc::{boxed::Box, sync::Arc, vec, vec::Vec};
use alloy_consensus::{Header, TxType};
use alloy_eips::eip7685::Requests;
use alloy_evm::precompiles::PrecompilesMap;
use alloy_primitives::Bytes;
use alloy_rpc_types_engine::ExecutionData;
use parking_lot::Mutex;
use reth_ethereum_primitives::{Receipt, TransactionSigned};
use reth_evm::{
block::{
BlockExecutionError, BlockExecutor, BlockExecutorFactory, BlockExecutorFor, ExecutableTx,
},
eth::{EthBlockExecutionCtx, EthEvmContext, EthTxResult},
ConfigureEngineEvm, ConfigureEvm, Database, EthEvm, EthEvmFactory, Evm, EvmEnvFor, EvmFactory,
ExecutableTxIterator, ExecutionCtxFor, RecoveredTx,
};
use reth_execution_types::{BlockExecutionResult, ExecutionOutcome};
use reth_primitives_traits::{BlockTy, SealedBlock, SealedHeader};
use revm::{
context::result::{ExecutionResult, HaltReason, Output, ResultAndState, SuccessReason},
database::State,
Inspector,
};
use reth_evm::noop::NoopEvmConfig;
/// A helper type alias for mocked block executor provider.
pub type MockExecutorProvider = MockEvmConfig;
/// A block executor provider that returns mocked execution results.
#[derive(Clone, Debug)]
pub struct MockEvmConfig {
inner: EthEvmConfig,
exec_results: Arc<Mutex<Vec<ExecutionOutcome>>>,
}
impl Default for MockEvmConfig {
fn default() -> Self {
Self { inner: EthEvmConfig::mainnet(), exec_results: Default::default() }
}
}
impl MockEvmConfig {
/// Extend the mocked execution results
pub fn extend(&self, results: impl IntoIterator<Item = impl Into<ExecutionOutcome>>) {
self.exec_results.lock().extend(results.into_iter().map(Into::into));
}
}
impl BlockExecutorFactory for MockEvmConfig {
type EvmFactory = EthEvmFactory;
type ExecutionCtx<'a> = EthBlockExecutionCtx<'a>;
type Receipt = Receipt;
type Transaction = TransactionSigned;
fn evm_factory(&self) -> &Self::EvmFactory {
self.inner.evm_factory()
}
fn create_executor<'a, DB, I>(
&'a self,
evm: EthEvm<&'a mut State<DB>, I, PrecompilesMap>,
_ctx: Self::ExecutionCtx<'a>,
) -> impl BlockExecutorFor<'a, Self, DB, I>
where
DB: Database + 'a,
I: Inspector<<Self::EvmFactory as EvmFactory>::Context<&'a mut State<DB>>> + 'a,
{
MockExecutor {
result: self.exec_results.lock().pop().unwrap(),
evm,
hook: None,
receipts: Vec::new(),
}
}
}
/// Mock executor that returns a fixed execution result.
#[derive(derive_more::Debug)]
pub struct MockExecutor<'a, DB: Database, I> {
result: ExecutionOutcome,
evm: EthEvm<&'a mut State<DB>, I, PrecompilesMap>,
#[debug(skip)]
hook: Option<Box<dyn reth_evm::OnStateHook>>,
receipts: Vec<Receipt>,
}
impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExecutor
for MockExecutor<'a, DB, I>
{
type Evm = EthEvm<&'a mut State<DB>, I, PrecompilesMap>;
type Transaction = TransactionSigned;
type Receipt = Receipt;
type Result = EthTxResult<HaltReason, TxType>;
fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
Ok(())
}
fn receipts(&self) -> &[Self::Receipt] {
&self.receipts
}
fn execute_transaction_without_commit(
&mut self,
tx: impl ExecutableTx<Self>,
) -> Result<Self::Result, BlockExecutionError> {
Ok(EthTxResult {
result: ResultAndState::new(
ExecutionResult::Success {
reason: SuccessReason::Return,
gas_used: 0,
gas_refunded: 0,
logs: vec![],
output: Output::Call(Bytes::from(vec![])),
},
Default::default(),
),
tx_type: tx.into_parts().1.tx().tx_type(),
blob_gas_used: 0,
})
}
fn commit_transaction(&mut self, _output: Self::Result) -> Result<u64, BlockExecutionError> {
Ok(0)
}
fn finish(
self,
) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
let Self { result, mut evm, .. } = self;
let ExecutionOutcome { bundle, receipts, requests, first_block: _ } = result;
let result = BlockExecutionResult {
receipts: receipts.into_iter().flatten().collect(),
requests: requests.into_iter().fold(Requests::default(), |mut reqs, req| {
reqs.extend(req);
reqs
}),
gas_used: 0,
blob_gas_used: 0,
};
evm.db_mut().bundle_state = bundle;
Ok((evm, result))
}
fn set_state_hook(&mut self, hook: Option<Box<dyn reth_evm::OnStateHook>>) {
self.hook = hook;
}
fn evm(&self) -> &Self::Evm {
&self.evm
}
fn evm_mut(&mut self) -> &mut Self::Evm {
&mut self.evm
}
}
impl ConfigureEvm for MockEvmConfig {
type BlockAssembler = <EthEvmConfig as ConfigureEvm>::BlockAssembler;
type BlockExecutorFactory = Self;
type Error = <EthEvmConfig as ConfigureEvm>::Error;
type NextBlockEnvCtx = <EthEvmConfig as ConfigureEvm>::NextBlockEnvCtx;
type Primitives = <EthEvmConfig as ConfigureEvm>::Primitives;
fn block_executor_factory(&self) -> &Self::BlockExecutorFactory {
self
}
fn block_assembler(&self) -> &Self::BlockAssembler {
self.inner.block_assembler()
}
fn evm_env(&self, header: &Header) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner.evm_env(header)
}
fn next_evm_env(
&self,
parent: &Header,
attributes: &Self::NextBlockEnvCtx,
) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner.next_evm_env(parent, attributes)
}
fn context_for_block<'a>(
&self,
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> Result<reth_evm::ExecutionCtxFor<'a, Self>, Self::Error> {
self.inner.context_for_block(block)
}
fn context_for_next_block(
&self,
parent: &SealedHeader,
attributes: Self::NextBlockEnvCtx,
) -> Result<reth_evm::ExecutionCtxFor<'_, Self>, Self::Error> {
self.inner.context_for_next_block(parent, attributes)
}
}
impl ConfigureEngineEvm<ExecutionData> for MockEvmConfig {
fn evm_env_for_payload(&self, payload: &ExecutionData) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner.evm_env_for_payload(payload)
}
fn context_for_payload<'a>(
&self,
payload: &'a ExecutionData,
) -> Result<ExecutionCtxFor<'a, Self>, Self::Error> {
self.inner.context_for_payload(payload)
}
fn tx_iterator_for_payload(
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
self.inner.tx_iterator_for_payload(payload)
}
}
/// Mock for EVM config.
pub type MockEvmConfig = NoopEvmConfig<EthEvmConfig>;

View File

@@ -315,7 +315,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
&'a self,
evm: EvmFor<Self, &'a mut State<DB>, I>,
ctx: <Self::BlockExecutorFactory as BlockExecutorFactory>::ExecutionCtx<'a>,
) -> impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB, I>
) -> impl BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>, I>
where
DB: Database,
I: InspectorFor<Self, &'a mut State<DB>> + 'a,
@@ -328,7 +328,8 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
&'a self,
db: &'a mut State<DB>,
block: &'a SealedBlock<<Self::Primitives as NodePrimitives>::Block>,
) -> Result<impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB>, Self::Error> {
) -> Result<impl BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>>, Self::Error>
{
let evm = self.evm_for_block(db, block.header())?;
let ctx = self.context_for_block(block)?;
Ok(self.create_executor(evm, ctx))
@@ -356,7 +357,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
ctx: <Self::BlockExecutorFactory as BlockExecutorFactory>::ExecutionCtx<'a>,
) -> impl BlockBuilder<
Primitives = Self::Primitives,
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, DB, I>,
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>, I>,
>
where
DB: Database,
@@ -408,7 +409,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
) -> Result<
impl BlockBuilder<
Primitives = Self::Primitives,
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, DB>,
Executor: BlockExecutorFor<'a, Self::BlockExecutorFactory, &'a mut State<DB>>,
>,
Self::Error,
> {

View File

@@ -70,3 +70,27 @@ where
self.inner().context_for_next_block(parent, attributes)
}
}
#[cfg(feature = "std")]
impl<Inner, T> crate::ConfigureEngineEvm<T> for NoopEvmConfig<Inner>
where
Inner: crate::ConfigureEngineEvm<T>,
{
fn evm_env_for_payload(&self, payload: &T) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner().evm_env_for_payload(payload)
}
fn context_for_payload<'a>(
&self,
payload: &'a T,
) -> Result<crate::ExecutionCtxFor<'a, Self>, Self::Error> {
self.inner().context_for_payload(payload)
}
fn tx_iterator_for_payload(
&self,
payload: &T,
) -> Result<impl crate::ExecutableTxIterator<Self>, Self::Error> {
self.inner().tx_iterator_for_payload(payload)
}
}

View File

@@ -202,6 +202,18 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks().iter().map(|block| block.1)
}
/// Returns an iterator over all transactions in the chain.
pub fn transactions_iter(&self) -> impl Iterator<Item = &N::SignedTx> + '_ {
self.blocks_iter().flat_map(|block| block.body().transactions())
}
/// Returns an iterator over all [`Recovered`] transaction references in the chain.
pub fn transactions_recovered_iter(
&self,
) -> impl Iterator<Item = Recovered<&N::SignedTx>> + '_ {
self.blocks_iter().flat_map(|block| block.transactions_recovered())
}
/// Returns an iterator over all blocks and their receipts in the chain.
pub fn blocks_and_receipts(
&self,

View File

@@ -376,7 +376,7 @@ mod tests {
);
for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
pipeline_results.iter().zip(backfill_results.into_iter()).enumerate()
pipeline_results.iter().zip(backfill_results).enumerate()
{
backfill_output.state.reverts.sort();

View File

@@ -505,9 +505,6 @@ where
}
let buffer_full = this.buffer.len() >= this.max_capacity;
// Update capacity
this.update_capacity();
// Advance all poll senders
let mut min_id = usize::MAX;
for idx in (0..this.exex_handles.len()).rev() {

View File

@@ -955,10 +955,8 @@ impl Discv4Service {
// Check if ENR was updated
match (last_enr_seq, old_enr) {
(Some(new), Some(old)) => {
if new > old {
self.send_enr_request(record);
}
(Some(new), Some(old)) if new > old => {
self.send_enr_request(record);
}
(Some(_), None) => {
// got an ENR
@@ -1195,10 +1193,8 @@ impl Discv4Service {
} else {
// Request ENR if included in the ping
match (ping.enr_sq, old_enr) {
(Some(new), Some(old)) => {
if new > old {
self.send_enr_request(record);
}
(Some(new), Some(old)) if new > old => {
self.send_enr_request(record);
}
(Some(_), None) => {
self.send_enr_request(record);
@@ -1355,10 +1351,8 @@ impl Discv4Service {
_ => return,
};
match (fork_id, old_fork_id) {
(Some(new), Some(old)) => {
if new != old {
self.notify(DiscoveryUpdate::EnrForkId(record, new))
}
(Some(new), Some(old)) if new != old => {
self.notify(DiscoveryUpdate::EnrForkId(record, new))
}
(Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
_ => {}

View File

@@ -52,6 +52,7 @@ where
pub(crate) fn clear(&mut self) {
self.inner.clear();
self.last_requested_block_number.take();
self.metrics.clear();
}
/// Add new request to the queue.
/// Expects a sorted list of headers.

View File

@@ -60,6 +60,15 @@ impl BodyDownloaderMetrics {
_error => self.unexpected_errors.increment(1),
}
}
/// Clear all gauge metrics by setting them to 0.
pub fn clear(&self) {
self.in_flight_requests.set(0);
self.buffered_responses.set(0);
self.buffered_blocks.set(0);
self.buffered_blocks_size_bytes.set(0);
self.queued_blocks.set(0);
}
}
/// Metrics for an individual response, i.e. the size in bytes, and length (number of bodies) in the

View File

@@ -301,6 +301,20 @@ impl Discovery {
}
}
impl Drop for Discovery {
fn drop(&mut self) {
if let Some(discv4) = &self.discv4 {
discv4.terminate();
}
if let Some(handle) = self._discv4_service.take() {
handle.abort();
}
if let Some(handle) = self._dns_disc_service.take() {
handle.abort();
}
}
}
impl Stream for Discovery {
type Item = DiscoveryEvent;

View File

@@ -431,19 +431,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Returns an iterator over all peers in the peer set.
pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.swarm.state().peers().iter_peers()
self.swarm.peers().iter_peers()
}
/// Returns the number of peers in the peer set.
pub fn num_known_peers(&self) -> usize {
self.swarm.state().peers().num_known_peers()
self.swarm.peers().num_known_peers()
}
/// Returns a new [`PeersHandle`] that can be cloned and shared.
///
/// The [`PeersHandle`] can be used to interact with the network's peer set.
pub fn peers_handle(&self) -> PeersHandle {
self.swarm.state().peers().handle()
self.swarm.peers().handle()
}
/// Collect the peers from the [`NetworkManager`] and write them to the given
@@ -452,7 +452,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Only persists peers that are not currently backed off or banned. Includes metadata like
/// peer kind, fork ID, and reputation.
pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
let peers = self.swarm.state().peers().persistable_peers().collect::<Vec<_>>();
let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
Ok(())
@@ -730,10 +730,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
let _ = tx.send(());
}
NetworkHandleMessage::ReputationChange(peer_id, kind) => {
self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
}
NetworkHandleMessage::GetReputationById(peer_id, tx) => {
let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
}
NetworkHandleMessage::FetchClient(tx) => {
let _ = tx.send(self.fetch_client());
@@ -756,7 +756,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
let peer_ids = self.swarm.peers().peers_by_kind(kind);
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
@@ -791,7 +791,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.metrics.total_incoming_connections.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
.set(self.swarm.peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
@@ -829,7 +829,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}
if direction.is_outgoing() {
self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
self.swarm.peers_mut().on_active_outgoing_established(peer_id);
}
self.update_active_connection_metrics();
@@ -857,12 +857,12 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
@@ -877,23 +877,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
);
// Capture direction before state is reset to Idle
let is_inbound = self.swarm.state().peers().is_inbound_peer(&peer_id);
let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);
let reason = if let Some(ref err) = error {
// If the connection was closed due to an error, we report
// the peer
self.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
err.as_disconnected()
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
self.backed_off_peers_metrics
.increment_for_reason(BackoffReason::GracefulClose);
None
@@ -908,9 +904,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.disconnect_metrics.increment_outbound(reason);
}
}
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
@@ -940,7 +934,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.closed_sessions_metrics.incoming_pending.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
.set(self.swarm.peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
@@ -952,7 +946,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
);
if let Some(ref err) = error {
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
self.swarm.peers_mut().on_outgoing_pending_session_dropped(
&remote_addr,
&peer_id,
err,
@@ -972,9 +966,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}
self.closed_sessions_metrics.outgoing_pending.increment(1);
self.update_pending_connection_metrics();
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
@@ -985,16 +977,14 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
"Outgoing connection error"
);
self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
self.swarm.peers_mut().on_outgoing_connection_failure(
&remote_addr,
&peer_id,
&error,
);
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
self.update_pending_connection_metrics();
}
SwarmEvent::BadMessage { peer_id } => {
@@ -1052,12 +1042,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Updates the metrics for active,established connections
#[inline]
fn update_active_connection_metrics(&self) {
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
}
/// Updates the metrics for pending connections
@@ -1065,7 +1051,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
fn update_pending_connection_metrics(&self) {
self.metrics
.pending_outgoing_connections
.set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
.set(self.swarm.peers().num_pending_outbound_connections() as f64);
self.metrics
.total_pending_connections
.set(self.swarm.sessions().num_pending_connections() as f64);

View File

@@ -862,7 +862,7 @@ impl PeersManager {
}
}
if kind.filter(|kind| kind.is_trusted()).is_some() {
if kind.is_some_and(|kind| kind.is_trusted()) {
// also track the peer in the peer id set
self.trusted_peer_ids.insert(peer_id);
}

View File

@@ -1,7 +1,7 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::PeerMessage,
peers::InboundConnectionError,
peers::{InboundConnectionError, PeersManager},
protocol::IntoRlpxSubProtocol,
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
@@ -98,6 +98,16 @@ impl<N: NetworkPrimitives> Swarm<N> {
pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
&mut self.sessions
}
/// Access to the [`PeersManager`].
pub(crate) const fn peers(&self) -> &PeersManager {
self.state.peers()
}
/// Mutable access to the [`PeersManager`].
pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
self.state.peers_mut()
}
}
impl<N: NetworkPrimitives> Swarm<N> {
@@ -190,9 +200,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
return None
}
// ensure we can handle an incoming connection from this address
if let Err(err) =
self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
{
if let Err(err) = self.peers_mut().on_incoming_pending_session(remote_addr.ip()) {
match err {
InboundConnectionError::IpBanned => {
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
@@ -256,21 +264,21 @@ impl<N: NetworkPrimitives> Swarm<N> {
//
// When disabled (default), peers without a fork ID are admitted immediately.
// Peers that *do* carry a fork ID are always validated against ours.
let enforce = self.state().peers().enforce_enr_fork_id();
let enforce = self.peers().enforce_enr_fork_id();
let allow = match fork_id {
Some(f) => self.sessions.is_valid_fork_id(f),
None => !enforce,
};
if allow {
self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
self.peers_mut().add_peer(peer_id, addr, fork_id);
}
}
StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().add_peer(peer_id, addr, Some(fork_id));
self.peers_mut().add_peer(peer_id, addr, Some(fork_id));
} else {
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
self.state_mut().peers_mut().remove_peer(peer_id);
self.peers_mut().remove_peer(peer_id);
}
}
}
@@ -279,18 +287,18 @@ impl<N: NetworkPrimitives> Swarm<N> {
/// Set network connection state to `ShuttingDown`
pub(crate) const fn on_shutdown_requested(&mut self) {
self.state_mut().peers_mut().on_shutdown();
self.peers_mut().on_shutdown();
}
/// Checks if the node's network connection state is '`ShuttingDown`'
#[inline]
pub(crate) const fn is_shutting_down(&self) -> bool {
self.state().peers().connection_state().is_shutting_down()
self.peers().connection_state().is_shutting_down()
}
/// Set network connection state to `Hibernate` or `Active`
pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
self.state_mut().peers_mut().on_network_state_change(network_state);
self.peers_mut().on_network_state_change(network_state);
}
}

View File

@@ -78,6 +78,10 @@ pub struct BenchmarkArgs {
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
pub reth_new_payload: bool,
/// Fetch and replay RLP-encoded blocks. Implies `reth_new_payload`.
#[arg(long, default_value = "false", verbatim_doc_comment)]
pub rlp_blocks: bool,
}
#[cfg(test)]

View File

@@ -461,8 +461,6 @@ impl EngineArgs {
self.always_process_payload_attributes_on_canonical_head,
)
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
.with_storage_worker_count_opt(self.storage_worker_count)
.with_account_worker_count_opt(self.account_worker_count)
.without_cache_metrics(self.cache_metrics_disabled)
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)

View File

@@ -8,7 +8,7 @@ use jsonrpsee_types::error::{
};
use reth_engine_primitives::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::EngineObjectValidationError;
use reth_payload_primitives::{EngineObjectValidationError, VersionSpecificValidationError};
use thiserror::Error;
/// The Engine API result type
@@ -117,11 +117,14 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EngineObjectValidationError::Payload(_) |
EngineObjectValidationError::InvalidParams(_) |
// Per Engine API spec, structure validation errors for PayloadAttributes
// (e.g., missing withdrawals post-Shanghai, missing parentBeaconBlockRoot
// post-Cancun) should return -32602 "Invalid params", not -38003.
// (e.g., missing withdrawals post-Shanghai) should return -32602 "Invalid params".
// See: https://github.com/ethereum/execution-apis/blob/main/src/engine/shanghai.md
// Fixes: https://github.com/paradigmxyz/reth/issues/8732
EngineObjectValidationError::PayloadAttributes(_),
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::WithdrawalsNotSupportedInV1 |
VersionSpecificValidationError::NoWithdrawalsPostShanghai |
VersionSpecificValidationError::HasWithdrawalsPreShanghai,
),
) |
EngineApiError::UnexpectedRequestsHash => {
// Note: the data field is not required by the spec, but is also included by other
@@ -145,6 +148,16 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
Some(ErrorData::new(error)),
)
}
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3 |
VersionSpecificValidationError::NoParentBeaconBlockRootPostCancun,
),
) => jsonrpsee_types::error::ErrorObject::owned(
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
Some(ErrorData::new(error)),
),
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::UnsupportedFork,
) => jsonrpsee_types::error::ErrorObject::owned(
@@ -198,8 +211,6 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
mod tests {
use super::*;
use alloy_rpc_types_engine::ForkchoiceUpdateError;
use reth_payload_primitives::VersionSpecificValidationError;
#[track_caller]
fn ensure_engine_rpc_error(
code: i32,
@@ -265,5 +276,16 @@ mod tests {
),
),
);
// Beacon root shape mismatches on PayloadAttributes are reported as -38003.
ensure_engine_rpc_error(
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3,
),
),
);
}
}

View File

@@ -74,8 +74,12 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
block_id: BlockId,
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send {
async move {
// If no pending block from provider, build the pending block locally.
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// If no pending block from provider, build the pending block locally.
if let Some(pending) = self.local_pending_block().await? {
return Ok(Some(pending.block.body().transaction_count()));
}
@@ -180,6 +184,10 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
{
async move {
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// First, try to get the pending block from the provider, in case we already
// received the actual pending block from the CL.
if let Some((block, receipts)) = self
@@ -284,6 +292,10 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt {
> + Send {
async move {
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// Pending block can be fetched directly without need for caching
if let Some(pending_block) =
self.provider().pending_block().map_err(Self::Error::from_eth_err)?

View File

@@ -20,8 +20,8 @@ use alloy_rpc_types_eth::{
use futures::Future;
use reth_errors::{ProviderError, RethError};
use reth_evm::{
env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor,
InspectorFor, TransactionEnv, TxEnvFor,
block::BlockExecutor, env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm,
EvmEnvFor, HaltReasonFor, InspectorFor, TransactionEnv, TxEnvFor,
};
use reth_node_api::BlockBody;
use reth_primitives_traits::Recovered;
@@ -746,6 +746,14 @@ pub trait Call:
self.spawn_with_state_at_block(parent_block, move |this, mut db| {
let block_txs = block.transactions_recovered();
// apply pre-execution changes (e.g. EIP-4788 beacon root, EIP-2935 blockhashes)
RpcNodeCore::evm_config(&this)
.executor_for_block(&mut db, block.sealed_block())
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?
.apply_pre_execution_changes()
.map_err(Self::Error::from_eth_err)?;
// replay all transactions prior to the targeted transaction
this.replay_transactions_until(&mut db, evm_env.clone(), block_txs, *tx.tx_hash())?;

View File

@@ -437,6 +437,8 @@ where
if replay_block_txs {
// only need to replay the transactions in the block if not all transactions are
// to be replayed
eth_api.apply_pre_execution_changes(&block, &mut db)?;
let transactions = block.transactions_recovered().take(num_txs);
// Execute all transactions until index

View File

@@ -155,6 +155,10 @@ where
return Ok(None)
};
if start_block == 0 {
return Ok(Some(ExecutionOutcome::default()))
}
let state_provider = self.provider().history_by_block_number(start_block - 1)?;
let db = reth_revm::database::StateProviderDatabase::new(&state_provider);

View File

@@ -476,26 +476,28 @@ where
&self,
block_id: BlockId,
) -> Result<Option<Vec<LocalizedTransactionTrace>>, Eth::Error> {
let traces = self.eth_api().trace_block_with(
block_id,
None,
TracingInspectorConfig::default_parity(),
|tx_info, mut ctx| {
let traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
);
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let block = self.eth_api().recovered_block(block_id);
let (maybe_traces, maybe_block) = futures::try_join!(traces, block)?;
let mut traces = self
.eth_api()
.trace_block_with(
block_id,
Some(block.clone()),
TracingInspectorConfig::default_parity(),
|tx_info, mut ctx| {
let traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await?
.map(|traces| traces.into_iter().flatten().collect::<Vec<_>>());
let mut maybe_traces =
maybe_traces.map(|traces| traces.into_iter().flatten().collect::<Vec<_>>());
if let (Some(block), Some(traces)) = (maybe_block, maybe_traces.as_mut()) &&
if let Some(traces) = traces.as_mut() &&
let Some(base_block_reward) = self.calculate_base_block_reward(block.header())?
{
traces.extend(self.extract_reward_traces(
@@ -505,7 +507,7 @@ where
));
}
Ok(maybe_traces)
Ok(traces)
}
/// Replays all transactions in a block
@@ -550,11 +552,15 @@ where
&self,
block_id: BlockId,
) -> Result<Option<BlockOpcodeGas>, Eth::Error> {
let res = self
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let Some(transactions) = self
.eth_api()
.trace_block_inspector(
block_id,
None,
Some(block.clone()),
OpcodeGasInspector::default,
move |tx_info, ctx| {
let trace = TransactionOpcodeGas {
@@ -564,11 +570,10 @@ where
Ok(trace)
},
)
.await?;
let Some(transactions) = res else { return Ok(None) };
let Some(block) = self.eth_api().recovered_block(block_id).await? else { return Ok(None) };
.await?
else {
return Ok(None);
};
Ok(Some(BlockOpcodeGas {
block_hash: block.hash(),
@@ -583,11 +588,15 @@ where
&self,
block_id: BlockId,
) -> Result<Option<BlockStorageAccess>, Eth::Error> {
let res = self
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let Some(transactions) = self
.eth_api()
.trace_block_inspector(
block_id,
None,
Some(block.clone()),
StorageInspector::default,
move |tx_info, ctx| {
let trace = TransactionStorageAccess {
@@ -599,11 +608,10 @@ where
Ok(trace)
},
)
.await?;
let Some(transactions) = res else { return Ok(None) };
let Some(block) = self.eth_api().recovered_block(block_id).await? else { return Ok(None) };
.await?
else {
return Ok(None);
};
Ok(Some(BlockStorageAccess {
block_hash: block.hash(),

View File

@@ -552,8 +552,7 @@ mod tests {
if storage_cursor
.seek_by_key_subkey(bn_address.address(), entry.key)?
.filter(|e| e.key == entry.key)
.is_some()
.is_some_and(|e| e.key == entry.key)
{
storage_cursor.delete_current()?;
}

View File

@@ -742,7 +742,7 @@ mod tests {
accounts.insert(key, (account, storage));
}
Ok(state_root_prehashed(accounts.into_iter()))
Ok(state_root_prehashed(accounts))
})?;
let static_file_provider = self.db.factory.static_file_provider();

View File

@@ -64,10 +64,8 @@ where
let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
for (key, indices) in cache.drain() {
let last = *indices.last().expect("qed");
collector.insert(
sharded_key_factory(key, last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
)?;
collector
.insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
}
Ok::<(), StageError>(())
};
@@ -129,10 +127,8 @@ where
let mut insert_fn = |address: Address, indices: Vec<u64>| {
let last = indices.last().expect("indices is non-empty");
collector.insert(
ShardedKey::new(address, *last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
)?;
collector
.insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
Ok(())
};
@@ -190,7 +186,7 @@ where
let last = indices.last().expect("qed");
collector.insert(
StorageShardedKey::new(key.0 .0, key.0 .1, *last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
BlockNumberList::new_pre_sorted(indices),
)?;
Ok::<(), StageError>(())
};

View File

@@ -424,8 +424,7 @@ impl TestStageDB {
let mut cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
if cursor
.seek_by_key_subkey(address, entry.key)?
.filter(|e| e.key == entry.key)
.is_some()
.is_some_and(|e| e.key == entry.key)
{
cursor.delete_current()?;
}
@@ -434,8 +433,7 @@ impl TestStageDB {
let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
if cursor
.seek_by_key_subkey(hashed_address, hashed_entry.key)?
.filter(|e| e.key == hashed_entry.key)
.is_some()
.is_some_and(|e| e.key == hashed_entry.key)
{
cursor.delete_current()?;
}

View File

@@ -161,7 +161,7 @@ impl Environment {
mdbx_result(ffi::mdbx_env_stat_ex(
self.env_ptr(),
ptr::null(),
stat.mdb_stat(),
stat.mdbx_stat(),
size_of::<Stat>(),
))?;
Ok(stat)
@@ -305,13 +305,13 @@ unsafe impl Sync for EnvPtr {}
pub struct Stat(ffi::MDBX_stat);
impl Stat {
/// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
/// Create a new Stat with zero'd inner struct `ffi::MDBX_stat`.
pub(crate) const fn new() -> Self {
unsafe { Self(mem::zeroed()) }
}
/// Returns a mut pointer to `ffi::MDB_stat`.
pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
/// Returns a mut pointer to `ffi::MDBX_stat`.
pub(crate) const fn mdbx_stat(&mut self) -> *mut ffi::MDBX_stat {
&mut self.0
}
}

View File

@@ -177,7 +177,7 @@ where
self.env().txn_manager().remove_active_read_transaction(txn);
let mut latency = CommitLatency::new();
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) })
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdbx_commit_latency()) })
.map(|v| (v, latency))
} else {
let (sender, rx) = sync_channel(0);
@@ -246,7 +246,7 @@ where
unsafe {
let mut stat = Stat::new();
self.txn_execute(|txn| {
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdbx_stat(), size_of::<Stat>()))
})??;
Ok(stat)
}
@@ -647,7 +647,7 @@ impl CommitLatency {
}
/// Returns a mut pointer to `ffi::MDBX_commit_latency`.
pub(crate) const fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
pub(crate) const fn mdbx_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
&mut self.0
}
}

View File

@@ -100,7 +100,7 @@ impl TxnManager {
.send({
let mut latency = CommitLatency::new();
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
ffi::mdbx_txn_commit_ex(tx.0, latency.mdbx_commit_latency())
})
.map(|v| (v, latency))
})

View File

@@ -617,10 +617,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
let mut tx_num = tx_nums[i];
for transaction in recovered_block.body().transactions_iter() {
for (tx_num, transaction) in
(tx_nums[i]..).zip(recovered_block.body().transactions_iter())
{
all_tx_hashes.push((*transaction.tx_hash(), tx_num));
tx_num += 1;
}
}
@@ -2756,8 +2756,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: hashed_storage_key, value: *old_storage_value };
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, hashed_storage_key)?
.filter(|s| s.key == hashed_storage_key)
.is_some()
.is_some_and(|s| s.key == hashed_storage_key)
{
hashed_storage_cursor.delete_current()?
}
@@ -2798,8 +2797,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
.is_some_and(|s| s.key == *storage_key)
{
plain_storage_cursor.delete_current()?
}
@@ -2921,8 +2919,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: hashed_storage_key, value: *old_storage_value };
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, hashed_storage_key)?
.filter(|s| s.key == hashed_storage_key)
.is_some()
.is_some_and(|s| s.key == hashed_storage_key)
{
hashed_storage_cursor.delete_current()?
}
@@ -2965,8 +2962,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
.is_some_and(|s| s.key == *storage_key)
{
plain_storage_cursor.delete_current()?
}
@@ -3200,8 +3196,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
.is_some_and(|entry| entry.key == key)
{
hashed_storage.delete_current()?;
}
@@ -3248,8 +3243,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
.is_some_and(|entry| entry.key == key)
{
hashed_storage_cursor.delete_current()?;
}

View File

@@ -87,7 +87,7 @@ impl RocksDBProvider {
/// Heals the `TransactionHashNumbers` table.
///
/// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything
/// - Fast path: if checkpoint == 0, clear any stale data and return
/// - If `sf_tip` < checkpoint, return unwind target (static files behind)
/// - If `sf_tip` == checkpoint, nothing to do
/// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
@@ -112,11 +112,11 @@ impl RocksDBProvider {
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"TransactionHashNumbers has data but checkpoint is 0, clearing all"
"TransactionHashNumbers: checkpoint is 0, clearing stale data"
);
self.clear::<tables::TransactionHashNumbers>()?;
return Ok(None);
@@ -264,11 +264,11 @@ impl RocksDBProvider {
.map(|cp| cp.block_number)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"StoragesHistory has data but checkpoint is 0, clearing all"
"StoragesHistory: checkpoint is 0, clearing stale data"
);
self.clear::<tables::StoragesHistory>()?;
return Ok(None);
@@ -358,11 +358,11 @@ impl RocksDBProvider {
.map(|cp| cp.block_number)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"AccountsHistory has data but checkpoint is 0, clearing all"
"AccountsHistory: checkpoint is 0, clearing stale data"
);
self.clear::<tables::AccountsHistory>()?;
return Ok(None);
@@ -506,6 +506,35 @@ mod tests {
assert_eq!(result, None);
}
/// Tests that `checkpoint=0` with empty `RocksDB` returns early without attempting
/// an expensive healing loop. Previously, when `sf_tip` > `checkpoint=0`, the healer
/// would iterate billions of transactions from static files for no effect, causing
/// the node to hang on startup with MDBX read transaction timeouts.
#[test]
fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());
// No checkpoints set — all default to 0 via unwrap_or(0).
// RocksDB tables are empty.
let provider = factory.database_provider_ro().unwrap();
let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());
let result = rocksdb.heal_storages_history(&provider).unwrap();
assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_none());
let result = rocksdb.heal_accounts_history(&provider).unwrap();
assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_none());
}
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -1285,10 +1285,8 @@ impl RocksDBProvider {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());

View File

@@ -1166,7 +1166,7 @@ where
Node: NodeTypes,
{
fn state_root(&self, hashed_state: HashedPostState) -> Result<B256, ProviderError> {
self.state_root_from_nodes(TrieInput::from_state(hashed_state))
self.state_root_with_updates(hashed_state).map(|(root, _)| root)
}
fn state_root_from_nodes(&self, _input: TrieInput) -> Result<B256, ProviderError> {

View File

@@ -802,7 +802,7 @@ impl RuntimeBuilder {
let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
let proof_storage_worker_threads =
config.rayon.proof_storage_worker_threads.unwrap_or(default_threads);
config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
let proof_storage_worker_pool = WorkerPool::from_builder(
rayon::ThreadPoolBuilder::new()
.num_threads(proof_storage_worker_threads)
@@ -810,7 +810,7 @@ impl RuntimeBuilder {
)?;
let proof_account_worker_threads =
config.rayon.proof_account_worker_threads.unwrap_or(default_threads);
config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
let proof_account_worker_pool = WorkerPool::from_builder(
rayon::ThreadPoolBuilder::new()
.num_threads(proof_account_worker_threads)

View File

@@ -21,7 +21,7 @@ fn generate_transactions(num_senders: usize, txs_per_sender: usize) -> Vec<MockT
for sender_idx in 0..num_senders {
// Create a unique sender address
let sender_bytes = sender_idx.to_be_bytes();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
// Generate transactions for this sender

View File

@@ -17,7 +17,7 @@ fn generate_transactions(num_senders: usize, txs_per_sender: usize) -> Vec<MockT
for sender_idx in 0..num_senders {
// Create a unique sender address
let sender_bytes = sender_idx.to_be_bytes();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
// Generate transactions for this sender

View File

@@ -77,7 +77,7 @@ fn generate_many_transactions(
let idx_slice = idx.to_be_bytes();
// pad with 12 bytes of zeros before rest
let addr_slice = [0u8; 12].into_iter().chain(idx_slice.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(idx_slice).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
txs.extend(create_transactions_for_sender(&mut runner, sender, depth, only_eip4844));

View File

@@ -213,7 +213,7 @@ pub async fn maintain_transaction_pool<N, Client, P, St>(
dirty_addresses.remove(acc);
}
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let res = load_accounts(c, at, accs_to_reload);
let _ = tx.send(res);
}
.boxed()
@@ -221,7 +221,7 @@ pub async fn maintain_transaction_pool<N, Client, P, St>(
// can fetch all dirty accounts at once
let accs_to_reload = std::mem::take(&mut dirty_addresses);
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let res = load_accounts(c, at, accs_to_reload);
let _ = tx.send(res);
}
.boxed()

View File

@@ -142,9 +142,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
base_fee_per_blob_gas: u64,
) -> BestTransactionsWithFees<T> {
let mut best = self.best();
let mut submission_id = self.submission_id;
for tx in unlocked {
submission_id += 1;
for (submission_id, tx) in (self.submission_id + 1..).zip(unlocked) {
debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
let priority = self.ordering.priority(&tx.transaction, base_fee);
let tx_id = *tx.id();

View File

@@ -405,45 +405,37 @@ where
) -> Result<Tx, TransactionValidationOutcome<Tx>> {
// Checks for tx_type
match transaction.ty() {
LEGACY_TX_TYPE_ID => {
// Accept legacy transactions
// Accept only legacy transactions until EIP-2718/2930 activates
EIP2930_TX_TYPE_ID if !self.eip2718 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip2930Disabled.into(),
))
}
EIP2930_TX_TYPE_ID => {
// Accept only legacy transactions until EIP-2718/2930 activates
if !self.eip2718 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip2930Disabled.into(),
))
}
// Reject dynamic fee transactions until EIP-1559 activates.
EIP1559_TX_TYPE_ID if !self.eip1559 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip1559Disabled.into(),
))
}
EIP1559_TX_TYPE_ID => {
// Reject dynamic fee transactions until EIP-1559 activates.
if !self.eip1559 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip1559Disabled.into(),
))
}
// Reject blob transactions.
EIP4844_TX_TYPE_ID if !self.eip4844 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip4844Disabled.into(),
))
}
EIP4844_TX_TYPE_ID => {
// Reject blob transactions.
if !self.eip4844 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip4844Disabled.into(),
))
}
}
EIP7702_TX_TYPE_ID => {
// Reject EIP-7702 transactions.
if !self.eip7702 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip7702Disabled.into(),
))
}
// Reject EIP-7702 transactions.
EIP7702_TX_TYPE_ID if !self.eip7702 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip7702Disabled.into(),
))
}
// Accept known transaction types when their respective fork is active
LEGACY_TX_TYPE_ID | EIP2930_TX_TYPE_ID | EIP1559_TX_TYPE_ID | EIP4844_TX_TYPE_ID |
EIP7702_TX_TYPE_ID => {}
ty if !self.other_tx_types.bit(ty as usize) => {
return Err(TransactionValidationOutcome::Invalid(

View File

@@ -295,8 +295,8 @@ where
if self
.cursor
.seek_by_key_subkey(self.hashed_address, nibbles.clone())?
.filter(|e| *e.nibbles() == nibbles)
.is_some()
.as_ref()
.is_some_and(|e| *e.nibbles() == nibbles)
{
self.cursor.delete_current()?;
}

View File

@@ -404,8 +404,7 @@ fn account_and_storage_trie() {
if hashed_storage_cursor
.seek_by_key_subkey(key3, hashed_slot)
.unwrap()
.filter(|e| e.key == hashed_slot)
.is_some()
.is_some_and(|e| e.key == hashed_slot)
{
hashed_storage_cursor.delete_current().unwrap();
}

View File

@@ -118,7 +118,7 @@ fn includes_nodes_for_destroyed_storage_nodes() {
for node in multiproof.account_subtree.values() {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
for node in multiproof.storages.iter().flat_map(|(_, storage)| storage.subtree.values()) {
for node in multiproof.storages.values().flat_map(|storage| storage.subtree.values()) {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
});
@@ -172,7 +172,7 @@ fn correctly_decodes_branch_node_values() {
for node in multiproof.account_subtree.values() {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
for node in multiproof.storages.iter().flat_map(|(_, storage)| storage.subtree.values()) {
for node in multiproof.storages.values().flat_map(|storage| storage.subtree.values()) {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
});

View File

@@ -3820,7 +3820,7 @@ mod tests {
let mut stack = Vec::new();
let mut state_mask = TrieMask::default();
for (&idx, hash) in children_indices.iter().zip(child_hashes.into_iter()) {
for (&idx, hash) in children_indices.iter().zip(child_hashes) {
state_mask.set_bit(idx);
stack.push(hash);
}

View File

@@ -35,6 +35,9 @@
- [`reth db prune-checkpoints`](./reth/db/prune-checkpoints.mdx)
- [`reth db prune-checkpoints get`](./reth/db/prune-checkpoints/get.mdx)
- [`reth db prune-checkpoints set`](./reth/db/prune-checkpoints/set.mdx)
- [`reth db stage-checkpoints`](./reth/db/stage-checkpoints.mdx)
- [`reth db stage-checkpoints get`](./reth/db/stage-checkpoints/get.mdx)
- [`reth db stage-checkpoints set`](./reth/db/stage-checkpoints/set.mdx)
- [`reth db account-storage`](./reth/db/account-storage.mdx)
- [`reth db state`](./reth/db/state.mdx)
- [`reth download`](./reth/download.mdx)

View File

@@ -46,7 +46,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -32,7 +32,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -23,6 +23,7 @@ Commands:
path Returns the full database path
settings Manage storage settings
prune-checkpoints View or set prune checkpoints
stage-checkpoints `reth db stage-checkpoints` subcommand
account-storage Gets storage size information for an account
state Gets account state and storage at a specific block
help Print this message or the help of the given subcommand(s)
@@ -154,7 +155,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -42,7 +42,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -49,7 +49,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -48,7 +48,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -57,7 +57,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -46,7 +46,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -49,7 +49,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -92,7 +92,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -39,7 +39,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -55,7 +55,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -55,7 +55,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -82,7 +82,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -36,7 +36,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -58,7 +58,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -44,7 +44,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -36,7 +36,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

Some files were not shown because too many files have changed in this diff Show More