Compare commits

...

12 Commits

Author SHA1 Message Date
DaniPopes
fcfa8287f6 chore(mdbx): replace deprecated MDBX_NOTLS with MDBX_NOSTICKYTHREADS (#23378) 2026-04-30 03:19:09 +00:00
Arsenii Kulikov
d25de30050 feat: customizable discovery defaults (#23843)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-29 22:59:10 +00:00
Derek Cofausper
4ffde69d94 fix(engine): apply finalized state after syncing FCU head import (#23838)
Co-authored-by: Centaur AI <ai@centaur.local>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: klkvr <klkvrr@gmail.com>
2026-04-29 15:29:42 +00:00
Arsenii Kulikov
077e5eecfe chore: don't enforce non-empty blocks in e2e payload building (#23837)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-29 14:36:44 +00:00
Sergei Shulepov
709485dcb7 perf(bench): buffer RPC fetches in generate-big-block (#23830)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 12:57:05 +00:00
Arsenii Kulikov
88505c7fcb fix(re-execute): properly handle selfdestructed storage slots (#23832) 2026-04-29 12:30:27 +00:00
Emma Jamieson-Hoare
c14bc59236 chore: release 2.2.0 (#23831)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-29 12:25:15 +00:00
Derek Cofausper
347c1325cc fix: skip move_to_static_files for storage.v2 (#23814) 2026-04-29 12:19:43 +00:00
Matthias Seitz
5f85eb7ac8 feat(engine): add getBlobsV4 endpoint (#23767) 2026-04-29 12:02:25 +00:00
Brian Picciano
a12454d2e6 perf(db): prebind cursor operation metrics (#23654)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-29 11:40:17 +00:00
Matthias Seitz
c194c17a27 chore(deps): bump alloy to 2.0.4 (#23828) 2026-04-29 13:41:02 +02:00
figtracer
43a7452b0e fix(rpc): narrow getLogs retry range (#23818)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-29 10:09:49 +00:00
35 changed files with 1284 additions and 495 deletions

532
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "2.1.0"
version = "2.2.0"
edition = "2024"
rust-version = "1.93"
license = "MIT OR Apache-2.0"
@@ -456,33 +456,33 @@ alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.7"
alloy-consensus = { version = "2.0.1", default-features = false }
alloy-contract = { version = "2.0.1", default-features = false }
alloy-eips = { version = "2.0.1", default-features = false }
alloy-genesis = { version = "2.0.1", default-features = false }
alloy-json-rpc = { version = "2.0.1", default-features = false }
alloy-network = { version = "2.0.1", default-features = false }
alloy-network-primitives = { version = "2.0.1", default-features = false }
alloy-provider = { version = "2.0.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "2.0.1", default-features = false }
alloy-rpc-client = { version = "2.0.1", default-features = false }
alloy-rpc-types = { version = "2.0.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "2.0.1", default-features = false }
alloy-rpc-types-anvil = { version = "2.0.1", default-features = false }
alloy-rpc-types-beacon = { version = "2.0.1", default-features = false }
alloy-rpc-types-debug = { version = "2.0.1", default-features = false }
alloy-rpc-types-engine = { version = "2.0.1", default-features = false }
alloy-rpc-types-eth = { version = "2.0.1", default-features = false }
alloy-rpc-types-mev = { version = "2.0.1", default-features = false }
alloy-rpc-types-trace = { version = "2.0.1", default-features = false }
alloy-rpc-types-txpool = { version = "2.0.1", default-features = false }
alloy-serde = { version = "2.0.1", default-features = false }
alloy-signer = { version = "2.0.1", default-features = false }
alloy-signer-local = { version = "2.0.1", default-features = false }
alloy-transport = { version = "2.0.1" }
alloy-transport-http = { version = "2.0.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "2.0.1", default-features = false }
alloy-transport-ws = { version = "2.0.1", default-features = false }
alloy-consensus = { version = "2.0.4", default-features = false }
alloy-contract = { version = "2.0.4", default-features = false }
alloy-eips = { version = "2.0.4", default-features = false }
alloy-genesis = { version = "2.0.4", default-features = false }
alloy-json-rpc = { version = "2.0.4", default-features = false }
alloy-network = { version = "2.0.4", default-features = false }
alloy-network-primitives = { version = "2.0.4", default-features = false }
alloy-provider = { version = "2.0.4", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "2.0.4", default-features = false }
alloy-rpc-client = { version = "2.0.4", default-features = false }
alloy-rpc-types = { version = "2.0.4", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "2.0.4", default-features = false }
alloy-rpc-types-anvil = { version = "2.0.4", default-features = false }
alloy-rpc-types-beacon = { version = "2.0.4", default-features = false }
alloy-rpc-types-debug = { version = "2.0.4", default-features = false }
alloy-rpc-types-engine = { version = "2.0.4", default-features = false }
alloy-rpc-types-eth = { version = "2.0.4", default-features = false }
alloy-rpc-types-mev = { version = "2.0.4", default-features = false }
alloy-rpc-types-trace = { version = "2.0.4", default-features = false }
alloy-rpc-types-txpool = { version = "2.0.4", default-features = false }
alloy-serde = { version = "2.0.4", default-features = false }
alloy-signer = { version = "2.0.4", default-features = false }
alloy-signer-local = { version = "2.0.4", default-features = false }
alloy-transport = { version = "2.0.4" }
alloy-transport-http = { version = "2.0.4", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "2.0.4", default-features = false }
alloy-transport-ws = { version = "2.0.4", default-features = false }
# misc
either = { version = "1.15.0", default-features = false }

View File

@@ -21,6 +21,7 @@ use alloy_rpc_types_engine::{
};
use clap::Parser;
use eyre::Context;
use futures::{stream, StreamExt};
use reth_chainspec::EthChainSpec;
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
@@ -270,6 +271,15 @@ pub struct Command {
/// the flattened BAL on the stored payload.
#[arg(long, default_value_t = false)]
bal: bool,
/// Maximum number of in-flight RPC fetches to keep buffered ahead of the merger.
///
/// Each entry is one full per-block fetch (block + receipts, plus BAL when `--bal` is
/// set). Larger values absorb RPC latency at the cost of more concurrent connections
/// and memory; the buffer persists across `--num-big-blocks` so prefetching continues
/// across big-block boundaries.
#[arg(long, value_name = "PREFETCH_BUFFER", default_value_t = 32)]
prefetch_buffer: usize,
}
impl Command {
@@ -322,13 +332,27 @@ impl Command {
}
let mut prev_big_block_header: Option<PrevBigBlockHeader> = None;
// Track the next block to fetch across big blocks so they don't overlap.
// Persistent prefetch stream: keeps `prefetch_buffer` per-block fetches in flight
// ahead of the merger across all big blocks. Each item is a fully materialized
// `FetchedBlock` (or `None` once the chain tip is reached on this fetch).
let prefetch_buffer = self.prefetch_buffer.max(1);
let bal_enabled = self.bal;
let block_stream = stream::iter(self.from_block..)
.map(|block_number| {
let provider = provider.clone();
async move { fetch_one_block(provider, block_number, bal_enabled).await }
})
.buffered(prefetch_buffer);
let mut block_stream = Box::pin(block_stream);
// Track the next block number we expect from the stream (purely for logging /
// big-block range bookkeeping; the stream produces blocks in `from_block..` order).
let mut next_block = self.from_block;
for big_block_idx in 0..self.num_big_blocks {
let range_start = next_block;
// Fetch consecutive blocks until the gas target is reached.
// Drain the prefetch stream until the gas target is reached for this big block.
let mut blocks = Vec::new();
let mut block_receipts: Vec<Vec<Receipt>> = Vec::new();
let mut block_access_lists: Vec<Option<BlockAccessList>> = Vec::new();
@@ -337,16 +361,11 @@ impl Command {
let mut reached_chain_tip = false;
while accumulated_block_gas < self.target_gas {
let block_number = next_block;
info!(target: "reth-bench", block_number, big_block = big_block_idx, "Fetching block");
info!(target: "reth-bench", block_number, big_block = big_block_idx, "Awaiting prefetched block");
let fetch_result = tokio::try_join!(
provider.get_block_by_number(block_number.into()).full(),
provider.get_block_receipts(block_number.into()),
);
let (rpc_block, receipts) = match fetch_result {
Ok((Some(block), Some(receipts))) => (block, receipts),
Ok((None, _) | (_, None)) => {
let fetched = match block_stream.next().await {
Some(Ok(Some(fetched))) => fetched,
Some(Ok(None)) => {
warn!(
target: "reth-bench",
block_number,
@@ -355,52 +374,16 @@ impl Command {
reached_chain_tip = true;
break;
}
Err(e) => return Err(e.into()),
Some(Err(e)) => return Err(e),
// The block-number stream is open-ended; this only fires if the
// upstream `iter(from..)` is somehow exhausted.
None => {
reached_chain_tip = true;
break;
}
};
let block_access_list = if self.bal {
Some(fetch_block_access_list(&provider, block_number).await.wrap_err_with(
|| format!("Failed to fetch BAL for block {block_number}"),
)?)
} else {
None
};
// Convert RPC receipts to consensus receipts
let consensus_receipts: Vec<Receipt> = receipts
.iter()
.map(|r| {
let inner = &r.inner.inner.inner;
let tx_type = r.inner.inner.r#type.try_into().unwrap_or_default();
Receipt {
tx_type,
success: inner.receipt.status.coerce_status(),
cumulative_gas_used: inner.receipt.cumulative_gas_used,
logs: inner
.receipt
.logs
.iter()
.map(|log| alloy_primitives::Log {
address: log.inner.address,
data: log.inner.data.clone(),
})
.collect(),
}
})
.collect();
// Convert to consensus block
let block = rpc_block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
.try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
})?
.into_consensus();
// Convert to ExecutionData
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
let execution_data = ExecutionData { payload, sidecar };
let FetchedBlock { execution_data, consensus_receipts, block_access_list } =
fetched;
let block_gas = execution_data.payload.as_v1().gas_used;
let block_blob_gas =
@@ -674,6 +657,79 @@ impl Command {
}
}
/// One fully-materialized block fetched by the prefetcher.
struct FetchedBlock {
/// Execution payload with sidecar derived from the RPC block.
execution_data: ExecutionData,
/// Consensus-format receipts (`cumulative_gas_used` is still per-block, callers offset
/// it when merging).
consensus_receipts: Vec<Receipt>,
/// `eth_getBlockAccessListByBlockNumber` result when `--bal` is enabled.
block_access_list: Option<BlockAccessList>,
}
/// Fetches one block + receipts (and optionally its BAL) from the RPC. Returns `Ok(None)`
/// when the block doesn't exist yet (chain-tip reached).
async fn fetch_one_block(
provider: RootProvider<AnyNetwork>,
block_number: u64,
bal_enabled: bool,
) -> eyre::Result<Option<FetchedBlock>> {
let (rpc_block, receipts) = tokio::try_join!(
provider.get_block_by_number(block_number.into()).full(),
provider.get_block_receipts(block_number.into()),
)?;
let (rpc_block, receipts) = match (rpc_block, receipts) {
(Some(b), Some(r)) => (b, r),
_ => return Ok(None),
};
let block_access_list = if bal_enabled {
Some(
fetch_block_access_list(&provider, block_number)
.await
.wrap_err_with(|| format!("Failed to fetch BAL for block {block_number}"))?,
)
} else {
None
};
let consensus_receipts: Vec<Receipt> = receipts
.iter()
.map(|r| {
let inner = &r.inner.inner.inner;
let tx_type = r.inner.inner.r#type.try_into().unwrap_or_default();
Receipt {
tx_type,
success: inner.receipt.status.coerce_status(),
cumulative_gas_used: inner.receipt.cumulative_gas_used,
logs: inner
.receipt
.logs
.iter()
.map(|log| alloy_primitives::Log {
address: log.inner.address,
data: log.inner.data.clone(),
})
.collect(),
}
})
.collect();
let block = rpc_block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
.try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
})?
.into_consensus();
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
let execution_data = ExecutionData { payload, sidecar };
Ok(Some(FetchedBlock { execution_data, consensus_receipts, block_access_list }))
}
fn merge_block_access_list(
merged: &mut BlockAccessList,
incoming: BlockAccessList,

View File

@@ -20,7 +20,10 @@ use reth_provider::{
};
use reth_revm::{
database::StateProviderDatabase,
db::{states::reverts::AccountInfoRevert, BundleState},
db::{
states::reverts::{AccountInfoRevert, RevertToSlot},
BundleState,
},
};
use reth_stages::stages::calculate_gas_used_from_headers;
use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
@@ -425,14 +428,19 @@ where
let mut cs_slots = cs_storage.get_mut(addr);
for (slot_key, revert_slot) in &revert.storage {
let b256_key = B256::from(*slot_key);
match cs_slots.as_mut().and_then(|s| s.remove(&b256_key)) {
Some(cs_value) => eyre::ensure!(
revert_slot.to_previous_value() == cs_value,
let cs_value = cs_slots.as_mut().and_then(|s| s.remove(&b256_key));
match (revert_slot, cs_value) {
// When a contract is selfdestructed and re-created at the same address
// within the same block, revm marks slots touched by the new contract
// as `Destroyed` and never reads the original DB value, so
// `to_previous_value()` would resolve to zero, which might be wrong.
(RevertToSlot::Destroyed, _) => {}
(RevertToSlot::Some(prev), Some(cs_value)) => eyre::ensure!(
*prev == cs_value,
"Block {block_number}: {addr} slot {b256_key} mismatch: \
revert={} cs={cs_value}",
revert_slot.to_previous_value(),
revert={prev} cs={cs_value}",
),
None => eyre::ensure!(
(RevertToSlot::Some(_), None) => eyre::ensure!(
revert.wipe_storage,
"Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
),

View File

@@ -1,8 +1,8 @@
use futures_util::StreamExt;
use reth_node_api::{BlockBody, PayloadAttributes, PayloadKind};
use reth_node_api::{PayloadAttributes, PayloadKind};
use reth_payload_builder::{PayloadBuilderHandle, PayloadId};
use reth_payload_builder_primitives::Events;
use reth_payload_primitives::{BuiltPayload, PayloadTypes};
use reth_payload_primitives::PayloadTypes;
use tokio_stream::wrappers::BroadcastStream;
/// Helper for payload operations
@@ -53,27 +53,11 @@ impl<T: PayloadTypes> PayloadTestContext<T> {
///
/// Panics if the payload builder does not produce a non-empty payload within 30 seconds.
pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
let start = std::time::Instant::now();
loop {
let payload =
self.payload_builder.best_payload(payload_id).await.transpose().ok().flatten();
if payload.is_none_or(|p| p.block().body().transactions().is_empty()) {
assert!(
start.elapsed() < std::time::Duration::from_secs(30),
"timed out waiting for a non-empty payload for {payload_id} — \
check that the chain spec supports all generated tx types"
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
continue
}
// Resolve payload once its built
self.payload_builder
.resolve_kind(payload_id, PayloadKind::Earliest)
.await
.unwrap()
.unwrap();
break;
}
self.payload_builder
.resolve_kind(payload_id, PayloadKind::WaitForPending)
.await
.unwrap()
.unwrap();
}
/// Expects the next event to be a built payload event or panics

View File

@@ -21,7 +21,8 @@ impl ForkchoiceStateTracker {
/// `sync_target` to `None`, since we're now fully synced.
pub const fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
if status.is_valid() {
self.set_valid(state);
self.last_syncing = None;
self.last_valid = Some(state);
} else if status.is_syncing() {
self.last_syncing = Some(state);
}
@@ -30,11 +31,24 @@ impl ForkchoiceStateTracker {
self.latest = Some(received);
}
const fn set_valid(&mut self, state: ForkchoiceState) {
// we no longer need to sync to this state.
/// Promotes a previously tracked syncing forkchoice state to valid, without overwriting a
/// newer `latest` state.
///
/// This is used when a `Syncing` FCU's head finally becomes canonical via the downloaded-block
/// flow, so the safe/finalized anchors of that FCU can be applied. Unlike
/// [`Self::set_latest`], this preserves a newer `latest` (e.g. an `Invalid` FCU received
/// after the syncing one) and only flips `latest` to `Valid` when it still refers to the same
/// syncing FCU being promoted.
pub fn promote_sync_target_to_valid(&mut self, state: ForkchoiceState) {
self.last_syncing = None;
self.last_valid = Some(state);
if let Some(received) = self.latest.as_mut() &&
received.state == state &&
received.status.is_syncing()
{
received.status = ForkchoiceStatus::Valid;
}
}
/// Returns the [`ForkchoiceStatus`] of the latest received FCU.

View File

@@ -1917,9 +1917,37 @@ where
self.on_canonical_chain_update(chain_update);
}
self.on_canonicalized_sync_target(target);
Ok(())
}
/// Applies the tracked forkchoice state once its sync target head becomes canonical.
fn on_canonicalized_sync_target(&mut self, target: B256) {
let Some(sync_target_state) = self
.state
.forkchoice_state_tracker
.sync_target_state()
.filter(|state| state.head_block_hash == target)
else {
return;
};
if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
debug!(
target: "engine::tree",
head = %sync_target_state.head_block_hash,
safe = %sync_target_state.safe_block_hash,
finalized = %sync_target_state.finalized_block_hash,
?outcome,
"Canonicalized sync target head before safe/finalized could be applied"
);
return;
}
self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
}
/// Convenience function to handle an optional tree event.
fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
if let Some(event) = event {

View File

@@ -13,7 +13,7 @@ use std::{hash::Hash, sync::Arc};
use tracing::error;
/// Default max cache size for [`PrecompileCache`]
const MAX_CACHE_SIZE: u32 = 10_000;
const MAX_CACHE_SIZE: u32 = 1024 * 1024;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
@@ -54,6 +54,9 @@ where
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
.initial_capacity(MAX_CACHE_SIZE as usize)
.eviction_policy(EvictionPolicy::lru())
.weigher(|key: &Bytes, value: &CacheEntry<S>| {
(key.len() + value.output.bytes.len()) as u32
})
.build_with_hasher(Default::default()),
)
}

View File

@@ -2254,3 +2254,65 @@ fn test_on_valid_downloaded_head_sync_target_returns_make_canonical() {
other => panic!("Expected MakeCanonical for head block, got: {other:?}"),
}
}
/// Tests that canonicalizing a downloaded sync target head also applies the tracked finalized
/// block from the original `SYNCING` forkchoice state.
#[test]
fn test_canonicalizing_downloaded_sync_target_head_updates_finalized() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..3).collect();
let genesis = &blocks[0];
let finalized_block = &blocks[1];
let head_block = &blocks[2];
test_harness = test_harness.with_blocks(vec![
genesis.clone(),
finalized_block.clone(),
head_block.clone(),
]);
let finalized_num_hash = finalized_block.recovered_block().num_hash();
let head_num_hash = head_block.recovered_block().num_hash();
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
let fcu_state = ForkchoiceState {
head_block_hash: head_num_hash.hash,
safe_block_hash: head_num_hash.hash,
finalized_block_hash: finalized_num_hash.hash,
};
test_harness
.tree
.state
.forkchoice_state_tracker
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
let event = test_harness
.tree
.on_valid_downloaded_block(head_num_hash)
.unwrap()
.expect("expected canonicalization event for sync target head");
test_harness.tree.on_tree_event(event).unwrap();
assert_eq!(test_harness.tree.state.tree_state.canonical_block_hash(), head_num_hash.hash);
assert_eq!(
test_harness.tree.canonical_in_memory_state.get_finalized_num_hash(),
Some(finalized_num_hash),
"Finalized block from the syncing FCU should be applied once the head becomes canonical"
);
assert_eq!(
test_harness.tree.canonical_in_memory_state.get_safe_num_hash(),
Some(head_num_hash),
"Safe block from the syncing FCU should be applied once the head becomes canonical"
);
assert_eq!(
test_harness.tree.state.forkchoice_state_tracker.last_valid_state(),
Some(fcu_state)
);
assert!(test_harness.tree.state.forkchoice_state_tracker.sync_target_state().is_none());
}

View File

@@ -2,7 +2,7 @@
/// NetworkArg struct for configuring the network
mod network;
pub use network::{DefaultNetworkArgs, DiscoveryArgs, NetworkArgs};
pub use network::{DefaultDiscoveryArgs, DefaultNetworkArgs, DiscoveryArgs, NetworkArgs};
/// RpcServerArg struct for configuring the RPC
mod rpc_server;

View File

@@ -11,7 +11,10 @@ use std::{
};
use crate::version::version_metadata;
use clap::Args;
use clap::{
builder::{OsStr, Resettable},
Args,
};
use reth_chainspec::EthChainSpec;
use reth_cli_util::{get_secret_key, load_secret_key::SecretKeyError};
use reth_config::Config;
@@ -569,15 +572,8 @@ impl NetworkArgs {
let rlpx_socket = (addr, self.port).into();
self.discovery.apply_to_builder(builder, rlpx_socket, chain_bootnodes)
})
.listener_addr(SocketAddr::new(
addr, // set discovery port based on instance number
self.port,
))
.discovery_addr(SocketAddr::new(
self.discovery.addr,
// set discovery port based on instance number
self.discovery.port,
))
.listener_addr(SocketAddr::new(addr, self.port))
.discovery_addr(SocketAddr::new(self.discovery.addr, self.discovery.port))
.disable_tx_gossip(self.disable_tx_gossip)
.required_block_hashes(self.required_block_hashes.clone())
.eth_max_message_size_opt(self.eth_max_message_size.map(NonZeroUsize::get))
@@ -727,19 +723,172 @@ impl Default for NetworkArgs {
}
}
/// Global static discovery defaults
static DISCOVERY_DEFAULTS: OnceLock<DefaultDiscoveryArgs> = OnceLock::new();
/// Default values for discovery CLI arguments that can be customized.
#[derive(Debug, Clone, Copy)]
pub struct DefaultDiscoveryArgs {
/// Default for `--disable-discovery`.
pub disable_discovery: bool,
/// Default for `--disable-dns-discovery`.
pub disable_dns_discovery: bool,
/// Default for `--disable-discv4-discovery`.
pub disable_discv4_discovery: bool,
/// Default for `--disable-discv5-discovery`.
pub disable_discv5_discovery: bool,
/// Default for `--disable-nat`.
pub disable_nat: bool,
/// Default UDP address for devp2p discovery v4.
pub addr: IpAddr,
/// Default UDP port for devp2p discovery v4.
pub port: u16,
/// Default UDP IPv4 address for devp2p discovery v5.
pub discv5_addr: Option<Ipv4Addr>,
/// Default UDP IPv6 address for devp2p discovery v5.
pub discv5_addr_ipv6: Option<Ipv6Addr>,
/// Default UDP IPv4 port for devp2p discovery v5.
pub discv5_port: Option<u16>,
/// Default UDP IPv6 port for devp2p discovery v5.
pub discv5_port_ipv6: Option<u16>,
/// Default discv5 periodic lookup interval (seconds).
pub discv5_lookup_interval: u64,
/// Default discv5 bootstrap lookup interval (seconds).
pub discv5_bootstrap_lookup_interval: u64,
/// Default discv5 bootstrap lookup countdown.
pub discv5_bootstrap_lookup_countdown: u64,
}
impl DefaultDiscoveryArgs {
/// Initialize the global discovery defaults with this configuration.
pub fn try_init(self) -> Result<(), Self> {
DISCOVERY_DEFAULTS.set(self)
}
/// Get a reference to the global discovery defaults.
pub fn get_global() -> &'static Self {
DISCOVERY_DEFAULTS.get_or_init(Self::default)
}
/// Set the default for `--disable-discovery`.
pub const fn with_disable_discovery(mut self, disable: bool) -> Self {
self.disable_discovery = disable;
self
}
/// Set the default for `--disable-dns-discovery`.
pub const fn with_disable_dns_discovery(mut self, disable: bool) -> Self {
self.disable_dns_discovery = disable;
self
}
/// Set the default for `--disable-discv4-discovery`.
pub const fn with_disable_discv4_discovery(mut self, disable: bool) -> Self {
self.disable_discv4_discovery = disable;
self
}
/// Set the default for `--disable-discv5-discovery`.
pub const fn with_disable_discv5_discovery(mut self, disable: bool) -> Self {
self.disable_discv5_discovery = disable;
self
}
/// Set the default for `--disable-nat`.
pub const fn with_disable_nat(mut self, disable: bool) -> Self {
self.disable_nat = disable;
self
}
/// Set the default discovery v4 address.
pub const fn with_addr(mut self, addr: IpAddr) -> Self {
self.addr = addr;
self
}
/// Set the default discovery v4 port.
pub const fn with_port(mut self, port: u16) -> Self {
self.port = port;
self
}
/// Set the default discovery v5 IPv4 address.
pub fn with_discv5_addr(mut self, addr: impl Into<Option<Ipv4Addr>>) -> Self {
self.discv5_addr = addr.into();
self
}
/// Set the default discovery v5 IPv6 address.
pub fn with_discv5_addr_ipv6(mut self, addr: impl Into<Option<Ipv6Addr>>) -> Self {
self.discv5_addr_ipv6 = addr.into();
self
}
/// Set the default discovery V5 port.
pub fn with_discv5_port(mut self, port: impl Into<Option<u16>>) -> Self {
self.discv5_port = port.into();
self
}
/// Set the default discovery v5 IPv6 port.
pub fn with_discv5_port_ipv6(mut self, port: impl Into<Option<u16>>) -> Self {
self.discv5_port_ipv6 = port.into();
self
}
/// Set the default discv5 periodic lookup interval (seconds).
pub const fn with_discv5_lookup_interval(mut self, interval: u64) -> Self {
self.discv5_lookup_interval = interval;
self
}
/// Set the default discv5 bootstrap lookup interval (seconds).
pub const fn with_discv5_bootstrap_lookup_interval(mut self, interval: u64) -> Self {
self.discv5_bootstrap_lookup_interval = interval;
self
}
/// Set the default discv5 bootstrap lookup countdown.
pub const fn with_discv5_bootstrap_lookup_countdown(mut self, countdown: u64) -> Self {
self.discv5_bootstrap_lookup_countdown = countdown;
self
}
}
impl Default for DefaultDiscoveryArgs {
fn default() -> Self {
Self {
disable_discovery: false,
disable_dns_discovery: false,
disable_discv4_discovery: false,
disable_discv5_discovery: false,
disable_nat: false,
addr: DEFAULT_DISCOVERY_ADDR,
port: DEFAULT_DISCOVERY_PORT,
discv5_addr: None,
discv5_addr_ipv6: None,
discv5_port: Some(DEFAULT_DISCOVERY_V5_PORT),
discv5_port_ipv6: Some(DEFAULT_DISCOVERY_V5_PORT),
discv5_lookup_interval: DEFAULT_SECONDS_LOOKUP_INTERVAL,
discv5_bootstrap_lookup_interval: DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL,
discv5_bootstrap_lookup_countdown: DEFAULT_COUNT_BOOTSTRAP_LOOKUPS,
}
}
}
/// Arguments to setup discovery
#[derive(Debug, Clone, Args, PartialEq, Eq)]
pub struct DiscoveryArgs {
/// Disable the discovery service.
#[arg(short, long, default_value_if("dev", "true", "true"))]
#[arg(short, long, default_value_if("dev", "true", "true"), default_value_t = DefaultDiscoveryArgs::get_global().disable_discovery)]
pub disable_discovery: bool,
/// Disable the DNS discovery.
#[arg(long, conflicts_with = "disable_discovery")]
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_dns_discovery)]
pub disable_dns_discovery: bool,
/// Disable Discv4 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_discv4_discovery)]
pub disable_discv4_discovery: bool,
/// Enable Discv5 discovery.
@@ -750,57 +899,57 @@ pub struct DiscoveryArgs {
pub enable_discv5_discovery: bool,
/// Disable Discv5 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_discv5_discovery)]
pub disable_discv5_discovery: bool,
/// Disable Nat discovery.
#[arg(long, conflicts_with = "disable_discovery")]
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_nat)]
pub disable_nat: bool,
/// The UDP address to use for devp2p peer discovery version 4.
#[arg(id = "discovery.addr", long = "discovery.addr", value_name = "DISCOVERY_ADDR", default_value_t = DEFAULT_DISCOVERY_ADDR)]
#[arg(id = "discovery.addr", long = "discovery.addr", value_name = "DISCOVERY_ADDR", default_value_t = DefaultDiscoveryArgs::get_global().addr)]
pub addr: IpAddr,
/// The UDP port to use for devp2p peer discovery version 4.
#[arg(id = "discovery.port", long = "discovery.port", value_name = "DISCOVERY_PORT", default_value_t = DEFAULT_DISCOVERY_PORT)]
#[arg(id = "discovery.port", long = "discovery.port", value_name = "DISCOVERY_PORT", default_value_t = DefaultDiscoveryArgs::get_global().port)]
pub port: u16,
/// The UDP IPv4 address to use for devp2p peer discovery version 5. Overwritten by `RLPx`
/// address, if it's also IPv4.
#[arg(id = "discovery.v5.addr", long = "discovery.v5.addr", value_name = "DISCOVERY_V5_ADDR", default_value = None)]
#[arg(id = "discovery.v5.addr", long = "discovery.v5.addr", value_name = "DISCOVERY_V5_ADDR", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_addr.map(|a| OsStr::from(a.to_string()))))]
pub discv5_addr: Option<Ipv4Addr>,
/// The UDP IPv6 address to use for devp2p peer discovery version 5. Overwritten by `RLPx`
/// address, if it's also IPv6.
#[arg(id = "discovery.v5.addr.ipv6", long = "discovery.v5.addr.ipv6", value_name = "DISCOVERY_V5_ADDR_IPV6", default_value = None)]
#[arg(id = "discovery.v5.addr.ipv6", long = "discovery.v5.addr.ipv6", value_name = "DISCOVERY_V5_ADDR_IPV6", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_addr_ipv6.map(|a| OsStr::from(a.to_string()))))]
pub discv5_addr_ipv6: Option<Ipv6Addr>,
/// The UDP IPv4 port to use for devp2p peer discovery version 5. Not used unless `--addr` is
/// IPv4, or `--discovery.v5.addr` is set.
#[arg(id = "discovery.v5.port", long = "discovery.v5.port", value_name = "DISCOVERY_V5_PORT",
default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
pub discv5_port: u16,
#[arg(id = "discovery.v5.port", long = "discovery.v5.port", value_name = "DISCOVERY_V5_PORT", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_port.map(|p| OsStr::from(p.to_string()))))]
pub discv5_port: Option<u16>,
/// The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is
/// IPv6, or `--discovery.addr.ipv6` is set.
#[arg(id = "discovery.v5.port.ipv6", long = "discovery.v5.port.ipv6", value_name = "DISCOVERY_V5_PORT_IPV6",
default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
pub discv5_port_ipv6: u16,
///
/// If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
#[arg(id = "discovery.v5.port.ipv6", long = "discovery.v5.port.ipv6", value_name = "DISCOVERY_V5_PORT_IPV6", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_port_ipv6.map(|p| OsStr::from(p.to_string()))))]
pub discv5_port_ipv6: Option<u16>,
/// The interval in seconds at which to carry out periodic lookup queries, for the whole
/// run of the program.
#[arg(id = "discovery.v5.lookup-interval", long = "discovery.v5.lookup-interval", value_name = "DISCOVERY_V5_LOOKUP_INTERVAL", default_value_t = DEFAULT_SECONDS_LOOKUP_INTERVAL)]
#[arg(id = "discovery.v5.lookup-interval", long = "discovery.v5.lookup-interval", value_name = "DISCOVERY_V5_LOOKUP_INTERVAL", default_value_t = DefaultDiscoveryArgs::get_global().discv5_lookup_interval)]
pub discv5_lookup_interval: u64,
/// The interval in seconds at which to carry out boost lookup queries, for a fixed number of
/// times, at bootstrap.
#[arg(id = "discovery.v5.bootstrap.lookup-interval", long = "discovery.v5.bootstrap.lookup-interval", value_name = "DISCOVERY_V5_BOOTSTRAP_LOOKUP_INTERVAL",
default_value_t = DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL)]
default_value_t = DefaultDiscoveryArgs::get_global().discv5_bootstrap_lookup_interval)]
pub discv5_bootstrap_lookup_interval: u64,
/// The number of times to carry out boost lookup queries at bootstrap.
#[arg(id = "discovery.v5.bootstrap.lookup-countdown", long = "discovery.v5.bootstrap.lookup-countdown", value_name = "DISCOVERY_V5_BOOTSTRAP_LOOKUP_COUNTDOWN",
default_value_t = DEFAULT_COUNT_BOOTSTRAP_LOOKUPS)]
default_value_t = DefaultDiscoveryArgs::get_global().discv5_bootstrap_lookup_countdown)]
pub discv5_bootstrap_lookup_countdown: u64,
}
@@ -850,6 +999,7 @@ impl DiscoveryArgs {
discv5_lookup_interval,
discv5_bootstrap_lookup_interval,
discv5_bootstrap_lookup_countdown,
port,
..
} = self;
@@ -867,8 +1017,9 @@ impl DiscoveryArgs {
let mut discv5_config_builder =
reth_discv5::discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, *discv5_port)),
discv5_addr_ipv6.map(|addr| SocketAddrV6::new(addr, *discv5_port_ipv6, 0, 0)),
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, discv5_port.unwrap_or(*port))),
discv5_addr_ipv6
.map(|addr| SocketAddrV6::new(addr, discv5_port_ipv6.unwrap_or(*port), 0, 0)),
));
if has_discv5_addr_args || self.disable_nat {
@@ -898,14 +1049,14 @@ impl DiscoveryArgs {
/// discovery binds to the sockets.
pub const fn with_unused_discovery_port(mut self) -> Self {
self.port = 0;
self.discv5_port = 0;
self.discv5_port_ipv6 = 0;
self.discv5_port = Some(0);
self.discv5_port_ipv6 = Some(0);
self
}
/// Set the discovery V5 port
pub const fn with_discv5_port(mut self, port: u16) -> Self {
self.discv5_port = port;
pub fn with_discv5_port(mut self, port: impl Into<Option<u16>>) -> Self {
self.discv5_port = port.into();
self
}
@@ -917,29 +1068,45 @@ impl DiscoveryArgs {
pub fn adjust_instance_ports(&mut self, instance: u16) {
debug_assert_ne!(instance, 0, "instance must be non-zero");
self.port += instance - 1;
self.discv5_port += instance - 1;
self.discv5_port_ipv6 += instance - 1;
self.discv5_port = self.discv5_port.map(|port| port + instance - 1);
self.discv5_port_ipv6 = self.discv5_port_ipv6.map(|port| port + instance - 1);
}
}
impl Default for DiscoveryArgs {
fn default() -> Self {
let DefaultDiscoveryArgs {
disable_discovery,
disable_dns_discovery,
disable_discv4_discovery,
disable_discv5_discovery,
disable_nat,
addr,
port,
discv5_addr,
discv5_addr_ipv6,
discv5_port,
discv5_port_ipv6,
discv5_lookup_interval,
discv5_bootstrap_lookup_interval,
discv5_bootstrap_lookup_countdown,
} = *DefaultDiscoveryArgs::get_global();
Self {
disable_discovery: false,
disable_dns_discovery: false,
disable_discv4_discovery: false,
disable_discovery,
disable_dns_discovery,
disable_discv4_discovery,
enable_discv5_discovery: false,
disable_discv5_discovery: false,
disable_nat: false,
addr: DEFAULT_DISCOVERY_ADDR,
port: DEFAULT_DISCOVERY_PORT,
discv5_addr: None,
discv5_addr_ipv6: None,
discv5_port: DEFAULT_DISCOVERY_V5_PORT,
discv5_port_ipv6: DEFAULT_DISCOVERY_V5_PORT,
discv5_lookup_interval: DEFAULT_SECONDS_LOOKUP_INTERVAL,
discv5_bootstrap_lookup_interval: DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL,
discv5_bootstrap_lookup_countdown: DEFAULT_COUNT_BOOTSTRAP_LOOKUPS,
disable_discv5_discovery,
disable_nat,
addr,
port,
discv5_addr,
discv5_addr_ipv6,
discv5_port,
discv5_port_ipv6,
discv5_lookup_interval,
discv5_bootstrap_lookup_interval,
discv5_bootstrap_lookup_countdown,
}
}
}

View File

@@ -4,12 +4,12 @@
//! the consensus client.
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7685::RequestsOrHash,
BlockId, BlockNumberOrTag,
};
use alloy_json_rpc::RpcObject;
use alloy_primitives::{Address, BlockHash, Bytes, B256, U256, U64};
use alloy_primitives::{Address, BlockHash, Bytes, B128, B256, U256, U64};
use alloy_rpc_types_engine::{
ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadBodiesV2, ExecutionPayloadInputV2,
ExecutionPayloadV1, ExecutionPayloadV3, ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated,
@@ -324,6 +324,20 @@ pub trait EngineApi<Engine: EngineTypes> {
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>>;
/// Fetch blob cells for the consensus layer from the blob store.
///
/// Returns a response of the same length as the request. Missing blobs are returned as `null`
/// elements; missing requested cells within an available blob are returned as `null` cell and
/// proof entries.
///
/// Returns `null` if syncing.
#[method(name = "getBlobsV4")]
async fn get_blobs_v4(
&self,
versioned_hashes: Vec<B256>,
indices_bitarray: B128,
) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>>;
}
/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation>

View File

@@ -37,6 +37,7 @@ pub const CAPABILITIES: &[&str] = &[
"engine_getBlobsV1",
"engine_getBlobsV2",
"engine_getBlobsV3",
"engine_getBlobsV4",
];
/// Engine API capabilities set.
@@ -218,6 +219,7 @@ mod tests {
assert!(!is_critical_method("engine_getBlobsV1"));
assert!(!is_critical_method("engine_getBlobsV3"));
assert!(!is_critical_method("engine_getBlobsV4"));
assert!(!is_critical_method("engine_getPayloadBodiesByHashV1"));
assert!(!is_critical_method("engine_getPayloadBodiesByRangeV1"));
assert!(!is_critical_method("engine_getClientVersionV1"));

View File

@@ -3,11 +3,11 @@ use crate::{
};
use alloy_eips::{
eip1898::BlockHashOrNumber,
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip4895::Withdrawals,
eip7685::RequestsOrHash,
};
use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
use alloy_primitives::{BlockHash, BlockNumber, B128, B256, U64};
use alloy_rpc_types_engine::{
CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
@@ -953,6 +953,35 @@ where
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}
fn get_blobs_v4(
&self,
versioned_hashes: Vec<B256>,
indices_bitarray: B128,
) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
let current_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
if !self.inner.chain_spec.is_amsterdam_active_at_timestamp(current_timestamp) {
return Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
));
}
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
}
// Spec requires returning `null` if syncing.
if (*self.inner.is_syncing)() {
return Ok(None)
}
self.inner
.tx_pool
.get_blobs_for_versioned_hashes_v4(&versioned_hashes, indices_bitarray)
.map(Some)
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}
/// Metered version of `get_blobs_v2`.
pub fn get_blobs_v2_metered(
&self,
@@ -1009,6 +1038,28 @@ where
res
}
/// Metered version of `get_blobs_v4`.
pub fn get_blobs_v4_metered(
&self,
versioned_hashes: Vec<B256>,
indices_bitarray: B128,
) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
let hashes_len = versioned_hashes.len();
let start = Instant::now();
let res = Self::get_blobs_v4(self, versioned_hashes, indices_bitarray);
self.inner.metrics.latency.get_blobs_v4.record(start.elapsed());
if let Ok(Some(blobs)) = &res {
let blobs_found = blobs.iter().flatten().count();
let blobs_missed = hashes_len - blobs_found;
self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
}
res
}
}
// This is the concrete ethereum engine API implementation.
@@ -1375,6 +1426,15 @@ where
trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
Ok(self.get_blobs_v3_metered(versioned_hashes)?)
}
async fn get_blobs_v4(
&self,
versioned_hashes: Vec<B256>,
indices_bitarray: B128,
) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
trace!(target: "rpc::engine", "Serving engine_getBlobsV4");
Ok(self.get_blobs_v4_metered(versioned_hashes, indices_bitarray)?)
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
@@ -1660,6 +1720,37 @@ mod tests {
assert_matches!(res, Ok(None));
}
#[tokio::test]
async fn get_blobs_v4_returns_null_when_syncing() {
let chain_spec: Arc<ChainSpec> =
Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service::<EthEngineTypes>();
let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
let api = EngineApi::new(
provider,
chain_spec.clone(),
ConsensusEngineHandle::new(to_engine),
payload_store.into(),
NoopTransactionPool::default(),
Runtime::test(),
ClientVersionV1 {
code: ClientCode::RH,
name: "Reth".to_string(),
version: "v0.0.0-test".to_string(),
commit: "test".to_string(),
},
EngineCapabilities::default(),
EthereumEngineValidator::new(chain_spec),
false,
TestNetworkInfo { syncing: true },
);
let res = api.get_blobs_v4_metered(vec![B256::ZERO], B128::from(1u128));
assert_matches!(res, Ok(None));
}
#[tokio::test]
async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
let (mut handle, api) = setup_engine_api();

