diff --git a/Cargo.lock b/Cargo.lock
index 928af39457..e81919d40a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6354,8 +6354,7 @@ dependencies = [
 [[package]]
 name = "op-revm"
 version = "14.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1475a779c73999fc803778524042319691b31f3d6699d2b560c4ed8be1db802a"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "auto_impl",
  "revm",
@@ -8247,6 +8246,7 @@ dependencies = [
  "metrics",
  "metrics-util",
  "mini-moka",
+ "moka",
  "parking_lot",
  "proptest",
  "rand 0.8.5",
@@ -10583,6 +10583,7 @@ dependencies = [
  "reth-stages-api",
  "reth-static-file",
  "reth-static-file-types",
+ "reth-storage-api",
  "reth-storage-errors",
  "reth-testing-utils",
  "reth-trie",
@@ -11088,8 +11089,7 @@ dependencies = [
 [[package]]
 name = "revm"
 version = "33.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c85ed0028f043f87b3c88d4a4cb6f0a76440085523b6a8afe5ff003cf418054"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "revm-bytecode",
  "revm-context",
@@ -11107,8 +11107,7 @@ dependencies = [
 [[package]]
 name = "revm-bytecode"
 version = "7.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2c6b5e6e8dd1e28a4a60e5f46615d4ef0809111c9e63208e55b5c7058200fb0"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "bitvec",
  "phf",
@@ -11119,8 +11118,7 @@ dependencies = [
 [[package]]
 name = "revm-context"
 version = "12.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f038f0c9c723393ac897a5df9140b21cfa98f5753a2cb7d0f28fa430c4118abf"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "bitvec",
  "cfg-if",
@@ -11136,8 +11134,7 @@ dependencies = [
 [[package]]
 name = "revm-context-interface"
 version = "13.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "431c9a14e4ef1be41ae503708fd02d974f80ef1f2b6b23b5e402e8d854d1b225"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "alloy-eip2930",
  "alloy-eip7702",
@@ -11152,8 +11149,7 @@ dependencies = [
 [[package]]
 name = "revm-database"
 version = "9.0.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "980d8d6bba78c5dd35b83abbb6585b0b902eb25ea4448ed7bfba6283b0337191"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "alloy-eips",
  "revm-bytecode",
@@ -11166,8 +11162,7 @@ dependencies = [
 [[package]]
 name = "revm-database-interface"
 version = "8.0.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8cce03e3780287b07abe58faf4a7f5d8be7e81321f93ccf3343c8f7755602bae"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "auto_impl",
  "either",
@@ -11179,8 +11174,7 @@ dependencies = [
 [[package]]
 name = "revm-handler"
 version = "14.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d44f8f6dbeec3fecf9fe55f78ef0a758bdd92ea46cd4f1ca6e2a946b32c367f3"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "auto_impl",
  "derive-where",
@@ -11198,8 +11192,7 @@ dependencies = [
 [[package]]
 name = "revm-inspector"
 version = "14.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5617e49216ce1ca6c8826bcead0386bc84f49359ef67cde6d189961735659f93"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "auto_impl",
  "either",
@@ -11236,8 +11229,7 @@ dependencies = [
 [[package]]
 name = "revm-interpreter"
 version = "31.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26ec36405f7477b9dccdc6caa3be19adf5662a7a0dffa6270cdb13a090c077e5"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "revm-bytecode",
  "revm-context-interface",
@@ -11249,8 +11241,7 @@ dependencies = [
 [[package]]
 name = "revm-precompile"
 version = "31.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a62958af953cc4043e93b5be9b8497df84cc3bd612b865c49a7a7dfa26a84e2"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "ark-bls12-381",
  "ark-bn254",
@@ -11274,8 +11265,7 @@ dependencies = [
 [[package]]
 name = "revm-primitives"
 version = "21.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "29e161db429d465c09ba9cbff0df49e31049fe6b549e28eb0b7bd642fcbd4412"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "alloy-primitives",
  "num_enum",
@@ -11286,8 +11276,7 @@ dependencies = [
 [[package]]
 name = "revm-state"
 version = "8.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d8be953b7e374dbdea0773cf360debed8df394ea8d82a8b240a6b5da37592fc"
+source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
 dependencies = [
  "bitflags 2.10.0",
  "revm-bytecode",
diff --git a/Cargo.toml b/Cargo.toml
index 9995b61338..776fad765a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -587,6 +587,7 @@ url = { version = "2.3", default-features = false }
 zstd = "0.13"
 byteorder = "1"
 mini-moka = "0.10"
+moka = "0.12"
 tar-no-std = { version = "0.3.2", default-features = false }
 miniz_oxide = { version = "0.8.4", default-features = false }
 chrono = "0.4.41"
@@ -785,3 +786,13 @@ ipnet = "2.11"
 # alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
 # alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
+
+[patch.crates-io]
+revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-bytecode = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-database = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-state = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-primitives = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-interpreter = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+revm-database-interface = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
+op-revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml
index 1bac9af40a..6baeb8679b 100644
--- a/bin/reth/Cargo.toml
+++ b/bin/reth/Cargo.toml
@@ -103,6 +103,7 @@ asm-keccak = [
     "reth-node-ethereum/asm-keccak",
 ]
 keccak-cache-global = [
+    "reth-node-core/keccak-cache-global",
     "reth-node-ethereum/keccak-cache-global",
 ]
 jemalloc = [
diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs
index 2d89ed59ef..77c962f085 100644
--- a/crates/cli/commands/src/common.rs
+++ b/crates/cli/commands/src/common.rs
@@ -103,7 +103,7 @@ impl EnvironmentArgs {
         }
 
         info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
-        let genesis_block_number = self.chain.genesis().number.unwrap();
+        let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
         let (db, sfp) = match access {
             AccessRights::RW => (
                 Arc::new(init_db(db_path, self.db.database_args())?),
diff --git a/crates/cli/commands/src/p2p/mod.rs b/crates/cli/commands/src/p2p/mod.rs
index 31fa408db6..31d017ba92 100644
--- a/crates/cli/commands/src/p2p/mod.rs
+++ b/crates/cli/commands/src/p2p/mod.rs
@@ -72,7 +72,7 @@ impl
                 .split();
             if result.len() != 1 {
                 eyre::bail!(
-                    "Invalid number of headers received. Expected: 1. Received: {}",
+                    "Invalid number of bodies received. Expected: 1. Received: {}",
                     result.len()
                 )
             }
diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml
index 0380f9de6d..e7f3c46911 100644
--- a/crates/engine/tree/Cargo.toml
+++ b/crates/engine/tree/Cargo.toml
@@ -53,6 +53,7 @@ futures.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
 mini-moka = { workspace = true, features = ["sync"] }
+moka = { workspace = true, features = ["sync"] }
 smallvec.workspace = true
 
 # metrics
diff --git a/crates/engine/tree/docs/root.md b/crates/engine/tree/docs/root.md
index a5b9bcb1d4..a76d09b484 100644
--- a/crates/engine/tree/docs/root.md
+++ b/crates/engine/tree/docs/root.md
@@ -128,12 +128,12 @@ we send them along with the state updates to the [Sparse Trie Task](#sparse-trie
 
 ### Finishing the calculation
 
-Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishStateUpdates` message
+Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishedStateUpdates` message
 to the State Root Task, marking the end of receiving state updates.
 
 Every time we receive a new proof from the [MultiProof Manager](#multiproof-manager), we also check the following conditions:
 
-1. Are all updates received? (`StateRootMessage::FinishStateUpdates` was sent)
+1. Are all updates received? (`StateRootMessage::FinishedStateUpdates` was sent)
 2. Is `ProofSequencer` empty? (no proofs are pending for sequencing)
 3. Are all proofs that were sent to the [`MultiProofManager::spawn_or_queue`](#multiproof-manager) finished calculating and were sent to the [Sparse Trie Task](#sparse-trie-task)?
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index 12482b1a16..a58189591f 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -1,5 +1,4 @@
 use crate::metrics::PersistenceMetrics;
-use alloy_consensus::BlockHeader;
 use alloy_eips::BlockNumHash;
 use reth_chain_state::ExecutedBlock;
 use reth_errors::ProviderError;
@@ -142,27 +141,23 @@ where
         &self,
         blocks: Vec<ExecutedBlock<N>>,
     ) -> Result<Option<BlockNumHash>, PersistenceError> {
-        let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
-        let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
-        debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");
+        let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
+        let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
+        debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saving range of blocks");
 
         let start_time = Instant::now();
-        let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
-            hash: block.recovered_block().hash(),
-            number: block.recovered_block().header().number(),
-        });
 
-        if last_block_hash_num.is_some() {
+        if last_block.is_some() {
             let provider_rw = self.provider.database_provider_rw()?;
             provider_rw.save_blocks(blocks)?;
             provider_rw.commit()?;
         }
-        debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");
+        debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
         self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
 
-        Ok(last_block_hash_num)
+        Ok(last_block)
     }
 }
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index 687c1bfb93..d89dc4a6b9 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -931,48 +931,6 @@ where
         Ok(())
     }
 
-    /// Determines if the given block is part of a fork by checking that these
-    /// conditions are true:
-    /// * walking back from the target hash to verify that the target hash is not part of an
-    ///   extension of the canonical chain.
-    /// * walking back from the current head to verify that the target hash is not already part of
-    ///   the canonical chain.
-    ///
-    /// The header is required as an arg, because we might be checking that the header is a fork
-    /// block before it's in the tree state and before it's in the database.
-    fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
-        let target_hash = target.block.hash;
-        // verify that the given hash is not part of an extension of the canon chain.
-        let canonical_head = self.state.tree_state.canonical_head();
-        let mut current_hash;
-        let mut current_block = target;
-        loop {
-            if current_block.block.hash == canonical_head.hash {
-                return Ok(false)
-            }
-            // We already passed the canonical head
-            if current_block.block.number <= canonical_head.number {
-                break
-            }
-            current_hash = current_block.parent;
-
-            let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
-            current_block = next_block.block_with_parent();
-        }
-
-        // verify that the given hash is not already part of canonical chain stored in memory
-        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
-            return Ok(false)
-        }
-
-        // verify that the given hash is not already part of persisted canonical chain
-        if self.provider.block_number(target_hash)?.is_some() {
-            return Ok(false)
-        }
-
-        Ok(true)
-    }
-
     /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
     /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
     /// chain.
@@ -2569,14 +2527,11 @@ where
             Ok(Some(_)) => {}
         }
 
-        // determine whether we are on a fork chain
-        let is_fork = match self.is_fork(block_id) {
-            Err(err) => {
-                let block = convert_to_block(self, input)?;
-                return Err(InsertBlockError::new(block, err.into()).into());
-            }
-            Ok(is_fork) => is_fork,
-        };
+        // determine whether we are on a fork chain by comparing the block number with the
+        // canonical head. This is a simple check that is sufficient for the event emission below.
+        // A block is considered a fork if its number is less than or equal to the canonical head,
+        // as this indicates there's already a canonical block at that height.
+        let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
 
         let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
index dfe0f8338a..db02c61d6e 100644
--- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs
+++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
@@ -3,11 +3,7 @@
 use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
 use alloy_eip7928::BlockAccessList;
 use alloy_evm::block::StateChangeSource;
-use alloy_primitives::{
-    keccak256,
-    map::{B256Set, HashSet},
-    B256,
-};
+use alloy_primitives::{keccak256, map::HashSet, B256};
 use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
 use dashmap::DashMap;
 use derive_more::derive::Deref;
@@ -23,7 +19,6 @@ use reth_trie_parallel::{
     proof::ParallelProof,
     proof_task::{
         AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
-        StorageProofInput,
     },
 };
 use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
@@ -236,74 +231,6 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState
     hashed_state
 }
 
-/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
-#[derive(Debug)]
-enum PendingMultiproofTask {
-    /// A storage multiproof task input.
-    Storage(StorageMultiproofInput),
-    /// A regular multiproof task input.
-    Regular(MultiproofInput),
-}
-
-impl PendingMultiproofTask {
-    /// Returns the proof sequence number of the task.
-    const fn proof_sequence_number(&self) -> u64 {
-        match self {
-            Self::Storage(input) => input.proof_sequence_number,
-            Self::Regular(input) => input.proof_sequence_number,
-        }
-    }
-
-    /// Returns whether or not the proof targets are empty.
-    fn proof_targets_is_empty(&self) -> bool {
-        match self {
-            Self::Storage(input) => input.proof_targets.is_empty(),
-            Self::Regular(input) => input.proof_targets.is_empty(),
-        }
-    }
-
-    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
-    fn send_empty_proof(self) {
-        match self {
-            Self::Storage(input) => input.send_empty_proof(),
-            Self::Regular(input) => input.send_empty_proof(),
-        }
-    }
-}
-
-impl From<StorageMultiproofInput> for PendingMultiproofTask {
-    fn from(input: StorageMultiproofInput) -> Self {
-        Self::Storage(input)
-    }
-}
-
-impl From<MultiproofInput> for PendingMultiproofTask {
-    fn from(input: MultiproofInput) -> Self {
-        Self::Regular(input)
-    }
-}
-
-/// Input parameters for dispatching a dedicated storage multiproof calculation.
-#[derive(Debug)]
-struct StorageMultiproofInput {
-    hashed_state_update: HashedPostState,
-    hashed_address: B256,
-    proof_targets: B256Set,
-    proof_sequence_number: u64,
-    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
-    multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
-}
-
-impl StorageMultiproofInput {
-    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
-    fn send_empty_proof(self) {
-        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
-            sequence_number: self.proof_sequence_number,
-            state: self.hashed_state_update,
-        });
-    }
-}
-
 /// Input parameters for dispatching a multiproof calculation.
 #[derive(Debug)]
 struct MultiproofInput {
@@ -378,91 +305,18 @@ impl MultiproofManager {
     }
 
     /// Dispatches a new multiproof calculation to worker pools.
-    fn dispatch(&self, input: PendingMultiproofTask) {
+    fn dispatch(&self, input: MultiproofInput) {
         // If there are no proof targets, we can just send an empty multiproof back immediately
-        if input.proof_targets_is_empty() {
+        if input.proof_targets.is_empty() {
             trace!(
-                sequence_number = input.proof_sequence_number(),
+                sequence_number = input.proof_sequence_number,
                 "No proof targets, sending empty multiproof back immediately"
             );
             input.send_empty_proof();
             return;
         }
 
-        match input {
-            PendingMultiproofTask::Storage(storage_input) => {
-                self.dispatch_storage_proof(storage_input);
-            }
-            PendingMultiproofTask::Regular(multiproof_input) => {
-                self.dispatch_multiproof(multiproof_input);
-            }
-        }
-    }
-
-    /// Dispatches a single storage proof calculation to worker pool.
-    fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
-        let StorageMultiproofInput {
-            hashed_state_update,
-            hashed_address,
-            proof_targets,
-            proof_sequence_number,
-            multi_added_removed_keys,
-            state_root_message_sender: _,
-        } = storage_multiproof_input;
-
-        let storage_targets = proof_targets.len();
-
-        trace!(
-            target: "engine::tree::payload_processor::multiproof",
-            proof_sequence_number,
-            ?proof_targets,
-            storage_targets,
-            "Dispatching storage proof to workers"
-        );
-
-        let start = Instant::now();
-
-        // Create prefix set from targets
-        let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
-            proof_targets.iter().map(reth_trie::Nibbles::unpack),
-        );
-        let prefix_set = prefix_set.freeze();
-
-        // Build computation input (data only)
-        let input = StorageProofInput::new(
-            hashed_address,
-            prefix_set,
-            proof_targets,
-            true, // with_branch_node_masks
-            Some(multi_added_removed_keys),
-        );
-
-        // Dispatch to storage worker
-        if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
-            input,
-            ProofResultContext::new(
-                self.proof_result_tx.clone(),
-                proof_sequence_number,
-                hashed_state_update,
-                start,
-            ),
-        ) {
-            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
-            return;
-        }
-
-        self.metrics
-            .active_storage_workers_histogram
-            .record(self.proof_worker_handle.active_storage_workers() as f64);
-        self.metrics
-            .active_account_workers_histogram
-            .record(self.proof_worker_handle.active_account_workers() as f64);
-        self.metrics
-            .pending_storage_multiproofs_histogram
-            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
-        self.metrics
-            .pending_account_multiproofs_histogram
-            .record(self.proof_worker_handle.pending_account_tasks() as f64);
+        self.dispatch_multiproof(input);
     }
 
     /// Signals that a multiproof calculation has finished.
@@ -809,17 +663,14 @@ impl MultiProofTask {
                 available_storage_workers,
                 MultiProofTargets::chunks,
                 |proof_targets| {
-                    self.multiproof_manager.dispatch(
-                        MultiproofInput {
-                            source: None,
-                            hashed_state_update: Default::default(),
-                            proof_targets,
-                            proof_sequence_number: self.proof_sequencer.next_sequence(),
-                            state_root_message_sender: self.tx.clone(),
-                            multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
-                        }
-                        .into(),
-                    );
+                    self.multiproof_manager.dispatch(MultiproofInput {
+                        source: None,
+                        hashed_state_update: Default::default(),
+                        proof_targets,
+                        proof_sequence_number: self.proof_sequencer.next_sequence(),
+                        state_root_message_sender: self.tx.clone(),
+                        multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
+                    });
                 },
             );
         self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
@@ -967,17 +818,14 @@ impl MultiProofTask {
             );
             spawned_proof_targets.extend_ref(&proof_targets);
 
-            self.multiproof_manager.dispatch(
-                MultiproofInput {
-                    source: Some(source),
-                    hashed_state_update,
-                    proof_targets,
-                    proof_sequence_number: self.proof_sequencer.next_sequence(),
-                    state_root_message_sender: self.tx.clone(),
-                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
-                }
-                .into(),
-            );
+            self.multiproof_manager.dispatch(MultiproofInput {
+                source: Some(source),
+                hashed_state_update,
+                proof_targets,
+                proof_sequence_number: self.proof_sequencer.next_sequence(),
+                state_root_message_sender: self.tx.clone(),
+                multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
+            });
         },
     );
     self.metrics
diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
index 29f8635020..75912f6f1b 100644
--- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs
+++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -393,7 +393,7 @@ where
             metrics,
             terminate_execution,
             precompile_cache_disabled,
-            mut precompile_cache_map,
+            precompile_cache_map,
         } = self;
 
         let mut state_provider = match provider.build() {
diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
index 6302abde5f..70adbb6911 100644
--- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
+++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
@@ -166,8 +166,7 @@ where
         // Update storage slots with new values and calculate storage roots.
         let span = tracing::Span::current();
 
-        let (tx, rx) = mpsc::channel();
-        state
+        let results: Vec<_> = state
             .storages
             .into_iter()
             .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
@@ -217,13 +216,7 @@ where
 
                 SparseStateTrieResult::Ok((address, storage_trie))
             })
-            .for_each_init(
-                || tx.clone(),
-                |tx, result| {
-                    let _ = tx.send(result);
-                },
-            );
-        drop(tx);
+            .collect();
 
         // Defer leaf removals until after updates/additions, so that we don't delete an intermediate
         // branch node during a removal and then re-add that branch back during a later leaf addition.
@@ -235,7 +228,7 @@ where
             let _enter =
                 tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
                     .entered();
-            for result in rx {
+            for result in results {
                 let (address, storage_trie) = result?;
                 trie.insert_storage_trie(address, storage_trie);
diff --git a/crates/engine/tree/src/tree/precompile_cache.rs b/crates/engine/tree/src/tree/precompile_cache.rs
index fd58eee4d6..4dd0d6c043 100644
--- a/crates/engine/tree/src/tree/precompile_cache.rs
+++ b/crates/engine/tree/src/tree/precompile_cache.rs
@@ -1,50 +1,56 @@
-//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
+//! Contains a precompile cache backed by `moka::sync::Cache`.
 use alloy_primitives::Bytes;
-use parking_lot::Mutex;
+use dashmap::DashMap;
 use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
 use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
 use revm_primitives::Address;
-use schnellru::LruMap;
-use std::{
-    collections::HashMap,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
+use std::{hash::Hash, sync::Arc};
 
 /// Default max cache size for [`PrecompileCache`]
 const MAX_CACHE_SIZE: u32 = 10_000;
 
 /// Stores caches for each precompile.
 #[derive(Debug, Clone, Default)]
-pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
+pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
 where
-    S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
+    S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
 
 impl<S> PrecompileCacheMap<S>
 where
     S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
 {
-    pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
+    pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
+        // Try just using `.get` first to avoid acquiring a write lock.
+        if let Some(cache) = self.0.get(&address) {
+            return cache.clone();
+        }
+        // Otherwise, fallback to `.entry` and initialize the cache.
+        //
+        // This should be very rare as caches for all precompiles will be initialized as soon as
+        // first EVM is created.
         self.0.entry(address).or_default().clone()
     }
 }
 
 /// Cache for precompiles, for each input stores the result.
-///
-/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
-/// so we use a [`Mutex`] instead of an `RwLock`.
 #[derive(Debug, Clone)]
-pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
+pub struct PrecompileCache<S>(
+    moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
+)
 where
-    S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
+    S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
 
 impl<S> Default for PrecompileCache<S>
 where
     S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
 {
     fn default() -> Self {
-        Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
+        Self(
+            moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
+                .initial_capacity(MAX_CACHE_SIZE as usize)
+                .build_with_hasher(Default::default()),
+        )
     }
 }
 
@@ -52,63 +58,31 @@ impl PrecompileCache
 where
     S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
 {
-    fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
-        self.0.lock().get(key).cloned()
+    fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
+        self.0.get(input).filter(|e| e.spec == spec)
     }
 
     /// Inserts the given key and value into the cache, returning the new cache size.
-    fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
-        let mut cache = self.0.lock();
-        cache.insert(key, value);
-        cache.len()
-    }
-}
-
-/// Cache key, spec id and precompile call input. spec id is included in the key to account for
-/// precompile repricing across fork activations.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct CacheKey<S>((S, Bytes));
-
-impl<S> CacheKey<S> {
-    const fn new(spec_id: S, input: Bytes) -> Self {
-        Self((spec_id, input))
-    }
-}
-
-/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
-
-impl<'a, S> CacheKeyRef<'a, S> {
-    const fn new(spec_id: S, input: &'a [u8]) -> Self {
-        Self((spec_id, input))
-    }
-}
-
-impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
-    fn eq(&self, other: &CacheKey<S>) -> bool {
-        self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
-    }
-}
-
-impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.0 .0.hash(state);
-        self.0 .1.hash(state);
+    fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
+        self.0.insert(input, value);
+        self.0.entry_count() as usize
     }
 }
 
 /// Cache entry, precompile successful output.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct CacheEntry(PrecompileOutput);
+pub struct CacheEntry<S> {
+    output: PrecompileOutput,
+    spec: S,
+}
 
-impl CacheEntry {
+impl<S> CacheEntry<S> {
     const fn gas_used(&self) -> u64 {
-        self.0.gas_used
+        self.output.gas_used
     }
 
     fn to_precompile_result(&self) -> PrecompileResult {
-        Ok(self.0.clone())
+        Ok(self.output.clone())
     }
 }
 
@@ -190,9 +164,7 @@ where
     }
 
     fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
-        let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
-
-        if let Some(entry) = &self.cache.get(&key) {
+        if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
            self.increment_by_one_precompile_cache_hits();
            if input.gas >= entry.gas_used() {
                return entry.to_precompile_result()
@@ -204,8 +176,10 @@ where
 
         match &result {
             Ok(output) => {
-                let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
-                let size = self.cache.insert(key, CacheEntry(output.clone()));
+                let size = self.cache.insert(
+                    Bytes::copy_from_slice(calldata),
+                    CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
+                );
                 self.set_precompile_cache_size_metric(size as f64);
                 self.increment_by_one_precompile_cache_misses();
             }
@@ -246,31 +220,12 @@ impl CachedPrecompileMetrics {
 
 #[cfg(test)]
 mod tests {
-    use std::hash::DefaultHasher;
-
     use super::*;
     use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
     use reth_revm::db::EmptyDB;
     use revm::{context::TxEnv, precompile::PrecompileOutput};
     use revm_primitives::hardfork::SpecId;
 
-    #[test]
-    fn test_cache_key_ref_hash() {
-        let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
-        let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
-        assert!(PartialEq::eq(&key2, &key1));
-
-        let mut hasher = DefaultHasher::new();
-        key1.hash(&mut hasher);
-        let hash1 = hasher.finish();
-
-        let mut hasher = DefaultHasher::new();
-        key2.hash(&mut hasher);
-        let hash2 = hasher.finish();
-
-        assert_eq!(hash1, hash2);
-    }
-
     #[test]
     fn test_precompile_cache_basic() {
         let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
@@ -293,12 +248,11 @@ mod tests {
             reverted: false,
         };
 
-        let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
-        let expected = CacheEntry(output);
-        cache.cache.insert(key, expected.clone());
+        let input = b"test_input";
+        let expected = CacheEntry { output, spec: SpecId::PRAGUE };
+        cache.cache.insert(input.into(), expected.clone());
 
-        let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
-        let actual = cache.cache.get(&key).unwrap();
+        let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
 
         assert_eq!(actual, expected);
     }
@@ -312,7 +266,7 @@ mod tests {
         let address1 = Address::repeat_byte(1);
         let address2 = Address::repeat_byte(2);
 
-        let mut cache_map = PrecompileCacheMap::default();
+        let cache_map = PrecompileCacheMap::default();
 
         // create the first precompile with a specific output
         let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {
diff --git a/crates/ethereum/hardforks/src/display.rs b/crates/ethereum/hardforks/src/display.rs
index 9cbd253b28..3980a01b88 100644
--- a/crates/ethereum/hardforks/src/display.rs
+++ b/crates/ethereum/hardforks/src/display.rs
@@ -170,7 +170,7 @@ impl DisplayHardforks {
         let mut post_merge = Vec::new();
 
         for (fork, condition, metadata) in hardforks {
-            let mut display_fork = DisplayFork {
+            let display_fork = DisplayFork {
                 name: fork.name().to_string(),
                 activated_at: condition,
                 eip: None,
@@ -181,12 +181,7 @@ impl DisplayHardforks {
                 ForkCondition::Block(_) => {
                     pre_merge.push(display_fork);
                 }
-                ForkCondition::TTD { activation_block_number, total_difficulty, fork_block } => {
-                    display_fork.activated_at = ForkCondition::TTD {
-                        activation_block_number,
-                        fork_block,
-                        total_difficulty,
-                    };
+                ForkCondition::TTD { .. } => {
                     with_merge.push(display_fork);
                 }
                 ForkCondition::Timestamp(_) => {
diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml
index f78c16b1a3..306bbf54fb 100644
--- a/crates/ethereum/node/Cargo.toml
+++ b/crates/ethereum/node/Cargo.toml
@@ -91,6 +91,7 @@ asm-keccak = [
 ]
 keccak-cache-global = [
     "alloy-primitives/keccak-cache-global",
+    "reth-node-core/keccak-cache-global",
 ]
 js-tracer = [
     "reth-node-builder/js-tracer",
diff --git a/crates/ethereum/reth/Cargo.toml b/crates/ethereum/reth/Cargo.toml
index a74d53c8eb..a24f39c1a7 100644
--- a/crates/ethereum/reth/Cargo.toml
+++ b/crates/ethereum/reth/Cargo.toml
@@ -81,6 +81,7 @@ arbitrary = [
 ]
 keccak-cache-global = [
     "reth-node-ethereum?/keccak-cache-global",
+    "reth-node-core?/keccak-cache-global",
 ]
 test-utils = [
     "reth-chainspec/test-utils",
diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs
index e794795b1c..4c5e569394 100644
--- a/crates/net/eth-wire/src/p2pstream.rs
+++ b/crates/net/eth-wire/src/p2pstream.rs
@@ -101,8 +101,9 @@ where
             .or(Err(P2PStreamError::HandshakeError(P2PHandshakeError::Timeout)))?
             .ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoResponse))??;
 
-        // let's check the compressed length first, we will need to check again once confirming
-        // that it contains snappy-compressed data (this will be the case for all non-p2p messages).
+        // Check that the uncompressed message length does not exceed the max payload size.
+        // Note: The first message (Hello/Disconnect) is not snappy compressed. We will check the
+        // decompressed length again for subsequent messages after the handshake.
         if first_message_bytes.len() > MAX_PAYLOAD_SIZE {
             return Err(P2PStreamError::MessageTooBig {
                 message_size: first_message_bytes.len(),
diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs
index b9ccc5632e..7f0e944c92 100644
--- a/crates/node/builder/src/launch/common.rs
+++ b/crates/node/builder/src/launch/common.rs
@@ -938,9 +938,13 @@ where
     ///
    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
     pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
+        // We skip the era stage if it's not enabled
+        let era_enabled = self.era_import_source().is_some();
+        let mut all_stages =
+            StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
+
         // Get the expected first stage based on config.
-        let first_stage =
-            if self.era_import_source().is_some() { StageId::Era } else { StageId::Headers };
+        let first_stage = all_stages.next().expect("there must be at least one stage");
 
         // If no target was provided, check if the stages are congruent - check if the
         // checkpoint of the last stage matches the checkpoint of the first.
@@ -950,20 +954,28 @@ where
             .unwrap_or_default()
             .block_number;
 
-        // Skip the first stage as we've already retrieved it and comparing all other checkpoints
-        // against it.
-        for stage_id in StageId::ALL.iter().skip(1) {
+        // Compare all other stages against the first
+        for stage_id in all_stages {
             let stage_checkpoint = self
                 .blockchain_db()
-                .get_stage_checkpoint(*stage_id)?
+                .get_stage_checkpoint(stage_id)?
                 .unwrap_or_default()
                 .block_number;
 
             // If the checkpoint of any stage is less than the checkpoint of the first stage,
             // retrieve and return the block hash of the latest header and use it as the target.
+            debug!(
+                target: "consensus::engine",
+                first_stage_id = %first_stage,
+                first_stage_checkpoint,
+                stage_id = %stage_id,
+                stage_checkpoint = stage_checkpoint,
+                "Checking stage against first stage",
+            );
             if stage_checkpoint < first_stage_checkpoint {
                 debug!(
                     target: "consensus::engine",
+                    first_stage_id = %first_stage,
                     first_stage_checkpoint,
                     inconsistent_stage_id = %stage_id,
                     inconsistent_stage_checkpoint = stage_checkpoint,
diff --git a/crates/node/core/Cargo.toml b/crates/node/core/Cargo.toml
index 28dc12b2c7..eccccd8c14 100644
--- a/crates/node/core/Cargo.toml
+++ b/crates/node/core/Cargo.toml
@@ -80,7 +80,7 @@ tokio.workspace = true
 # Features for vergen to generate correct env vars
 jemalloc = ["reth-cli-util/jemalloc"]
 asm-keccak = ["alloy-primitives/asm-keccak"]
-# Feature to enable opentelemetry export
+keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
 otlp = ["reth-tracing/otlp"]
 min-error-logs = ["tracing/release_max_level_error"]
diff --git a/crates/node/core/src/args/network.rs b/crates/node/core/src/args/network.rs
index 3da236f169..619a79bb81 100644
--- a/crates/node/core/src/args/network.rs
+++ b/crates/node/core/src/args/network.rs
@@ -36,7 +36,7 @@ use reth_network::{
         DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
         SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
     },
-    HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives, SessionsConfig,
+    HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives,
 };
 use reth_network_peers::{mainnet_nodes, TrustedPeer};
 use secp256k1::SecretKey;
@@ -339,7 +339,7 @@ impl NetworkArgs {
         NetworkConfigBuilder::<N>::new(secret_key)
             .external_ip_resolver(self.nat.clone())
             .sessions_config(
-                SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
+                config.sessions.clone().with_upscaled_event_buffer(peers_config.max_peers()),
             )
             .peer_config(peers_config)
             .boot_nodes(chain_bootnodes.clone())
diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml
index 25fc9880b5..8d5a816545 100644
--- a/crates/optimism/node/Cargo.toml
+++ b/crates/optimism/node/Cargo.toml
@@ -96,6 +96,7 @@ asm-keccak = [
 ]
 keccak-cache-global = [
     "alloy-primitives/keccak-cache-global",
+    "reth-node-core/keccak-cache-global",
     "reth-optimism-node/keccak-cache-global",
 ]
 js-tracer = [
diff --git a/crates/optimism/reth/Cargo.toml b/crates/optimism/reth/Cargo.toml
index e62f396d7f..f18f0e10db 100644
--- a/crates/optimism/reth/Cargo.toml
+++ b/crates/optimism/reth/Cargo.toml
@@ -76,6 +76,7 @@ arbitrary = [
 ]
 keccak-cache-global = [
     "reth-optimism-node?/keccak-cache-global",
+    "reth-node-core?/keccak-cache-global",
 ]
 test-utils = [
     "reth-chainspec/test-utils",
diff --git a/crates/primitives-traits/src/block/sealed.rs b/crates/primitives-traits/src/block/sealed.rs
index 5c43178146..4ae6e4cdc7 100644
--- a/crates/primitives-traits/src/block/sealed.rs
+++ b/crates/primitives-traits/src/block/sealed.rs
@@ -179,7 +179,7 @@ impl SealedBlock {
     /// Recovers all senders from the transactions in the block.
     ///
-    /// Returns `None` if any of the transactions fail to recover the sender.
+    /// Returns an error if any of the transactions fail to recover the sender.
     pub fn senders(&self) -> Result<Vec<Address>, RecoveryError> {
         self.body().recover_signers()
     }
diff --git a/crates/primitives-traits/src/header/sealed.rs b/crates/primitives-traits/src/header/sealed.rs
index bcf69813f9..fa393d73ee 100644
--- a/crates/primitives-traits/src/header/sealed.rs
+++ b/crates/primitives-traits/src/header/sealed.rs
@@ -94,7 +94,7 @@ impl SealedHeader {
         *self.hash_ref()
     }
 
-    /// This is the inverse of [`Header::seal_slow`] which returns the raw header and hash.
+    /// This is the inverse of [`Self::seal_slow`] which returns the raw header and hash.
     pub fn split(self) -> (H, BlockHash) {
         let hash = self.hash();
         (self.header, hash)
diff --git a/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs b/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs
index dec5dcb09a..3ce52ee5a4 100644
--- a/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs
+++ b/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs
@@ -92,7 +92,7 @@ where
     ///
     /// Can fail if the element is rejected by the limiter or if we fail to grow an empty map.
     ///
-    /// See [`Schnellru::insert`](LruMap::insert) for more info.
+    /// See [`LruMap::insert`] for more info.
     pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool
     where
         L::KeyToInsert<'a>: Hash + PartialEq,
diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml
index 32114c58e1..462a6d74c7 100644
--- a/crates/stages/stages/Cargo.toml
+++ b/crates/stages/stages/Cargo.toml
@@ -75,6 +75,7 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
 reth-downloaders.workspace = true
 reth-static-file.workspace = true
 reth-stages-api = { workspace = true, features = ["test-utils"] }
+reth-storage-api.workspace = true
 reth-testing-utils.workspace = true
 reth-trie = { workspace = true, features = ["test-utils"] }
 reth-provider = { workspace = true, features = ["test-utils"] }
@@ -116,6 +117,7 @@ test-utils = [
     "reth-ethereum-primitives?/test-utils",
     "reth-evm-ethereum/test-utils",
 ]
+rocksdb = ["reth-provider/rocksdb"]
 
 [[bench]]
 name = "criterion"
diff --git a/crates/stages/stages/src/stages/merkle_changesets.rs b/crates/stages/stages/src/stages/merkle_changesets.rs
index 3daf4731d5..dfea760123 100644
--- a/crates/stages/stages/src/stages/merkle_changesets.rs
+++ b/crates/stages/stages/src/stages/merkle_changesets.rs
@@ -5,7 +5,7 @@ use reth_consensus::ConsensusError;
 use reth_primitives_traits::{GotExpected, SealedHeader};
 use reth_provider::{
     ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
-    PruneCheckpointWriter, StageCheckpointReader, TrieWriter,
+    PruneCheckpointWriter, StageCheckpointReader, StageCheckpointWriter, TrieWriter,
 };
 use reth_prune_types::{
     PruneCheckpoint, PruneMode, PruneSegment, MERKLE_CHANGESETS_RETENTION_BLOCKS,
@@ -300,6 +300,7 @@ where
         + DBProvider
         + HeaderProvider
         + ChainStateBlockReader
+        + StageCheckpointWriter
         + PruneCheckpointReader
         + PruneCheckpointWriter,
 {
@@ -404,6 +405,28 @@ where
             computed_range.start = computed_range.end;
         }
 
+        // If we've unwound so far that there are no longer enough trie changesets available then
+        // simply clear them and the checkpoints, so that on next pipeline startup they will be
+        // regenerated.
+        debug!(
+            target: "sync::stages::merkle_changesets",
+            ?computed_range,
+            retention_blocks=?self.retention_blocks,
+            "Checking if computed range is over retention threshold",
+        );
+        if computed_range.end - computed_range.start < self.retention_blocks {
+            debug!(
+                target: "sync::stages::merkle_changesets",
+                ?computed_range,
+                retention_blocks=?self.retention_blocks,
+                "Clearing checkpoints completely",
+            );
+            provider.clear_trie_changesets()?;
+            provider
+                .save_stage_checkpoint(StageId::MerkleChangeSets, StageCheckpoint::default())?;
+            return Ok(UnwindOutput { checkpoint: StageCheckpoint::default() })
+        }
+
         // `computed_range.end` is exclusive
         let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));
diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs
index daf63828b0..087a040f79 100644
--- a/crates/stages/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/stages/src/stages/tx_lookup.rs
@@ -3,17 +3,16 @@ use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
 use reth_db_api::{
-    cursor::{DbCursorRO, DbCursorRW},
-    table::Value,
+    table::{Decode, Decompress, Value},
     tables,
     transaction::DbTxMut,
-    RawKey, RawValue,
 };
 use reth_etl::Collector;
 use reth_primitives_traits::{NodePrimitives, SignedTransaction};
 use reth_provider::{
-    BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
-    StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
+    BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
+    RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
+    TransactionsProvider, TransactionsProviderExt,
 };
 use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
 use reth_stages_api::{
@@ -65,7 +64,9 @@ where
         + PruneCheckpointReader
         + StatsReader
         + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: SignedTransaction>>
-        + TransactionsProviderExt,
+        + TransactionsProviderExt
+        + StorageSettingsCache
+        + RocksDBProviderFactory,
 {
     /// Return the id of the stage
     fn id(&self) -> StageId {
@@ -150,16 +151,27 @@ where
         );
 
         if range_output.is_final_range {
-            let append_only =
-                provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
-            let mut txhash_cursor = provider
-                .tx_ref()
-                .cursor_write::<RawTable<tables::TransactionHashNumbers>>()?;
-
             let total_hashes = hash_collector.len();
             let interval = (total_hashes / 10).max(1);
+
+            // Use append mode when table is empty (first sync) - significantly faster
+            let append_only =
+                provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
+
+            // Create RocksDB batch if feature is enabled
+            #[cfg(all(unix, feature = "rocksdb"))]
+            let rocksdb = provider.rocksdb_provider();
+            #[cfg(all(unix, feature = "rocksdb"))]
+            let rocksdb_batch = rocksdb.batch();
+            #[cfg(not(all(unix, feature = "rocksdb")))]
+            let rocksdb_batch = ();
+
+            // Create writer that routes to either MDBX or RocksDB based on settings
+            let mut writer =
+                EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
+
             for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
-                let (hash, number) = hash_to_number?;
+                let (hash_bytes, number_bytes) = hash_to_number?;
                 if index > 0 && index.is_multiple_of(interval) {
                     info!(
                         target: "sync::stages::transaction_lookup",
                         ?append_only,
                         progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                         "Inserting hashes"
                     );
                 }
 
-                let key = RawKey::<TxHash>::from_vec(hash);
-                if append_only {
-                    txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
-                } else {
-                    txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
-                }
+                // Decode from raw ETL bytes
+                let hash = TxHash::decode(&hash_bytes)?;
+                let tx_num = TxNumber::decompress(&number_bytes)?;
+                writer.put_transaction_hash_number(hash, tx_num, append_only)?;
+            }
+
+            // Extract and register RocksDB batch for commit at provider level
+            #[cfg(all(unix, feature = "rocksdb"))]
+            if let Some(batch) = writer.into_raw_rocksdb_batch() {
+                provider.set_pending_rocksdb_batch(batch);
             }
 
             trace!(target: "sync::stages::transaction_lookup",
@@ -199,11 +215,19 @@ where
         provider: &Provider,
         input: UnwindInput,
     ) -> Result<UnwindOutput, StageError> {
-        let tx = provider.tx_ref();
         let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
 
-        // Cursor to unwind tx hash to number
-        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
+        // Create RocksDB batch if feature is enabled
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let rocksdb = provider.rocksdb_provider();
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let rocksdb_batch = rocksdb.batch();
+        #[cfg(not(all(unix, feature = "rocksdb")))]
+        let rocksdb_batch = ();
+
+        // Create writer that routes to either MDBX or RocksDB based on settings
+        let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
+
         let static_file_provider = provider.static_file_provider();
         let rev_walker = provider
             .block_body_indices_range(range.clone())?
@@ -218,15 +242,18 @@ where
 
                 // Delete all transactions that belong to this block
                 for tx_id in body.tx_num_range() {
-                    // First delete the transaction and hash to id mapping
-                    if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
-                        tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
-                    {
-                        tx_hash_number_cursor.delete_current()?;
+                    if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
+                        writer.delete_transaction_hash_number(transaction.trie_hash())?;
                     }
                 }
             }
 
+        // Extract and register RocksDB batch for commit at provider level
+        #[cfg(all(unix, feature = "rocksdb"))]
+        if let Some(batch) = writer.into_raw_rocksdb_batch() {
+            provider.set_pending_rocksdb_batch(batch);
+        }
+
         Ok(UnwindOutput {
             checkpoint: StageCheckpoint::new(unwind_to)
                 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
@@ -266,7 +293,7 @@ mod tests {
     };
     use alloy_primitives::{BlockNumber, B256};
     use assert_matches::assert_matches;
-    use reth_db_api::transaction::DbTx;
+    use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
     use reth_ethereum_primitives::Block;
     use reth_primitives_traits::SealedBlock;
     use reth_provider::{
@@ -581,4 +608,160 @@ mod tests {
             self.ensure_no_hash_by_block(input.unwind_to)
         }
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    mod rocksdb_tests {
+        use super::*;
+        use reth_provider::RocksDBProviderFactory;
+        use reth_storage_api::StorageSettings;
+
+        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
+        /// writes transaction hash mappings to `RocksDB` instead of MDBX.
+        #[tokio::test]
+        async fn execute_writes_to_rocksdb_when_enabled() {
+            let (previous_stage, stage_progress) = (110, 100);
+            let mut rng = generators::rng();
+
+            // Set up the runner
+            let runner = TransactionLookupTestRunner::default();
+
+            // Enable RocksDB for transaction hash numbers
+            runner.db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
+            );
+
+            let input = ExecInput {
+                target: Some(previous_stage),
+                checkpoint: Some(StageCheckpoint::new(stage_progress)),
+            };
+
+            // Insert blocks with transactions
+            let blocks = random_block_range(
+                &mut rng,
+                stage_progress + 1..=previous_stage,
+                BlockRangeParams {
+                    parent: Some(B256::ZERO),
+                    tx_count: 1..3, // Ensure we have transactions
+                    ..Default::default()
+                },
+            );
+            runner
+                .db
+                .insert_blocks(blocks.iter(), StorageKind::Static)
+                .expect("failed to insert blocks");
+
+            // Count expected transactions
+            let expected_tx_count: usize =
+                blocks.iter().map(|b| b.body().transactions.len()).sum();
+            assert!(expected_tx_count > 0, "test requires at least one transaction");
+
+            // Execute the stage
+            let rx = runner.execute(input);
+            let result = rx.await.unwrap();
+            assert!(result.is_ok(), "stage execution failed: {:?}", result);
+
+            // Verify MDBX table is empty (data should be in RocksDB)
+            let mdbx_count =
+                runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
+            assert_eq!(
+                mdbx_count, 0,
+                "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
+            );
+
+            // Verify RocksDB has the data
+            let rocksdb = runner.db.factory.rocksdb_provider();
+            let mut rocksdb_count = 0;
+            for block in &blocks {
+                for tx in &block.body().transactions {
+                    let hash = *tx.tx_hash();
+                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
+                    assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
+                    rocksdb_count += 1;
+                }
+            }
+            assert_eq!(
+                rocksdb_count, expected_tx_count,
+                "RocksDB should contain all transaction hashes"
+            );
+        }
+
+        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
+        /// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
+        #[tokio::test]
+        async fn unwind_deletes_from_rocksdb_when_enabled() {
+            let (previous_stage, stage_progress) = (110, 100);
+            let mut rng = generators::rng();
+
+            // Set up the runner
+            let runner = TransactionLookupTestRunner::default();
+
+            // Enable RocksDB for transaction hash numbers
+            runner.db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
+            );
+
+            // Insert blocks with transactions
+            let blocks = random_block_range(
+                &mut rng,
+                stage_progress + 1..=previous_stage,
+                BlockRangeParams {
+                    parent: Some(B256::ZERO),
+                    tx_count: 1..3, // Ensure we have transactions
+                    ..Default::default()
+                },
+            );
+            runner
+                .db
+                .insert_blocks(blocks.iter(), StorageKind::Static)
+                .expect("failed to insert blocks");
+
+            // Count expected transactions
+            let expected_tx_count: usize =
+                blocks.iter().map(|b| b.body().transactions.len()).sum();
+            assert!(expected_tx_count > 0, "test requires at least one transaction");
+
+            // Execute the stage first to populate RocksDB
+            let exec_input = ExecInput {
+                target: Some(previous_stage),
+                checkpoint: Some(StageCheckpoint::new(stage_progress)),
+            };
+            let rx = runner.execute(exec_input);
+            let result = rx.await.unwrap();
+            assert!(result.is_ok(), "stage execution failed: {:?}", result);
+
+            // Verify RocksDB has the data before unwind
+            let rocksdb = runner.db.factory.rocksdb_provider();
+            for block in &blocks {
+                for tx in &block.body().transactions {
+                    let hash = *tx.tx_hash();
+                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
+                    assert!(
+                        result.is_some(),
+                        "Transaction hash {:?} should exist before unwind",
+                        hash
+                    );
+                }
+            }
+
+            // Now unwind to stage_progress (removing all the blocks we added)
+            let unwind_input = UnwindInput {
+                checkpoint: StageCheckpoint::new(previous_stage),
+                unwind_to: stage_progress,
+                bad_block: None,
+            };
+            let unwind_result = runner.unwind(unwind_input).await;
+            assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
+
+            // Verify RocksDB data is deleted after unwind
+            let rocksdb = runner.db.factory.rocksdb_provider();
+            for block in &blocks {
+                for tx in &block.body().transactions {
+                    let hash = *tx.tx_hash();
+                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
+                    assert!(
+                        result.is_none(),
+                        "Transaction hash {:?} should be deleted from RocksDB after unwind",
+                        hash
+                    );
+                }
+            }
+        }
+    }
 }
diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs
index e404450379..c137c38826 100644
--- a/crates/stages/stages/src/test_utils/test_db.rs
+++ b/crates/stages/stages/src/test_utils/test_db.rs
@@ -50,7 +50,7 @@ impl Default for TestStageDB {
                 create_test_rw_db(),
                 MAINNET.clone(),
                 StaticFileProvider::read_write(static_dir_path).unwrap(),
-                RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
+                RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
             )
             .expect("failed to create test provider factory"),
         }
@@ -68,7 +68,7 @@ impl TestStageDB {
                 create_test_rw_db_with_path(path),
                 MAINNET.clone(),
                 StaticFileProvider::read_write(static_dir_path).unwrap(),
-                RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
+                RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
             )
             .expect("failed to create test provider factory"),
         }
diff --git a/crates/storage/db-api/src/unwind.rs b/crates/storage/db-api/src/unwind.rs
index 79cf585a62..e737edfd51 100644
--- a/crates/storage/db-api/src/unwind.rs
+++ b/crates/storage/db-api/src/unwind.rs
@@ -30,7 +30,7 @@ pub trait DbTxUnwindExt: DbTxMut {
         let mut deleted = 0;
 
         while let Some(Ok((entry_key, _))) = reverse_walker.next() {
-            if selector(entry_key.clone()) <= key {
+            if selector(entry_key) <= key {
                 break
             }
             reverse_walker.delete_current()?;
diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs
index 9e976b057a..80ca829e6e 100644
--- a/crates/storage/provider/src/either_writer.rs
+++ b/crates/storage/provider/src/either_writer.rs
@@ -187,6 +187,21 @@ impl<'a> EitherWriter<'a, (), ()> {
 }
 
 impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
+    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
+    ///
+    /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
+    /// `None` for other variants.
+    ///
+    /// This is used to defer `RocksDB` commits to the provider level, ensuring all
+    /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
+    #[cfg(all(unix, feature = "rocksdb"))]
+    pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
+        match self {
+            Self::Database(_) | Self::StaticFile(_) => None,
+            Self::RocksDB(batch) => Some(batch.into_inner()),
+        }
+    }
+
     /// Increment the block number.
     ///
     /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -304,13 +319,24 @@ where
     CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
 {
     /// Puts a transaction hash number mapping.
+    ///
+    /// When `append_only` is true, uses `cursor.append()` which is significantly faster
+    /// but requires entries to be inserted in order and the table to be empty.
+    /// When false, uses `cursor.insert()` which handles arbitrary insertion order.
     pub fn put_transaction_hash_number(
         &mut self,
         hash: TxHash,
         tx_num: TxNumber,
+        append_only: bool,
     ) -> ProviderResult<()> {
         match self {
-            Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
+            Self::Database(cursor) => {
+                if append_only {
+                    Ok(cursor.append(hash, &tx_num)?)
+                } else {
+                    Ok(cursor.insert(hash, &tx_num)?)
+                }
+            }
             Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
             #[cfg(all(unix, feature = "rocksdb"))]
             Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
@@ -663,12 +689,18 @@ mod tests {
 
 #[cfg(all(test, unix, feature = "rocksdb"))]
 mod rocksdb_tests {
-    use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
+    use super::*;
+    use crate::{
+        providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
+        test_utils::create_test_provider_factory,
+        RocksDBProviderFactory,
+    };
     use alloy_primitives::{Address, B256};
     use reth_db_api::{
         models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
         tables,
     };
+    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
     use tempfile::TempDir;
 
     fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -682,6 +714,87 @@ mod rocksdb_tests {
         (temp_dir, provider)
     }
 
+    /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
+    /// when the storage setting is enabled, and that put operations followed by commit
+    /// persist the data to `RocksDB`.
+ #[test] + fn test_either_writer_transaction_hash_numbers_with_rocksdb() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash1 = B256::from([1u8; 32]); + let hash2 = B256::from([2u8; 32]); + let tx_num1 = 100u64; + let tx_num2 = 200u64; + + // Get the RocksDB batch from the provider + let rocksdb = factory.rocksdb_provider(); + let batch = rocksdb.batch(); + + // Create EitherWriter with RocksDB + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + + // Verify we got a RocksDB writer + assert!(matches!(writer, EitherWriter::RocksDB(_))); + + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); + + // Extract the batch and register with provider for commit + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + + // Commit via provider - this commits the RocksDB batch too + provider.commit().unwrap(); + + // Verify data was written to RocksDB + let rocksdb = factory.rocksdb_provider(); + assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1)); + assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2)); + } + + /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`. + #[test] + fn test_either_writer_delete_transaction_hash_number_with_rocksdb() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash = B256::from([1u8; 32]); + let tx_num = 100u64; + + // First, write a value directly to RocksDB + let rocksdb = factory.rocksdb_provider(); + rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap(); + assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num)); + + // Now delete using EitherWriter + let batch = rocksdb.batch(); + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + writer.delete_transaction_hash_number(hash).unwrap(); + + // Extract the batch and commit via provider + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + provider.commit().unwrap(); + + // Verify deletion + let rocksdb = factory.rocksdb_provider(); + assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None); + } + #[test] fn test_rocksdb_batch_transaction_hash_numbers() { let (_temp_dir, provider) = create_rocksdb_provider(); @@ -816,4 +929,65 @@ mod rocksdb_tests { // Verify deletion assert_eq!(provider.get::(key).unwrap(), None); } + + /// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level. + /// + /// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically + /// in a single place, making it easier to reason about commit ordering and consistency.
+ #[test] + fn test_rocksdb_commits_at_provider_level() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash1 = B256::from([1u8; 32]); + let hash2 = B256::from([2u8; 32]); + let tx_num1 = 100u64; + let tx_num2 = 200u64; + + // Get the RocksDB batch from the provider + let rocksdb = factory.rocksdb_provider(); + let batch = rocksdb.batch(); + + // Create provider and EitherWriter + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); + + // Extract the raw batch from the writer and register it with the provider + let raw_batch = writer.into_raw_rocksdb_batch(); + if let Some(batch) = raw_batch { + provider.set_pending_rocksdb_batch(batch); + } + + // Data should NOT be visible yet (batch not committed) + let rocksdb = factory.rocksdb_provider(); + assert_eq!( + rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), + None, + "Data should not be visible before provider.commit()" + ); + + // Commit the provider - this should commit both MDBX and RocksDB + provider.commit().unwrap(); + + // Now data should be visible in RocksDB + let rocksdb = factory.rocksdb_provider(); + assert_eq!( + rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), + Some(tx_num1), + "Data should be visible after provider.commit()" + ); + assert_eq!( + rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), + Some(tx_num2), + "Data should be visible after provider.commit()" + ); + } } diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 9a6286cc8b..44ae667f50 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -181,6 +181,11 @@ impl RocksDBProviderFactory for BlockchainProvider { fn rocksdb_provider(&self) -> RocksDBProvider { self.database.rocksdb_provider() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) { + unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead") + } } impl HeaderProvider for BlockchainProvider { diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 0270f24644..64323d65e9 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -153,6 +153,11 @@ impl RocksDBProviderFactory for ProviderFactory { fn rocksdb_provider(&self) -> RocksDBProvider { self.rocksdb_provider.clone() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) { + unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead") + } } impl>> ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0f234453b1..0b6dfe08df 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++
b/crates/storage/provider/src/providers/database/provider.rs @@ -151,7 +151,6 @@ impl From> /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`] -#[derive(Debug)] pub struct DatabaseProvider<TX, N: NodeTypes> { /// Database transaction. tx: TX, @@ -167,10 +166,29 @@ pub struct DatabaseProvider<TX, N: NodeTypes> { storage_settings: Arc>, /// `RocksDB` provider rocksdb_provider: RocksDBProvider, + /// Pending `RocksDB` batches to be committed at provider commit time. + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>, /// Minimum distance from tip required for pruning minimum_pruning_distance: u64, } +impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut s = f.debug_struct("DatabaseProvider"); + s.field("tx", &self.tx) + .field("chain_spec", &self.chain_spec) + .field("static_file_provider", &self.static_file_provider) + .field("prune_modes", &self.prune_modes) + .field("storage", &self.storage) + .field("storage_settings", &self.storage_settings) + .field("rocksdb_provider", &self.rocksdb_provider); + #[cfg(all(unix, feature = "rocksdb"))] + s.field("pending_rocksdb_batches", &""); + s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish() + } +} + impl DatabaseProvider { /// Returns reference to prune modes. pub const fn prune_modes_ref(&self) -> &PruneModes { @@ -259,6 +277,11 @@ impl RocksDBProviderFactory for DatabaseProvider { fn rocksdb_provider(&self) -> RocksDBProvider { self.rocksdb_provider.clone() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) { + self.pending_rocksdb_batches.lock().push(batch); + } } impl> ChainSpecProvider @@ -290,6 +313,8 @@ impl DatabaseProvider { storage, storage_settings, rocksdb_provider, + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, } } @@ -545,6 +570,8 @@ impl DatabaseProvider { storage, storage_settings, rocksdb_provider, + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, } } @@ -3178,7 +3205,7 @@ impl DBProvider for DatabaseProvider self.prune_modes_ref() } - /// Commit database transaction and static files. + /// Commit database transaction, static files, and pending `RocksDB` batches. fn commit(self) -> ProviderResult<bool> { // For unwinding it makes more sense to commit the database first, since if // it is interrupted before the static files commit, we can just @@ -3186,9 +3213,27 @@ impl DBProvider for DatabaseProvider // checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() { self.tx.commit()?; + + #[cfg(all(unix, feature = "rocksdb"))] + { + let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock()); + for batch in batches { + self.rocksdb_provider.commit_batch(batch)?; + } + } + self.static_file_provider.commit()?; } else { self.static_file_provider.commit()?; + + #[cfg(all(unix, feature = "rocksdb"))] + { + let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock()); + for batch in batches { + self.rocksdb_provider.commit_batch(batch)?; + } + } + self.tx.commit()?; } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 1d6f4b230a..5039e86d3f 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -450,6 +450,19 @@ impl RocksDBProvider { batch_handle.commit() }) } + + /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`. + /// + /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`] + /// and needs to be committed at a later point (e.g., at provider commit time). + pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> { + self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| { + ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + }) + } } /// Handle for building a batch of operations atomically. @@ -529,6 +542,13 @@ impl<'a> RocksDBBatch<'a> { pub const fn provider(&self) -> &RocksDBProvider { self.provider } + + /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`. + /// + /// This is used to defer commits to the provider level. + pub fn into_inner(self) -> WriteBatchWithTransaction<true> { + self.inner + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics.
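Taken together, the hunks above form one deferred-commit flow: a writer fills a `WriteBatchWithTransaction<true>`, `into_inner`/`into_raw_rocksdb_batch` extract it, `set_pending_rocksdb_batch` parks it on the `DatabaseProvider`, and `commit()` drains the parked batches via `commit_batch` alongside the MDBX and static-file commits. The sketch below distills that flow using only the `rocksdb`, `parking_lot`, and `tempfile` crates; `PendingBatchStore` and its methods are illustrative stand-ins for the provider types in this diff, not code from it.

use parking_lot::Mutex;
use rocksdb::{
    Options, TransactionDB, TransactionDBOptions, WriteBatchWithTransaction, WriteOptions,
};

/// Illustrative stand-in for the provider's pending-batch slot (not a reth type).
struct PendingBatchStore {
    db: TransactionDB,
    pending: Mutex<Vec<WriteBatchWithTransaction<true>>>,
}

impl PendingBatchStore {
    /// Mirrors `set_pending_rocksdb_batch`: park a filled batch without writing it.
    fn set_pending(&self, batch: WriteBatchWithTransaction<true>) {
        self.pending.lock().push(batch);
    }

    /// Mirrors the tail of `DBProvider::commit`: drain and write every parked batch.
    fn commit(&self) -> Result<(), rocksdb::Error> {
        for batch in std::mem::take(&mut *self.pending.lock()) {
            self.db.write_opt(batch, &WriteOptions::default())?;
        }
        Ok(())
    }
}

fn main() -> Result<(), rocksdb::Error> {
    let dir = tempfile::tempdir().expect("temp dir");
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = TransactionDB::open(&opts, &TransactionDBOptions::default(), dir.path())?;
    let store = PendingBatchStore { db, pending: Mutex::new(Vec::new()) };

    // A writer fills a batch; nothing is visible in the database yet.
    let mut batch = WriteBatchWithTransaction::<true>::default();
    batch.put(b"tx-hash", b"tx-number");
    store.set_pending(batch);
    assert!(store.db.get(b"tx-hash")?.is_none());

    // The store-level commit is the single point where the batch lands.
    store.commit()?;
    assert_eq!(store.db.get(b"tx-hash")?.as_deref(), Some(&b"tx-number"[..]));
    Ok(())
}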
diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index 6b1e7c4b84..3002f31abd 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -1,5 +1,5 @@ use crate::{ - providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider}, + providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider}, HashingWriter, ProviderFactory, TrieWriter, }; use alloy_primitives::B256; @@ -62,7 +62,10 @@ pub fn create_test_provider_factory_with_node_types( db, chain_spec, StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"), - RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"), + RocksDBBuilder::new(&rocksdb_dir) + .with_default_tables() + .build() + .expect("failed to create test RocksDB provider"), ) .expect("failed to create test provider factory") } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 61609c3758..64eff68b03 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -29,4 +29,9 @@ impl RocksDBProviderFactory for NoopProvider< fn rocksdb_provider(&self) -> RocksDBProvider { RocksDBProvider::builder(PathBuf::default()).build().unwrap() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) { + // No-op for NoopProvider + } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index c1ffcd4358..9d2186677d 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -6,4 +6,11 @@ use crate::providers::RocksDBProvider; pub trait RocksDBProviderFactory { /// Returns the `RocksDB` provider. fn rocksdb_provider(&self) -> RocksDBProvider; + + /// Adds a pending `RocksDB` batch to be committed when this provider is committed. + /// + /// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file + /// commits, ensuring atomicity across all storage backends. + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>); } diff --git a/crates/trie/sparse/src/state.rs b/crates/trie/sparse/src/state.rs index c276bc6b5b..f02c748430 100644 --- a/crates/trie/sparse/src/state.rs +++ b/crates/trie/sparse/src/state.rs @@ -276,13 +276,12 @@ where { use rayon::iter::{ParallelBridge, ParallelIterator}; - let (tx, rx) = std::sync::mpsc::channel(); let retain_updates = self.retain_updates; // Process all storage trie revealings in parallel, having first removed the // `reveal_nodes` tracking and `SparseTrie`s for each account from their HashMaps. // These will be returned after processing. - storages + let results: Vec<_> = storages .into_iter() .map(|(account, storage_subtree)| { let revealed_nodes = self.storage.take_or_create_revealed_paths(&account); @@ -301,14 +300,12 @@ where (account, revealed_nodes, trie, result) }) - .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap()); - - drop(tx); + .collect(); // Return `revealed_nodes` and `SparseTrie` for each account, incrementing metrics and // returning the last error seen if any.
let mut any_err = Ok(()); - for (account, revealed_nodes, trie, result) in rx { + for (account, revealed_nodes, trie, result) in results { self.storage.revealed_paths.insert(account, revealed_nodes); self.storage.tries.insert(account, trie); if let Ok(_metric_values) = result { diff --git a/docs/vocs/docs/pages/run/faq/troubleshooting.mdx b/docs/vocs/docs/pages/run/faq/troubleshooting.mdx index 1f26cba9da..1c8b4f0893 100644 --- a/docs/vocs/docs/pages/run/faq/troubleshooting.mdx +++ b/docs/vocs/docs/pages/run/faq/troubleshooting.mdx @@ -24,7 +24,7 @@ This page tries to answer how to deal with the most popular issues. Externally accessing a `datadir` inside a named docker volume will usually come with folder/file ownership/permissions issues. -**It is not recommended** to use the path to the named volume as it will trigger an error code 13. `RETH_DB_PATH: /var/lib/docker/volumes/named_volume/_data/eth/db cargo r --examples db-access --path ` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead. +**It is not recommended** to use the path to the named volume as it will trigger an error code 13. For example, `RETH_DATADIR=/var/lib/docker/volumes/named_volume/_data/eth cargo run -p db-access` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead. ### Error code 13
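A side note on the `crates/trie/sparse/src/state.rs` hunk above: it swaps an mpsc-channel funnel (`for_each_init` with per-thread `Sender` clones) for a direct `collect()`. The toy program below contrasts the two shapes under simplified assumptions (plain integers instead of reth's trie types); it is a sketch of the pattern, not the actual reth code.

use rayon::iter::{IntoParallelIterator, ParallelIterator};

/// Old shape: results funneled through an mpsc channel; arrival order is nondeterministic.
fn via_channel(items: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = std::sync::mpsc::channel();
    items
        .into_par_iter()
        .map(|n| n * 2)
        .for_each_init(|| tx.clone(), |tx, n| tx.send(n).unwrap());
    drop(tx); // close the channel so the receiving iterator terminates
    rx.into_iter().collect()
}

/// New shape: collect() gathers results directly, with no channel bookkeeping,
/// and preserves input order for indexed parallel iterators.
fn via_collect(items: Vec<u64>) -> Vec<u64> {
    items.into_par_iter().map(|n| n * 2).collect()
}

fn main() {
    let mut from_channel = via_channel((0..8).collect());
    from_channel.sort_unstable(); // channel order is arbitrary; sort before comparing
    assert_eq!(from_channel, via_collect((0..8).collect()));
}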