chore: bump revm to branch with backported hashmap perf fixes

Dan Cline
2025-12-19 13:56:48 -05:00
40 changed files with 659 additions and 431 deletions

Cargo.lock generated
View File

@@ -6354,8 +6354,7 @@ dependencies = [
[[package]]
name = "op-revm"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1475a779c73999fc803778524042319691b31f3d6699d2b560c4ed8be1db802a"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"revm",
@@ -8247,6 +8246,7 @@ dependencies = [
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
"rand 0.8.5",
@@ -10583,6 +10583,7 @@ dependencies = [
"reth-stages-api",
"reth-static-file",
"reth-static-file-types",
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-trie",
@@ -11088,8 +11089,7 @@ dependencies = [
[[package]]
name = "revm"
version = "33.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c85ed0028f043f87b3c88d4a4cb6f0a76440085523b6a8afe5ff003cf418054"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"revm-bytecode",
"revm-context",
@@ -11107,8 +11107,7 @@ dependencies = [
[[package]]
name = "revm-bytecode"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2c6b5e6e8dd1e28a4a60e5f46615d4ef0809111c9e63208e55b5c7058200fb0"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitvec",
"phf",
@@ -11119,8 +11118,7 @@ dependencies = [
[[package]]
name = "revm-context"
version = "12.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f038f0c9c723393ac897a5df9140b21cfa98f5753a2cb7d0f28fa430c4118abf"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitvec",
"cfg-if",
@@ -11136,8 +11134,7 @@ dependencies = [
[[package]]
name = "revm-context-interface"
version = "13.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431c9a14e4ef1be41ae503708fd02d974f80ef1f2b6b23b5e402e8d854d1b225"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-eip2930",
"alloy-eip7702",
@@ -11152,8 +11149,7 @@ dependencies = [
[[package]]
name = "revm-database"
version = "9.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "980d8d6bba78c5dd35b83abbb6585b0b902eb25ea4448ed7bfba6283b0337191"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-eips",
"revm-bytecode",
@@ -11166,8 +11162,7 @@ dependencies = [
[[package]]
name = "revm-database-interface"
version = "8.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cce03e3780287b07abe58faf4a7f5d8be7e81321f93ccf3343c8f7755602bae"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"either",
@@ -11179,8 +11174,7 @@ dependencies = [
[[package]]
name = "revm-handler"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d44f8f6dbeec3fecf9fe55f78ef0a758bdd92ea46cd4f1ca6e2a946b32c367f3"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"derive-where",
@@ -11198,8 +11192,7 @@ dependencies = [
[[package]]
name = "revm-inspector"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5617e49216ce1ca6c8826bcead0386bc84f49359ef67cde6d189961735659f93"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"either",
@@ -11236,8 +11229,7 @@ dependencies = [
[[package]]
name = "revm-interpreter"
version = "31.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26ec36405f7477b9dccdc6caa3be19adf5662a7a0dffa6270cdb13a090c077e5"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"revm-bytecode",
"revm-context-interface",
@@ -11249,8 +11241,7 @@ dependencies = [
[[package]]
name = "revm-precompile"
version = "31.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a62958af953cc4043e93b5be9b8497df84cc3bd612b865c49a7a7dfa26a84e2"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"ark-bls12-381",
"ark-bn254",
@@ -11274,8 +11265,7 @@ dependencies = [
[[package]]
name = "revm-primitives"
version = "21.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29e161db429d465c09ba9cbff0df49e31049fe6b549e28eb0b7bd642fcbd4412"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-primitives",
"num_enum",
@@ -11286,8 +11276,7 @@ dependencies = [
[[package]]
name = "revm-state"
version = "8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d8be953b7e374dbdea0773cf360debed8df394ea8d82a8b240a6b5da37592fc"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitflags 2.10.0",
"revm-bytecode",

View File

@@ -587,6 +587,7 @@ url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
chrono = "0.4.41"
@@ -785,3 +786,13 @@ ipnet = "2.11"
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
[patch.crates-io]
revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-bytecode = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-database = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-state = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-primitives = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-interpreter = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-database-interface = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
op-revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }

View File

@@ -103,6 +103,7 @@ asm-keccak = [
"reth-node-ethereum/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
]
jemalloc = [

View File

@@ -103,7 +103,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
}
info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
let genesis_block_number = self.chain.genesis().number.unwrap();
let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
let (db, sfp) = match access {
AccessRights::RW => (
Arc::new(init_db(db_path, self.db.database_args())?),

View File

@@ -72,7 +72,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
.split();
if result.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
"Invalid number of bodies received. Expected: 1. Received: {}",
result.len()
)
}

View File

@@ -53,6 +53,7 @@ futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics

View File

@@ -128,12 +128,12 @@ we send them along with the state updates to the [Sparse Trie Task](#sparse-trie
### Finishing the calculation
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishStateUpdates` message
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishedStateUpdates` message
to the State Root Task, marking the end of receiving state updates.
Every time we receive a new proof from the [MultiProof Manager](#multiproof-manager), we also check
the following conditions:
1. Are all updates received? (`StateRootMessage::FinishStateUpdates` was sent)
1. Are all updates received? (`StateRootMessage::FinishedStateUpdates` was sent)
2. Is `ProofSequencer` empty? (no proofs are pending for sequencing)
3. Are all proofs that were sent to the [`MultiProofManager::spawn_or_queue`](#multiproof-manager) finished
calculating and were sent to the [Sparse Trie Task](#sparse-trie-task)?
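
A minimal sketch of that three-part check, with hypothetical field and method names standing in for the state the multiproof task actually tracks:

/// Sketch of the three completion conditions above; the names are
/// hypothetical and only illustrate the shape of the check.
struct CompletionState {
    /// Set once `StateRootMessage::FinishedStateUpdates` has been received.
    updates_finished: bool,
    /// Number of proofs still waiting to be sequenced by the `ProofSequencer`.
    pending_sequenced_proofs: usize,
    /// Number of proofs dispatched to workers but not yet sent to the sparse trie task.
    inflight_proofs: usize,
}

impl CompletionState {
    /// All state updates received, nothing pending in the sequencer,
    /// and every dispatched proof has been forwarded to the sparse trie task.
    fn is_calculation_finished(&self) -> bool {
        self.updates_finished
            && self.pending_sequenced_proofs == 0
            && self.inflight_proofs == 0
    }
}

fn main() {
    let state = CompletionState {
        updates_finished: true,
        pending_sequenced_proofs: 0,
        inflight_proofs: 2,
    };
    // Two dispatched proofs are still in flight, so the state root is not final yet.
    assert!(!state.is_calculation_finished());
}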

View File

@@ -1,5 +1,4 @@
use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
@@ -142,27 +141,23 @@ where
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
hash: block.recovered_block().hash(),
number: block.recovered_block().header().number(),
});
if last_block_hash_num.is_some() {
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.commit()?;
}
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)
Ok(last_block)
}
}

View File

@@ -931,48 +931,6 @@ where
Ok(())
}
/// Determines if the given block is part of a fork by checking that these
/// conditions are true:
/// * walking back from the target hash to verify that the target hash is not part of an
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
///
/// The header is required as an arg, because we might be checking that the header is a fork
/// block before it's in the tree state and before it's in the database.
fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
let target_hash = target.block.hash;
// verify that the given hash is not part of an extension of the canon chain.
let canonical_head = self.state.tree_state.canonical_head();
let mut current_hash;
let mut current_block = target;
loop {
if current_block.block.hash == canonical_head.hash {
return Ok(false)
}
// We already passed the canonical head
if current_block.block.number <= canonical_head.number {
break
}
current_hash = current_block.parent;
let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
current_block = next_block.block_with_parent();
}
// verify that the given hash is not already part of canonical chain stored in memory
if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
return Ok(false)
}
// verify that the given hash is not already part of persisted canonical chain
if self.provider.block_number(target_hash)?.is_some() {
return Ok(false)
}
Ok(true)
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
@@ -2569,14 +2527,11 @@ where
Ok(Some(_)) => {}
}
// determine whether we are on a fork chain
let is_fork = match self.is_fork(block_id) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
}
Ok(is_fork) => is_fork,
};
// determine whether we are on a fork chain by comparing the block number with the
// canonical head. This is a simple check that is sufficient for the event emission below.
// A block is considered a fork if its number is less than or equal to the canonical head,
// as this indicates there's already a canonical block at that height.
let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
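
A minimal sketch of the simplified heuristic described in the comment above, as a standalone function with hypothetical parameters (the real check reads these numbers from the block id and the tree state):

/// A block at or below the canonical head's height competes with an existing
/// canonical block at that height, so it is treated as a fork for the purpose
/// of event emission. Unlike the removed `is_fork`, this does not walk the
/// chain back, so it is a cheaper but coarser check.
fn is_fork_heuristic(block_number: u64, canonical_head_number: u64) -> bool {
    block_number <= canonical_head_number
}

fn main() {
    assert!(!is_fork_heuristic(101, 100)); // extends the canonical head
    assert!(is_fork_heuristic(100, 100)); // sibling of the canonical head
}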

View File

@@ -3,11 +3,7 @@
use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
use alloy_eip7928::BlockAccessList;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
keccak256,
map::{B256Set, HashSet},
B256,
};
use alloy_primitives::{keccak256, map::HashSet, B256};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use derive_more::derive::Deref;
@@ -23,7 +19,6 @@ use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
StorageProofInput,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
@@ -236,74 +231,6 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
hashed_state
}
/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
#[derive(Debug)]
enum PendingMultiproofTask {
/// A storage multiproof task input.
Storage(StorageMultiproofInput),
/// A regular multiproof task input.
Regular(MultiproofInput),
}
impl PendingMultiproofTask {
/// Returns the proof sequence number of the task.
const fn proof_sequence_number(&self) -> u64 {
match self {
Self::Storage(input) => input.proof_sequence_number,
Self::Regular(input) => input.proof_sequence_number,
}
}
/// Returns whether or not the proof targets are empty.
fn proof_targets_is_empty(&self) -> bool {
match self {
Self::Storage(input) => input.proof_targets.is_empty(),
Self::Regular(input) => input.proof_targets.is_empty(),
}
}
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
match self {
Self::Storage(input) => input.send_empty_proof(),
Self::Regular(input) => input.send_empty_proof(),
}
}
}
impl From<StorageMultiproofInput> for PendingMultiproofTask {
fn from(input: StorageMultiproofInput) -> Self {
Self::Storage(input)
}
}
impl From<MultiproofInput> for PendingMultiproofTask {
fn from(input: MultiproofInput) -> Self {
Self::Regular(input)
}
}
/// Input parameters for dispatching a dedicated storage multiproof calculation.
#[derive(Debug)]
struct StorageMultiproofInput {
hashed_state_update: HashedPostState,
hashed_address: B256,
proof_targets: B256Set,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
}
impl StorageMultiproofInput {
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
sequence_number: self.proof_sequence_number,
state: self.hashed_state_update,
});
}
}
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
@@ -378,91 +305,18 @@ impl MultiproofManager {
}
/// Dispatches a new multiproof calculation to worker pools.
fn dispatch(&self, input: PendingMultiproofTask) {
fn dispatch(&self, input: MultiproofInput) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
if input.proof_targets.is_empty() {
trace!(
sequence_number = input.proof_sequence_number(),
sequence_number = input.proof_sequence_number,
"No proof targets, sending empty multiproof back immediately"
);
input.send_empty_proof();
return;
}
match input {
PendingMultiproofTask::Storage(storage_input) => {
self.dispatch_storage_proof(storage_input);
}
PendingMultiproofTask::Regular(multiproof_input) => {
self.dispatch_multiproof(multiproof_input);
}
}
}
/// Dispatches a single storage proof calculation to worker pool.
fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
let StorageMultiproofInput {
hashed_state_update,
hashed_address,
proof_targets,
proof_sequence_number,
multi_added_removed_keys,
state_root_message_sender: _,
} = storage_multiproof_input;
let storage_targets = proof_targets.len();
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
storage_targets,
"Dispatching storage proof to workers"
);
let start = Instant::now();
// Create prefix set from targets
let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
proof_targets.iter().map(reth_trie::Nibbles::unpack),
);
let prefix_set = prefix_set.freeze();
// Build computation input (data only)
let input = StorageProofInput::new(
hashed_address,
prefix_set,
proof_targets,
true, // with_branch_node_masks
Some(multi_added_removed_keys),
);
// Dispatch to storage worker
if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
input,
ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
),
) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
return;
}
self.metrics
.active_storage_workers_histogram
.record(self.proof_worker_handle.active_storage_workers() as f64);
self.metrics
.active_account_workers_histogram
.record(self.proof_worker_handle.active_account_workers() as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
self.dispatch_multiproof(input);
}
/// Signals that a multiproof calculation has finished.
@@ -809,17 +663,14 @@ impl MultiProofTask {
available_storage_workers,
MultiProofTargets::chunks,
|proof_targets| {
self.multiproof_manager.dispatch(
MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
},
);
self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
@@ -967,17 +818,14 @@ impl MultiProofTask {
);
spawned_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.dispatch(
MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
},
);
self.metrics

View File

@@ -393,7 +393,7 @@ where
metrics,
terminate_execution,
precompile_cache_disabled,
mut precompile_cache_map,
precompile_cache_map,
} = self;
let mut state_provider = match provider.build() {

View File

@@ -166,8 +166,7 @@ where
// Update storage slots with new values and calculate storage roots.
let span = tracing::Span::current();
let (tx, rx) = mpsc::channel();
state
let results: Vec<_> = state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
@@ -217,13 +216,7 @@ where
SparseStateTrieResult::Ok((address, storage_trie))
})
.for_each_init(
|| tx.clone(),
|tx, result| {
let _ = tx.send(result);
},
);
drop(tx);
.collect();
// Defer leaf removals until after updates/additions, so that we don't delete an intermediate
// branch node during a removal and then re-add that branch back during a later leaf addition.
@@ -235,7 +228,7 @@ where
let _enter =
tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
.entered();
for result in rx {
for result in results {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);

View File

@@ -1,50 +1,56 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use parking_lot::Mutex;
use dashmap::DashMap;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
use revm_primitives::Address;
use schnellru::LruMap;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
};
use std::{hash::Hash, sync::Arc};
/// Default max cache size for [`PrecompileCache`]
const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
impl<S> PrecompileCacheMap<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
// Try just using `.get` first to avoid acquiring a write lock.
if let Some(cache) = self.0.get(&address) {
return cache.clone();
}
// Otherwise, fallback to `.entry` and initialize the cache.
//
// This should be very rare as caches for all precompiles will be initialized as soon as
// first EVM is created.
self.0.entry(address).or_default().clone()
}
}
/// Cache for precompiles, for each input stores the result.
///
/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
/// so we use a [`Mutex`] instead of an `RwLock`.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
impl<S> Default for PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn default() -> Self {
Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
Self(
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
.initial_capacity(MAX_CACHE_SIZE as usize)
.build_with_hasher(Default::default()),
)
}
}
@@ -52,63 +58,31 @@ impl<S> PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
self.0.lock().get(key).cloned()
fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
self.0.get(input).filter(|e| e.spec == spec)
}
/// Inserts the given key and value into the cache, returning the new cache size.
fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
let mut cache = self.0.lock();
cache.insert(key, value);
cache.len()
}
}
/// Cache key, spec id and precompile call input. spec id is included in the key to account for
/// precompile repricing across fork activations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheKey<S>((S, Bytes));
impl<S> CacheKey<S> {
const fn new(spec_id: S, input: Bytes) -> Self {
Self((spec_id, input))
}
}
/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
impl<'a, S> CacheKeyRef<'a, S> {
const fn new(spec_id: S, input: &'a [u8]) -> Self {
Self((spec_id, input))
}
}
impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
fn eq(&self, other: &CacheKey<S>) -> bool {
self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
}
}
impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0 .0.hash(state);
self.0 .1.hash(state);
fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
self.0.insert(input, value);
self.0.entry_count() as usize
}
}
/// Cache entry, precompile successful output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheEntry(PrecompileOutput);
pub struct CacheEntry<S> {
output: PrecompileOutput,
spec: S,
}
impl CacheEntry {
impl<S> CacheEntry<S> {
const fn gas_used(&self) -> u64 {
self.0.gas_used
self.output.gas_used
}
fn to_precompile_result(&self) -> PrecompileResult {
Ok(self.0.clone())
Ok(self.output.clone())
}
}
@@ -190,9 +164,7 @@ where
}
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
if let Some(entry) = &self.cache.get(&key) {
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
self.increment_by_one_precompile_cache_hits();
if input.gas >= entry.gas_used() {
return entry.to_precompile_result()
@@ -204,8 +176,10 @@ where
match &result {
Ok(output) => {
let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
let size = self.cache.insert(key, CacheEntry(output.clone()));
let size = self.cache.insert(
Bytes::copy_from_slice(calldata),
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
);
self.set_precompile_cache_size_metric(size as f64);
self.increment_by_one_precompile_cache_misses();
}
@@ -246,31 +220,12 @@ impl CachedPrecompileMetrics {
#[cfg(test)]
mod tests {
use std::hash::DefaultHasher;
use super::*;
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
use reth_revm::db::EmptyDB;
use revm::{context::TxEnv, precompile::PrecompileOutput};
use revm_primitives::hardfork::SpecId;
#[test]
fn test_cache_key_ref_hash() {
let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
assert!(PartialEq::eq(&key2, &key1));
let mut hasher = DefaultHasher::new();
key1.hash(&mut hasher);
let hash1 = hasher.finish();
let mut hasher = DefaultHasher::new();
key2.hash(&mut hasher);
let hash2 = hasher.finish();
assert_eq!(hash1, hash2);
}
#[test]
fn test_precompile_cache_basic() {
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
@@ -293,12 +248,11 @@ mod tests {
reverted: false,
};
let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let expected = CacheEntry(output);
cache.cache.insert(key, expected.clone());
let input = b"test_input";
let expected = CacheEntry { output, spec: SpecId::PRAGUE };
cache.cache.insert(input.into(), expected.clone());
let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
let actual = cache.cache.get(&key).unwrap();
let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
assert_eq!(actual, expected);
}
@@ -312,7 +266,7 @@ mod tests {
let address1 = Address::repeat_byte(1);
let address2 = Address::repeat_byte(2);
let mut cache_map = PrecompileCacheMap::default();
let cache_map = PrecompileCacheMap::default();
// create the first precompile with a specific output
let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {

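The replacement cache above keys entries by the raw call input and stores the active spec inside the entry, validating it on read. A minimal standalone sketch of that shape, using the `moka::sync::Cache` API this commit adds to the workspace (`moka = "0.12"` with the `sync` feature); the `Output`/`Entry` types and spec strings below are simplified stand-ins, not reth's types:

use moka::sync::Cache;

/// Simplified stand-ins for the precompile output and fork spec.
#[derive(Clone)]
struct Output {
    gas_used: u64,
    bytes: Vec<u8>,
}

#[derive(Clone)]
struct Entry {
    output: Output,
    spec: &'static str,
}

fn main() {
    // Bounded cache keyed only by the call input; the spec lives in the entry
    // and is validated on read, so a fork change cannot serve a stale result.
    let cache: Cache<Vec<u8>, Entry> = Cache::new(10_000);

    let input = b"precompile input".to_vec();
    cache.insert(
        input.clone(),
        Entry { output: Output { gas_used: 3_000, bytes: vec![0xaa] }, spec: "PRAGUE" },
    );

    // Hit: same input and same spec.
    let hit = cache.get(input.as_slice()).filter(|e| e.spec == "PRAGUE");
    assert_eq!(hit.map(|e| e.output.gas_used), Some(3_000));

    // Miss: same input but a different spec (e.g. after a repricing fork).
    let miss = cache.get(input.as_slice()).filter(|e| e.spec == "OSAKA");
    assert!(miss.is_none());
}
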
View File

@@ -170,7 +170,7 @@ impl DisplayHardforks {
let mut post_merge = Vec::new();
for (fork, condition, metadata) in hardforks {
let mut display_fork = DisplayFork {
let display_fork = DisplayFork {
name: fork.name().to_string(),
activated_at: condition,
eip: None,
@@ -181,12 +181,7 @@ impl DisplayHardforks {
ForkCondition::Block(_) => {
pre_merge.push(display_fork);
}
ForkCondition::TTD { activation_block_number, total_difficulty, fork_block } => {
display_fork.activated_at = ForkCondition::TTD {
activation_block_number,
fork_block,
total_difficulty,
};
ForkCondition::TTD { .. } => {
with_merge.push(display_fork);
}
ForkCondition::Timestamp(_) => {

View File

@@ -91,6 +91,7 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
]
js-tracer = [
"reth-node-builder/js-tracer",

View File

@@ -81,6 +81,7 @@ arbitrary = [
]
keccak-cache-global = [
"reth-node-ethereum?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -101,8 +101,9 @@ where
.or(Err(P2PStreamError::HandshakeError(P2PHandshakeError::Timeout)))?
.ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoResponse))??;
// let's check the compressed length first, we will need to check again once confirming
// that it contains snappy-compressed data (this will be the case for all non-p2p messages).
// Check that the uncompressed message length does not exceed the max payload size.
// Note: The first message (Hello/Disconnect) is not snappy compressed. We will check the
// decompressed length again for subsequent messages after the handshake.
if first_message_bytes.len() > MAX_PAYLOAD_SIZE {
return Err(P2PStreamError::MessageTooBig {
message_size: first_message_bytes.len(),

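A minimal sketch of the two length checks described in the comment above: the handshake message is compared by its raw length, while later (snappy-compressed) messages are checked by their advertised decompressed length. The `MAX_PAYLOAD_SIZE` value is illustrative, and the sketch assumes the `snap` crate for reading the snappy length header:

/// Illustrative limit, not reth's actual constant.
const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024;

/// The first message (Hello/Disconnect) is not snappy-compressed, so its raw
/// length is what gets compared against the limit.
fn check_handshake_len(first_message: &[u8]) -> Result<(), String> {
    if first_message.len() > MAX_PAYLOAD_SIZE {
        return Err(format!("message too big: {}", first_message.len()));
    }
    Ok(())
}

/// For subsequent messages the *decompressed* length must stay under the limit.
fn check_compressed_message_len(compressed: &[u8]) -> Result<(), String> {
    // `snap::raw::decompress_len` reads the length header without decompressing.
    let decompressed_len =
        snap::raw::decompress_len(compressed).map_err(|e| e.to_string())?;
    if decompressed_len > MAX_PAYLOAD_SIZE {
        return Err(format!("message too big: {decompressed_len}"));
    }
    Ok(())
}

fn main() {
    assert!(check_handshake_len(&[0u8; 1024]).is_ok());
    // Probe the advertised decompressed length of a (here trivial) snappy frame.
    let _ = check_compressed_message_len(&[0x00]);
}
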
View File

@@ -938,9 +938,13 @@ where
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
// We skip the era stage if it's not enabled
let era_enabled = self.era_import_source().is_some();
let mut all_stages =
StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
// Get the expected first stage based on config.
let first_stage =
if self.era_import_source().is_some() { StageId::Era } else { StageId::Headers };
let first_stage = all_stages.next().expect("there must be at least one stage");
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
@@ -950,20 +954,28 @@ where
.unwrap_or_default()
.block_number;
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
// against it.
for stage_id in StageId::ALL.iter().skip(1) {
// Compare all other stages against the first
for stage_id in all_stages {
let stage_checkpoint = self
.blockchain_db()
.get_stage_checkpoint(*stage_id)?
.get_stage_checkpoint(stage_id)?
.unwrap_or_default()
.block_number;
// If the checkpoint of any stage is less than the checkpoint of the first stage,
// retrieve and return the block hash of the latest header and use it as the target.
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
stage_id = %stage_id,
stage_checkpoint = stage_checkpoint,
"Checking stage against first stage",
);
if stage_checkpoint < first_stage_checkpoint {
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
inconsistent_stage_id = %stage_id,
inconsistent_stage_checkpoint = stage_checkpoint,

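A minimal sketch of the congruence rule implemented above: every later stage's checkpoint is compared against the first enabled stage's checkpoint, and the first stage found lagging behind marks the pipeline as inconsistent. The tuple-slice representation is a stand-in for reading checkpoints from the provider:

/// Returns the id of the first stage whose checkpoint is behind the first
/// stage's checkpoint, if any.
fn find_inconsistent_stage(
    stages: &[(&'static str, u64)], // (stage id, checkpoint block number)
) -> Option<&'static str> {
    let (_, first_checkpoint) = *stages.first()?;
    stages
        .iter()
        .skip(1)
        .find(|(_, checkpoint)| *checkpoint < first_checkpoint)
        .map(|(id, _)| *id)
}

fn main() {
    // Headers is ahead of Bodies, so the pipeline needs to re-run toward a
    // target derived from the latest header.
    let stages = [("Headers", 1_000), ("Bodies", 990), ("Execution", 990)];
    assert_eq!(find_inconsistent_stage(&stages), Some("Bodies"));

    let consistent = [("Headers", 1_000), ("Bodies", 1_000)];
    assert_eq!(find_inconsistent_stage(&consistent), None);
}
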
View File

@@ -80,7 +80,7 @@ tokio.workspace = true
# Features for vergen to generate correct env vars
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
# Feature to enable opentelemetry export
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
otlp = ["reth-tracing/otlp"]
min-error-logs = ["tracing/release_max_level_error"]

View File

@@ -36,7 +36,7 @@ use reth_network::{
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
},
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives, SessionsConfig,
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives,
};
use reth_network_peers::{mainnet_nodes, TrustedPeer};
use secp256k1::SecretKey;
@@ -339,7 +339,7 @@ impl NetworkArgs {
NetworkConfigBuilder::<N>::new(secret_key)
.external_ip_resolver(self.nat.clone())
.sessions_config(
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
config.sessions.clone().with_upscaled_event_buffer(peers_config.max_peers()),
)
.peer_config(peers_config)
.boot_nodes(chain_bootnodes.clone())

View File

@@ -96,6 +96,7 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
js-tracer = [

View File

@@ -76,6 +76,7 @@ arbitrary = [
]
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -179,7 +179,7 @@ impl<B: Block> SealedBlock<B> {
/// Recovers all senders from the transactions in the block.
///
/// Returns `None` if any of the transactions fail to recover the sender.
/// Returns an error if any of the transactions fail to recover the sender.
pub fn senders(&self) -> Result<Vec<Address>, RecoveryError> {
self.body().recover_signers()
}

View File

@@ -94,7 +94,7 @@ impl<H: Sealable> SealedHeader<H> {
*self.hash_ref()
}
/// This is the inverse of [`Header::seal_slow`] which returns the raw header and hash.
/// This is the inverse of [`Self::seal_slow`] which returns the raw header and hash.
pub fn split(self) -> (H, BlockHash) {
let hash = self.hash();
(self.header, hash)

View File

@@ -92,7 +92,7 @@ where
///
/// Can fail if the element is rejected by the limiter or if we fail to grow an empty map.
///
/// See [`Schnellru::insert`](LruMap::insert) for more info.
/// See [`LruMap::insert`] for more info.
pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool
where
L::KeyToInsert<'a>: Hash + PartialEq<K>,

View File

@@ -75,6 +75,7 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-downloaders.workspace = true
reth-static-file.workspace = true
reth-stages-api = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
@@ -116,6 +117,7 @@ test-utils = [
"reth-ethereum-primitives?/test-utils",
"reth-evm-ethereum/test-utils",
]
rocksdb = ["reth-provider/rocksdb"]
[[bench]]
name = "criterion"

View File

@@ -5,7 +5,7 @@ use reth_consensus::ConsensusError;
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, StageCheckpointReader, TrieWriter,
PruneCheckpointWriter, StageCheckpointReader, StageCheckpointWriter, TrieWriter,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneSegment, MERKLE_CHANGESETS_RETENTION_BLOCKS,
@@ -300,6 +300,7 @@ where
+ DBProvider
+ HeaderProvider
+ ChainStateBlockReader
+ StageCheckpointWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter,
{
@@ -404,6 +405,28 @@ where
computed_range.start = computed_range.end;
}
// If we've unwound so far that there are no longer enough trie changesets available then
// simply clear them and the checkpoints, so that on next pipeline startup they will be
// regenerated.
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Checking if computed range is over retention threshold",
);
if computed_range.end - computed_range.start < self.retention_blocks {
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Clearing checkpoints completely",
);
provider.clear_trie_changesets()?;
provider
.save_stage_checkpoint(StageId::MerkleChangeSets, StageCheckpoint::default())?;
return Ok(UnwindOutput { checkpoint: StageCheckpoint::default() })
}
// `computed_range.end` is exclusive
let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));
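
A minimal sketch of the retention rule added above, with plain integers standing in for the provider and `StageCheckpoint` types; the 64-block retention used below is illustrative, not the real retention constant:

/// If fewer changeset blocks remain than the retention window requires, the
/// caller clears the trie changesets and resets the checkpoint (returning
/// `None` here); otherwise the checkpoint is the last block of the range,
/// since `range.end` is exclusive.
fn unwind_checkpoint(range: std::ops::Range<u64>, retention_blocks: u64) -> Option<u64> {
    if range.end - range.start < retention_blocks {
        return None;
    }
    Some(range.end.saturating_sub(1))
}

fn main() {
    // Only 5 blocks of changesets left but 64 required: regenerate from scratch.
    assert_eq!(unwind_checkpoint(100..105, 64), None);
    // Enough retained: the checkpoint is the last covered block (inclusive).
    assert_eq!(unwind_checkpoint(100..200, 64), Some(199));
}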

View File

@@ -3,17 +3,16 @@ use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
table::Value,
table::{Decode, Decompress, Value},
tables,
transaction::DbTxMut,
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -65,7 +64,9 @@ where
+ PruneCheckpointReader
+ StatsReader
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ TransactionsProviderExt,
+ TransactionsProviderExt
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -150,16 +151,27 @@ where
);
if range_output.is_final_range {
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
let mut txhash_cursor = provider
.tx_ref()
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
// Use append mode when table is empty (first sync) - significantly faster
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;
let (hash_bytes, number_bytes) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
@@ -169,12 +181,16 @@ where
);
}
let key = RawKey::<TxHash>::from_vec(hash);
if append_only {
txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
} else {
txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
trace!(target: "sync::stages::transaction_lookup",
@@ -199,11 +215,19 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Cursor to unwind tx hash to number
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
@@ -218,15 +242,18 @@ where
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
// First delete the transaction and hash to id mapping
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
{
tx_hash_number_cursor.delete_current()?;
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
@@ -266,7 +293,7 @@ mod tests {
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::transaction::DbTx;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_ethereum_primitives::Block;
use reth_primitives_traits::SealedBlock;
use reth_provider::{
@@ -581,4 +608,160 @@ mod tests {
self.ensure_no_hash_by_block(input.unwind_to)
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// writes transaction hash mappings to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
assert_eq!(
mdbx_count, 0,
"MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = runner.db.factory.rocksdb_provider();
let mut rocksdb_count = 0;
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
rocksdb_count += 1;
}
}
assert_eq!(
rocksdb_count, expected_tx_count,
"RocksDB should contain all transaction hashes"
);
}
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
#[tokio::test]
async fn unwind_deletes_from_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage first to populate RocksDB
let exec_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let rx = runner.execute(exec_input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify RocksDB has the data before unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_some(),
"Transaction hash {:?} should exist before unwind",
hash
);
}
}
// Now unwind to stage_progress (removing all the blocks we added)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(previous_stage),
unwind_to: stage_progress,
bad_block: None,
};
let unwind_result = runner.unwind(unwind_input).await;
assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
// Verify RocksDB data is deleted after unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_none(),
"Transaction hash {:?} should be deleted from RocksDB after unwind",
hash
);
}
}
}
}
}

View File

@@ -50,7 +50,7 @@ impl Default for TestStageDB {
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
)
.expect("failed to create test provider factory"),
}
@@ -68,7 +68,7 @@ impl TestStageDB {
create_test_rw_db_with_path(path),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
)
.expect("failed to create test provider factory"),
}

View File

@@ -30,7 +30,7 @@ pub trait DbTxUnwindExt: DbTxMut {
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
if selector(entry_key) <= key {
break
}
reverse_walker.delete_current()?;

View File

@@ -187,6 +187,21 @@ impl<'a> EitherWriter<'a, (), ()> {
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
/// `None` for other variants.
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
Self::RocksDB(batch) => Some(batch.into_inner()),
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -304,13 +319,24 @@ where
CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
{
/// Puts a transaction hash number mapping.
///
/// When `append_only` is true, uses `cursor.append()` which is significantly faster
/// but requires entries to be inserted in order and the table to be empty.
/// When false, uses `cursor.insert()` which handles arbitrary insertion order.
pub fn put_transaction_hash_number(
&mut self,
hash: TxHash,
tx_num: TxNumber,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(hash, &tx_num)?)
} else {
Ok(cursor.insert(hash, &tx_num)?)
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
@@ -663,12 +689,18 @@ mod tests {
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_tests {
use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
use super::*;
use crate::{
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
test_utils::create_test_provider_factory,
RocksDBProviderFactory,
};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
};
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -682,6 +714,87 @@ mod rocksdb_tests {
(temp_dir, provider)
}
/// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
/// when the storage setting is enabled, and that put operations followed by commit
/// persist the data to `RocksDB`.
#[test]
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create EitherWriter with RocksDB
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Verify we got a RocksDB writer
assert!(matches!(writer, EitherWriter::RocksDB(_)));
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the batch and register with provider for commit
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Commit via provider - this commits RocksDB batch too
provider.commit().unwrap();
// Verify data was written to RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
}
/// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
#[test]
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash = B256::from([1u8; 32]);
let tx_num = 100u64;
// First, write a value directly to RocksDB
let rocksdb = factory.rocksdb_provider();
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
// Now delete using EitherWriter
let batch = rocksdb.batch();
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
writer.delete_transaction_hash_number(hash).unwrap();
// Extract the batch and commit via provider
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
provider.commit().unwrap();
// Verify deletion
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
}
#[test]
fn test_rocksdb_batch_transaction_hash_numbers() {
let (_temp_dir, provider) = create_rocksdb_provider();
@@ -816,4 +929,65 @@ mod rocksdb_tests {
// Verify deletion
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
}
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
///
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
/// in a single place, making it easier to reason about commit ordering and consistency.
#[test]
fn test_rocksdb_commits_at_provider_level() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create provider and EitherWriter
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the raw batch from the writer and register it with the provider
let raw_batch = writer.into_raw_rocksdb_batch();
if let Some(batch) = raw_batch {
provider.set_pending_rocksdb_batch(batch);
}
// Data should NOT be visible yet (batch not committed)
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
None,
"Data should not be visible before provider.commit()"
);
// Commit the provider - this should commit both MDBX and RocksDB
provider.commit().unwrap();
// Now data should be visible in RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
Some(tx_num1),
"Data should be visible after provider.commit()"
);
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
Some(tx_num2),
"Data should be visible after provider.commit()"
);
}
}

View File

@@ -181,6 +181,11 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

View File

@@ -153,6 +153,11 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {

View File

@@ -151,7 +151,6 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Database transaction.
tx: TX,
@@ -167,10 +166,29 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
}
impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("DatabaseProvider");
s.field("tx", &self.tx)
.field("chain_spec", &self.chain_spec)
.field("static_file_provider", &self.static_file_provider)
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
}
}
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
/// Returns reference to prune modes.
pub const fn prune_modes_ref(&self) -> &PruneModes {
@@ -259,6 +277,11 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
}
impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
@@ -290,6 +313,8 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -545,6 +570,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -3178,7 +3205,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
/// Commit database transaction and static files.
/// Commit database transaction, static files, and pending `RocksDB` batches.
fn commit(self) -> ProviderResult<bool> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3186,9 +3213,27 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
self.tx.commit()?;
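// Flush any `RocksDB` batches staged via `set_pending_rocksdb_batch` before the static file commit.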
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
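// Flush staged `RocksDB` batches before committing the database transaction.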
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.tx.commit()?;
}
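// A standalone sketch of the drain pattern used above; `drain_pending` is a hypothetical
// helper, not an API in this crate. `std::mem::take` swaps the staged batches out from under
// the `parking_lot::Mutex` and leaves an empty `Vec` behind, so the lock is held only for the
// swap while the batches themselves are committed after it is released.
fn drain_pending<T>(pending: &parking_lot::Mutex<Vec<T>>) -> Vec<T> {
    std::mem::take(&mut *pending.lock())
}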

View File

@@ -450,6 +450,19 @@ impl RocksDBProvider {
batch_handle.commit()
})
}
/// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
///
/// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
/// and needs to be committed at a later point (e.g., at provider commit time).
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
}
/// Handle for building a batch of operations atomically.
@@ -529,6 +542,13 @@ impl<'a> RocksDBBatch<'a> {
pub const fn provider(&self) -> &RocksDBProvider {
self.provider
}
/// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
///
/// This is used to defer commits to the provider level.
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
}
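// A minimal sketch of the pairing introduced above, assuming this module's imports
// (`RocksDBProvider`, `WriteBatchWithTransaction`, `ProviderResult`): stage writes on a batch,
// take the raw `WriteBatchWithTransaction` out with `into_inner`, and hand it to `commit_batch`
// later instead of committing it in place. The helper name is illustrative only.
fn defer_batch_commit(rocksdb: &RocksDBProvider) -> ProviderResult<()> {
    let batch = rocksdb.batch();
    // ... stage writes against `batch` here ...
    let raw: WriteBatchWithTransaction<true> = batch.into_inner();
    // Later, at the single provider-level commit point:
    rocksdb.commit_batch(raw)
}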
/// `RocksDB` transaction wrapper providing MDBX-like semantics.

View File

@@ -1,5 +1,5 @@
use crate::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider},
HashingWriter, ProviderFactory, TrieWriter,
};
use alloy_primitives::B256;
@@ -62,7 +62,10 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
RocksDBBuilder::new(&rocksdb_dir)
.with_default_tables()
.build()
.expect("failed to create test RocksDB provider"),
)
.expect("failed to create test provider factory")
}

View File

@@ -29,4 +29,9 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
// No-op for NoopProvider
}
}

View File

@@ -6,4 +6,11 @@ use crate::providers::RocksDBProvider;
pub trait RocksDBProviderFactory {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider;
/// Adds a pending `RocksDB` batch to be committed when this provider is committed.
///
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
}
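// Sketch of the intended call pattern, mirroring `test_rocksdb_commits_at_provider_level` in the
// provider tests: stage writes against a `RocksDB` batch, hand the raw batch back to the provider,
// and let a single `provider.commit()` flush MDBX, static files, and `RocksDB` together.
// `factory`, `hash`, and `tx_num` are assumed to be in scope as in that test.
//
// let batch = factory.rocksdb_provider().batch();
// let provider = factory.database_provider_rw()?;
// let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch)?;
// writer.put_transaction_hash_number(hash, tx_num, false)?;
// if let Some(raw) = writer.into_raw_rocksdb_batch() {
//     provider.set_pending_rocksdb_batch(raw);
// }
// provider.commit()?;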

View File

@@ -276,13 +276,12 @@ where
{
use rayon::iter::{ParallelBridge, ParallelIterator};
let (tx, rx) = std::sync::mpsc::channel();
let retain_updates = self.retain_updates;
// Process all storage trie revealings in parallel, having first removed the
// `reveal_nodes` tracking and `SparseTrie`s for each account from their HashMaps.
// These will be returned after processing.
storages
let results: Vec<_> = storages
.into_iter()
.map(|(account, storage_subtree)| {
let revealed_nodes = self.storage.take_or_create_revealed_paths(&account);
@@ -301,14 +300,12 @@ where
(account, revealed_nodes, trie, result)
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
drop(tx);
.collect();
// Return `revealed_nodes` and `SparseTrie` for each account, incrementing metrics and
// returning the last error seen if any.
let mut any_err = Ok(());
for (account, revealed_nodes, trie, result) in rx {
for (account, revealed_nodes, trie, result) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
if let Ok(_metric_values) = result {

View File

@@ -24,7 +24,7 @@ This page tries to answer how to deal with the most popular issues.
Externally accessing a `datadir` inside a named docker volume usually comes with folder/file ownership and permission issues.
**It is not recommended** to use the path to the named volume as it will trigger an error code 13. `RETH_DB_PATH: /var/lib/docker/volumes/named_volume/_data/eth/db cargo r --examples db-access --path ` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead.
**It is not recommended** to use the path to the named volume as it will trigger an error code 13. For example, `RETH_DATADIR=/var/lib/docker/volumes/named_volume/_data/eth cargo run -p db-access` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead.
### Error code 13