Compare commits

..

2 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
79829b76a3 feat(libmdbx): add lifetime parameter to TableObject for safe zero-copy reads
Amp-Thread-ID: https://ampcode.com/threads/T-019bfc77-cb7b-77a9-b53b-8f366e4adbb3
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 23:18:26 +00:00
Georgios Konstantopoulos
507520114c feat(libmdbx): add TxPtrAccess trait for transaction pointer abstraction
This PR introduces the TxPtrAccess trait that abstracts over different ways
of accessing the underlying MDBX transaction pointer. This is foundational
work that enables future support for unsynchronized transaction types.

## Motivation

Based on optimizations from signet-libmdbx (https://test.signet.sh/updates/optimizing-mdbx-access-in-rust/):
- Current implementation uses a Mutex for every DB operation
- This is correct but adds overhead in single-threaded hot paths
- TxPtrAccess enables future TxUnsync type that uses &mut self instead
  of runtime locks, achieving up to 3-4x faster writes

## Changes

- Add tx_access module with TxPtrAccess trait
- Implement TxPtrAccess for TransactionPtr
- Make TransactionPtr public to support the sealed trait pattern
- Export TxPtrAccess from lib.rs

## Future Work

This is PR 1 of a series:
- PR 2: Add lifetime-safe TableObject<'a> to fix zero-copy read bugs
- PR 3: Add TxUnsync<K> unsynchronized transaction type
- PR 4: Integrate TxUnsync in reth hot paths (block execution)

Amp-Thread-ID: https://ampcode.com/threads/T-019bfc77-cb7b-77a9-b53b-8f366e4adbb3
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 22:51:17 +00:00
85 changed files with 820 additions and 2132 deletions

View File

@@ -44,24 +44,3 @@ jobs:
--exclude 'op-reth' \
--exclude 'reth' \
-E 'binary(e2e_testsuite)'
rocksdb:
name: e2e-rocksdb
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: taiki-e/install-action@nextest
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

1
Cargo.lock generated
View File

@@ -10248,7 +10248,6 @@ dependencies = [
"strum 0.27.2",
"thiserror 2.0.18",
"toml",
"tracing",
]
[[package]]

View File

@@ -68,8 +68,6 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
RUN cp /app/target/$BUILD_PROFILE/$BINARY /app/binary || \

View File

@@ -401,38 +401,25 @@ impl Command {
let mut current_block = start_block;
for payload_idx in 0..count {
const MAX_RETRIES: u32 = 5;
let mut attempts = 0;
let result = loop {
attempts += 1;
match collector.collect(current_block).await {
Ok(res) => break Some(res),
Err(e) => {
if attempts >= MAX_RETRIES {
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries");
break None;
}
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
};
let Some((transactions, total_gas, next_block)) = result else {
break;
};
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});

View File

@@ -101,8 +101,8 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
// We may be using the tui for a long time
tx.disable_long_read_transaction_safety();
let table_db = tx.inner().open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner().db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let total_entries = stats.entries();
let final_entry_idx = total_entries.saturating_sub(1);
if self.args.skip > final_entry_idx {

View File

@@ -92,10 +92,10 @@ impl Command {
db_tables.sort();
let mut total_size = 0;
for db_table in db_tables {
let table_db = tx.inner().open_db(Some(db_table)).wrap_err("Could not open db.")?;
let table_db = tx.inner.open_db(Some(db_table)).wrap_err("Could not open db.")?;
let stats = tx
.inner()
.inner
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {db_table}"))?;
@@ -136,9 +136,9 @@ impl Command {
.add_cell(Cell::new(human_bytes(total_size as f64)));
table.add_row(row);
let freelist = tx.inner().env().freelist()?;
let freelist = tx.inner.env().freelist()?;
let pagesize =
tx.inner().db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
let freelist_size = freelist * pagesize;
let mut row = Row::new();

View File

@@ -79,4 +79,4 @@ path = "tests/rocksdb/main.rs"
required-features = ["edge"]
[features]
edge = ["reth-node-core/edge", "reth-provider/rocksdb", "reth-cli-commands/edge"]
edge = ["reth-node-core/edge"]

View File

@@ -125,10 +125,7 @@ pub async fn setup_engine_with_chain_import(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)?;
// Initialize genesis if needed
@@ -331,7 +328,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
.with_default_tables()
.build()
.unwrap(),
)
@@ -396,7 +392,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
)
@@ -495,10 +490,7 @@ mod tests {
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create provider factory");

View File

@@ -38,18 +38,6 @@ impl TransactionTestContext {
signed.encoded_2718().into()
}
/// Creates a transfer with a specific nonce and signs it, returning bytes.
/// Uses high `max_fee_per_gas` (1000 gwei) to ensure tx acceptance regardless of basefee.
pub async fn transfer_tx_bytes_with_nonce(
chain_id: u64,
wallet: PrivateKeySigner,
nonce: u64,
) -> Bytes {
let tx = tx(chain_id, 21000, None, None, nonce, Some(1000e9 as u128));
let signed = Self::sign_tx(wallet, tx).await;
signed.encoded_2718().into()
}
/// Creates a deployment transaction and signs it, returning an envelope.
pub async fn deploy_tx(
chain_id: u64,

View File

@@ -8,67 +8,12 @@ use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
use eyre::Result;
use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_e2e_test_utils::{transaction::TransactionTestContext, E2ETestSetupBuilder};
use reth_node_builder::NodeConfig;
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::{RocksDBProviderFactory, StorageSettings};
use std::{sync::Arc, time::Duration};
const ROCKSDB_POLL_TIMEOUT: Duration = Duration::from_secs(60);
const ROCKSDB_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Polls RPC until the given `tx_hash` is visible as pending (not yet mined).
/// Prevents race conditions where `advance_block` is called before txs are in the pool.
/// Returns the pending transaction.
async fn wait_for_pending_tx<C: ClientT>(client: &C, tx_hash: B256) -> Transaction {
let start = std::time::Instant::now();
loop {
let tx: Option<Transaction> = client
.request("eth_getTransactionByHash", [tx_hash])
.await
.expect("RPC request failed");
if let Some(tx) = tx {
assert!(
tx.block_number.is_none(),
"Expected pending tx but tx_hash={tx_hash:?} is already mined in block {:?}",
tx.block_number
);
return tx;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} to appear in pending pool",
start.elapsed()
);
tokio::time::sleep(ROCKSDB_POLL_INTERVAL).await;
}
}
/// Polls `RocksDB` until the given `tx_hash` appears in `TransactionHashNumbers`.
/// Returns the `tx_number` on success, or panics on timeout.
async fn poll_tx_in_rocksdb<P: RocksDBProviderFactory>(provider: &P, tx_hash: B256) -> u64 {
let start = std::time::Instant::now();
let mut interval = ROCKSDB_POLL_INTERVAL;
loop {
// Re-acquire handle each iteration to avoid stale snapshot reads
let rocksdb = provider.rocksdb_provider();
let tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(tx_hash).expect("RocksDB get failed");
if let Some(n) = tx_number {
return n;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} in RocksDB",
start.elapsed()
);
tokio::time::sleep(interval).await;
// Simple backoff: 50ms -> 100ms -> 200ms (capped)
interval = std::cmp::min(interval * 2, Duration::from_millis(200));
}
}
use std::sync::Arc;
/// Returns the test chain spec for `RocksDB` tests.
fn test_chain_spec() -> Arc<ChainSpec> {
@@ -96,24 +41,10 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults match `StorageSettings::base()`.
#[test]
fn test_rocksdb_defaults_match_storage_settings() {
let args = RocksDbArgs::default();
let settings = StorageSettings::base();
assert_eq!(
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
"tx_hash default should match StorageSettings::base()"
);
assert_eq!(
args.storages_history, settings.storages_history_in_rocksdb,
"storages_history default should match StorageSettings::base()"
);
assert_eq!(
args.account_history, settings.account_history_in_rocksdb,
"account_history default should match StorageSettings::base()"
);
/// Enables `RocksDB` for all supported tables.
fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
config.rocksdb = RocksDbArgs { all: true, ..Default::default() };
config
}
/// Smoke test: node boots with `RocksDB` routing enabled.
@@ -125,16 +56,19 @@ async fn test_rocksdb_node_startup() -> Result<()> {
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Verify RocksDB provider is functional (can query without error)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_hash = B256::from([0xab; 32]);
let result: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(result.is_none(), "Missing hash should return None");
// Verify RocksDB directory exists
let rocksdb_path = nodes[0].inner.data_dir.rocksdb();
assert!(rocksdb_path.exists(), "RocksDB directory should exist at {rocksdb_path:?}");
assert!(
std::fs::read_dir(&rocksdb_path).map(|mut d| d.next().is_some()).unwrap_or(false),
"RocksDB directory should be non-empty"
);
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
@@ -148,10 +82,10 @@ async fn test_rocksdb_block_mining() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
@@ -160,30 +94,12 @@ async fn test_rocksdb_block_mining() -> Result<()> {
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
// Mine 3 blocks with transactions
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
for i in 1..=3u64 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), i - 1)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
// Mine 3 blocks
for i in 1..=3 {
let payload = nodes[0].advance_block().await?;
let block = payload.block();
assert_eq!(block.number(), i);
assert_ne!(block.hash(), B256::ZERO);
// Verify tx was actually included in the block
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should exist after mining");
assert_eq!(receipt.block_number, Some(i), "Tx should be in block {i}");
}
// Verify all blocks are stored
@@ -203,53 +119,48 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let (mut nodes, _tasks, wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Inject and mine a transaction
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let mut tx_hashes = Vec::new();
// Inject and mine 3 transactions (new wallet per tx to avoid nonce tracking)
for i in 0..3 {
let wallets = wallet.wallet_gen();
let signer = wallets[0].clone();
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
tx_hashes.push(tx_hash);
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), i + 1);
}
let client = nodes[0].rpc_client().expect("RPC client should be available");
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Query each transaction by hash
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1));
for (i, tx_hash) in tx_hashes.iter().enumerate() {
let expected_block_number = (i + 1) as u64;
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should be found");
assert_eq!(receipt.block_number, Some(1));
assert!(receipt.status());
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(expected_block_number));
// Direct RocksDB assertion - poll with timeout since persistence is async
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have TxNumber 0");
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should be found");
assert_eq!(receipt.block_number, Some(expected_block_number));
assert!(receipt.status());
}
// Verify missing hash returns None
// Negative test: querying a non-existent tx hash returns None
let missing_hash = B256::from([0xde; 32]);
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(missing_tx_number.is_none());
let missing_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [missing_hash]).await?;
assert!(missing_tx.is_none(), "expected no transaction for missing hash");
@@ -260,212 +171,3 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
Ok(())
}
/// Multiple transactions in the same block are correctly persisted to `RocksDB`.
#[tokio::test]
async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
// Create 3 txs from the same wallet with sequential nonces
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
let mut tx_hashes = Vec::new();
for nonce in 0..3 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), nonce)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
tx_hashes.push(tx_hash);
}
// Wait for all txs to appear in pending pool before mining
for tx_hash in &tx_hashes {
wait_for_pending_tx(&client, *tx_hash).await;
}
// Mine one block containing all 3 txs
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Verify block contains all 3 txs
let block: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
let block = block.expect("Block 1 should exist");
assert_eq!(block.transactions.len(), 3, "Block should contain 3 txs");
// Verify each tx via RPC
for tx_hash in &tx_hashes {
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1), "All txs should be in block 1");
}
// Poll RocksDB for all tx hashes and collect tx_numbers
let mut tx_numbers = Vec::new();
for tx_hash in &tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify tx_numbers form the set {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be 0, 1, 2");
Ok(())
}
/// Transactions across multiple blocks have globally continuous `tx_numbers`.
#[tokio::test]
async fn test_rocksdb_txs_across_blocks() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
// Block 1: 2 transactions
let tx_hash_0 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 0).await,
)
.await?;
let tx_hash_1 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await,
)
.await?;
// Wait for both txs to appear in pending pool
wait_for_pending_tx(&client, tx_hash_0).await;
wait_for_pending_tx(&client, tx_hash_1).await;
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
// Block 2: 1 transaction
let tx_hash_2 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 2).await,
)
.await?;
wait_for_pending_tx(&client, tx_hash_2).await;
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// Verify block contents via RPC
let tx0: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_1]).await?;
let tx2: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_2]).await?;
assert_eq!(tx0.expect("tx0").block_number, Some(1));
assert_eq!(tx1.expect("tx1").block_number, Some(1));
assert_eq!(tx2.expect("tx2").block_number, Some(2));
// Poll RocksDB and verify global tx_number continuity
let all_tx_hashes = [tx_hash_0, tx_hash_1, tx_hash_2];
let mut tx_numbers = Vec::new();
for tx_hash in &all_tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify they form a continuous sequence {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be globally continuous: 0, 1, 2");
// Re-query block 1 txs after block 2 is mined (regression guard)
let tx0_again: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
assert!(tx0_again.is_some(), "Block 1 tx should still be queryable after block 2");
Ok(())
}
/// Pending transactions should NOT appear in `RocksDB` until mined.
#[tokio::test]
async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
// Inject tx but do NOT mine
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Verify tx is in pending pool via RPC
let client = nodes[0].rpc_client().expect("RPC client");
wait_for_pending_tx(&client, tx_hash).await;
let pending_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert!(pending_tx.is_some(), "Pending tx should be visible via RPC");
assert!(pending_tx.unwrap().block_number.is_none(), "Pending tx should have no block_number");
// Assert tx is NOT in RocksDB before mining (single check - tx is confirmed pending)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let tx_number: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(tx_hash)?;
assert!(
tx_number.is_none(),
"Pending tx should NOT be in RocksDB before mining, but found tx_number={:?}",
tx_number
);
// Now mine the block
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Poll until tx appears in RocksDB
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
// Verify tx is now mined via RPC
let mined_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert_eq!(mined_tx.expect("mined tx").block_number, Some(1));
Ok(())
}

View File

