diff --git a/.config/zepter.yaml b/.config/zepter.yaml index 62b7e768c1..dcc18676c4 100644 --- a/.config/zepter.yaml +++ b/.config/zepter.yaml @@ -12,7 +12,7 @@ workflows: # Check that `A` activates the features of `B`. "propagate-feature", # These are the features to check: - "--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,js-tracer,portable", + "--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,js-tracer,portable,keccak-cache-global", # Do not try to add a new section to `[features]` of `A` only because `B` exposes that feature. There are edge-cases where this is still needed, but we can add them manually. "--left-side-feature-missing=ignore", # Ignore the case that `A` it outside of the workspace. Otherwise it will report errors in external dependencies that we have no influence on. 
diff --git a/Cargo.lock b/Cargo.lock index b57e3ad716..ee5a047b2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8941,6 +8941,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "rand 0.9.2", + "rayon", "reth-chainspec", "reth-consensus", "reth-discv4", diff --git a/Cargo.toml b/Cargo.toml index be84f66111..e845e4a944 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -485,15 +485,15 @@ revm-inspectors = "0.33.2" # eth -alloy-primitives = { version = "1.4.1", default-features = false, features = ["map-foldhash"] } +alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] } alloy-chains = { version = "0.2.5", default-features = false } alloy-evm = { version = "0.25.2", default-features = false } alloy-dyn-abi = "1.4.1" alloy-eip7928 = { version = "0.2.0", default-features = false, git = "https://github.com/rakita/alloy-eips.git", rev = "734beaf" } alloy-eip2124 = { version = "0.2.0", default-features = false } alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] } -alloy-sol-macro = "1.4.1" -alloy-sol-types = { version = "1.4.1", default-features = false } +alloy-sol-macro = "1.5.0" +alloy-sol-types = { version = "1.5.0", default-features = false } alloy-trie = { version = "0.9.1", default-features = false } alloy-hardforks = "0.4.5" diff --git a/Dockerfile b/Dockerfile index b61c177525..22fb65ffbc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ FROM chef AS builder COPY --from=planner /app/recipe.json recipe.json # Build profile, release by default -ARG BUILD_PROFILE=release +ARG BUILD_PROFILE=maxperf ENV BUILD_PROFILE=$BUILD_PROFILE # Extra Cargo flags diff --git a/DockerfileOp b/DockerfileOp index ff65dc276b..ba6e6627fd 100644 --- a/DockerfileOp +++ b/DockerfileOp @@ -14,7 +14,7 @@ RUN cargo chef prepare --recipe-path recipe.json FROM chef AS builder COPY --from=planner /app/recipe.json recipe.json -ARG BUILD_PROFILE=release +ARG BUILD_PROFILE=maxperf ENV BUILD_PROFILE=$BUILD_PROFILE ARG 
RUSTFLAGS="" diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 223dfe1bdb..1bac9af40a 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -81,7 +81,7 @@ backon.workspace = true tempfile.workspace = true [features] -default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer"] +default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"] otlp = [ "reth-ethereum-cli/otlp", @@ -102,7 +102,9 @@ asm-keccak = [ "reth-ethereum-cli/asm-keccak", "reth-node-ethereum/asm-keccak", ] - +keccak-cache-global = [ + "reth-node-ethereum/keccak-cache-global", +] jemalloc = [ "reth-cli-util/jemalloc", "reth-node-core/jemalloc", diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 500439d9a7..bfc871a209 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -112,6 +112,7 @@ impl EnvironmentArgs { }; // TransactionDB only support read-write mode let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb()) + .with_default_tables() .with_database_log_level(self.db.log_level) .build()?; diff --git a/crates/cli/commands/src/p2p/mod.rs b/crates/cli/commands/src/p2p/mod.rs index d255f5fd77..31fa408db6 100644 --- a/crates/cli/commands/src/p2p/mod.rs +++ b/crates/cli/commands/src/p2p/mod.rs @@ -189,7 +189,7 @@ impl DownloadArgs { let net = NetworkConfigBuilder::::new(p2p_secret_key) .peer_config(config.peers_config_with_basic_nodes_from_file(None)) - .external_ip_resolver(self.network.nat) + .external_ip_resolver(self.network.nat.clone()) .network_id(self.network.network_id) .boot_nodes(boot_nodes.clone()) .apply(|builder| { diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index 0978734016..513087bd7d 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -244,6 +244,7 @@ fn bench_state_root(c: &mut Criterion) { 
StateProviderBuilder::new(provider.clone(), genesis_hash, None), OverlayStateProviderFactory::new(provider), &TreeConfig::default(), + None, ); let mut state_hook = handle.state_hook(); diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index b4b1f755ab..a0f4db4b1c 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -39,7 +39,7 @@ where { /// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and /// [`CachedStateMetrics`]. - pub(crate) const fn new_with_caches( + pub(crate) const fn new( state_provider: S, caches: ExecutionCache, metrics: CachedStateMetrics, @@ -785,7 +785,7 @@ mod tests { let caches = ExecutionCacheBuilder::default().build_caches(1000); let state_provider = - CachedStateProvider::new_with_caches(provider, caches, CachedStateMetrics::zeroed()); + CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed()); // check that the storage is empty let res = state_provider.storage(address, storage_key); @@ -808,7 +808,7 @@ mod tests { let caches = ExecutionCacheBuilder::default().build_caches(1000); let state_provider = - CachedStateProvider::new_with_caches(provider, caches, CachedStateMetrics::zeroed()); + CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed()); // check that the storage returns the expected value let res = state_provider.storage(address, storage_key); diff --git a/crates/engine/tree/src/tree/instrumented_state.rs b/crates/engine/tree/src/tree/instrumented_state.rs index ee3c285957..02ab395dc3 100644 --- a/crates/engine/tree/src/tree/instrumented_state.rs +++ b/crates/engine/tree/src/tree/instrumented_state.rs @@ -83,7 +83,7 @@ where { /// Creates a new [`InstrumentedStateProvider`] from a state provider with the provided label /// for metrics. 
- pub fn from_state_provider(state_provider: S, source: &'static str) -> Self { + pub fn new(state_provider: S, source: &'static str) -> Self { Self { state_provider, metrics: StateProviderMetrics::new_with_labels(&[("source", source)]), diff --git a/crates/engine/tree/src/tree/payload_processor/bal.rs b/crates/engine/tree/src/tree/payload_processor/bal.rs new file mode 100644 index 0000000000..5bda2cdb36 --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/bal.rs @@ -0,0 +1,318 @@ +//! BAL (Block Access List, EIP-7928) related functionality. + +use alloy_consensus::constants::KECCAK_EMPTY; +use alloy_eip7928::BlockAccessList; +use alloy_primitives::{keccak256, U256}; +use reth_primitives_traits::Account; +use reth_provider::{AccountReader, ProviderError}; +use reth_trie::{HashedPostState, HashedStorage}; + +/// Converts a Block Access List into a [`HashedPostState`] by extracting the final state +/// of modified accounts and storage slots. +pub fn bal_to_hashed_post_state

( + bal: &BlockAccessList, + provider: &P, +) -> Result +where + P: AccountReader, +{ + let mut hashed_state = HashedPostState::with_capacity(bal.len()); + + for account_changes in bal { + let address = account_changes.address; + let hashed_address = keccak256(address); + + // Get the latest balance (last balance change if any) + let balance = account_changes.balance_changes.last().map(|change| change.post_balance); + + // Get the latest nonce (last nonce change if any) + let nonce = account_changes.nonce_changes.last().map(|change| change.new_nonce); + + // Get the latest code (last code change if any) + let code_hash = if let Some(code_change) = account_changes.code_changes.last() { + if code_change.new_code.is_empty() { + Some(Some(KECCAK_EMPTY)) + } else { + Some(Some(keccak256(&code_change.new_code))) + } + } else { + None + }; + + // Only fetch account from provider if we're missing any field + let existing_account = if balance.is_none() || nonce.is_none() || code_hash.is_none() { + provider.basic_account(&address)? 
+ } else { + None + }; + + // Build the final account state + let account = Account { + balance: balance.unwrap_or_else(|| { + existing_account.as_ref().map(|acc| acc.balance).unwrap_or(U256::ZERO) + }), + nonce: nonce + .unwrap_or_else(|| existing_account.as_ref().map(|acc| acc.nonce).unwrap_or(0)), + bytecode_hash: code_hash.unwrap_or_else(|| { + existing_account.as_ref().and_then(|acc| acc.bytecode_hash).or(Some(KECCAK_EMPTY)) + }), + }; + + hashed_state.accounts.insert(hashed_address, Some(account)); + + // Process storage changes + if !account_changes.storage_changes.is_empty() { + let mut storage_map = HashedStorage::new(false); + + for slot_changes in &account_changes.storage_changes { + let hashed_slot = keccak256(slot_changes.slot); + + // Get the last change for this slot + if let Some(last_change) = slot_changes.changes.last() { + storage_map + .storage + .insert(hashed_slot, U256::from_be_bytes(last_change.new_value.0)); + } + } + + if !storage_map.storage.is_empty() { + hashed_state.storages.insert(hashed_address, storage_map); + } + } + } + + Ok(hashed_state) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_eip7928::{ + AccountChanges, BalanceChange, CodeChange, NonceChange, SlotChanges, StorageChange, + }; + use alloy_primitives::{Address, Bytes, StorageKey, B256}; + use reth_revm::test_utils::StateProviderTest; + + #[test] + fn test_bal_to_hashed_post_state_basic() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + let account_changes = AccountChanges { + address, + storage_changes: vec![], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(100))], + nonce_changes: vec![NonceChange::new(0, 1)], + code_changes: vec![], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + assert_eq!(result.accounts.len(), 1); + + let hashed_address = keccak256(address); + let account_opt = 
result.accounts.get(&hashed_address).unwrap(); + assert!(account_opt.is_some()); + + let account = account_opt.as_ref().unwrap(); + assert_eq!(account.balance, U256::from(100)); + assert_eq!(account.nonce, 1); + assert_eq!(account.bytecode_hash, Some(KECCAK_EMPTY)); + } + + #[test] + fn test_bal_with_storage_changes() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + let slot = StorageKey::random(); + let value = B256::random(); + + let slot_changes = SlotChanges { slot, changes: vec![StorageChange::new(0, value)] }; + + let account_changes = AccountChanges { + address, + storage_changes: vec![slot_changes], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(500))], + nonce_changes: vec![NonceChange::new(0, 2)], + code_changes: vec![], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let hashed_address = keccak256(address); + assert!(result.storages.contains_key(&hashed_address)); + + let storage = result.storages.get(&hashed_address).unwrap(); + let hashed_slot = keccak256(slot); + + let stored_value = storage.storage.get(&hashed_slot).unwrap(); + assert_eq!(*stored_value, U256::from_be_bytes(value.0)); + } + + #[test] + fn test_bal_with_code_change() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + let code = Bytes::from(vec![0x60, 0x80, 0x60, 0x40]); // Some bytecode + + let account_changes = AccountChanges { + address, + storage_changes: vec![], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(1000))], + nonce_changes: vec![NonceChange::new(0, 1)], + code_changes: vec![CodeChange::new(0, code.clone())], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let hashed_address = keccak256(address); + let account_opt = result.accounts.get(&hashed_address).unwrap(); + let account = 
account_opt.as_ref().unwrap(); + + let expected_code_hash = keccak256(&code); + assert_eq!(account.bytecode_hash, Some(expected_code_hash)); + } + + #[test] + fn test_bal_with_empty_code() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + let empty_code = Bytes::default(); + + let account_changes = AccountChanges { + address, + storage_changes: vec![], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(1000))], + nonce_changes: vec![NonceChange::new(0, 1)], + code_changes: vec![CodeChange::new(0, empty_code)], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let hashed_address = keccak256(address); + let account_opt = result.accounts.get(&hashed_address).unwrap(); + let account = account_opt.as_ref().unwrap(); + + assert_eq!(account.bytecode_hash, Some(KECCAK_EMPTY)); + } + + #[test] + fn test_bal_multiple_changes_takes_last() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + + // Multiple balance changes - should take the last one + let account_changes = AccountChanges { + address, + storage_changes: vec![], + storage_reads: vec![], + balance_changes: vec![ + BalanceChange::new(0, U256::from(100)), + BalanceChange::new(1, U256::from(200)), + BalanceChange::new(2, U256::from(300)), + ], + nonce_changes: vec![ + NonceChange::new(0, 1), + NonceChange::new(1, 2), + NonceChange::new(2, 3), + ], + code_changes: vec![], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let hashed_address = keccak256(address); + let account_opt = result.accounts.get(&hashed_address).unwrap(); + let account = account_opt.as_ref().unwrap(); + + // Should have the last values + assert_eq!(account.balance, U256::from(300)); + assert_eq!(account.nonce, 3); + } + + #[test] + fn test_bal_uses_provider_for_missing_fields() { + let mut provider = 
StateProviderTest::default(); + + let address = Address::random(); + let code_hash = B256::random(); + let existing_account = + Account { balance: U256::from(999), nonce: 42, bytecode_hash: Some(code_hash) }; + provider.insert_account(address, existing_account, None, Default::default()); + + // Only change balance, nonce and code should come from provider + let account_changes = AccountChanges { + address, + storage_changes: vec![], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(1500))], + nonce_changes: vec![], + code_changes: vec![], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let hashed_address = keccak256(address); + let account_opt = result.accounts.get(&hashed_address).unwrap(); + let account = account_opt.as_ref().unwrap(); + + // Balance should be updated + assert_eq!(account.balance, U256::from(1500)); + // Nonce and bytecode_hash should come from provider + assert_eq!(account.nonce, 42); + assert_eq!(account.bytecode_hash, Some(code_hash)); + } + + #[test] + fn test_bal_multiple_storage_changes_per_slot() { + let provider = StateProviderTest::default(); + + let address = Address::random(); + let slot = StorageKey::random(); + + // Multiple changes to the same slot - should take the last one + let slot_changes = SlotChanges { + slot, + changes: vec![ + StorageChange::new(0, B256::from(U256::from(100).to_be_bytes::<32>())), + StorageChange::new(1, B256::from(U256::from(200).to_be_bytes::<32>())), + StorageChange::new(2, B256::from(U256::from(300).to_be_bytes::<32>())), + ], + }; + + let account_changes = AccountChanges { + address, + storage_changes: vec![slot_changes], + storage_reads: vec![], + balance_changes: vec![BalanceChange::new(0, U256::from(100))], + nonce_changes: vec![NonceChange::new(0, 1)], + code_changes: vec![], + }; + + let bal = vec![account_changes]; + let result = bal_to_hashed_post_state(&bal, &provider).unwrap(); + + let 
hashed_address = keccak256(address); + let storage = result.storages.get(&hashed_address).unwrap(); + let hashed_slot = keccak256(slot); + + let stored_value = storage.storage.get(&hashed_slot).unwrap(); + + // Should have the last value + assert_eq!(*stored_value, U256::from(300)); + } +} diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 3d3231ee11..b8a4cbc492 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -13,6 +13,7 @@ use crate::tree::{ sparse_trie::SparseTrieTask, StateProviderBuilder, TreeConfig, }; +use alloy_eip7928::BlockAccessList; use alloy_eips::eip1898::BlockWithParent; use alloy_evm::{block::StateChangeSource, ToTxEnv}; use alloy_primitives::B256; @@ -49,8 +50,9 @@ use std::{ }, time::Instant, }; -use tracing::{debug, debug_span, instrument, warn, Span}; +use tracing::{debug, debug_span, error, instrument, warn, Span}; +pub mod bal; mod configured_sparse_trie; pub mod executor; pub mod multiproof; @@ -212,6 +214,7 @@ where provider_builder: StateProviderBuilder, multiproof_provider_factory: F, config: &TreeConfig, + bal: Option>, ) -> PayloadHandle, I::Tx>, I::Error> where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, @@ -252,19 +255,45 @@ where // wire the multiproof task to the prewarm task let to_multi_proof = Some(multi_proof_task.state_root_message_sender()); - let prewarm_handle = self.spawn_caching_with( - env, - prewarm_rx, - transaction_count_hint, - provider_builder, - to_multi_proof.clone(), - ); + // Handle BAL-based optimization if available + let prewarm_handle = if let Some(bal) = bal { + // When BAL is present, skip spawning prewarm tasks entirely and send BAL to multiproof + debug!(target: "engine::tree::payload_processor", "BAL present, skipping prewarm tasks"); + + // Send BAL message immediately to MultiProofTask + if let Some(ref sender) = 
to_multi_proof && + let Err(err) = sender.send(MultiProofMessage::BlockAccessList(bal)) + { + // In this case state root validation will simply fail + error!(target: "engine::tree::payload_processor", ?err, "Failed to send BAL to MultiProofTask"); + } + + // Spawn minimal cache-only task without prewarming + self.spawn_caching_with( + env, + prewarm_rx, + transaction_count_hint, + provider_builder.clone(), + None, // Don't send proof targets when BAL is present + ) + } else { + // Normal path: spawn with full prewarming + self.spawn_caching_with( + env, + prewarm_rx, + transaction_count_hint, + provider_builder.clone(), + to_multi_proof.clone(), + ) + }; // spawn multi-proof task let parent_span = span.clone(); self.executor.spawn_blocking(move || { let _enter = parent_span.entered(); - multi_proof_task.run(); + // Build a state provider for the multiproof task + let provider = provider_builder.build().expect("failed to build provider"); + multi_proof_task.run(provider); }); // wire the sparse trie to the state root response receiver @@ -595,7 +624,7 @@ impl PayloadHandle { move |source: StateChangeSource, state: &EvmState| { if let Some(sender) = &to_multi_proof { - let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone())); + let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone())); } } } @@ -721,6 +750,8 @@ impl ExecutionCache { cache .as_ref() + // Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold + // a reference to this cache. We can only reuse it when we have exclusive access. 
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available()) .cloned() } @@ -1064,6 +1095,7 @@ mod tests { StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), OverlayStateProviderFactory::new(provider_factory), &TreeConfig::default(), + None, // No BAL for test ); let mut state_hook = handle.state_hook(); diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 6a071fc5e4..05018c1fff 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -1,5 +1,7 @@ //! Multiproof task related functionality. +use crate::tree::payload_processor::bal::bal_to_hashed_post_state; +use alloy_eip7928::BlockAccessList; use alloy_evm::block::StateChangeSource; use alloy_primitives::{ keccak256, @@ -11,6 +13,7 @@ use dashmap::DashMap; use derive_more::derive::Deref; use metrics::{Gauge, Histogram}; use reth_metrics::Metrics; +use reth_provider::AccountReader; use reth_revm::state::EvmState; use reth_trie::{ added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage, @@ -26,6 +29,30 @@ use reth_trie_parallel::{ use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant}; use tracing::{debug, error, instrument, trace}; +/// Source of state changes, either from EVM execution or from a Block Access List. +#[derive(Clone, Copy)] +pub enum Source { + /// State changes from EVM execution. + Evm(StateChangeSource), + /// State changes from Block Access List (EIP-7928). 
+ BlockAccessList, +} + +impl std::fmt::Debug for Source { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Evm(source) => source.fmt(f), + Self::BlockAccessList => f.write_str("BlockAccessList"), + } + } +} + +impl From for Source { + fn from(source: StateChangeSource) -> Self { + Self::Evm(source) + } +} + /// Maximum number of targets to batch together for prefetch batching. /// Prefetches are just proof requests (no state merging), so we allow a higher cap than state /// updates @@ -82,7 +109,7 @@ pub(super) enum MultiProofMessage { /// Prefetch proof targets PrefetchProofs(MultiProofTargets), /// New state update from transaction execution with its source - StateUpdate(StateChangeSource, EvmState), + StateUpdate(Source, EvmState), /// State update that can be applied to the sparse trie without any new proofs. /// /// It can be the case when all accounts and storage slots from the state update were already @@ -93,6 +120,11 @@ pub(super) enum MultiProofMessage { /// The state update that was used to calculate the proof state: HashedPostState, }, + /// Block Access List (EIP-7928; BAL) containing complete state changes for the block. + /// + /// When received, the task generates a single state update from the BAL and processes it. + /// No further messages are expected after receiving this variant. + BlockAccessList(Arc), /// Signals state update stream end. /// /// This is triggered by block execution, indicating that no additional state updates are @@ -280,7 +312,7 @@ impl StorageMultiproofInput { /// Input parameters for dispatching a multiproof calculation. 
#[derive(Debug)] struct MultiproofInput { - source: Option, + source: Option, hashed_state_update: HashedPostState, proof_targets: MultiProofTargets, proof_sequence_number: u64, @@ -883,9 +915,19 @@ impl MultiProofTask { skip(self, update), fields(accounts = update.len(), chunks = 0) )] - fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 { + fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 { let hashed_state_update = evm_state_to_hashed_post_state(update); + self.on_hashed_state_update(source, hashed_state_update) + } + /// Processes a hashed state update and dispatches multiproofs as needed. + /// + /// Returns the number of state updates dispatched (both `EmptyProof` and regular multiproofs). + fn on_hashed_state_update( + &mut self, + source: Source, + hashed_state_update: HashedPostState, + ) -> u64 { // Update removed keys based on the state update. self.multi_added_removed_keys.update_with_state(&hashed_state_update); @@ -982,12 +1024,16 @@ impl MultiProofTask { /// This preserves ordering without requeuing onto the channel. /// /// Returns `true` if done, `false` to continue. - fn process_multiproof_message( + fn process_multiproof_message

