mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fcfa8287f6 | ||
|
|
d25de30050 | ||
|
|
4ffde69d94 | ||
|
|
077e5eecfe | ||
|
|
709485dcb7 |
@@ -21,6 +21,7 @@ use alloy_rpc_types_engine::{
|
||||
};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use futures::{stream, StreamExt};
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_cli_runner::CliContext;
|
||||
@@ -270,6 +271,15 @@ pub struct Command {
|
||||
/// the flattened BAL on the stored payload.
|
||||
#[arg(long, default_value_t = false)]
|
||||
bal: bool,
|
||||
|
||||
/// Maximum number of in-flight RPC fetches to keep buffered ahead of the merger.
|
||||
///
|
||||
/// Each entry is one full per-block fetch (block + receipts, plus BAL when `--bal` is
|
||||
/// set). Larger values absorb RPC latency at the cost of more concurrent connections
|
||||
/// and memory; the buffer persists across `--num-big-blocks` so prefetching continues
|
||||
/// across big-block boundaries.
|
||||
#[arg(long, value_name = "PREFETCH_BUFFER", default_value_t = 32)]
|
||||
prefetch_buffer: usize,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@@ -322,13 +332,27 @@ impl Command {
|
||||
}
|
||||
let mut prev_big_block_header: Option<PrevBigBlockHeader> = None;
|
||||
|
||||
// Track the next block to fetch across big blocks so they don't overlap.
|
||||
// Persistent prefetch stream: keeps `prefetch_buffer` per-block fetches in flight
|
||||
// ahead of the merger across all big blocks. Each item is a fully materialized
|
||||
// `FetchedBlock` (or `None` once the chain tip is reached on this fetch).
|
||||
let prefetch_buffer = self.prefetch_buffer.max(1);
|
||||
let bal_enabled = self.bal;
|
||||
let block_stream = stream::iter(self.from_block..)
|
||||
.map(|block_number| {
|
||||
let provider = provider.clone();
|
||||
async move { fetch_one_block(provider, block_number, bal_enabled).await }
|
||||
})
|
||||
.buffered(prefetch_buffer);
|
||||
let mut block_stream = Box::pin(block_stream);
|
||||
|
||||
// Track the next block number we expect from the stream (purely for logging /
|
||||
// big-block range bookkeeping; the stream produces blocks in `from_block..` order).
|
||||
let mut next_block = self.from_block;
|
||||
|
||||
for big_block_idx in 0..self.num_big_blocks {
|
||||
let range_start = next_block;
|
||||
|
||||
// Fetch consecutive blocks until the gas target is reached.
|
||||
// Drain the prefetch stream until the gas target is reached for this big block.
|
||||
let mut blocks = Vec::new();
|
||||
let mut block_receipts: Vec<Vec<Receipt>> = Vec::new();
|
||||
let mut block_access_lists: Vec<Option<BlockAccessList>> = Vec::new();
|
||||
@@ -337,16 +361,11 @@ impl Command {
|
||||
let mut reached_chain_tip = false;
|
||||
while accumulated_block_gas < self.target_gas {
|
||||
let block_number = next_block;
|
||||
info!(target: "reth-bench", block_number, big_block = big_block_idx, "Fetching block");
|
||||
info!(target: "reth-bench", block_number, big_block = big_block_idx, "Awaiting prefetched block");
|
||||
|
||||
let fetch_result = tokio::try_join!(
|
||||
provider.get_block_by_number(block_number.into()).full(),
|
||||
provider.get_block_receipts(block_number.into()),
|
||||
);
|
||||
|
||||
let (rpc_block, receipts) = match fetch_result {
|
||||
Ok((Some(block), Some(receipts))) => (block, receipts),
|
||||
Ok((None, _) | (_, None)) => {
|
||||
let fetched = match block_stream.next().await {
|
||||
Some(Ok(Some(fetched))) => fetched,
|
||||
Some(Ok(None)) => {
|
||||
warn!(
|
||||
target: "reth-bench",
|
||||
block_number,
|
||||
@@ -355,52 +374,16 @@ impl Command {
|
||||
reached_chain_tip = true;
|
||||
break;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
Some(Err(e)) => return Err(e),
|
||||
// The block-number stream is open-ended; this only fires if the
|
||||
// upstream `iter(from..)` is somehow exhausted.
|
||||
None => {
|
||||
reached_chain_tip = true;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let block_access_list = if self.bal {
|
||||
Some(fetch_block_access_list(&provider, block_number).await.wrap_err_with(
|
||||
|| format!("Failed to fetch BAL for block {block_number}"),
|
||||
)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Convert RPC receipts to consensus receipts
|
||||
let consensus_receipts: Vec<Receipt> = receipts
|
||||
.iter()
|
||||
.map(|r| {
|
||||
let inner = &r.inner.inner.inner;
|
||||
let tx_type = r.inner.inner.r#type.try_into().unwrap_or_default();
|
||||
Receipt {
|
||||
tx_type,
|
||||
success: inner.receipt.status.coerce_status(),
|
||||
cumulative_gas_used: inner.receipt.cumulative_gas_used,
|
||||
logs: inner
|
||||
.receipt
|
||||
.logs
|
||||
.iter()
|
||||
.map(|log| alloy_primitives::Log {
|
||||
address: log.inner.address,
|
||||
data: log.inner.data.clone(),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Convert to consensus block
|
||||
let block = rpc_block
|
||||
.into_inner()
|
||||
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
|
||||
.try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
|
||||
tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
|
||||
})?
|
||||
.into_consensus();
|
||||
|
||||
// Convert to ExecutionData
|
||||
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
|
||||
let execution_data = ExecutionData { payload, sidecar };
|
||||
let FetchedBlock { execution_data, consensus_receipts, block_access_list } =
|
||||
fetched;
|
||||
|
||||
let block_gas = execution_data.payload.as_v1().gas_used;
|
||||
let block_blob_gas =
|
||||
@@ -674,6 +657,79 @@ impl Command {
|
||||
}
|
||||
}
|
||||
|
||||
/// One fully-materialized block fetched by the prefetcher.
|
||||
struct FetchedBlock {
|
||||
/// Execution payload with sidecar derived from the RPC block.
|
||||
execution_data: ExecutionData,
|
||||
/// Consensus-format receipts (`cumulative_gas_used` is still per-block, callers offset
|
||||
/// it when merging).
|
||||
consensus_receipts: Vec<Receipt>,
|
||||
/// `eth_getBlockAccessListByBlockNumber` result when `--bal` is enabled.
|
||||
block_access_list: Option<BlockAccessList>,
|
||||
}
|
||||
|
||||
/// Fetches one block + receipts (and optionally its BAL) from the RPC. Returns `Ok(None)`
|
||||
/// when the block doesn't exist yet (chain-tip reached).
|
||||
async fn fetch_one_block(
|
||||
provider: RootProvider<AnyNetwork>,
|
||||
block_number: u64,
|
||||
bal_enabled: bool,
|
||||
) -> eyre::Result<Option<FetchedBlock>> {
|
||||
let (rpc_block, receipts) = tokio::try_join!(
|
||||
provider.get_block_by_number(block_number.into()).full(),
|
||||
provider.get_block_receipts(block_number.into()),
|
||||
)?;
|
||||
let (rpc_block, receipts) = match (rpc_block, receipts) {
|
||||
(Some(b), Some(r)) => (b, r),
|
||||
_ => return Ok(None),
|
||||
};
|
||||
|
||||
let block_access_list = if bal_enabled {
|
||||
Some(
|
||||
fetch_block_access_list(&provider, block_number)
|
||||
.await
|
||||
.wrap_err_with(|| format!("Failed to fetch BAL for block {block_number}"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let consensus_receipts: Vec<Receipt> = receipts
|
||||
.iter()
|
||||
.map(|r| {
|
||||
let inner = &r.inner.inner.inner;
|
||||
let tx_type = r.inner.inner.r#type.try_into().unwrap_or_default();
|
||||
Receipt {
|
||||
tx_type,
|
||||
success: inner.receipt.status.coerce_status(),
|
||||
cumulative_gas_used: inner.receipt.cumulative_gas_used,
|
||||
logs: inner
|
||||
.receipt
|
||||
.logs
|
||||
.iter()
|
||||
.map(|log| alloy_primitives::Log {
|
||||
address: log.inner.address,
|
||||
data: log.inner.data.clone(),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let block = rpc_block
|
||||
.into_inner()
|
||||
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
|
||||
.try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
|
||||
tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
|
||||
})?
|
||||
.into_consensus();
|
||||
|
||||
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
|
||||
let execution_data = ExecutionData { payload, sidecar };
|
||||
|
||||
Ok(Some(FetchedBlock { execution_data, consensus_receipts, block_access_list }))
|
||||
}
|
||||
|
||||
fn merge_block_access_list(
|
||||
merged: &mut BlockAccessList,
|
||||
incoming: BlockAccessList,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use futures_util::StreamExt;
|
||||
use reth_node_api::{BlockBody, PayloadAttributes, PayloadKind};
|
||||
use reth_node_api::{PayloadAttributes, PayloadKind};
|
||||
use reth_payload_builder::{PayloadBuilderHandle, PayloadId};
|
||||
use reth_payload_builder_primitives::Events;
|
||||
use reth_payload_primitives::{BuiltPayload, PayloadTypes};
|
||||
use reth_payload_primitives::PayloadTypes;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
/// Helper for payload operations
|
||||
@@ -53,27 +53,11 @@ impl<T: PayloadTypes> PayloadTestContext<T> {
|
||||
///
|
||||
/// Panics if the payload builder does not produce a non-empty payload within 30 seconds.
|
||||
pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
|
||||
let start = std::time::Instant::now();
|
||||
loop {
|
||||
let payload =
|
||||
self.payload_builder.best_payload(payload_id).await.transpose().ok().flatten();
|
||||
if payload.is_none_or(|p| p.block().body().transactions().is_empty()) {
|
||||
assert!(
|
||||
start.elapsed() < std::time::Duration::from_secs(30),
|
||||
"timed out waiting for a non-empty payload for {payload_id} — \
|
||||
check that the chain spec supports all generated tx types"
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
continue
|
||||
}
|
||||
// Resolve payload once its built
|
||||
self.payload_builder
|
||||
.resolve_kind(payload_id, PayloadKind::Earliest)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
break;
|
||||
}
|
||||
self.payload_builder
|
||||
.resolve_kind(payload_id, PayloadKind::WaitForPending)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Expects the next event to be a built payload event or panics
|
||||
|
||||
@@ -21,7 +21,8 @@ impl ForkchoiceStateTracker {
|
||||
/// `sync_target` to `None`, since we're now fully synced.
|
||||
pub const fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
|
||||
if status.is_valid() {
|
||||
self.set_valid(state);
|
||||
self.last_syncing = None;
|
||||
self.last_valid = Some(state);
|
||||
} else if status.is_syncing() {
|
||||
self.last_syncing = Some(state);
|
||||
}
|
||||
@@ -30,11 +31,24 @@ impl ForkchoiceStateTracker {
|
||||
self.latest = Some(received);
|
||||
}
|
||||
|
||||
const fn set_valid(&mut self, state: ForkchoiceState) {
|
||||
// we no longer need to sync to this state.
|
||||
/// Promotes a previously tracked syncing forkchoice state to valid, without overwriting a
|
||||
/// newer `latest` state.
|
||||
///
|
||||
/// This is used when a `Syncing` FCU's head finally becomes canonical via the downloaded-block
|
||||
/// flow, so the safe/finalized anchors of that FCU can be applied. Unlike
|
||||
/// [`Self::set_latest`], this preserves a newer `latest` (e.g. an `Invalid` FCU received
|
||||
/// after the syncing one) and only flips `latest` to `Valid` when it still refers to the same
|
||||
/// syncing FCU being promoted.
|
||||
pub fn promote_sync_target_to_valid(&mut self, state: ForkchoiceState) {
|
||||
self.last_syncing = None;
|
||||
|
||||
self.last_valid = Some(state);
|
||||
|
||||
if let Some(received) = self.latest.as_mut() &&
|
||||
received.state == state &&
|
||||
received.status.is_syncing()
|
||||
{
|
||||
received.status = ForkchoiceStatus::Valid;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [`ForkchoiceStatus`] of the latest received FCU.
|
||||
|
||||
@@ -1917,9 +1917,37 @@ where
|
||||
self.on_canonical_chain_update(chain_update);
|
||||
}
|
||||
|
||||
self.on_canonicalized_sync_target(target);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Applies the tracked forkchoice state once its sync target head becomes canonical.
|
||||
fn on_canonicalized_sync_target(&mut self, target: B256) {
|
||||
let Some(sync_target_state) = self
|
||||
.state
|
||||
.forkchoice_state_tracker
|
||||
.sync_target_state()
|
||||
.filter(|state| state.head_block_hash == target)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Err(outcome) = self.ensure_consistent_forkchoice_state(sync_target_state) {
|
||||
debug!(
|
||||
target: "engine::tree",
|
||||
head = %sync_target_state.head_block_hash,
|
||||
safe = %sync_target_state.safe_block_hash,
|
||||
finalized = %sync_target_state.finalized_block_hash,
|
||||
?outcome,
|
||||
"Canonicalized sync target head before safe/finalized could be applied"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
self.state.forkchoice_state_tracker.promote_sync_target_to_valid(sync_target_state);
|
||||
}
|
||||
|
||||
/// Convenience function to handle an optional tree event.
|
||||
fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
|
||||
if let Some(event) = event {
|
||||
|
||||
@@ -2254,3 +2254,65 @@ fn test_on_valid_downloaded_head_sync_target_returns_make_canonical() {
|
||||
other => panic!("Expected MakeCanonical for head block, got: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests that canonicalizing a downloaded sync target head also applies the tracked finalized
|
||||
/// block from the original `SYNCING` forkchoice state.
|
||||
#[test]
|
||||
fn test_canonicalizing_downloaded_sync_target_head_updates_finalized() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = MAINNET.clone();
|
||||
let mut test_harness = TestHarness::new(chain_spec);
|
||||
|
||||
let blocks: Vec<_> = test_harness.block_builder.get_executed_blocks(0..3).collect();
|
||||
let genesis = &blocks[0];
|
||||
let finalized_block = &blocks[1];
|
||||
let head_block = &blocks[2];
|
||||
|
||||
test_harness = test_harness.with_blocks(vec![
|
||||
genesis.clone(),
|
||||
finalized_block.clone(),
|
||||
head_block.clone(),
|
||||
]);
|
||||
|
||||
let finalized_num_hash = finalized_block.recovered_block().num_hash();
|
||||
let head_num_hash = head_block.recovered_block().num_hash();
|
||||
|
||||
test_harness.tree.state.tree_state.set_canonical_head(genesis.recovered_block().num_hash());
|
||||
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: head_num_hash.hash,
|
||||
safe_block_hash: head_num_hash.hash,
|
||||
finalized_block_hash: finalized_num_hash.hash,
|
||||
};
|
||||
test_harness
|
||||
.tree
|
||||
.state
|
||||
.forkchoice_state_tracker
|
||||
.set_latest(fcu_state, ForkchoiceStatus::Syncing);
|
||||
|
||||
let event = test_harness
|
||||
.tree
|
||||
.on_valid_downloaded_block(head_num_hash)
|
||||
.unwrap()
|
||||
.expect("expected canonicalization event for sync target head");
|
||||
|
||||
test_harness.tree.on_tree_event(event).unwrap();
|
||||
|
||||
assert_eq!(test_harness.tree.state.tree_state.canonical_block_hash(), head_num_hash.hash);
|
||||
assert_eq!(
|
||||
test_harness.tree.canonical_in_memory_state.get_finalized_num_hash(),
|
||||
Some(finalized_num_hash),
|
||||
"Finalized block from the syncing FCU should be applied once the head becomes canonical"
|
||||
);
|
||||
assert_eq!(
|
||||
test_harness.tree.canonical_in_memory_state.get_safe_num_hash(),
|
||||
Some(head_num_hash),
|
||||
"Safe block from the syncing FCU should be applied once the head becomes canonical"
|
||||
);
|
||||
assert_eq!(
|
||||
test_harness.tree.state.forkchoice_state_tracker.last_valid_state(),
|
||||
Some(fcu_state)
|
||||
);
|
||||
assert!(test_harness.tree.state.forkchoice_state_tracker.sync_target_state().is_none());
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
/// NetworkArg struct for configuring the network
|
||||
mod network;
|
||||
pub use network::{DefaultNetworkArgs, DiscoveryArgs, NetworkArgs};
|
||||
pub use network::{DefaultDiscoveryArgs, DefaultNetworkArgs, DiscoveryArgs, NetworkArgs};
|
||||
|
||||
/// RpcServerArg struct for configuring the RPC
|
||||
mod rpc_server;
|
||||
|
||||
@@ -11,7 +11,10 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::version::version_metadata;
|
||||
use clap::Args;
|
||||
use clap::{
|
||||
builder::{OsStr, Resettable},
|
||||
Args,
|
||||
};
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_cli_util::{get_secret_key, load_secret_key::SecretKeyError};
|
||||
use reth_config::Config;
|
||||
@@ -569,15 +572,8 @@ impl NetworkArgs {
|
||||
let rlpx_socket = (addr, self.port).into();
|
||||
self.discovery.apply_to_builder(builder, rlpx_socket, chain_bootnodes)
|
||||
})
|
||||
.listener_addr(SocketAddr::new(
|
||||
addr, // set discovery port based on instance number
|
||||
self.port,
|
||||
))
|
||||
.discovery_addr(SocketAddr::new(
|
||||
self.discovery.addr,
|
||||
// set discovery port based on instance number
|
||||
self.discovery.port,
|
||||
))
|
||||
.listener_addr(SocketAddr::new(addr, self.port))
|
||||
.discovery_addr(SocketAddr::new(self.discovery.addr, self.discovery.port))
|
||||
.disable_tx_gossip(self.disable_tx_gossip)
|
||||
.required_block_hashes(self.required_block_hashes.clone())
|
||||
.eth_max_message_size_opt(self.eth_max_message_size.map(NonZeroUsize::get))
|
||||
@@ -727,19 +723,172 @@ impl Default for NetworkArgs {
|
||||
}
|
||||
}
|
||||
|
||||
/// Global static discovery defaults
|
||||
static DISCOVERY_DEFAULTS: OnceLock<DefaultDiscoveryArgs> = OnceLock::new();
|
||||
|
||||
/// Default values for discovery CLI arguments that can be customized.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct DefaultDiscoveryArgs {
|
||||
/// Default for `--disable-discovery`.
|
||||
pub disable_discovery: bool,
|
||||
/// Default for `--disable-dns-discovery`.
|
||||
pub disable_dns_discovery: bool,
|
||||
/// Default for `--disable-discv4-discovery`.
|
||||
pub disable_discv4_discovery: bool,
|
||||
/// Default for `--disable-discv5-discovery`.
|
||||
pub disable_discv5_discovery: bool,
|
||||
/// Default for `--disable-nat`.
|
||||
pub disable_nat: bool,
|
||||
/// Default UDP address for devp2p discovery v4.
|
||||
pub addr: IpAddr,
|
||||
/// Default UDP port for devp2p discovery v4.
|
||||
pub port: u16,
|
||||
/// Default UDP IPv4 address for devp2p discovery v5.
|
||||
pub discv5_addr: Option<Ipv4Addr>,
|
||||
/// Default UDP IPv6 address for devp2p discovery v5.
|
||||
pub discv5_addr_ipv6: Option<Ipv6Addr>,
|
||||
/// Default UDP IPv4 port for devp2p discovery v5.
|
||||
pub discv5_port: Option<u16>,
|
||||
/// Default UDP IPv6 port for devp2p discovery v5.
|
||||
pub discv5_port_ipv6: Option<u16>,
|
||||
/// Default discv5 periodic lookup interval (seconds).
|
||||
pub discv5_lookup_interval: u64,
|
||||
/// Default discv5 bootstrap lookup interval (seconds).
|
||||
pub discv5_bootstrap_lookup_interval: u64,
|
||||
/// Default discv5 bootstrap lookup countdown.
|
||||
pub discv5_bootstrap_lookup_countdown: u64,
|
||||
}
|
||||
|
||||
impl DefaultDiscoveryArgs {
|
||||
/// Initialize the global discovery defaults with this configuration.
|
||||
pub fn try_init(self) -> Result<(), Self> {
|
||||
DISCOVERY_DEFAULTS.set(self)
|
||||
}
|
||||
|
||||
/// Get a reference to the global discovery defaults.
|
||||
pub fn get_global() -> &'static Self {
|
||||
DISCOVERY_DEFAULTS.get_or_init(Self::default)
|
||||
}
|
||||
|
||||
/// Set the default for `--disable-discovery`.
|
||||
pub const fn with_disable_discovery(mut self, disable: bool) -> Self {
|
||||
self.disable_discovery = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default for `--disable-dns-discovery`.
|
||||
pub const fn with_disable_dns_discovery(mut self, disable: bool) -> Self {
|
||||
self.disable_dns_discovery = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default for `--disable-discv4-discovery`.
|
||||
pub const fn with_disable_discv4_discovery(mut self, disable: bool) -> Self {
|
||||
self.disable_discv4_discovery = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default for `--disable-discv5-discovery`.
|
||||
pub const fn with_disable_discv5_discovery(mut self, disable: bool) -> Self {
|
||||
self.disable_discv5_discovery = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default for `--disable-nat`.
|
||||
pub const fn with_disable_nat(mut self, disable: bool) -> Self {
|
||||
self.disable_nat = disable;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery v4 address.
|
||||
pub const fn with_addr(mut self, addr: IpAddr) -> Self {
|
||||
self.addr = addr;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery v4 port.
|
||||
pub const fn with_port(mut self, port: u16) -> Self {
|
||||
self.port = port;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery v5 IPv4 address.
|
||||
pub fn with_discv5_addr(mut self, addr: impl Into<Option<Ipv4Addr>>) -> Self {
|
||||
self.discv5_addr = addr.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery v5 IPv6 address.
|
||||
pub fn with_discv5_addr_ipv6(mut self, addr: impl Into<Option<Ipv6Addr>>) -> Self {
|
||||
self.discv5_addr_ipv6 = addr.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery V5 port.
|
||||
pub fn with_discv5_port(mut self, port: impl Into<Option<u16>>) -> Self {
|
||||
self.discv5_port = port.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discovery v5 IPv6 port.
|
||||
pub fn with_discv5_port_ipv6(mut self, port: impl Into<Option<u16>>) -> Self {
|
||||
self.discv5_port_ipv6 = port.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discv5 periodic lookup interval (seconds).
|
||||
pub const fn with_discv5_lookup_interval(mut self, interval: u64) -> Self {
|
||||
self.discv5_lookup_interval = interval;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discv5 bootstrap lookup interval (seconds).
|
||||
pub const fn with_discv5_bootstrap_lookup_interval(mut self, interval: u64) -> Self {
|
||||
self.discv5_bootstrap_lookup_interval = interval;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default discv5 bootstrap lookup countdown.
|
||||
pub const fn with_discv5_bootstrap_lookup_countdown(mut self, countdown: u64) -> Self {
|
||||
self.discv5_bootstrap_lookup_countdown = countdown;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DefaultDiscoveryArgs {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
disable_discovery: false,
|
||||
disable_dns_discovery: false,
|
||||
disable_discv4_discovery: false,
|
||||
disable_discv5_discovery: false,
|
||||
disable_nat: false,
|
||||
addr: DEFAULT_DISCOVERY_ADDR,
|
||||
port: DEFAULT_DISCOVERY_PORT,
|
||||
discv5_addr: None,
|
||||
discv5_addr_ipv6: None,
|
||||
discv5_port: Some(DEFAULT_DISCOVERY_V5_PORT),
|
||||
discv5_port_ipv6: Some(DEFAULT_DISCOVERY_V5_PORT),
|
||||
discv5_lookup_interval: DEFAULT_SECONDS_LOOKUP_INTERVAL,
|
||||
discv5_bootstrap_lookup_interval: DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL,
|
||||
discv5_bootstrap_lookup_countdown: DEFAULT_COUNT_BOOTSTRAP_LOOKUPS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Arguments to setup discovery
|
||||
#[derive(Debug, Clone, Args, PartialEq, Eq)]
|
||||
pub struct DiscoveryArgs {
|
||||
/// Disable the discovery service.
|
||||
#[arg(short, long, default_value_if("dev", "true", "true"))]
|
||||
#[arg(short, long, default_value_if("dev", "true", "true"), default_value_t = DefaultDiscoveryArgs::get_global().disable_discovery)]
|
||||
pub disable_discovery: bool,
|
||||
|
||||
/// Disable the DNS discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_dns_discovery)]
|
||||
pub disable_dns_discovery: bool,
|
||||
|
||||
/// Disable Discv4 discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_discv4_discovery)]
|
||||
pub disable_discv4_discovery: bool,
|
||||
|
||||
/// Enable Discv5 discovery.
|
||||
@@ -750,57 +899,57 @@ pub struct DiscoveryArgs {
|
||||
pub enable_discv5_discovery: bool,
|
||||
|
||||
/// Disable Discv5 discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_discv5_discovery)]
|
||||
pub disable_discv5_discovery: bool,
|
||||
|
||||
/// Disable Nat discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
#[arg(long, conflicts_with = "disable_discovery", default_value_t = DefaultDiscoveryArgs::get_global().disable_nat)]
|
||||
pub disable_nat: bool,
|
||||
|
||||
/// The UDP address to use for devp2p peer discovery version 4.
|
||||
#[arg(id = "discovery.addr", long = "discovery.addr", value_name = "DISCOVERY_ADDR", default_value_t = DEFAULT_DISCOVERY_ADDR)]
|
||||
#[arg(id = "discovery.addr", long = "discovery.addr", value_name = "DISCOVERY_ADDR", default_value_t = DefaultDiscoveryArgs::get_global().addr)]
|
||||
pub addr: IpAddr,
|
||||
|
||||
/// The UDP port to use for devp2p peer discovery version 4.
|
||||
#[arg(id = "discovery.port", long = "discovery.port", value_name = "DISCOVERY_PORT", default_value_t = DEFAULT_DISCOVERY_PORT)]
|
||||
#[arg(id = "discovery.port", long = "discovery.port", value_name = "DISCOVERY_PORT", default_value_t = DefaultDiscoveryArgs::get_global().port)]
|
||||
pub port: u16,
|
||||
|
||||
/// The UDP IPv4 address to use for devp2p peer discovery version 5. Overwritten by `RLPx`
|
||||
/// address, if it's also IPv4.
|
||||
#[arg(id = "discovery.v5.addr", long = "discovery.v5.addr", value_name = "DISCOVERY_V5_ADDR", default_value = None)]
|
||||
#[arg(id = "discovery.v5.addr", long = "discovery.v5.addr", value_name = "DISCOVERY_V5_ADDR", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_addr.map(|a| OsStr::from(a.to_string()))))]
|
||||
pub discv5_addr: Option<Ipv4Addr>,
|
||||
|
||||
/// The UDP IPv6 address to use for devp2p peer discovery version 5. Overwritten by `RLPx`
|
||||
/// address, if it's also IPv6.
|
||||
#[arg(id = "discovery.v5.addr.ipv6", long = "discovery.v5.addr.ipv6", value_name = "DISCOVERY_V5_ADDR_IPV6", default_value = None)]
|
||||
#[arg(id = "discovery.v5.addr.ipv6", long = "discovery.v5.addr.ipv6", value_name = "DISCOVERY_V5_ADDR_IPV6", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_addr_ipv6.map(|a| OsStr::from(a.to_string()))))]
|
||||
pub discv5_addr_ipv6: Option<Ipv6Addr>,
|
||||
|
||||
/// The UDP IPv4 port to use for devp2p peer discovery version 5. Not used unless `--addr` is
|
||||
/// IPv4, or `--discovery.v5.addr` is set.
|
||||
#[arg(id = "discovery.v5.port", long = "discovery.v5.port", value_name = "DISCOVERY_V5_PORT",
|
||||
default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
|
||||
pub discv5_port: u16,
|
||||
#[arg(id = "discovery.v5.port", long = "discovery.v5.port", value_name = "DISCOVERY_V5_PORT", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_port.map(|p| OsStr::from(p.to_string()))))]
|
||||
pub discv5_port: Option<u16>,
|
||||
|
||||
/// The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is
|
||||
/// IPv6, or `--discovery.addr.ipv6` is set.
|
||||
#[arg(id = "discovery.v5.port.ipv6", long = "discovery.v5.port.ipv6", value_name = "DISCOVERY_V5_PORT_IPV6",
|
||||
default_value_t = DEFAULT_DISCOVERY_V5_PORT)]
|
||||
pub discv5_port_ipv6: u16,
|
||||
///
|
||||
/// If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
|
||||
#[arg(id = "discovery.v5.port.ipv6", long = "discovery.v5.port.ipv6", value_name = "DISCOVERY_V5_PORT_IPV6", default_value = Resettable::from(DefaultDiscoveryArgs::get_global().discv5_port_ipv6.map(|p| OsStr::from(p.to_string()))))]
|
||||
pub discv5_port_ipv6: Option<u16>,
|
||||
|
||||
/// The interval in seconds at which to carry out periodic lookup queries, for the whole
|
||||
/// run of the program.
|
||||
#[arg(id = "discovery.v5.lookup-interval", long = "discovery.v5.lookup-interval", value_name = "DISCOVERY_V5_LOOKUP_INTERVAL", default_value_t = DEFAULT_SECONDS_LOOKUP_INTERVAL)]
|
||||
#[arg(id = "discovery.v5.lookup-interval", long = "discovery.v5.lookup-interval", value_name = "DISCOVERY_V5_LOOKUP_INTERVAL", default_value_t = DefaultDiscoveryArgs::get_global().discv5_lookup_interval)]
|
||||
pub discv5_lookup_interval: u64,
|
||||
|
||||
/// The interval in seconds at which to carry out boost lookup queries, for a fixed number of
|
||||
/// times, at bootstrap.
|
||||
#[arg(id = "discovery.v5.bootstrap.lookup-interval", long = "discovery.v5.bootstrap.lookup-interval", value_name = "DISCOVERY_V5_BOOTSTRAP_LOOKUP_INTERVAL",
|
||||
default_value_t = DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL)]
|
||||
default_value_t = DefaultDiscoveryArgs::get_global().discv5_bootstrap_lookup_interval)]
|
||||
pub discv5_bootstrap_lookup_interval: u64,
|
||||
|
||||
/// The number of times to carry out boost lookup queries at bootstrap.
|
||||
#[arg(id = "discovery.v5.bootstrap.lookup-countdown", long = "discovery.v5.bootstrap.lookup-countdown", value_name = "DISCOVERY_V5_BOOTSTRAP_LOOKUP_COUNTDOWN",
|
||||
default_value_t = DEFAULT_COUNT_BOOTSTRAP_LOOKUPS)]
|
||||
default_value_t = DefaultDiscoveryArgs::get_global().discv5_bootstrap_lookup_countdown)]
|
||||
pub discv5_bootstrap_lookup_countdown: u64,
|
||||
}
|
||||
|
||||
@@ -850,6 +999,7 @@ impl DiscoveryArgs {
|
||||
discv5_lookup_interval,
|
||||
discv5_bootstrap_lookup_interval,
|
||||
discv5_bootstrap_lookup_countdown,
|
||||
port,
|
||||
..
|
||||
} = self;
|
||||
|
||||
@@ -867,8 +1017,9 @@ impl DiscoveryArgs {
|
||||
|
||||
let mut discv5_config_builder =
|
||||
reth_discv5::discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
|
||||
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, *discv5_port)),
|
||||
discv5_addr_ipv6.map(|addr| SocketAddrV6::new(addr, *discv5_port_ipv6, 0, 0)),
|
||||
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, discv5_port.unwrap_or(*port))),
|
||||
discv5_addr_ipv6
|
||||
.map(|addr| SocketAddrV6::new(addr, discv5_port_ipv6.unwrap_or(*port), 0, 0)),
|
||||
));
|
||||
|
||||
if has_discv5_addr_args || self.disable_nat {
|
||||
@@ -898,14 +1049,14 @@ impl DiscoveryArgs {
|
||||
/// discovery binds to the sockets.
|
||||
pub const fn with_unused_discovery_port(mut self) -> Self {
|
||||
self.port = 0;
|
||||
self.discv5_port = 0;
|
||||
self.discv5_port_ipv6 = 0;
|
||||
self.discv5_port = Some(0);
|
||||
self.discv5_port_ipv6 = Some(0);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the discovery V5 port
|
||||
pub const fn with_discv5_port(mut self, port: u16) -> Self {
|
||||
self.discv5_port = port;
|
||||
pub fn with_discv5_port(mut self, port: impl Into<Option<u16>>) -> Self {
|
||||
self.discv5_port = port.into();
|
||||
self
|
||||
}
|
||||
|
||||
@@ -917,29 +1068,45 @@ impl DiscoveryArgs {
|
||||
pub fn adjust_instance_ports(&mut self, instance: u16) {
|
||||
debug_assert_ne!(instance, 0, "instance must be non-zero");
|
||||
self.port += instance - 1;
|
||||
self.discv5_port += instance - 1;
|
||||
self.discv5_port_ipv6 += instance - 1;
|
||||
self.discv5_port = self.discv5_port.map(|port| port + instance - 1);
|
||||
self.discv5_port_ipv6 = self.discv5_port_ipv6.map(|port| port + instance - 1);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DiscoveryArgs {
|
||||
fn default() -> Self {
|
||||
let DefaultDiscoveryArgs {
|
||||
disable_discovery,
|
||||
disable_dns_discovery,
|
||||
disable_discv4_discovery,
|
||||
disable_discv5_discovery,
|
||||
disable_nat,
|
||||
addr,
|
||||
port,
|
||||
discv5_addr,
|
||||
discv5_addr_ipv6,
|
||||
discv5_port,
|
||||
discv5_port_ipv6,
|
||||
discv5_lookup_interval,
|
||||
discv5_bootstrap_lookup_interval,
|
||||
discv5_bootstrap_lookup_countdown,
|
||||
} = *DefaultDiscoveryArgs::get_global();
|
||||
Self {
|
||||
disable_discovery: false,
|
||||
disable_dns_discovery: false,
|
||||
disable_discv4_discovery: false,
|
||||
disable_discovery,
|
||||
disable_dns_discovery,
|
||||
disable_discv4_discovery,
|
||||
enable_discv5_discovery: false,
|
||||
disable_discv5_discovery: false,
|
||||
disable_nat: false,
|
||||
addr: DEFAULT_DISCOVERY_ADDR,
|
||||
port: DEFAULT_DISCOVERY_PORT,
|
||||
discv5_addr: None,
|
||||
discv5_addr_ipv6: None,
|
||||
discv5_port: DEFAULT_DISCOVERY_V5_PORT,
|
||||
discv5_port_ipv6: DEFAULT_DISCOVERY_V5_PORT,
|
||||
discv5_lookup_interval: DEFAULT_SECONDS_LOOKUP_INTERVAL,
|
||||
discv5_bootstrap_lookup_interval: DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL,
|
||||
discv5_bootstrap_lookup_countdown: DEFAULT_COUNT_BOOTSTRAP_LOOKUPS,
|
||||
disable_discv5_discovery,
|
||||
disable_nat,
|
||||
addr,
|
||||
port,
|
||||
discv5_addr,
|
||||
discv5_addr_ipv6,
|
||||
discv5_port,
|
||||
discv5_port_ipv6,
|
||||
discv5_lookup_interval,
|
||||
discv5_bootstrap_lookup_interval,
|
||||
discv5_bootstrap_lookup_countdown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,6 +97,7 @@ fn generate_bindings(mdbx: &Path, out_file: &Path) {
|
||||
let bindings = bindgen::Builder::default()
|
||||
.header(mdbx.join("mdbx.h").to_string_lossy())
|
||||
.allowlist_var("^(MDBX|mdbx)_.*")
|
||||
.blocklist_item("MDBX_NOTLS")
|
||||
.allowlist_type("^(MDBX|mdbx)_.*")
|
||||
.allowlist_function("^(MDBX|mdbx)_.*")
|
||||
.size_t_is_usize(true)
|
||||
|
||||
@@ -199,7 +199,7 @@ impl EnvironmentFlags {
|
||||
flags |= ffi::MDBX_LIFORECLAIM;
|
||||
}
|
||||
|
||||
flags |= ffi::MDBX_NOTLS;
|
||||
flags |= ffi::MDBX_NOSTICKYTHREADS;
|
||||
|
||||
flags
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@ use crossbeam_queue::ArrayQueue;
|
||||
|
||||
/// Lock-free pool of reset read-only MDBX transaction handles.
|
||||
///
|
||||
/// With `MDBX_NOTLS` (which reth always sets), every `mdbx_txn_begin_ex` for a read transaction
|
||||
/// calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
|
||||
/// With `MDBX_NOSTICKYTHREADS` (which reth always sets), every `mdbx_txn_begin_ex` for a read
|
||||
/// transaction calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
|
||||
/// concurrency (e.g., prewarming), this becomes a contention point.
|
||||
///
|
||||
/// This pool caches transaction handles that have been reset via `mdbx_txn_reset`. A reset handle
|
||||
|
||||
@@ -115,7 +115,9 @@ Networking:
|
||||
[default: 9200]
|
||||
|
||||
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
|
||||
|
||||
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
|
||||
|
||||
[default: 9200]
|
||||
|
||||
|
||||
@@ -55,7 +55,9 @@ Networking:
|
||||
[default: 9200]
|
||||
|
||||
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
|
||||
|
||||
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
|
||||
|
||||
[default: 9200]
|
||||
|
||||
|
||||
@@ -55,7 +55,9 @@ Networking:
|
||||
[default: 9200]
|
||||
|
||||
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
|
||||
|
||||
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
|
||||
|
||||
[default: 9200]
|
||||
|
||||
|
||||
@@ -208,7 +208,9 @@ Networking:
|
||||
[default: 9200]
|
||||
|
||||
--discovery.v5.port.ipv6 <DISCOVERY_V5_PORT_IPV6>
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set
|
||||
The UDP IPv6 port to use for devp2p peer discovery version 5. Not used unless `--addr` is IPv6, or `--discovery.addr.ipv6` is set.
|
||||
|
||||
If not provided, discovery V5 defaults to same port as discovery V4 (--discovery.port).
|
||||
|
||||
[default: 9200]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user