@@ -148,8 +148,8 @@ pub struct TreeConfig {
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable V2 storage proofs.
disable_proof_v2: bool,
/// Whether to enable V2 storage proofs.
enable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
}
@@ -179,7 +179,7 @@ impl Default for TreeConfig {
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
enable_proof_v2: false,
disable_cache_metrics: false,
}
}
@@ -211,7 +211,7 @@ impl TreeConfig {
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_proof_v2: bool,
enable_proof_v2: bool,
disable_cache_metrics: bool,
) -> Self {
Self {
@@ -237,7 +237,7 @@ impl TreeConfig {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
disable_cache_metrics,
}
}
@@ -280,8 +280,7 @@ impl TreeConfig {
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if !self.disable_proof_v2 &&
self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
@@ -519,14 +518,14 @@ impl TreeConfig {
self
}
/// Return whether V2 storage proofs are disabled.
pub const fn disable_proof_v2(&self) -> bool {
self.disable_proof_v2
/// Return whether V2 storage proofs are enabled.
pub const fn enable_proof_v2(&self) -> bool {
self.enable_proof_v2
}
/// Setter for whether to disable V2 storage proofs.
pub const fn with_disable_proof_v2(mut self, disable_proof_v2: bool) -> Self {
self.disable_proof_v2 = disable_proof_v2;
/// Setter for whether to enable V2 storage proofs.
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
self.enable_proof_v2 = enable_proof_v2;
self
}

View File

@@ -101,7 +101,7 @@ where
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
blockchain_db,
consensus,
payload_validator,

View File

@@ -312,7 +312,14 @@ impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
match self.caches.get_or_try_insert_account_with(*address, || {
self.state_provider.basic_account(address)
})? {
CachedStatus::NotCached(value) | CachedStatus::Cached(value) => Ok(value),
CachedStatus::NotCached(value) => {
self.metrics.account_cache_misses.increment(1);
Ok(value)
}
CachedStatus::Cached(value) => {
self.metrics.account_cache_hits.increment(1);
Ok(value)
}
}
} else if let Some(account) = self.caches.account_cache.get(address) {
self.metrics.account_cache_hits.increment(1);
@@ -343,7 +350,14 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
})? {
CachedStatus::NotCached(value) | CachedStatus::Cached(value) => {
CachedStatus::NotCached(value) => {
self.metrics.storage_cache_misses.increment(1);
// The slot that was never written to is indistinguishable from a slot
// explicitly set to zero. We return `None` in both cases.
Ok(Some(value).filter(|v| !v.is_zero()))
}
CachedStatus::Cached(value) => {
self.metrics.storage_cache_hits.increment(1);
// The slot that was never written to is indistinguishable from a slot
// explicitly set to zero. We return `None` in both cases.
Ok(Some(value).filter(|v| !v.is_zero()))
@@ -365,7 +379,14 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
match self.caches.get_or_try_insert_code_with(*code_hash, || {
self.state_provider.bytecode_by_hash(code_hash)
})? {
CachedStatus::NotCached(code) | CachedStatus::Cached(code) => Ok(code),
CachedStatus::NotCached(code) => {
self.metrics.code_cache_misses.increment(1);
Ok(code)
}
CachedStatus::Cached(code) => {
self.metrics.code_cache_hits.increment(1);
Ok(code)
}
}
} else if let Some(code) = self.caches.code_cache.get(code_hash) {
self.metrics.code_cache_hits.increment(1);

View File

@@ -1492,10 +1492,6 @@ where
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,

View File

@@ -238,7 +238,7 @@ where
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
let v2_proofs_enabled = config.enable_proof_v2();
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
@@ -286,7 +286,9 @@ where
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
config
.multiproof_chunking_enabled()
.then_some(config.effective_multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
)

View File

@@ -563,7 +563,6 @@ where
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();

View File

@@ -77,22 +77,8 @@ impl<R: Receipt> ReceiptRootTaskHandle<R> {
receipt_with_bloom.encode_2718(&mut encode_buf);
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
match builder.push(indexed_receipt.index, &encode_buf) {
Ok(()) => {
received_count += 1;
}
Err(err) => {
// If a duplicate or out-of-bounds index is streamed, skip it and
// fall back to computing the receipt root from the full receipts
// vector later.
tracing::error!(
target: "engine::tree::payload_processor",
index = indexed_receipt.index,
?err,
"Receipt root task received invalid receipt index, skipping"
);
}
}
builder.push_unchecked(indexed_receipt.index, &encode_buf);
received_count += 1;
}
let Ok(root) = builder.finalize() else {

View File

@@ -792,11 +792,6 @@ where
// Execute transactions
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
let mut transactions = transactions.into_iter();
// Some executors may execute transactions that do not append receipts during the
// main loop (e.g., system transactions whose receipts are added during finalization).
// In that case, invoking the callback on every transaction would resend the previous
// receipt with the same index and can panic the ordered root builder.
let mut last_sent_len = 0usize;
loop {
// Measure time spent waiting for next transaction from iterator
// (e.g., parallel signature recovery)
@@ -823,14 +818,10 @@ where
let gas_used = executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
// Send the latest receipt to the background task for incremental root computation
if let Some(receipt) = executor.receipts().last() {
let tx_index = executor.receipts().len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
enter.record("gas_used", gas_used);

View File

@@ -251,7 +251,7 @@ pub async fn test_exex_context_with_chain_spec(
db,
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
)?;
let genesis_hash = init_genesis(&provider_factory)?;

View File

@@ -35,7 +35,7 @@ pub struct DefaultEngineValues {
allow_unwind_canonical_header: bool,
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
disable_proof_v2: bool,
enable_proof_v2: bool,
cache_metrics_disabled: bool,
}
@@ -161,9 +161,9 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable proof V2 by default
pub const fn with_disable_proof_v2(mut self, v: bool) -> Self {
self.disable_proof_v2 = v;
/// Set whether to enable proof V2 by default
pub const fn with_enable_proof_v2(mut self, v: bool) -> Self {
self.enable_proof_v2 = v;
self
}
@@ -195,7 +195,7 @@ impl Default for DefaultEngineValues {
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
disable_proof_v2: false,
enable_proof_v2: false,
cache_metrics_disabled: false,
}
}
@@ -317,9 +317,9 @@ pub struct EngineArgs {
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
pub account_worker_count: Option<usize>,
/// Disable V2 storage proofs for state root calculations
#[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)]
pub disable_proof_v2: bool,
/// Enable V2 storage proofs for state root calculations
#[arg(long = "engine.enable-proof-v2", default_value_t = DefaultEngineValues::get_global().enable_proof_v2)]
pub enable_proof_v2: bool,
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
@@ -348,7 +348,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
cache_metrics_disabled,
} = DefaultEngineValues::get_global().clone();
Self {
@@ -374,7 +374,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
cache_metrics_disabled,
}
}
@@ -410,7 +410,7 @@ impl EngineArgs {
config = config.with_account_worker_count(count);
}
config = config.with_disable_proof_v2(self.disable_proof_v2);
config = config.with_enable_proof_v2(self.enable_proof_v2);
config = config.without_cache_metrics(self.cache_metrics_disabled);
config
@@ -462,7 +462,7 @@ mod tests {
allow_unwind_canonical_header: true,
storage_worker_count: Some(16),
account_worker_count: Some(8),
disable_proof_v2: false,
enable_proof_v2: false,
cache_metrics_disabled: true,
};

View File

@@ -1,27 +1,13 @@
//! clap [Args](clap::Args) for `RocksDB` table routing configuration
use clap::{ArgAction, Args};
use reth_storage_api::StorageSettings;
/// Default value for `tx_hash` routing flag.
/// Default value for `RocksDB` routing flags.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_tx_hash_in_rocksdb() -> bool {
StorageSettings::base().transaction_hash_numbers_in_rocksdb
}
/// Default value for `storages_history` routing flag.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_storages_history_in_rocksdb() -> bool {
StorageSettings::base().storages_history_in_rocksdb
}
/// Default value for `account_history` routing flag.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_account_history_in_rocksdb() -> bool {
StorageSettings::base().account_history_in_rocksdb
/// When the `edge` feature is enabled, defaults to `true` to enable edge storage features.
/// Otherwise defaults to `false` for legacy behavior.
const fn default_rocksdb_flag() -> bool {
cfg!(feature = "edge")
}
/// Parameters for `RocksDB` table routing configuration.
@@ -42,21 +28,21 @@ pub struct RocksDbArgs {
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.tx-hash", default_value_t = default_tx_hash_in_rocksdb(), action = ArgAction::Set)]
#[arg(long = "rocksdb.tx-hash", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub tx_hash: bool,
/// Route storages history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `false`.
#[arg(long = "rocksdb.storages-history", default_value_t = default_storages_history_in_rocksdb(), action = ArgAction::Set)]
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.storages-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub storages_history: bool,
/// Route account history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `false`.
#[arg(long = "rocksdb.account-history", default_value_t = default_account_history_in_rocksdb(), action = ArgAction::Set)]
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.account-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub account_history: bool,
}
@@ -64,9 +50,9 @@ impl Default for RocksDbArgs {
fn default() -> Self {
Self {
all: false,
tx_hash: default_tx_hash_in_rocksdb(),
storages_history: default_storages_history_in_rocksdb(),
account_history: default_account_history_in_rocksdb(),
tx_hash: default_rocksdb_flag(),
storages_history: default_rocksdb_flag(),
account_history: default_rocksdb_flag(),
}
}
}
@@ -120,25 +106,7 @@ mod tests {
fn test_parse_all_flag() {
let args = CommandParser::<RocksDbArgs>::parse_from(["reth", "--rocksdb.all"]).args;
assert!(args.all);
assert_eq!(args.tx_hash, default_tx_hash_in_rocksdb());
}
#[test]
fn test_defaults_match_storage_settings() {
let args = RocksDbArgs::default();
let settings = StorageSettings::base();
assert_eq!(
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
"tx_hash default should match StorageSettings::base()"
);
assert_eq!(
args.storages_history, settings.storages_history_in_rocksdb,
"storages_history default should match StorageSettings::base()"
);
assert_eq!(
args.account_history, settings.account_history_in_rocksdb,
"account_history default should match StorageSettings::base()"
);
assert_eq!(args.tx_hash, default_rocksdb_flag());
}
#[test]

View File

@@ -90,7 +90,7 @@ impl StaticFilesArgs {
/// args.
///
/// If `minimal` is true, uses [`MINIMAL_BLOCKS_PER_FILE`] blocks per file as the default for
/// all segments.
/// headers, transactions, and receipts segments.
pub fn merge_with_config(&self, config: StaticFilesConfig, minimal: bool) -> StaticFilesConfig {
let minimal_blocks_per_file = minimal.then_some(MINIMAL_BLOCKS_PER_FILE);
StaticFilesConfig {
@@ -109,15 +109,12 @@ impl StaticFilesArgs {
.or(config.blocks_per_file.receipts),
transaction_senders: self
.blocks_per_file_transaction_senders
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.transaction_senders),
account_change_sets: self
.blocks_per_file_account_change_sets
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.account_change_sets),
storage_change_sets: self
.blocks_per_file_storage_change_sets
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.storage_change_sets),
},
}

View File

@@ -507,7 +507,7 @@ impl RethTransactionPoolConfig for TxPoolArgs {
PoolConfig {
local_transactions_config: LocalTransactionConfig {
no_exemptions: self.no_locals,
local_addresses: self.locals.iter().copied().collect(),
local_addresses: self.locals.clone().into_iter().collect(),
propagate_local_transactions: !self.no_local_transactions_propagation,
},
pending_limit: SubPoolLimit {

View File

@@ -149,7 +149,21 @@ where
let elapsed = start.elapsed();
self.metrics.duration_seconds.record(elapsed);
output.debug_log(tip_block_number, deleted_entries, elapsed);
let message = match output.progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
PruneProgress::Finished => "Pruner finished",
};
debug!(
target: "pruner",
%tip_block_number,
?elapsed,
?deleted_entries,
?limiter,
?output,
?stats,
"{message}",
);
self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });

View File

@@ -18,7 +18,6 @@ alloy-primitives.workspace = true
derive_more.workspace = true
strum = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
modular-bitfield = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
@@ -43,9 +42,8 @@ std = [
"derive_more/std",
"serde?/std",
"serde_json/std",
"strum/std",
"thiserror/std",
"tracing/std",
"strum/std",
]
test-utils = [
"std",

View File

@@ -1,9 +1,7 @@
use crate::{PruneCheckpoint, PruneMode, PruneSegment};
use alloc::{format, string::ToString, vec::Vec};
use alloc::vec::Vec;
use alloy_primitives::{BlockNumber, TxNumber};
use core::time::Duration;
use derive_more::Display;
use tracing::debug;
/// Pruner run output.
#[derive(Debug)]
@@ -20,49 +18,6 @@ impl From<PruneProgress> for PrunerOutput {
}
}
impl PrunerOutput {
/// Logs a human-readable summary of the pruner run at DEBUG level.
///
/// Format: `"Pruner finished tip=24328929 deleted=10886 elapsed=148ms
/// segments=AccountHistory[24318865, done] ..."`
#[inline]
pub fn debug_log(
&self,
tip_block_number: BlockNumber,
deleted_entries: usize,
elapsed: Duration,
) {
let message = match self.progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted, has more data",
PruneProgress::Finished => "Pruner finished",
};
let segments: Vec<_> = self
.segments
.iter()
.filter(|(_, seg)| seg.pruned > 0)
.map(|(segment, seg)| {
let block = seg
.checkpoint
.and_then(|c| c.block_number)
.map(|b| b.to_string())
.unwrap_or_else(|| "?".to_string());
let status = if seg.progress.is_finished() { "done" } else { "in_progress" };
format!("{segment}[{block}, {status}]")
})
.collect();
debug!(
target: "pruner",
%tip_block_number,
deleted_entries,
?elapsed,
segments = %segments.join(" "),
"{message}",
);
}
}
/// Represents information of a pruner run for a segment.
#[derive(Debug, Clone, PartialEq, Eq, Display)]
#[display("(table={segment}, pruned={pruned}, status={progress})")]

View File

@@ -318,9 +318,9 @@ where
.map(|inner| {
let full_log = alloy_rpc_types_eth::Log {
inner,
block_hash: Some(current_block.hash()),
block_number: Some(current_block.number()),
block_timestamp: Some(current_block.timestamp()),
block_hash: None,
block_number: None,
block_timestamp: None,
transaction_hash: Some(*item.tx.tx_hash()),
transaction_index: Some(tx_index as u64),
log_index: Some(log_index),

View File

@@ -63,9 +63,9 @@ impl StorageSettings {
transaction_senders_in_static_files: true,
account_changesets_in_static_files: true,
storage_changesets_in_static_files: true,
storages_history_in_rocksdb: true,
storages_history_in_rocksdb: false,
transaction_hash_numbers_in_rocksdb: true,
account_history_in_rocksdb: true,
account_history_in_rocksdb: false,
}
}

View File

@@ -20,9 +20,9 @@ pub type DupCursorMutTy<TX, T> = <TX as DbTxMut>::DupCursorMut<T>;
/// Read only transaction
pub trait DbTx: Debug + Send {
/// Cursor type for this read-only transaction
type Cursor<T: Table>: DbCursorRO<T> + Send;
type Cursor<T: Table>: DbCursorRO<T> + Send + Sync;
/// `DupCursor` type for this read-only transaction
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send;
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send + Sync;
/// Get value by an owned key
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, DatabaseError>;
@@ -51,13 +51,14 @@ pub trait DbTx: Debug + Send {
/// Read write transaction that allows writing to database
pub trait DbTxMut: Send {
/// Read-Write Cursor type
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send;
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send + Sync;
/// Read-Write `DupCursor` type
type DupCursorMut<T: DupSort>: DbDupCursorRW<T>
+ DbCursorRW<T>
+ DbDupCursorRO<T>
+ DbCursorRO<T>
+ Send;
+ Send
+ Sync;
/// Put value to database
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;

View File

@@ -6,12 +6,7 @@ use alloy_primitives::{keccak256, map::HashMap, Address, B256, U256};
use reth_chainspec::EthChainSpec;
use reth_codecs::Compact;
use reth_config::config::EtlConfig;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
tables,
transaction::DbTxMut,
BlockNumberList, DatabaseError,
};
use reth_db_api::{tables, transaction::DbTxMut, DatabaseError};
use reth_etl::Collector;
use reth_execution_errors::StateRootError;
use reth_primitives_traits::{
@@ -19,11 +14,11 @@ use reth_primitives_traits::{
};
use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, EitherWriter,
ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider,
MetadataWriter, NodePrimitivesProvider, OriginalValuesKnown, ProviderError, RevertsInit,
RocksDBProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider, MetadataWriter,
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter,
StateWriteConfig, StateWriter, StaticFileProviderFactory, StorageSettings,
StorageSettingsCache, TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
@@ -108,9 +103,6 @@ where
+ TrieWriter
+ MetadataWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<PF::ProviderRW>,
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
@@ -146,9 +138,6 @@ where
+ TrieWriter
+ MetadataWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<PF::ProviderRW>,
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
@@ -397,64 +386,37 @@ where
}
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_genesis_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter + ChainSpecProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_history(provider, alloc, genesis_block_number)
}
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter,
{
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_accounts_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, _) in alloc.clone() {
writer.upsert_account_history(ShardedKey::last(*addr), &list)?;
}
trace!(target: "reth::cli", "Inserted account history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
let account_transitions = alloc.clone().map(|(addr, _)| (*addr, [block]));
provider.insert_account_history_index(account_transitions)?;
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_storages_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, account) in alloc {
if let Some(storage) = &account.storage {
for key in storage.keys() {
writer.upsert_storage_history(StorageShardedKey::last(*addr, *key), &list)?;
}
}
}
trace!(target: "reth::cli", "Inserted storage history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
trace!(target: "reth::cli", "Inserted account history");
let storage_transitions = alloc
.filter_map(|(addr, account)| account.storage.as_ref().map(|storage| (addr, storage)))
.flat_map(|(addr, storage)| storage.keys().map(|key| ((*addr, *key), [block])));
provider.insert_storage_history_index(storage_transitions)?;
trace!(target: "reth::cli", "Inserted storage history");
Ok(())
}
@@ -530,9 +492,6 @@ where
+ HashingWriter
+ TrieWriter
+ StateWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<Provider>,
{
if etl_config.file_size == 0 {
@@ -669,9 +628,6 @@ where
+ HashingWriter
+ HistoryWriter
+ StateWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<Provider>,
{
let accounts_len = collector.len();
@@ -932,59 +888,27 @@ mod tests {
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
init_genesis(&factory).unwrap();
let expected_accounts = vec![
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap()),
];
let expected_storages = vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap(),
)];
let provider = factory.provider().unwrap();
let collect_from_mdbx = |factory: &ProviderFactory<MockNodeTypesWithDB>| {
let provider = factory.provider().unwrap();
let tx = provider.tx_ref();
(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx).unwrap(),
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx).unwrap(),
)
};
let tx = provider.tx_ref();
#[cfg(feature = "edge")]
{
let settings = factory.cached_storage_settings();
let rocksdb = factory.rocksdb_provider();
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
.expect("failed to collect"),
vec![
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
],
);
let collect_rocksdb = |rocksdb: &reth_provider::providers::RocksDBProvider| {
(
rocksdb
.iter::<tables::AccountsHistory>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
rocksdb
.iter::<tables::StoragesHistory>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
)
};
let (accounts, storages) = if settings.account_history_in_rocksdb {
collect_rocksdb(&rocksdb)
} else {
collect_from_mdbx(&factory)
};
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
#[cfg(not(feature = "edge"))]
{
let (accounts, storages) = collect_from_mdbx(&factory);
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}
#[test]

View File

@@ -137,8 +137,7 @@ where
for (k, _, v, _) in input {
crsr.append(k, &v).expect("submit");
}
drop(crsr);
tx.commit().unwrap()
tx.inner.commit().unwrap()
},
)
});
@@ -158,8 +157,8 @@ where
let (k, _, v, _) = input.get(index).unwrap().clone();
crsr.insert(k, &v).expect("submit");
}
drop(crsr);
tx.commit().unwrap()
tx.inner.commit().unwrap()
},
)
});
@@ -220,8 +219,7 @@ where
for (k, _, v, _) in input {
crsr.append_dup(k, v).expect("submit");
}
drop(crsr);
tx.commit().unwrap()
tx.inner.commit().unwrap()
},
)
});
@@ -241,7 +239,7 @@ where
let (k, _, v, _) = input.get(index).unwrap().clone();
tx.put::<T>(k, v).unwrap();
}
tx.commit().unwrap()
tx.inner.commit().unwrap();
},
)
});

