Compare commits

..

15 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
378c5851d5 wip 2025-12-18 19:22:54 -05:00
Arsenii Kulikov
30162c535e perf: properly share precompile cache + use moka (#20502) 2025-12-18 22:42:44 +00:00
Federico Gimenez
cd8fec3273 feat(stages): use EitherWriter for TransactionLookupStage RocksDB writes (#20428) 2025-12-18 21:34:17 +00:00
Tomass
1e38c7fea8 chore(hardforks): drop unnecessary field reassignment in TTD branch (#20457)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-18 21:02:56 +00:00
Block Wizard
4dfaf238c9 chore(net): fix misleading comment about uncompressed message size check (#19510) 2025-12-18 20:34:50 +00:00
forkfury
4cf36dda54 docs: correct FinishedStateUpdates message name (#20471) 2025-12-18 20:16:15 +00:00
phrwlk
41ce3d3bbf docs: fix Docker db-access troubleshooting example (#20483) 2025-12-18 20:13:01 +00:00
sashass1315
429d13772e chore(cli): correct p2p body error message (#20498) 2025-12-18 20:01:59 +00:00
Gigi
0cbf89193d docs: correct intra-doc link references (#20467) 2025-12-18 19:56:57 +00:00
radik878
0c3c42bffe chore(primitives-traits): correct SealedBlock::senders return description (#20465) 2025-12-18 19:56:22 +00:00
cui
cdbbd08677 fix: session config should be read from config file (#20484)
Co-authored-by: weixie.cui <weixie.cui@okg.com>
2025-12-18 19:53:18 +00:00
Alexey Shekhirin
4adb1fa5ac fix(cli): default to 0 genesis block number (#20494) 2025-12-18 15:07:59 +00:00
Brian Picciano
b3a792ad1e fix(engine): Use OverlayStateProviderFactory for state root fallback (#20462) 2025-12-18 14:30:11 +00:00
Arsenii Kulikov
98a7095c7a fix: properly determine first stage during pipeline consistency check (#20460) 2025-12-18 10:43:08 +00:00
Matthias Seitz
701e5ec455 chore: add engine terminate (#20420)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2025-12-18 09:01:36 +00:00
44 changed files with 1084 additions and 278 deletions

6
Cargo.lock generated
View File

@@ -8244,10 +8244,10 @@ dependencies = [
"derive_more",
"eyre",
"futures",
"libc",
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
"rand 0.8.5",
@@ -8281,6 +8281,7 @@ dependencies = [
"reth-stages",
"reth-stages-api",
"reth-static-file",
"reth-storage-errors",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
@@ -9128,6 +9129,7 @@ dependencies = [
"fdlimit",
"futures",
"jsonrpsee",
"parking_lot",
"rayon",
"reth-basic-payload-builder",
"reth-chain-state",
@@ -10582,6 +10584,7 @@ dependencies = [
"reth-stages-api",
"reth-static-file",
"reth-static-file-types",
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-trie",
@@ -10996,7 +10999,6 @@ dependencies = [
"dashmap 6.1.0",
"derive_more",
"itertools 0.14.0",
"libc",
"metrics",
"proptest",
"proptest-arbitrary-interop",

View File

@@ -587,6 +587,7 @@ url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
chrono = "0.4.41"

View File

@@ -103,7 +103,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
}
info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
let genesis_block_number = self.chain.genesis().number.unwrap();
let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
let (db, sfp) = match access {
AccessRights::RW => (
Arc::new(init_db(db_path, self.db.database_args())?),

View File

@@ -72,7 +72,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
.split();
if result.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
"Invalid number of bodies received. Expected: 1. Received: {}",
result.len()
)
}

View File

@@ -29,6 +29,7 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-storage-errors.workspace = true
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
@@ -52,6 +53,7 @@ futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics
@@ -67,9 +69,6 @@ derive_more.workspace = true
parking_lot.workspace = true
crossbeam-channel.workspace = true
[target.'cfg(unix)'.dependencies]
libc = "0.2"
# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
reth-stages = { workspace = true, optional = true }

View File

@@ -128,12 +128,12 @@ we send them along with the state updates to the [Sparse Trie Task](#sparse-trie
### Finishing the calculation
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishStateUpdates` message
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishedStateUpdates` message
to the State Root Task, marking the end of receiving state updates.
Every time we receive a new proof from the [MultiProof Manager](#multiproof-manager), we also check
the following conditions:
1. Are all updates received? (`StateRootMessage::FinishStateUpdates` was sent)
1. Are all updates received? (`StateRootMessage::FinishedStateUpdates` was sent)
2. Is `ProofSequencer` empty? (no proofs are pending for sequencing)
3. Are all proofs that were sent to the [`MultiProofManager::spawn_or_queue`](#multiproof-manager) finished
calculating and were sent to the [Sparse Trie Task](#sparse-trie-task)?

View File

@@ -219,10 +219,19 @@ pub enum HandlerEvent<T> {
}
/// Internal events issued by the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum FromOrchestrator {
/// Invoked when backfill sync finished
BackfillSyncFinished(ControlFlow),
/// Invoked when backfill sync started
BackfillSyncStarted,
/// Gracefully terminate the engine service.
///
/// When this variant is received, the engine will persist all remaining in-memory blocks
/// to disk before shutting down. Once persistence is complete, a signal is sent through
/// the oneshot channel to notify the caller.
Terminate {
/// Channel to signal termination completion.
tx: tokio::sync::oneshot::Sender<()>,
},
}

View File

@@ -39,6 +39,7 @@ use revm::state::EvmState;
use state::TreeState;
use std::{
fmt::Debug,
ops,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
@@ -426,9 +427,13 @@ where
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
match self.on_engine_message(msg) {
Ok(ops::ControlFlow::Break(())) => return,
Ok(ops::ControlFlow::Continue(())) => {}
Err(fatal) => {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
}
}
Ok(None) => {
@@ -1315,7 +1320,8 @@ where
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
} else if self.should_persist() {
let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
let blocks_to_persist =
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
self.persist_blocks(blocks_to_persist);
}
}
@@ -1323,6 +1329,41 @@ where
Ok(())
}
/// Finishes termination by persisting all remaining blocks and signaling completion.
///
/// This blocks until all persistence is complete. Always signals completion,
/// even if an error occurs.
fn finish_termination(
&mut self,
pending_termination: oneshot::Sender<()>,
) -> Result<(), AdvancePersistenceError> {
trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
let result = self.persist_until_complete();
let _ = pending_termination.send(());
result
}
/// Persists all remaining blocks until none are left.
fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
loop {
// Wait for any in-progress persistence to complete (blocking)
if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
self.on_persistence_complete(result, start_time)?;
}
let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
if blocks_to_persist.is_empty() {
debug!(target: "engine::tree", "persistence complete, signaling termination");
return Ok(())
}
debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(blocks_to_persist);
}
}
/// Handles a completed persistence task.
fn on_persistence_complete(
&mut self,
@@ -1348,10 +1389,12 @@ where
}
/// Handles a message from the engine.
///
/// Returns `ControlFlow::Break(())` if the engine should terminate.
fn on_engine_message(
&mut self,
msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<(), InsertBlockFatalError> {
) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncStarted => {
@@ -1361,6 +1404,13 @@ where
FromOrchestrator::BackfillSyncFinished(ctrl) => {
self.on_backfill_sync_finished(ctrl)?;
}
FromOrchestrator::Terminate { tx } => {
debug!(target: "engine::tree", "received terminate request");
if let Err(err) = self.finish_termination(tx) {
error!(target: "engine::tree", %err, "Termination failed");
}
return Ok(ops::ControlFlow::Break(()))
}
},
FromEngine::Request(request) => {
match request {
@@ -1368,7 +1418,7 @@ where
let block_num_hash = block.recovered_block().num_hash();
if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
// outdated block that can be skipped
return Ok(())
return Ok(ops::ControlFlow::Continue(()))
}
debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
@@ -1476,7 +1526,7 @@ where
}
}
}
Ok(())
Ok(ops::ControlFlow::Continue(()))
}
/// Invoked if the backfill sync has finished to target.
@@ -1710,10 +1760,10 @@ where
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. canonical_head - threshold]`. The expected
/// order is oldest -> newest.
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
fn get_canonical_blocks_to_persist(
&self,
target: PersistTarget,
) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
// We will calculate the state root using the database, so we need to be sure there are no
// changes
@@ -1724,9 +1774,12 @@ where
let last_persisted_number = self.persistence_state.last_persisted_block.number;
let canonical_head_number = self.state.tree_state.canonical_block_number();
// Persist only up to block buffer target
let target_number =
canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
let target_number = match target {
PersistTarget::Head => canonical_head_number,
PersistTarget::Threshold => {
canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
}
};
debug!(
target: "engine::tree",
@@ -2869,3 +2922,12 @@ pub enum InsertPayloadOk {
/// The payload was valid and inserted into the tree.
Inserted(BlockStatus),
}
/// Target for block persistence.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
/// Persist up to `canonical_head - memory_block_buffer_target`.
Threshold,
/// Persist all blocks up to and including the canonical head.
Head,
}

View File

@@ -6,34 +6,6 @@ use tokio::{
task::JoinHandle,
};
/// Sets the current thread's name for profiling visibility.
#[inline]
fn set_thread_name(name: &str) {
#[cfg(target_os = "linux")]
{
// SAFETY: name is a valid string, prctl with PR_SET_NAME is safe
unsafe {
// PR_SET_NAME expects a null-terminated string, truncated to 16 bytes (including null)
let mut buf = [0u8; 16];
let len = name.len().min(15);
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
}
}
#[cfg(target_os = "macos")]
{
// SAFETY: name is a valid string
unsafe {
let c_name = std::ffi::CString::new(name).unwrap_or_default();
libc::pthread_setname_np(c_name.as_ptr());
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = name;
}
}
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
@@ -64,22 +36,6 @@ impl WorkloadExecutor {
{
self.inner.handle.spawn_blocking(func)
}
/// Spawns a blocking task with a descriptive thread name for profiling.
///
/// Sets the thread name at runtime, making it identifiable in profiling tools like Samply.
/// Uses Tokio's blocking thread pool for efficiency.
#[track_caller]
pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(move || {
set_thread_name(name);
func()
})
}
}
#[derive(Debug, Clone)]

View File

@@ -296,7 +296,7 @@ where
// spawn multi-proof task
let parent_span = span.clone();
self.executor.spawn_blocking_named("reth-multiproof", move || {
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
@@ -362,7 +362,7 @@ where
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking_named("reth-tx-conv", move || {
self.executor.spawn_blocking(move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
@@ -376,7 +376,7 @@ where
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
self.executor.spawn_blocking_named("reth-tx-order", move || {
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = ooo_rx.recv() {
@@ -450,7 +450,7 @@ where
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking_named("reth-prewarm", move || {
self.executor.spawn_blocking(move || {
prewarm_task.run(transactions, to_prewarm_task);
});
}
@@ -515,7 +515,7 @@ where
);
let span = Span::current();
self.executor.spawn_blocking_named("reth-sparse", move || {
self.executor.spawn_blocking(move || {
let _enter = span.entered();
let (result, trie) = task.run();

View File

@@ -149,7 +149,7 @@ where
let transaction_count_hint = self.transaction_count_hint;
let span = Span::current();
self.executor.spawn_blocking_named("reth-prewarm", move || {
self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
let (done_tx, done_rx) = mpsc::channel();
@@ -393,7 +393,7 @@ where
metrics,
terminate_execution,
precompile_cache_disabled,
mut precompile_cache_map,
precompile_cache_map,
} = self;
let mut state_provider = match provider.build() {
@@ -551,7 +551,7 @@ where
let span =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking_named("reth-prewarm-w", move || {
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.transact_batch(rx, actions_tx, done_tx);
});

View File

@@ -35,12 +35,13 @@ use reth_primitives_traits::{
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockReader,
DatabaseProviderFactory, ExecutionOutcome, HashedPostStateProvider, ProviderError,
PruneCheckpointReader, StageCheckpointReader, StateProvider, StateProviderFactory, StateReader,
StateRootProvider, TrieReader,
DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, TrieReader,
};
use reth_revm::db::State;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInputSorted};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -519,7 +520,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
state_provider.state_root_with_updates(hashed_state.clone()),
self.compute_state_root_serial(block.parent_hash(), &hashed_state, ctx.state()),
block
);
(root, updates, root_time.elapsed())
@@ -657,8 +658,6 @@ where
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
@@ -688,6 +687,36 @@ where
ParallelStateRoot::new(factory, prefix_sets).incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
fn compute_state_root_serial(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> ProviderResult<(B256, TrieUpdates)> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, .. } = input;
let prefix_sets = hashed_state.construct_prefix_sets();
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
let provider = factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets.freeze())
.root_with_updates()
.map_err(Into::<DatabaseError>::into)?)
}
/// Validates the block after execution.
///
/// This performs:

View File

@@ -1,50 +1,56 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use parking_lot::Mutex;
use dashmap::DashMap;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
use revm_primitives::Address;
use schnellru::LruMap;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
};
use std::{hash::Hash, sync::Arc};
/// Default max cache size for [`PrecompileCache`]
const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
impl<S> PrecompileCacheMap<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
// Try just using `.get` first to avoid acquiring a write lock.
if let Some(cache) = self.0.get(&address) {
return cache.clone();
}
// Otherwise, fallback to `.entry` and initialize the cache.
//
// This should be very rare as caches for all precompiles will be initialized as soon as
// first EVM is created.
self.0.entry(address).or_default().clone()
}
}
/// Cache for precompiles, for each input stores the result.
///
/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
/// so we use a [`Mutex`] instead of an `RwLock`.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
impl<S> Default for PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn default() -> Self {
Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
Self(
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
.initial_capacity(MAX_CACHE_SIZE as usize)
.build_with_hasher(Default::default()),
)
}
}
@@ -52,63 +58,31 @@ impl<S> PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
self.0.lock().get(key).cloned()
fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
self.0.get(input).filter(|e| e.spec == spec)
}
/// Inserts the given key and value into the cache, returning the new cache size.
fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
let mut cache = self.0.lock();
cache.insert(key, value);
cache.len()
}
}
/// Cache key, spec id and precompile call input. spec id is included in the key to account for
/// precompile repricing across fork activations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheKey<S>((S, Bytes));
impl<S> CacheKey<S> {
const fn new(spec_id: S, input: Bytes) -> Self {
Self((spec_id, input))
}
}
/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
impl<'a, S> CacheKeyRef<'a, S> {
const fn new(spec_id: S, input: &'a [u8]) -> Self {
Self((spec_id, input))
}
}
impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
fn eq(&self, other: &CacheKey<S>) -> bool {
self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
}
}
impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0 .0.hash(state);
self.0 .1.hash(state);
fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
self.0.insert(input, value);
self.0.entry_count() as usize
}
}
/// Cache entry, precompile successful output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheEntry(PrecompileOutput);
pub struct CacheEntry<S> {
output: PrecompileOutput,
spec: S,
}
impl CacheEntry {
impl<S> CacheEntry<S> {
const fn gas_used(&self) -> u64 {
self.0.gas_used
self.output.gas_used
}
fn to_precompile_result(&self) -> PrecompileResult {
Ok(self.0.clone())
Ok(self.output.clone())
}
}
@@ -190,9 +164,7 @@ where
}
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
if let Some(entry) = &self.cache.get(&key) {
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
self.increment_by_one_precompile_cache_hits();
if input.gas >= entry.gas_used() {
return entry.to_precompile_result()
@@ -204,8 +176,10 @@ where
match &result {
Ok(output) => {
let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
let size = self.cache.insert(key, CacheEntry(output.clone()));
let size = self.cache.insert(
Bytes::copy_from_slice(calldata),
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
);
self.set_precompile_cache_size_metric(size as f64);
self.increment_by_one_precompile_cache_misses();
}
@@ -246,31 +220,12 @@ impl CachedPrecompileMetrics {
#[cfg(test)]
mod tests {
use std::hash::DefaultHasher;
use super::*;
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
use reth_revm::db::EmptyDB;
use revm::{context::TxEnv, precompile::PrecompileOutput};
use revm_primitives::hardfork::SpecId;
#[test]
fn test_cache_key_ref_hash() {
let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
assert!(PartialEq::eq(&key2, &key1));
let mut hasher = DefaultHasher::new();
key1.hash(&mut hasher);
let hash1 = hasher.finish();
let mut hasher = DefaultHasher::new();
key2.hash(&mut hasher);
let hash2 = hasher.finish();
assert_eq!(hash1, hash2);
}
#[test]
fn test_precompile_cache_basic() {
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
@@ -293,12 +248,11 @@ mod tests {
reverted: false,
};
let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let expected = CacheEntry(output);
cache.cache.insert(key, expected.clone());
let input = b"test_input";
let expected = CacheEntry { output, spec: SpecId::PRAGUE };
cache.cache.insert(input.into(), expected.clone());
let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
let actual = cache.cache.get(&key).unwrap();
let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
assert_eq!(actual, expected);
}
@@ -312,7 +266,7 @@ mod tests {
let address1 = Address::repeat_byte(1);
let address2 = Address::repeat_byte(2);
let mut cache_map = PrecompileCacheMap::default();
let cache_map = PrecompileCacheMap::default();
// create the first precompile with a specific output
let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {

View File

@@ -4,7 +4,7 @@ use crate::{
tree::{
payload_validator::{BasicEngineValidator, TreeCtx, ValidationOutcome},
persistence_state::CurrentPersistenceAction,
TreeConfig,
PersistTarget, TreeConfig,
},
};
@@ -285,7 +285,8 @@ impl TestHarness {
let fcu_state = self.fcu_state(block_hash);
let (tx, rx) = oneshot::channel();
self.tree
let _ = self
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: fcu_state,
@@ -498,7 +499,7 @@ fn test_tree_persist_block_batch() {
// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
test_harness.tree.on_engine_message(msg).unwrap();
let _ = test_harness.tree.on_engine_message(msg).unwrap();
// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
@@ -577,7 +578,7 @@ async fn test_engine_request_during_backfill() {
.with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
test_harness
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
@@ -658,7 +659,7 @@ async fn test_holesky_payload() {
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
test_harness
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
@@ -883,7 +884,8 @@ async fn test_get_canonical_blocks_to_persist() {
.with_persistence_threshold(persistence_threshold)
.with_memory_block_buffer_target(memory_block_buffer_target);
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
let expected_blocks_to_persist_length: usize =
(canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
@@ -902,7 +904,8 @@ async fn test_get_canonical_blocks_to_persist() {
assert!(test_harness.tree.state.tree_state.sealed_header_by_hash(&fork_block_hash).is_some());
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
// check that the fork block is not included in the blocks to persist
@@ -981,7 +984,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
// add block to mock provider to enable persistence clean up.
test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
let _ = test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
@@ -991,7 +994,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
_ => panic!("Unexpected event: {event:#?}"),
}
test_harness
let _ = test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
.last()
@@ -1047,7 +1050,7 @@ async fn test_fcu_with_canonical_ancestor_updates_latest_block() {
// Send FCU to the canonical ancestor
let (tx, rx) = oneshot::channel();
test_harness
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
@@ -1943,4 +1946,53 @@ mod forkchoice_updated_tests {
.unwrap();
assert!(result.is_some(), "OpStack should handle canonical head");
}
/// Test that engine termination persists all blocks and signals completion.
#[test]
fn test_engine_termination_with_everything_persisted() {
let chain_spec = MAINNET.clone();
let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
// Create 10 blocks to persist
let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..11).collect();
let canonical_tip = blocks.last().unwrap().recovered_block().number;
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
// Create termination channel
let (terminate_tx, mut terminate_rx) = oneshot::channel();
let to_tree_tx = test_harness.to_tree_tx.clone();
let action_rx = test_harness.action_rx;
// Spawn tree in background thread
std::thread::Builder::new()
.name("Engine Task".to_string())
.spawn(|| test_harness.tree.run())
.unwrap();
// Send terminate request
to_tree_tx
.send(FromEngine::Event(FromOrchestrator::Terminate { tx: terminate_tx }))
.unwrap();
// Handle persistence actions until termination completes
let mut last_persisted_number = 0;
loop {
if terminate_rx.try_recv().is_ok() {
break;
}
if let Ok(PersistenceAction::SaveBlocks(saved_blocks, sender)) =
action_rx.recv_timeout(std::time::Duration::from_millis(100))
{
if let Some(last) = saved_blocks.last() {
last_persisted_number = last.recovered_block().number;
}
sender.send(saved_blocks.last().map(|b| b.recovered_block().num_hash())).unwrap();
}
}
// Ensure we persisted right to the tip
assert_eq!(last_persisted_number, canonical_tip);
}
}

View File

@@ -170,7 +170,7 @@ impl DisplayHardforks {
let mut post_merge = Vec::new();
for (fork, condition, metadata) in hardforks {
let mut display_fork = DisplayFork {
let display_fork = DisplayFork {
name: fork.name().to_string(),
activated_at: condition,
eip: None,
@@ -181,12 +181,7 @@ impl DisplayHardforks {
ForkCondition::Block(_) => {
pre_merge.push(display_fork);
}
ForkCondition::TTD { activation_block_number, total_difficulty, fork_block } => {
display_fork.activated_at = ForkCondition::TTD {
activation_block_number,
fork_block,
total_difficulty,
};
ForkCondition::TTD { .. } => {
with_merge.push(display_fork);
}
ForkCondition::Timestamp(_) => {

View File

@@ -7,6 +7,7 @@ use reth_e2e_test_utils::{
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_provider::BlockNumReader;
use reth_tasks::TaskManager;
use std::sync::Arc;
@@ -127,3 +128,55 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
Ok(())
}
#[tokio::test]
async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
),
false,
eth_payload_attributes,
)
.await?;
let mut node = nodes.pop().unwrap();
let raw_tx = TransactionTestContext::transfer_tx_bytes(1, wallet.inner).await;
let tx_hash = node.rpc.inject_tx(raw_tx).await?;
let payload = node.advance_block().await?;
node.assert_new_block(tx_hash, payload.block().hash(), payload.block().number).await?;
// Get block number before shutdown
let block_before = node.inner.provider.best_block_number()?;
assert_eq!(block_before, 1, "Expected 1 block before shutdown");
// Verify block is NOT yet persisted to database
let db_block_before = node.inner.provider.last_block_number()?;
assert_eq!(db_block_before, 0, "Block should not be persisted yet");
// Trigger graceful shutdown
let done_rx = node
.inner
.add_ons_handle
.engine_shutdown
.shutdown()
.expect("shutdown should return receiver");
tokio::time::timeout(std::time::Duration::from_secs(2), done_rx)
.await
.expect("shutdown timed out")
.expect("shutdown completion channel should not be closed");
let db_block = node.inner.provider.last_block_number()?;
assert_eq!(db_block, 1, "Database should have persisted block 1");
Ok(())
}

View File

@@ -101,8 +101,9 @@ where
.or(Err(P2PStreamError::HandshakeError(P2PHandshakeError::Timeout)))?
.ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoResponse))??;
// let's check the compressed length first, we will need to check again once confirming
// that it contains snappy-compressed data (this will be the case for all non-p2p messages).
// Check that the uncompressed message length does not exceed the max payload size.
// Note: The first message (Hello/Disconnect) is not snappy compressed. We will check the
// decompressed length again for subsequent messages after the handshake.
if first_message_bytes.len() > MAX_PAYLOAD_SIZE {
return Err(P2PStreamError::MessageTooBig {
message_size: first_message_bytes.len(),

View File

@@ -76,6 +76,7 @@ secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"]
## misc
aquamarine.workspace = true
eyre.workspace = true
parking_lot.workspace = true
jsonrpsee.workspace = true
fdlimit.workspace = true
rayon.workspace = true

View File

@@ -938,11 +938,15 @@ where
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
// Get the expected first stage based on config.
let first_stage =
if self.era_import_source().is_some() { StageId::Era } else { StageId::Headers };
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = self
.blockchain_db()
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.get_stage_checkpoint(first_stage)?
.unwrap_or_default()
.block_number;

View File

@@ -3,7 +3,7 @@
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
setup::build_networked_pipeline,
AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
@@ -13,6 +13,7 @@ use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
chain::FromOrchestrator,
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
@@ -260,8 +261,16 @@ impl EngineNodeLauncher {
)),
);
let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
add_ons.launch_add_ons(add_ons_ctx).await?;
let RpcHandle {
rpc_server_handles,
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown: _,
} = add_ons.launch_add_ons(add_ons_ctx).await?;
// Create engine shutdown handle
let (engine_shutdown, mut shutdown_rx) = EngineShutdown::new();
// Run consensus engine to completion
let initial_target = ctx.initial_backfill_target()?;
@@ -295,6 +304,14 @@ impl EngineNodeLauncher {
// advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
loop {
tokio::select! {
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}
}
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
@@ -366,6 +383,7 @@ impl EngineNodeLauncher {
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown,
},
};
// Notify on node started

View File

@@ -11,6 +11,7 @@ use crate::{
use alloy_rpc_types::engine::ClientVersionV1;
use alloy_rpc_types_engine::ExecutionData;
use jsonrpsee::{core::middleware::layer::Either, RpcModule};
use parking_lot::Mutex;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks};
use reth_node_api::{
@@ -41,7 +42,9 @@ use std::{
fmt::{self, Debug},
future::Future,
ops::{Deref, DerefMut},
sync::Arc,
};
use tokio::sync::oneshot;
/// Contains the handles to the spawned RPC servers.
///
@@ -332,6 +335,8 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
/// Handle to the beacon consensus engine.
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
/// Handle to trigger engine shutdown.
pub engine_shutdown: EngineShutdown,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
@@ -341,6 +346,7 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
rpc_registry: self.rpc_registry.clone(),
engine_events: self.engine_events.clone(),
beacon_engine_handle: self.beacon_engine_handle.clone(),
engine_shutdown: self.engine_shutdown.clone(),
}
}
}
@@ -361,6 +367,7 @@ where
f.debug_struct("RpcHandle")
.field("rpc_server_handles", &self.rpc_server_handles)
.field("rpc_registry", &self.rpc_registry)
.field("engine_shutdown", &self.engine_shutdown)
.finish()
}
}
@@ -956,6 +963,7 @@ where
rpc_registry: registry,
engine_events,
beacon_engine_handle: engine_handle,
engine_shutdown: EngineShutdown::default(),
})
}
@@ -1428,3 +1436,48 @@ impl IntoEngineApiRpcModule for NoopEngineApi {
RpcModule::new(())
}
}
/// Handle to trigger graceful engine shutdown.
///
/// This handle can be used to request a graceful shutdown of the engine,
/// which will persist all remaining in-memory blocks before terminating.
#[derive(Clone, Debug)]
pub struct EngineShutdown {
/// Channel to send shutdown signal.
tx: Arc<Mutex<Option<oneshot::Sender<EngineShutdownRequest>>>>,
}
impl EngineShutdown {
/// Creates a new [`EngineShutdown`] handle and returns the receiver.
pub fn new() -> (Self, oneshot::Receiver<EngineShutdownRequest>) {
let (tx, rx) = oneshot::channel();
(Self { tx: Arc::new(Mutex::new(Some(tx))) }, rx)
}
/// Requests a graceful engine shutdown.
///
/// All remaining in-memory blocks will be persisted before the engine terminates.
///
/// Returns a receiver that resolves when shutdown is complete.
/// Returns `None` if shutdown was already triggered.
pub fn shutdown(&self) -> Option<oneshot::Receiver<()>> {
let mut guard = self.tx.lock();
let tx = guard.take()?;
let (done_tx, done_rx) = oneshot::channel();
let _ = tx.send(EngineShutdownRequest { done_tx });
Some(done_rx)
}
}
impl Default for EngineShutdown {
fn default() -> Self {
Self { tx: Arc::new(Mutex::new(None)) }
}
}
/// Request to shutdown the engine.
#[derive(Debug)]
pub struct EngineShutdownRequest {
/// Channel to signal shutdown completion.
pub done_tx: oneshot::Sender<()>,
}

View File

@@ -36,7 +36,7 @@ use reth_network::{
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
},
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives, SessionsConfig,
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives,
};
use reth_network_peers::{mainnet_nodes, TrustedPeer};
use secp256k1::SecretKey;
@@ -339,7 +339,7 @@ impl NetworkArgs {
NetworkConfigBuilder::<N>::new(secret_key)
.external_ip_resolver(self.nat.clone())
.sessions_config(
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
config.sessions.clone().with_upscaled_event_buffer(peers_config.max_peers()),
)
.peer_config(peers_config)
.boot_nodes(chain_bootnodes.clone())

View File

@@ -179,7 +179,7 @@ impl<B: Block> SealedBlock<B> {
/// Recovers all senders from the transactions in the block.
///
/// Returns `None` if any of the transactions fail to recover the sender.
/// Returns an error if any of the transactions fail to recover the sender.
pub fn senders(&self) -> Result<Vec<Address>, RecoveryError> {
self.body().recover_signers()
}

View File

@@ -94,7 +94,7 @@ impl<H: Sealable> SealedHeader<H> {
*self.hash_ref()
}
/// This is the inverse of [`Header::seal_slow`] which returns the raw header and hash.
/// This is the inverse of [`Self::seal_slow`] which returns the raw header and hash.
pub fn split(self) -> (H, BlockHash) {
let hash = self.hash();
(self.header, hash)

View File

@@ -92,7 +92,7 @@ where
///
/// Can fail if the element is rejected by the limiter or if we fail to grow an empty map.
///
/// See [`Schnellru::insert`](LruMap::insert) for more info.
/// See [`LruMap::insert`] for more info.
pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool
where
L::KeyToInsert<'a>: Hash + PartialEq<K>,

View File

@@ -75,6 +75,7 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-downloaders.workspace = true
reth-static-file.workspace = true
reth-stages-api = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
@@ -116,6 +117,7 @@ test-utils = [
"reth-ethereum-primitives?/test-utils",
"reth-evm-ethereum/test-utils",
]
rocksdb = ["reth-provider/rocksdb"]
[[bench]]
name = "criterion"

View File

@@ -3,17 +3,16 @@ use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
table::Value,
table::{Decode, Decompress, Value},
tables,
transaction::DbTxMut,
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -65,7 +64,9 @@ where
+ PruneCheckpointReader
+ StatsReader
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ TransactionsProviderExt,
+ TransactionsProviderExt
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -150,16 +151,27 @@ where
);
if range_output.is_final_range {
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
let mut txhash_cursor = provider
.tx_ref()
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
// Use append mode when table is empty (first sync) - significantly faster
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;
let (hash_bytes, number_bytes) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
@@ -169,12 +181,16 @@ where
);
}
let key = RawKey::<TxHash>::from_vec(hash);
if append_only {
txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
} else {
txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
trace!(target: "sync::stages::transaction_lookup",
@@ -199,11 +215,19 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Cursor to unwind tx hash to number
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
@@ -218,15 +242,18 @@ where
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
// First delete the transaction and hash to id mapping
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
{
tx_hash_number_cursor.delete_current()?;
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
@@ -266,7 +293,7 @@ mod tests {
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::transaction::DbTx;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_ethereum_primitives::Block;
use reth_primitives_traits::SealedBlock;
use reth_provider::{
@@ -581,4 +608,160 @@ mod tests {
self.ensure_no_hash_by_block(input.unwind_to)
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// writes transaction hash mappings to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
assert_eq!(
mdbx_count, 0,
"MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = runner.db.factory.rocksdb_provider();
let mut rocksdb_count = 0;
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
rocksdb_count += 1;
}
}
assert_eq!(
rocksdb_count, expected_tx_count,
"RocksDB should contain all transaction hashes"
);
}
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
#[tokio::test]
async fn unwind_deletes_from_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage first to populate RocksDB
let exec_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let rx = runner.execute(exec_input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify RocksDB has the data before unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_some(),
"Transaction hash {:?} should exist before unwind",
hash
);
}
}
// Now unwind to stage_progress (removing all the blocks we added)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(previous_stage),
unwind_to: stage_progress,
bad_block: None,
};
let unwind_result = runner.unwind(unwind_input).await;
assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
// Verify RocksDB data is deleted after unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_none(),
"Transaction hash {:?} should be deleted from RocksDB after unwind",
hash
);
}
}
}
}
}

View File

@@ -50,7 +50,7 @@ impl Default for TestStageDB {
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
)
.expect("failed to create test provider factory"),
}
@@ -68,7 +68,7 @@ impl TestStageDB {
create_test_rw_db_with_path(path),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
)
.expect("failed to create test provider factory"),
}

View File

@@ -12936,6 +12936,125 @@ int mdbx_txn_renew(MDBX_txn *txn) {
return LOG_IFERR(rc);
}
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
if (unlikely(!dest))
return LOG_IFERR(MDBX_EINVAL);
*dest = nullptr;
int rc = check_txn(src, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
return LOG_IFERR(MDBX_EINVAL);
MDBX_env *const env = src->env;
rc = check_env(env, true);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
MDBX_txn *txn = nullptr;
const intptr_t bitmap_bytes =
#if MDBX_ENABLE_DBI_SPARSE
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
#else
0;
#endif /* MDBX_ENABLE_DBI_SPARSE */
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
txn = osal_malloc(size);
if (unlikely(txn == nullptr))
return LOG_IFERR(MDBX_ENOMEM);
#if MDBX_DEBUG
memset(txn, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
#endif /* MDBX_DEBUG */
MDBX_ANALYSIS_ASSUME(size > base);
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
txn->dbs = ptr_disp(txn, base);
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
#if MDBX_DEBUG
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
#endif
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
#if MDBX_ENABLE_DBI_SPARSE
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->env = env;
txn->flags = src->flags & ~txn_state_flags;
txn->parent = nullptr;
txn->nested = nullptr;
txn->txnid = src->txnid;
txn->front_txnid = src->front_txnid;
txn->geo = src->geo;
txn->canary = src->canary;
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
if (unlikely(src->n_dbi > env->max_dbi)) {
rc = MDBX_CORRUPTED;
goto bailout;
}
txn->n_dbi = src->n_dbi;
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memset(txn->dbi_sparse, 0, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->to.reader = nullptr;
if (env->lck_mmap.lck) {
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
rc = brs.err;
goto bailout;
}
txn->to.reader = brs.rslot;
safe64_reset(&txn->to.reader->txnid, true);
if (src->to.reader) {
atomic_store32(&txn->to.reader->snapshot_pages_used,
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired,
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
} else {
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
}
safe64_write(&txn->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
}
txn->signature = txn_signature;
txn->userctx = nullptr;
*dest = txn;
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
return MDBX_SUCCESS;
bailout:
osal_free(txn);
return LOG_IFERR(rc);
}
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
int rc = check_txn(txn, MDBX_TXN_FINISHED);
if (unlikely(rc != MDBX_SUCCESS))

View File

@@ -3882,6 +3882,35 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
void *context);
/** \brief Clone a read-only transaction snapshot.
* \ingroup c_transactions
*
* Creates a new read-only transaction that uses the same MVCC snapshot as
* the \p src transaction. This allows parallel read operations across threads
* without re-opening a read transaction and re-fetching state.
*
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
* to be enabled for the environment. Otherwise it will return
* \ref MDBX_TXN_OVERLAPPING.
*
* \note The \p src transaction must be an active read-only transaction.
*
* \note The \p src transaction and the cloned transaction must not be used
* concurrently from multiple threads. Each transaction and its cursors must
* be confined to a single thread at a time.
*
* \param [in] src A read-only transaction handle returned by
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
* stored. Must not be NULL.
*
* \returns A non-zero error value on failure and 0 on success.
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
* \retval MDBX_READERS_FULL Reader lock table is full.
* \retval MDBX_ENOMEM Out of memory. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
/** \brief Create a transaction for use with the environment.
* \ingroup c_transactions
*

View File

@@ -483,6 +483,20 @@ impl Transaction<RW> {
}
impl Transaction<RO> {
/// Clones this read-only transaction, preserving the same MVCC snapshot.
///
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
/// The cloned transaction must not be used concurrently with this transaction from multiple
/// threads.
pub fn clone_snapshot(&self) -> Result<Self> {
let cloned = self.txn_execute(|txn| {
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
})??;
Ok(Self::new_from_ptr(self.env().clone(), cloned))
}
/// Closes the database handle.
///
/// # Safety

View File

@@ -373,3 +373,35 @@ fn test_stat_dupsort() {
assert_eq!(stat.entries(), 8);
}
}
#[test]
fn test_txn_clone_snapshot() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let ro = env.begin_ro_txn().unwrap();
let clone = ro.clone_snapshot().unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let db = ro.open_db(None).unwrap();
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let db = clone.open_db(None).unwrap();
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let ro2 = env.begin_ro_txn().unwrap();
let db = ro2.open_db(None).unwrap();
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
}

View File

@@ -187,6 +187,21 @@ impl<'a> EitherWriter<'a, (), ()> {
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
/// `None` for other variants.
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
Self::RocksDB(batch) => Some(batch.into_inner()),
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -304,13 +319,24 @@ where
CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
{
/// Puts a transaction hash number mapping.
///
/// When `append_only` is true, uses `cursor.append()` which is significantly faster
/// but requires entries to be inserted in order and the table to be empty.
/// When false, uses `cursor.insert()` which handles arbitrary insertion order.
pub fn put_transaction_hash_number(
&mut self,
hash: TxHash,
tx_num: TxNumber,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(hash, &tx_num)?)
} else {
Ok(cursor.insert(hash, &tx_num)?)
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
@@ -663,12 +689,18 @@ mod tests {
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_tests {
use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
use super::*;
use crate::{
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
test_utils::create_test_provider_factory,
RocksDBProviderFactory,
};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
};
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -682,6 +714,87 @@ mod rocksdb_tests {
(temp_dir, provider)
}
/// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
/// when the storage setting is enabled, and that put operations followed by commit
/// persist the data to `RocksDB`.
#[test]
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create EitherWriter with RocksDB
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Verify we got a RocksDB writer
assert!(matches!(writer, EitherWriter::RocksDB(_)));
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the batch and register with provider for commit
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Commit via provider - this commits RocksDB batch too
provider.commit().unwrap();
// Verify data was written to RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
}
/// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
#[test]
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash = B256::from([1u8; 32]);
let tx_num = 100u64;
// First, write a value directly to RocksDB
let rocksdb = factory.rocksdb_provider();
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
// Now delete using EitherWriter
let batch = rocksdb.batch();
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
writer.delete_transaction_hash_number(hash).unwrap();
// Extract the batch and commit via provider
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
provider.commit().unwrap();
// Verify deletion
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
}
#[test]
fn test_rocksdb_batch_transaction_hash_numbers() {
let (_temp_dir, provider) = create_rocksdb_provider();
@@ -816,4 +929,65 @@ mod rocksdb_tests {
// Verify deletion
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
}
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
///
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
/// in a single place, making it easier to reason about commit ordering and consistency.
#[test]
fn test_rocksdb_commits_at_provider_level() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create provider and EitherWriter
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the raw batch from the writer and register it with the provider
let raw_batch = writer.into_raw_rocksdb_batch();
if let Some(batch) = raw_batch {
provider.set_pending_rocksdb_batch(batch);
}
// Data should NOT be visible yet (batch not committed)
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
None,
"Data should not be visible before provider.commit()"
);
// Commit the provider - this should commit both MDBX and RocksDB
provider.commit().unwrap();
// Now data should be visible in RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
Some(tx_num1),
"Data should be visible after provider.commit()"
);
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
Some(tx_num2),
"Data should be visible after provider.commit()"
);
}
}

View File

@@ -181,6 +181,11 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

View File

@@ -153,6 +153,11 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {

View File

@@ -151,7 +151,6 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Database transaction.
tx: TX,
@@ -167,10 +166,29 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
}
impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("DatabaseProvider");
s.field("tx", &self.tx)
.field("chain_spec", &self.chain_spec)
.field("static_file_provider", &self.static_file_provider)
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
}
}
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
/// Returns reference to prune modes.
pub const fn prune_modes_ref(&self) -> &PruneModes {
@@ -259,6 +277,11 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
}
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
@@ -290,6 +313,8 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -545,6 +570,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -3178,7 +3205,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
/// Commit database transaction and static files.
/// Commit database transaction, static files, and pending `RocksDB` batches.
fn commit(self) -> ProviderResult<bool> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3186,9 +3213,27 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.tx.commit()?;
}

View File

@@ -450,6 +450,19 @@ impl RocksDBProvider {
batch_handle.commit()
})
}
/// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
///
/// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
/// and needs to be committed at a later point (e.g., at provider commit time).
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
}
/// Handle for building a batch of operations atomically.
@@ -529,6 +542,13 @@ impl<'a> RocksDBBatch<'a> {
pub const fn provider(&self) -> &RocksDBProvider {
self.provider
}
/// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
///
/// This is used to defer commits to the provider level.
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.

View File

@@ -1,5 +1,5 @@
use crate::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider},
HashingWriter, ProviderFactory, TrieWriter,
};
use alloy_primitives::B256;
@@ -62,7 +62,10 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
RocksDBBuilder::new(&rocksdb_dir)
.with_default_tables()
.build()
.expect("failed to create test RocksDB provider"),
)
.expect("failed to create test provider factory")
}

View File

@@ -29,4 +29,9 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
// No-op for NoopProvider
}
}

View File

@@ -6,4 +6,11 @@ use crate::providers::RocksDBProvider;
pub trait RocksDBProviderFactory {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider;
/// Adds a pending `RocksDB` batch to be committed when this provider is committed.
///
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
}

View File

@@ -36,9 +36,6 @@ itertools.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
crossbeam-channel.workspace = true
[target.'cfg(unix)'.dependencies]
libc = "0.2"
# `metrics` feature
reth-metrics = { workspace = true, optional = true }
metrics = { workspace = true, optional = true }

View File

@@ -262,6 +262,7 @@ mod tests {
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
use reth_trie::proof::Proof;
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use tokio::runtime::Runtime;
#[test]
fn random_parallel_proof() {
@@ -325,7 +326,7 @@ mod tests {
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_rw.tx_ref());
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(provider_rw.tx_ref());
let rt = tokio::runtime::Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(factory);

View File

@@ -71,33 +71,6 @@ use std::{
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};
/// Sets the current thread's name for profiling visibility.
#[inline]
fn set_thread_name(name: &str) {
#[cfg(target_os = "linux")]
{
// SAFETY: prctl with PR_SET_NAME is safe with a valid string pointer
unsafe {
let mut buf = [0u8; 16];
let len = name.len().min(15);
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
}
}
#[cfg(target_os = "macos")]
{
// SAFETY: pthread_setname_np is safe with a valid CString
unsafe {
let c_name = std::ffi::CString::new(name).unwrap_or_default();
libc::pthread_setname_np(c_name.as_ptr());
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = name;
}
}
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
@@ -178,7 +151,6 @@ impl ProofWorkerHandle {
let storage_available_workers_clone = storage_available_workers.clone();
executor.spawn_blocking(move || {
set_thread_name("reth-stor-proof");
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
@@ -219,7 +191,6 @@ impl ProofWorkerHandle {
let account_available_workers_clone = account_available_workers.clone();
executor.spawn_blocking(move || {
set_thread_name("reth-acct-proof");
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
@@ -1617,7 +1588,7 @@ enum AccountWorkerJob {
mod tests {
use super::*;
use reth_provider::test_utils::create_test_provider_factory;
use tokio::runtime::Builder;
use tokio::{runtime::Builder, task};
fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
ProofTaskCtx::new(factory)
@@ -1627,16 +1598,21 @@ mod tests {
#[test]
fn spawn_proof_workers_creates_handle() {
let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
let provider_factory = create_test_provider_factory();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let ctx = test_ctx(factory);
runtime.block_on(async {
let handle = tokio::runtime::Handle::current();
let provider_factory = create_test_provider_factory();
let factory =
reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let ctx = test_ctx(factory);
let proof_handle = ProofWorkerHandle::new(runtime.handle().clone(), ctx, 5, 3);
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();
// Workers shut down automatically when handle is dropped
drop(proof_handle);
// Workers shut down automatically when handle is dropped
drop(proof_handle);
task::yield_now().await;
});
}
}

View File

@@ -24,7 +24,7 @@ This page tries to answer how to deal with the most popular issues.
Externally accessing a `datadir` inside a named docker volume will usually come with folder/file ownership/permissions issues.
**It is not recommended** to use the path to the named volume as it will trigger an error code 13. `RETH_DB_PATH: /var/lib/docker/volumes/named_volume/_data/eth/db cargo r --examples db-access --path ` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead.
**It is not recommended** to use the path to the named volume as it will trigger an error code 13. For example, `RETH_DATADIR=/var/lib/docker/volumes/named_volume/_data/eth cargo run -p db-access` is **DISCOURAGED** and a mounted volume with the right permissions should be used instead.
### Error code 13