( &mut self, msg: MultiProofMessage, ctx: &mut MultiproofBatchCtx, batch_metrics: &mut MultiproofBatchMetrics, - ) -> bool { + provider: &P, + ) -> bool + where + P: AccountReader, + { match msg { // Prefetch proofs: batch consecutive prefetch requests up to target/message limits MultiProofMessage::PrefetchProofs(targets) => { @@ -1146,6 +1192,56 @@ impl MultiProofTask { false } + // Process Block Access List (BAL) - complete state changes provided upfront + MultiProofMessage::BlockAccessList(bal) => { + trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::BAL"); + + if ctx.first_update_time.is_none() { + self.metrics + .first_update_wait_time_histogram + .record(ctx.start.elapsed().as_secs_f64()); + ctx.first_update_time = Some(Instant::now()); + debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation from BAL"); + } + + // Convert BAL to HashedPostState and process it + match bal_to_hashed_post_state(&bal, &provider) { + Ok(hashed_state) => { + debug!( + target: "engine::tree::payload_processor::multiproof", + accounts = hashed_state.accounts.len(), + storages = hashed_state.storages.len(), + "Processing BAL state update" + ); + + // Use BlockAccessList as source for BAL-derived state updates + batch_metrics.state_update_proofs_requested += + self.on_hashed_state_update(Source::BlockAccessList, hashed_state); + } + Err(err) => { + error!(target: "engine::tree::payload_processor::multiproof", ?err, "Failed to convert BAL to hashed state"); + return true; + } + } + + // Mark updates as finished since BAL provides complete state + ctx.updates_finished_time = Some(Instant::now()); + + // Check if we're done (might need to wait for proofs to complete) + if self.is_done( + batch_metrics.proofs_processed, + batch_metrics.state_update_proofs_requested, + batch_metrics.prefetch_proofs_requested, + ctx.updates_finished(), + ) { + debug!( + target: "engine::tree::payload_processor::multiproof", + 
"BAL processed and all proofs complete, ending calculation" + ); + return true; + } + false + } // Signal that no more state updates will arrive MultiProofMessage::FinishedStateUpdates => { trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates"); @@ -1238,7 +1334,10 @@ impl MultiProofTask { target = "engine::tree::payload_processor::multiproof", skip_all )] - pub(crate) fn run(mut self) { + pub(crate) fn run