View File

@@ -16,9 +16,10 @@ use reth_db_api::{
cursor::DbCursorRW,
database::Database,
table::{Table, TableRow},
transaction::{DbTx, DbTxMut},
transaction::DbTxMut,
};
use reth_fs_util as fs;
use std::hint::black_box;
mod utils;
use utils::*;
@@ -177,13 +178,17 @@ fn append<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value
where
T: Table,
{
let tx = db.tx_mut().expect("tx");
let mut crsr = tx.cursor_write::<T>().expect("cursor");
for (k, v) in input {
crsr.append(k, &v).expect("submit");
{
let tx = db.tx_mut().expect("tx");
let mut crsr = tx.cursor_write::<T>().expect("cursor");
black_box({
for (k, v) in input {
crsr.append(k, &v).expect("submit");
}
tx.inner.commit().unwrap()
});
}
drop(crsr);
tx.commit().unwrap();
db
}
@@ -191,13 +196,17 @@ fn insert<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value
where
T: Table,
{
let tx = db.tx_mut().expect("tx");
let mut crsr = tx.cursor_write::<T>().expect("cursor");
for (k, v) in input {
crsr.insert(k, &v).expect("submit");
{
let tx = db.tx_mut().expect("tx");
let mut crsr = tx.cursor_write::<T>().expect("cursor");
black_box({
for (k, v) in input {
crsr.insert(k, &v).expect("submit");
}
tx.inner.commit().unwrap()
});
}
drop(crsr);
tx.commit().unwrap();
db
}
@@ -205,11 +214,16 @@ fn put<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value)>)
where
T: Table,
{
let tx = db.tx_mut().expect("tx");
for (k, v) in input {
tx.put::<T>(k, v).expect("submit");
{
let tx = db.tx_mut().expect("tx");
black_box({
for (k, v) in input {
tx.put::<T>(k, v).expect("submit");
}
tx.inner.commit().unwrap()
});
}
tx.commit().unwrap();
db
}
@@ -229,11 +243,11 @@ where
T: Table,
{
db.view(|tx| {
let table_db = tx.inner().open_db(Some(T::NAME)).map_err(|_| "Could not open db.").unwrap();
let table_db = tx.inner.open_db(Some(T::NAME)).map_err(|_| "Could not open db.").unwrap();
println!(
"{:?}\n",
tx.inner()
tx.inner
.db_stat(table_db.dbi())
.map_err(|_| format!("Could not find table: {}", T::NAME))
.map(|stats| {

View File

@@ -5,7 +5,7 @@ use alloy_primitives::Bytes;
use reth_db::{test_utils::create_test_rw_db_with_path, DatabaseEnv};
use reth_db_api::{
table::{Compress, Encode, Table, TableRow},
transaction::{DbTx, DbTxMut},
transaction::DbTxMut,
Database,
};
use reth_fs_util as fs;
@@ -68,7 +68,7 @@ where
for (k, _, v, _) in pair.clone() {
tx.put::<T>(k, v).expect("submit");
}
tx.commit().unwrap();
tx.inner.commit().unwrap();
}
db.into_inner_db()

View File

@@ -274,11 +274,10 @@ impl DatabaseMetrics for DatabaseEnv {
let _ = self
.view(|tx| {
for table in Tables::ALL.iter().map(Tables::name) {
let table_db =
tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
let stats = tx
.inner()
.inner
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {table}"))?;

View File

@@ -30,7 +30,7 @@ const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
#[derive(Debug)]
pub struct Tx<K: TransactionKind> {
/// Libmdbx-sys transaction.
inner: Transaction<K>,
pub inner: Transaction<K>,
/// Cached MDBX DBIs for reuse.
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
@@ -62,11 +62,6 @@ impl<K: TransactionKind> Tx<K> {
Ok(Self { inner, dbis, metrics_handler })
}
/// Returns a reference to the inner libmdbx transaction.
pub const fn inner(&self) -> &Transaction<K> {
&self.inner
}
/// Gets this transaction ID.
pub fn id(&self) -> reth_libmdbx::Result<u64> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))

View File

@@ -2,29 +2,49 @@ use crate::{Error, TransactionKind};
use derive_more::{Debug, Deref, DerefMut};
use std::{borrow::Cow, slice};
/// Implement this to be able to decode data values
pub trait TableObject: Sized {
/// Decodes the object from the given bytes.
fn decode(data_val: &[u8]) -> Result<Self, Error>;
/// A marker trait for types that can be deserialized from a database value
/// without borrowing from the transaction.
///
/// Types implementing this trait can be used with iterators that need to
/// return owned values. This is automatically implemented for any type that
/// implements [`TableObject<'a>`] for all lifetimes `'a`.
pub trait TableObjectOwned: for<'de> TableObject<'de> {
/// Decodes the object from the given bytes, without borrowing them.
fn decode(data_val: &[u8]) -> Result<Self, Error> {
<Self as TableObject<'_>>::decode_borrow(Cow::Borrowed(data_val))
}
}
impl<T> TableObjectOwned for T where T: for<'de> TableObject<'de> {}
/// Decodes values read from the database into Rust types.
///
/// Implement this to be able to decode data values. The lifetime parameter `'a`
/// allows types to borrow data from the transaction when appropriate (e.g.,
/// `Cow<'a, [u8]>`).
pub trait TableObject<'a>: Sized {
/// Creates the object from a `Cow` of bytes. This allows for efficient
/// handling of both owned and borrowed data.
fn decode_borrow(data: Cow<'a, [u8]>) -> Result<Self, Error>;
/// Decodes the value directly from the given MDBX_val pointer.
///
/// # Safety
///
/// This should only in the context of an MDBX transaction.
/// This should only be called in the context of an MDBX transaction.
#[doc(hidden)]
unsafe fn decode_val<K: TransactionKind>(
_: *const ffi::MDBX_txn,
tx: *const ffi::MDBX_txn,
data_val: ffi::MDBX_val,
) -> Result<Self, Error> {
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
Self::decode(s)
let cow = unsafe { Cow::<'a, [u8]>::decode_val::<K>(tx, data_val)? };
Self::decode_borrow(cow)
}
}
impl TableObject for Cow<'_, [u8]> {
fn decode(_: &[u8]) -> Result<Self, Error> {
unreachable!()
impl<'a> TableObject<'a> for Cow<'a, [u8]> {
fn decode_borrow(data: Cow<'a, [u8]>) -> Result<Self, Error> {
Ok(data)
}
#[doc(hidden)]
@@ -51,14 +71,22 @@ impl TableObject for Cow<'_, [u8]> {
}
}
impl TableObject for Vec<u8> {
fn decode(data_val: &[u8]) -> Result<Self, Error> {
Ok(data_val.to_vec())
impl TableObject<'_> for Vec<u8> {
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
Ok(data.into_owned())
}
unsafe fn decode_val<K: TransactionKind>(
_tx: *const ffi::MDBX_txn,
data_val: ffi::MDBX_val,
) -> Result<Self, Error> {
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
Ok(s.to_vec())
}
}
impl TableObject for () {
fn decode(_: &[u8]) -> Result<Self, Error> {
impl TableObject<'_> for () {
fn decode_borrow(_: Cow<'_, [u8]>) -> Result<Self, Error> {
Ok(())
}
@@ -74,19 +102,26 @@ impl TableObject for () {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deref, DerefMut)]
pub struct ObjectLength(pub usize);
impl TableObject for ObjectLength {
fn decode(data_val: &[u8]) -> Result<Self, Error> {
Ok(Self(data_val.len()))
impl TableObject<'_> for ObjectLength {
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
Ok(Self(data.len()))
}
unsafe fn decode_val<K: TransactionKind>(
_tx: *const ffi::MDBX_txn,
data_val: ffi::MDBX_val,
) -> Result<Self, Error> {
Ok(Self(data_val.iov_len))
}
}
impl<const LEN: usize> TableObject for [u8; LEN] {
fn decode(data_val: &[u8]) -> Result<Self, Error> {
if data_val.len() != LEN {
return Err(Error::DecodeErrorLenDiff)
impl<const LEN: usize> TableObject<'_> for [u8; LEN] {
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
if data.len() != LEN {
return Err(Error::DecodeErrorLenDiff);
}
let mut a = [0; LEN];
a[..].copy_from_slice(data_val);
a[..].copy_from_slice(&data);
Ok(a)
}
}

View File

@@ -3,7 +3,7 @@ use crate::{
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
TableObject, Transaction,
TableObjectOwned, Transaction,
};
use ffi::{
MDBX_cursor_op, MDBX_FIRST, MDBX_FIRST_DUP, MDBX_GET_BOTH, MDBX_GET_BOTH_RANGE,
@@ -11,7 +11,7 @@ use ffi::{
MDBX_NEXT_MULTIPLE, MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE,
MDBX_PREV_NODUP, MDBX_SET, MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE,
};
use std::{borrow::Cow, ffi::c_void, fmt, marker::PhantomData, mem, ptr};
use std::{ffi::c_void, fmt, marker::PhantomData, mem, ptr};
/// A cursor for navigating the items within a database.
pub struct Cursor<K>
@@ -58,8 +58,8 @@ where
self.cursor
}
/// Returns an iterator over the raw key value slices.
pub fn iter_slices<'a>(self) -> IntoIter<K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
/// Returns an iterator over the raw key value bytes (as owned `Vec<u8>`).
pub fn iter_slices(self) -> IntoIter<K, Vec<u8>, Vec<u8>> {
self.into_iter()
}
@@ -67,8 +67,8 @@ where
#[expect(clippy::should_implement_trait)]
pub fn into_iter<Key, Value>(self) -> IntoIter<K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
IntoIter::new(self, MDBX_NEXT, MDBX_NEXT)
}
@@ -82,8 +82,8 @@ where
op: MDBX_cursor_op,
) -> Result<(Option<Key>, Value, bool)>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
unsafe {
let mut key_val = slice_to_val(key);
@@ -119,7 +119,7 @@ where
op: MDBX_cursor_op,
) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
let (_, v, _) = mdbx_try_optional!(self.get::<(), Value>(key, data, op));
@@ -133,8 +133,8 @@ where
op: MDBX_cursor_op,
) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
let (k, v, _) = mdbx_try_optional!(self.get(key, data, op));
@@ -144,8 +144,8 @@ where
/// Position at first key/data item.
pub fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_FIRST)
}
@@ -153,7 +153,7 @@ where
/// [`DatabaseFlags::DUP_SORT`]-only: Position at first data item of current key.
pub fn first_dup<Value>(&mut self) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(None, None, MDBX_FIRST_DUP)
}
@@ -161,7 +161,7 @@ where
/// [`DatabaseFlags::DUP_SORT`]-only: Position at key/data pair.
pub fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(Some(k), Some(v), MDBX_GET_BOTH)
}
@@ -170,7 +170,7 @@ where
/// equal to specified data.
pub fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(Some(k), Some(v), MDBX_GET_BOTH_RANGE)
}
@@ -178,8 +178,8 @@ where
/// Return key/data at current cursor position.
pub fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_GET_CURRENT)
}
@@ -188,7 +188,7 @@ where
/// Move cursor to prepare for [`Self::next_multiple()`].
pub fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(None, None, MDBX_GET_MULTIPLE)
}
@@ -196,8 +196,8 @@ where
/// Position at last key/data item.
pub fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_LAST)
}
@@ -205,7 +205,7 @@ where
/// DupSort-only: Position at last data item of current key.
pub fn last_dup<Value>(&mut self) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(None, None, MDBX_LAST_DUP)
}
@@ -214,8 +214,8 @@ where
#[expect(clippy::should_implement_trait)]
pub fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_NEXT)
}
@@ -223,8 +223,8 @@ where
/// [`DatabaseFlags::DUP_SORT`]-only: Position at next data item of current key.
pub fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_NEXT_DUP)
}
@@ -233,8 +233,8 @@ where
/// cursor position. Move cursor to prepare for `MDBX_NEXT_MULTIPLE`.
pub fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_NEXT_MULTIPLE)
}
@@ -242,8 +242,8 @@ where
/// Position at first data item of next key.
pub fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_NEXT_NODUP)
}
@@ -251,8 +251,8 @@ where
/// Position at previous data item.
pub fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_PREV)
}
@@ -260,8 +260,8 @@ where
/// [`DatabaseFlags::DUP_SORT`]-only: Position at previous data item of current key.
pub fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_PREV_DUP)
}
@@ -269,8 +269,8 @@ where
/// Position at last data item of previous key.
pub fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_PREV_NODUP)
}
@@ -278,7 +278,7 @@ where
/// Position at specified key.
pub fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
where
Value: TableObject,
Value: TableObjectOwned,
{
self.get_value(Some(key), None, MDBX_SET)
}
@@ -286,8 +286,8 @@ where
/// Position at specified key, return both key and data.
pub fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(Some(key), None, MDBX_SET_KEY)
}
@@ -295,8 +295,8 @@ where
/// Position at first key greater than or equal to specified key.
pub fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(Some(key), None, MDBX_SET_RANGE)
}
@@ -305,8 +305,8 @@ where
/// duplicate data items.
pub fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
self.get_full(None, None, MDBX_PREV_MULTIPLE)
}
@@ -322,8 +322,8 @@ where
/// exactly and [true] if the next pair was returned.
pub fn set_lowerbound<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(bool, Key, Value)>>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
let (k, v, found) = mdbx_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND));
@@ -340,8 +340,8 @@ where
/// the next key.
pub fn iter<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
Iter::new(self, ffi::MDBX_NEXT, ffi::MDBX_NEXT)
}
@@ -353,8 +353,8 @@ where
/// the next key.
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
Iter::new(self, ffi::MDBX_FIRST, ffi::MDBX_NEXT)
}
@@ -366,8 +366,8 @@ where
/// the next key.
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
let res: Result<Option<((), ())>> = self.set_range(key);
if let Err(error) = res {
@@ -381,8 +381,8 @@ where
/// Each item will be returned as an iterator of its duplicates.
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
IterDup::new(self, ffi::MDBX_NEXT)
}
@@ -391,8 +391,8 @@ where
/// database. Each item will be returned as an iterator of its duplicates.
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
IterDup::new(self, ffi::MDBX_FIRST)
}
@@ -401,8 +401,8 @@ where
/// key. Each item will be returned as an iterator of its duplicates.
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
let res: Result<Option<((), ())>> = self.set_range(key);
if let Err(error) = res {
@@ -414,8 +414,8 @@ where
/// Iterate over the duplicates of the item in the database with the given key.
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
let res: Result<Option<()>> = self.set(key);
match res {
@@ -510,8 +510,8 @@ unsafe impl<K> Sync for Cursor<K> where K: TransactionKind {}
pub enum IntoIter<K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// An iterator that returns an error on every call to [`Iter::next()`].
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
@@ -541,8 +541,8 @@ where
impl<K, Key, Value> IntoIter<K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// Creates a new iterator backed by the given cursor.
fn new(cursor: Cursor<K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
@@ -553,8 +553,8 @@ where
impl<K, Key, Value> Iterator for IntoIter<K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
type Item = Result<(Key, Value)>;
@@ -601,8 +601,8 @@ where
pub enum Iter<'cur, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// An iterator that returns an error on every call to [`Iter::next()`].
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
@@ -632,8 +632,8 @@ where
impl<'cur, K, Key, Value> Iter<'cur, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// Creates a new iterator backed by the given cursor.
fn new(
@@ -648,8 +648,8 @@ where
impl<K, Key, Value> Iterator for Iter<'_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
type Item = Result<(Key, Value)>;
@@ -698,8 +698,8 @@ where
pub enum IterDup<'cur, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// An iterator that returns an error on every call to `Iter.next()`.
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
@@ -726,8 +726,8 @@ where
impl<'cur, K, Key, Value> IterDup<'cur, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
/// Creates a new iterator backed by the given cursor.
fn new(cursor: &'cur mut Cursor<K>, op: MDBX_cursor_op) -> Self {
@@ -738,8 +738,8 @@ where
impl<K, Key, Value> fmt::Debug for IterDup<'_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IterDup").finish()
@@ -749,8 +749,8 @@ where
impl<K, Key, Value> Iterator for IterDup<'_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
Key: TableObjectOwned,
Value: TableObjectOwned,
{
type Item = IntoIter<K, Key, Value>;

View File

@@ -12,7 +12,7 @@
pub extern crate reth_mdbx_sys as ffi;
pub use crate::{
codec::*,
codec::{ObjectLength, TableObject, TableObjectOwned},
cursor::{Cursor, Iter, IterDup},
database::Database,
environment::{
@@ -21,7 +21,8 @@ pub use crate::{
},
error::{Error, Result},
flags::*,
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
transaction::{CommitLatency, Transaction, TransactionKind, TransactionPtr, RO, RW},
tx_access::TxPtrAccess,
};
#[cfg(feature = "read-tx-timeouts")]
@@ -34,6 +35,7 @@ mod environment;
mod error;
mod flags;
mod transaction;
mod tx_access;
mod txn_manager;
#[cfg(test)]

View File

@@ -3,8 +3,9 @@ use crate::{
environment::Environment,
error::{mdbx_result, Result},
flags::{DatabaseFlags, WriteFlags},
tx_access::TxPtrAccess,
txn_manager::{TxnManagerMessage, TxnPtr},
Cursor, Error, Stat, TableObject,
Cursor, Error, Stat, TableObjectOwned,
};
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use parking_lot::{Mutex, MutexGuard};
@@ -152,7 +153,7 @@ where
/// [None] will be returned.
pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
where
Key: TableObject,
Key: TableObjectOwned,
{
let key_val: ffi::MDBX_val =
ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
@@ -537,8 +538,12 @@ impl Transaction<RW> {
}
/// A shareable pointer to an MDBX transaction.
///
/// This type provides synchronized access to the underlying MDBX transaction
/// pointer via a mutex. It implements [`TxPtrAccess`] to enable abstraction
/// over different access patterns.
#[derive(Debug, Clone)]
pub(crate) struct TransactionPtr {
pub struct TransactionPtr {
txn: *mut ffi::MDBX_txn,
#[cfg(feature = "read-tx-timeouts")]
timed_out: Arc<AtomicBool>,
@@ -598,7 +603,7 @@ impl TransactionPtr {
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
// let _lck = self.lock();
let _lck = self.lock();
// No race condition with the `TxnManager` timing out the transaction is possible here,
// because we're taking a lock for any actions on the transaction pointer, including a call
@@ -716,6 +721,22 @@ unsafe impl Send for TransactionPtr {}
// SAFETY: Access to the transaction is synchronized by the lock.
unsafe impl Sync for TransactionPtr {}
impl TxPtrAccess for TransactionPtr {
fn with_txn_ptr<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
self.txn_execute_fail_on_timeout(f)
}
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
self.txn_execute_renew_on_timeout(f)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,72 @@
//! Transaction pointer access abstraction.
//!
//! This module provides the [`TxPtrAccess`] trait which abstracts over different ways
//! of accessing the underlying MDBX transaction pointer. This enables future support
//! for both synchronized (mutex-protected) and unsynchronized transaction types.
//!
//! # Design
//!
//! MDBX has strict requirements for transaction access:
//! - All operations on a transaction must be totally ordered and non-concurrent
//! - Read-write transactions can only be used from the thread that created them
//!
//! The current implementation uses a `Mutex` to enforce these requirements at runtime.
//! This is correct and safe, but the synchronization overhead adds up in hot paths.
//!
//! This trait enables future implementations that can enforce these requirements at
//! compile time instead (e.g., using `&mut self` for exclusive access), eliminating
//! the runtime overhead for single-threaded workloads.
use crate::error::Result;
use std::fmt;
mod sealed {
use crate::transaction::TransactionPtr;
pub trait Sealed {}
impl Sealed for TransactionPtr {}
}
/// Trait for accessing the transaction pointer.
///
/// This trait abstracts over different ways transaction pointers are stored
/// and accessed. It enables both synchronized (mutex-protected) and
/// unsynchronized access patterns for transactions.
///
/// # Implementors
///
/// - [`TransactionPtr`](crate::transaction::TransactionPtr) - Synchronized access via mutex
///
/// # Future Extensions
///
/// This trait is designed to support future unsynchronized transaction types
/// that use `&mut self` to enforce exclusive access at compile time, avoiding
/// mutex overhead in single-threaded contexts.
pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
/// Execute a closure with the transaction pointer.
///
/// The closure receives a valid transaction pointer that can be used for
/// MDBX operations. For synchronized implementations, this acquires the
/// appropriate lock before executing the closure.
///
/// # Errors
///
/// Returns an error if the transaction has timed out or is otherwise
/// invalid.
fn with_txn_ptr<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R;
/// Execute a closure with the transaction pointer, attempting to renew
/// the transaction if it has timed out.
///
/// This is primarily used for cleanup operations (like closing cursors)
/// that need to succeed even after a timeout. For implementations that
/// don't support renewal, this falls back to [`with_txn_ptr`](Self::with_txn_ptr).
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
self.with_txn_ptr(f)
}
}

View File

@@ -1,6 +1,5 @@
#![allow(missing_docs)]
use reth_libmdbx::*;
use std::borrow::Cow;
use tempfile::tempdir;
#[test]
@@ -324,22 +323,10 @@ fn test_put_del() {
cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap();
cursor.put(b"key3", b"val3", WriteFlags::empty()).unwrap();
assert_eq!(
cursor.set_key(b"key2").unwrap(),
Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8])))
);
assert_eq!(
cursor.get_current().unwrap(),
Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8])))
);
assert_eq!(cursor.set_key::<[u8; 4], [u8; 4]>(b"key2").unwrap(), Some((*b"key2", *b"val2")));
assert_eq!(cursor.get_current::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key2", *b"val2")));
cursor.del(WriteFlags::empty()).unwrap();
assert_eq!(
cursor.get_current().unwrap(),
Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8])))
);
assert_eq!(
cursor.last().unwrap(),
Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8])))
);
assert_eq!(cursor.get_current::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key3", *b"val3")));
assert_eq!(cursor.last::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key3", *b"val3")));
}

View File

@@ -1,7 +1,6 @@
#![allow(missing_docs)]
use reth_libmdbx::*;
use std::{
borrow::Cow,
io::Write,
sync::{Arc, Barrier},
thread::{self, JoinHandle},
@@ -274,8 +273,8 @@ fn test_concurrent_writers() {
for i in 0..n {
assert_eq!(
Cow::<Vec<u8>>::Owned(format!("{val}{i}").into_bytes()),
txn.get(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap()
format!("{val}{i}").into_bytes(),
txn.get::<Vec<u8>>(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap()
);
}
}

View File

@@ -627,31 +627,30 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
StateWriteConfig {
write_receipts: !sf_ctx.write_receipts,
write_account_changesets: !sf_ctx.write_account_changesets,
write_storage_changesets: !sf_ctx.write_storage_changesets,
},
)?;
timings.write_state += start.elapsed();
let trie_data = block.trie_data();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
timings.write_hashed_state += start.elapsed();
}
}
// Write all hashed state and trie updates in single batches.
// Write all trie updates in a single batch.
// This reduces cursor open/close overhead from N calls to 1.
if save_mode.with_state() {
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
let start = Instant::now();
let merged_hashed_state = HashedPostStateSorted::merge_batch(
blocks.iter().rev().map(|b| b.trie_data().hashed_state),
);
if !merged_hashed_state.is_empty() {
self.write_hashed_state(&merged_hashed_state)?;
}
timings.write_hashed_state += start.elapsed();
let start = Instant::now();
let merged_trie =
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
let merged =
TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
if !merged_trie.is_empty() {
self.write_trie_updates_sorted(&merged_trie)?;
if !merged.is_empty() {
self.write_trie_updates_sorted(&merged)?;
}
timings.write_trie_updates += start.elapsed();
}
@@ -1359,7 +1358,7 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
self.tx
.cursor_dup_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.map(|r| r.map_err(Into::into))
.map(|result| -> ProviderResult<_> { Ok(result?) })
.collect()
}
}
@@ -1392,7 +1391,7 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
self.tx
.cursor_dup_read::<tables::StorageChangeSets>()?
.walk_range(BlockNumberAddress::range(range))?
.map(|r| r.map_err(Into::into))
.map(|result| -> ProviderResult<_> { Ok(result?) })
.collect()
}
}
@@ -1449,15 +1448,32 @@ impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
&self,
range: impl core::ops::RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
if self.cached_storage_settings().account_changesets_in_static_files {
self.static_file_provider.account_changesets_range(range)
let range = to_range(range);
let mut changesets = Vec::new();
if self.cached_storage_settings().account_changesets_in_static_files &&
let Some(highest) = self
.static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
{
let static_end = range.end.min(highest + 1);
if range.start < static_end {
for block in range.start..static_end {
let block_changesets = self.account_block_changeset(block)?;
for changeset in block_changesets {
changesets.push((block, changeset));
}
}
}
} else {
self.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(to_range(range))?
.map(|r| r.map_err(Into::into))
.collect()
// Fetch from database for blocks not in static files
let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
for entry in cursor.walk_range(range)? {
let (block_num, account_before) = entry?;
changesets.push((block_num, account_before));
}
}
Ok(changesets)
}
fn account_changeset_count(&self) -> ProviderResult<usize> {
@@ -2288,55 +2304,52 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
config: StateWriteConfig,
) -> ProviderResult<()> {
// Write storage changes
if config.write_storage_changesets {
tracing::trace!("Writing storage changes");
let mut storages_cursor =
self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!("Writing storage changes");
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!(block_number, "Writing block change");
// sort changes by address.
storage_changes.par_sort_unstable_by_key(|a| a.address);
let total_changes =
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
let mut changeset = Vec::with_capacity(total_changes);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.0);
tracing::trace!(block_number, "Writing block change");
// sort changes by address.
storage_changes.par_sort_unstable_by_key(|a| a.address);
let total_changes =
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
let mut changeset = Vec::with_capacity(total_changes);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.0);
// If we are writing the primary storage wipe transition, the pre-existing plain
// storage state has to be taken from the database and written to storage
// history. See [StorageWipe::Primary] for more details.
//
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
// collecting wiped entries into a Vec like this, see
// `write_storage_trie_changesets`.
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
// If we are writing the primary storage wipe transition, the pre-existing plain
// storage state has to be taken from the database and written to storage history.
// See [StorageWipe::Primary] for more details.
//
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
// collecting wiped entries into a Vec like this, see
// `write_storage_trie_changesets`.
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
}
tracing::trace!(?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
changeset.push(StorageBeforeTx { address, key, value });
}
}
let mut storage_changesets_writer =
EitherWriter::new_storage_changesets(self, block_number)?;
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
tracing::trace!(?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
changeset.push(StorageBeforeTx { address, key, value });
}
}
let mut storage_changesets_writer =
EitherWriter::new_storage_changesets(self, block_number)?;
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
}
if !config.write_account_changesets {
@@ -3319,13 +3332,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
Ok(())
}
/// Appends blocks with their execution state to the database.
///
/// **Note:** This function is only used in tests.
///
/// History indices are written to the appropriate backend based on storage settings:
/// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
///
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
fn append_blocks_with_state(
&self,
@@ -3383,31 +3389,8 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
// Use pre-computed transitions for history indices since static file
// writes aren't visible until commit.
// Note: For MDBX we use insert_*_history_index. For RocksDB we use
// append_*_history_shard which handles read-merge-write internally.
let storage_settings = self.cached_storage_settings();
if storage_settings.account_history_in_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for (address, blocks) in account_transitions {
batch.append_account_history_shard(address, blocks)?;
}
Ok(((), Some(batch.into_inner())))
})?;
} else {
self.insert_account_history_index(account_transitions)?;
}
if storage_settings.storages_history_in_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for ((address, key), blocks) in storage_transitions {
batch.append_storage_history_shard(address, key, blocks)?;
}
Ok(((), Some(batch.into_inner())))
})?;
} else {
self.insert_storage_history_index(storage_transitions)?;
}
self.insert_account_history_index(account_transitions)?;
self.insert_storage_history_index(storage_transitions)?;
durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
// Update pipeline progress