View File

@@ -58,6 +58,8 @@ pub(crate) struct EngineApiLatencyMetrics {
pub(crate) get_blobs_v2: Histogram,
/// Latency for `engine_getBlobsV3`
pub(crate) get_blobs_v3: Histogram,
/// Latency for `engine_getBlobsV4`
pub(crate) get_blobs_v4: Histogram,
}
#[derive(Metrics)]

View File

@@ -737,18 +737,21 @@ where
is_multi_block_range &&
all_logs.len() > max_logs_per_response
{
let retry_to_block =
if num_hash.number == from_block { from_block } else { num_hash.number - 1 };
debug!(
target: "rpc::eth::filter",
logs_found = all_logs.len(),
max_logs_per_response,
from_block,
to_block = num_hash.number,
to_block = retry_to_block,
"Query exceeded max logs per response limit"
);
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number,
to_block: retry_to_block,
});
}
}
@@ -1816,6 +1819,87 @@ mod tests {
assert!(result.is_none());
}
#[tokio::test]
async fn test_log_limit_retry_range_excludes_overflow_block() {
let provider = MockEthProvider::default();
use alloy_consensus::TxLegacy;
use reth_db_api::models::StoredBlockBodyIndices;
use reth_ethereum_primitives::{TransactionSigned, TxType};
let tx_inner = TxLegacy {
chain_id: Some(1),
nonce: 0,
gas_price: 21_000,
gas_limit: 21_000,
to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
value: alloy_primitives::U256::ZERO,
input: alloy_primitives::Bytes::new(),
};
let signature = alloy_primitives::Signature::test_signature();
let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
let mock_log = alloy_primitives::Log {
address: alloy_primitives::Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
};
let receipt = reth_ethereum_primitives::Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
logs: vec![mock_log],
success: true,
};
let mut prev_hash = alloy_primitives::B256::default();
for (idx, block_number) in (100u64..=102).enumerate() {
let header = alloy_consensus::Header {
number: block_number,
parent_hash: prev_hash,
logs_bloom: alloy_primitives::Bloom::from([1u8; 256]),
..Default::default()
};
let hash = header.hash_slow();
prev_hash = hash;
let block = reth_ethereum_primitives::Block {
header,
body: reth_ethereum_primitives::BlockBody {
transactions: vec![tx.clone()],
..Default::default()
},
};
provider.add_block(hash, block);
provider.add_receipts(block_number, vec![receipt.clone()]);
provider.add_block_body_indices(
block_number,
StoredBlockBodyIndices { first_tx_num: idx as u64, tx_count: 1 },
);
}
let eth_api = build_test_eth_api(provider);
let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
let err = eth_filter
.inner
.clone()
.get_logs_in_block_range(
Filter::default(),
100,
102,
QueryLimits { max_blocks_per_filter: None, max_logs_per_response: Some(2) },
)
.await
.expect_err("range should exceed max logs");
let EthFilterError::QueryExceedsMaxResults { max_logs, from_block, to_block } = err else {
panic!("unexpected error: {err:?}");
};
assert_eq!(max_logs, 2);
assert_eq!(from_block, 100);
assert_eq!(to_block, 101);
}
#[tokio::test]
async fn test_non_consecutive_headers_after_bloom_filter() {
let provider = MockEthProvider::default();

View File

@@ -9,7 +9,7 @@ use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, StorageSettingsCache,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@@ -269,9 +269,16 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
/// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
/// -> [`StageId::Bodies`]
///
/// This is a legacy storage.v1 backfill step. Storage.v2 writes directly to static files and
/// `RocksDB`, so there is no MDBX -> static-file migration to perform.
///
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
pub fn move_to_static_files(&self) -> RethResult<()> {
if self.provider_factory.cached_storage_settings().is_v2() {
return Ok(())
}
// Copies data from database to static files
let lowest_static_file_height =
self.static_file_producer.lock().copy_to_static_files()?.min_block_num();

View File

@@ -2,7 +2,7 @@
use super::utils::*;
use crate::{
metrics::{DatabaseEnvMetrics, Operation},
metrics::{Operation, TableOperationMetrics},
DatabaseError,
};
use reth_db_api::{
@@ -15,7 +15,7 @@ use reth_db_api::{
};
use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
/// Read only Cursor.
pub type CursorRO<T> = Cursor<RO, T>;
@@ -29,8 +29,8 @@ pub struct Cursor<K: TransactionKind, T: Table> {
pub(crate) inner: reth_libmdbx::Cursor<K>,
/// Cache buffer that receives compressed values.
buf: Vec<u8>,
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Per-table operation metrics. If `None`, metrics are not recorded.
metrics: Option<TableOperationMetrics>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}
@@ -38,7 +38,7 @@ pub struct Cursor<K: TransactionKind, T: Table> {
impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
metrics: Option<TableOperationMetrics>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
}
@@ -54,7 +54,7 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
f: impl FnOnce(&mut Self) -> R,
) -> R {
if let Some(metrics) = self.metrics.clone() {
metrics.record_operation(T::NAME, operation, value_size, || f(self))
metrics[operation.index()].record(value_size, || f(self))
} else {
f(self)
}

View File

@@ -100,7 +100,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(Cursor::new_with_metrics(
inner,
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
self.metrics_handler.as_ref().map(|h| h.env_metrics.table_operation_metrics(T::NAME)),
))
}

View File

@@ -3,7 +3,7 @@ use metrics::Histogram;
use quanta::Instant;
use reth_metrics::{metrics::Counter, Metrics};
use rustc_hash::FxHashMap;
use std::time::Duration;
use std::{array, sync::Arc, time::Duration};
use strum::{EnumCount, EnumIter, IntoEnumIterator};
const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
@@ -15,8 +15,8 @@ const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
/// Otherwise, metric recording will no-op.
#[derive(Debug)]
pub(crate) struct DatabaseEnvMetrics {
/// Caches `OperationMetrics` handles for each table and operation tuple.
operations: FxHashMap<(&'static str, Operation), OperationMetrics>,
/// Caches per-table operation metric handles for all database operation metrics.
operations: FxHashMap<&'static str, TableOperationMetrics>,
/// Caches `TransactionMetrics` handles for counters grouped by only transaction mode.
/// Updated both at tx open and close.
transactions: FxHashMap<TransactionMode, TransactionMetrics>,
@@ -26,6 +26,9 @@ pub(crate) struct DatabaseEnvMetrics {
FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
}
/// Per-table operation metric handles cached for hot cursor paths.
pub(crate) type TableOperationMetrics = Arc<[OperationMetrics; Operation::COUNT]>;
impl DatabaseEnvMetrics {
pub(crate) fn new() -> Self {
// Pre-populate metric handle maps with all possible combinations of labels
@@ -37,24 +40,23 @@ impl DatabaseEnvMetrics {
}
}
/// Generate a map of all possible operation handles for each table and operation tuple.
/// Used for tracking all operation metrics.
fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(
Tables::COUNT * Operation::COUNT,
Default::default(),
);
/// Generate a map of pre-bound operation handles for each table.
fn generate_operation_handles() -> FxHashMap<&'static str, TableOperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(Tables::COUNT, Default::default());
for table in Tables::ALL {
for operation in Operation::iter() {
operations.insert(
(table.name(), operation),
OperationMetrics::new_with_labels(&[
(Labels::Table.as_str(), table.name()),
(Labels::Operation.as_str(), operation.as_str()),
]),
);
}
let table_name = table.name();
let metrics = array::from_fn(|index| {
let operation = Operation::from_index(index);
OperationMetrics::new_with_labels(&[
(Labels::Table.as_str(), table_name),
(Labels::Operation.as_str(), operation.as_str()),
])
});
operations.insert(table_name, Arc::new(metrics));
}
operations
}
@@ -105,13 +107,18 @@ impl DatabaseEnvMetrics {
value_size: Option<usize>,
f: impl FnOnce() -> R,
) -> R {
if let Some(metrics) = self.operations.get(&(table, operation)) {
metrics.record(value_size, f)
if let Some(metrics) = self.operations.get(table) {
metrics[operation.index()].record(value_size, f)
} else {
f()
}
}
/// Returns pre-bound operation metric handles for a single table.
pub(crate) fn table_operation_metrics(&self, table: &'static str) -> TableOperationMetrics {
self.operations.get(table).expect("table operation metric handles not found").clone()
}
/// Record metrics for opening a database transaction.
pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
self.transactions
@@ -219,6 +226,39 @@ pub(crate) enum Operation {
}
impl Operation {
/// Returns the index of the operation in the cached per-table operation array.
pub(crate) const fn index(&self) -> usize {
match self {
Self::Get => 0,
Self::PutUpsert => 1,
Self::PutAppend => 2,
Self::Delete => 3,
Self::CursorUpsert => 4,
Self::CursorInsert => 5,
Self::CursorAppend => 6,
Self::CursorAppendDup => 7,
Self::CursorDeleteCurrent => 8,
Self::CursorDeleteCurrentDuplicates => 9,
}
}
/// Returns the operation for the given index in the cached per-table operation array.
const fn from_index(index: usize) -> Self {
match index {
0 => Self::Get,
1 => Self::PutUpsert,
2 => Self::PutAppend,
3 => Self::Delete,
4 => Self::CursorUpsert,
5 => Self::CursorInsert,
6 => Self::CursorAppend,
7 => Self::CursorAppendDup,
8 => Self::CursorDeleteCurrent,
9 => Self::CursorDeleteCurrentDuplicates,
_ => panic!("invalid operation index"),
}
}
/// Returns the operation as a string.
pub(crate) const fn as_str(&self) -> &'static str {
match self {

View File

@@ -97,6 +97,7 @@ fn generate_bindings(mdbx: &Path, out_file: &Path) {
let bindings = bindgen::Builder::default()
.header(mdbx.join("mdbx.h").to_string_lossy())
.allowlist_var("^(MDBX|mdbx)_.*")
.blocklist_item("MDBX_NOTLS")
.allowlist_type("^(MDBX|mdbx)_.*")
.allowlist_function("^(MDBX|mdbx)_.*")
.size_t_is_usize(true)

View File

@@ -199,7 +199,7 @@ impl EnvironmentFlags {
flags |= ffi::MDBX_LIFORECLAIM;
}
flags |= ffi::MDBX_NOTLS;
flags |= ffi::MDBX_NOSTICKYTHREADS;
flags
}

View File

@@ -3,8 +3,8 @@ use crossbeam_queue::ArrayQueue;
/// Lock-free pool of reset read-only MDBX transaction handles.
///
/// With `MDBX_NOTLS` (which reth always sets), every `mdbx_txn_begin_ex` for a read transaction
/// calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
/// With `MDBX_NOSTICKYTHREADS` (which reth always sets), every `mdbx_txn_begin_ex` for a read
/// transaction calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
/// concurrency (e.g., prewarming), this becomes a contention point.
///
/// This pool caches transaction handles that have been reset via `mdbx_txn_reset`. A reset handle

View File

@@ -2,12 +2,12 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip7594::BlobTransactionSidecarVariant,
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::{BlobCellMask, BlobTransactionSidecarVariant},
eip7840::BlobParams,
merge::EPOCH_SLOTS,
};
use alloy_primitives::{map::B256Set, TxHash, B256};
use alloy_primitives::{map::B256Set, TxHash, B128, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{fmt, fs, io, path::PathBuf, sync::Arc};
@@ -133,6 +133,74 @@ impl DiskFileBlobStore {
Ok(result)
}
/// Look up EIP-7594 blob cells by their versioned hashes.
fn get_by_versioned_hashes_cells_eip7594(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
let cell_mask = BlobCellMask::new(indices_bitarray);
let mut result = vec![None; versioned_hashes.len()];
let mut missing_count = result.len();
let cached_blob_sidecars = self
.inner
.blob_cache
.lock()
.iter()
.map(|(_, blob_sidecar)| Arc::clone(blob_sidecar))
.collect::<Vec<_>>();
for blob_sidecar in cached_blob_sidecars {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in blob_sidecar
.match_versioned_hashes_cells(versioned_hashes, cell_mask)
.map_err(|err| BlobStoreError::Other(Box::new(err)))?
{
let slot = &mut result[hash_idx];
if slot.is_none() {
missing_count -= 1;
}
*slot = Some(match_result);
}
}
if missing_count == 0 && result.iter().all(Option::is_some) {
return Ok(result)
}
}
let mut missing_tx_hashes = Vec::new();
{
let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
for (idx, _) in
result.iter().enumerate().filter(|(_, cells_and_proofs)| cells_and_proofs.is_none())
{
let versioned_hash = versioned_hashes[idx];
if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
missing_tx_hashes.push(tx_hash);
}
}
}
if !missing_tx_hashes.is_empty() {
let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
for (_, blob_sidecar) in blobs_from_disk {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in blob_sidecar
.match_versioned_hashes_cells(versioned_hashes, cell_mask)
.map_err(|err| BlobStoreError::Other(Box::new(err)))?
{
if result[hash_idx].is_none() {
result[hash_idx] = Some(match_result);
}
}
}
}
}
Ok(result)
}
}
impl BlobStore for DiskFileBlobStore {
@@ -303,6 +371,14 @@ impl BlobStore for DiskFileBlobStore {
self.get_by_versioned_hashes_eip7594(versioned_hashes)
}
fn get_by_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
@@ -925,6 +1001,27 @@ mod tests {
assert_eq!(v3, vec![Some(expected), None]);
}
#[test]
fn disk_get_blobs_v4_returns_requested_cells() {
let (store, _dir) = tmp_store();
let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
store.insert(TxHash::random(), sidecar).unwrap();
let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
let request = vec![versioned_hash, B256::ZERO];
let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
assert_eq!(v4.len(), request.len());
assert!(v4[1].is_none());
let cells_and_proofs = v4[0].as_ref().unwrap();
assert_eq!(cells_and_proofs.blob_cells.len(), 2);
assert_eq!(cells_and_proofs.proofs.len(), 2);
assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
}
#[test]
fn disk_get_blobs_v3_can_fallback_to_disk() {
let (store, _dir) = tmp_store();
@@ -937,6 +1034,20 @@ mod tests {
assert_eq!(v3, vec![Some(expected)]);
}
#[test]
fn disk_get_blobs_v4_can_fallback_to_disk() {
let (store, _dir) = tmp_store();
let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
store.insert(TxHash::random(), sidecar).unwrap();
store.clear_cache();
let v4 = store.get_by_versioned_hashes_v4(&[versioned_hash], B128::from(1u128)).unwrap();
let cells_and_proofs = v4[0].as_ref().unwrap();
assert_eq!(cells_and_proofs.blob_cells.len(), 1);
assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default())]);
}
#[test]
fn disk_double_cleanup_no_failure() {
let (store, _dir) = tmp_store();

View File

@@ -1,9 +1,9 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip7594::BlobTransactionSidecarVariant,
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::{BlobCellMask, BlobTransactionSidecarVariant},
};
use alloy_primitives::{map::B256Map, B256};
use alloy_primitives::{map::B256Map, B128, B256};
use parking_lot::RwLock;
use std::sync::Arc;
@@ -48,6 +48,37 @@ impl InMemoryBlobStore {
}
result
}
/// Look up EIP-7594 blob cells by their versioned hashes.
fn get_by_versioned_hashes_cells_eip7594(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
let cell_mask = BlobCellMask::new(indices_bitarray);
let mut result = vec![None; versioned_hashes.len()];
let mut missing_count = result.len();
let blob_sidecars = self.inner.store.read().values().cloned().collect::<Vec<_>>();
for blob_sidecar in blob_sidecars {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in blob_sidecar
.match_versioned_hashes_cells(versioned_hashes, cell_mask)
.map_err(|err| BlobStoreError::Other(Box::new(err)))?
{
let slot = &mut result[hash_idx];
if slot.is_none() {
missing_count -= 1;
}
*slot = Some(match_result);
}
}
if missing_count == 0 && result.iter().all(Option::is_some) {
break;
}
}
Ok(result)
}
}
#[derive(Debug, Default)]
@@ -186,6 +217,14 @@ impl BlobStore for InMemoryBlobStore {
Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
}
fn get_by_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
@@ -255,4 +294,25 @@ mod tests {
let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
assert_eq!(v3, vec![Some(expected), None]);
}
#[test]
fn mem_get_blobs_v4_returns_requested_cells() {
let store = InMemoryBlobStore::default();
let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
store.insert(B256::random(), sidecar).unwrap();
let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
let request = vec![versioned_hash, B256::ZERO];
let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
assert_eq!(v4.len(), request.len());
assert!(v4[1].is_none());
let cells_and_proofs = v4[0].as_ref().unwrap();
assert_eq!(cells_and_proofs.blob_cells.len(), 2);
assert_eq!(cells_and_proofs.proofs.len(), 2);
assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
}
}