(mut self, provider: P) + where + P: AccountReader, + { let mut ctx = MultiproofBatchCtx::new(Instant::now()); let mut batch_metrics = MultiproofBatchMetrics::default(); @@ -1248,7 +1347,7 @@ impl MultiProofTask { trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop"); if let Some(msg) = ctx.pending_msg.take() { - if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics) { + if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) { break 'main; } continue; @@ -1323,7 +1422,7 @@ impl MultiProofTask { } }; - if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics) { + if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) { break 'main; } } @@ -1359,6 +1458,9 @@ impl MultiProofTask { /// Context for multiproof message batching loop. /// /// Contains processing state that persists across loop iterations. +/// +/// Used by `process_multiproof_message` to batch consecutive same-type messages received via +/// `try_recv` for efficient processing. struct MultiproofBatchCtx { /// Buffers a non-matching message type encountered during batching. /// Processed first in next iteration to preserve ordering while allowing same-type @@ -1374,7 +1476,7 @@ struct MultiproofBatchCtx { /// Reusable buffer for accumulating prefetch targets during batching. accumulated_prefetch_targets: Vec, /// Reusable buffer for accumulating state updates during batching. - accumulated_state_updates: Vec<(StateChangeSource, EvmState)>, + accumulated_state_updates: Vec<(Source, EvmState)>, } impl MultiproofBatchCtx { @@ -1492,34 +1594,44 @@ where /// are safe to merge because they originate from the same logical execution and can be /// coalesced to amortize proof work. 
fn can_batch_state_update( - batch_source: StateChangeSource, + batch_source: Source, batch_update: &EvmState, - next_source: StateChangeSource, + next_source: Source, next_update: &EvmState, ) -> bool { - if !same_state_change_source(batch_source, next_source) { + if !same_source(batch_source, next_source) { return false; } match (batch_source, next_source) { - (StateChangeSource::PreBlock(_), StateChangeSource::PreBlock(_)) | - (StateChangeSource::PostBlock(_), StateChangeSource::PostBlock(_)) => { - batch_update == next_update - } + ( + Source::Evm(StateChangeSource::PreBlock(_)), + Source::Evm(StateChangeSource::PreBlock(_)), + ) | + ( + Source::Evm(StateChangeSource::PostBlock(_)), + Source::Evm(StateChangeSource::PostBlock(_)), + ) => batch_update == next_update, _ => true, } } -/// Checks whether two state change sources refer to the same origin. -fn same_state_change_source(lhs: StateChangeSource, rhs: StateChangeSource) -> bool { +/// Checks whether two sources refer to the same origin. 
+fn same_source(lhs: Source, rhs: Source) -> bool { match (lhs, rhs) { - (StateChangeSource::Transaction(a), StateChangeSource::Transaction(b)) => a == b, - (StateChangeSource::PreBlock(a), StateChangeSource::PreBlock(b)) => { - mem::discriminant(&a) == mem::discriminant(&b) - } - (StateChangeSource::PostBlock(a), StateChangeSource::PostBlock(b)) => { - mem::discriminant(&a) == mem::discriminant(&b) - } + ( + Source::Evm(StateChangeSource::Transaction(a)), + Source::Evm(StateChangeSource::Transaction(b)), + ) => a == b, + ( + Source::Evm(StateChangeSource::PreBlock(a)), + Source::Evm(StateChangeSource::PreBlock(b)), + ) => mem::discriminant(&a) == mem::discriminant(&b), + ( + Source::Evm(StateChangeSource::PostBlock(a)), + Source::Evm(StateChangeSource::PostBlock(b)), + ) => mem::discriminant(&a) == mem::discriminant(&b), + (Source::BlockAccessList, Source::BlockAccessList) => true, _ => false, } } @@ -1539,7 +1651,8 @@ fn estimate_evm_state_targets(state: &EvmState) -> usize { #[cfg(test)] mod tests { use super::*; - use alloy_primitives::map::B256Set; + use alloy_eip7928::{AccountChanges, BalanceChange}; + use alloy_primitives::{map::B256Set, Address}; use reth_provider::{ providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory, BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader, @@ -1548,7 +1661,7 @@ mod tests { use reth_trie::MultiProof; use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle}; use revm_primitives::{B256, U256}; - use std::sync::OnceLock; + use std::sync::{Arc, OnceLock}; use tokio::runtime::{Handle, Runtime}; /// Get a handle to the test runtime, creating it if necessary @@ -2113,8 +2226,8 @@ mod tests { let source = StateChangeSource::Transaction(0); let tx = task.state_root_message_sender(); - tx.send(MultiProofMessage::StateUpdate(source, update1.clone())).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, update2.clone())).unwrap(); + 
tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap(); let proofs_requested = if let Ok(MultiProofMessage::StateUpdate(_src, update)) = task.rx.recv() { @@ -2133,7 +2246,7 @@ mod tests { assert!(merged_update.contains_key(&addr1)); assert!(merged_update.contains_key(&addr2)); - task.on_state_update(source, merged_update) + task.on_state_update(source.into(), merged_update) } else { panic!("Expected StateUpdate message"); }; @@ -2179,20 +2292,20 @@ mod tests { // Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending) let tx = task.state_root_message_sender(); - tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a1, 100))) + tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100))) .unwrap(); - tx.send(MultiProofMessage::StateUpdate(source_b, create_state_update(addr_b1, 200))) + tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200))) .unwrap(); - tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a2, 300))) + tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a2, 300))) .unwrap(); let mut pending_msg: Option = None; if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() { - assert!(same_state_change_source(first_source, source_a)); + assert!(same_source(first_source, source_a.into())); // Simulate batching loop for remaining messages - let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = Vec::new(); + let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new(); let mut accumulated_targets = 0usize; loop { @@ -2240,7 +2353,7 @@ mod tests { assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources"); let batch_source = accumulated_updates[0].0; - assert!(same_state_change_source(batch_source, source_b)); + assert!(same_source(batch_source, 
source_b.into())); let batch_source = accumulated_updates[0].0; let mut merged_update = accumulated_updates.remove(0).1; @@ -2248,10 +2361,7 @@ mod tests { merged_update.extend(next_update); } - assert!( - same_state_change_source(batch_source, source_b), - "Batch should use matching source" - ); + assert!(same_source(batch_source, source_b.into()), "Batch should use matching source"); assert!(merged_update.contains_key(&addr_b1)); assert!(!merged_update.contains_key(&addr_a1)); assert!(!merged_update.contains_key(&addr_a2)); @@ -2261,7 +2371,7 @@ mod tests { match pending_msg { Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => { - assert!(same_state_change_source(pending_source, source_a)); + assert!(same_source(pending_source, source_a.into())); assert!(pending_update.contains_key(&addr_a2)); } other => panic!("Expected pending StateUpdate with source_a, got {:?}", other), @@ -2306,17 +2416,20 @@ mod tests { // Queue: first update dispatched immediately, next two should not merge let tx = task.state_root_message_sender(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr1, 100))).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr2, 200))).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr3, 300))).unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100))) + .unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200))) + .unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr3, 300))) + .unwrap(); let mut pending_msg: Option = None; if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() { - assert!(same_state_change_source(first_source, source)); + assert!(same_source(first_source, source.into())); assert!(first_update.contains_key(&addr1)); - let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = 
Vec::new(); + let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new(); let mut accumulated_targets = 0usize; loop { @@ -2368,7 +2481,7 @@ mod tests { "Second pre-block update should not merge with a different payload" ); let (batched_source, batched_update) = accumulated_updates.remove(0); - assert!(same_state_change_source(batched_source, source)); + assert!(same_source(batched_source, source.into())); assert!(batched_update.contains_key(&addr2)); assert!(!batched_update.contains_key(&addr3)); @@ -2452,8 +2565,8 @@ mod tests { let tx = task.state_root_message_sender(); tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, state_update1)).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, state_update2)).unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap(); // Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2) @@ -2520,6 +2633,7 @@ mod tests { use revm_state::Account; let test_provider_factory = create_test_provider_factory(); + let test_provider = test_provider_factory.latest().unwrap(); let mut task = create_test_state_root_task(test_provider_factory); // Queue: Prefetch1, StateUpdate, Prefetch2 @@ -2553,7 +2667,7 @@ mod tests { let tx = task.state_root_message_sender(); tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, state_update)).unwrap(); + tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap(); let mut ctx = MultiproofBatchCtx::new(Instant::now()); @@ -2562,12 +2676,22 @@ mod tests { // First message: Prefetch1 batches; StateUpdate becomes pending. 
let first = task.rx.recv().unwrap(); assert!(matches!(first, MultiProofMessage::PrefetchProofs(_))); - assert!(!task.process_multiproof_message(first, &mut ctx, &mut batch_metrics)); + assert!(!task.process_multiproof_message( + first, + &mut ctx, + &mut batch_metrics, + &test_provider + )); let pending = ctx.pending_msg.take().expect("pending message captured"); assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _))); // Pending message should be handled before the next select loop. - assert!(!task.process_multiproof_message(pending, &mut ctx, &mut batch_metrics)); + assert!(!task.process_multiproof_message( + pending, + &mut ctx, + &mut batch_metrics, + &test_provider + )); // Prefetch2 should now be in pending_msg (captured by StateUpdate's batching loop). match ctx.pending_msg.take() { @@ -2641,12 +2765,21 @@ mod tests { // Queue: [Prefetch1, State1, State2, State3, Prefetch2] let tx = task.state_root_message_sender(); tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(state_addr1, 100))) - .unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(state_addr2, 200))) - .unwrap(); - tx.send(MultiProofMessage::StateUpdate(source, create_state_update(state_addr3, 300))) - .unwrap(); + tx.send(MultiProofMessage::StateUpdate( + source.into(), + create_state_update(state_addr1, 100), + )) + .unwrap(); + tx.send(MultiProofMessage::StateUpdate( + source.into(), + create_state_update(state_addr2, 200), + )) + .unwrap(); + tx.send(MultiProofMessage::StateUpdate( + source.into(), + create_state_update(state_addr3, 300), + )) + .unwrap(); tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap(); // Simulate the state-machine loop behavior @@ -2719,4 +2852,44 @@ mod tests { _ => panic!("Prefetch2 was lost!"), } } + + /// Verifies that BAL messages are processed correctly and generate state updates. 
+ #[test] + fn test_bal_message_processing() { + let test_provider_factory = create_test_provider_factory(); + let test_provider = test_provider_factory.latest().unwrap(); + let mut task = create_test_state_root_task(test_provider_factory); + + // Create a simple BAL with one account change + let account_address = Address::random(); + let account_changes = AccountChanges { + address: account_address, + balance_changes: vec![BalanceChange::new(0, U256::from(1000))], + nonce_changes: vec![], + code_changes: vec![], + storage_changes: vec![], + storage_reads: vec![], + }; + + let bal = Arc::new(vec![account_changes]); + + let mut ctx = MultiproofBatchCtx::new(Instant::now()); + let mut batch_metrics = MultiproofBatchMetrics::default(); + + let should_finish = task.process_multiproof_message( + MultiProofMessage::BlockAccessList(bal), + &mut ctx, + &mut batch_metrics, + &test_provider, + ); + + // BAL should mark updates as finished + assert!(ctx.updates_finished_time.is_some()); + + // Should have dispatched state update proofs + assert!(batch_metrics.state_update_proofs_requested > 0); + + // Should need to wait for the results of those proofs to arrive + assert!(!should_finish, "Should continue waiting for proofs"); + } } diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 17845d50dd..d57ee5836b 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -407,7 +407,7 @@ where let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache { let caches = saved_cache.cache().clone(); let cache_metrics = saved_cache.metrics().clone(); - Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics)) + Box::new(CachedStateProvider::new(state_provider, caches, cache_metrics)) } else { state_provider }; diff --git a/crates/engine/tree/src/tree/payload_validator.rs 
b/crates/engine/tree/src/tree/payload_validator.rs index 95eee44739..64ae8ee227 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -374,7 +374,8 @@ where let mut state_provider = ensure_ok!(provider_builder.build()); drop(_enter); - // fetch parent block + // Fetch parent block. This goes to memory most of the time unless the parent block is + // beyond the in-memory buffer. let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state())) else { return Err(InsertBlockError::new( @@ -399,9 +400,17 @@ where "Decided which state root algorithm to run" ); - // use prewarming background task + // Get an iterator over the transactions in the payload let txs = self.tx_iterator_for(&input)?; + // Extract the BAL, if valid and available + let block_access_list = ensure_ok!(input + .block_access_list() + .transpose() + // Eventually gets converted to a `InsertBlockErrorKind::Other` + .map_err(Box::::from)) + .map(Arc::new); + // Spawn the appropriate processor based on strategy let mut handle = ensure_ok!(self.spawn_payload_processor( env.clone(), @@ -410,26 +419,23 @@ where parent_hash, ctx.state(), strategy, + block_access_list, )); // Use cached state provider before executing, used in execution after prewarming threads // complete if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) { - state_provider = Box::new(CachedStateProvider::new_with_caches( - state_provider, - caches, - cache_metrics, - )); + state_provider = + Box::new(CachedStateProvider::new(state_provider, caches, cache_metrics)); }; + if self.config.state_provider_metrics() { + state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine")); + } + // Execute the block and handle any execution errors - let (output, senders) = match if self.config.state_provider_metrics() { - let state_provider = - InstrumentedStateProvider::from_state_provider(&state_provider, 
"engine"); - self.execute_block(&state_provider, env, &input, &mut handle) - } else { - self.execute_block(&state_provider, env, &input, &mut handle) - } { + let (output, senders) = match self.execute_block(&state_provider, env, &input, &mut handle) + { Ok(output) => output, Err(err) => return self.handle_execution_error(input, err, &parent_block), }; @@ -607,7 +613,7 @@ where { debug!(target: "engine::tree::payload_validator", "Executing block"); let mut db = State::builder() - .with_database(StateProviderDatabase::new(&state_provider)) + .with_database(StateProviderDatabase::new(state_provider)) .with_bundle_update() .with_bal_builder() //TODO .without_state_clear() @@ -781,6 +787,7 @@ where parent_hash: B256, state: &EngineApiTreeState, strategy: StateRootStrategy, + block_access_list: Option>, ) -> Result< PayloadHandle< impl ExecutableTxFor + use, @@ -810,12 +817,14 @@ where .record(trie_input_start.elapsed().as_secs_f64()); let spawn_start = Instant::now(); + let handle = self.payload_processor.spawn( env, txs, provider_builder, multiproof_provider_factory, &self.config, + block_access_list, ); // record prewarming initialization duration diff --git a/crates/era-utils/src/export.rs b/crates/era-utils/src/export.rs index 0502f0e2ea..db8538d3c4 100644 --- a/crates/era-utils/src/export.rs +++ b/crates/era-utils/src/export.rs @@ -150,6 +150,12 @@ where let era1_id = Era1Id::new(&config.network, start_block, block_count as u32) .with_hash(historical_root); + let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 { + era1_id + } else { + era1_id.with_era_count() + }; + debug!("Final file name {}", era1_id.to_file_name()); let file_path = config.dir.join(era1_id.to_file_name()); let file = std::fs::File::create(&file_path)?; diff --git a/crates/era-utils/tests/it/genesis.rs b/crates/era-utils/tests/it/genesis.rs index 0c35c458aa..700fb1f006 100644 --- a/crates/era-utils/tests/it/genesis.rs +++ b/crates/era-utils/tests/it/genesis.rs @@ -24,7 +24,7 
@@ fn test_export_with_genesis_only() { assert!(file_path.exists(), "Exported file should exist on disk"); let file_name = file_path.file_name().unwrap().to_str().unwrap(); assert!( - file_name.starts_with("mainnet-00000-00001-"), + file_name.starts_with("mainnet-00000-"), "File should have correct prefix with era format" ); assert!(file_name.ends_with(".era1"), "File should have correct extension"); diff --git a/crates/era/src/common/file_ops.rs b/crates/era/src/common/file_ops.rs index 1a1e1defb7..3938f9ebe8 100644 --- a/crates/era/src/common/file_ops.rs +++ b/crates/era/src/common/file_ops.rs @@ -30,8 +30,11 @@ pub trait EraFileFormat: Sized { /// Era file identifiers pub trait EraFileId: Clone { - /// Convert to standardized file name - fn to_file_name(&self) -> String; + /// File type for this identifier + const FILE_TYPE: EraFileType; + + /// Number of items, slots for `era`, blocks for `era1`, per era + const ITEMS_PER_ERA: u64; /// Get the network name fn network_name(&self) -> &str; @@ -41,6 +44,43 @@ pub trait EraFileId: Clone { /// Get the count of items fn count(&self) -> u32; + + /// Get the optional hash identifier + fn hash(&self) -> Option<[u8; 4]>; + + /// Whether to include era count in filename + fn include_era_count(&self) -> bool; + + /// Calculate era number + fn era_number(&self) -> u64 { + self.start_number() / Self::ITEMS_PER_ERA + } + + /// Calculate the number of eras spanned per file. + /// + /// If the user can decide how many slots/blocks per era file there are, we need to calculate + /// it. Most of the time it should be 1, but it can never be more than 2 eras per file + /// as there is a maximum of 8192 slots/blocks per era file. + fn era_count(&self) -> u64 { + if self.count() == 0 { + return 0; + } + let first_era = self.era_number(); + let last_number = self.start_number() + self.count() as u64 - 1; + let last_era = last_number / Self::ITEMS_PER_ERA; + last_era - first_era + 1 + } + + /// Convert to standardized file name. 
+ fn to_file_name(&self) -> String { + Self::FILE_TYPE.format_filename( + self.network_name(), + self.era_number(), + self.hash(), + self.include_era_count(), + self.era_count(), + ) + } } /// [`StreamReader`] for reading era-format files @@ -154,6 +194,37 @@ impl EraFileType { } } + /// Generate era file name. + /// + /// Standard format: `--.` + /// See also + /// + /// With era count (for custom exports): + /// `---.` + pub fn format_filename( + &self, + network_name: &str, + era_number: u64, + hash: Option<[u8; 4]>, + include_era_count: bool, + era_count: u64, + ) -> String { + let hash = format_hash(hash); + + if include_era_count { + format!( + "{}-{:05}-{:05}-{}{}", + network_name, + era_number, + era_count, + hash, + self.extension() + ) + } else { + format!("{}-{:05}-{}{}", network_name, era_number, hash, self.extension()) + } + } + /// Detect file type from URL /// By default, it assumes `Era` type pub fn from_url(url: &str) -> Self { @@ -164,3 +235,11 @@ impl EraFileType { } } } + +/// Format hash as hex string, or placeholder if none +pub fn format_hash(hash: Option<[u8; 4]>) -> String { + match hash { + Some(h) => format!("{:02x}{:02x}{:02x}{:02x}", h[0], h[1], h[2], h[3]), + None => "00000000".to_string(), + } +} diff --git a/crates/era/src/era/types/group.rs b/crates/era/src/era/types/group.rs index 5051ddae47..2536c0394c 100644 --- a/crates/era/src/era/types/group.rs +++ b/crates/era/src/era/types/group.rs @@ -3,7 +3,7 @@ //! 
See also use crate::{ - common::file_ops::EraFileId, + common::file_ops::{EraFileId, EraFileType}, e2s::types::{Entry, IndexEntry, SLOT_INDEX}, era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}, }; @@ -163,12 +163,22 @@ pub struct EraId { /// Optional hash identifier for this file /// First 4 bytes of the last historical root in the last state in the era file pub hash: Option<[u8; 4]>, + + /// Whether to include era count in filename + /// It is used for custom exports when we don't use the max number of items per file + include_era_count: bool, } impl EraId { /// Create a new [`EraId`] pub fn new(network_name: impl Into, start_slot: u64, slot_count: u32) -> Self { - Self { network_name: network_name.into(), start_slot, slot_count, hash: None } + Self { + network_name: network_name.into(), + start_slot, + slot_count, + hash: None, + include_era_count: false, + } } /// Add a hash identifier to [`EraId`] @@ -177,32 +187,18 @@ impl EraId { self } - /// Calculate which era number the file starts at - pub const fn era_number(&self) -> u64 { - self.start_slot / SLOTS_PER_HISTORICAL_ROOT - } - - // Helper function to calculate the number of eras per era1 file, - // If the user can decide how many blocks per era1 file there are, we need to calculate it. - // Most of the time it should be 1, but it can never be more than 2 eras per file - // as there is a maximum of 8192 blocks per era1 file. 
- const fn calculate_era_count(&self) -> u64 { - if self.slot_count == 0 { - return 0; - } - - let first_era = self.era_number(); - - // Calculate the actual last slot number in the range - let last_slot = self.start_slot + self.slot_count as u64 - 1; - // Find which era the last block belongs to - let last_era = last_slot / SLOTS_PER_HISTORICAL_ROOT; - // Count how many eras we span - last_era - first_era + 1 + /// Include era count in filename, for custom slot-per-file exports + pub const fn with_era_count(mut self) -> Self { + self.include_era_count = true; + self } } impl EraFileId for EraId { + const FILE_TYPE: EraFileType = EraFileType::Era; + + const ITEMS_PER_ERA: u64 = SLOTS_PER_HISTORICAL_ROOT; + fn network_name(&self) -> &str { &self.network_name } @@ -214,24 +210,13 @@ impl EraFileId for EraId { fn count(&self) -> u32 { self.slot_count } - /// Convert to file name following the era file naming: - /// `---.era` - /// - /// See also - fn to_file_name(&self) -> String { - let era_number = self.era_number(); - let era_count = self.calculate_era_count(); - if let Some(hash) = self.hash { - format!( - "{}-{:05}-{:05}-{:02x}{:02x}{:02x}{:02x}.era", - self.network_name, era_number, era_count, hash[0], hash[1], hash[2], hash[3] - ) - } else { - // era spec format with placeholder hash when no hash available - // Format: `---00000000.era` - format!("{}-{:05}-{:05}-00000000.era", self.network_name, era_number, era_count) - } + fn hash(&self) -> Option<[u8; 4]> { + self.hash + } + + fn include_era_count(&self) -> bool { + self.include_era_count } } @@ -399,4 +384,40 @@ mod tests { let parsed_offset = index.offsets[0]; assert_eq!(parsed_offset, -1024); } + + #[test_case::test_case( + EraId::new("mainnet", 0, 8192).with_hash([0x4b, 0x36, 0x3d, 0xb9]), + "mainnet-00000-4b363db9.era"; + "Mainnet era 0" + )] + #[test_case::test_case( + EraId::new("mainnet", 8192, 8192).with_hash([0x40, 0xcf, 0x2f, 0x3c]), + "mainnet-00001-40cf2f3c.era"; + "Mainnet era 1" + )] + 
#[test_case::test_case( + EraId::new("mainnet", 0, 8192), + "mainnet-00000-00000000.era"; + "Without hash" + )] + fn test_era_id_file_naming(id: EraId, expected_file_name: &str) { + let actual_file_name = id.to_file_name(); + assert_eq!(actual_file_name, expected_file_name); + } + + // File naming with era-count, for custom exports + #[test_case::test_case( + EraId::new("mainnet", 0, 8192).with_hash([0x4b, 0x36, 0x3d, 0xb9]).with_era_count(), + "mainnet-00000-00001-4b363db9.era"; + "Mainnet era 0 with count" + )] + #[test_case::test_case( + EraId::new("mainnet", 8000, 500).with_hash([0xab, 0xcd, 0xef, 0x12]).with_era_count(), + "mainnet-00000-00002-abcdef12.era"; + "Spanning two eras with count" + )] + fn test_era_id_file_naming_with_era_count(id: EraId, expected_file_name: &str) { + let actual_file_name = id.to_file_name(); + assert_eq!(actual_file_name, expected_file_name); + } } diff --git a/crates/era/src/era1/types/group.rs b/crates/era/src/era1/types/group.rs index 0f1b8cabd5..4d3a049aa6 100644 --- a/crates/era/src/era1/types/group.rs +++ b/crates/era/src/era1/types/group.rs @@ -3,7 +3,7 @@ //! 
See also use crate::{ - common::file_ops::EraFileId, + common::file_ops::{EraFileId, EraFileType}, e2s::types::{Entry, IndexEntry}, era1::types::execution::{Accumulator, BlockTuple, MAX_BLOCKS_PER_ERA1}, }; @@ -105,6 +105,10 @@ pub struct Era1Id { /// Optional hash identifier for this file /// First 4 bytes of the last historical root in the last state in the era file pub hash: Option<[u8; 4]>, + + /// Whether to include era count in filename + /// It is used for custom exports when we don't use the max number of items per file + pub include_era_count: bool, } impl Era1Id { @@ -114,7 +118,13 @@ impl Era1Id { start_block: BlockNumber, block_count: u32, ) -> Self { - Self { network_name: network_name.into(), start_block, block_count, hash: None } + Self { + network_name: network_name.into(), + start_block, + block_count, + hash: None, + include_era_count: false, + } } /// Add a hash identifier to [`Era1Id`] @@ -123,21 +133,17 @@ impl Era1Id { self } - // Helper function to calculate the number of eras per era1 file, - // If the user can decide how many blocks per era1 file there are, we need to calculate it. - // Most of the time it should be 1, but it can never be more than 2 eras per file - // as there is a maximum of 8192 blocks per era1 file. 
- const fn calculate_era_count(&self, first_era: u64) -> u64 { - // Calculate the actual last block number in the range - let last_block = self.start_block + self.block_count as u64 - 1; - // Find which era the last block belongs to - let last_era = last_block / MAX_BLOCKS_PER_ERA1 as u64; - // Count how many eras we span - last_era - first_era + 1 + /// Include era count in filename, for custom block-per-file exports + pub const fn with_era_count(mut self) -> Self { + self.include_era_count = true; + self } } impl EraFileId for Era1Id { + const FILE_TYPE: EraFileType = EraFileType::Era1; + + const ITEMS_PER_ERA: u64 = MAX_BLOCKS_PER_ERA1 as u64; fn network_name(&self) -> &str { &self.network_name } @@ -149,24 +155,13 @@ impl EraFileId for Era1Id { fn count(&self) -> u32 { self.block_count } - /// Convert to file name following the era file naming: - /// `---.era(1)` - /// - /// See also - fn to_file_name(&self) -> String { - // Find which era the first block belongs to - let era_number = self.start_block / MAX_BLOCKS_PER_ERA1 as u64; - let era_count = self.calculate_era_count(era_number); - if let Some(hash) = self.hash { - format!( - "{}-{:05}-{:05}-{:02x}{:02x}{:02x}{:02x}.era1", - self.network_name, era_number, era_count, hash[0], hash[1], hash[2], hash[3] - ) - } else { - // era spec format with placeholder hash when no hash available - // Format: `---00000000.era1` - format!("{}-{:05}-{:05}-00000000.era1", self.network_name, era_number, era_count) - } + + fn hash(&self) -> Option<[u8; 4]> { + self.hash + } + + fn include_era_count(&self) -> bool { + self.include_era_count } } @@ -314,35 +309,51 @@ mod tests { #[test_case::test_case( Era1Id::new("mainnet", 0, 8192).with_hash([0x5e, 0xc1, 0xff, 0xb8]), - "mainnet-00000-00001-5ec1ffb8.era1"; + "mainnet-00000-5ec1ffb8.era1"; "Mainnet era 0" )] #[test_case::test_case( Era1Id::new("mainnet", 8192, 8192).with_hash([0x5e, 0xcb, 0x9b, 0xf9]), - "mainnet-00001-00001-5ecb9bf9.era1"; + "mainnet-00001-5ecb9bf9.era1"; 
"Mainnet era 1" )] #[test_case::test_case( Era1Id::new("sepolia", 0, 8192).with_hash([0x90, 0x91, 0x84, 0x72]), - "sepolia-00000-00001-90918472.era1"; + "sepolia-00000-90918472.era1"; "Sepolia era 0" )] #[test_case::test_case( Era1Id::new("sepolia", 155648, 8192).with_hash([0xfa, 0x77, 0x00, 0x19]), - "sepolia-00019-00001-fa770019.era1"; + "sepolia-00019-fa770019.era1"; "Sepolia era 19" )] #[test_case::test_case( Era1Id::new("mainnet", 1000, 100), - "mainnet-00000-00001-00000000.era1"; + "mainnet-00000-00000000.era1"; "ID without hash" )] #[test_case::test_case( Era1Id::new("sepolia", 101130240, 8192).with_hash([0xab, 0xcd, 0xef, 0x12]), - "sepolia-12345-00001-abcdef12.era1"; + "sepolia-12345-abcdef12.era1"; "Large block number era 12345" )] - fn test_era1id_file_naming(id: Era1Id, expected_file_name: &str) { + fn test_era1_id_file_naming(id: Era1Id, expected_file_name: &str) { + let actual_file_name = id.to_file_name(); + assert_eq!(actual_file_name, expected_file_name); + } + + // File naming with era-count, for custom exports + #[test_case::test_case( + Era1Id::new("mainnet", 0, 8192).with_hash([0x5e, 0xc1, 0xff, 0xb8]).with_era_count(), + "mainnet-00000-00001-5ec1ffb8.era1"; + "Mainnet era 0 with count" + )] + #[test_case::test_case( + Era1Id::new("mainnet", 8000, 500).with_hash([0xab, 0xcd, 0xef, 0x12]).with_era_count(), + "mainnet-00000-00002-abcdef12.era1"; + "Spanning two eras with count" + )] + fn test_era1_id_file_naming_with_era_count(id: Era1Id, expected_file_name: &str) { let actual_file_name = id.to_file_name(); assert_eq!(actual_file_name, expected_file_name); } diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml index 02d1fc4b83..7e250f1498 100644 --- a/crates/ethereum/node/Cargo.toml +++ b/crates/ethereum/node/Cargo.toml @@ -88,6 +88,9 @@ asm-keccak = [ "reth-node-core/asm-keccak", "revm/asm-keccak", ] +keccak-cache-global = [ + "alloy-primitives/keccak-cache-global", +] js-tracer = [ "reth-node-builder/js-tracer", 
"reth-rpc/js-tracer", diff --git a/crates/ethereum/reth/Cargo.toml b/crates/ethereum/reth/Cargo.toml index a03e847582..a74d53c8eb 100644 --- a/crates/ethereum/reth/Cargo.toml +++ b/crates/ethereum/reth/Cargo.toml @@ -79,7 +79,9 @@ arbitrary = [ "alloy-rpc-types-engine?/arbitrary", "reth-codecs?/arbitrary", ] - +keccak-cache-global = [ + "reth-node-ethereum?/keccak-cache-global", +] test-utils = [ "reth-chainspec/test-utils", "reth-consensus?/test-utils", diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs index fdc5ba5de5..36c37e8ab1 100644 --- a/crates/net/discv4/src/config.rs +++ b/crates/net/discv4/src/config.rs @@ -24,7 +24,7 @@ pub struct Discv4Config { /// The number of allowed consecutive failures for `FindNode` requests. Default: 5. pub max_find_node_failures: u8, /// The interval to use when checking for expired nodes that need to be re-pinged. Default: - /// 10min. + /// 10 seconds. pub ping_interval: Duration, /// The duration of we consider a ping timed out. pub ping_expiration: Duration, @@ -93,7 +93,7 @@ impl Discv4Config { /// Returns the corresponding [`ResolveNatInterval`], if a [`NatResolver`] and an interval was /// configured pub fn resolve_external_ip_interval(&self) -> Option { - let resolver = self.external_ip_resolver?; + let resolver = self.external_ip_resolver.clone()?; let interval = self.resolve_external_ip_interval?; Some(ResolveNatInterval::interval_at(resolver, tokio::time::Instant::now(), interval)) } @@ -275,10 +275,7 @@ impl Discv4ConfigBuilder { } /// Configures if and how the external IP of the node should be resolved. 
- pub const fn external_ip_resolver( - &mut self, - external_ip_resolver: Option, - ) -> &mut Self { + pub fn external_ip_resolver(&mut self, external_ip_resolver: Option) -> &mut Self { self.config.external_ip_resolver = external_ip_resolver; self } diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index 83106cbbe6..0daad65d5a 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -625,10 +625,13 @@ impl Discv4Service { self.lookup_interval = tokio::time::interval(duration); } - /// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`]. + /// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`] or + /// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return + /// the first IP address found for the domain associated with the discv4 UDP port. fn resolve_external_ip(&mut self) { if let Some(r) = &self.resolve_external_ip_interval && - let Some(external_ip) = r.resolver().as_external_ip() + let Some(external_ip) = + r.resolver().clone().as_external_ip(self.local_node_record.udp_port) { self.set_external_ip_addr(external_ip); } diff --git a/crates/net/eth-wire-types/src/broadcast.rs b/crates/net/eth-wire-types/src/broadcast.rs index 1900cf004a..9855f6f6cb 100644 --- a/crates/net/eth-wire-types/src/broadcast.rs +++ b/crates/net/eth-wire-types/src/broadcast.rs @@ -169,7 +169,7 @@ impl NewPooledTransactionHashes { matches!(version, EthVersion::Eth67 | EthVersion::Eth66) } Self::Eth68(_) => { - matches!(version, EthVersion::Eth68 | EthVersion::Eth69) + matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70) } } } diff --git a/crates/net/eth-wire-types/src/capability.rs b/crates/net/eth-wire-types/src/capability.rs index f7cd00671f..d35e4c17ee 100644 --- a/crates/net/eth-wire-types/src/capability.rs +++ b/crates/net/eth-wire-types/src/capability.rs @@ -100,6 +100,16 @@ impl Capability { 
Self::eth(EthVersion::Eth68) } + /// Returns the [`EthVersion::Eth69`] capability. + pub const fn eth_69() -> Self { + Self::eth(EthVersion::Eth69) + } + + /// Returns the [`EthVersion::Eth70`] capability. + pub const fn eth_70() -> Self { + Self::eth(EthVersion::Eth70) + } + /// Whether this is eth v66 protocol. #[inline] pub fn is_eth_v66(&self) -> bool { @@ -118,10 +128,26 @@ impl Capability { self.name == "eth" && self.version == 68 } + /// Whether this is eth v69. + #[inline] + pub fn is_eth_v69(&self) -> bool { + self.name == "eth" && self.version == 69 + } + + /// Whether this is eth v70. + #[inline] + pub fn is_eth_v70(&self) -> bool { + self.name == "eth" && self.version == 70 + } + /// Whether this is any eth version. #[inline] pub fn is_eth(&self) -> bool { - self.is_eth_v66() || self.is_eth_v67() || self.is_eth_v68() + self.is_eth_v66() || + self.is_eth_v67() || + self.is_eth_v68() || + self.is_eth_v69() || + self.is_eth_v70() } } @@ -141,7 +167,7 @@ impl From for Capability { #[cfg(any(test, feature = "arbitrary"))] impl<'a> arbitrary::Arbitrary<'a> for Capability { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { - let version = u.int_in_range(66..=69)?; // Valid eth protocol versions are 66-69 + let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70 // Only generate valid eth protocol name for now since it's the only supported protocol Ok(Self::new_static("eth", version)) } @@ -155,6 +181,8 @@ pub struct Capabilities { eth_66: bool, eth_67: bool, eth_68: bool, + eth_69: bool, + eth_70: bool, } impl Capabilities { @@ -164,6 +192,8 @@ impl Capabilities { eth_66: value.iter().any(Capability::is_eth_v66), eth_67: value.iter().any(Capability::is_eth_v67), eth_68: value.iter().any(Capability::is_eth_v68), + eth_69: value.iter().any(Capability::is_eth_v69), + eth_70: value.iter().any(Capability::is_eth_v70), inner: value, } } @@ -182,7 +212,7 @@ impl Capabilities { /// Whether the peer supports `eth` 
sub-protocol. #[inline] pub const fn supports_eth(&self) -> bool { - self.eth_68 || self.eth_67 || self.eth_66 + self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66 } /// Whether this peer supports eth v66 protocol. @@ -202,6 +232,18 @@ impl Capabilities { pub const fn supports_eth_v68(&self) -> bool { self.eth_68 } + + /// Whether this peer supports eth v69 protocol. + #[inline] + pub const fn supports_eth_v69(&self) -> bool { + self.eth_69 + } + + /// Whether this peer supports eth v70 protocol. + #[inline] + pub const fn supports_eth_v70(&self) -> bool { + self.eth_70 + } } impl From> for Capabilities { @@ -224,6 +266,8 @@ impl Decodable for Capabilities { eth_66: inner.iter().any(Capability::is_eth_v66), eth_67: inner.iter().any(Capability::is_eth_v67), eth_68: inner.iter().any(Capability::is_eth_v68), + eth_69: inner.iter().any(Capability::is_eth_v69), + eth_70: inner.iter().any(Capability::is_eth_v70), inner, }) } diff --git a/crates/net/eth-wire-types/src/message.rs b/crates/net/eth-wire-types/src/message.rs index 30ec63b43d..cb284145b1 100644 --- a/crates/net/eth-wire-types/src/message.rs +++ b/crates/net/eth-wire-types/src/message.rs @@ -1,4 +1,4 @@ -//! Implements Ethereum wire protocol for versions 66, 67, and 68. +//! Implements Ethereum wire protocol for versions 66 through 70. //! Defines structs/enums for messages, request-response pairs, and broadcasts. //! Handles compatibility with [`EthVersion`]. //! 
@@ -8,13 +8,13 @@ use super::{ broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, - GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66, + GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69, Transactions, }; use crate::{ status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives, - RawCapabilityMessage, Receipts69, SharedTransactions, + RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions, }; use alloc::{boxed::Box, string::String, sync::Arc}; use alloy_primitives::{ @@ -111,13 +111,29 @@ impl ProtocolMessage { } EthMessage::NodeData(RequestPair::decode(buf)?) } - EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?), - EthMessageID::Receipts => { - if version < EthVersion::Eth69 { - EthMessage::Receipts(RequestPair::decode(buf)?) + EthMessageID::GetReceipts => { + if version >= EthVersion::Eth70 { + EthMessage::GetReceipts70(RequestPair::decode(buf)?) } else { - // with eth69, receipts no longer include the bloom - EthMessage::Receipts69(RequestPair::decode(buf)?) + EthMessage::GetReceipts(RequestPair::decode(buf)?) + } + } + EthMessageID::Receipts => { + match version { + v if v >= EthVersion::Eth70 => { + // eth/70 continues to omit bloom filters and adds the + // `lastBlockIncomplete` flag, encoded as + // `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`. + EthMessage::Receipts70(RequestPair::decode(buf)?) + } + EthVersion::Eth69 => { + // with eth69, receipts no longer include the bloom + EthMessage::Receipts69(RequestPair::decode(buf)?) + } + _ => { + // before eth69 we need to decode the bloom as well + EthMessage::Receipts(RequestPair::decode(buf)?) 
+ } } } EthMessageID::BlockRangeUpdate => { @@ -205,6 +221,9 @@ impl From> for ProtocolBroadcastMes /// /// The `eth/69` announces the historical block range served by the node. Removes total difficulty /// information. And removes the Bloom field from receipts transferred over the protocol. +/// +/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts. +/// requests/responses. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum EthMessage { @@ -259,6 +278,12 @@ pub enum EthMessage { NodeData(RequestPair), /// Represents a `GetReceipts` request-response pair. GetReceipts(RequestPair), + /// Represents a `GetReceipts` request for eth/70. + /// + /// Note: Unlike earlier protocol versions, the eth/70 encoding for + /// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps + /// a [`RequestPair`], but with a custom inline encoding. + GetReceipts70(RequestPair), /// Represents a Receipts request-response pair. #[cfg_attr( feature = "serde", @@ -271,6 +296,16 @@ pub enum EthMessage { serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned") )] Receipts69(RequestPair>), + /// Represents a Receipts request-response pair for eth/70. + #[cfg_attr( + feature = "serde", + serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned") + )] + /// + /// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the + /// request id. The type still wraps a [`RequestPair`], but with a custom + /// inline encoding. + Receipts70(RequestPair>), /// Represents a `BlockRangeUpdate` message broadcast to the network. 
#[cfg_attr( feature = "serde", @@ -300,8 +335,8 @@ impl EthMessage { Self::PooledTransactions(_) => EthMessageID::PooledTransactions, Self::GetNodeData(_) => EthMessageID::GetNodeData, Self::NodeData(_) => EthMessageID::NodeData, - Self::GetReceipts(_) => EthMessageID::GetReceipts, - Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts, + Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts, + Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts, Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate, Self::Other(msg) => EthMessageID::Other(msg.id as u8), } @@ -314,6 +349,7 @@ impl EthMessage { Self::GetBlockBodies(_) | Self::GetBlockHeaders(_) | Self::GetReceipts(_) | + Self::GetReceipts70(_) | Self::GetPooledTransactions(_) | Self::GetNodeData(_) ) @@ -326,11 +362,40 @@ impl EthMessage { Self::PooledTransactions(_) | Self::Receipts(_) | Self::Receipts69(_) | + Self::Receipts70(_) | Self::BlockHeaders(_) | Self::BlockBodies(_) | Self::NodeData(_) ) } + + /// Converts the message types where applicable. + /// + /// This handles up/downcasting where appropriate, for example for different receipt request + /// types. + pub fn map_versioned(self, version: EthVersion) -> Self { + // For eth/70 peers we send `GetReceipts` using the new eth/70 + // encoding with `firstBlockReceiptIndex = 0`, while keeping the + // user-facing `PeerRequest` API unchanged. 
+ if version >= EthVersion::Eth70 { + return match self { + Self::GetReceipts(pair) => { + let RequestPair { request_id, message } = pair; + let req = RequestPair { + request_id, + message: GetReceipts70 { + first_block_receipt_index: 0, + block_hashes: message.0, + }, + }; + Self::GetReceipts70(req) + } + other => other, + } + } + + self + } } impl Encodable for EthMessage { @@ -351,8 +416,10 @@ impl Encodable for EthMessage { Self::GetNodeData(request) => request.encode(out), Self::NodeData(data) => data.encode(out), Self::GetReceipts(request) => request.encode(out), + Self::GetReceipts70(request) => request.encode(out), Self::Receipts(receipts) => receipts.encode(out), Self::Receipts69(receipt69) => receipt69.encode(out), + Self::Receipts70(receipt70) => receipt70.encode(out), Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out), Self::Other(unknown) => out.put_slice(&unknown.payload), } @@ -374,8 +441,10 @@ impl Encodable for EthMessage { Self::GetNodeData(request) => request.length(), Self::NodeData(data) => data.length(), Self::GetReceipts(request) => request.length(), + Self::GetReceipts70(request) => request.length(), Self::Receipts(receipts) => receipts.length(), Self::Receipts69(receipt69) => receipt69.length(), + Self::Receipts70(receipt70) => receipt70.length(), Self::BlockRangeUpdate(block_range_update) => block_range_update.length(), Self::Other(unknown) => unknown.length(), } diff --git a/crates/net/eth-wire-types/src/receipts.rs b/crates/net/eth-wire-types/src/receipts.rs index 5efa31ede0..3ddd936e04 100644 --- a/crates/net/eth-wire-types/src/receipts.rs +++ b/crates/net/eth-wire-types/src/receipts.rs @@ -17,6 +17,42 @@ pub struct GetReceipts( pub Vec, ); +/// Eth/70 `GetReceipts` request payload that supports partial receipt queries. 
+/// +/// When used with eth/70, the request id is carried by the surrounding +/// [`crate::message::RequestPair`], and the on-wire shape is the flattened list +/// `firstBlockReceiptIndex, [blockhash₁, ...]`. +/// +/// See also [eip-7975](https://eips.ethereum.org/EIPS/eip-7975) +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct GetReceipts70 { + /// Index into the receipts of the first requested block hash. + pub first_block_receipt_index: u64, + /// The block hashes to request receipts for. + pub block_hashes: Vec, +} + +impl alloy_rlp::Encodable for GetReceipts70 { + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + self.first_block_receipt_index.encode(out); + self.block_hashes.encode(out); + } + + fn length(&self) -> usize { + self.first_block_receipt_index.length() + self.block_hashes.length() + } +} + +impl alloy_rlp::Decodable for GetReceipts70 { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + let first_block_receipt_index = u64::decode(buf)?; + let block_hashes = Vec::::decode(buf)?; + Ok(Self { first_block_receipt_index, block_hashes }) + } +} + /// The response to [`GetReceipts`], containing receipt lists that correspond to each block /// requested. #[derive(Clone, Debug, PartialEq, Eq, Default)] @@ -58,7 +94,13 @@ pub struct Receipts69(pub Vec>); impl Receipts69 { /// Encodes all receipts with the bloom filter. /// - /// Note: This is an expensive operation that recalculates the bloom for each receipt. + /// Eth/69 omits bloom filters on the wire, while some internal callers + /// (and legacy APIs) still operate on [`Receipts`] with + /// [`ReceiptWithBloom`]. This helper reconstructs the bloom locally from + /// each receipt's logs so the older API can be used on top of eth/69 data. + /// + /// Note: This is an expensive operation that recalculates the bloom for + /// every receipt. 
pub fn into_with_bloom(self) -> Receipts { Receipts( self.0 @@ -75,6 +117,68 @@ impl From> for Receipts { } } +/// Eth/70 `Receipts` response payload. +/// +/// This is used in conjunction with [`crate::message::RequestPair`] to encode the full wire +/// message `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct Receipts70 { + /// Whether the receipts list for the last block is incomplete. + pub last_block_incomplete: bool, + /// Receipts grouped by block. + pub receipts: Vec>, +} + +impl alloy_rlp::Encodable for Receipts70 +where + T: alloy_rlp::Encodable, +{ + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + self.last_block_incomplete.encode(out); + self.receipts.encode(out); + } + + fn length(&self) -> usize { + self.last_block_incomplete.length() + self.receipts.length() + } +} + +impl alloy_rlp::Decodable for Receipts70 +where + T: alloy_rlp::Decodable, +{ + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + let last_block_incomplete = bool::decode(buf)?; + let receipts = Vec::>::decode(buf)?; + Ok(Self { last_block_incomplete, receipts }) + } +} + +impl Receipts70 { + /// Encodes all receipts with the bloom filter. + /// + /// Just like eth/69, eth/70 does not transmit bloom filters over the wire. + /// When higher layers still expect the older bloom-bearing [`Receipts`] + /// type, this helper converts the eth/70 payload into that shape by + /// recomputing the bloom locally from the contained receipts. + /// + /// Note: This is an expensive operation that recalculates the bloom for + /// every receipt. + pub fn into_with_bloom(self) -> Receipts { + // Reuse the eth/69 helper, since both variants carry the same + // receipt list shape (only eth/70 adds request metadata). 
+ Receipts69(self.receipts).into_with_bloom() + } +} + +impl From> for Receipts { + fn from(receipts: Receipts70) -> Self { + receipts.into_with_bloom() + } +} + #[cfg(test)] mod tests { use super::*; @@ -225,4 +329,70 @@ mod tests { let encoded = alloy_rlp::encode(&request); assert_eq!(encoded, data); } + + #[test] + fn encode_get_receipts70_inline_shape() { + let req = RequestPair { + request_id: 1111, + message: GetReceipts70 { + first_block_receipt_index: 0, + block_hashes: vec![ + hex!("00000000000000000000000000000000000000000000000000000000deadc0de").into(), + hex!("00000000000000000000000000000000000000000000000000000000feedbeef").into(), + ], + }, + }; + + let mut out = vec![]; + req.encode(&mut out); + + let mut buf = out.as_slice(); + let header = alloy_rlp::Header::decode(&mut buf).unwrap(); + let payload_start = buf.len(); + let request_id = u64::decode(&mut buf).unwrap(); + let first_block_receipt_index = u64::decode(&mut buf).unwrap(); + let block_hashes = Vec::::decode(&mut buf).unwrap(); + + assert!(buf.is_empty(), "buffer not fully consumed"); + assert_eq!(request_id, 1111); + assert_eq!(first_block_receipt_index, 0); + assert_eq!(block_hashes.len(), 2); + // ensure payload length matches header + assert_eq!(payload_start - buf.len(), header.payload_length); + + let mut buf = out.as_slice(); + let decoded = RequestPair::::decode(&mut buf).unwrap(); + assert!(buf.is_empty(), "buffer not fully consumed on decode"); + assert_eq!(decoded, req); + } + + #[test] + fn encode_receipts70_inline_shape() { + let payload: Receipts70 = + Receipts70 { last_block_incomplete: true, receipts: vec![vec![Receipt::default()]] }; + + let resp = RequestPair { request_id: 7, message: payload }; + + let mut out = vec![]; + resp.encode(&mut out); + + let mut buf = out.as_slice(); + let header = alloy_rlp::Header::decode(&mut buf).unwrap(); + let payload_start = buf.len(); + let request_id = u64::decode(&mut buf).unwrap(); + let last_block_incomplete = bool::decode(&mut 
buf).unwrap(); + let receipts = Vec::>::decode(&mut buf).unwrap(); + + assert!(buf.is_empty(), "buffer not fully consumed"); + assert_eq!(payload_start - buf.len(), header.payload_length); + assert_eq!(request_id, 7); + assert!(last_block_incomplete); + assert_eq!(receipts.len(), 1); + assert_eq!(receipts[0].len(), 1); + + let mut buf = out.as_slice(); + let decoded = RequestPair::::decode(&mut buf).unwrap(); + assert!(buf.is_empty(), "buffer not fully consumed on decode"); + assert_eq!(decoded, resp); + } } diff --git a/crates/net/eth-wire-types/src/status.rs b/crates/net/eth-wire-types/src/status.rs index 7c9bbe1af4..3a9a28e447 100644 --- a/crates/net/eth-wire-types/src/status.rs +++ b/crates/net/eth-wire-types/src/status.rs @@ -13,7 +13,7 @@ use reth_codecs_derive::add_arbitrary_tests; /// unsupported fields are stripped out. #[derive(Clone, Debug, PartialEq, Eq, Copy)] pub struct UnifiedStatus { - /// The eth protocol version (e.g. eth/66 to eth/69). + /// The eth protocol version (e.g. eth/66 to eth/70). pub version: EthVersion, /// The chain ID identifying the peer’s network. pub chain: Chain, @@ -157,7 +157,7 @@ impl StatusBuilder { self.status } - /// Sets the eth protocol version (e.g., eth/66, eth/69). + /// Sets the eth protocol version (e.g., eth/66, eth/70). pub const fn version(mut self, version: EthVersion) -> Self { self.status.version = version; self @@ -378,8 +378,8 @@ impl Debug for StatusEth69 { } } -/// `StatusMessage` can store either the Legacy version (with TD) or the -/// eth/69 version (omits TD). +/// `StatusMessage` can store either the Legacy version (with TD), or the eth/69+/eth/70 version +/// (omits TD, includes block range). 
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum StatusMessage { @@ -546,6 +546,24 @@ mod tests { assert_eq!(unified_status, roundtripped_unified_status); } + #[test] + fn roundtrip_eth70() { + let unified_status = UnifiedStatus::builder() + .version(EthVersion::Eth70) + .chain(Chain::mainnet()) + .genesis(MAINNET_GENESIS_HASH) + .forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 }) + .blockhash(b256!("0xfeb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")) + .total_difficulty(None) + .earliest_block(Some(1)) + .latest_block(Some(2)) + .build(); + + let status_message = unified_status.into_message(); + let roundtripped_unified_status = UnifiedStatus::from_message(status_message); + assert_eq!(unified_status, roundtripped_unified_status); + } + #[test] fn encode_eth69_status_message() { let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d"); diff --git a/crates/net/eth-wire-types/src/version.rs b/crates/net/eth-wire-types/src/version.rs index 8b2e3a424d..6553bd2e41 100644 --- a/crates/net/eth-wire-types/src/version.rs +++ b/crates/net/eth-wire-types/src/version.rs @@ -27,6 +27,8 @@ pub enum EthVersion { Eth68 = 68, /// The `eth` protocol version 69. Eth69 = 69, + /// The `eth` protocol version 70. + Eth70 = 70, } impl EthVersion { @@ -55,6 +57,11 @@ impl EthVersion { pub const fn is_eth69(&self) -> bool { matches!(self, Self::Eth69) } + + /// Returns true if the version is eth/70 + pub const fn is_eth70(&self) -> bool { + matches!(self, Self::Eth70) + } } /// RLP encodes `EthVersion` as a single byte (66-69). 
@@ -96,6 +103,7 @@ impl TryFrom<&str> for EthVersion { "67" => Ok(Self::Eth67), "68" => Ok(Self::Eth68), "69" => Ok(Self::Eth69), + "70" => Ok(Self::Eth70), _ => Err(ParseVersionError(s.to_string())), } } @@ -120,6 +128,7 @@ impl TryFrom for EthVersion { 67 => Ok(Self::Eth67), 68 => Ok(Self::Eth68), 69 => Ok(Self::Eth69), + 70 => Ok(Self::Eth70), _ => Err(ParseVersionError(u.to_string())), } } @@ -149,6 +158,7 @@ impl From for &'static str { EthVersion::Eth67 => "67", EthVersion::Eth68 => "68", EthVersion::Eth69 => "69", + EthVersion::Eth70 => "70", } } } @@ -195,7 +205,7 @@ impl Decodable for ProtocolVersion { #[cfg(test)] mod tests { - use super::{EthVersion, ParseVersionError}; + use super::EthVersion; use alloy_rlp::{Decodable, Encodable, Error as RlpError}; use bytes::BytesMut; @@ -205,7 +215,7 @@ mod tests { assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap()); assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap()); assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), EthVersion::try_from("70")); + assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap()); } #[test] @@ -214,12 +224,18 @@ mod tests { assert_eq!(EthVersion::Eth67, "67".parse().unwrap()); assert_eq!(EthVersion::Eth68, "68".parse().unwrap()); assert_eq!(EthVersion::Eth69, "69".parse().unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), "70".parse::()); + assert_eq!(EthVersion::Eth70, "70".parse().unwrap()); } #[test] fn test_eth_version_rlp_encode() { - let versions = [EthVersion::Eth66, EthVersion::Eth67, EthVersion::Eth68, EthVersion::Eth69]; + let versions = [ + EthVersion::Eth66, + EthVersion::Eth67, + EthVersion::Eth68, + EthVersion::Eth69, + EthVersion::Eth70, + ]; for version in versions { let mut encoded = BytesMut::new(); @@ -236,7 +252,7 @@ mod tests { (67_u8, Ok(EthVersion::Eth67)), (68_u8, Ok(EthVersion::Eth68)), (69_u8, Ok(EthVersion::Eth69)), - (70_u8, 
Err(RlpError::Custom("invalid eth version"))), + (70_u8, Ok(EthVersion::Eth70)), (65_u8, Err(RlpError::Custom("invalid eth version"))), ]; diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 9b706a02cf..9691acb439 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -418,6 +418,8 @@ mod tests { Capability::new_static("eth", 66), Capability::new_static("eth", 67), Capability::new_static("eth", 68), + Capability::new_static("eth", 69), + Capability::new_static("eth", 70), ] .into(); @@ -425,6 +427,8 @@ mod tests { assert!(capabilities.supports_eth_v66()); assert!(capabilities.supports_eth_v67()); assert!(capabilities.supports_eth_v68()); + assert!(capabilities.supports_eth_v69()); + assert!(capabilities.supports_eth_v70()); } #[test] diff --git a/crates/net/eth-wire/src/hello.rs b/crates/net/eth-wire/src/hello.rs index 40deebb631..9cad7223a0 100644 --- a/crates/net/eth-wire/src/hello.rs +++ b/crates/net/eth-wire/src/hello.rs @@ -260,10 +260,11 @@ mod tests { assert_eq!(hello_encoded.len(), hello.length()); } + //TODO: add test for eth70 here once we have fully support it #[test] - fn test_default_protocols_include_eth69() { - // ensure that the default protocol list includes Eth69 as the latest version + fn test_default_protocols_still_include_eth69() { + // ensure that older eth/69 remains advertised for compatibility let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let id = pk2id(&secret_key.public_key(SECP256K1)); let hello = HelloMessageWithProtocols::builder(id).build(); diff --git a/crates/net/nat/src/lib.rs b/crates/net/nat/src/lib.rs index edea0fb644..83b24f2ac5 100644 --- a/crates/net/nat/src/lib.rs +++ b/crates/net/nat/src/lib.rs @@ -19,7 +19,7 @@ pub use net_if::{NetInterfaceError, DEFAULT_NET_IF_NAME}; use std::{ fmt, future::{poll_fn, Future}, - net::{AddrParseError, IpAddr}, + net::{AddrParseError, IpAddr, ToSocketAddrs}, pin::Pin, str::FromStr, 
task::{Context, Poll}, @@ -38,7 +38,7 @@ const EXTERNAL_IP_APIS: &[&str] = &["https://ipinfo.io/ip", "https://icanhazip.com", "https://ifconfig.me"]; /// All builtin resolvers. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Hash)] +#[derive(Debug, Clone, Eq, PartialEq, Default, Hash)] #[cfg_attr(feature = "serde", derive(SerializeDisplay, DeserializeFromStr))] pub enum NatResolver { /// Resolve with any available resolver. @@ -50,6 +50,14 @@ pub enum NatResolver { PublicIp, /// Use the given [`IpAddr`] ExternalIp(IpAddr), + /// Use the given domain name as the external address to expose to peers. + /// This is behaving essentially the same as [`NatResolver::ExternalIp`], but supports domain + /// names. Domain names are resolved to IP addresses using the OS's resolver. The first IP + /// address found is used. + /// This may be useful in docker bridge networks where containers are usually queried by DNS + /// instead of direct IP addresses. + /// Note: the domain shouldn't include a port number. Only the IP address is resolved. + ExternalAddr(String), /// Resolve external IP via the network interface. NetIf, /// Resolve nothing @@ -62,10 +70,17 @@ impl NatResolver { external_addr_with(self).await } - /// Returns the external ip, if it is [`NatResolver::ExternalIp`] - pub const fn as_external_ip(self) -> Option { + /// Returns the fixed ip, if it is [`NatResolver::ExternalIp`] or [`NatResolver::ExternalAddr`]. + /// + /// In the case of [`NatResolver::ExternalAddr`], it will return the first IP address found for + /// the domain. 
+ pub fn as_external_ip(self, port: u16) -> Option { match self { Self::ExternalIp(ip) => Some(ip), + Self::ExternalAddr(domain) => format!("{domain}:{port}") + .to_socket_addrs() + .ok() + .and_then(|mut addrs| addrs.next().map(|addr| addr.ip())), _ => None, } } @@ -78,6 +93,7 @@ impl fmt::Display for NatResolver { Self::Upnp => f.write_str("upnp"), Self::PublicIp => f.write_str("publicip"), Self::ExternalIp(ip) => write!(f, "extip:{ip}"), + Self::ExternalAddr(domain) => write!(f, "extaddr:{domain}"), Self::NetIf => f.write_str("netif"), Self::None => f.write_str("none"), } @@ -106,12 +122,15 @@ impl FromStr for NatResolver { "publicip" | "public-ip" => Self::PublicIp, "netif" => Self::NetIf, s => { - let Some(ip) = s.strip_prefix("extip:") else { + if let Some(ip) = s.strip_prefix("extip:") { + Self::ExternalIp(ip.parse()?) + } else if let Some(domain) = s.strip_prefix("extaddr:") { + Self::ExternalAddr(domain.to_string()) + } else { return Err(ParseNatResolverError::UnknownVariant(format!( "Unknown Nat Resolver: {s}" - ))) - }; - Self::ExternalIp(ip.parse()?) + ))); + } } }; Ok(r) @@ -180,7 +199,7 @@ impl ResolveNatInterval { /// `None` if the attempt was unsuccessful. 
pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll> { if self.interval.poll_tick(cx).is_ready() { - self.future = Some(Box::pin(self.resolver.external_addr())); + self.future = Some(Box::pin(self.resolver.clone().external_addr())); } if let Some(mut fut) = self.future.take() { @@ -212,6 +231,9 @@ pub async fn external_addr_with(resolver: NatResolver) -> Option { ); }) .ok(), + NatResolver::ExternalAddr(domain) => { + domain.to_socket_addrs().ok().and_then(|mut addrs| addrs.next().map(|addr| addr.ip())) + } NatResolver::None => None, } } @@ -245,7 +267,7 @@ async fn resolve_external_ip_url(url: &str) -> Option { #[cfg(test)] mod tests { use super::*; - use std::net::Ipv4Addr; + use std::net::{Ipv4Addr, Ipv6Addr}; #[tokio::test] #[ignore] @@ -267,6 +289,18 @@ mod tests { dbg!(ip); } + #[test] + fn as_external_ip_test() { + let resolver = NatResolver::ExternalAddr("localhost".to_string()); + let ip = resolver.as_external_ip(30303).expect("localhost should be resolvable"); + + if ip.is_ipv4() { + assert_eq!(ip, IpAddr::V4(Ipv4Addr::LOCALHOST)); + } else { + assert_eq!(ip, IpAddr::V6(Ipv6Addr::LOCALHOST)); + } + } + #[test] fn test_from_str() { assert_eq!(NatResolver::Any, "any".parse().unwrap()); diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 8a5c754149..44cd07aebb 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -3,8 +3,8 @@ use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, - GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts, - Receipts69, UnifiedStatus, + GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData, + PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus, }; use reth_ethereum_forks::ForkId; use 
reth_network_p2p::error::{RequestError, RequestResult}; @@ -238,6 +238,15 @@ pub enum PeerRequest { /// The channel to send the response for receipts. response: oneshot::Sender>>, }, + /// Requests receipts from the peer using eth/70 (supports `firstBlockReceiptIndex`). + /// + /// The response should be sent through the channel. + GetReceipts70 { + /// The request for receipts. + request: GetReceipts70, + /// The channel to send the response for receipts. + response: oneshot::Sender>>, + }, } // === impl PeerRequest === @@ -257,6 +266,7 @@ impl PeerRequest { Self::GetNodeData { response, .. } => response.send(Err(err)).ok(), Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(), + Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(), }; } @@ -281,6 +291,9 @@ impl PeerRequest { Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => { EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) } + Self::GetReceipts70 { request, .. 
} => { + EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() }) + } } } diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 54902ef478..cbe93a2386 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -66,6 +66,7 @@ tracing.workspace = true rustc-hash.workspace = true thiserror.workspace = true parking_lot.workspace = true +rayon.workspace = true rand.workspace = true rand_08.workspace = true secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 047970aac0..93223fabcd 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -433,7 +433,7 @@ impl NetworkConfigBuilder { pub fn external_ip_resolver(mut self, resolver: NatResolver) -> Self { self.discovery_v4_builder .get_or_insert_with(Discv4Config::builder) - .external_ip_resolver(Some(resolver)); + .external_ip_resolver(Some(resolver.clone())); self.nat = Some(resolver); self } @@ -484,7 +484,7 @@ impl NetworkConfigBuilder { } // Disable nat - pub const fn disable_nat(mut self) -> Self { + pub fn disable_nat(mut self) -> Self { self.nat = None; self } @@ -579,7 +579,7 @@ impl NetworkConfigBuilder { } /// Sets the NAT resolver for external IP. 
- pub const fn add_nat(mut self, nat: Option) -> Self { + pub fn add_nat(mut self, nat: Option) -> Self { self.nat = nat; self } diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 492bf8bd55..c110c5b11b 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -10,7 +10,8 @@ use alloy_rlp::Encodable; use futures::StreamExt; use reth_eth_wire::{ BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData, - GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, + GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts, + Receipts69, Receipts70, }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::error::RequestResult; @@ -217,6 +218,69 @@ where let _ = response.send(Ok(Receipts69(receipts))); } + /// Handles partial responses for [`GetReceipts70`] queries. + /// + /// This will adhere to the soft limit but allow filling the last vec partially. 
+ fn on_receipts70_request( + &self, + _peer_id: PeerId, + request: GetReceipts70, + response: oneshot::Sender>>, + ) { + self.metrics.eth_receipts_requests_received_total.increment(1); + + let GetReceipts70 { first_block_receipt_index, block_hashes } = request; + + let mut receipts = Vec::new(); + let mut total_bytes = 0usize; + let mut last_block_incomplete = false; + + for (idx, hash) in block_hashes.into_iter().enumerate() { + if idx >= MAX_RECEIPTS_SERVE { + break + } + + let Some(mut block_receipts) = + self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default() + else { + break + }; + + if idx == 0 && first_block_receipt_index > 0 { + let skip = first_block_receipt_index as usize; + if skip >= block_receipts.len() { + block_receipts.clear(); + } else { + block_receipts.drain(0..skip); + } + } + + let block_size = block_receipts.length(); + + if total_bytes + block_size <= SOFT_RESPONSE_LIMIT { + total_bytes += block_size; + receipts.push(block_receipts); + continue; + } + + let mut partial_block = Vec::new(); + for receipt in block_receipts { + let receipt_size = receipt.length(); + if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT { + break; + } + total_bytes += receipt_size; + partial_block.push(receipt); + } + + receipts.push(partial_block); + last_block_incomplete = true; + break; + } + + let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts })); + } + #[inline] fn get_receipts_response(&self, request: GetReceipts, transform_fn: F) -> Vec> where @@ -285,6 +349,9 @@ where IncomingEthRequest::GetReceipts69 { peer_id, request, response } => { this.on_receipts69_request(peer_id, request, response) } + IncomingEthRequest::GetReceipts70 { peer_id, request, response } => { + this.on_receipts70_request(peer_id, request, response) + } } }, ); @@ -359,4 +426,15 @@ pub enum IncomingEthRequest { /// The channel sender for the response containing Receipts69. 
response: oneshot::Sender>>, }, + /// Request Receipts from the peer using eth/70. + /// + /// The response should be sent through the channel. + GetReceipts70 { + /// The ID of the peer to request receipts from. + peer_id: PeerId, + /// The specific receipts requested including the `firstBlockReceiptIndex`. + request: GetReceipts70, + /// The channel sender for the response containing Receipts70. + response: oneshot::Sender>>, + }, } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index c0a2934df7..52d8757a50 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -532,6 +532,13 @@ impl NetworkManager { response, }) } + PeerRequest::GetReceipts70 { request, response } => { + self.delegate_eth_request(IncomingEthRequest::GetReceipts70 { + peer_id, + request, + response, + }) + } PeerRequest::GetPooledTransactions { request, response } => { self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions { peer_id, diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 115939b161..58df7006e1 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -3,7 +3,7 @@ //! An `RLPx` stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, -use crate::types::Receipts69; +use crate::types::{Receipts69, Receipts70}; use alloy_consensus::{BlockHeader, ReceiptWithBloom}; use alloy_primitives::{Bytes, B256}; use futures::FutureExt; @@ -116,6 +116,11 @@ pub enum PeerResponse { /// The receiver channel for the response to a receipts request. response: oneshot::Receiver>>, }, + /// Represents a response to a request for receipts using eth/70. + Receipts70 { + /// The receiver channel for the response to a receipts request. 
+ response: oneshot::Receiver>>, + }, } // === impl PeerResponse === @@ -151,6 +156,10 @@ impl PeerResponse { Self::Receipts69 { response } => { poll_request!(response, Receipts69, cx) } + Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) { + Ok(res) => PeerResponseResult::Receipts70(res), + Err(err) => PeerResponseResult::Receipts70(Err(err.into())), + }, }; Poll::Ready(res) } @@ -171,6 +180,8 @@ pub enum PeerResponseResult { Receipts(RequestResult>>>), /// Represents a result containing receipts or an error for eth/69. Receipts69(RequestResult>>), + /// Represents a result containing receipts or an error for eth/70. + Receipts70(RequestResult>), } // === impl PeerResponseResult === @@ -208,6 +219,13 @@ impl PeerResponseResult { Self::Receipts69(resp) => { to_message!(resp, Receipts69, id) } + Self::Receipts70(resp) => match resp { + Ok(res) => { + let request = RequestPair { request_id: id, message: res }; + Ok(EthMessage::Receipts70(request)) + } + Err(err) => Err(err), + }, } } @@ -220,6 +238,7 @@ impl PeerResponseResult { Self::NodeData(res) => res.as_ref().err(), Self::Receipts(res) => res.as_ref().err(), Self::Receipts69(res) => res.as_ref().err(), + Self::Receipts70(res) => res.as_ref().err(), } } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index f804144a4e..93e14ec04e 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -237,7 +237,9 @@ impl PeersInfo for NetworkHandle { discv4.node_record() } else if let Some(discv5) = self.inner.discv5.as_ref() { // for disv5 we must check if we have an external ip configured - if let Some(external) = self.inner.nat.and_then(|nat| nat.as_external_ip()) { + if let Some(external) = + self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port())) + { NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id()) } else { // use the node record that discv5 tracks or use localhost @@ -252,9 +254,11 @@ 
impl PeersInfo for NetworkHandle { // also use the tcp port .with_tcp_port(self.inner.listener_address.lock().port()) } else { - let external_ip = self.inner.nat.and_then(|nat| nat.as_external_ip()); - let mut socket_addr = *self.inner.listener_address.lock(); + let mut socket_addr = *self.inner.listener_address.lock(); + let external_ip = + self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port())); + + if let Some(ip) = external_ip { // if able to resolve external ip, use it instead and also set the local address socket_addr.set_ip(ip) diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 98ccbad3b9..0d405b903e 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -25,10 +25,10 @@ use futures::{stream::Fuse, SinkExt, StreamExt}; use metrics::Gauge; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError}, - message::{EthBroadcastMessage, MessageError, RequestPair}, + message::{EthBroadcastMessage, MessageError}, Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload, }; -use reth_eth_wire_types::RawCapabilityMessage; +use reth_eth_wire_types::{message::RequestPair, RawCapabilityMessage}; use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_api::PeerRequest; use reth_network_p2p::error::RequestError; @@ -270,12 +270,18 @@ impl ActiveSession { on_request!(req, Receipts, GetReceipts) } } + EthMessage::GetReceipts70(req) => { + on_request!(req, Receipts70, GetReceipts70) + } EthMessage::Receipts(resp) => { on_response!(resp, GetReceipts) } EthMessage::Receipts69(resp) => { on_response!(resp, GetReceipts69) } + EthMessage::Receipts70(resp) => { + on_response!(resp, GetReceipts70) + } EthMessage::BlockRangeUpdate(msg) => { // Validate that earliest <= latest according to the spec if msg.earliest > msg.latest { @@ -311,9 +317,9 @@ impl ActiveSession { /// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest, deadline: Instant) { let request_id = self.next_id(); - trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer"); - let msg = request.create_request_message(request_id); + let msg = request.create_request_message(request_id).map_versioned(self.conn.version()); + self.queued_outgoing.push_back(msg.into()); let req = InflightRequest { request: RequestState::Waiting(request), diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 45b68617b5..8d80c108f2 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1,6 +1,7 @@ //! Transactions management for the p2p network. use alloy_consensus::transaction::TxHashRef; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; /// Aggregation on configurable parameters for [`TransactionsManager`]. pub mod config; @@ -1368,51 +1369,49 @@ where // tracks the quality of the given transactions let mut has_bad_transactions = false; - // 2. 
filter out transactions that are invalid or already pending import pre-size to avoid - // reallocations - let mut new_txs = Vec::with_capacity(transactions.len()); - for tx in transactions { - match self.transactions_by_peers.entry(*tx.tx_hash()) { - Entry::Occupied(mut entry) => { - // transaction was already inserted - entry.get_mut().insert(peer_id); - } - Entry::Vacant(entry) => { - if self.bad_imports.contains(tx.tx_hash()) { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%tx.tx_hash(), - client_version=%peer.client_version, - "received a known bad transaction from peer" - ); - has_bad_transactions = true; - } else { - // this is a new transaction that should be imported into the pool - - // recover transaction - let tx = match tx.try_into_recovered() { - Ok(tx) => tx, - Err(badtx) => { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%badtx.tx_hash(), - client_version=%peer.client_version, - "failed ecrecovery for transaction" - ); - has_bad_transactions = true; - continue - } - }; - - let pool_transaction = Pool::Transaction::from_pooled(tx); - new_txs.push(pool_transaction); - - entry.insert(HashSet::from([peer_id])); - } - } + // Remove known and invalid transactions + transactions.retain(|tx| { + if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) { + entry.get_mut().insert(peer_id); + return false } + if self.bad_imports.contains(tx.tx_hash()) { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%tx.tx_hash(), + client_version=%peer.client_version, + "received a known bad transaction from peer" + ); + has_bad_transactions = true; + return false; + } + true + }); + + let txs_len = transactions.len(); + + let new_txs = transactions + .into_par_iter() + .filter_map(|tx| match tx.try_into_recovered() { + Ok(tx) => Some(Pool::Transaction::from_pooled(tx)), + Err(badtx) => { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%badtx.tx_hash(), + 
client_version=%peer.client_version, + "failed ecrecovery for transaction" + ); + None + } + }) + .collect::>(); + + has_bad_transactions |= new_txs.len() != txs_len; + + // Record the transactions as seen by the peer + for tx in &new_txs { + self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id])); } - new_txs.shrink_to_fit(); // 3. import new transactions as a batch to minimize lock contention on the underlying // pool @@ -1925,7 +1924,9 @@ impl PooledTransactionsHashesBuilder { fn new(version: EthVersion) -> Self { match version { EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()), - EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()), + EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => { + Self::Eth68(Default::default()) + } } } diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 9d40526135..00984a2739 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -485,8 +485,9 @@ where .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map()) .build()?; - // Initialize RocksDB provider with metrics and statistics enabled + // Initialize RocksDB provider with metrics, statistics, and default tables let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb()) + .with_default_tables() .with_metrics() .with_statistics() .build()?; diff --git a/crates/node/core/src/args/network.rs b/crates/node/core/src/args/network.rs index 2d8f7dc3dd..3da236f169 100644 --- a/crates/node/core/src/args/network.rs +++ b/crates/node/core/src/args/network.rs @@ -337,7 +337,7 @@ impl NetworkArgs { // Configure basic network stack NetworkConfigBuilder::::new(secret_key) - .external_ip_resolver(self.nat) + .external_ip_resolver(self.nat.clone()) .sessions_config( SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()), ) @@ -399,7 +399,7 @@ impl NetworkArgs { } /// 
Configures the [`NatResolver`] - pub const fn with_nat_resolver(mut self, nat: NatResolver) -> Self { + pub fn with_nat_resolver(mut self, nat: NatResolver) -> Self { self.nat = nat; self } diff --git a/crates/optimism/bin/Cargo.toml b/crates/optimism/bin/Cargo.toml index ef203df0fc..a13ab3fed9 100644 --- a/crates/optimism/bin/Cargo.toml +++ b/crates/optimism/bin/Cargo.toml @@ -27,7 +27,7 @@ tracing.workspace = true workspace = true [features] -default = ["jemalloc", "otlp", "reth-optimism-evm/portable", "js-tracer"] +default = ["jemalloc", "otlp", "reth-optimism-evm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"] otlp = ["reth-optimism-cli/otlp"] @@ -40,7 +40,9 @@ jemalloc-prof = ["reth-cli-util/jemalloc-prof"] tracy-allocator = ["reth-cli-util/tracy-allocator"] asm-keccak = ["reth-optimism-cli/asm-keccak", "reth-optimism-node/asm-keccak"] - +keccak-cache-global = [ + "reth-optimism-node/keccak-cache-global", +] dev = [ "reth-optimism-cli/dev", "reth-optimism-primitives/arbitrary", diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml index 313748512b..11d617fd28 100644 --- a/crates/optimism/node/Cargo.toml +++ b/crates/optimism/node/Cargo.toml @@ -93,6 +93,9 @@ asm-keccak = [ "reth-node-core/asm-keccak", "revm/asm-keccak", ] +keccak-cache-global = [ + "alloy-primitives/keccak-cache-global", +] js-tracer = [ "reth-node-builder/js-tracer", "reth-optimism-node/js-tracer", diff --git a/crates/optimism/reth/Cargo.toml b/crates/optimism/reth/Cargo.toml index d0e30f9331..e62f396d7f 100644 --- a/crates/optimism/reth/Cargo.toml +++ b/crates/optimism/reth/Cargo.toml @@ -74,7 +74,9 @@ arbitrary = [ "reth-eth-wire?/arbitrary", "reth-codecs?/arbitrary", ] - +keccak-cache-global = [ + "reth-optimism-node?/keccak-cache-global", +] test-utils = [ "reth-chainspec/test-utils", "reth-consensus?/test-utils", diff --git a/crates/primitives-traits/src/block/recovered.rs
b/crates/primitives-traits/src/block/recovered.rs index 9c5e42bbde..983bff697f 100644 --- a/crates/primitives-traits/src/block/recovered.rs +++ b/crates/primitives-traits/src/block/recovered.rs @@ -218,7 +218,7 @@ impl RecoveredBlock { /// A safer variant of [`Self::new_sealed`] that checks if the number of senders is equal to /// the number of transactions in the block and recovers the senders from the transactions, if - /// not using [`SignedTransaction::recover_signer_unchecked`](crate::transaction::signed::SignedTransaction) + /// not using [`SignedTransaction::recover_signer`](crate::transaction::signed::SignedTransaction) /// to recover the senders. /// /// Returns an error if any of the transactions fail to recover the sender. diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index cf7a232b44..b905f1dc7d 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -496,7 +496,7 @@ where /// Returns the most recent version of the payload that is available in the corresponding /// payload build process at the time of receiving this call. /// - /// See also + /// See also /// /// Note: /// > Provider software MAY stop the corresponding build process after serving this call. @@ -1013,7 +1013,7 @@ where Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?) 
} - /// Handler for `engine_forkchoiceUpdatedV2` + /// Handler for `engine_forkchoiceUpdatedV3` /// /// See also async fn fork_choice_updated_v3( diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 9732ac4269..0ead02a537 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -175,8 +175,6 @@ where // need to apply the state changes of this call before executing the // next call if calls.peek().is_some() { - // need to apply the state changes of this call before executing - // the next call db.commit(res.state) } } diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index ce50175fce..9e976b057a 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -9,14 +9,19 @@ use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut}, StaticFileProviderFactory, }; -use alloy_primitives::{map::HashMap, Address, BlockNumber, TxNumber}; +use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber}; use reth_db::{ cursor::DbCursorRO, static_file::TransactionSenderMask, table::Value, transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut}, }; -use reth_db_api::{cursor::DbCursorRW, tables}; +use reth_db_api::{ + cursor::DbCursorRW, + models::{storage_sharded_key::StorageShardedKey, ShardedKey}, + tables, + tables::BlockNumberList, +}; use reth_errors::ProviderError; use reth_node_types::NodePrimitives; use reth_primitives_traits::ReceiptTy; @@ -294,6 +299,108 @@ where } } +impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> +where + CURSOR: DbCursorRW + DbCursorRO, +{ + /// Puts a transaction hash number mapping. 
+ pub fn put_transaction_hash_number( + &mut self, + hash: TxHash, + tx_num: TxNumber, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(hash, &tx_num), + } + } + + /// Deletes a transaction hash number mapping. + pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> { + match self { + Self::Database(cursor) => { + if cursor.seek_exact(hash)?.is_some() { + cursor.delete_current()?; + } + Ok(()) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.delete::(hash), + } + } +} + +impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> +where + CURSOR: DbCursorRW + DbCursorRO, +{ + /// Puts a storage history entry. + pub fn put_storage_history( + &mut self, + key: StorageShardedKey, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Deletes a storage history entry. + pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> { + match self { + Self::Database(cursor) => { + if cursor.seek_exact(key)?.is_some() { + cursor.delete_current()?; + } + Ok(()) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.delete::(key), + } + } +} + +impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> +where + CURSOR: DbCursorRW + DbCursorRO, +{ + /// Puts an account history entry. + pub fn put_account_history( + &mut self, + key: ShardedKey

, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Deletes an account history entry. + pub fn delete_account_history(&mut self, key: ShardedKey
) -> ProviderResult<()> { + match self { + Self::Database(cursor) => { + if cursor.seek_exact(key)?.is_some() { + cursor.delete_current()?; + } + Ok(()) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.delete::(key), + } + } +} + /// Represents a source for reading data, either from database, static files, or `RocksDB`. #[derive(Debug, Display)] pub enum EitherReader<'a, CURSOR, N> { @@ -418,6 +525,60 @@ where } } +impl EitherReader<'_, CURSOR, N> +where + CURSOR: DbCursorRO, +{ + /// Gets a transaction number by its hash. + pub fn get_transaction_hash_number( + &mut self, + hash: TxHash, + ) -> ProviderResult> { + match self { + Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)), + Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(tx) => tx.get::(hash), + } + } +} + +impl EitherReader<'_, CURSOR, N> +where + CURSOR: DbCursorRO, +{ + /// Gets a storage history entry. + pub fn get_storage_history( + &mut self, + key: StorageShardedKey, + ) -> ProviderResult> { + match self { + Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)), + Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(tx) => tx.get::(key), + } + } +} + +impl EitherReader<'_, CURSOR, N> +where + CURSOR: DbCursorRO, +{ + /// Gets an account history entry. + pub fn get_account_history( + &mut self, + key: ShardedKey
, + ) -> ProviderResult> { + match self { + Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)), + Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(tx) => tx.get::(key), + } + } +} + /// Destination for writing data. #[derive(Debug, EnumIs)] pub enum EitherWriterDestination { @@ -499,3 +660,160 @@ mod tests { } } } + +#[cfg(all(test, unix, feature = "rocksdb"))] +mod rocksdb_tests { + use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider}; + use alloy_primitives::{Address, B256}; + use reth_db_api::{ + models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey}, + tables, + }; + use tempfile::TempDir; + + fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .with_table::() + .with_table::() + .build() + .unwrap(); + (temp_dir, provider) + } + + #[test] + fn test_rocksdb_batch_transaction_hash_numbers() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let hash1 = B256::from([1u8; 32]); + let hash2 = B256::from([2u8; 32]); + let tx_num1 = 100u64; + let tx_num2 = 200u64; + + // Write via RocksDBBatch (same as EitherWriter::RocksDB would use internally) + let mut batch = provider.batch(); + batch.put::(hash1, &tx_num1).unwrap(); + batch.put::(hash2, &tx_num2).unwrap(); + batch.commit().unwrap(); + + // Read via RocksTx (same as EitherReader::RocksDB would use internally) + let tx = provider.tx(); + assert_eq!(tx.get::(hash1).unwrap(), Some(tx_num1)); + assert_eq!(tx.get::(hash2).unwrap(), Some(tx_num2)); + + // Test missing key + let missing_hash = B256::from([99u8; 32]); + assert_eq!(tx.get::(missing_hash).unwrap(), None); + } + + #[test] + fn test_rocksdb_batch_storage_history() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let address = Address::random(); + let storage_key = B256::from([1u8; 
32]); + let key = StorageShardedKey::new(address, storage_key, 1000); + let value = IntegerList::new([1, 5, 10, 50]).unwrap(); + + // Write via RocksDBBatch + let mut batch = provider.batch(); + batch.put::(key.clone(), &value).unwrap(); + batch.commit().unwrap(); + + // Read via RocksTx + let tx = provider.tx(); + let result = tx.get::(key).unwrap(); + assert_eq!(result, Some(value)); + + // Test missing key + let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0); + assert_eq!(tx.get::(missing_key).unwrap(), None); + } + + #[test] + fn test_rocksdb_batch_account_history() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let address = Address::random(); + let key = ShardedKey::new(address, 1000); + let value = IntegerList::new([1, 10, 100, 500]).unwrap(); + + // Write via RocksDBBatch + let mut batch = provider.batch(); + batch.put::(key.clone(), &value).unwrap(); + batch.commit().unwrap(); + + // Read via RocksTx + let tx = provider.tx(); + let result = tx.get::(key).unwrap(); + assert_eq!(result, Some(value)); + + // Test missing key + let missing_key = ShardedKey::new(Address::random(), 0); + assert_eq!(tx.get::(missing_key).unwrap(), None); + } + + #[test] + fn test_rocksdb_batch_delete_transaction_hash_number() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let hash = B256::from([1u8; 32]); + let tx_num = 100u64; + + // First write + provider.put::(hash, &tx_num).unwrap(); + assert_eq!(provider.get::(hash).unwrap(), Some(tx_num)); + + // Delete via RocksDBBatch + let mut batch = provider.batch(); + batch.delete::(hash).unwrap(); + batch.commit().unwrap(); + + // Verify deletion + assert_eq!(provider.get::(hash).unwrap(), None); + } + + #[test] + fn test_rocksdb_batch_delete_storage_history() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let address = Address::random(); + let storage_key = B256::from([1u8; 32]); + let key = StorageShardedKey::new(address, storage_key, 1000); + let value = 
IntegerList::new([1, 5, 10]).unwrap(); + + // First write + provider.put::(key.clone(), &value).unwrap(); + assert!(provider.get::(key.clone()).unwrap().is_some()); + + // Delete via RocksDBBatch + let mut batch = provider.batch(); + batch.delete::(key.clone()).unwrap(); + batch.commit().unwrap(); + + // Verify deletion + assert_eq!(provider.get::(key).unwrap(), None); + } + + #[test] + fn test_rocksdb_batch_delete_account_history() { + let (_temp_dir, provider) = create_rocksdb_provider(); + + let address = Address::random(); + let key = ShardedKey::new(address, 1000); + let value = IntegerList::new([1, 10, 100]).unwrap(); + + // First write + provider.put::(key.clone(), &value).unwrap(); + assert!(provider.get::(key.clone()).unwrap().is_some()); + + // Delete via RocksDBBatch + let mut batch = provider.batch(); + batch.delete::(key.clone()).unwrap(); + batch.commit().unwrap(); + + // Verify deletion + assert_eq!(provider.get::(key).unwrap(), None); + } +} diff --git a/crates/storage/provider/src/providers/database/builder.rs b/crates/storage/provider/src/providers/database/builder.rs index 4b1b55b4d4..8c4816bb6e 100644 --- a/crates/storage/provider/src/providers/database/builder.rs +++ b/crates/storage/provider/src/providers/database/builder.rs @@ -109,7 +109,7 @@ impl ProviderFactoryBuilder { self.db(Arc::new(open_db_read_only(db_dir, db_args)?)) .chainspec(chainspec) .static_file(StaticFileProvider::read_only(static_files_dir, watch_static_files)?) - .rocksdb_provider(RocksDBProvider::builder(&rocksdb_dir).build()?) + .rocksdb_provider(RocksDBProvider::builder(&rocksdb_dir).with_default_tables().build()?) 
.build_provider_factory() .map_err(Into::into) } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 975772f808..7c34f13270 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1,7 +1,7 @@ use super::metrics::{RocksDBMetrics, RocksDBOperation}; use reth_db_api::{ table::{Compress, Decompress, Encode, Table}, - DatabaseError, + tables, DatabaseError, }; use reth_storage_errors::{ db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel}, @@ -143,6 +143,18 @@ impl RocksDBBuilder { self } + /// Registers the default tables used by reth for `RocksDB` storage. + /// + /// This registers: + /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping + /// - [`tables::AccountsHistory`] - Account history index + /// - [`tables::StoragesHistory`] - Storage history index + pub fn with_default_tables(self) -> Self { + self.with_table::() + .with_table::() + .with_table::() + } + /// Enables metrics. 
pub const fn with_metrics(mut self) -> Self { self.enable_metrics = true; @@ -630,10 +642,38 @@ const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel { #[cfg(test)] mod tests { use super::*; - use alloy_primitives::{TxHash, B256}; - use reth_db_api::{table::Table, tables}; + use alloy_primitives::{Address, TxHash, B256}; + use reth_db_api::{ + models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList}, + table::Table, + tables, + }; use tempfile::TempDir; + #[test] + fn test_with_default_tables_registers_required_column_families() { + let temp_dir = TempDir::new().unwrap(); + + // Build with default tables + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + // Should be able to write/read TransactionHashNumbers + let tx_hash = TxHash::from(B256::from([1u8; 32])); + provider.put::(tx_hash, &100).unwrap(); + assert_eq!(provider.get::(tx_hash).unwrap(), Some(100)); + + // Should be able to write/read AccountsHistory + let key = ShardedKey::new(Address::ZERO, 100); + let value = IntegerList::default(); + provider.put::(key.clone(), &value).unwrap(); + assert!(provider.get::(key).unwrap().is_some()); + + // Should be able to write/read StoragesHistory + let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100); + provider.put::(key.clone(), &value).unwrap(); + assert!(provider.get::(key).unwrap().is_some()); + } + #[derive(Debug)] struct TestTable; diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 17fd3d2507..32b79c1880 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -119,6 +119,11 @@ impl RocksDBBuilder { self } + /// Registers the default tables used by reth for `RocksDB` storage (stub implementation). + pub const fn with_default_tables(self) -> Self { + self + } + /// Enables metrics (stub implementation). 
pub const fn with_metrics(self) -> Self { self diff --git a/crates/trie/common/benches/prefix_set.rs b/crates/trie/common/benches/prefix_set.rs index bc2a8dc259..b5703e1941 100644 --- a/crates/trie/common/benches/prefix_set.rs +++ b/crates/trie/common/benches/prefix_set.rs @@ -76,12 +76,6 @@ pub fn prefix_set_lookups(c: &mut Criterion) { test_data.clone(), size, ); - prefix_set_bench::( - &mut group, - "`Vec` with binary search lookup", - test_data.clone(), - size, - ); } } @@ -207,43 +201,6 @@ mod implementations { false } } - - #[derive(Default)] - pub struct VecBinarySearchPrefixSet { - keys: Vec, - sorted: bool, - } - - impl PrefixSetMutAbstraction for VecBinarySearchPrefixSet { - type Frozen = Self; - - fn insert(&mut self, key: Nibbles) { - self.sorted = false; - self.keys.push(key); - } - - fn freeze(self) -> Self::Frozen { - self - } - } - - impl PrefixSetAbstraction for VecBinarySearchPrefixSet { - fn contains(&mut self, prefix: Nibbles) -> bool { - if !self.sorted { - self.keys.sort(); - self.sorted = true; - } - - match self.keys.binary_search(&prefix) { - Ok(_) => true, - Err(idx) => match self.keys.get(idx) { - Some(key) => key.starts_with(&prefix), - None => false, // prefix > last key - }, - } - } - } - #[derive(Default)] pub struct VecCursorPrefixSet { keys: Vec, diff --git a/crates/trie/sparse-parallel/src/trie.rs b/crates/trie/sparse-parallel/src/trie.rs index 5911a4563b..3ccc5aad1a 100644 --- a/crates/trie/sparse-parallel/src/trie.rs +++ b/crates/trie/sparse-parallel/src/trie.rs @@ -18,10 +18,7 @@ use reth_trie_sparse::{ SparseTrieUpdates, }; use smallvec::SmallVec; -use std::{ - cmp::{Ord, Ordering, PartialOrd}, - sync::mpsc, -}; +use std::cmp::{Ord, Ordering, PartialOrd}; use tracing::{debug, instrument, trace}; /// The maximum length of a path, in nibbles, which belongs to the upper subtrie of a @@ -265,11 +262,9 @@ impl SparseTrieInterface for ParallelSparseTrie { }) .collect(); - let (tx, rx) = mpsc::channel(); - // Zip the lower subtries 
and their corresponding node groups, and reveal lower subtrie // nodes in parallel - lower_subtries + let results: Vec<_> = lower_subtries .into_par_iter() .zip(node_groups.into_par_iter()) .map(|((subtrie_idx, mut subtrie), nodes)| { @@ -286,16 +281,12 @@ impl SparseTrieInterface for ParallelSparseTrie { } (subtrie_idx, subtrie, Ok(())) }) - .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap()); + .collect(); - drop(tx); - - // Take back all lower subtries which were sent to the rayon pool, collecting the last - // seen error in the process and returning that. If we don't fully drain the channel - // then we lose lower sparse tries, putting the whole ParallelSparseTrie in an - // inconsistent state. + // Put subtries back which were processed in the rayon pool, collecting the last + // seen error in the process and returning that. let mut any_err = Ok(()); - for (subtrie_idx, subtrie, res) in rx { + for (subtrie_idx, subtrie, res) in results { self.lower_subtries[subtrie_idx] = LowerSparseSubtrie::Revealed(subtrie); if res.is_err() { any_err = res; @@ -745,11 +736,9 @@ impl SparseTrieInterface for ParallelSparseTrie { { use rayon::iter::{IntoParallelIterator, ParallelIterator}; - let (tx, rx) = mpsc::channel(); - let branch_node_tree_masks = &self.branch_node_tree_masks; let branch_node_hash_masks = &self.branch_node_hash_masks; - changed_subtries + let updated_subtries: Vec<_> = changed_subtries .into_par_iter() .map(|mut changed_subtrie| { #[cfg(feature = "metrics")] @@ -764,10 +753,9 @@ impl SparseTrieInterface for ParallelSparseTrie { self.metrics.subtrie_hash_update_latency.record(start.elapsed()); changed_subtrie }) - .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap()); + .collect(); - drop(tx); - self.insert_changed_subtries(rx); + self.insert_changed_subtries(updated_subtries); } } diff --git a/docs/crates/eth-wire.md b/docs/crates/eth-wire.md index cf62ab143e..3915e9241c 100644 --- a/docs/crates/eth-wire.md +++ 
b/docs/crates/eth-wire.md @@ -413,3 +413,6 @@ additional "satellite" protocols (e.g. `snap`) using negotiated `SharedCapabilit - Starting with ETH69: - `BlockRangeUpdate (0x11)` announces the historical block range served. - Receipts omit bloom: encoded as `Receipts69` instead of `Receipts`. +- Starting with ETH70 (EIP-7975): + - Status reuses the ETH69 format (no additional block range fields). + - Receipts continue to omit bloom; `GetReceipts`/`Receipts` add the eth/70 variants to support partial receipt ranges (`firstBlockReceiptIndex` and `lastBlockIncomplete`). diff --git a/docs/vocs/docs/pages/cli/op-reth.md b/docs/vocs/docs/pages/cli/op-reth.md index 2b56fa662c..27738005fb 100644 --- a/docs/vocs/docs/pages/cli/op-reth.md +++ b/docs/vocs/docs/pages/cli/op-reth.md @@ -78,7 +78,7 @@ Logging: Possible values: - always: Colors on - - auto: Colors on + - auto: Auto-detect - never: Colors off Display: @@ -93,4 +93,4 @@ Display: -q, --quiet Silence all log output -``` \ No newline at end of file +``` diff --git a/docs/vocs/docs/pages/cli/reth/debug.mdx b/docs/vocs/docs/pages/cli/reth/debug.mdx index f56a60aa94..c0e2b92975 100644 --- a/docs/vocs/docs/pages/cli/reth/debug.mdx +++ b/docs/vocs/docs/pages/cli/reth/debug.mdx @@ -78,7 +78,7 @@ Logging: Possible values: - always: Colors on - - auto: Colors on + - auto: Auto-detect - never: Colors off Display: @@ -93,4 +93,4 @@ Display: -q, --quiet Silence all log output -``` \ No newline at end of file +``` diff --git a/docs/vocs/docs/pages/cli/reth/recover.mdx b/docs/vocs/docs/pages/cli/reth/recover.mdx index 880b8482d0..3f493a2d0b 100644 --- a/docs/vocs/docs/pages/cli/reth/recover.mdx +++ b/docs/vocs/docs/pages/cli/reth/recover.mdx @@ -80,7 +80,7 @@ Logging: Possible values: - always: Colors on - - auto: Colors on + - auto: Auto-detect - never: Colors off [default: always] @@ -97,4 +97,4 @@ Display: -q, --quiet Silence all log output -``` \ No newline at end of file +``` diff --git 
a/docs/vocs/docs/pages/cli/reth/recover/storage-tries.mdx b/docs/vocs/docs/pages/cli/reth/recover/storage-tries.mdx index 701dd39368..e17e397011 100644 --- a/docs/vocs/docs/pages/cli/reth/recover/storage-tries.mdx +++ b/docs/vocs/docs/pages/cli/reth/recover/storage-tries.mdx @@ -134,7 +134,7 @@ Logging: Possible values: - always: Colors on - - auto: Colors on + - auto: Auto-detect - never: Colors off [default: always] @@ -151,4 +151,4 @@ Display: -q, --quiet Silence all log output -``` \ No newline at end of file +``` diff --git a/docs/vocs/docs/pages/cli/reth/test-vectors/tables.mdx b/docs/vocs/docs/pages/cli/reth/test-vectors/tables.mdx index 2a3023817b..d5623133bf 100644 --- a/docs/vocs/docs/pages/cli/reth/test-vectors/tables.mdx +++ b/docs/vocs/docs/pages/cli/reth/test-vectors/tables.mdx @@ -95,7 +95,7 @@ Logging: Possible values: - always: Colors on - - auto: Colors on + - auto: Auto-detect - never: Colors off Display: @@ -110,4 +110,4 @@ Display: -q, --quiet Silence all log output -``` \ No newline at end of file +``` diff --git a/docs/vocs/docs/pages/exex/hello-world.mdx b/docs/vocs/docs/pages/exex/hello-world.mdx index 30eac91ee9..25263ec0ea 100644 --- a/docs/vocs/docs/pages/exex/hello-world.mdx +++ b/docs/vocs/docs/pages/exex/hello-world.mdx @@ -73,7 +73,7 @@ Now, let's extend our simplest ExEx and start actually listening to new notifica Woah, there's a lot of new stuff here! Let's go through it step by step: -- First, we've added a `while let Some(notification) = ctx.notifications.recv().await` loop that waits for new notifications to come in. +- First, we've added a `while let Some(notification) = ctx.notifications.try_next().await?` loop that waits for new notifications to come in. - The main node is responsible for sending notifications to the ExEx, so we're waiting for them to come in. - Next, we've added a `match &notification { ... }` block that matches on the type of the notification.
- In each case, we're logging the notification and the corresponding block range, be it a chain commit, revert, or reorg. diff --git a/docs/vocs/docs/pages/run/opstack.mdx b/docs/vocs/docs/pages/run/opstack.mdx index bfa4446685..3d929c3f94 100644 --- a/docs/vocs/docs/pages/run/opstack.mdx +++ b/docs/vocs/docs/pages/run/opstack.mdx @@ -60,6 +60,15 @@ op-reth supports additional OP Stack specific CLI arguments: 1. `--rollup.sequencer ` - The sequencer endpoint to connect to. Transactions sent to the `op-reth` EL are also forwarded to this sequencer endpoint for inclusion, as the sequencer is the entity that builds blocks on OP Stack chains. Aliases: `--rollup.sequencer-http`, `--rollup.sequencer-ws`. 1. `--rollup.disable-tx-pool-gossip` - Disables gossiping of transactions in the mempool to peers. This can be omitted for personal nodes, though providers should always opt to enable this flag. 1. `--rollup.discovery.v4` - Enables the discovery v4 protocol for peer discovery. By default, op-reth, similar to op-geth, has discovery v5 enabled and discovery v4 disabled, whereas regular reth has discovery v4 enabled and discovery v5 disabled. +1. `--rollup.compute-pending-block` - Enables computing of the pending block from the tx-pool instead of using the latest block. By default the pending block equals the latest block to save resources and not leak txs from the tx-pool. +1. `--rollup.enable-tx-conditional` - Enable transaction conditional support on sequencer. +1. `--rollup.supervisor-http ` - HTTP endpoint for the interop supervisor. +1. `--rollup.supervisor-safety-level ` - Safety level for the supervisor (default: `CrossUnsafe`). +1. `--rollup.sequencer-headers ` - Optional headers to use when connecting to the sequencer. Requires `--rollup.sequencer`. +1. `--rollup.historicalrpc ` - RPC endpoint for historical data. Alias: `--rollup.historical-rpc`. +1. `--min-suggested-priority-fee ` - Minimum suggested priority fee (tip) in wei (default: `1000000`). +1. 
`--flashblocks-url ` - A URL pointing to a secure websocket subscription that streams out flashblocks. If given, the flashblocks are received to build pending block. +1. `--flashblock-consensus` - Enable flashblock consensus client to drive the chain forward. Requires `--flashblocks-url`. First, ensure that your L1 archival node is running and synced to tip. Also make sure that the beacon node / consensus layer client is running and has http APIs enabled. Then, start `op-reth` with the `--rollup.sequencer` flag set to the `Base Mainnet` sequencer endpoint: diff --git a/docs/vocs/docs/pages/sdk/node-components.mdx b/docs/vocs/docs/pages/sdk/node-components.mdx index d569d499dd..f53310698b 100644 --- a/docs/vocs/docs/pages/sdk/node-components.mdx +++ b/docs/vocs/docs/pages/sdk/node-components.mdx @@ -53,44 +53,56 @@ Provides external API access to the node: ## Component Customization -Each component can be customized through Reth's builder pattern: +Each component can be customized through Reth's builder pattern. 
You can replace individual components +while keeping the defaults for others using `EthereumNode::components()`: ```rust -use reth_ethereum::node::{EthereumNode, NodeBuilder}; +use reth_ethereum::{ + cli::interface::Cli, + node::{ + node::EthereumAddOns, + EthereumNode, + }, +}; -let node = NodeBuilder::new(config) - .with_types::() - .with_components(|ctx| { - // Use the ComponentBuilder to customize components - ctx.components_builder() - // Custom network configuration - .network(|network_builder| { - network_builder - .peer_manager(custom_peer_manager) - .build() - }) - // Custom transaction pool - .pool(|pool_builder| { - pool_builder - .validator(custom_validator) - .ordering(custom_ordering) - .build() - }) - // Custom consensus - .consensus(custom_consensus) - // Custom EVM configuration - .evm(|evm_builder| { - evm_builder - .with_precompiles(custom_precompiles) - .build() - }) - // Build all components - .build() - }) - .build() - .await?; +fn main() { + Cli::parse_args() + .run(|builder, _| async move { + let handle = builder + // Use the default ethereum node types + .with_types::() + // Configure the components of the node + // Use default ethereum components but replace specific ones + .with_components( + EthereumNode::components() + // Custom transaction pool + .pool(CustomPoolBuilder::default()) + // Other customizable components: + // .network(CustomNetworkBuilder::default()) + // .executor(CustomExecutorBuilder::default()) + // .consensus(CustomConsensusBuilder::default()) + // .payload(CustomPayloadBuilder::default()) + ) + .with_add_ons(EthereumAddOns::default()) + .launch() + .await?; + + handle.wait_for_node_exit().await + }) + .unwrap(); +} ``` +Custom component builders must implement their respective traits (`PoolBuilder`, `NetworkBuilder`, +`ExecutorBuilder`, `ConsensusBuilder`, `PayloadServiceBuilder`). 
Each trait requires implementing +an async `build_*` method that receives a `BuilderContext` with access to node configuration, +providers, and task executors. + +For complete working examples with full trait implementations, see: +- [custom-node-components](https://github.com/paradigmxyz/reth/tree/main/examples/custom-node-components) - Custom transaction pool +- [custom-payload-builder](https://github.com/paradigmxyz/reth/tree/main/examples/custom-payload-builder) - Custom payload builder +- [custom-evm](https://github.com/paradigmxyz/reth/tree/main/examples/custom-evm) - Custom EVM configuration + ## Component Lifecycle Components follow a specific lifecycle starting from node builder initialization to shutdown: diff --git a/examples/network-proxy/src/main.rs b/examples/network-proxy/src/main.rs index 51ba8e2b4a..50ef9e4e72 100644 --- a/examples/network-proxy/src/main.rs +++ b/examples/network-proxy/src/main.rs @@ -82,6 +82,7 @@ async fn main() -> eyre::Result<()> { IncomingEthRequest::GetNodeData { .. } => {} IncomingEthRequest::GetReceipts { .. } => {} IncomingEthRequest::GetReceipts69 { .. } => {} + IncomingEthRequest::GetReceipts70 { .. } => {} } } transaction_message = transactions_rx.recv() => {