View File

@@ -50,7 +50,7 @@ use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWri
use std::{
collections::BTreeMap,
fmt::Debug,
ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
ops::{Deref, Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
sync::{atomic::AtomicU64, mpsc, Arc},
thread,
@@ -615,13 +615,13 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
let block_number = block.recovered_block().number();
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
let changeset: Vec<_> = reverts
.accounts
.into_iter()
.flatten()
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
.collect();
w.append_account_changeset(changeset, block_number)?;
for account_block_reverts in reverts.accounts {
let changeset = account_block_reverts
.into_iter()
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
.collect::<Vec<_>>();
w.append_account_changeset(changeset, block_number)?;
}
}
Ok(())
}
@@ -636,21 +636,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
let block_number = block.recovered_block().number();
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
let changeset: Vec<_> = reverts
.storage
.into_iter()
.flatten()
.flat_map(|revert| {
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
StorageBeforeTx {
address: revert.address,
key: B256::new(key.to_be_bytes()),
value: revert_to_slot.to_previous_value(),
}
for storage_block_reverts in reverts.storage {
let changeset = storage_block_reverts
.into_iter()
.flat_map(|revert| {
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
StorageBeforeTx {
address: revert.address,
key: B256::new(key.to_be_bytes()),
value: revert_to_slot.to_previous_value(),
}
})
})
})
.collect();
w.append_storage_changeset(changeset, block_number)?;
.collect::<Vec<_>>();
w.append_storage_changeset(changeset, block_number)?;
}
}
Ok(())
}
@@ -1879,33 +1879,6 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
self.indexes.read().get(segment).map(|index| index.max_block)
}
/// Converts a range to a bounded `RangeInclusive` capped to the highest static file block.
///
/// This is necessary because static file iteration beyond the tip would loop forever:
/// blocks beyond the static file tip return `Ok(empty)` which is indistinguishable from
/// blocks with no changes. We cap the end to the highest available block regardless of
/// whether the input was unbounded or an explicit large value like `BlockNumber::MAX`.
fn bound_range(
&self,
range: impl RangeBounds<BlockNumber>,
segment: StaticFileSegment,
) -> RangeInclusive<BlockNumber> {
let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
let start = match range.start_bound() {
Bound::Included(&n) => n,
Bound::Excluded(&n) => n.saturating_add(1),
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&n) => n.min(highest_block),
Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
Bound::Unbounded => highest_block,
};
start..=end
}
/// Gets the highest static file transaction.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
@@ -2381,7 +2354,6 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
&self,
range: impl core::ops::RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
self.walk_account_changeset_range(range).collect()
}
@@ -2501,7 +2473,6 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
self.walk_storage_changeset_range(range).collect()
}