View File

@@ -1,10 +1,10 @@
//! Storage for blob data of EIP4844 transactions.
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::B256;
use alloy_primitives::{B128, B256};
pub use converter::BlobSidecarConverter;
pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
pub use mem::InMemoryBlobStore;
@@ -109,6 +109,17 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
/// Return the [`BlobCellsAndProofsV1`]s for a list of blob versioned hashes and requested cell
/// indices.
///
/// The response is always the same length as the request. Missing or older-version blobs are
/// returned as `None` elements.
fn get_by_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError>;
/// Data size of all transactions in the blob store.
fn data_size_hint(&self) -> Option<usize>;

View File

@@ -1,9 +1,9 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError};
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::B256;
use alloy_primitives::{B128, B256};
use std::sync::Arc;
/// A blobstore implementation that does nothing
@@ -85,6 +85,14 @@ impl BlobStore for NoopBlobStore {
Ok(vec![None; versioned_hashes.len()])
}
fn get_by_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
_indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
fn data_size_hint(&self) -> Option<usize> {
Some(0)
}

View File

@@ -303,10 +303,10 @@ pub use crate::{
};
use crate::{identifier::TransactionId, pool::PoolInner};
use alloy_eips::{
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{map::AddressSet, Address, TxHash, B256, U256};
use alloy_primitives::{map::AddressSet, Address, TxHash, B128, B256, U256};
use aquamarine as _;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_eth_wire_types::HandleMempoolData;
@@ -808,6 +808,14 @@ where
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
self.pool.blob_store().get_by_versioned_hashes_v3(versioned_hashes)
}
fn get_blobs_for_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
self.pool.blob_store().get_by_versioned_hashes_v4(versioned_hashes, indices_bitarray)
}
}
impl<V, T, S> TransactionPoolExt for Pool<V, T, S>

