Compare commits

...

29 Commits

Author SHA1 Message Date
Emma Jamieson-Hoare
f04db57d3a chore(ci): continue on error when codspeed is flakey/cancelled on main merge 2026-02-27 09:49:17 +00:00
Delweng
09adb83922 fix(engine/tree): continue sync-target progression for already-seen downloaded blocks (#22628)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 08:12:06 +00:00
Delweng
c12b6d4c90 fix(rpc): return -38003 for FCU beacon-root payloadAttributes mismatches (#22634)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 07:54:20 +00:00
Derek Cofausper
7a78044587 chore(libmdbx): fix MDB_ -> MDBX_ typos (#22630)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 06:06:07 +00:00
figtracer
f88538e033 refactor(net): add peers() accessors on Swarm to flatten accessor chains (#22616)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 05:35:14 +00:00
DaniPopes
63dff64b8a chore: simplify tx iterator (#22365) 2026-02-27 05:09:13 +00:00
DaniPopes
233590cefd chore: use better hasher for precompile cache (#22360) 2026-02-27 05:09:12 +00:00
Derek Cofausper
40962ef6fc chore(hive): remove engine-withdrawals from ignored tests (#22625)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-27 03:57:43 +00:00
github-actions[bot]
2f121b099b chore(deps): weekly cargo update (#22624)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-27 03:36:42 +00:00
Delweng
0470050c05 fix(engine): continue downloading head block after making non-head sync target canonical (#22613)
Signed-off-by: Delweng <delweng@gmail.com>
2026-02-27 03:15:52 +00:00
MagicJoshh
cbc416b82a fix(rpc-provider): state_root delegates to stub that always returns zero (#22610) 2026-02-27 02:53:57 +00:00
MagicJoshh
3fddefbd38 fix(rpc): prevent u64 underflow when re-executing genesis block (#22532) 2026-02-27 02:48:59 +00:00
Julian Meyer
f97a6530c1 chore: make cached overlay fetch public (#22619) 2026-02-27 02:47:50 +00:00
Derek Cofausper
80e3e1c79d docs: add storage v2 guide (#22620)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-26 20:22:52 +00:00
Arsenii Kulikov
ee37c25a4b perf: use more multiproof workers (#22615) 2026-02-26 19:59:06 +00:00
Derek Cofausper
c01f9688e2 feat: add transaction iterator helpers to Chain (#22618)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 19:39:34 +00:00
bigbear
815a75833e refactor(exex): remove redundant update_capacity call (#22603) 2026-02-26 13:09:41 +00:00
cui
59c4e24296 fix(downloaders): reset metrics on clear (#21858) 2026-02-26 12:38:55 +00:00
Derek Cofausper
d5b5caa439 docs: add PR title and description guidelines to CLAUDE.md (#22602)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 12:20:54 +00:00
Julio
47f1999654 fix(net): abort discv4 and DNS discovery tasks on Discovery drop (#22590)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-26 10:37:57 +00:00
MergeBot
3ac5637bd1 chore(ci): fix collapsible_match clippy lint in chainspec (#22594)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-26 10:04:19 +00:00
Derek Cofausper
4cec99ed13 chore(bench): include core count in Slack notification when non-default (#22584)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 21:58:39 +00:00
Arsenii Kulikov
2f73835483 feat(reth-bench): support benchmarking via rlp blocks (#22581) 2026-02-25 20:28:47 +00:00
stevencartavia
ed20a40649 refactor(rpc): fetch block before tracing to avoid double lookups (#22503) 2026-02-25 20:17:45 +00:00
MergeBot
080a9cfc10 fix(rpc): add missing apply_pre_execution_changes in spawn_replay_transaction (#22575) 2026-02-25 20:04:02 +00:00
MergeBot
c4cd5c9b7b fix(rpc): add missing apply_pre_execution_changes in debug_traceCallMany (#22577) 2026-02-25 20:00:12 +00:00
Dan Cline
ce2a194fb7 feat(cli): add db stage-checkpoints command (#22579)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 19:58:59 +00:00
Vitalyr
6dcab51c97 fix(rpc): respect pending-block=none for provider blocks (#22556) 2026-02-25 19:45:42 +00:00
Derek Cofausper
4db23809cc fix(storage): return early in RocksDB healing when checkpoint is 0 (#22576)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 19:29:45 +00:00
133 changed files with 1902 additions and 739 deletions

View File

@@ -118,9 +118,12 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
const countsLine = warmup
? `*Warmup:* ${warmup} | *Blocks:* ${summary.blocks}`
: `*Blocks:* ${summary.blocks}`;
const cores = process.env.BENCH_CORES || '0';
const countsParts = [];
if (warmup) countsParts.push(`*Warmup:* ${warmup}`);
countsParts.push(`*Blocks:* ${summary.blocks}`);
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
const countsLine = countsParts.join(' | ');
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');

View File

@@ -11,17 +11,6 @@
#
# When a test should no longer be ignored, remove it from this list.
# flaky
engine-withdrawals:
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
# P2P sync timing issue in hive Docker environment: secondary client returns SYNCING but
# peer discovery/connection doesn't complete within the timeout when running with
# --sim.parallelism 16. Not a correctness bug, purely a CI timing issue.
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
engine-cancun:
- Transaction Re-Org, New Payload on Revert Back (Cancun) (reth)
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)

View File

@@ -62,6 +62,7 @@ permissions:
jobs:
codspeed:
if: github.event_name == 'push'
continue-on-error: true
runs-on: depot-ubuntu-latest
concurrency:
group: bench-codspeed-${{ github.head_ref || github.run_id }}

View File

@@ -191,6 +191,78 @@ The `book` CI job (`.github/workflows/lint.yml`) enforces this by regenerating t
### Opening PRs against <https://github.com/paradigmxyz/reth>
#### Titles
Use [Conventional Commits](https://www.conventionalcommits.org/) with an optional scope:
```
<type>(<scope>): <short description>
```
**Types**: `feat`, `fix`, `perf`, `refactor`, `docs`, `test`, `chore`
**Scope** (optional): crate or area, e.g. `evm`, `trie`, `rpc`, `engine`, `net`
Examples:
- `fix(rpc): correct gas estimation for ERC-20 transfers`
- `perf: batch trie updates to reduce cursor overhead`
- `feat(engine): add new_payload_interval metric`
#### Descriptions
Keep it short. Say what changed and why — nothing more.
**Do:**
- Write 13 sentences summarizing the change
- Explain _why_ if the diff doesn't make it obvious
- Link related issues or EIPs
- Include benchmark numbers for perf changes
**Don't:**
- List every file changed — that's what the diff is for
- Repeat the title in the body
- Add "Files changed" or "Changes" sections
- Write walls of text that go stale when the diff is updated
- Use filler like "This PR introduces...", "comprehensive", "robust", "enhance", "leverage"
**Template:**
```
Closes #<issue>
<what changed, 1-3 sentences>
<why, if not obvious from the diff>
```
**Good example:**
```
Closes #16800
Adds fallback for external IP resolution so node startup doesn't fail
when STUN is unreachable. Falls back to the configured default.
```
**Bad example:**
```
## Summary
This PR introduces comprehensive improvements to the IP resolution system.
## Changes
- Modified `crates/net/discv4/src/lib.rs` to add fallback
- Modified `crates/net/discv4/src/config.rs` to add default IP
- Added tests in `crates/net/discv4/src/tests/ip.rs`
## Files Changed
- crates/net/discv4/src/lib.rs
- crates/net/discv4/src/config.rs
- crates/net/discv4/src/tests/ip.rs
```
#### Labels and CI
Label PRs appropriately, first check the available labels and then apply the relevant ones:
* when changes are RPC related, add A-rpc label
* when changes are docs related, add C-docs label

354
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -31,6 +31,8 @@ pub(crate) struct BenchContext {
pub(crate) is_optimism: bool,
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
pub(crate) use_reth_namespace: bool,
/// Whether to fetch and replay RLP-encoded blocks.
pub(crate) rlp_blocks: bool,
}
impl BenchContext {
@@ -142,7 +144,8 @@ impl BenchContext {
};
let next_block = first_block.header.number + 1;
let use_reth_namespace = bench_args.reth_new_payload;
let rlp_blocks = bench_args.rlp_blocks;
let use_reth_namespace = bench_args.reth_new_payload || rlp_blocks;
Ok(Self {
auth_provider,
block_provider,
@@ -150,6 +153,7 @@ impl BenchContext {
next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
})
}
}

View File

@@ -21,6 +21,7 @@ use reth_chainspec::ChainSpec;
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use reth_rpc_api::RethNewPayloadInput;
use std::{path::PathBuf, time::Instant};
use tracing::info;
@@ -184,34 +185,32 @@ impl Command {
Some(new_payload_version),
)?;
let (version, params) = if self.reth_new_payload {
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?)
} else {
(Some(version), params)
};
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file = GasRampPayloadFile {
version: version as u8,
version: version.map(|v| v as u8),
block_hash,
params: params.clone(),
execution_data: Some(execution_data.clone()),
};
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
let reth_data = self.reth_new_payload.then_some(execution_data);
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
let _ = call_new_payload_with_reth(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated_with_reth(
&provider,
version,
forkchoice_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&provider, version, forkchoice_state).await?;
parent_header = block.header;
parent_hash = block_hash;

View File

@@ -25,7 +25,7 @@ use crate::{
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
},
};
use alloy_provider::Provider;
use alloy_provider::{ext::DebugApi, Provider};
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
@@ -154,6 +154,7 @@ impl Command {
mut next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
@@ -186,6 +187,21 @@ impl Command {
}
};
let rlp = if rlp_blocks {
let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
Ok(rlp) => rlp,
Err(e) => {
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
let _ = error_sender
.send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
break;
}
};
Some(rlp)
} else {
None
};
let head_block_hash = block.header.hash;
let safe_block_hash = block_provider
.get_block_by_number(block.header.number.saturating_sub(32).into());
@@ -207,7 +223,7 @@ impl Command {
next_block += 1;
if let Err(e) = sender
.send((block, head_block_hash, safe_block_hash, finalized_block_hash))
.send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
.await
{
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
@@ -221,7 +237,7 @@ impl Command {
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
while let Some((block, head, safe, finalized)) = {
while let Some((block, head, safe, finalized, rlp)) = {
let wait_start = Instant::now();
let result = receiver.recv().await;
total_wait_time += wait_start.elapsed();
@@ -240,11 +256,11 @@ impl Command {
finalized_block_hash: finalized,
};
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) =
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload_with_reth(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
@@ -263,13 +279,7 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
version,
forkchoice_state,
use_reth_namespace,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = if server_timings.is_some() {

View File

@@ -11,7 +11,7 @@ use crate::{
},
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
};
use alloy_provider::Provider;
use alloy_provider::{ext::DebugApi, Provider};
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
@@ -51,6 +51,7 @@ impl Command {
mut next_block,
is_optimism,
use_reth_namespace,
rlp_blocks,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
@@ -83,8 +84,21 @@ impl Command {
}
};
let rlp = if rlp_blocks {
let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
else {
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
let _ = error_sender
.send(eyre::eyre!("Failed to fetch raw block {next_block}"));
break;
};
Some(rlp)
} else {
None
};
next_block += 1;
if let Err(e) = sender.send(block).await {
if let Err(e) = sender.send((block, rlp)).await {
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
break;
}
@@ -96,7 +110,7 @@ impl Command {
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
while let Some(block) = {
while let Some((block, rlp)) = {
let wait_start = Instant::now();
let result = receiver.recv().await;
total_wait_time += wait_start.elapsed();
@@ -108,12 +122,12 @@ impl Command {
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) =
block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload_with_reth(&auth_provider, version, params).await?;
let latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());

View File

@@ -22,14 +22,14 @@ pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GasRampPayloadFile {
/// Engine API version (1-5).
pub(crate) version: u8,
///
/// `None` indicates that `reth_newPayload` should be used.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) version: Option<u8>,
/// The block hash for FCU.
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
/// The execution data for `reth_newPayload`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas

View File

@@ -38,6 +38,7 @@ use eyre::Context;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
use reth_rpc_api::RethNewPayloadInput;
use std::{
path::PathBuf,
time::{Duration, Instant},
@@ -161,7 +162,9 @@ struct GasRampPayload {
/// Block number from filename.
block_number: u64,
/// Engine API version for newPayload.
version: EngineApiMessageVersion,
///
/// `None` indicates that `reth_newPayload` should be used.
version: Option<EngineApiMessageVersion>,
/// The file contents.
file: GasRampPayloadFile,
}
@@ -273,13 +276,10 @@ impl Command {
"Executing gas ramp payload (newPayload + FCU)"
);
let reth_data =
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
let _ = call_new_payload_with_reth(
&auth_provider,
payload.version,
payload.file.params.clone(),
reth_data,
)
.await?;
@@ -288,13 +288,7 @@ impl Command {
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated_with_reth(
&auth_provider,
payload.version,
fcu_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, payload.version, fcu_state).await?;
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
@@ -342,31 +336,34 @@ impl Command {
"Sending newPayload"
);
let params = serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?;
let (version, params) = if self.reth_new_payload {
let reth_data = ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields {
requests: envelope.execution_requests.clone().into(),
},
),
};
(None, serde_json::to_value((RethNewPayloadInput::ExecutionData(reth_data),))?)
} else {
(
Some(EngineApiMessageVersion::V4),
serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?,
)
};
let reth_data = self.reth_new_payload.then(|| ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
),
});
let server_timings = call_new_payload_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
params,
reth_data,
)
.await?;
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
@@ -391,13 +388,7 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
fcu_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency =
@@ -558,13 +549,18 @@ impl Command {
let file: GasRampPayloadFile = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let version = match file.version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
let version = if let Some(version) = file.version {
match version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
}
.into()
} else {
None
};
info!(

View File

@@ -3,7 +3,7 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
@@ -12,6 +12,7 @@ use alloy_rpc_types_engine::{
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use reth_rpc_api::RethNewPayloadInput;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error};
@@ -169,7 +170,15 @@ where
pub(crate) fn block_to_new_payload(
block: AnyRpcBlock,
is_optimism: bool,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
rlp: Option<Bytes>,
reth_new_payload: bool,
) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
if let Some(rlp) = rlp {
return Ok((
None,
serde_json::to_value((RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),))?,
));
}
let block = block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
@@ -181,7 +190,14 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
let (version, params, execution_data) =
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
if reth_new_payload {
Ok((None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?))
} else {
Ok((Some(version), params))
}
}
/// Converts an execution payload and sidecar into versioned engine API params and an
@@ -266,17 +282,15 @@ pub(crate) fn payload_to_new_payload(
#[allow(dead_code)]
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
version: Option<EngineApiMessageVersion>,
params: serde_json::Value,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params, None).await
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params).await
}
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
#[serde(flatten)]
status: PayloadStatus,
latency_us: u64,
#[serde(default)]
persistence_wait_us: Option<u64>,
@@ -300,72 +314,50 @@ pub(crate) struct NewPayloadTimingBreakdown {
}
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
/// `reth_execution_data` is provided.
/// `version` is provided.
///
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
///
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
/// the standard engine namespace.
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
version: Option<EngineApiMessageVersion>,
params: serde_json::Value,
reth_execution_data: Option<ExecutionData>,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
if let Some(execution_data) = reth_execution_data {
let method = "reth_newPayload";
let reth_params = serde_json::to_value((execution_data.clone(),))
.expect("ExecutionData serialization cannot fail");
) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
debug!(target: "reth-bench", method, "Sending newPayload");
debug!(target: "reth-bench", method, "Sending newPayload");
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
let resp = loop {
let resp: serde_json::Value = provider.client().request(method, &params).await?;
let status = PayloadStatus::deserialize(&resp)?;
while !resp.status.is_valid() {
if resp.status.is_invalid() {
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
)))
}
if resp.status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
if status.is_valid() {
break resp;
}
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
} else {
let method = version.method_name();
debug!(target: "reth-bench", method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {status:?}")),
)))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
status = provider.client().request(method, &params).await?;
if status.is_invalid() {
return Err(eyre::eyre!("Invalid {method}: {status:?}"));
}
Ok(None)
if status.is_syncing() {
return Err(eyre::eyre!(
"invalid range: no canonical state found for parent of requested block"
));
}
};
if version.is_some() {
return Ok(None);
}
let resp: RethPayloadStatus = serde_json::from_value(resp)?;
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
@@ -403,20 +395,25 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
P: Provider<N> + EngineApiValidWaitExt<N>,
>(
provider: P,
message_version: EngineApiMessageVersion,
message_version: Option<EngineApiMessageVersion>,
forkchoice_state: ForkchoiceState,
use_reth: bool,
) -> TransportResult<ForkchoiceUpdated> {
if use_reth {
if let Some(message_version) = message_version {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
} else {
let method = "reth_forkchoiceUpdated";
let reth_params = serde_json::to_value((forkchoice_state,))
.expect("ForkchoiceState serialization cannot fail");
debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
let mut resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
loop {
let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
if resp.is_valid() {
break Ok(resp)
}
while !resp.is_valid() {
if resp.is_invalid() {
error!(target: "reth-bench", ?resp, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
@@ -428,11 +425,6 @@ pub(crate) async fn call_forkchoice_updated_with_reth<
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
}
Ok(resp)
} else {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
}
}

View File

@@ -855,15 +855,9 @@ impl From<Genesis> for ChainSpec {
// those networks we use the activation
// blocks of those networks
match genesis.config.chain_id {
1 => {
if ttd == MAINNET_PARIS_TTD {
return Some(MAINNET_PARIS_BLOCK)
}
}
11155111 => {
if ttd == SEPOLIA_PARIS_TTD {
return Some(SEPOLIA_PARIS_BLOCK)
}
1 if ttd == MAINNET_PARIS_TTD => return Some(MAINNET_PARIS_BLOCK),
11155111 if ttd == SEPOLIA_PARIS_TTD => {
return Some(SEPOLIA_PARIS_BLOCK)
}
_ => {}
};

View File

@@ -19,6 +19,7 @@ mod list;
mod prune_checkpoints;
mod repair_trie;
mod settings;
mod stage_checkpoints;
mod state;
mod static_file_header;
mod stats;
@@ -70,6 +71,8 @@ pub enum Subcommands {
Settings(settings::Command),
/// View or set prune checkpoints
PruneCheckpoints(prune_checkpoints::Command),
// View or set stage checkpoints
StageCheckpoints(stage_checkpoints::Command),
/// Gets storage size information for an account
AccountStorage(account_storage::Command),
/// Gets account state and storage at a specific block
@@ -213,6 +216,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::StageCheckpoints(command) => {
db_exec!(self.env, tool, N, command.access_rights(), {
command.execute(&tool)?;
});
}
Subcommands::AccountStorage(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -0,0 +1,297 @@
//! `reth db stage-checkpoints` command for viewing and setting stage checkpoint values.
use clap::{Args, Parser, Subcommand, ValueEnum};
use reth_db_common::DbTool;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
StageCheckpointWriter,
};
use reth_stages::StageId;
use crate::common::AccessRights;
/// `reth db stage-checkpoints` subcommand
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
impl Command {
/// Returns database access rights required for the command.
pub fn access_rights(&self) -> AccessRights {
match &self.command {
Subcommands::Get { .. } => AccessRights::RO,
Subcommands::Set(_) => AccessRights::RW,
}
}
/// Execute the command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Get { stage } => Self::get(tool, stage),
Subcommands::Set(args) => Self::set(tool, args),
}
}
fn get<N: ProviderNodeTypes>(tool: &DbTool<N>, stage: Option<StageArg>) -> eyre::Result<()> {
let provider = tool.provider_factory.provider()?;
match stage {
Some(stage) => {
let stage_id = stage.into();
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
println!("{stage_id}: {checkpoint:?}");
}
None => {
let mut checkpoints = provider.get_all_checkpoints()?;
checkpoints.sort_by(|a, b| a.0.cmp(&b.0));
for (stage, checkpoint) in checkpoints {
println!("{stage}: {checkpoint:?}");
}
}
}
Ok(())
}
fn set<N: ProviderNodeTypes>(tool: &DbTool<N>, args: SetArgs) -> eyre::Result<()> {
let stage_id: StageId = args.stage.into();
let provider_rw = tool.provider_factory.database_provider_rw()?;
let previous = provider_rw.get_stage_checkpoint(stage_id)?;
let mut checkpoint = previous.unwrap_or_default();
checkpoint.block_number = args.block_number;
if args.clear_stage_unit {
checkpoint.stage_checkpoint = None;
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
provider_rw.commit()?;
println!("Updated checkpoint for {stage_id}: {checkpoint:?}");
Ok(())
}
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Get stage checkpoint(s) from database.
Get {
/// Specific stage to query. If omitted, shows all stages.
#[arg(long, value_enum)]
stage: Option<StageArg>,
},
/// Set a stage checkpoint.
Set(SetArgs),
}
/// Arguments for the `set` subcommand.
#[derive(Debug, Args)]
pub struct SetArgs {
/// Stage to update.
#[arg(long, value_enum)]
stage: StageArg,
/// Block number to set as stage checkpoint.
#[arg(long)]
block_number: u64,
/// Clear stage-specific unit checkpoint payload.
#[arg(long)]
clear_stage_unit: bool,
}
/// CLI-friendly stage names.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum StageArg {
Era,
Headers,
Bodies,
SenderRecovery,
Execution,
PruneSenderRecovery,
MerkleUnwind,
AccountHashing,
StorageHashing,
MerkleExecute,
TransactionLookup,
IndexStorageHistory,
IndexAccountHistory,
Prune,
Finish,
}
impl From<StageArg> for StageId {
fn from(arg: StageArg) -> Self {
match arg {
StageArg::Era => Self::Era,
StageArg::Headers => Self::Headers,
StageArg::Bodies => Self::Bodies,
StageArg::SenderRecovery => Self::SenderRecovery,
StageArg::Execution => Self::Execution,
StageArg::PruneSenderRecovery => Self::PruneSenderRecovery,
StageArg::MerkleUnwind => Self::MerkleUnwind,
StageArg::AccountHashing => Self::AccountHashing,
StageArg::StorageHashing => Self::StorageHashing,
StageArg::MerkleExecute => Self::MerkleExecute,
StageArg::TransactionLookup => Self::TransactionLookup,
StageArg::IndexStorageHistory => Self::IndexStorageHistory,
StageArg::IndexAccountHistory => Self::IndexAccountHistory,
StageArg::Prune => Self::Prune,
StageArg::Finish => Self::Finish,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
use reth_provider::{
test_utils::create_test_provider_factory, DBProvider, DatabaseProviderFactory,
StageCheckpointReader, StageCheckpointWriter,
};
use reth_stages::StageCheckpoint;
#[test]
fn parse_set_args() {
let command = Command::parse_from([
"stage-checkpoints",
"set",
"--stage",
"headers",
"--block-number",
"123",
]);
assert!(matches!(
command.command,
Subcommands::Set(SetArgs {
stage: StageArg::Headers,
block_number: 123,
clear_stage_unit: false,
})
));
}
#[test]
fn set_overwrites_block_number() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
provider_rw
.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(10))
.expect("save checkpoint");
provider_rw.commit().expect("commit initial checkpoint");
}
let command = Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Headers,
block_number: 42,
clear_stage_unit: false,
}),
};
command.execute(&tool).expect("execute command");
let provider = provider_factory.provider().expect("provider");
let checkpoint = provider
.get_stage_checkpoint(StageId::Headers)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert_eq!(checkpoint.block_number, 42);
}
#[test]
fn set_preserves_stage_unit_checkpoint_unless_cleared() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
let checkpoint = StageCheckpoint::new(10).with_block_range(&StageId::Execution, 5, 10);
provider_rw
.save_stage_checkpoint(StageId::Execution, checkpoint)
.expect("save checkpoint");
provider_rw.commit().expect("commit initial checkpoint");
}
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Execution,
block_number: 11,
clear_stage_unit: false,
}),
}
.execute(&tool)
.expect("execute command");
let provider = provider_factory.provider().expect("provider");
let checkpoint = provider
.get_stage_checkpoint(StageId::Execution)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert!(checkpoint.stage_checkpoint.is_some());
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::Execution,
block_number: 12,
clear_stage_unit: true,
}),
}
.execute(&tool)
.expect("execute command");
let checkpoint = provider_factory
.provider()
.expect("provider")
.get_stage_checkpoint(StageId::Execution)
.expect("get stage checkpoint")
.expect("missing stage checkpoint");
assert!(checkpoint.stage_checkpoint.is_none());
}
#[test]
fn set_preserves_checkpoint_progress() {
let provider_factory = create_test_provider_factory();
let tool = DbTool::new(provider_factory.clone()).expect("db tool");
{
let provider_rw = provider_factory.database_provider_rw().expect("rw provider");
provider_rw
.save_stage_checkpoint(StageId::MerkleExecute, StageCheckpoint::new(10))
.expect("save checkpoint");
provider_rw
.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![1, 2, 3])
.expect("save progress");
provider_rw.commit().expect("commit initial checkpoint");
}
Command {
command: Subcommands::Set(SetArgs {
stage: StageArg::MerkleExecute,
block_number: 20,
clear_stage_unit: false,
}),
}
.execute(&tool)
.expect("execute command");
let provider = provider_factory.provider().expect("provider");
let progress = provider
.get_stage_checkpoint_progress(StageId::MerkleExecute)
.expect("get stage checkpoint progress");
assert_eq!(progress, Some(vec![1, 2, 3]));
}
}

View File

@@ -297,21 +297,18 @@ where
}
match event {
Event::Key(key) => {
if key.kind == event::KeyEventKind::Press {
match key.code {
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
KeyCode::Down => app.next(),
KeyCode::Up => app.previous(),
KeyCode::Right => app.next_page(),
KeyCode::Left => app.previous_page(),
KeyCode::Char('G') => {
app.mode = ViewMode::GoToPage;
}
_ => {}
}
Event::Key(key) if key.kind == event::KeyEventKind::Press => match key.code {
KeyCode::Char('q') | KeyCode::Char('Q') => return Ok(true),
KeyCode::Down => app.next(),
KeyCode::Up => app.previous(),
KeyCode::Right => app.next_page(),
KeyCode::Left => app.previous_page(),
KeyCode::Char('G') => {
app.mode = ViewMode::GoToPage;
}
}
_ => {}
},
Event::Key(_) => {}
Event::Mouse(e) => match e.kind {
MouseEventKind::ScrollDown => app.next(),
MouseEventKind::ScrollUp => app.previous(),

View File

@@ -9,26 +9,6 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// Returns the default number of storage worker threads based on available parallelism.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map_or(8, |n| n.get() * 2)
}
#[cfg(not(feature = "std"))]
{
8
}
}
/// Returns the default number of account worker threads.
///
/// Account workers coordinate storage proof collection and account trie traversal.
/// They are set to the same count as storage workers for simplicity.
fn default_account_worker_count() -> usize {
default_storage_worker_count()
}
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
@@ -147,10 +127,6 @@ pub struct TreeConfig {
always_process_payload_attributes_on_canonical_head: bool,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Depth for sparse trie pruning after state root computation.
@@ -187,8 +163,6 @@ impl Default for TreeConfig {
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_cache_metrics: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
@@ -220,8 +194,6 @@ impl TreeConfig {
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
@@ -246,8 +218,6 @@ impl TreeConfig {
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_cache_metrics,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
@@ -479,42 +449,6 @@ impl TreeConfig {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
}
/// Setter for the number of storage proof worker threads.
///
/// No-op if it's [`None`].
pub const fn with_storage_worker_count_opt(
mut self,
storage_worker_count: Option<usize>,
) -> Self {
if let Some(count) = storage_worker_count {
self.storage_worker_count = count;
}
self
}
/// Return the number of account proof worker threads.
pub const fn account_worker_count(&self) -> usize {
self.account_worker_count
}
/// Setter for the number of account proof worker threads.
///
/// No-op if it's [`None`].
pub const fn with_account_worker_count_opt(
mut self,
account_worker_count: Option<usize>,
) -> Self {
if let Some(count) = account_worker_count {
self.account_worker_count = count;
}
self
}
/// Returns whether cache metrics recording is disabled.
pub const fn disable_cache_metrics(&self) -> bool {
self.disable_cache_metrics

View File

@@ -2583,6 +2583,51 @@ where
Some(TreeEvent::Download(request))
}
/// Handles a downloaded block that was successfully inserted as valid.
///
/// If the block matches the sync target head, returns [`TreeAction::MakeCanonical`].
/// If it matches a non-head sync target (safe or finalized), makes it canonical inline
/// and triggers a download for the remaining blocks towards the actual head.
/// Otherwise, tries to connect buffered blocks.
fn on_valid_downloaded_block(
&mut self,
block_num_hash: BlockNumHash,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
// check if we just inserted a block that's part of sync targets,
// i.e. head, safe, or finalized
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
sync_target.contains(block_num_hash.hash)
{
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
if sync_target.head_block_hash == block_num_hash.hash {
// we just inserted the sync target head block, make it canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
})))
}
// This block is part of the sync target (safe or finalized) but not the
// head. Make it canonical and try to connect any buffered children, then
// continue downloading towards the actual head if needed.
self.make_canonical(block_num_hash.hash)?;
self.try_connect_buffered_blocks(block_num_hash)?;
// Check if we've reached the sync target head after connecting buffered
// blocks (e.g. the head block may have already been buffered).
if self.state.tree_state.canonical_block_hash() != sync_target.head_block_hash {
let target = self.lowest_buffered_ancestor_or(sync_target.head_block_hash);
trace!(target: "engine::tree", %target, "sync target head not yet reached, downloading head block");
return Ok(Some(TreeEvent::Download(DownloadRequest::single_block(target))))
}
return Ok(None)
}
trace!(target: "engine::tree", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash)?;
Ok(None)
}
/// Invoked with a block downloaded from the network
///
/// Returns an event with the appropriate action to take, such as:
@@ -2605,22 +2650,11 @@ where
// try to append the block
match self.insert_block(block) {
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
// check if we just inserted a block that's part of sync targets,
// i.e. head, safe, or finalized
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
sync_target.contains(block_num_hash.hash)
{
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
// we just inserted a block that we know is part of the canonical chain, so we
// can make it canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
})))
}
trace!(target: "engine::tree", "appended downloaded block");
self.try_connect_buffered_blocks(block_num_hash)?;
Ok(
InsertPayloadOk::Inserted(BlockStatus::Valid) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid),
) => {
return self.on_valid_downloaded_block(block_num_hash);
}
Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
// block is not connected to the canonical head, we need to download

View File

@@ -822,9 +822,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Returns iterator yielding transactions from the stream.
pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
core::iter::repeat_with(|| self.transactions.recv())
.take_while(|res| res.is_ok())
.map(|res| res.unwrap())
self.transactions.iter()
}
}

View File

@@ -1,6 +1,9 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use alloy_primitives::{
map::{DefaultHashBuilder, FbBuildHasher},
Bytes,
};
use moka::policy::EvictionPolicy;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use reth_primitives_traits::dashmap::DashMap;
@@ -13,7 +16,7 @@ const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>, FbBuildHasher<20>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
@@ -37,9 +40,7 @@ where
/// Cache for precompiles, for each input stores the result.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
pub struct PrecompileCache<S>(moka::sync::Cache<Bytes, CacheEntry<S>, DefaultHashBuilder>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;

View File

@@ -139,7 +139,7 @@ impl<N: NodePrimitives> TreeState<N> {
///
/// Both parent hash and anchor hash must match to ensure the overlay is valid.
/// This prevents using a stale overlay after persistence has advanced the anchor.
pub(crate) fn get_cached_overlay(
pub fn get_cached_overlay(
&self,
parent_hash: B256,
expected_anchor: B256,

View File

@@ -2041,3 +2041,126 @@ mod forkchoice_updated_tests {
assert_eq!(last_persisted_number, canonical_tip);
}
}
/// Tests that `on_valid_downloaded_block` triggers a download for the actual head block when
/// the block matches a non-head sync target (safe or finalized).
///
/// This exercises the exact code path fixed in `on_downloaded_block`: after `insert_block`
/// returns `Inserted(Valid)`, `on_valid_downloaded_block` checks `sync_target.contains()`.
/// If the block is NOT the head, it should make canonical inline and emit a `Download`
/// event for the head — rather than returning `MakeCanonical` which would stop the download
/// pipeline.
///
/// Reproduces the hive test failure:
/// "Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts -
/// No Transactions: Timeout while waiting for secondary client to sync"
#[test]
fn test_on_valid_downloaded_non_head_sync_target_continues_to_head() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
// Build blocks: genesis (0) and safe block (1).
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
let genesis = &blocks[0];
let safe_block = &blocks[1];
// Insert genesis and safe block into the tree. The safe block must be in the tree
// for `make_canonical` to succeed inside `on_valid_downloaded_block`.
test_harness = test_harness.with_blocks(vec![genesis.clone(), safe_block.clone()]);
let genesis_hash = genesis.recovered_block().hash();
let safe_hash = safe_block.recovered_block().hash();
let head_hash = B256::random(); // head block is unknown — hasn't been downloaded yet
// Reset canonical head to genesis so the safe block is in tree but not yet canonical.
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
// Set the forkchoice tracker to SYNCING with head != safe.
let fcu_state = ForkchoiceState {
head_block_hash: head_hash,
safe_block_hash: safe_hash,
finalized_block_hash: genesis_hash,
};
test_harness
.tree
.state
.forkchoice_state_tracker
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
// Call on_valid_downloaded_block — this is called by on_downloaded_block after
// insert_block returns Inserted(Valid).
let safe_num_hash = safe_block.recovered_block().num_hash();
let result = test_harness.tree.on_valid_downloaded_block(safe_num_hash).unwrap();
// With the fix: the engine makes safe canonical inline, then emits Download for head.
// Without the fix: it would return MakeCanonical{safe_hash} and never download head.
match result {
Some(TreeEvent::Download(DownloadRequest::BlockSet(hashes))) => {
assert!(
hashes.contains(&head_hash),
"Expected download for head block {head_hash}, got {hashes:?}"
);
}
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
panic!(
"BUG: returned MakeCanonical for non-head block {sync_target_head} \
instead of downloading the actual head {head_hash}"
);
}
other => panic!("Expected Download event for head block, got: {other:?}"),
}
// Verify the safe block was made canonical.
assert_eq!(
test_harness.tree.state.tree_state.canonical_block_hash(),
safe_hash,
"Safe block should be canonical after on_valid_downloaded_block"
);
}
/// Tests that `on_valid_downloaded_block` returns `MakeCanonical` when the downloaded block
/// IS the sync target head (the normal non-buggy path).
#[test]
fn test_on_valid_downloaded_head_sync_target_returns_make_canonical() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..2).collect();
let genesis = &blocks[0];
let head_block = &blocks[1];
test_harness = test_harness.with_blocks(vec![genesis.clone(), head_block.clone()]);
let genesis_hash = genesis.recovered_block().hash();
let head_hash = head_block.recovered_block().hash();
// Reset canonical head to genesis.
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
// Set the forkchoice tracker: head == the downloaded block.
let fcu_state = ForkchoiceState {
head_block_hash: head_hash,
safe_block_hash: head_hash,
finalized_block_hash: genesis_hash,
};
test_harness
.tree
.state
.forkchoice_state_tracker
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
let head_num_hash = head_block.recovered_block().num_hash();
let result = test_harness.tree.on_valid_downloaded_block(head_num_hash).unwrap();
// When the downloaded block IS the head, should return MakeCanonical.
match result {
Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head })) => {
assert_eq!(sync_target_head, head_hash);
}
other => panic!("Expected MakeCanonical for head block, got: {other:?}"),
}
}

View File

@@ -108,7 +108,7 @@ impl EngineMessageStore {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
Ok(filenames_by_ts.into_values().flatten())
}
}

View File

@@ -242,7 +242,7 @@ mod tests {
#[test]
fn test_valid_gas_limit_increase() {
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
let child = header_with_gas_limit((parent.gas_limit + 5) as u64);
let child = header_with_gas_limit(parent.gas_limit + 5);
assert!(validate_against_parent_gas_limit(
&child,
@@ -260,7 +260,7 @@ mod tests {
assert!(matches!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
ConsensusError::GasLimitInvalidMinimum { child_gas_limit }
if child_gas_limit == child.gas_limit as u64
if child_gas_limit == child.gas_limit
));
}

View File

@@ -202,6 +202,18 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks().iter().map(|block| block.1)
}
/// Returns an iterator over all transactions in the chain.
pub fn transactions_iter(&self) -> impl Iterator<Item = &N::SignedTx> + '_ {
self.blocks_iter().flat_map(|block| block.body().transactions())
}
/// Returns an iterator over all [`Recovered`] transaction references in the chain.
pub fn transactions_recovered_iter(
&self,
) -> impl Iterator<Item = Recovered<&N::SignedTx>> + '_ {
self.blocks_iter().flat_map(|block| block.transactions_recovered())
}
/// Returns an iterator over all blocks and their receipts in the chain.
pub fn blocks_and_receipts(
&self,

View File

@@ -376,7 +376,7 @@ mod tests {
);
for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
pipeline_results.iter().zip(backfill_results.into_iter()).enumerate()
pipeline_results.iter().zip(backfill_results).enumerate()
{
backfill_output.state.reverts.sort();

View File

@@ -505,9 +505,6 @@ where
}
let buffer_full = this.buffer.len() >= this.max_capacity;
// Update capacity
this.update_capacity();
// Advance all poll senders
let mut min_id = usize::MAX;
for idx in (0..this.exex_handles.len()).rev() {

View File

@@ -955,10 +955,8 @@ impl Discv4Service {
// Check if ENR was updated
match (last_enr_seq, old_enr) {
(Some(new), Some(old)) => {
if new > old {
self.send_enr_request(record);
}
(Some(new), Some(old)) if new > old => {
self.send_enr_request(record);
}
(Some(_), None) => {
// got an ENR
@@ -1195,10 +1193,8 @@ impl Discv4Service {
} else {
// Request ENR if included in the ping
match (ping.enr_sq, old_enr) {
(Some(new), Some(old)) => {
if new > old {
self.send_enr_request(record);
}
(Some(new), Some(old)) if new > old => {
self.send_enr_request(record);
}
(Some(_), None) => {
self.send_enr_request(record);
@@ -1355,10 +1351,8 @@ impl Discv4Service {
_ => return,
};
match (fork_id, old_fork_id) {
(Some(new), Some(old)) => {
if new != old {
self.notify(DiscoveryUpdate::EnrForkId(record, new))
}
(Some(new), Some(old)) if new != old => {
self.notify(DiscoveryUpdate::EnrForkId(record, new))
}
(Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
_ => {}

View File

@@ -52,6 +52,7 @@ where
pub(crate) fn clear(&mut self) {
self.inner.clear();
self.last_requested_block_number.take();
self.metrics.clear();
}
/// Add new request to the queue.
/// Expects a sorted list of headers.

View File

@@ -60,6 +60,15 @@ impl BodyDownloaderMetrics {
_error => self.unexpected_errors.increment(1),
}
}
/// Clear all gauge metrics by setting them to 0.
pub fn clear(&self) {
self.in_flight_requests.set(0);
self.buffered_responses.set(0);
self.buffered_blocks.set(0);
self.buffered_blocks_size_bytes.set(0);
self.queued_blocks.set(0);
}
}
/// Metrics for an individual response, i.e. the size in bytes, and length (number of bodies) in the

View File

@@ -301,6 +301,20 @@ impl Discovery {
}
}
impl Drop for Discovery {
fn drop(&mut self) {
if let Some(discv4) = &self.discv4 {
discv4.terminate();
}
if let Some(handle) = self._discv4_service.take() {
handle.abort();
}
if let Some(handle) = self._dns_disc_service.take() {
handle.abort();
}
}
}
impl Stream for Discovery {
type Item = DiscoveryEvent;

View File

@@ -431,19 +431,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Returns an iterator over all peers in the peer set.
pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.swarm.state().peers().iter_peers()
self.swarm.peers().iter_peers()
}
/// Returns the number of peers in the peer set.
pub fn num_known_peers(&self) -> usize {
self.swarm.state().peers().num_known_peers()
self.swarm.peers().num_known_peers()
}
/// Returns a new [`PeersHandle`] that can be cloned and shared.
///
/// The [`PeersHandle`] can be used to interact with the network's peer set.
pub fn peers_handle(&self) -> PeersHandle {
self.swarm.state().peers().handle()
self.swarm.peers().handle()
}
/// Collect the peers from the [`NetworkManager`] and write them to the given
@@ -452,7 +452,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Only persists peers that are not currently backed off or banned. Includes metadata like
/// peer kind, fork ID, and reputation.
pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
let peers = self.swarm.state().peers().persistable_peers().collect::<Vec<_>>();
let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
Ok(())
@@ -730,10 +730,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
let _ = tx.send(());
}
NetworkHandleMessage::ReputationChange(peer_id, kind) => {
self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
}
NetworkHandleMessage::GetReputationById(peer_id, tx) => {
let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
}
NetworkHandleMessage::FetchClient(tx) => {
let _ = tx.send(self.fetch_client());
@@ -756,7 +756,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
let peer_ids = self.swarm.peers().peers_by_kind(kind);
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
@@ -791,7 +791,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.metrics.total_incoming_connections.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
.set(self.swarm.peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
@@ -829,7 +829,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}
if direction.is_outgoing() {
self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
self.swarm.peers_mut().on_active_outgoing_established(peer_id);
}
self.update_active_connection_metrics();
@@ -857,12 +857,12 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
@@ -877,23 +877,19 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
);
// Capture direction before state is reset to Idle
let is_inbound = self.swarm.state().peers().is_inbound_peer(&peer_id);
let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);
let reason = if let Some(ref err) = error {
// If the connection was closed due to an error, we report
// the peer
self.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
err.as_disconnected()
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
self.backed_off_peers_metrics
.increment_for_reason(BackoffReason::GracefulClose);
None
@@ -908,9 +904,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.disconnect_metrics.increment_outbound(reason);
}
}
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
@@ -940,7 +934,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.closed_sessions_metrics.incoming_pending.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
.set(self.swarm.peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
@@ -952,7 +946,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
);
if let Some(ref err) = error {
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
self.swarm.peers_mut().on_outgoing_pending_session_dropped(
&remote_addr,
&peer_id,
err,
@@ -972,9 +966,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}
self.closed_sessions_metrics.outgoing_pending.increment(1);
self.update_pending_connection_metrics();
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
@@ -985,16 +977,14 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
"Outgoing connection error"
);
self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
self.swarm.peers_mut().on_outgoing_connection_failure(
&remote_addr,
&peer_id,
&error,
);
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
self.update_pending_connection_metrics();
}
SwarmEvent::BadMessage { peer_id } => {
@@ -1052,12 +1042,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Updates the metrics for active,established connections
#[inline]
fn update_active_connection_metrics(&self) {
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
}
/// Updates the metrics for pending connections
@@ -1065,7 +1051,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
fn update_pending_connection_metrics(&self) {
self.metrics
.pending_outgoing_connections
.set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
.set(self.swarm.peers().num_pending_outbound_connections() as f64);
self.metrics
.total_pending_connections
.set(self.swarm.sessions().num_pending_connections() as f64);

View File

@@ -862,7 +862,7 @@ impl PeersManager {
}
}
if kind.filter(|kind| kind.is_trusted()).is_some() {
if kind.is_some_and(|kind| kind.is_trusted()) {
// also track the peer in the peer id set
self.trusted_peer_ids.insert(peer_id);
}

View File

@@ -1,7 +1,7 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::PeerMessage,
peers::InboundConnectionError,
peers::{InboundConnectionError, PeersManager},
protocol::IntoRlpxSubProtocol,
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction},
@@ -98,6 +98,16 @@ impl<N: NetworkPrimitives> Swarm<N> {
pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
&mut self.sessions
}
/// Access to the [`PeersManager`].
pub(crate) const fn peers(&self) -> &PeersManager {
self.state.peers()
}
/// Mutable access to the [`PeersManager`].
pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
self.state.peers_mut()
}
}
impl<N: NetworkPrimitives> Swarm<N> {
@@ -190,9 +200,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
return None
}
// ensure we can handle an incoming connection from this address
if let Err(err) =
self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
{
if let Err(err) = self.peers_mut().on_incoming_pending_session(remote_addr.ip()) {
match err {
InboundConnectionError::IpBanned => {
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
@@ -256,21 +264,21 @@ impl<N: NetworkPrimitives> Swarm<N> {
//
// When disabled (default), peers without a fork ID are admitted immediately.
// Peers that *do* carry a fork ID are always validated against ours.
let enforce = self.state().peers().enforce_enr_fork_id();
let enforce = self.peers().enforce_enr_fork_id();
let allow = match fork_id {
Some(f) => self.sessions.is_valid_fork_id(f),
None => !enforce,
};
if allow {
self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
self.peers_mut().add_peer(peer_id, addr, fork_id);
}
}
StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().add_peer(peer_id, addr, Some(fork_id));
self.peers_mut().add_peer(peer_id, addr, Some(fork_id));
} else {
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
self.state_mut().peers_mut().remove_peer(peer_id);
self.peers_mut().remove_peer(peer_id);
}
}
}
@@ -279,18 +287,18 @@ impl<N: NetworkPrimitives> Swarm<N> {
/// Set network connection state to `ShuttingDown`
pub(crate) const fn on_shutdown_requested(&mut self) {
self.state_mut().peers_mut().on_shutdown();
self.peers_mut().on_shutdown();
}
/// Checks if the node's network connection state is '`ShuttingDown`'
#[inline]
pub(crate) const fn is_shutting_down(&self) -> bool {
self.state().peers().connection_state().is_shutting_down()
self.peers().connection_state().is_shutting_down()
}
/// Set network connection state to `Hibernate` or `Active`
pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
self.state_mut().peers_mut().on_network_state_change(network_state);
self.peers_mut().on_network_state_change(network_state);
}
}

View File

@@ -78,6 +78,10 @@ pub struct BenchmarkArgs {
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
pub reth_new_payload: bool,
/// Fetch and replay RLP-encoded blocks. Implies `reth_new_payload`.
#[arg(long, default_value = "false", verbatim_doc_comment)]
pub rlp_blocks: bool,
}
#[cfg(test)]

View File

@@ -461,8 +461,6 @@ impl EngineArgs {
self.always_process_payload_attributes_on_canonical_head,
)
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
.with_storage_worker_count_opt(self.storage_worker_count)
.with_account_worker_count_opt(self.account_worker_count)
.without_cache_metrics(self.cache_metrics_disabled)
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)

View File

@@ -8,7 +8,7 @@ use jsonrpsee_types::error::{
};
use reth_engine_primitives::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::EngineObjectValidationError;
use reth_payload_primitives::{EngineObjectValidationError, VersionSpecificValidationError};
use thiserror::Error;
/// The Engine API result type
@@ -117,11 +117,14 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EngineObjectValidationError::Payload(_) |
EngineObjectValidationError::InvalidParams(_) |
// Per Engine API spec, structure validation errors for PayloadAttributes
// (e.g., missing withdrawals post-Shanghai, missing parentBeaconBlockRoot
// post-Cancun) should return -32602 "Invalid params", not -38003.
// (e.g., missing withdrawals post-Shanghai) should return -32602 "Invalid params".
// See: https://github.com/ethereum/execution-apis/blob/main/src/engine/shanghai.md
// Fixes: https://github.com/paradigmxyz/reth/issues/8732
EngineObjectValidationError::PayloadAttributes(_),
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::WithdrawalsNotSupportedInV1 |
VersionSpecificValidationError::NoWithdrawalsPostShanghai |
VersionSpecificValidationError::HasWithdrawalsPreShanghai,
),
) |
EngineApiError::UnexpectedRequestsHash => {
// Note: the data field is not required by the spec, but is also included by other
@@ -145,6 +148,16 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
Some(ErrorData::new(error)),
)
}
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3 |
VersionSpecificValidationError::NoParentBeaconBlockRootPostCancun,
),
) => jsonrpsee_types::error::ErrorObject::owned(
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
Some(ErrorData::new(error)),
),
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::UnsupportedFork,
) => jsonrpsee_types::error::ErrorObject::owned(
@@ -198,8 +211,6 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
mod tests {
use super::*;
use alloy_rpc_types_engine::ForkchoiceUpdateError;
use reth_payload_primitives::VersionSpecificValidationError;
#[track_caller]
fn ensure_engine_rpc_error(
code: i32,
@@ -265,5 +276,16 @@ mod tests {
),
),
);
// Beacon root shape mismatches on PayloadAttributes are reported as -38003.
ensure_engine_rpc_error(
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
EngineApiError::EngineObjectValidationError(
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::ParentBeaconBlockRootNotSupportedBeforeV3,
),
),
);
}
}

View File

@@ -74,8 +74,12 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
block_id: BlockId,
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send {
async move {
// If no pending block from provider, build the pending block locally.
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// If no pending block from provider, build the pending block locally.
if let Some(pending) = self.local_pending_block().await? {
return Ok(Some(pending.block.body().transaction_count()));
}
@@ -180,6 +184,10 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
{
async move {
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// First, try to get the pending block from the provider, in case we already
// received the actual pending block from the CL.
if let Some((block, receipts)) = self
@@ -284,6 +292,10 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt {
> + Send {
async move {
if block_id.is_pending() {
if self.pending_block_kind().is_none() {
return Ok(None);
}
// Pending block can be fetched directly without need for caching
if let Some(pending_block) =
self.provider().pending_block().map_err(Self::Error::from_eth_err)?

View File

@@ -20,8 +20,8 @@ use alloy_rpc_types_eth::{
use futures::Future;
use reth_errors::{ProviderError, RethError};
use reth_evm::{
env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor,
InspectorFor, TransactionEnv, TxEnvFor,
block::BlockExecutor, env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm,
EvmEnvFor, HaltReasonFor, InspectorFor, TransactionEnv, TxEnvFor,
};
use reth_node_api::BlockBody;
use reth_primitives_traits::Recovered;
@@ -746,6 +746,14 @@ pub trait Call:
self.spawn_with_state_at_block(parent_block, move |this, mut db| {
let block_txs = block.transactions_recovered();
// apply pre-execution changes (e.g. EIP-4788 beacon root, EIP-2935 blockhashes)
RpcNodeCore::evm_config(&this)
.executor_for_block(&mut db, block.sealed_block())
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?
.apply_pre_execution_changes()
.map_err(Self::Error::from_eth_err)?;
// replay all transactions prior to the targeted transaction
this.replay_transactions_until(&mut db, evm_env.clone(), block_txs, *tx.tx_hash())?;

View File

@@ -437,6 +437,8 @@ where
if replay_block_txs {
// only need to replay the transactions in the block if not all transactions are
// to be replayed
eth_api.apply_pre_execution_changes(&block, &mut db)?;
let transactions = block.transactions_recovered().take(num_txs);
// Execute all transactions until index

View File

@@ -155,6 +155,10 @@ where
return Ok(None)
};
if start_block == 0 {
return Ok(Some(ExecutionOutcome::default()))
}
let state_provider = self.provider().history_by_block_number(start_block - 1)?;
let db = reth_revm::database::StateProviderDatabase::new(&state_provider);

View File

@@ -476,26 +476,28 @@ where
&self,
block_id: BlockId,
) -> Result<Option<Vec<LocalizedTransactionTrace>>, Eth::Error> {
let traces = self.eth_api().trace_block_with(
block_id,
None,
TracingInspectorConfig::default_parity(),
|tx_info, mut ctx| {
let traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
);
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let block = self.eth_api().recovered_block(block_id);
let (maybe_traces, maybe_block) = futures::try_join!(traces, block)?;
let mut traces = self
.eth_api()
.trace_block_with(
block_id,
Some(block.clone()),
TracingInspectorConfig::default_parity(),
|tx_info, mut ctx| {
let traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await?
.map(|traces| traces.into_iter().flatten().collect::<Vec<_>>());
let mut maybe_traces =
maybe_traces.map(|traces| traces.into_iter().flatten().collect::<Vec<_>>());
if let (Some(block), Some(traces)) = (maybe_block, maybe_traces.as_mut()) &&
if let Some(traces) = traces.as_mut() &&
let Some(base_block_reward) = self.calculate_base_block_reward(block.header())?
{
traces.extend(self.extract_reward_traces(
@@ -505,7 +507,7 @@ where
));
}
Ok(maybe_traces)
Ok(traces)
}
/// Replays all transactions in a block
@@ -550,11 +552,15 @@ where
&self,
block_id: BlockId,
) -> Result<Option<BlockOpcodeGas>, Eth::Error> {
let res = self
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let Some(transactions) = self
.eth_api()
.trace_block_inspector(
block_id,
None,
Some(block.clone()),
OpcodeGasInspector::default,
move |tx_info, ctx| {
let trace = TransactionOpcodeGas {
@@ -564,11 +570,10 @@ where
Ok(trace)
},
)
.await?;
let Some(transactions) = res else { return Ok(None) };
let Some(block) = self.eth_api().recovered_block(block_id).await? else { return Ok(None) };
.await?
else {
return Ok(None);
};
Ok(Some(BlockOpcodeGas {
block_hash: block.hash(),
@@ -583,11 +588,15 @@ where
&self,
block_id: BlockId,
) -> Result<Option<BlockStorageAccess>, Eth::Error> {
let res = self
let Some(block) = self.eth_api().recovered_block(block_id).await? else {
return Err(EthApiError::HeaderNotFound(block_id).into());
};
let Some(transactions) = self
.eth_api()
.trace_block_inspector(
block_id,
None,
Some(block.clone()),
StorageInspector::default,
move |tx_info, ctx| {
let trace = TransactionStorageAccess {
@@ -599,11 +608,10 @@ where
Ok(trace)
},
)
.await?;
let Some(transactions) = res else { return Ok(None) };
let Some(block) = self.eth_api().recovered_block(block_id).await? else { return Ok(None) };
.await?
else {
return Ok(None);
};
Ok(Some(BlockStorageAccess {
block_hash: block.hash(),

View File

@@ -552,8 +552,7 @@ mod tests {
if storage_cursor
.seek_by_key_subkey(bn_address.address(), entry.key)?
.filter(|e| e.key == entry.key)
.is_some()
.is_some_and(|e| e.key == entry.key)
{
storage_cursor.delete_current()?;
}

View File

@@ -742,7 +742,7 @@ mod tests {
accounts.insert(key, (account, storage));
}
Ok(state_root_prehashed(accounts.into_iter()))
Ok(state_root_prehashed(accounts))
})?;
let static_file_provider = self.db.factory.static_file_provider();

View File

@@ -64,10 +64,8 @@ where
let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
for (key, indices) in cache.drain() {
let last = *indices.last().expect("qed");
collector.insert(
sharded_key_factory(key, last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
)?;
collector
.insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
}
Ok::<(), StageError>(())
};
@@ -129,10 +127,8 @@ where
let mut insert_fn = |address: Address, indices: Vec<u64>| {
let last = indices.last().expect("indices is non-empty");
collector.insert(
ShardedKey::new(address, *last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
)?;
collector
.insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
Ok(())
};
@@ -190,7 +186,7 @@ where
let last = indices.last().expect("qed");
collector.insert(
StorageShardedKey::new(key.0 .0, key.0 .1, *last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
BlockNumberList::new_pre_sorted(indices),
)?;
Ok::<(), StageError>(())
};

View File

@@ -424,8 +424,7 @@ impl TestStageDB {
let mut cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
if cursor
.seek_by_key_subkey(address, entry.key)?
.filter(|e| e.key == entry.key)
.is_some()
.is_some_and(|e| e.key == entry.key)
{
cursor.delete_current()?;
}
@@ -434,8 +433,7 @@ impl TestStageDB {
let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
if cursor
.seek_by_key_subkey(hashed_address, hashed_entry.key)?
.filter(|e| e.key == hashed_entry.key)
.is_some()
.is_some_and(|e| e.key == hashed_entry.key)
{
cursor.delete_current()?;
}

View File

@@ -161,7 +161,7 @@ impl Environment {
mdbx_result(ffi::mdbx_env_stat_ex(
self.env_ptr(),
ptr::null(),
stat.mdb_stat(),
stat.mdbx_stat(),
size_of::<Stat>(),
))?;
Ok(stat)
@@ -305,13 +305,13 @@ unsafe impl Sync for EnvPtr {}
pub struct Stat(ffi::MDBX_stat);
impl Stat {
/// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
/// Create a new Stat with zero'd inner struct `ffi::MDBX_stat`.
pub(crate) const fn new() -> Self {
unsafe { Self(mem::zeroed()) }
}
/// Returns a mut pointer to `ffi::MDB_stat`.
pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
/// Returns a mut pointer to `ffi::MDBX_stat`.
pub(crate) const fn mdbx_stat(&mut self) -> *mut ffi::MDBX_stat {
&mut self.0
}
}

View File

@@ -177,7 +177,7 @@ where
self.env().txn_manager().remove_active_read_transaction(txn);
let mut latency = CommitLatency::new();
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) })
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdbx_commit_latency()) })
.map(|v| (v, latency))
} else {
let (sender, rx) = sync_channel(0);
@@ -246,7 +246,7 @@ where
unsafe {
let mut stat = Stat::new();
self.txn_execute(|txn| {
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdbx_stat(), size_of::<Stat>()))
})??;
Ok(stat)
}
@@ -647,7 +647,7 @@ impl CommitLatency {
}
/// Returns a mut pointer to `ffi::MDBX_commit_latency`.
pub(crate) const fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
pub(crate) const fn mdbx_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
&mut self.0
}
}

View File

@@ -100,7 +100,7 @@ impl TxnManager {
.send({
let mut latency = CommitLatency::new();
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
ffi::mdbx_txn_commit_ex(tx.0, latency.mdbx_commit_latency())
})
.map(|v| (v, latency))
})

View File

@@ -617,10 +617,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
let mut tx_num = tx_nums[i];
for transaction in recovered_block.body().transactions_iter() {
for (tx_num, transaction) in
(tx_nums[i]..).zip(recovered_block.body().transactions_iter())
{
all_tx_hashes.push((*transaction.tx_hash(), tx_num));
tx_num += 1;
}
}
@@ -2756,8 +2756,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: hashed_storage_key, value: *old_storage_value };
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, hashed_storage_key)?
.filter(|s| s.key == hashed_storage_key)
.is_some()
.is_some_and(|s| s.key == hashed_storage_key)
{
hashed_storage_cursor.delete_current()?
}
@@ -2798,8 +2797,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
.is_some_and(|s| s.key == *storage_key)
{
plain_storage_cursor.delete_current()?
}
@@ -2921,8 +2919,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: hashed_storage_key, value: *old_storage_value };
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, hashed_storage_key)?
.filter(|s| s.key == hashed_storage_key)
.is_some()
.is_some_and(|s| s.key == hashed_storage_key)
{
hashed_storage_cursor.delete_current()?
}
@@ -2965,8 +2962,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
.is_some_and(|s| s.key == *storage_key)
{
plain_storage_cursor.delete_current()?
}
@@ -3200,8 +3196,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
.is_some_and(|entry| entry.key == key)
{
hashed_storage.delete_current()?;
}
@@ -3248,8 +3243,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
.is_some_and(|entry| entry.key == key)
{
hashed_storage_cursor.delete_current()?;
}

View File

@@ -87,7 +87,7 @@ impl RocksDBProvider {
/// Heals the `TransactionHashNumbers` table.
///
/// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything
/// - Fast path: if checkpoint == 0, clear any stale data and return
/// - If `sf_tip` < checkpoint, return unwind target (static files behind)
/// - If `sf_tip` == checkpoint, nothing to do
/// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
@@ -112,11 +112,11 @@ impl RocksDBProvider {
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"TransactionHashNumbers has data but checkpoint is 0, clearing all"
"TransactionHashNumbers: checkpoint is 0, clearing stale data"
);
self.clear::<tables::TransactionHashNumbers>()?;
return Ok(None);
@@ -264,11 +264,11 @@ impl RocksDBProvider {
.map(|cp| cp.block_number)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"StoragesHistory has data but checkpoint is 0, clearing all"
"StoragesHistory: checkpoint is 0, clearing stale data"
);
self.clear::<tables::StoragesHistory>()?;
return Ok(None);
@@ -358,11 +358,11 @@ impl RocksDBProvider {
.map(|cp| cp.block_number)
.unwrap_or(0);
// Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
if checkpoint == 0 && self.first::<tables::AccountsHistory>()?.is_some() {
// Fast path: clear any stale data and return.
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"AccountsHistory has data but checkpoint is 0, clearing all"
"AccountsHistory: checkpoint is 0, clearing stale data"
);
self.clear::<tables::AccountsHistory>()?;
return Ok(None);
@@ -506,6 +506,35 @@ mod tests {
assert_eq!(result, None);
}
/// Tests that `checkpoint=0` with empty `RocksDB` returns early without attempting
/// an expensive healing loop. Previously, when `sf_tip` > `checkpoint=0`, the healer
/// would iterate billions of transactions from static files for no effect, causing
/// the node to hang on startup with MDBX read transaction timeouts.
#[test]
fn test_check_consistency_checkpoint_zero_empty_rocksdb_returns_early() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());
// No checkpoints set — all default to 0 via unwrap_or(0).
// RocksDB tables are empty.
let provider = factory.database_provider_ro().unwrap();
let result = rocksdb.heal_transaction_hash_numbers(&provider).unwrap();
assert_eq!(result, None, "TransactionHashNumbers should return early at checkpoint 0");
assert!(rocksdb.first::<tables::TransactionHashNumbers>().unwrap().is_none());
let result = rocksdb.heal_storages_history(&provider).unwrap();
assert_eq!(result, None, "StoragesHistory should return early at checkpoint 0");
assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_none());
let result = rocksdb.heal_accounts_history(&provider).unwrap();
assert_eq!(result, None, "AccountsHistory should return early at checkpoint 0");
assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_none());
}
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -1285,10 +1285,8 @@ impl RocksDBProvider {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());

View File

@@ -1166,7 +1166,7 @@ where
Node: NodeTypes,
{
fn state_root(&self, hashed_state: HashedPostState) -> Result<B256, ProviderError> {
self.state_root_from_nodes(TrieInput::from_state(hashed_state))
self.state_root_with_updates(hashed_state).map(|(root, _)| root)
}
fn state_root_from_nodes(&self, _input: TrieInput) -> Result<B256, ProviderError> {

View File

@@ -802,7 +802,7 @@ impl RuntimeBuilder {
let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
let proof_storage_worker_threads =
config.rayon.proof_storage_worker_threads.unwrap_or(default_threads);
config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
let proof_storage_worker_pool = WorkerPool::from_builder(
rayon::ThreadPoolBuilder::new()
.num_threads(proof_storage_worker_threads)
@@ -810,7 +810,7 @@ impl RuntimeBuilder {
)?;
let proof_account_worker_threads =
config.rayon.proof_account_worker_threads.unwrap_or(default_threads);
config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
let proof_account_worker_pool = WorkerPool::from_builder(
rayon::ThreadPoolBuilder::new()
.num_threads(proof_account_worker_threads)

View File

@@ -21,7 +21,7 @@ fn generate_transactions(num_senders: usize, txs_per_sender: usize) -> Vec<MockT
for sender_idx in 0..num_senders {
// Create a unique sender address
let sender_bytes = sender_idx.to_be_bytes();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
// Generate transactions for this sender

View File

@@ -17,7 +17,7 @@ fn generate_transactions(num_senders: usize, txs_per_sender: usize) -> Vec<MockT
for sender_idx in 0..num_senders {
// Create a unique sender address
let sender_bytes = sender_idx.to_be_bytes();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(sender_bytes).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
// Generate transactions for this sender

View File

@@ -77,7 +77,7 @@ fn generate_many_transactions(
let idx_slice = idx.to_be_bytes();
// pad with 12 bytes of zeros before rest
let addr_slice = [0u8; 12].into_iter().chain(idx_slice.into_iter()).collect::<Vec<_>>();
let addr_slice = [0u8; 12].into_iter().chain(idx_slice).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
txs.extend(create_transactions_for_sender(&mut runner, sender, depth, only_eip4844));

View File

@@ -213,7 +213,7 @@ pub async fn maintain_transaction_pool<N, Client, P, St>(
dirty_addresses.remove(acc);
}
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let res = load_accounts(c, at, accs_to_reload);
let _ = tx.send(res);
}
.boxed()
@@ -221,7 +221,7 @@ pub async fn maintain_transaction_pool<N, Client, P, St>(
// can fetch all dirty accounts at once
let accs_to_reload = std::mem::take(&mut dirty_addresses);
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let res = load_accounts(c, at, accs_to_reload);
let _ = tx.send(res);
}
.boxed()

View File

@@ -142,9 +142,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
base_fee_per_blob_gas: u64,
) -> BestTransactionsWithFees<T> {
let mut best = self.best();
let mut submission_id = self.submission_id;
for tx in unlocked {
submission_id += 1;
for (submission_id, tx) in (self.submission_id + 1..).zip(unlocked) {
debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
let priority = self.ordering.priority(&tx.transaction, base_fee);
let tx_id = *tx.id();

View File

@@ -405,45 +405,37 @@ where
) -> Result<Tx, TransactionValidationOutcome<Tx>> {
// Checks for tx_type
match transaction.ty() {
LEGACY_TX_TYPE_ID => {
// Accept legacy transactions
// Accept only legacy transactions until EIP-2718/2930 activates
EIP2930_TX_TYPE_ID if !self.eip2718 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip2930Disabled.into(),
))
}
EIP2930_TX_TYPE_ID => {
// Accept only legacy transactions until EIP-2718/2930 activates
if !self.eip2718 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip2930Disabled.into(),
))
}
// Reject dynamic fee transactions until EIP-1559 activates.
EIP1559_TX_TYPE_ID if !self.eip1559 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip1559Disabled.into(),
))
}
EIP1559_TX_TYPE_ID => {
// Reject dynamic fee transactions until EIP-1559 activates.
if !self.eip1559 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip1559Disabled.into(),
))
}
// Reject blob transactions.
EIP4844_TX_TYPE_ID if !self.eip4844 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip4844Disabled.into(),
))
}
EIP4844_TX_TYPE_ID => {
// Reject blob transactions.
if !self.eip4844 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip4844Disabled.into(),
))
}
}
EIP7702_TX_TYPE_ID => {
// Reject EIP-7702 transactions.
if !self.eip7702 {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip7702Disabled.into(),
))
}
// Reject EIP-7702 transactions.
EIP7702_TX_TYPE_ID if !self.eip7702 => {
return Err(TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::Eip7702Disabled.into(),
))
}
// Accept known transaction types when their respective fork is active
LEGACY_TX_TYPE_ID | EIP2930_TX_TYPE_ID | EIP1559_TX_TYPE_ID | EIP4844_TX_TYPE_ID |
EIP7702_TX_TYPE_ID => {}
ty if !self.other_tx_types.bit(ty as usize) => {
return Err(TransactionValidationOutcome::Invalid(

View File

@@ -295,8 +295,8 @@ where
if self
.cursor
.seek_by_key_subkey(self.hashed_address, nibbles.clone())?
.filter(|e| *e.nibbles() == nibbles)
.is_some()
.as_ref()
.is_some_and(|e| *e.nibbles() == nibbles)
{
self.cursor.delete_current()?;
}

View File

@@ -404,8 +404,7 @@ fn account_and_storage_trie() {
if hashed_storage_cursor
.seek_by_key_subkey(key3, hashed_slot)
.unwrap()
.filter(|e| e.key == hashed_slot)
.is_some()
.is_some_and(|e| e.key == hashed_slot)
{
hashed_storage_cursor.delete_current().unwrap();
}

View File

@@ -118,7 +118,7 @@ fn includes_nodes_for_destroyed_storage_nodes() {
for node in multiproof.account_subtree.values() {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
for node in multiproof.storages.iter().flat_map(|(_, storage)| storage.subtree.values()) {
for node in multiproof.storages.values().flat_map(|storage| storage.subtree.values()) {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
});
@@ -172,7 +172,7 @@ fn correctly_decodes_branch_node_values() {
for node in multiproof.account_subtree.values() {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
for node in multiproof.storages.iter().flat_map(|(_, storage)| storage.subtree.values()) {
for node in multiproof.storages.values().flat_map(|storage| storage.subtree.values()) {
assert_eq!(witness.get(&keccak256(node)), Some(node));
}
});

View File

@@ -3820,7 +3820,7 @@ mod tests {
let mut stack = Vec::new();
let mut state_mask = TrieMask::default();
for (&idx, hash) in children_indices.iter().zip(child_hashes.into_iter()) {
for (&idx, hash) in children_indices.iter().zip(child_hashes) {
state_mask.set_bit(idx);
stack.push(hash);
}

View File

@@ -35,6 +35,9 @@
- [`reth db prune-checkpoints`](./reth/db/prune-checkpoints.mdx)
- [`reth db prune-checkpoints get`](./reth/db/prune-checkpoints/get.mdx)
- [`reth db prune-checkpoints set`](./reth/db/prune-checkpoints/set.mdx)
- [`reth db stage-checkpoints`](./reth/db/stage-checkpoints.mdx)
- [`reth db stage-checkpoints get`](./reth/db/stage-checkpoints/get.mdx)
- [`reth db stage-checkpoints set`](./reth/db/stage-checkpoints/set.mdx)
- [`reth db account-storage`](./reth/db/account-storage.mdx)
- [`reth db state`](./reth/db/state.mdx)
- [`reth download`](./reth/download.mdx)

View File

@@ -46,7 +46,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -32,7 +32,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -23,6 +23,7 @@ Commands:
path Returns the full database path
settings Manage storage settings
prune-checkpoints View or set prune checkpoints
stage-checkpoints `reth db stage-checkpoints` subcommand
account-storage Gets storage size information for an account
state Gets account state and storage at a specific block
help Print this message or the help of the given subcommand(s)
@@ -154,7 +155,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -42,7 +42,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -49,7 +49,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -48,7 +48,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -57,7 +57,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -46,7 +46,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -49,7 +49,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -92,7 +92,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -39,7 +39,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -55,7 +55,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -55,7 +55,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -82,7 +82,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -36,7 +36,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -58,7 +58,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -44,7 +44,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -36,7 +36,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -0,0 +1,171 @@
# reth db stage-checkpoints
`reth db stage-checkpoints` subcommand
```bash
$ reth db stage-checkpoints --help
```
```txt
Usage: reth db stage-checkpoints [OPTIONS] <COMMAND>
Commands:
get Get stage checkpoint(s) from database
set Set a stage checkpoint
help Print this message or the help of the given subcommand(s)
Options:
-h, --help
Print help (see a summary with '-h')
Datadir:
--chain <CHAIN_OR_PATH>
The chain this node is running.
Possible values are either a built-in chain or the path to a chain specification file.
Built-in chains:
mainnet, sepolia, holesky, hoodi, dev
[default: mainnet]
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.name <NAME>
The prefix name of the log files
[default: reth.log]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled.
Default: 5 for `node` command, 0 for non-node utility subcommands.
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
Possible values:
- always: Colors on
- auto: Auto-detect
- never: Colors off
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
Tracing:
--tracing-otlp[=<URL>]
Enable `Opentelemetry` tracing export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/traces` - gRPC: `http://localhost:4317`
Example: --tracing-otlp=http://collector:4318/v1/traces
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.
Possible values:
- http: HTTP/Protobuf transport, port 4318, requires `/v1/traces` path
- grpc: gRPC transport, port 4317
[env: OTEL_EXPORTER_OTLP_PROTOCOL=]
[default: http]
--tracing-otlp.filter <FILTER>
Set a filter directive for the OTLP tracer. This controls the verbosity of spans and events sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --tracing-otlp.filter=info,reth=debug,hyper_util=off
Defaults to TRACE if not specified.
[default: debug]
--tracing-otlp.sample-ratio <RATIO>
Trace sampling ratio to control the percentage of traces to export.
Valid range: 0.0 to 1.0 - 1.0, default: Sample all traces - 0.01: Sample 1% of traces - 0.0: Disable sampling
Example: --tracing-otlp.sample-ratio=0.0.
[env: OTEL_TRACES_SAMPLER_ARG=]
```

View File

@@ -0,0 +1,171 @@
# reth db stage-checkpoints get
Get stage checkpoint(s) from database
```bash
$ reth db stage-checkpoints get --help
```
```txt
Usage: reth db stage-checkpoints get [OPTIONS]
Options:
--stage <STAGE>
Specific stage to query. If omitted, shows all stages
[possible values: era, headers, bodies, sender-recovery, execution, prune-sender-recovery, merkle-unwind, account-hashing, storage-hashing, merkle-execute, transaction-lookup, index-storage-history, index-account-history, prune, finish]
-h, --help
Print help (see a summary with '-h')
Datadir:
--chain <CHAIN_OR_PATH>
The chain this node is running.
Possible values are either a built-in chain or the path to a chain specification file.
Built-in chains:
mainnet, sepolia, holesky, hoodi, dev
[default: mainnet]
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.name <NAME>
The prefix name of the log files
[default: reth.log]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled.
Default: 5 for `node` command, 0 for non-node utility subcommands.
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
Possible values:
- always: Colors on
- auto: Auto-detect
- never: Colors off
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
Tracing:
--tracing-otlp[=<URL>]
Enable `Opentelemetry` tracing export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/traces` - gRPC: `http://localhost:4317`
Example: --tracing-otlp=http://collector:4318/v1/traces
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.
Possible values:
- http: HTTP/Protobuf transport, port 4318, requires `/v1/traces` path
- grpc: gRPC transport, port 4317
[env: OTEL_EXPORTER_OTLP_PROTOCOL=]
[default: http]
--tracing-otlp.filter <FILTER>
Set a filter directive for the OTLP tracer. This controls the verbosity of spans and events sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --tracing-otlp.filter=info,reth=debug,hyper_util=off
Defaults to TRACE if not specified.
[default: debug]
--tracing-otlp.sample-ratio <RATIO>
Trace sampling ratio to control the percentage of traces to export.
Valid range: 0.0 to 1.0 - 1.0, default: Sample all traces - 0.01: Sample 1% of traces - 0.0: Disable sampling
Example: --tracing-otlp.sample-ratio=0.0.
[env: OTEL_TRACES_SAMPLER_ARG=]
```

View File

@@ -0,0 +1,177 @@
# reth db stage-checkpoints set
Set a stage checkpoint
```bash
$ reth db stage-checkpoints set --help
```
```txt
Usage: reth db stage-checkpoints set [OPTIONS] --stage <STAGE> --block-number <BLOCK_NUMBER>
Options:
--stage <STAGE>
Stage to update
[possible values: era, headers, bodies, sender-recovery, execution, prune-sender-recovery, merkle-unwind, account-hashing, storage-hashing, merkle-execute, transaction-lookup, index-storage-history, index-account-history, prune, finish]
--block-number <BLOCK_NUMBER>
Block number to set as stage checkpoint
--clear-stage-unit
Clear stage-specific unit checkpoint payload
-h, --help
Print help (see a summary with '-h')
Datadir:
--chain <CHAIN_OR_PATH>
The chain this node is running.
Possible values are either a built-in chain or the path to a chain specification file.
Built-in chains:
mainnet, sepolia, holesky, hoodi, dev
[default: mainnet]
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.name <NAME>
The prefix name of the log files
[default: reth.log]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled.
Default: 5 for `node` command, 0 for non-node utility subcommands.
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
Possible values:
- always: Colors on
- auto: Auto-detect
- never: Colors off
[default: always]
--logs-otlp[=<URL>]
Enable `Opentelemetry` logs export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
Example: --logs-otlp=http://collector:4318/v1/logs
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
--logs-otlp.filter <FILTER>
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --logs-otlp.filter=info,reth=debug
Defaults to INFO if not specified.
[default: info]
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
Tracing:
--tracing-otlp[=<URL>]
Enable `Opentelemetry` tracing export to an OTLP endpoint.
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/traces` - gRPC: `http://localhost:4317`
Example: --tracing-otlp=http://collector:4318/v1/traces
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
--tracing-otlp-protocol <PROTOCOL>
OTLP transport protocol to use for exporting traces and logs.
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
Defaults to HTTP if not specified.
Possible values:
- http: HTTP/Protobuf transport, port 4318, requires `/v1/traces` path
- grpc: gRPC transport, port 4317
[env: OTEL_EXPORTER_OTLP_PROTOCOL=]
[default: http]
--tracing-otlp.filter <FILTER>
Set a filter directive for the OTLP tracer. This controls the verbosity of spans and events sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
Example: --tracing-otlp.filter=info,reth=debug,hyper_util=off
Defaults to TRACE if not specified.
[default: debug]
--tracing-otlp.sample-ratio <RATIO>
Trace sampling ratio to control the percentage of traces to export.
Valid range: 0.0 to 1.0 - 1.0, default: Sample all traces - 0.01: Sample 1% of traces - 0.0: Disable sampling
Example: --tracing-otlp.sample-ratio=0.0.
[env: OTEL_TRACES_SAMPLER_ARG=]
```

View File

@@ -54,7 +54,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -41,7 +41,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -51,7 +51,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -40,7 +40,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

View File

@@ -52,7 +52,7 @@ Logging:
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
[default: ""]
--log.file.format <FORMAT>
The format to use for logs written to the log file

Some files were not shown because too many files have changed in this diff Show More