View File

@@ -136,16 +136,10 @@ pub struct StateWriteConfig {
pub write_receipts: bool,
/// Whether to write account changesets.
pub write_account_changesets: bool,
/// Whether to write storage changesets.
pub write_storage_changesets: bool,
}
impl Default for StateWriteConfig {
fn default() -> Self {
Self {
write_receipts: true,
write_account_changesets: true,
write_storage_changesets: true,
}
Self { write_receipts: true, write_account_changesets: true }
}
}

View File

@@ -219,12 +219,11 @@ where
let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
// Step 5: Collect all account trie nodes that changed in the target block
let account_nodes_ref = changesets.account_nodes_ref();
let mut account_nodes = Vec::with_capacity(account_nodes_ref.len());
let mut account_nodes = Vec::new();
let mut account_cursor = cursor_factory.account_trie_cursor()?;
// Iterate over the account nodes from the changesets
for (nibbles, _old_node) in account_nodes_ref {
for (nibbles, _old_node) in changesets.account_nodes_ref() {
// Look up the current value of this trie node using the overlay cursor
let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
account_nodes.push((*nibbles, node_value));
@@ -236,11 +235,10 @@ where
// Iterate over the storage tries from the changesets
for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
let storage_nodes_ref = storage_changeset.storage_nodes_ref();
let mut storage_nodes = Vec::with_capacity(storage_nodes_ref.len());
let mut storage_nodes = Vec::new();
// Iterate over the storage nodes for this account
for (nibbles, _old_node) in storage_nodes_ref {
for (nibbles, _old_node) in storage_changeset.storage_nodes_ref() {
// Look up the current value of this storage trie node
let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
storage_nodes.push((*nibbles, node_value));

View File

@@ -64,7 +64,7 @@ impl<C> DatabaseAccountTrieCursor<C> {
impl<C> TrieCursor for DatabaseAccountTrieCursor<C>
where
C: DbCursorRO<tables::AccountsTrie> + Send,
C: DbCursorRO<tables::AccountsTrie> + Send + Sync,
{
/// Seeks an exact match for the provided key in the account trie.
fn seek_exact(
@@ -160,7 +160,7 @@ where
impl<C> TrieCursor for DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send + Sync,
{
/// Seeks an exact match for the given key in the storage trie.
fn seek_exact(
@@ -202,7 +202,7 @@ where
impl<C> TrieStorageCursor for DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send + Sync,
{
fn set_hashed_address(&mut self, hashed_address: B256) {
self.hashed_address = hashed_address;

View File

@@ -33,7 +33,7 @@ use crate::{
root::ParallelStateRootError,
stats::{ParallelTrieStats, ParallelTrieTracker},
targets_v2::MultiProofTargetsV2,
value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
value_encoder::AsyncAccountValueEncoder,
StorageRootTargets,
};
use alloy_primitives::{
@@ -65,8 +65,6 @@ use reth_trie_common::{
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
cell::RefCell,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
@@ -84,22 +82,6 @@ use crate::proof_task_metrics::{
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Type alias for the V2 account proof calculator.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
<Provider as HashedCursorFactory>::AccountCursor<'a>,
AsyncAccountValueEncoder<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
>,
>;
/// Type alias for the V2 storage proof calculator.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -561,6 +543,15 @@ where
ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
storage_node_provider.trie_node(path)
}
/// Process a blinded account node request.
///
/// Used by account workers to retrieve blinded account trie nodes for proof construction.
fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
let account_node_provider =
ProofBlindedAccountProvider::new(&self.provider, &self.provider);
account_node_provider.trie_node(path)
}
}
impl TrieNodeProviderFactory for ProofWorkerHandle {
type AccountNodeProvider = ProofTaskTrieNodeProvider;
@@ -897,12 +888,7 @@ where
// Initially mark this worker as available.
self.available_workers.fetch_add(1, Ordering::Relaxed);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
while let Ok(job) = self.work_rx.recv() {
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
@@ -932,8 +918,6 @@ where
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
idle_start = Instant::now();
}
trace!(
@@ -941,14 +925,12 @@ where
worker_id = self.worker_id,
storage_proofs_processed,
storage_nodes_processed,
total_idle_time_us = total_idle_time.as_micros(),
"Storage worker shutting down"
);
#[cfg(feature = "metrics")]
{
self.metrics.record_storage_nodes(storage_nodes_processed as usize);
self.metrics.record_storage_worker_idle_time(total_idle_time);
self.cursor_metrics.record(&mut cursor_metrics_cache);
}
@@ -1112,7 +1094,7 @@ struct AccountProofWorker<Factory> {
work_rx: CrossbeamReceiver<AccountWorkerJob>,
/// Unique identifier for this worker (used for tracing)
worker_id: usize,
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
/// Channel for dispatching storage proof work
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
@@ -1183,7 +1165,9 @@ where
/// If this function panics, the worker thread terminates but other workers
/// continue operating and the system degrades gracefully.
fn run(mut self) -> ProviderResult<()> {
// Create provider from factory
let provider = self.task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, self.worker_id);
trace!(
target: "trie::proof_task",
@@ -1195,64 +1179,39 @@ where
let mut account_nodes_processed = 0u64;
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
// Create both account and storage calculators for V2 proofs.
// The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled {
let account_trie_cursor = provider.account_trie_cursor()?;
let account_hashed_cursor = provider.hashed_account_cursor()?;
let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
(
Some(proof_v2::ProofCalculator::<
_,
_,
AsyncAccountValueEncoder<
<Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
>::new(account_trie_cursor, account_hashed_cursor)),
Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
storage_trie_cursor,
storage_hashed_cursor,
)))),
)
let mut v2_calculator = if self.v2_enabled {
let trie_cursor = proof_tx.provider.account_trie_cursor()?;
let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
trie_cursor,
hashed_cursor,
))
} else {
(None, None)
None
};
// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
let mut value_encoder_stats_cache = ValueEncoderStats::default();
while let Ok(job) = self.work_rx.recv() {
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
AccountWorkerJob::AccountMultiproof { input } => {
let value_encoder_stats = self.process_account_multiproof(
&provider,
v2_account_calculator.as_mut(),
v2_storage_calculator.clone(),
self.process_account_multiproof(
&proof_tx,
v2_calculator.as_mut(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
total_idle_time += value_encoder_stats.storage_wait_time;
value_encoder_stats_cache.extend(&value_encoder_stats);
}
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
Self::process_blinded_node(
self.worker_id,
&provider,
&proof_tx,
path,
result_sender,
&mut account_nodes_processed,
@@ -1262,8 +1221,6 @@ where
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
idle_start = Instant::now();
}
trace!(
@@ -1271,16 +1228,13 @@ where
worker_id=self.worker_id,
account_proofs_processed,
account_nodes_processed,
total_idle_time_us = total_idle_time.as_micros(),
"Account worker shutting down"
);
#[cfg(feature = "metrics")]
{
self.metrics.record_account_nodes(account_nodes_processed as usize);
self.metrics.record_account_worker_idle_time(total_idle_time);
self.cursor_metrics.record(&mut cursor_metrics_cache);
self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
}
Ok(())
@@ -1288,13 +1242,13 @@ where
fn compute_legacy_account_multiproof<Provider>(
&self,
provider: &Provider,
proof_tx: &ProofTaskTx<Provider>,
targets: MultiProofTargets,
mut prefix_sets: TriePrefixSets,
collect_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result<(ProofResult, Duration), ParallelStateRootError>
) -> Result<ProofResult, ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory,
{
@@ -1302,7 +1256,6 @@ where
target: "trie::proof_task",
"Account multiproof calculation",
targets = targets.len(),
num_slots = targets.values().map(|slots| slots.len()).sum::<usize>(),
worker_id=self.worker_id,
);
let _span_guard = span.enter();
@@ -1340,27 +1293,28 @@ where
cached_storage_roots: &self.cached_storage_roots,
};
let mut storage_wait_time = Duration::ZERO;
let result = build_account_multiproof_with_storage_roots(
provider,
&proof_tx.provider,
ctx,
&mut tracker,
proof_cursor_metrics,
&mut storage_wait_time,
)?;
);
let stats = tracker.finish();
Ok((ProofResult::Legacy(result, stats), storage_wait_time))
result.map(|proof| ProofResult::Legacy(proof, stats))
}
fn compute_v2_account_multiproof<'a, Provider>(
fn compute_v2_account_multiproof<Provider>(
&self,
v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
v2_calculator: &mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
targets: MultiProofTargetsV2,
) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError>
) -> Result<ProofResult, ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
Provider: TrieCursorFactory + HashedCursorFactory,
{
let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
@@ -1379,75 +1333,64 @@ where
dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
let mut value_encoder = AsyncAccountValueEncoder::new(
self.storage_work_tx.clone(),
storage_proof_receivers,
self.cached_storage_roots.clone(),
v2_storage_calculator,
);
let account_proofs =
v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
let proof = DecodedMultiProofV2 {
account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
storage_proofs: value_encoder.into_storage_proofs()?,
};
let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
Ok((ProofResult::V2(proof), value_encoder_stats))
Ok(ProofResult::V2(proof))
}
/// Processes an account multiproof request.
///
/// Returns stats from the value encoder used during proof computation.
fn process_account_multiproof<'a, Provider>(
fn process_account_multiproof<Provider>(
&self,
provider: &Provider,
v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>,
v2_storage_calculator: Option<Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>>,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: Option<
&mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
>,
input: AccountMultiproofInput,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) -> ValueEncoderStats
where
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
let proof_start = Instant::now();
let (proof_result_sender, result, value_encoder_stats) = match input {
let (proof_result_sender, result) = match input {
AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
proof_result_sender,
} => {
let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof(
provider,
} => (
proof_result_sender,
self.compute_legacy_account_multiproof(
proof_tx,
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
&mut proof_cursor_metrics,
) {
Ok((proof, wait_time)) => (
Ok(proof),
ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() },
),
Err(e) => (Err(e), ValueEncoderStats::default()),
};
(proof_result_sender, result, value_encoder_stats)
}
AccountMultiproofInput::V2 { targets, proof_result_sender } => {
let (result, value_encoder_stats) = match self
.compute_v2_account_multiproof::<Provider>(
v2_account_calculator.expect("v2 account calculator provided"),
v2_storage_calculator.expect("v2 storage calculator provided"),
targets,
) {
Ok((proof, stats)) => (Ok(proof), stats),
Err(e) => (Err(e), ValueEncoderStats::default()),
};
(proof_result_sender, result, value_encoder_stats)
}
),
),
AccountMultiproofInput::V2 { targets, proof_result_sender } => (
proof_result_sender,
self.compute_v2_account_multiproof::<Provider>(
v2_calculator.expect("v2 calculator provided"),
targets,
),
),
};
let ProofResultContext {
@@ -1500,14 +1443,12 @@ where
#[cfg(feature = "metrics")]
// Accumulate per-proof metrics into the worker's cache
cursor_metrics_cache.extend(&proof_cursor_metrics);
value_encoder_stats
}
/// Processes a blinded account node lookup request.
fn process_blinded_node<Provider>(
worker_id: usize,
provider: &Provider,
proof_tx: &ProofTaskTx<Provider>,
path: Nibbles,
result_sender: Sender<TrieNodeProviderResult>,
account_nodes_processed: &mut u64,
@@ -1528,8 +1469,7 @@ where
);
let start = Instant::now();
let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
let result = account_node_provider.trie_node(&path);
let result = proof_tx.process_blinded_account_node(&path);
let elapsed = start.elapsed();
*account_nodes_processed += 1;
@@ -1560,13 +1500,11 @@ where
/// enabling interleaved parallelism between account trie traversal and storage proof computation.
///
/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
/// Also accumulates the time spent waiting for storage proofs into `storage_wait_time`.
fn build_account_multiproof_with_storage_roots<P>(
provider: &P,
ctx: AccountMultiproofParams<'_>,
tracker: &mut ParallelTrieTracker,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
storage_wait_time: &mut Duration,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
P: TrieCursorFactory + HashedCursorFactory,
@@ -1630,7 +1568,6 @@ where
);
// Block on this specific storage proof receiver - enables interleaved
// parallelism
let wait_start = Instant::now();
let proof_msg = receiver.recv().map_err(|_| {
ParallelStateRootError::StorageRoot(
reth_execution_errors::StorageRootError::Database(
@@ -1640,7 +1577,6 @@ where
),
)
})?;
*storage_wait_time += wait_start.elapsed();
drop(_guard);
@@ -1732,9 +1668,7 @@ where
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
// Done last to allow storage workers more time to complete while we finalized the account trie.
for (hashed_address, receiver) in storage_proof_receivers {
let wait_start = Instant::now();
if let Ok(proof_msg) = receiver.recv() {
*storage_wait_time += wait_start.elapsed();
let proof_result = proof_msg.result?;
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported");

View File

@@ -1,11 +1,9 @@
use crate::value_encoder::ValueEncoderStats;
use reth_metrics::{metrics::Histogram, Metrics};
use reth_trie::{
hashed_cursor::{HashedCursorMetrics, HashedCursorMetricsCache},
trie_cursor::{TrieCursorMetrics, TrieCursorMetricsCache},
TrieType,
};
use std::time::Duration;
/// Metrics for the proof task.
#[derive(Clone, Metrics)]
@@ -15,17 +13,6 @@ pub struct ProofTaskTrieMetrics {
blinded_account_nodes: Histogram,
/// A histogram for the number of blinded storage nodes fetched.
blinded_storage_nodes: Histogram,
/// Histogram for storage worker idle time in seconds (waiting for proof jobs).
storage_worker_idle_time_seconds: Histogram,
/// Histogram for account worker idle time in seconds (waiting for proof jobs + storage
/// results).
account_worker_idle_time_seconds: Histogram,
/// Histogram for `Dispatched` deferred encoder variant count.
deferred_encoder_dispatched: Histogram,
/// Histogram for `FromCache` deferred encoder variant count.
deferred_encoder_from_cache: Histogram,
/// Histogram for `Sync` deferred encoder variant count.
deferred_encoder_sync: Histogram,
}
impl ProofTaskTrieMetrics {
@@ -38,23 +25,6 @@ impl ProofTaskTrieMetrics {
pub fn record_storage_nodes(&self, count: usize) {
self.blinded_storage_nodes.record(count as f64);
}
/// Record storage worker idle time.
pub fn record_storage_worker_idle_time(&self, duration: Duration) {
self.storage_worker_idle_time_seconds.record(duration.as_secs_f64());
}
/// Record account worker idle time.
pub fn record_account_worker_idle_time(&self, duration: Duration) {
self.account_worker_idle_time_seconds.record(duration.as_secs_f64());
}
/// Record value encoder stats (deferred encoder variant counts).
pub(crate) fn record_value_encoder_stats(&self, stats: &ValueEncoderStats) {
self.deferred_encoder_dispatched.record(stats.dispatched_count as f64);
self.deferred_encoder_from_cache.record(stats.from_cache_count as f64);
self.deferred_encoder_sync.record(stats.sync_count as f64);
}
}
/// Cursor metrics for proof task operations.

View File

@@ -1,79 +1,36 @@
use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
use crate::proof_task::{
StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob,
};
use alloy_primitives::{map::B256Map, B256};
use alloy_rlp::Encodable;
use core::cell::RefCell;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use reth_execution_errors::trie::StateProofError;
use reth_primitives_traits::Account;
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
hashed_cursor::HashedStorageCursor,
proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
trie_cursor::TrieStorageCursor,
proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target},
ProofTrieNode,
};
use std::{
rc::Rc,
sync::Arc,
time::{Duration, Instant},
};
/// Stats collected by [`AsyncAccountValueEncoder`] during proof computation.
///
/// Tracks time spent waiting for storage proofs and counts of each deferred encoder variant used.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
/// Accumulated time spent waiting for storage proof results from dispatched workers.
pub(crate) storage_wait_time: Duration,
/// Number of times the `Dispatched` variant was used (proof pre-dispatched to workers).
pub(crate) dispatched_count: u64,
/// Number of times the `FromCache` variant was used (storage root already cached).
pub(crate) from_cache_count: u64,
/// Number of times the `Sync` variant was used (synchronous computation).
pub(crate) sync_count: u64,
}
impl ValueEncoderStats {
/// Extends this metrics by adding the values from another.
pub(crate) fn extend(&mut self, other: &Self) {
self.storage_wait_time += other.storage_wait_time;
self.dispatched_count += other.dispatched_count;
self.from_cache_count += other.from_cache_count;
self.sync_count += other.sync_count;
}
}
use std::{rc::Rc, sync::Arc};
/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
/// A storage proof job was dispatched to the worker pool.
pub(crate) enum AsyncAccountDeferredValueEncoder {
Dispatched {
hashed_address: B256,
account: Account,
proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
/// Shared storage proof results.
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
/// Shared stats for tracking wait time and counts.
stats: Rc<RefCell<ValueEncoderStats>>,
// None if results shouldn't be retained for this dispatched proof.
storage_proof_results: Option<Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>>,
},
/// The storage root was found in cache.
FromCache { account: Account, root: B256 },
/// Synchronous storage root computation.
Sync {
/// Shared storage proof calculator for computing storage roots.
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
hashed_address: B256,
FromCache {
account: Account,
/// Cache to store computed storage roots for future reuse.
cached_storage_roots: Arc<DashMap<B256, B256>>,
root: B256,
},
}
impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
{
impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
let (account, root) = match self {
Self::Dispatched {
@@ -81,9 +38,7 @@ where
account,
proof_result_rx,
storage_proof_results,
stats,
} => {
let wait_start = Instant::now();
let result = proof_result_rx?
.recv()
.map_err(|_| {
@@ -92,27 +47,18 @@ where
)))
})?
.result?;
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { root: Some(root), proof } = result else {
panic!("StorageProofResult is not V2 with root: {result:?}")
};
storage_proof_results.borrow_mut().insert(hashed_address, proof);
if let Some(storage_proof_results) = storage_proof_results.as_ref() {
storage_proof_results.borrow_mut().insert(hashed_address, proof);
}
(account, root)
}
Self::FromCache { account, root } => (account, root),
Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
let mut calculator = storage_calculator.borrow_mut();
let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?;
let storage_root = calculator
.compute_root_hash(&proof)?
.expect("storage_proof with dummy target always returns root");
cached_storage_roots.insert(hashed_address, storage_root);
(account, storage_root)
}
};
let account = account.into_trie_account(root);
@@ -121,15 +67,12 @@ where
}
}
/// Implements the [`LeafValueEncoder`] trait for accounts.
///
/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are
/// being computed asynchronously by worker threads.
///
/// For accounts without pre-dispatched proofs or cached roots, uses a shared
/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across
/// multiple accounts.
pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
/// Implements the [`LeafValueEncoder`] trait for accounts using a [`CrossbeamSender`] to dispatch
/// and compute storage roots asynchronously. Can also accept a set of already dispatched account
/// storage proofs, for cases where it's possible to determine some necessary accounts ahead of
/// time.
pub(crate) struct AsyncAccountValueEncoder {
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Storage proof jobs which were dispatched ahead of time.
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
/// Storage roots which have already been computed. This can be used only if a storage proof
@@ -138,59 +81,39 @@ pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
/// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
/// required because [`DeferredValueEncoder`] cannot have a lifetime.
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
/// Shared storage proof calculator for synchronous computation. Reuses cursors and internal
/// buffers across multiple storage root calculations.
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
/// Shared stats for tracking wait time and variant counts.
stats: Rc<RefCell<ValueEncoderStats>>,
}
impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
/// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate
/// storage roots synchronously.
///
/// # Parameters
/// - `dispatched`: Pre-dispatched storage proof receivers for target accounts
/// - `cached_storage_roots`: Shared cache of already-computed storage roots
/// - `storage_calculator`: Shared storage proof calculator for synchronous computation
impl AsyncAccountValueEncoder {
/// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
/// roots asynchronously.
pub(crate) fn new(
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
) -> Self {
Self {
storage_work_tx,
dispatched,
cached_storage_roots,
storage_proof_results: Default::default(),
storage_calculator,
stats: Default::default(),
}
}
/// Consume [`Self`] and return all collected storage proofs along with accumulated stats.
///
/// This method collects any remaining dispatched proofs that weren't consumed during proof
/// calculation and includes their wait time in the returned stats.
/// Consume [`Self`] and return all collected storage proofs which had been dispatched.
///
/// # Panics
///
/// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
/// been dropped.
pub(crate) fn finalize(
pub(crate) fn into_storage_proofs(
self,
) -> Result<(B256Map<Vec<ProofTrieNode>>, ValueEncoderStats), StateProofError> {
) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {
let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
.expect("no deferred encoders are still allocated")
.into_inner();
let mut stats = Rc::into_inner(self.stats)
.expect("no deferred encoders are still allocated")
.into_inner();
// Any remaining dispatched proofs need to have their results collected.
// These are proofs that were pre-dispatched but not consumed during proof calculation.
// Any remaining dispatched proofs need to have their results collected
for (hashed_address, rx) in &self.dispatched {
let wait_start = Instant::now();
let result = rx
.recv()
.map_err(|_| {
@@ -199,7 +122,6 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
)))
})?
.result?;
stats.storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { proof, .. } = result else {
panic!("StorageProofResult is not V2: {result:?}")
@@ -208,17 +130,13 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
storage_proof_results.insert(*hashed_address, proof);
}
Ok((storage_proof_results, stats))
Ok(storage_proof_results)
}
}
impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
{
impl LeafValueEncoder for AsyncAccountValueEncoder {
type Value = Account;
type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
type DeferredEncoder = AsyncAccountDeferredValueEncoder;
fn deferred_encoder(
&mut self,
@@ -228,13 +146,11 @@ where
// If the proof job has already been dispatched for this account then it's not necessary to
// dispatch another.
if let Some(rx) = self.dispatched.remove(&hashed_address) {
self.stats.borrow_mut().dispatched_count += 1;
return AsyncAccountDeferredValueEncoder::Dispatched {
hashed_address,
account,
proof_result_rx: Ok(rx),
storage_proof_results: self.storage_proof_results.clone(),
stats: self.stats.clone(),
storage_proof_results: Some(self.storage_proof_results.clone()),
}
}
@@ -243,17 +159,25 @@ where
// If the root is already calculated then just use it directly
if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
self.stats.borrow_mut().from_cache_count += 1;
return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
}
// Compute storage root synchronously using the shared calculator
self.stats.borrow_mut().sync_count += 1;
AsyncAccountDeferredValueEncoder::Sync {
storage_calculator: self.storage_calculator.clone(),
// Create a proof input which targets a bogus key, so that we calculate the root as a
// side-effect.
let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]);
let (tx, rx) = crossbeam_channel::bounded(1);
let proof_result_rx = self
.storage_work_tx
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx })
.map_err(|_| DatabaseError::Other("storage workers unavailable".to_string()))
.map(|_| rx);
AsyncAccountDeferredValueEncoder::Dispatched {
hashed_address,
account,
cached_storage_roots: self.cached_storage_roots.clone(),
proof_result_rx,
storage_proof_results: None,
}
}
}