View File

@@ -16,10 +16,10 @@ use crate::{
};
use alloy_eips::{
eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M,
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{map::AddressSet, Address, TxHash, B256, U256};
use alloy_primitives::{map::AddressSet, Address, TxHash, B128, B256, U256};
use reth_eth_wire_types::HandleMempoolData;
use reth_primitives_traits::Recovered;
use std::{marker::PhantomData, sync::Arc};
@@ -380,6 +380,14 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
fn get_blobs_for_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
_indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
}
/// A [`TransactionValidator`] that does nothing.

View File

@@ -65,12 +65,13 @@ use alloy_eips::{
eip2718::{Encodable2718, WithEncoded},
eip2930::AccessList,
eip4844::{
env_settings::KzgSettings, BlobAndProofV1, BlobAndProofV2, BlobTransactionValidationError,
env_settings::KzgSettings, BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1,
BlobTransactionValidationError,
},
eip7594::BlobTransactionSidecarVariant,
eip7702::SignedAuthorization,
};
use alloy_primitives::{map::AddressSet, Address, Bytes, TxHash, TxKind, B256, U256};
use alloy_primitives::{map::AddressSet, Address, Bytes, TxHash, TxKind, B128, B256, U256};
use futures_util::{ready, Stream};
use reth_eth_wire_types::HandleMempoolData;
use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
@@ -722,6 +723,17 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
/// Return the [`BlobCellsAndProofsV1`]s for a list of blob versioned hashes and requested cell
/// indices.
///
/// The response is always the same length as the request. Missing or older-version blobs are
/// returned as `None` elements.
fn get_blobs_for_versioned_hashes_v4(
&self,
versioned_hashes: &[B256],
indices_bitarray: B128,
) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError>;
}
/// Extension for [`TransactionPool`] trait that allows to set the current block info.

View File

@@ -115,7 +115,9 @@ Networking:
[default: 9200]
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
[default: 9200]

View File

@@ -55,7 +55,9 @@ Networking:
[default: 9200]
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
[default: 9200]

View File

@@ -55,7 +55,9 @@ Networking:
[default: 9200]
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
[default: 9200]

View File

@@ -208,7 +208,9 @@ Networking:
[default: 9200]
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
[default: 9200]

View File

@@ -22,7 +22,7 @@ export default defineConfig({
},
{ text: 'GitHub', link: 'https://github.com/paradigmxyz/reth' },
{
text: 'v2.1.0',
text: 'v2.2.0',
items: [
{
text: 'Releases',