View File

@@ -15,7 +15,7 @@ use reth_trie_common::{
use reth_trie_sparse::{
provider::{RevealedNode, TrieNodeProvider},
LeafLookup, LeafLookupError, RlpNodeStackItem, SparseNode, SparseNodeType, SparseTrie,
SparseTrieExt, SparseTrieUpdates,
SparseTrieUpdates,
};
use smallvec::SmallVec;
use std::cmp::{Ord, Ordering, PartialOrd};
@@ -908,162 +908,6 @@ impl SparseTrie for ParallelSparseTrie {
}
}
impl SparseTrieExt for ParallelSparseTrie {
/// Returns the count of revealed (non-hash) nodes across all subtries.
fn revealed_node_count(&self) -> usize {
let upper_count = self.upper_subtrie.nodes.values().filter(|n| !n.is_hash()).count();
let lower_count: usize = self
.lower_subtries
.iter()
.filter_map(|s| s.as_revealed_ref())
.map(|s| s.nodes.values().filter(|n| !n.is_hash()).count())
.sum();
upper_count + lower_count
}
fn prune(&mut self, max_depth: usize) -> usize {
// DFS traversal to find nodes at max_depth that can be pruned.
// Collects "effective pruned roots" - children of nodes at max_depth with computed hashes.
// We replace nodes with Hash stubs inline during traversal.
let mut effective_pruned_roots = Vec::<(Nibbles, B256)>::new();
let mut stack: SmallVec<[(Nibbles, usize); 32]> = SmallVec::new();
stack.push((Nibbles::default(), 0));
// DFS traversal: pop path and depth, skip if subtrie or node not found.
while let Some((path, depth)) = stack.pop() {
// Get children to visit from current node (immutable access)
let children: SmallVec<[Nibbles; 16]> = {
let Some(subtrie) = self.subtrie_for_path(&path) else { continue };
let Some(node) = subtrie.nodes.get(&path) else { continue };
match node {
SparseNode::Empty | SparseNode::Hash(_) | SparseNode::Leaf { .. } => {
SmallVec::new()
}
SparseNode::Extension { key, .. } => {
let mut child = path;
child.extend(key);
SmallVec::from_buf_and_len([child; 16], 1)
}
SparseNode::Branch { state_mask, .. } => {
let mut children = SmallVec::new();
let mut mask = state_mask.get();
while mask != 0 {
let nibble = mask.trailing_zeros() as u8;
mask &= mask - 1;
let mut child = path;
child.push_unchecked(nibble);
children.push(child);
}
children
}
}
};
// Process children - either continue traversal or prune
for child in children {
if depth == max_depth {
// Check if child has a computed hash and replace inline
let hash = self
.subtrie_for_path(&child)
.and_then(|s| s.nodes.get(&child))
.filter(|n| !n.is_hash())
.and_then(|n| n.hash());
if let Some(hash) = hash {
self.subtrie_for_path_mut(&child)
.nodes
.insert(child, SparseNode::Hash(hash));
effective_pruned_roots.push((child, hash));
}
} else {
stack.push((child, depth + 1));
}
}
}
if effective_pruned_roots.is_empty() {
return 0;
}
let nodes_converted = effective_pruned_roots.len();
// Sort roots by subtrie type (upper first), then by path for efficient partitioning.
effective_pruned_roots.sort_unstable_by(|(path_a, _), (path_b, _)| {
let subtrie_type_a = SparseSubtrieType::from_path(path_a);
let subtrie_type_b = SparseSubtrieType::from_path(path_b);
subtrie_type_a.cmp(&subtrie_type_b).then(path_a.cmp(path_b))
});
// Split off upper subtrie roots (they come first due to sorting)
let num_upper_roots = effective_pruned_roots
.iter()
.position(|(p, _)| !SparseSubtrieType::path_len_is_upper(p.len()))
.unwrap_or(effective_pruned_roots.len());
let roots_upper = &effective_pruned_roots[..num_upper_roots];
let roots_lower = &effective_pruned_roots[num_upper_roots..];
debug_assert!(
{
let mut all_roots: Vec<_> = effective_pruned_roots.iter().map(|(p, _)| p).collect();
all_roots.sort_unstable();
all_roots.windows(2).all(|w| !w[1].starts_with(w[0]))
},
"prune roots must be prefix-free"
);
// Upper prune roots that are prefixes of lower subtrie root paths cause the entire
// subtrie to be cleared (preserving allocations for reuse).
if !roots_upper.is_empty() {
for subtrie in &mut self.lower_subtries {
let should_clear = subtrie.as_revealed_ref().is_some_and(|s| {
let search_idx = roots_upper.partition_point(|(root, _)| root <= &s.path);
search_idx > 0 && s.path.starts_with(&roots_upper[search_idx - 1].0)
});
if should_clear {
subtrie.clear();
}
}
}
// Upper subtrie: prune nodes and values
self.upper_subtrie.nodes.retain(|p, _| !is_strict_descendant_in(roots_upper, p));
self.upper_subtrie.inner.values.retain(|p, _| {
!starts_with_pruned_in(roots_upper, p) && !starts_with_pruned_in(roots_lower, p)
});
// Process lower subtries using chunk_by to group roots by subtrie
for roots_group in roots_lower.chunk_by(|(path_a, _), (path_b, _)| {
SparseSubtrieType::from_path(path_a) == SparseSubtrieType::from_path(path_b)
}) {
let subtrie_idx = path_subtrie_index_unchecked(&roots_group[0].0);
// Skip unrevealed/blinded subtries - nothing to prune
let Some(subtrie) = self.lower_subtries[subtrie_idx].as_revealed_mut() else {
continue;
};
// Retain only nodes/values not descended from any pruned root.
subtrie.nodes.retain(|p, _| !is_strict_descendant_in(roots_group, p));
subtrie.inner.values.retain(|p, _| !starts_with_pruned_in(roots_group, p));
}
// Branch node masks pruning
self.branch_node_masks.retain(|p, _| {
if SparseSubtrieType::path_len_is_upper(p.len()) {
!starts_with_pruned_in(roots_upper, p)
} else {
!starts_with_pruned_in(roots_lower, p) && !starts_with_pruned_in(roots_upper, p)
}
});
nodes_converted
}
}
impl ParallelSparseTrie {
/// Sets the thresholds that control when parallelism is used during operations.
pub const fn with_parallelism_thresholds(mut self, thresholds: ParallelismThresholds) -> Self {
@@ -2810,44 +2654,6 @@ fn path_subtrie_index_unchecked(path: &Nibbles) -> usize {
path.get_byte_unchecked(0) as usize
}
/// Checks if `path` is a strict descendant of any root in a sorted slice.
///
/// Uses binary search to find the candidate root that could be an ancestor.
/// Returns `true` if `path` starts with a root and is longer (strict descendant).
fn is_strict_descendant_in(roots: &[(Nibbles, B256)], path: &Nibbles) -> bool {
if roots.is_empty() {
return false;
}
debug_assert!(roots.windows(2).all(|w| w[0].0 <= w[1].0), "roots must be sorted by path");
let idx = roots.partition_point(|(root, _)| root <= path);
if idx > 0 {
let candidate = &roots[idx - 1].0;
if path.starts_with(candidate) && path.len() > candidate.len() {
return true;
}
}
false
}
/// Checks if `path` starts with any root in a sorted slice (inclusive).
///
/// Uses binary search to find the candidate root that could be a prefix.
/// Returns `true` if `path` starts with a root (including exact match).
fn starts_with_pruned_in(roots: &[(Nibbles, B256)], path: &Nibbles) -> bool {
if roots.is_empty() {
return false;
}
debug_assert!(roots.windows(2).all(|w| w[0].0 <= w[1].0), "roots must be sorted by path");
let idx = roots.partition_point(|(root, _)| root <= path);
if idx > 0 {
let candidate = &roots[idx - 1].0;
if path.starts_with(candidate) {
return true;
}
}
false
}
/// Used by lower subtries to communicate updates to the top-level [`SparseTrieUpdates`] set.
#[derive(Clone, Debug, Eq, PartialEq)]
enum SparseTrieUpdatesAction {
@@ -2898,8 +2704,7 @@ mod tests {
use reth_trie_db::DatabaseTrieCursorFactory;
use reth_trie_sparse::{
provider::{DefaultTrieNodeProvider, RevealedNode, TrieNodeProvider},
LeafLookup, LeafLookupError, SerialSparseTrie, SparseNode, SparseTrie, SparseTrieExt,
SparseTrieUpdates,
LeafLookup, LeafLookupError, SerialSparseTrie, SparseNode, SparseTrie, SparseTrieUpdates,
};
use std::collections::{BTreeMap, BTreeSet};
@@ -2944,17 +2749,6 @@ mod tests {
Account { nonce, ..Default::default() }
}
fn large_account_value() -> Vec<u8> {
let account = Account {
nonce: 0x123456789abcdef,
balance: U256::from(0x123456789abcdef0123456789abcdef_u128),
..Default::default()
};
let mut buf = Vec::new();
account.into_trie_account(EMPTY_ROOT_HASH).encode(&mut buf);
buf
}
fn encode_account_value(nonce: u64) -> Vec<u8> {
let account = Account { nonce, ..Default::default() };
let trie_account = account.into_trie_account(EMPTY_ROOT_HASH);
@@ -7312,372 +7106,4 @@ mod tests {
// Value should be retrievable
assert_eq!(trie.get_leaf_value(&slot_path), Some(&slot_value));
}
#[test]
fn test_prune_empty_suffix_key_regression() {
// Regression test: when a leaf has an empty suffix key (full path == node path),
// the value must be removed when that path becomes a pruned root.
// This catches the bug where is_strict_descendant fails to remove p == pruned_root.
use reth_trie_sparse::provider::DefaultTrieNodeProvider;
let provider = DefaultTrieNodeProvider;
let mut parallel = ParallelSparseTrie::default();
// Large value to ensure nodes have hashes (RLP >= 32 bytes)
let value = {
let account = Account {
nonce: 0x123456789abcdef,
balance: U256::from(0x123456789abcdef0123456789abcdef_u128),
..Default::default()
};
let mut buf = Vec::new();
account.into_trie_account(EMPTY_ROOT_HASH).encode(&mut buf);
buf
};
// Create a trie with multiple leaves to force a branch at root
for i in 0..16u8 {
parallel
.update_leaf(
Nibbles::from_nibbles([i, 0x1, 0x2, 0x3, 0x4, 0x5]),
value.clone(),
&provider,
)
.unwrap();
}
// Compute root to get hashes
let root_before = parallel.root();
// Prune at depth 0: the children of root become pruned roots
parallel.prune(0);
let root_after = parallel.root();
assert_eq!(root_before, root_after, "root hash must be preserved");
// Key assertion: values under pruned paths must be removed
// With the bug, values at pruned_root paths (not strict descendants) would remain
for i in 0..16u8 {
let path = Nibbles::from_nibbles([i, 0x1, 0x2, 0x3, 0x4, 0x5]);
assert!(
parallel.get_leaf_value(&path).is_none(),
"value at {:?} should be removed after prune",
path
);
}
}
#[test]
fn test_prune_at_various_depths() {
for max_depth in [0, 1, 2] {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
for i in 0..4u8 {
for j in 0..4u8 {
for k in 0..4u8 {
trie.update_leaf(
Nibbles::from_nibbles([i, j, k, 0x1, 0x2, 0x3]),
value.clone(),
&provider,
)
.unwrap();
}
}
}
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
trie.prune(max_depth);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved after prune");
let nodes_after = trie.revealed_node_count();
assert!(
nodes_after < nodes_before,
"node count should decrease after prune at depth {max_depth}"
);
if max_depth == 0 {
assert_eq!(nodes_after, 1, "only root should be revealed after prune(0)");
}
}
}
#[test]
fn test_prune_empty_trie() {
let mut trie = ParallelSparseTrie::default();
trie.prune(2);
let root = trie.root();
assert_eq!(root, EMPTY_ROOT_HASH, "empty trie should have empty root hash");
}
#[test]
fn test_prune_preserves_root_hash() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
for i in 0..8u8 {
for j in 0..4u8 {
trie.update_leaf(
Nibbles::from_nibbles([i, j, 0x3, 0x4, 0x5, 0x6]),
value.clone(),
&provider,
)
.unwrap();
}
}
let root_before = trie.root();
trie.prune(1);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash must be preserved after prune");
}
#[test]
fn test_prune_single_leaf_trie() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
trie.update_leaf(Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]), value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
trie.prune(0);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), nodes_before, "single leaf trie should not change");
}
#[test]
fn test_prune_deep_depth_no_effect() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
for i in 0..4u8 {
trie.update_leaf(Nibbles::from_nibbles([i, 0x2, 0x3, 0x4]), value.clone(), &provider)
.unwrap();
}
trie.root();
let nodes_before = trie.revealed_node_count();
trie.prune(100);
assert_eq!(nodes_before, trie.revealed_node_count(), "deep prune should have no effect");
}
#[test]
fn test_prune_extension_node_depth_semantics() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
trie.update_leaf(Nibbles::from_nibbles([0, 1, 2, 3, 0, 5, 6, 7]), value.clone(), &provider)
.unwrap();
trie.update_leaf(Nibbles::from_nibbles([0, 1, 2, 3, 1, 5, 6, 7]), value, &provider)
.unwrap();
let root_before = trie.root();
trie.prune(1);
assert_eq!(root_before, trie.root(), "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), 2, "should have root + extension after prune(1)");
}
#[test]
fn test_prune_embedded_node_preserved() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let small_value = vec![0x80];
trie.update_leaf(Nibbles::from_nibbles([0x0]), small_value.clone(), &provider).unwrap();
trie.update_leaf(Nibbles::from_nibbles([0x1]), small_value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
trie.prune(0);
assert_eq!(root_before, trie.root(), "root hash must be preserved");
if trie.revealed_node_count() == nodes_before {
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x0])).is_some());
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x1])).is_some());
}
}
#[test]
fn test_prune_mixed_embedded_and_hashed() {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let large_value = large_account_value();
let small_value = vec![0x80];
for i in 0..8u8 {
let value = if i < 4 { large_value.clone() } else { small_value.clone() };
trie.update_leaf(Nibbles::from_nibbles([i, 0x1, 0x2, 0x3]), value, &provider).unwrap();
}
let root_before = trie.root();
trie.prune(0);
assert_eq!(root_before, trie.root(), "root hash must be preserved");
}
#[test]
fn test_prune_many_lower_subtries() {
let provider = DefaultTrieNodeProvider;
let large_value = large_account_value();
let mut keys = Vec::new();
for first in 0..16u8 {
for second in 0..16u8 {
keys.push(Nibbles::from_nibbles([first, second, 0x1, 0x2, 0x3, 0x4]));
}
}
let mut trie = ParallelSparseTrie::default();
for key in &keys {
trie.update_leaf(*key, large_value.clone(), &provider).unwrap();
}
let root_before = trie.root();
let pruned = trie.prune(1);
assert!(pruned > 0, "should have pruned some nodes");
assert_eq!(root_before, trie.root(), "root hash should be preserved");
for key in &keys {
assert!(trie.get_leaf_value(key).is_none(), "value should be pruned");
}
}
#[test]
#[ignore = "profiling test - run manually"]
fn test_prune_profile() {
use std::time::Instant;
let provider = DefaultTrieNodeProvider;
let large_value = large_account_value();
// Generate 65536 keys (16^4) for a large trie
let mut keys = Vec::with_capacity(65536);
for a in 0..16u8 {
for b in 0..16u8 {
for c in 0..16u8 {
for d in 0..16u8 {
keys.push(Nibbles::from_nibbles([a, b, c, d, 0x5, 0x6, 0x7, 0x8]));
}
}
}
}
// Build base trie once
let mut base_trie = ParallelSparseTrie::default();
for key in &keys {
base_trie.update_leaf(*key, large_value.clone(), &provider).unwrap();
}
base_trie.root(); // ensure hashes computed
// Pre-clone tries to exclude clone time from profiling
let iterations = 100;
let mut tries: Vec<_> = (0..iterations).map(|_| base_trie.clone()).collect();
// Measure only prune()
let mut total_pruned = 0;
let start = Instant::now();
for trie in &mut tries {
total_pruned += trie.prune(2);
}
let elapsed = start.elapsed();
println!(
"Prune benchmark: {} iterations, total: {:?}, avg: {:?}, pruned/iter: {}",
iterations,
elapsed,
elapsed / iterations as u32,
total_pruned / iterations
);
}
#[test]
fn test_prune_max_depth_overflow() {
// Verify that max_depth > 255 is not truncated (was u8, now usize)
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
for i in 0..4u8 {
trie.update_leaf(Nibbles::from_nibbles([i, 0x1, 0x2, 0x3]), value.clone(), &provider)
.unwrap();
}
trie.root();
let nodes_before = trie.revealed_node_count();
// If depth were truncated to u8, 300 would become 44 and might prune something
trie.prune(300);
assert_eq!(
nodes_before,
trie.revealed_node_count(),
"prune(300) should have no effect on a shallow trie"
);
}
#[test]
fn test_prune_fast_path_case2_update_after() {
// Test fast-path Case 2: upper prune root is prefix of lower subtrie.
// After pruning, we should be able to update leaves without panic.
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
let value = large_account_value();
// Create keys that span into lower subtries (path.len() >= UPPER_TRIE_MAX_DEPTH)
// UPPER_TRIE_MAX_DEPTH is typically 2, so paths of length 3+ go to lower subtries
for first in 0..4u8 {
for second in 0..4u8 {
trie.update_leaf(
Nibbles::from_nibbles([first, second, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]),
value.clone(),
&provider,
)
.unwrap();
}
}
let root_before = trie.root();
// Prune at depth 0 - upper roots become prefixes of lower subtrie paths
trie.prune(0);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved");
// Now try to update a leaf - this should not panic even though lower subtries
// were replaced with Blind(None)
let new_path = Nibbles::from_nibbles([0x5, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]);
trie.update_leaf(new_path, value, &provider).unwrap();
// The trie should still be functional
let _ = trie.root();
}
}

View File

@@ -5,12 +5,6 @@
extern crate alloc;
/// Default depth to prune sparse tries to for cross-payload caching.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default number of storage tries to preserve across payload validations.
pub const DEFAULT_MAX_PRESERVED_STORAGE_TRIES: usize = 100;
mod state;
pub use state::*;

View File

@@ -1,6 +1,6 @@
use crate::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
traits::{SparseTrie as SparseTrieTrait, SparseTrieExt},
traits::SparseTrie as SparseTrieTrait,
RevealableSparseTrie, SerialSparseTrie,
};
use alloc::{collections::VecDeque, vec::Vec};
@@ -972,117 +972,6 @@ where
}
}
impl<A, S> SparseStateTrie<A, S>
where
A: SparseTrieTrait + SparseTrieExt + Default,
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
{
/// Minimum number of storage tries before parallel pruning is enabled.
const PARALLEL_PRUNE_THRESHOLD: usize = 16;
/// Returns true if parallelism should be enabled for pruning the given number of tries.
/// Will always return false in `no_std` builds.
const fn is_prune_parallelism_enabled(num_tries: usize) -> bool {
#[cfg(not(feature = "std"))]
return false;
num_tries >= Self::PARALLEL_PRUNE_THRESHOLD
}
/// Prunes the account trie and selected storage tries to reduce memory usage.
///
/// Storage tries not in the top `max_storage_tries` by revealed node count are cleared
/// entirely.
///
/// # Preconditions
///
/// Node hashes must be computed via `root()` before calling this method. Otherwise, nodes
/// cannot be converted to hash stubs and pruning will have no effect.
///
/// # Effects
///
/// - Clears `revealed_account_paths` and `revealed_paths` for all storage tries
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
if let Some(trie) = self.state.as_revealed_mut() {
trie.prune(max_depth);
}
self.revealed_account_paths.clear();
let mut storage_trie_counts: Vec<(B256, usize)> = self
.storage
.tries
.iter()
.map(|(hash, trie)| {
let count = match trie {
RevealableSparseTrie::Revealed(t) => t.revealed_node_count(),
RevealableSparseTrie::Blind(_) => 0,
};
(*hash, count)
})
.collect();
// Use O(n) selection instead of O(n log n) sort
let tries_to_keep: HashSet<B256> = if storage_trie_counts.len() <= max_storage_tries {
storage_trie_counts.iter().map(|(hash, _)| *hash).collect()
} else {
storage_trie_counts
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.1.cmp(&a.1));
storage_trie_counts[..max_storage_tries].iter().map(|(hash, _)| *hash).collect()
};
// Collect keys to avoid borrow conflict
let tries_to_clear: Vec<B256> = self
.storage
.tries
.keys()
.filter(|hash| !tries_to_keep.contains(*hash))
.copied()
.collect();
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
for hash in tries_to_clear {
if let Some(trie) = self.storage.tries.remove(&hash) {
self.storage.cleared_tries.push(trie.clear());
}
if let Some(mut paths) = self.storage.revealed_paths.remove(&hash) {
paths.clear();
self.storage.cleared_revealed_paths.push(paths);
}
}
// Prune storage tries that are kept
if Self::is_prune_parallelism_enabled(tries_to_keep.len()) {
#[cfg(feature = "std")]
{
use rayon::prelude::*;
self.storage.tries.par_iter_mut().for_each(|(hash, trie)| {
if tries_to_keep.contains(hash) &&
let Some(t) = trie.as_revealed_mut()
{
t.prune(max_depth);
}
});
}
} else {
for hash in &tries_to_keep {
if let Some(trie) =
self.storage.tries.get_mut(hash).and_then(|t| t.as_revealed_mut())
{
trie.prune(max_depth);
}
}
}
// Clear revealed_paths for kept tries
for hash in &tries_to_keep {
if let Some(paths) = self.storage.revealed_paths.get_mut(hash) {
paths.clear();
}
}
}
}
/// The fields of [`SparseStateTrie`] related to storage tries. This is kept separate from the rest
/// of [`SparseStateTrie`] both to help enforce allocation re-use and to allow us to implement
/// methods like `get_trie_and_revealed_paths` which return multiple mutable borrows.
@@ -1371,7 +1260,7 @@ mod tests {
use reth_trie::{updates::StorageTrieUpdates, HashBuilder, MultiProof, EMPTY_ROOT_HASH};
use reth_trie_common::{
proof::{ProofNodes, ProofRetainer},
BranchNode, BranchNodeMasks, BranchNodeMasksMap, LeafNode, StorageMultiProof, TrieMask,
BranchNode, BranchNodeMasks, LeafNode, StorageMultiProof, TrieMask,
};
#[test]

View File

@@ -232,36 +232,6 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
fn shrink_values_to(&mut self, size: usize);
}
/// Extension trait for sparse tries that support pruning.
///
/// This trait provides the `prune` method for sparse trie implementations that support
/// converting nodes beyond a certain depth into hash stubs. This is useful for reducing
/// memory usage when caching tries across payload validations.
pub trait SparseTrieExt: SparseTrie {
/// Returns the number of revealed (non-Hash) nodes in the trie.
fn revealed_node_count(&self) -> usize;
/// Replaces nodes beyond `max_depth` with hash stubs and removes their descendants.
///
/// Depth counts nodes traversed (not nibbles), so extension nodes count as 1 depth
/// regardless of key length. `max_depth == 0` prunes all children of the root node.
///
/// # Preconditions
///
/// Must be called after `root()` to ensure all nodes have computed hashes.
/// Calling on a trie without computed hashes will result in no pruning.
///
/// # Behavior
///
/// - Embedded nodes (RLP < 32 bytes) are preserved since they have no hash
/// - Returns 0 if `max_depth` exceeds trie depth or trie is empty
///
/// # Returns
///
/// The number of nodes converted to hash stubs.
fn prune(&mut self, max_depth: usize) -> usize;
}
/// Tracks modifications to the sparse trie structure.
///
/// Maintains references to both modified and pruned/removed branches, enabling

View File

@@ -79,7 +79,7 @@ impl HashedPostStateCursorValue for U256 {
type NonZero = Self;
fn into_option(self) -> Option<Self::NonZero> {
(!self.is_zero()).then_some(self)
(self != Self::ZERO).then_some(self)
}
}
@@ -351,7 +351,7 @@ where
/// [`HashedCursor::next`].
fn is_storage_empty(&mut self) -> Result<bool, DatabaseError> {
// Storage is not empty if it has non-zero slots.
if self.post_state_cursor.has_any(|(_, value)| !value.is_zero()) {
if self.post_state_cursor.has_any(|(_, value)| value.into_option().is_some()) {
return Ok(false);
}

View File

@@ -367,7 +367,7 @@ where
let target_nibbles = targets.into_iter().map(Nibbles::unpack).collect::<Vec<_>>();
let mut prefix_set = self.prefix_set;
prefix_set.extend_keys(target_nibbles.iter().copied());
prefix_set.extend_keys(target_nibbles.clone());
let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?;

View File

@@ -109,20 +109,39 @@ where
T: TrieCursorFactory,
H: HashedCursorFactory,
{
// Synchronously computes the storage root for this account and RLP-encodes the resulting
// `TrieAccount` into `buf`
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
// Create cursors for storage proof calculation
let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?;
let hashed_cursor =
self.hashed_cursor_factory.hashed_storage_cursor(self.hashed_address)?;
// Create storage proof calculator with StorageValueEncoder
let mut storage_proof_calculator = ProofCalculator::new_storage(trie_cursor, hashed_cursor);
let proof = storage_proof_calculator
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])?;
// Compute storage root by calling storage_proof with the root path as a target.
// This returns just the root node of the storage trie.
let storage_root = storage_proof_calculator
.compute_root_hash(&proof)?
.expect("storage_proof with dummy target always returns root");
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])
.map(|nodes| {
// Encode the root node to RLP and hash it
let root_node =
nodes.first().expect("storage_proof always returns at least the root");
root_node.node.encode(buf);
let storage_root = alloy_primitives::keccak256(buf.as_slice());
// Clear the buffer so we can re-use it to encode the TrieAccount
buf.clear();
storage_root
})?;
// Combine account with storage root to create TrieAccount
let trie_account = self.account.into_trie_account(storage_root);
// Encode the trie account
trie_account.encode(buf);
Ok(())

View File

@@ -60,9 +60,8 @@ impl CursorSubNode {
let position = node.as_ref().filter(|n| n.root_hash.is_none()).map_or(
SubNodePosition::ParentBranch,
|n| {
let mut child_index_range = CHILD_INDEX_RANGE;
SubNodePosition::Child(
child_index_range.find(|i| n.state_mask.is_bit_set(*i)).unwrap(),
CHILD_INDEX_RANGE.clone().find(|i| n.state_mask.is_bit_set(*i)).unwrap(),
)
},
);

View File

@@ -115,10 +115,9 @@ where
} else {
self.get_proof_targets(&state)?
};
let prefix_sets = core::mem::take(&mut self.prefix_sets);
let multiproof =
Proof::new(self.trie_cursor_factory.clone(), self.hashed_cursor_factory.clone())
.with_prefix_sets_mut(prefix_sets)
.with_prefix_sets_mut(self.prefix_sets.clone())
.multiproof(proof_targets.clone())?;
// No need to reconstruct the rest of the trie, we just need to include

View File

@@ -184,7 +184,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -192,7 +192,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -919,7 +919,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -927,7 +927,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -1001,8 +1001,8 @@ Engine:
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
--engine.disable-proof-v2
Disable V2 storage proofs for state root calculations
--engine.enable-proof-v2
Enable V2 storage proofs for state root calculations
--engine.disable-cache-metrics
Disable cache metrics recording, which can take up to 50ms with large cached state

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -175,7 +175,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -183,7 +183,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -173,7 +173,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -181,7 +181,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -184,7 +184,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -192,7 +192,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -919,7 +919,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -927,7 +927,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -1001,8 +1001,8 @@ Engine:
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
--engine.disable-proof-v2
Disable V2 storage proofs for state root calculations
--engine.enable-proof-v2
Enable V2 storage proofs for state root calculations
--engine.disable-cache-metrics
Disable cache metrics recording, which can take up to 50ms with large cached state

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -175,7 +175,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -183,7 +183,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -168,7 +168,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -176,7 +176,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]

View File

@@ -173,7 +173,7 @@ RocksDB:
--rocksdb.storages-history <STORAGES_HISTORY>
Route storages history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]
@@ -181,7 +181,7 @@ RocksDB:
--rocksdb.account-history <ACCOUNT_HISTORY>
Route account history tables to `RocksDB` instead of MDBX.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
[default: false]
[possible values: true, false]