mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
matt/rm-lo
...
feat/libmd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79829b76a3 | ||
|
|
507520114c |
21
.github/workflows/e2e.yml
vendored
21
.github/workflows/e2e.yml
vendored
@@ -44,24 +44,3 @@ jobs:
|
||||
--exclude 'op-reth' \
|
||||
--exclude 'reth' \
|
||||
-E 'binary(e2e_testsuite)'
|
||||
|
||||
rocksdb:
|
||||
name: e2e-rocksdb
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
- uses: mozilla-actions/sccache-action@v0.0.9
|
||||
- uses: taiki-e/install-action@nextest
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: Run RocksDB e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10248,7 +10248,6 @@ dependencies = [
|
||||
"strum 0.27.2",
|
||||
"thiserror 2.0.18",
|
||||
"toml",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -68,8 +68,6 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
|
||||
|
||||
RUN sccache --show-stats || true
|
||||
|
||||
# Copy binary to a known location (ARG not resolved in COPY)
|
||||
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
|
||||
RUN cp /app/target/$BUILD_PROFILE/$BINARY /app/binary || \
|
||||
|
||||
@@ -401,38 +401,25 @@ impl Command {
|
||||
let mut current_block = start_block;
|
||||
|
||||
for payload_idx in 0..count {
|
||||
const MAX_RETRIES: u32 = 5;
|
||||
let mut attempts = 0;
|
||||
let result = loop {
|
||||
attempts += 1;
|
||||
match collector.collect(current_block).await {
|
||||
Ok(res) => break Some(res),
|
||||
Err(e) => {
|
||||
if attempts >= MAX_RETRIES {
|
||||
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries");
|
||||
break None;
|
||||
}
|
||||
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
match collector.collect(current_block).await {
|
||||
Ok((transactions, total_gas, next_block)) => {
|
||||
info!(
|
||||
payload = payload_idx + 1,
|
||||
tx_count = transactions.len(),
|
||||
total_gas,
|
||||
blocks = format!("{}..{}", current_block, next_block),
|
||||
"Fetched transactions"
|
||||
);
|
||||
current_block = next_block;
|
||||
|
||||
if tx_sender.send(transactions).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let Some((transactions, total_gas, next_block)) = result else {
|
||||
break;
|
||||
};
|
||||
|
||||
info!(
|
||||
payload = payload_idx + 1,
|
||||
tx_count = transactions.len(),
|
||||
total_gas,
|
||||
blocks = format!("{}..{}", current_block, next_block),
|
||||
"Fetched transactions"
|
||||
);
|
||||
current_block = next_block;
|
||||
|
||||
if tx_sender.send(transactions).await.is_err() {
|
||||
break;
|
||||
Err(e) => {
|
||||
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -101,8 +101,8 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
|
||||
// We may be using the tui for a long time
|
||||
tx.disable_long_read_transaction_safety();
|
||||
|
||||
let table_db = tx.inner().open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
|
||||
let stats = tx.inner().db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
|
||||
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
|
||||
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
|
||||
let total_entries = stats.entries();
|
||||
let final_entry_idx = total_entries.saturating_sub(1);
|
||||
if self.args.skip > final_entry_idx {
|
||||
|
||||
@@ -92,10 +92,10 @@ impl Command {
|
||||
db_tables.sort();
|
||||
let mut total_size = 0;
|
||||
for db_table in db_tables {
|
||||
let table_db = tx.inner().open_db(Some(db_table)).wrap_err("Could not open db.")?;
|
||||
let table_db = tx.inner.open_db(Some(db_table)).wrap_err("Could not open db.")?;
|
||||
|
||||
let stats = tx
|
||||
.inner()
|
||||
.inner
|
||||
.db_stat(table_db.dbi())
|
||||
.wrap_err(format!("Could not find table: {db_table}"))?;
|
||||
|
||||
@@ -136,9 +136,9 @@ impl Command {
|
||||
.add_cell(Cell::new(human_bytes(total_size as f64)));
|
||||
table.add_row(row);
|
||||
|
||||
let freelist = tx.inner().env().freelist()?;
|
||||
let freelist = tx.inner.env().freelist()?;
|
||||
let pagesize =
|
||||
tx.inner().db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
|
||||
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
|
||||
let freelist_size = freelist * pagesize;
|
||||
|
||||
let mut row = Row::new();
|
||||
|
||||
@@ -79,4 +79,4 @@ path = "tests/rocksdb/main.rs"
|
||||
required-features = ["edge"]
|
||||
|
||||
[features]
|
||||
edge = ["reth-node-core/edge", "reth-provider/rocksdb", "reth-cli-commands/edge"]
|
||||
edge = ["reth-node-core/edge"]
|
||||
|
||||
@@ -125,10 +125,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -331,7 +328,6 @@ mod tests {
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -396,7 +392,6 @@ mod tests {
|
||||
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -495,10 +490,7 @@ mod tests {
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -38,18 +38,6 @@ impl TransactionTestContext {
|
||||
signed.encoded_2718().into()
|
||||
}
|
||||
|
||||
/// Creates a transfer with a specific nonce and signs it, returning bytes.
|
||||
/// Uses high `max_fee_per_gas` (1000 gwei) to ensure tx acceptance regardless of basefee.
|
||||
pub async fn transfer_tx_bytes_with_nonce(
|
||||
chain_id: u64,
|
||||
wallet: PrivateKeySigner,
|
||||
nonce: u64,
|
||||
) -> Bytes {
|
||||
let tx = tx(chain_id, 21000, None, None, nonce, Some(1000e9 as u128));
|
||||
let signed = Self::sign_tx(wallet, tx).await;
|
||||
signed.encoded_2718().into()
|
||||
}
|
||||
|
||||
/// Creates a deployment transaction and signs it, returning an envelope.
|
||||
pub async fn deploy_tx(
|
||||
chain_id: u64,
|
||||
|
||||
@@ -8,67 +8,12 @@ use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
|
||||
use eyre::Result;
|
||||
use jsonrpsee::core::client::ClientT;
|
||||
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
|
||||
use reth_db::tables;
|
||||
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
|
||||
use reth_e2e_test_utils::{transaction::TransactionTestContext, E2ETestSetupBuilder};
|
||||
use reth_node_builder::NodeConfig;
|
||||
use reth_node_core::args::RocksDbArgs;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_payload_builder::EthPayloadBuilderAttributes;
|
||||
use reth_provider::{RocksDBProviderFactory, StorageSettings};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
const ROCKSDB_POLL_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
const ROCKSDB_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
|
||||
/// Polls RPC until the given `tx_hash` is visible as pending (not yet mined).
|
||||
/// Prevents race conditions where `advance_block` is called before txs are in the pool.
|
||||
/// Returns the pending transaction.
|
||||
async fn wait_for_pending_tx<C: ClientT>(client: &C, tx_hash: B256) -> Transaction {
|
||||
let start = std::time::Instant::now();
|
||||
loop {
|
||||
let tx: Option<Transaction> = client
|
||||
.request("eth_getTransactionByHash", [tx_hash])
|
||||
.await
|
||||
.expect("RPC request failed");
|
||||
if let Some(tx) = tx {
|
||||
assert!(
|
||||
tx.block_number.is_none(),
|
||||
"Expected pending tx but tx_hash={tx_hash:?} is already mined in block {:?}",
|
||||
tx.block_number
|
||||
);
|
||||
return tx;
|
||||
}
|
||||
assert!(
|
||||
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
|
||||
"Timed out after {:?} waiting for tx_hash={tx_hash:?} to appear in pending pool",
|
||||
start.elapsed()
|
||||
);
|
||||
tokio::time::sleep(ROCKSDB_POLL_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls `RocksDB` until the given `tx_hash` appears in `TransactionHashNumbers`.
|
||||
/// Returns the `tx_number` on success, or panics on timeout.
|
||||
async fn poll_tx_in_rocksdb<P: RocksDBProviderFactory>(provider: &P, tx_hash: B256) -> u64 {
|
||||
let start = std::time::Instant::now();
|
||||
let mut interval = ROCKSDB_POLL_INTERVAL;
|
||||
loop {
|
||||
// Re-acquire handle each iteration to avoid stale snapshot reads
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
let tx_number: Option<u64> =
|
||||
rocksdb.get::<tables::TransactionHashNumbers>(tx_hash).expect("RocksDB get failed");
|
||||
if let Some(n) = tx_number {
|
||||
return n;
|
||||
}
|
||||
assert!(
|
||||
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
|
||||
"Timed out after {:?} waiting for tx_hash={tx_hash:?} in RocksDB",
|
||||
start.elapsed()
|
||||
);
|
||||
tokio::time::sleep(interval).await;
|
||||
// Simple backoff: 50ms -> 100ms -> 200ms (capped)
|
||||
interval = std::cmp::min(interval * 2, Duration::from_millis(200));
|
||||
}
|
||||
}
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Returns the test chain spec for `RocksDB` tests.
|
||||
fn test_chain_spec() -> Arc<ChainSpec> {
|
||||
@@ -96,24 +41,10 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
|
||||
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
|
||||
}
|
||||
|
||||
/// Verifies that `RocksDB` CLI defaults match `StorageSettings::base()`.
|
||||
#[test]
|
||||
fn test_rocksdb_defaults_match_storage_settings() {
|
||||
let args = RocksDbArgs::default();
|
||||
let settings = StorageSettings::base();
|
||||
|
||||
assert_eq!(
|
||||
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
|
||||
"tx_hash default should match StorageSettings::base()"
|
||||
);
|
||||
assert_eq!(
|
||||
args.storages_history, settings.storages_history_in_rocksdb,
|
||||
"storages_history default should match StorageSettings::base()"
|
||||
);
|
||||
assert_eq!(
|
||||
args.account_history, settings.account_history_in_rocksdb,
|
||||
"account_history default should match StorageSettings::base()"
|
||||
);
|
||||
/// Enables `RocksDB` for all supported tables.
|
||||
fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
|
||||
config.rocksdb = RocksDbArgs { all: true, ..Default::default() };
|
||||
config
|
||||
}
|
||||
|
||||
/// Smoke test: node boots with `RocksDB` routing enabled.
|
||||
@@ -125,16 +56,19 @@ async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_node_config_modifier(with_rocksdb_enabled)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
// Verify RocksDB provider is functional (can query without error)
|
||||
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
|
||||
let missing_hash = B256::from([0xab; 32]);
|
||||
let result: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
|
||||
assert!(result.is_none(), "Missing hash should return None");
|
||||
// Verify RocksDB directory exists
|
||||
let rocksdb_path = nodes[0].inner.data_dir.rocksdb();
|
||||
assert!(rocksdb_path.exists(), "RocksDB directory should exist at {rocksdb_path:?}");
|
||||
assert!(
|
||||
std::fs::read_dir(&rocksdb_path).map(|mut d| d.next().is_some()).unwrap_or(false),
|
||||
"RocksDB directory should be non-empty"
|
||||
);
|
||||
|
||||
let genesis_hash = nodes[0].block_hash(0);
|
||||
assert_ne!(genesis_hash, B256::ZERO);
|
||||
@@ -148,10 +82,10 @@ async fn test_rocksdb_block_mining() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_node_config_modifier(with_rocksdb_enabled)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
@@ -160,30 +94,12 @@ async fn test_rocksdb_block_mining() -> Result<()> {
|
||||
let genesis_hash = nodes[0].block_hash(0);
|
||||
assert_ne!(genesis_hash, B256::ZERO);
|
||||
|
||||
// Mine 3 blocks with transactions
|
||||
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
let client = nodes[0].rpc_client().expect("RPC client should be available");
|
||||
|
||||
for i in 1..=3u64 {
|
||||
let raw_tx =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), i - 1)
|
||||
.await;
|
||||
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
|
||||
|
||||
// Wait for tx to enter pending pool before mining
|
||||
wait_for_pending_tx(&client, tx_hash).await;
|
||||
|
||||
// Mine 3 blocks
|
||||
for i in 1..=3 {
|
||||
let payload = nodes[0].advance_block().await?;
|
||||
let block = payload.block();
|
||||
assert_eq!(block.number(), i);
|
||||
assert_ne!(block.hash(), B256::ZERO);
|
||||
|
||||
// Verify tx was actually included in the block
|
||||
let receipt: Option<TransactionReceipt> =
|
||||
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
|
||||
let receipt = receipt.expect("Receipt should exist after mining");
|
||||
assert_eq!(receipt.block_number, Some(i), "Tx should be in block {i}");
|
||||
}
|
||||
|
||||
// Verify all blocks are stored
|
||||
@@ -203,53 +119,48 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
)
|
||||
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
|
||||
.build()
|
||||
.await?;
|
||||
let (mut nodes, _tasks, wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_node_config_modifier(with_rocksdb_enabled)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
// Inject and mine a transaction
|
||||
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
let mut tx_hashes = Vec::new();
|
||||
|
||||
// Inject and mine 3 transactions (new wallet per tx to avoid nonce tracking)
|
||||
for i in 0..3 {
|
||||
let wallets = wallet.wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
|
||||
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
|
||||
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
|
||||
tx_hashes.push(tx_hash);
|
||||
|
||||
let payload = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload.block().number(), i + 1);
|
||||
}
|
||||
|
||||
let client = nodes[0].rpc_client().expect("RPC client should be available");
|
||||
|
||||
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
|
||||
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
|
||||
|
||||
// Wait for tx to enter pending pool before mining
|
||||
wait_for_pending_tx(&client, tx_hash).await;
|
||||
|
||||
let payload = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload.block().number(), 1);
|
||||
|
||||
// Query each transaction by hash
|
||||
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
|
||||
let tx = tx.expect("Transaction should be found");
|
||||
assert_eq!(tx.block_number, Some(1));
|
||||
for (i, tx_hash) in tx_hashes.iter().enumerate() {
|
||||
let expected_block_number = (i + 1) as u64;
|
||||
|
||||
let receipt: Option<TransactionReceipt> =
|
||||
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
|
||||
let receipt = receipt.expect("Receipt should be found");
|
||||
assert_eq!(receipt.block_number, Some(1));
|
||||
assert!(receipt.status());
|
||||
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
|
||||
let tx = tx.expect("Transaction should be found");
|
||||
assert_eq!(tx.block_number, Some(expected_block_number));
|
||||
|
||||
// Direct RocksDB assertion - poll with timeout since persistence is async
|
||||
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
|
||||
assert_eq!(tx_number, 0, "First tx should have TxNumber 0");
|
||||
let receipt: Option<TransactionReceipt> =
|
||||
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
|
||||
let receipt = receipt.expect("Receipt should be found");
|
||||
assert_eq!(receipt.block_number, Some(expected_block_number));
|
||||
assert!(receipt.status());
|
||||
}
|
||||
|
||||
// Verify missing hash returns None
|
||||
// Negative test: querying a non-existent tx hash returns None
|
||||
let missing_hash = B256::from([0xde; 32]);
|
||||
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
|
||||
let missing_tx_number: Option<u64> =
|
||||
rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
|
||||
assert!(missing_tx_number.is_none());
|
||||
|
||||
let missing_tx: Option<Transaction> =
|
||||
client.request("eth_getTransactionByHash", [missing_hash]).await?;
|
||||
assert!(missing_tx.is_none(), "expected no transaction for missing hash");
|
||||
@@ -260,212 +171,3 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Multiple transactions in the same block are correctly persisted to `RocksDB`.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
)
|
||||
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
// Create 3 txs from the same wallet with sequential nonces
|
||||
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
let client = nodes[0].rpc_client().expect("RPC client");
|
||||
|
||||
let mut tx_hashes = Vec::new();
|
||||
for nonce in 0..3 {
|
||||
let raw_tx =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), nonce)
|
||||
.await;
|
||||
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
|
||||
tx_hashes.push(tx_hash);
|
||||
}
|
||||
|
||||
// Wait for all txs to appear in pending pool before mining
|
||||
for tx_hash in &tx_hashes {
|
||||
wait_for_pending_tx(&client, *tx_hash).await;
|
||||
}
|
||||
|
||||
// Mine one block containing all 3 txs
|
||||
let payload = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload.block().number(), 1);
|
||||
|
||||
// Verify block contains all 3 txs
|
||||
let block: Option<alloy_rpc_types_eth::Block> =
|
||||
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
|
||||
let block = block.expect("Block 1 should exist");
|
||||
assert_eq!(block.transactions.len(), 3, "Block should contain 3 txs");
|
||||
|
||||
// Verify each tx via RPC
|
||||
for tx_hash in &tx_hashes {
|
||||
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
|
||||
let tx = tx.expect("Transaction should be found");
|
||||
assert_eq!(tx.block_number, Some(1), "All txs should be in block 1");
|
||||
}
|
||||
|
||||
// Poll RocksDB for all tx hashes and collect tx_numbers
|
||||
let mut tx_numbers = Vec::new();
|
||||
for tx_hash in &tx_hashes {
|
||||
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
|
||||
tx_numbers.push(n);
|
||||
}
|
||||
|
||||
// Verify tx_numbers form the set {0, 1, 2}
|
||||
tx_numbers.sort();
|
||||
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be 0, 1, 2");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Transactions across multiple blocks have globally continuous `tx_numbers`.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_txs_across_blocks() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
)
|
||||
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
let client = nodes[0].rpc_client().expect("RPC client");
|
||||
|
||||
// Block 1: 2 transactions
|
||||
let tx_hash_0 = nodes[0]
|
||||
.rpc
|
||||
.inject_tx(
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 0).await,
|
||||
)
|
||||
.await?;
|
||||
let tx_hash_1 = nodes[0]
|
||||
.rpc
|
||||
.inject_tx(
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Wait for both txs to appear in pending pool
|
||||
wait_for_pending_tx(&client, tx_hash_0).await;
|
||||
wait_for_pending_tx(&client, tx_hash_1).await;
|
||||
|
||||
let payload1 = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload1.block().number(), 1);
|
||||
|
||||
// Block 2: 1 transaction
|
||||
let tx_hash_2 = nodes[0]
|
||||
.rpc
|
||||
.inject_tx(
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 2).await,
|
||||
)
|
||||
.await?;
|
||||
|
||||
wait_for_pending_tx(&client, tx_hash_2).await;
|
||||
|
||||
let payload2 = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload2.block().number(), 2);
|
||||
|
||||
// Verify block contents via RPC
|
||||
let tx0: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
|
||||
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_1]).await?;
|
||||
let tx2: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_2]).await?;
|
||||
|
||||
assert_eq!(tx0.expect("tx0").block_number, Some(1));
|
||||
assert_eq!(tx1.expect("tx1").block_number, Some(1));
|
||||
assert_eq!(tx2.expect("tx2").block_number, Some(2));
|
||||
|
||||
// Poll RocksDB and verify global tx_number continuity
|
||||
let all_tx_hashes = [tx_hash_0, tx_hash_1, tx_hash_2];
|
||||
let mut tx_numbers = Vec::new();
|
||||
for tx_hash in &all_tx_hashes {
|
||||
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
|
||||
tx_numbers.push(n);
|
||||
}
|
||||
|
||||
// Verify they form a continuous sequence {0, 1, 2}
|
||||
tx_numbers.sort();
|
||||
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be globally continuous: 0, 1, 2");
|
||||
|
||||
// Re-query block 1 txs after block 2 is mined (regression guard)
|
||||
let tx0_again: Option<Transaction> =
|
||||
client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
|
||||
assert!(tx0_again.is_some(), "Block 1 tx should still be queryable after block 2");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Pending transactions should NOT appear in `RocksDB` until mined.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
)
|
||||
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
|
||||
let signer = wallets[0].clone();
|
||||
|
||||
// Inject tx but do NOT mine
|
||||
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
|
||||
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
|
||||
|
||||
// Verify tx is in pending pool via RPC
|
||||
let client = nodes[0].rpc_client().expect("RPC client");
|
||||
wait_for_pending_tx(&client, tx_hash).await;
|
||||
|
||||
let pending_tx: Option<Transaction> =
|
||||
client.request("eth_getTransactionByHash", [tx_hash]).await?;
|
||||
assert!(pending_tx.is_some(), "Pending tx should be visible via RPC");
|
||||
assert!(pending_tx.unwrap().block_number.is_none(), "Pending tx should have no block_number");
|
||||
|
||||
// Assert tx is NOT in RocksDB before mining (single check - tx is confirmed pending)
|
||||
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
|
||||
let tx_number: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(tx_hash)?;
|
||||
assert!(
|
||||
tx_number.is_none(),
|
||||
"Pending tx should NOT be in RocksDB before mining, but found tx_number={:?}",
|
||||
tx_number
|
||||
);
|
||||
|
||||
// Now mine the block
|
||||
let payload = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload.block().number(), 1);
|
||||
|
||||
// Poll until tx appears in RocksDB
|
||||
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
|
||||
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
|
||||
|
||||
// Verify tx is now mined via RPC
|
||||
let mined_tx: Option<Transaction> =
|
||||
client.request("eth_getTransactionByHash", [tx_hash]).await?;
|
||||
assert_eq!(mined_tx.expect("mined tx").block_number, Some(1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -148,8 +148,8 @@ pub struct TreeConfig {
|
||||
storage_worker_count: usize,
|
||||
/// Number of account proof worker threads.
|
||||
account_worker_count: usize,
|
||||
/// Whether to disable V2 storage proofs.
|
||||
disable_proof_v2: bool,
|
||||
/// Whether to enable V2 storage proofs.
|
||||
enable_proof_v2: bool,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
}
|
||||
@@ -179,7 +179,7 @@ impl Default for TreeConfig {
|
||||
allow_unwind_canonical_header: false,
|
||||
storage_worker_count: default_storage_worker_count(),
|
||||
account_worker_count: default_account_worker_count(),
|
||||
disable_proof_v2: false,
|
||||
enable_proof_v2: false,
|
||||
disable_cache_metrics: false,
|
||||
}
|
||||
}
|
||||
@@ -211,7 +211,7 @@ impl TreeConfig {
|
||||
allow_unwind_canonical_header: bool,
|
||||
storage_worker_count: usize,
|
||||
account_worker_count: usize,
|
||||
disable_proof_v2: bool,
|
||||
enable_proof_v2: bool,
|
||||
disable_cache_metrics: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -237,7 +237,7 @@ impl TreeConfig {
|
||||
allow_unwind_canonical_header,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
enable_proof_v2,
|
||||
disable_cache_metrics,
|
||||
}
|
||||
}
|
||||
@@ -280,8 +280,7 @@ impl TreeConfig {
|
||||
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
|
||||
/// and the chunk size is at the default value.
|
||||
pub const fn effective_multiproof_chunk_size(&self) -> usize {
|
||||
if !self.disable_proof_v2 &&
|
||||
self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
|
||||
if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
|
||||
{
|
||||
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
|
||||
} else {
|
||||
@@ -519,14 +518,14 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Return whether V2 storage proofs are disabled.
|
||||
pub const fn disable_proof_v2(&self) -> bool {
|
||||
self.disable_proof_v2
|
||||
/// Return whether V2 storage proofs are enabled.
|
||||
pub const fn enable_proof_v2(&self) -> bool {
|
||||
self.enable_proof_v2
|
||||
}
|
||||
|
||||
/// Setter for whether to disable V2 storage proofs.
|
||||
pub const fn with_disable_proof_v2(mut self, disable_proof_v2: bool) -> Self {
|
||||
self.disable_proof_v2 = disable_proof_v2;
|
||||
/// Setter for whether to enable V2 storage proofs.
|
||||
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
|
||||
self.enable_proof_v2 = enable_proof_v2;
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -101,7 +101,7 @@ where
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
|
||||
let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
|
||||
blockchain_db,
|
||||
consensus,
|
||||
payload_validator,
|
||||
|
||||
@@ -312,7 +312,14 @@ impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
|
||||
match self.caches.get_or_try_insert_account_with(*address, || {
|
||||
self.state_provider.basic_account(address)
|
||||
})? {
|
||||
CachedStatus::NotCached(value) | CachedStatus::Cached(value) => Ok(value),
|
||||
CachedStatus::NotCached(value) => {
|
||||
self.metrics.account_cache_misses.increment(1);
|
||||
Ok(value)
|
||||
}
|
||||
CachedStatus::Cached(value) => {
|
||||
self.metrics.account_cache_hits.increment(1);
|
||||
Ok(value)
|
||||
}
|
||||
}
|
||||
} else if let Some(account) = self.caches.account_cache.get(address) {
|
||||
self.metrics.account_cache_hits.increment(1);
|
||||
@@ -343,7 +350,14 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
|
||||
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
|
||||
})? {
|
||||
CachedStatus::NotCached(value) | CachedStatus::Cached(value) => {
|
||||
CachedStatus::NotCached(value) => {
|
||||
self.metrics.storage_cache_misses.increment(1);
|
||||
// The slot that was never written to is indistinguishable from a slot
|
||||
// explicitly set to zero. We return `None` in both cases.
|
||||
Ok(Some(value).filter(|v| !v.is_zero()))
|
||||
}
|
||||
CachedStatus::Cached(value) => {
|
||||
self.metrics.storage_cache_hits.increment(1);
|
||||
// The slot that was never written to is indistinguishable from a slot
|
||||
// explicitly set to zero. We return `None` in both cases.
|
||||
Ok(Some(value).filter(|v| !v.is_zero()))
|
||||
@@ -365,7 +379,14 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
|
||||
match self.caches.get_or_try_insert_code_with(*code_hash, || {
|
||||
self.state_provider.bytecode_by_hash(code_hash)
|
||||
})? {
|
||||
CachedStatus::NotCached(code) | CachedStatus::Cached(code) => Ok(code),
|
||||
CachedStatus::NotCached(code) => {
|
||||
self.metrics.code_cache_misses.increment(1);
|
||||
Ok(code)
|
||||
}
|
||||
CachedStatus::Cached(code) => {
|
||||
self.metrics.code_cache_hits.increment(1);
|
||||
Ok(code)
|
||||
}
|
||||
}
|
||||
} else if let Some(code) = self.caches.code_cache.get(code_hash) {
|
||||
self.metrics.code_cache_hits.increment(1);
|
||||
|
||||
@@ -1492,10 +1492,6 @@ where
|
||||
self.on_maybe_tree_event(res.event.take())?;
|
||||
}
|
||||
|
||||
if let Err(ref err) = output {
|
||||
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
|
||||
}
|
||||
|
||||
self.metrics.engine.forkchoice_updated.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.new_payload.latest_finish_at,
|
||||
|
||||
@@ -238,7 +238,7 @@ where
|
||||
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
|
||||
|
||||
// Extract V2 proofs flag early so we can pass it to prewarm
|
||||
let v2_proofs_enabled = !config.disable_proof_v2();
|
||||
let v2_proofs_enabled = config.enable_proof_v2();
|
||||
|
||||
// Handle BAL-based optimization if available
|
||||
let prewarm_handle = if let Some(bal) = bal {
|
||||
@@ -286,7 +286,9 @@ where
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
config
|
||||
.multiproof_chunking_enabled()
|
||||
.then_some(config.effective_multiproof_chunk_size()),
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof,
|
||||
)
|
||||
|
||||
@@ -563,7 +563,6 @@ where
|
||||
index,
|
||||
tx_hash = %tx.tx().tx_hash(),
|
||||
is_success = tracing::field::Empty,
|
||||
gas_used = tracing::field::Empty,
|
||||
)
|
||||
.entered();
|
||||
|
||||
|
||||
@@ -77,22 +77,8 @@ impl<R: Receipt> ReceiptRootTaskHandle<R> {
|
||||
receipt_with_bloom.encode_2718(&mut encode_buf);
|
||||
|
||||
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
|
||||
match builder.push(indexed_receipt.index, &encode_buf) {
|
||||
Ok(()) => {
|
||||
received_count += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
// If a duplicate or out-of-bounds index is streamed, skip it and
|
||||
// fall back to computing the receipt root from the full receipts
|
||||
// vector later.
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
index = indexed_receipt.index,
|
||||
?err,
|
||||
"Receipt root task received invalid receipt index, skipping"
|
||||
);
|
||||
}
|
||||
}
|
||||
builder.push_unchecked(indexed_receipt.index, &encode_buf);
|
||||
received_count += 1;
|
||||
}
|
||||
|
||||
let Ok(root) = builder.finalize() else {
|
||||
|
||||
@@ -792,11 +792,6 @@ where
|
||||
// Execute transactions
|
||||
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
|
||||
let mut transactions = transactions.into_iter();
|
||||
// Some executors may execute transactions that do not append receipts during the
|
||||
// main loop (e.g., system transactions whose receipts are added during finalization).
|
||||
// In that case, invoking the callback on every transaction would resend the previous
|
||||
// receipt with the same index and can panic the ordered root builder.
|
||||
let mut last_sent_len = 0usize;
|
||||
loop {
|
||||
// Measure time spent waiting for next transaction from iterator
|
||||
// (e.g., parallel signature recovery)
|
||||
@@ -823,14 +818,10 @@ where
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
// Send the latest receipt to the background task for incremental root computation.
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = current_len - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
}
|
||||
// Send the latest receipt to the background task for incremental root computation
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = executor.receipts().len() - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
}
|
||||
|
||||
enter.record("gas_used", gas_used);
|
||||
|
||||
@@ -251,7 +251,7 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
db,
|
||||
chain_spec.clone(),
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
|
||||
)?;
|
||||
|
||||
let genesis_hash = init_genesis(&provider_factory)?;
|
||||
|
||||
@@ -35,7 +35,7 @@ pub struct DefaultEngineValues {
|
||||
allow_unwind_canonical_header: bool,
|
||||
storage_worker_count: Option<usize>,
|
||||
account_worker_count: Option<usize>,
|
||||
disable_proof_v2: bool,
|
||||
enable_proof_v2: bool,
|
||||
cache_metrics_disabled: bool,
|
||||
}
|
||||
|
||||
@@ -161,9 +161,9 @@ impl DefaultEngineValues {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to disable proof V2 by default
|
||||
pub const fn with_disable_proof_v2(mut self, v: bool) -> Self {
|
||||
self.disable_proof_v2 = v;
|
||||
/// Set whether to enable proof V2 by default
|
||||
pub const fn with_enable_proof_v2(mut self, v: bool) -> Self {
|
||||
self.enable_proof_v2 = v;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ impl Default for DefaultEngineValues {
|
||||
allow_unwind_canonical_header: false,
|
||||
storage_worker_count: None,
|
||||
account_worker_count: None,
|
||||
disable_proof_v2: false,
|
||||
enable_proof_v2: false,
|
||||
cache_metrics_disabled: false,
|
||||
}
|
||||
}
|
||||
@@ -317,9 +317,9 @@ pub struct EngineArgs {
|
||||
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
|
||||
pub account_worker_count: Option<usize>,
|
||||
|
||||
/// Disable V2 storage proofs for state root calculations
|
||||
#[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)]
|
||||
pub disable_proof_v2: bool,
|
||||
/// Enable V2 storage proofs for state root calculations
|
||||
#[arg(long = "engine.enable-proof-v2", default_value_t = DefaultEngineValues::get_global().enable_proof_v2)]
|
||||
pub enable_proof_v2: bool,
|
||||
|
||||
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
|
||||
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
|
||||
@@ -348,7 +348,7 @@ impl Default for EngineArgs {
|
||||
allow_unwind_canonical_header,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
enable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
} = DefaultEngineValues::get_global().clone();
|
||||
Self {
|
||||
@@ -374,7 +374,7 @@ impl Default for EngineArgs {
|
||||
allow_unwind_canonical_header,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
enable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
}
|
||||
}
|
||||
@@ -410,7 +410,7 @@ impl EngineArgs {
|
||||
config = config.with_account_worker_count(count);
|
||||
}
|
||||
|
||||
config = config.with_disable_proof_v2(self.disable_proof_v2);
|
||||
config = config.with_enable_proof_v2(self.enable_proof_v2);
|
||||
config = config.without_cache_metrics(self.cache_metrics_disabled);
|
||||
|
||||
config
|
||||
@@ -462,7 +462,7 @@ mod tests {
|
||||
allow_unwind_canonical_header: true,
|
||||
storage_worker_count: Some(16),
|
||||
account_worker_count: Some(8),
|
||||
disable_proof_v2: false,
|
||||
enable_proof_v2: false,
|
||||
cache_metrics_disabled: true,
|
||||
};
|
||||
|
||||
|
||||
@@ -1,27 +1,13 @@
|
||||
//! clap [Args](clap::Args) for `RocksDB` table routing configuration
|
||||
|
||||
use clap::{ArgAction, Args};
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
/// Default value for `tx_hash` routing flag.
|
||||
/// Default value for `RocksDB` routing flags.
|
||||
///
|
||||
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
|
||||
const fn default_tx_hash_in_rocksdb() -> bool {
|
||||
StorageSettings::base().transaction_hash_numbers_in_rocksdb
|
||||
}
|
||||
|
||||
/// Default value for `storages_history` routing flag.
|
||||
///
|
||||
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
|
||||
const fn default_storages_history_in_rocksdb() -> bool {
|
||||
StorageSettings::base().storages_history_in_rocksdb
|
||||
}
|
||||
|
||||
/// Default value for `account_history` routing flag.
|
||||
///
|
||||
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
|
||||
const fn default_account_history_in_rocksdb() -> bool {
|
||||
StorageSettings::base().account_history_in_rocksdb
|
||||
/// When the `edge` feature is enabled, defaults to `true` to enable edge storage features.
|
||||
/// Otherwise defaults to `false` for legacy behavior.
|
||||
const fn default_rocksdb_flag() -> bool {
|
||||
cfg!(feature = "edge")
|
||||
}
|
||||
|
||||
/// Parameters for `RocksDB` table routing configuration.
|
||||
@@ -42,21 +28,21 @@ pub struct RocksDbArgs {
|
||||
///
|
||||
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
|
||||
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
#[arg(long = "rocksdb.tx-hash", default_value_t = default_tx_hash_in_rocksdb(), action = ArgAction::Set)]
|
||||
#[arg(long = "rocksdb.tx-hash", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
|
||||
pub tx_hash: bool,
|
||||
|
||||
/// Route storages history tables to `RocksDB` instead of MDBX.
|
||||
///
|
||||
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
|
||||
/// Defaults to `false`.
|
||||
#[arg(long = "rocksdb.storages-history", default_value_t = default_storages_history_in_rocksdb(), action = ArgAction::Set)]
|
||||
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
#[arg(long = "rocksdb.storages-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
|
||||
pub storages_history: bool,
|
||||
|
||||
/// Route account history tables to `RocksDB` instead of MDBX.
|
||||
///
|
||||
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
|
||||
/// Defaults to `false`.
|
||||
#[arg(long = "rocksdb.account-history", default_value_t = default_account_history_in_rocksdb(), action = ArgAction::Set)]
|
||||
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
#[arg(long = "rocksdb.account-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
|
||||
pub account_history: bool,
|
||||
}
|
||||
|
||||
@@ -64,9 +50,9 @@ impl Default for RocksDbArgs {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
all: false,
|
||||
tx_hash: default_tx_hash_in_rocksdb(),
|
||||
storages_history: default_storages_history_in_rocksdb(),
|
||||
account_history: default_account_history_in_rocksdb(),
|
||||
tx_hash: default_rocksdb_flag(),
|
||||
storages_history: default_rocksdb_flag(),
|
||||
account_history: default_rocksdb_flag(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,25 +106,7 @@ mod tests {
|
||||
fn test_parse_all_flag() {
|
||||
let args = CommandParser::<RocksDbArgs>::parse_from(["reth", "--rocksdb.all"]).args;
|
||||
assert!(args.all);
|
||||
assert_eq!(args.tx_hash, default_tx_hash_in_rocksdb());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_defaults_match_storage_settings() {
|
||||
let args = RocksDbArgs::default();
|
||||
let settings = StorageSettings::base();
|
||||
assert_eq!(
|
||||
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
|
||||
"tx_hash default should match StorageSettings::base()"
|
||||
);
|
||||
assert_eq!(
|
||||
args.storages_history, settings.storages_history_in_rocksdb,
|
||||
"storages_history default should match StorageSettings::base()"
|
||||
);
|
||||
assert_eq!(
|
||||
args.account_history, settings.account_history_in_rocksdb,
|
||||
"account_history default should match StorageSettings::base()"
|
||||
);
|
||||
assert_eq!(args.tx_hash, default_rocksdb_flag());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -90,7 +90,7 @@ impl StaticFilesArgs {
|
||||
/// args.
|
||||
///
|
||||
/// If `minimal` is true, uses [`MINIMAL_BLOCKS_PER_FILE`] blocks per file as the default for
|
||||
/// all segments.
|
||||
/// headers, transactions, and receipts segments.
|
||||
pub fn merge_with_config(&self, config: StaticFilesConfig, minimal: bool) -> StaticFilesConfig {
|
||||
let minimal_blocks_per_file = minimal.then_some(MINIMAL_BLOCKS_PER_FILE);
|
||||
StaticFilesConfig {
|
||||
@@ -109,15 +109,12 @@ impl StaticFilesArgs {
|
||||
.or(config.blocks_per_file.receipts),
|
||||
transaction_senders: self
|
||||
.blocks_per_file_transaction_senders
|
||||
.or(minimal_blocks_per_file)
|
||||
.or(config.blocks_per_file.transaction_senders),
|
||||
account_change_sets: self
|
||||
.blocks_per_file_account_change_sets
|
||||
.or(minimal_blocks_per_file)
|
||||
.or(config.blocks_per_file.account_change_sets),
|
||||
storage_change_sets: self
|
||||
.blocks_per_file_storage_change_sets
|
||||
.or(minimal_blocks_per_file)
|
||||
.or(config.blocks_per_file.storage_change_sets),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -507,7 +507,7 @@ impl RethTransactionPoolConfig for TxPoolArgs {
|
||||
PoolConfig {
|
||||
local_transactions_config: LocalTransactionConfig {
|
||||
no_exemptions: self.no_locals,
|
||||
local_addresses: self.locals.iter().copied().collect(),
|
||||
local_addresses: self.locals.clone().into_iter().collect(),
|
||||
propagate_local_transactions: !self.no_local_transactions_propagation,
|
||||
},
|
||||
pending_limit: SubPoolLimit {
|
||||
|
||||
@@ -149,7 +149,21 @@ where
|
||||
let elapsed = start.elapsed();
|
||||
self.metrics.duration_seconds.record(elapsed);
|
||||
|
||||
output.debug_log(tip_block_number, deleted_entries, elapsed);
|
||||
let message = match output.progress {
|
||||
PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
|
||||
PruneProgress::Finished => "Pruner finished",
|
||||
};
|
||||
|
||||
debug!(
|
||||
target: "pruner",
|
||||
%tip_block_number,
|
||||
?elapsed,
|
||||
?deleted_entries,
|
||||
?limiter,
|
||||
?output,
|
||||
?stats,
|
||||
"{message}",
|
||||
);
|
||||
|
||||
self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ alloy-primitives.workspace = true
|
||||
derive_more.workspace = true
|
||||
strum = { workspace = true, features = ["derive"] }
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
modular-bitfield = { workspace = true, optional = true }
|
||||
serde = { workspace = true, features = ["derive"], optional = true }
|
||||
@@ -43,9 +42,8 @@ std = [
|
||||
"derive_more/std",
|
||||
"serde?/std",
|
||||
"serde_json/std",
|
||||
"strum/std",
|
||||
"thiserror/std",
|
||||
"tracing/std",
|
||||
"strum/std",
|
||||
]
|
||||
test-utils = [
|
||||
"std",
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use crate::{PruneCheckpoint, PruneMode, PruneSegment};
|
||||
use alloc::{format, string::ToString, vec::Vec};
|
||||
use alloc::vec::Vec;
|
||||
use alloy_primitives::{BlockNumber, TxNumber};
|
||||
use core::time::Duration;
|
||||
use derive_more::Display;
|
||||
use tracing::debug;
|
||||
|
||||
/// Pruner run output.
|
||||
#[derive(Debug)]
|
||||
@@ -20,49 +18,6 @@ impl From<PruneProgress> for PrunerOutput {
|
||||
}
|
||||
}
|
||||
|
||||
impl PrunerOutput {
|
||||
/// Logs a human-readable summary of the pruner run at DEBUG level.
|
||||
///
|
||||
/// Format: `"Pruner finished tip=24328929 deleted=10886 elapsed=148ms
|
||||
/// segments=AccountHistory[24318865, done] ..."`
|
||||
#[inline]
|
||||
pub fn debug_log(
|
||||
&self,
|
||||
tip_block_number: BlockNumber,
|
||||
deleted_entries: usize,
|
||||
elapsed: Duration,
|
||||
) {
|
||||
let message = match self.progress {
|
||||
PruneProgress::HasMoreData(_) => "Pruner interrupted, has more data",
|
||||
PruneProgress::Finished => "Pruner finished",
|
||||
};
|
||||
|
||||
let segments: Vec<_> = self
|
||||
.segments
|
||||
.iter()
|
||||
.filter(|(_, seg)| seg.pruned > 0)
|
||||
.map(|(segment, seg)| {
|
||||
let block = seg
|
||||
.checkpoint
|
||||
.and_then(|c| c.block_number)
|
||||
.map(|b| b.to_string())
|
||||
.unwrap_or_else(|| "?".to_string());
|
||||
let status = if seg.progress.is_finished() { "done" } else { "in_progress" };
|
||||
format!("{segment}[{block}, {status}]")
|
||||
})
|
||||
.collect();
|
||||
|
||||
debug!(
|
||||
target: "pruner",
|
||||
%tip_block_number,
|
||||
deleted_entries,
|
||||
?elapsed,
|
||||
segments = %segments.join(" "),
|
||||
"{message}",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents information of a pruner run for a segment.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Display)]
|
||||
#[display("(table={segment}, pruned={pruned}, status={progress})")]
|
||||
|
||||
@@ -318,9 +318,9 @@ where
|
||||
.map(|inner| {
|
||||
let full_log = alloy_rpc_types_eth::Log {
|
||||
inner,
|
||||
block_hash: Some(current_block.hash()),
|
||||
block_number: Some(current_block.number()),
|
||||
block_timestamp: Some(current_block.timestamp()),
|
||||
block_hash: None,
|
||||
block_number: None,
|
||||
block_timestamp: None,
|
||||
transaction_hash: Some(*item.tx.tx_hash()),
|
||||
transaction_index: Some(tx_index as u64),
|
||||
log_index: Some(log_index),
|
||||
|
||||
@@ -63,9 +63,9 @@ impl StorageSettings {
|
||||
transaction_senders_in_static_files: true,
|
||||
account_changesets_in_static_files: true,
|
||||
storage_changesets_in_static_files: true,
|
||||
storages_history_in_rocksdb: true,
|
||||
storages_history_in_rocksdb: false,
|
||||
transaction_hash_numbers_in_rocksdb: true,
|
||||
account_history_in_rocksdb: true,
|
||||
account_history_in_rocksdb: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,9 +20,9 @@ pub type DupCursorMutTy<TX, T> = <TX as DbTxMut>::DupCursorMut<T>;
|
||||
/// Read only transaction
|
||||
pub trait DbTx: Debug + Send {
|
||||
/// Cursor type for this read-only transaction
|
||||
type Cursor<T: Table>: DbCursorRO<T> + Send;
|
||||
type Cursor<T: Table>: DbCursorRO<T> + Send + Sync;
|
||||
/// `DupCursor` type for this read-only transaction
|
||||
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send;
|
||||
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send + Sync;
|
||||
|
||||
/// Get value by an owned key
|
||||
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, DatabaseError>;
|
||||
@@ -51,13 +51,14 @@ pub trait DbTx: Debug + Send {
|
||||
/// Read write transaction that allows writing to database
|
||||
pub trait DbTxMut: Send {
|
||||
/// Read-Write Cursor type
|
||||
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send;
|
||||
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send + Sync;
|
||||
/// Read-Write `DupCursor` type
|
||||
type DupCursorMut<T: DupSort>: DbDupCursorRW<T>
|
||||
+ DbCursorRW<T>
|
||||
+ DbDupCursorRO<T>
|
||||
+ DbCursorRO<T>
|
||||
+ Send;
|
||||
+ Send
|
||||
+ Sync;
|
||||
|
||||
/// Put value to database
|
||||
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
|
||||
|
||||
@@ -6,12 +6,7 @@ use alloy_primitives::{keccak256, map::HashMap, Address, B256, U256};
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_codecs::Compact;
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_db_api::{
|
||||
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
BlockNumberList, DatabaseError,
|
||||
};
|
||||
use reth_db_api::{tables, transaction::DbTxMut, DatabaseError};
|
||||
use reth_etl::Collector;
|
||||
use reth_execution_errors::StateRootError;
|
||||
use reth_primitives_traits::{
|
||||
@@ -19,11 +14,11 @@ use reth_primitives_traits::{
|
||||
};
|
||||
use reth_provider::{
|
||||
errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
|
||||
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, EitherWriter,
|
||||
ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider,
|
||||
MetadataWriter, NodePrimitivesProvider, OriginalValuesKnown, ProviderError, RevertsInit,
|
||||
RocksDBProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
|
||||
StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
|
||||
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
|
||||
HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider, MetadataWriter,
|
||||
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter,
|
||||
StateWriteConfig, StateWriter, StaticFileProviderFactory, StorageSettings,
|
||||
StorageSettingsCache, TrieWriter,
|
||||
};
|
||||
use reth_stages_types::{StageCheckpoint, StageId};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
@@ -108,9 +103,6 @@ where
|
||||
+ TrieWriter
|
||||
+ MetadataWriter
|
||||
+ ChainSpecProvider
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider
|
||||
+ AsRef<PF::ProviderRW>,
|
||||
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
|
||||
{
|
||||
@@ -146,9 +138,6 @@ where
|
||||
+ TrieWriter
|
||||
+ MetadataWriter
|
||||
+ ChainSpecProvider
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider
|
||||
+ AsRef<PF::ProviderRW>,
|
||||
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
|
||||
{
|
||||
@@ -397,64 +386,37 @@ where
|
||||
}
|
||||
|
||||
/// Inserts history indices for genesis accounts and storage.
|
||||
///
|
||||
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
|
||||
/// using [`EitherWriter`] to abstract over the storage backend.
|
||||
pub fn insert_genesis_history<'a, 'b, Provider>(
|
||||
provider: &Provider,
|
||||
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
|
||||
) -> ProviderResult<()>
|
||||
where
|
||||
Provider: DBProvider<Tx: DbTxMut>
|
||||
+ HistoryWriter
|
||||
+ ChainSpecProvider
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider,
|
||||
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter + ChainSpecProvider,
|
||||
{
|
||||
let genesis_block_number = provider.chain_spec().genesis_header().number();
|
||||
insert_history(provider, alloc, genesis_block_number)
|
||||
}
|
||||
|
||||
/// Inserts history indices for genesis accounts and storage.
|
||||
///
|
||||
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
|
||||
/// using [`EitherWriter`] to abstract over the storage backend.
|
||||
pub fn insert_history<'a, 'b, Provider>(
|
||||
provider: &Provider,
|
||||
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
|
||||
block: u64,
|
||||
) -> ProviderResult<()>
|
||||
where
|
||||
Provider: DBProvider<Tx: DbTxMut>
|
||||
+ HistoryWriter
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider,
|
||||
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter,
|
||||
{
|
||||
provider.with_rocksdb_batch(|batch| {
|
||||
let mut writer = EitherWriter::new_accounts_history(provider, batch)?;
|
||||
let list = BlockNumberList::new([block]).expect("single block always fits");
|
||||
for (addr, _) in alloc.clone() {
|
||||
writer.upsert_account_history(ShardedKey::last(*addr), &list)?;
|
||||
}
|
||||
trace!(target: "reth::cli", "Inserted account history");
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
let account_transitions = alloc.clone().map(|(addr, _)| (*addr, [block]));
|
||||
provider.insert_account_history_index(account_transitions)?;
|
||||
|
||||
provider.with_rocksdb_batch(|batch| {
|
||||
let mut writer = EitherWriter::new_storages_history(provider, batch)?;
|
||||
let list = BlockNumberList::new([block]).expect("single block always fits");
|
||||
for (addr, account) in alloc {
|
||||
if let Some(storage) = &account.storage {
|
||||
for key in storage.keys() {
|
||||
writer.upsert_storage_history(StorageShardedKey::last(*addr, *key), &list)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!(target: "reth::cli", "Inserted storage history");
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
trace!(target: "reth::cli", "Inserted account history");
|
||||
|
||||
let storage_transitions = alloc
|
||||
.filter_map(|(addr, account)| account.storage.as_ref().map(|storage| (addr, storage)))
|
||||
.flat_map(|(addr, storage)| storage.keys().map(|key| ((*addr, *key), [block])));
|
||||
provider.insert_storage_history_index(storage_transitions)?;
|
||||
|
||||
trace!(target: "reth::cli", "Inserted storage history");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -530,9 +492,6 @@ where
|
||||
+ HashingWriter
|
||||
+ TrieWriter
|
||||
+ StateWriter
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider
|
||||
+ AsRef<Provider>,
|
||||
{
|
||||
if etl_config.file_size == 0 {
|
||||
@@ -669,9 +628,6 @@ where
|
||||
+ HashingWriter
|
||||
+ HistoryWriter
|
||||
+ StateWriter
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory
|
||||
+ NodePrimitivesProvider
|
||||
+ AsRef<Provider>,
|
||||
{
|
||||
let accounts_len = collector.len();
|
||||
@@ -932,59 +888,27 @@ mod tests {
|
||||
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
|
||||
init_genesis(&factory).unwrap();
|
||||
|
||||
let expected_accounts = vec![
|
||||
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
];
|
||||
let expected_storages = vec![(
|
||||
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
|
||||
IntegerList::new([0]).unwrap(),
|
||||
)];
|
||||
let provider = factory.provider().unwrap();
|
||||
|
||||
let collect_from_mdbx = |factory: &ProviderFactory<MockNodeTypesWithDB>| {
|
||||
let provider = factory.provider().unwrap();
|
||||
let tx = provider.tx_ref();
|
||||
(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx).unwrap(),
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx).unwrap(),
|
||||
)
|
||||
};
|
||||
let tx = provider.tx_ref();
|
||||
|
||||
#[cfg(feature = "edge")]
|
||||
{
|
||||
let settings = factory.cached_storage_settings();
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
assert_eq!(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
|
||||
.expect("failed to collect"),
|
||||
vec![
|
||||
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
|
||||
],
|
||||
);
|
||||
|
||||
let collect_rocksdb = |rocksdb: &reth_provider::providers::RocksDBProvider| {
|
||||
(
|
||||
rocksdb
|
||||
.iter::<tables::AccountsHistory>()
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap(),
|
||||
rocksdb
|
||||
.iter::<tables::StoragesHistory>()
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap(),
|
||||
)
|
||||
};
|
||||
|
||||
let (accounts, storages) = if settings.account_history_in_rocksdb {
|
||||
collect_rocksdb(&rocksdb)
|
||||
} else {
|
||||
collect_from_mdbx(&factory)
|
||||
};
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "edge"))]
|
||||
{
|
||||
let (accounts, storages) = collect_from_mdbx(&factory);
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
assert_eq!(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
|
||||
.expect("failed to collect"),
|
||||
vec![(
|
||||
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
|
||||
IntegerList::new([0]).unwrap()
|
||||
)],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -137,8 +137,7 @@ where
|
||||
for (k, _, v, _) in input {
|
||||
crsr.append(k, &v).expect("submit");
|
||||
}
|
||||
drop(crsr);
|
||||
tx.commit().unwrap()
|
||||
tx.inner.commit().unwrap()
|
||||
},
|
||||
)
|
||||
});
|
||||
@@ -158,8 +157,8 @@ where
|
||||
let (k, _, v, _) = input.get(index).unwrap().clone();
|
||||
crsr.insert(k, &v).expect("submit");
|
||||
}
|
||||
drop(crsr);
|
||||
tx.commit().unwrap()
|
||||
|
||||
tx.inner.commit().unwrap()
|
||||
},
|
||||
)
|
||||
});
|
||||
@@ -220,8 +219,7 @@ where
|
||||
for (k, _, v, _) in input {
|
||||
crsr.append_dup(k, v).expect("submit");
|
||||
}
|
||||
drop(crsr);
|
||||
tx.commit().unwrap()
|
||||
tx.inner.commit().unwrap()
|
||||
},
|
||||
)
|
||||
});
|
||||
@@ -241,7 +239,7 @@ where
|
||||
let (k, _, v, _) = input.get(index).unwrap().clone();
|
||||
tx.put::<T>(k, v).unwrap();
|
||||
}
|
||||
tx.commit().unwrap()
|
||||
tx.inner.commit().unwrap();
|
||||
},
|
||||
)
|
||||
});
|
||||
|
||||
@@ -16,9 +16,10 @@ use reth_db_api::{
|
||||
cursor::DbCursorRW,
|
||||
database::Database,
|
||||
table::{Table, TableRow},
|
||||
transaction::{DbTx, DbTxMut},
|
||||
transaction::DbTxMut,
|
||||
};
|
||||
use reth_fs_util as fs;
|
||||
use std::hint::black_box;
|
||||
|
||||
mod utils;
|
||||
use utils::*;
|
||||
@@ -177,13 +178,17 @@ fn append<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value
|
||||
where
|
||||
T: Table,
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
let mut crsr = tx.cursor_write::<T>().expect("cursor");
|
||||
for (k, v) in input {
|
||||
crsr.append(k, &v).expect("submit");
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
let mut crsr = tx.cursor_write::<T>().expect("cursor");
|
||||
black_box({
|
||||
for (k, v) in input {
|
||||
crsr.append(k, &v).expect("submit");
|
||||
}
|
||||
|
||||
tx.inner.commit().unwrap()
|
||||
});
|
||||
}
|
||||
drop(crsr);
|
||||
tx.commit().unwrap();
|
||||
db
|
||||
}
|
||||
|
||||
@@ -191,13 +196,17 @@ fn insert<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value
|
||||
where
|
||||
T: Table,
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
let mut crsr = tx.cursor_write::<T>().expect("cursor");
|
||||
for (k, v) in input {
|
||||
crsr.insert(k, &v).expect("submit");
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
let mut crsr = tx.cursor_write::<T>().expect("cursor");
|
||||
black_box({
|
||||
for (k, v) in input {
|
||||
crsr.insert(k, &v).expect("submit");
|
||||
}
|
||||
|
||||
tx.inner.commit().unwrap()
|
||||
});
|
||||
}
|
||||
drop(crsr);
|
||||
tx.commit().unwrap();
|
||||
db
|
||||
}
|
||||
|
||||
@@ -205,11 +214,16 @@ fn put<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value)>)
|
||||
where
|
||||
T: Table,
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
for (k, v) in input {
|
||||
tx.put::<T>(k, v).expect("submit");
|
||||
{
|
||||
let tx = db.tx_mut().expect("tx");
|
||||
black_box({
|
||||
for (k, v) in input {
|
||||
tx.put::<T>(k, v).expect("submit");
|
||||
}
|
||||
|
||||
tx.inner.commit().unwrap()
|
||||
});
|
||||
}
|
||||
tx.commit().unwrap();
|
||||
db
|
||||
}
|
||||
|
||||
@@ -229,11 +243,11 @@ where
|
||||
T: Table,
|
||||
{
|
||||
db.view(|tx| {
|
||||
let table_db = tx.inner().open_db(Some(T::NAME)).map_err(|_| "Could not open db.").unwrap();
|
||||
let table_db = tx.inner.open_db(Some(T::NAME)).map_err(|_| "Could not open db.").unwrap();
|
||||
|
||||
println!(
|
||||
"{:?}\n",
|
||||
tx.inner()
|
||||
tx.inner
|
||||
.db_stat(table_db.dbi())
|
||||
.map_err(|_| format!("Could not find table: {}", T::NAME))
|
||||
.map(|stats| {
|
||||
|
||||
@@ -5,7 +5,7 @@ use alloy_primitives::Bytes;
|
||||
use reth_db::{test_utils::create_test_rw_db_with_path, DatabaseEnv};
|
||||
use reth_db_api::{
|
||||
table::{Compress, Encode, Table, TableRow},
|
||||
transaction::{DbTx, DbTxMut},
|
||||
transaction::DbTxMut,
|
||||
Database,
|
||||
};
|
||||
use reth_fs_util as fs;
|
||||
@@ -68,7 +68,7 @@ where
|
||||
for (k, _, v, _) in pair.clone() {
|
||||
tx.put::<T>(k, v).expect("submit");
|
||||
}
|
||||
tx.commit().unwrap();
|
||||
tx.inner.commit().unwrap();
|
||||
}
|
||||
|
||||
db.into_inner_db()
|
||||
|
||||
@@ -274,11 +274,10 @@ impl DatabaseMetrics for DatabaseEnv {
|
||||
let _ = self
|
||||
.view(|tx| {
|
||||
for table in Tables::ALL.iter().map(Tables::name) {
|
||||
let table_db =
|
||||
tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;
|
||||
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
|
||||
|
||||
let stats = tx
|
||||
.inner()
|
||||
.inner
|
||||
.db_stat(table_db.dbi())
|
||||
.wrap_err(format!("Could not find table: {table}"))?;
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
|
||||
#[derive(Debug)]
|
||||
pub struct Tx<K: TransactionKind> {
|
||||
/// Libmdbx-sys transaction.
|
||||
inner: Transaction<K>,
|
||||
pub inner: Transaction<K>,
|
||||
|
||||
/// Cached MDBX DBIs for reuse.
|
||||
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
|
||||
@@ -62,11 +62,6 @@ impl<K: TransactionKind> Tx<K> {
|
||||
Ok(Self { inner, dbis, metrics_handler })
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner libmdbx transaction.
|
||||
pub const fn inner(&self) -> &Transaction<K> {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
/// Gets this transaction ID.
|
||||
pub fn id(&self) -> reth_libmdbx::Result<u64> {
|
||||
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
|
||||
|
||||
@@ -2,29 +2,49 @@ use crate::{Error, TransactionKind};
|
||||
use derive_more::{Debug, Deref, DerefMut};
|
||||
use std::{borrow::Cow, slice};
|
||||
|
||||
/// Implement this to be able to decode data values
|
||||
pub trait TableObject: Sized {
|
||||
/// Decodes the object from the given bytes.
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error>;
|
||||
/// A marker trait for types that can be deserialized from a database value
|
||||
/// without borrowing from the transaction.
|
||||
///
|
||||
/// Types implementing this trait can be used with iterators that need to
|
||||
/// return owned values. This is automatically implemented for any type that
|
||||
/// implements [`TableObject<'a>`] for all lifetimes `'a`.
|
||||
pub trait TableObjectOwned: for<'de> TableObject<'de> {
|
||||
/// Decodes the object from the given bytes, without borrowing them.
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error> {
|
||||
<Self as TableObject<'_>>::decode_borrow(Cow::Borrowed(data_val))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TableObjectOwned for T where T: for<'de> TableObject<'de> {}
|
||||
|
||||
/// Decodes values read from the database into Rust types.
|
||||
///
|
||||
/// Implement this to be able to decode data values. The lifetime parameter `'a`
|
||||
/// allows types to borrow data from the transaction when appropriate (e.g.,
|
||||
/// `Cow<'a, [u8]>`).
|
||||
pub trait TableObject<'a>: Sized {
|
||||
/// Creates the object from a `Cow` of bytes. This allows for efficient
|
||||
/// handling of both owned and borrowed data.
|
||||
fn decode_borrow(data: Cow<'a, [u8]>) -> Result<Self, Error>;
|
||||
|
||||
/// Decodes the value directly from the given MDBX_val pointer.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This should only in the context of an MDBX transaction.
|
||||
/// This should only be called in the context of an MDBX transaction.
|
||||
#[doc(hidden)]
|
||||
unsafe fn decode_val<K: TransactionKind>(
|
||||
_: *const ffi::MDBX_txn,
|
||||
tx: *const ffi::MDBX_txn,
|
||||
data_val: ffi::MDBX_val,
|
||||
) -> Result<Self, Error> {
|
||||
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
|
||||
Self::decode(s)
|
||||
let cow = unsafe { Cow::<'a, [u8]>::decode_val::<K>(tx, data_val)? };
|
||||
Self::decode_borrow(cow)
|
||||
}
|
||||
}
|
||||
|
||||
impl TableObject for Cow<'_, [u8]> {
|
||||
fn decode(_: &[u8]) -> Result<Self, Error> {
|
||||
unreachable!()
|
||||
impl<'a> TableObject<'a> for Cow<'a, [u8]> {
|
||||
fn decode_borrow(data: Cow<'a, [u8]>) -> Result<Self, Error> {
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
@@ -51,14 +71,22 @@ impl TableObject for Cow<'_, [u8]> {
|
||||
}
|
||||
}
|
||||
|
||||
impl TableObject for Vec<u8> {
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error> {
|
||||
Ok(data_val.to_vec())
|
||||
impl TableObject<'_> for Vec<u8> {
|
||||
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
|
||||
Ok(data.into_owned())
|
||||
}
|
||||
|
||||
unsafe fn decode_val<K: TransactionKind>(
|
||||
_tx: *const ffi::MDBX_txn,
|
||||
data_val: ffi::MDBX_val,
|
||||
) -> Result<Self, Error> {
|
||||
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
|
||||
Ok(s.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
impl TableObject for () {
|
||||
fn decode(_: &[u8]) -> Result<Self, Error> {
|
||||
impl TableObject<'_> for () {
|
||||
fn decode_borrow(_: Cow<'_, [u8]>) -> Result<Self, Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -74,19 +102,26 @@ impl TableObject for () {
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deref, DerefMut)]
|
||||
pub struct ObjectLength(pub usize);
|
||||
|
||||
impl TableObject for ObjectLength {
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error> {
|
||||
Ok(Self(data_val.len()))
|
||||
impl TableObject<'_> for ObjectLength {
|
||||
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
|
||||
Ok(Self(data.len()))
|
||||
}
|
||||
|
||||
unsafe fn decode_val<K: TransactionKind>(
|
||||
_tx: *const ffi::MDBX_txn,
|
||||
data_val: ffi::MDBX_val,
|
||||
) -> Result<Self, Error> {
|
||||
Ok(Self(data_val.iov_len))
|
||||
}
|
||||
}
|
||||
|
||||
impl<const LEN: usize> TableObject for [u8; LEN] {
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error> {
|
||||
if data_val.len() != LEN {
|
||||
return Err(Error::DecodeErrorLenDiff)
|
||||
impl<const LEN: usize> TableObject<'_> for [u8; LEN] {
|
||||
fn decode_borrow(data: Cow<'_, [u8]>) -> Result<Self, Error> {
|
||||
if data.len() != LEN {
|
||||
return Err(Error::DecodeErrorLenDiff);
|
||||
}
|
||||
let mut a = [0; LEN];
|
||||
a[..].copy_from_slice(data_val);
|
||||
a[..].copy_from_slice(&data);
|
||||
Ok(a)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{
|
||||
flags::*,
|
||||
mdbx_try_optional,
|
||||
transaction::{TransactionKind, RW},
|
||||
TableObject, Transaction,
|
||||
TableObjectOwned, Transaction,
|
||||
};
|
||||
use ffi::{
|
||||
MDBX_cursor_op, MDBX_FIRST, MDBX_FIRST_DUP, MDBX_GET_BOTH, MDBX_GET_BOTH_RANGE,
|
||||
@@ -11,7 +11,7 @@ use ffi::{
|
||||
MDBX_NEXT_MULTIPLE, MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE,
|
||||
MDBX_PREV_NODUP, MDBX_SET, MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE,
|
||||
};
|
||||
use std::{borrow::Cow, ffi::c_void, fmt, marker::PhantomData, mem, ptr};
|
||||
use std::{ffi::c_void, fmt, marker::PhantomData, mem, ptr};
|
||||
|
||||
/// A cursor for navigating the items within a database.
|
||||
pub struct Cursor<K>
|
||||
@@ -58,8 +58,8 @@ where
|
||||
self.cursor
|
||||
}
|
||||
|
||||
/// Returns an iterator over the raw key value slices.
|
||||
pub fn iter_slices<'a>(self) -> IntoIter<K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
|
||||
/// Returns an iterator over the raw key value bytes (as owned `Vec<u8>`).
|
||||
pub fn iter_slices(self) -> IntoIter<K, Vec<u8>, Vec<u8>> {
|
||||
self.into_iter()
|
||||
}
|
||||
|
||||
@@ -67,8 +67,8 @@ where
|
||||
#[expect(clippy::should_implement_trait)]
|
||||
pub fn into_iter<Key, Value>(self) -> IntoIter<K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
IntoIter::new(self, MDBX_NEXT, MDBX_NEXT)
|
||||
}
|
||||
@@ -82,8 +82,8 @@ where
|
||||
op: MDBX_cursor_op,
|
||||
) -> Result<(Option<Key>, Value, bool)>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
unsafe {
|
||||
let mut key_val = slice_to_val(key);
|
||||
@@ -119,7 +119,7 @@ where
|
||||
op: MDBX_cursor_op,
|
||||
) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let (_, v, _) = mdbx_try_optional!(self.get::<(), Value>(key, data, op));
|
||||
|
||||
@@ -133,8 +133,8 @@ where
|
||||
op: MDBX_cursor_op,
|
||||
) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let (k, v, _) = mdbx_try_optional!(self.get(key, data, op));
|
||||
|
||||
@@ -144,8 +144,8 @@ where
|
||||
/// Position at first key/data item.
|
||||
pub fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_FIRST)
|
||||
}
|
||||
@@ -153,7 +153,7 @@ where
|
||||
/// [`DatabaseFlags::DUP_SORT`]-only: Position at first data item of current key.
|
||||
pub fn first_dup<Value>(&mut self) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(None, None, MDBX_FIRST_DUP)
|
||||
}
|
||||
@@ -161,7 +161,7 @@ where
|
||||
/// [`DatabaseFlags::DUP_SORT`]-only: Position at key/data pair.
|
||||
pub fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(Some(k), Some(v), MDBX_GET_BOTH)
|
||||
}
|
||||
@@ -170,7 +170,7 @@ where
|
||||
/// equal to specified data.
|
||||
pub fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(Some(k), Some(v), MDBX_GET_BOTH_RANGE)
|
||||
}
|
||||
@@ -178,8 +178,8 @@ where
|
||||
/// Return key/data at current cursor position.
|
||||
pub fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_GET_CURRENT)
|
||||
}
|
||||
@@ -188,7 +188,7 @@ where
|
||||
/// Move cursor to prepare for [`Self::next_multiple()`].
|
||||
pub fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(None, None, MDBX_GET_MULTIPLE)
|
||||
}
|
||||
@@ -196,8 +196,8 @@ where
|
||||
/// Position at last key/data item.
|
||||
pub fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_LAST)
|
||||
}
|
||||
@@ -205,7 +205,7 @@ where
|
||||
/// DupSort-only: Position at last data item of current key.
|
||||
pub fn last_dup<Value>(&mut self) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(None, None, MDBX_LAST_DUP)
|
||||
}
|
||||
@@ -214,8 +214,8 @@ where
|
||||
#[expect(clippy::should_implement_trait)]
|
||||
pub fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_NEXT)
|
||||
}
|
||||
@@ -223,8 +223,8 @@ where
|
||||
/// [`DatabaseFlags::DUP_SORT`]-only: Position at next data item of current key.
|
||||
pub fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_NEXT_DUP)
|
||||
}
|
||||
@@ -233,8 +233,8 @@ where
|
||||
/// cursor position. Move cursor to prepare for `MDBX_NEXT_MULTIPLE`.
|
||||
pub fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_NEXT_MULTIPLE)
|
||||
}
|
||||
@@ -242,8 +242,8 @@ where
|
||||
/// Position at first data item of next key.
|
||||
pub fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_NEXT_NODUP)
|
||||
}
|
||||
@@ -251,8 +251,8 @@ where
|
||||
/// Position at previous data item.
|
||||
pub fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_PREV)
|
||||
}
|
||||
@@ -260,8 +260,8 @@ where
|
||||
/// [`DatabaseFlags::DUP_SORT`]-only: Position at previous data item of current key.
|
||||
pub fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_PREV_DUP)
|
||||
}
|
||||
@@ -269,8 +269,8 @@ where
|
||||
/// Position at last data item of previous key.
|
||||
pub fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_PREV_NODUP)
|
||||
}
|
||||
@@ -278,7 +278,7 @@ where
|
||||
/// Position at specified key.
|
||||
pub fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
|
||||
where
|
||||
Value: TableObject,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_value(Some(key), None, MDBX_SET)
|
||||
}
|
||||
@@ -286,8 +286,8 @@ where
|
||||
/// Position at specified key, return both key and data.
|
||||
pub fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(Some(key), None, MDBX_SET_KEY)
|
||||
}
|
||||
@@ -295,8 +295,8 @@ where
|
||||
/// Position at first key greater than or equal to specified key.
|
||||
pub fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(Some(key), None, MDBX_SET_RANGE)
|
||||
}
|
||||
@@ -305,8 +305,8 @@ where
|
||||
/// duplicate data items.
|
||||
pub fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
self.get_full(None, None, MDBX_PREV_MULTIPLE)
|
||||
}
|
||||
@@ -322,8 +322,8 @@ where
|
||||
/// exactly and [true] if the next pair was returned.
|
||||
pub fn set_lowerbound<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(bool, Key, Value)>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let (k, v, found) = mdbx_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND));
|
||||
|
||||
@@ -340,8 +340,8 @@ where
|
||||
/// the next key.
|
||||
pub fn iter<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
Iter::new(self, ffi::MDBX_NEXT, ffi::MDBX_NEXT)
|
||||
}
|
||||
@@ -353,8 +353,8 @@ where
|
||||
/// the next key.
|
||||
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
Iter::new(self, ffi::MDBX_FIRST, ffi::MDBX_NEXT)
|
||||
}
|
||||
@@ -366,8 +366,8 @@ where
|
||||
/// the next key.
|
||||
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let res: Result<Option<((), ())>> = self.set_range(key);
|
||||
if let Err(error) = res {
|
||||
@@ -381,8 +381,8 @@ where
|
||||
/// Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
IterDup::new(self, ffi::MDBX_NEXT)
|
||||
}
|
||||
@@ -391,8 +391,8 @@ where
|
||||
/// database. Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
IterDup::new(self, ffi::MDBX_FIRST)
|
||||
}
|
||||
@@ -401,8 +401,8 @@ where
|
||||
/// key. Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let res: Result<Option<((), ())>> = self.set_range(key);
|
||||
if let Err(error) = res {
|
||||
@@ -414,8 +414,8 @@ where
|
||||
/// Iterate over the duplicates of the item in the database with the given key.
|
||||
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
let res: Result<Option<()>> = self.set(key);
|
||||
match res {
|
||||
@@ -510,8 +510,8 @@ unsafe impl<K> Sync for Cursor<K> where K: TransactionKind {}
|
||||
pub enum IntoIter<K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// An iterator that returns an error on every call to [`Iter::next()`].
|
||||
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
|
||||
@@ -541,8 +541,8 @@ where
|
||||
impl<K, Key, Value> IntoIter<K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(cursor: Cursor<K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
|
||||
@@ -553,8 +553,8 @@ where
|
||||
impl<K, Key, Value> Iterator for IntoIter<K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
type Item = Result<(Key, Value)>;
|
||||
|
||||
@@ -601,8 +601,8 @@ where
|
||||
pub enum Iter<'cur, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// An iterator that returns an error on every call to [`Iter::next()`].
|
||||
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
|
||||
@@ -632,8 +632,8 @@ where
|
||||
impl<'cur, K, Key, Value> Iter<'cur, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(
|
||||
@@ -648,8 +648,8 @@ where
|
||||
impl<K, Key, Value> Iterator for Iter<'_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
type Item = Result<(Key, Value)>;
|
||||
|
||||
@@ -698,8 +698,8 @@ where
|
||||
pub enum IterDup<'cur, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// An iterator that returns an error on every call to `Iter.next()`.
|
||||
/// Cursor.iter*() creates an Iter of this type when MDBX returns an error
|
||||
@@ -726,8 +726,8 @@ where
|
||||
impl<'cur, K, Key, Value> IterDup<'cur, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(cursor: &'cur mut Cursor<K>, op: MDBX_cursor_op) -> Self {
|
||||
@@ -738,8 +738,8 @@ where
|
||||
impl<K, Key, Value> fmt::Debug for IterDup<'_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("IterDup").finish()
|
||||
@@ -749,8 +749,8 @@ where
|
||||
impl<K, Key, Value> Iterator for IterDup<'_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
Value: TableObjectOwned,
|
||||
{
|
||||
type Item = IntoIter<K, Key, Value>;
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
pub extern crate reth_mdbx_sys as ffi;
|
||||
|
||||
pub use crate::{
|
||||
codec::*,
|
||||
codec::{ObjectLength, TableObject, TableObjectOwned},
|
||||
cursor::{Cursor, Iter, IterDup},
|
||||
database::Database,
|
||||
environment::{
|
||||
@@ -21,7 +21,8 @@ pub use crate::{
|
||||
},
|
||||
error::{Error, Result},
|
||||
flags::*,
|
||||
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
|
||||
transaction::{CommitLatency, Transaction, TransactionKind, TransactionPtr, RO, RW},
|
||||
tx_access::TxPtrAccess,
|
||||
};
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
@@ -34,6 +35,7 @@ mod environment;
|
||||
mod error;
|
||||
mod flags;
|
||||
mod transaction;
|
||||
mod tx_access;
|
||||
mod txn_manager;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -3,8 +3,9 @@ use crate::{
|
||||
environment::Environment,
|
||||
error::{mdbx_result, Result},
|
||||
flags::{DatabaseFlags, WriteFlags},
|
||||
tx_access::TxPtrAccess,
|
||||
txn_manager::{TxnManagerMessage, TxnPtr},
|
||||
Cursor, Error, Stat, TableObject,
|
||||
Cursor, Error, Stat, TableObjectOwned,
|
||||
};
|
||||
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
@@ -152,7 +153,7 @@ where
|
||||
/// [None] will be returned.
|
||||
pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
|
||||
where
|
||||
Key: TableObject,
|
||||
Key: TableObjectOwned,
|
||||
{
|
||||
let key_val: ffi::MDBX_val =
|
||||
ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
|
||||
@@ -537,8 +538,12 @@ impl Transaction<RW> {
|
||||
}
|
||||
|
||||
/// A shareable pointer to an MDBX transaction.
|
||||
///
|
||||
/// This type provides synchronized access to the underlying MDBX transaction
|
||||
/// pointer via a mutex. It implements [`TxPtrAccess`] to enable abstraction
|
||||
/// over different access patterns.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TransactionPtr {
|
||||
pub struct TransactionPtr {
|
||||
txn: *mut ffi::MDBX_txn,
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
timed_out: Arc<AtomicBool>,
|
||||
@@ -598,7 +603,7 @@ impl TransactionPtr {
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
// let _lck = self.lock();
|
||||
let _lck = self.lock();
|
||||
|
||||
// No race condition with the `TxnManager` timing out the transaction is possible here,
|
||||
// because we're taking a lock for any actions on the transaction pointer, including a call
|
||||
@@ -716,6 +721,22 @@ unsafe impl Send for TransactionPtr {}
|
||||
// SAFETY: Access to the transaction is synchronized by the lock.
|
||||
unsafe impl Sync for TransactionPtr {}
|
||||
|
||||
impl TxPtrAccess for TransactionPtr {
|
||||
fn with_txn_ptr<F, R>(&self, f: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> R,
|
||||
{
|
||||
self.txn_execute_fail_on_timeout(f)
|
||||
}
|
||||
|
||||
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> R,
|
||||
{
|
||||
self.txn_execute_renew_on_timeout(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
72
crates/storage/libmdbx-rs/src/tx_access.rs
Normal file
72
crates/storage/libmdbx-rs/src/tx_access.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
//! Transaction pointer access abstraction.
|
||||
//!
|
||||
//! This module provides the [`TxPtrAccess`] trait which abstracts over different ways
|
||||
//! of accessing the underlying MDBX transaction pointer. This enables future support
|
||||
//! for both synchronized (mutex-protected) and unsynchronized transaction types.
|
||||
//!
|
||||
//! # Design
|
||||
//!
|
||||
//! MDBX has strict requirements for transaction access:
|
||||
//! - All operations on a transaction must be totally ordered and non-concurrent
|
||||
//! - Read-write transactions can only be used from the thread that created them
|
||||
//!
|
||||
//! The current implementation uses a `Mutex` to enforce these requirements at runtime.
|
||||
//! This is correct and safe, but the synchronization overhead adds up in hot paths.
|
||||
//!
|
||||
//! This trait enables future implementations that can enforce these requirements at
|
||||
//! compile time instead (e.g., using `&mut self` for exclusive access), eliminating
|
||||
//! the runtime overhead for single-threaded workloads.
|
||||
|
||||
use crate::error::Result;
|
||||
use std::fmt;
|
||||
|
||||
mod sealed {
|
||||
use crate::transaction::TransactionPtr;
|
||||
|
||||
pub trait Sealed {}
|
||||
impl Sealed for TransactionPtr {}
|
||||
}
|
||||
|
||||
/// Trait for accessing the transaction pointer.
|
||||
///
|
||||
/// This trait abstracts over different ways transaction pointers are stored
|
||||
/// and accessed. It enables both synchronized (mutex-protected) and
|
||||
/// unsynchronized access patterns for transactions.
|
||||
///
|
||||
/// # Implementors
|
||||
///
|
||||
/// - [`TransactionPtr`](crate::transaction::TransactionPtr) - Synchronized access via mutex
|
||||
///
|
||||
/// # Future Extensions
|
||||
///
|
||||
/// This trait is designed to support future unsynchronized transaction types
|
||||
/// that use `&mut self` to enforce exclusive access at compile time, avoiding
|
||||
/// mutex overhead in single-threaded contexts.
|
||||
pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
|
||||
/// Execute a closure with the transaction pointer.
|
||||
///
|
||||
/// The closure receives a valid transaction pointer that can be used for
|
||||
/// MDBX operations. For synchronized implementations, this acquires the
|
||||
/// appropriate lock before executing the closure.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if the transaction has timed out or is otherwise
|
||||
/// invalid.
|
||||
fn with_txn_ptr<F, R>(&self, f: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> R;
|
||||
|
||||
/// Execute a closure with the transaction pointer, attempting to renew
|
||||
/// the transaction if it has timed out.
|
||||
///
|
||||
/// This is primarily used for cleanup operations (like closing cursors)
|
||||
/// that need to succeed even after a timeout. For implementations that
|
||||
/// don't support renewal, this falls back to [`with_txn_ptr`](Self::with_txn_ptr).
|
||||
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> Result<R>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> R,
|
||||
{
|
||||
self.with_txn_ptr(f)
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
#![allow(missing_docs)]
|
||||
use reth_libmdbx::*;
|
||||
use std::borrow::Cow;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
@@ -324,22 +323,10 @@ fn test_put_del() {
|
||||
cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap();
|
||||
cursor.put(b"key3", b"val3", WriteFlags::empty()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
cursor.set_key(b"key2").unwrap(),
|
||||
Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8])))
|
||||
);
|
||||
assert_eq!(
|
||||
cursor.get_current().unwrap(),
|
||||
Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8])))
|
||||
);
|
||||
assert_eq!(cursor.set_key::<[u8; 4], [u8; 4]>(b"key2").unwrap(), Some((*b"key2", *b"val2")));
|
||||
assert_eq!(cursor.get_current::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key2", *b"val2")));
|
||||
|
||||
cursor.del(WriteFlags::empty()).unwrap();
|
||||
assert_eq!(
|
||||
cursor.get_current().unwrap(),
|
||||
Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8])))
|
||||
);
|
||||
assert_eq!(
|
||||
cursor.last().unwrap(),
|
||||
Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8])))
|
||||
);
|
||||
assert_eq!(cursor.get_current::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key3", *b"val3")));
|
||||
assert_eq!(cursor.last::<[u8; 4], [u8; 4]>().unwrap(), Some((*b"key3", *b"val3")));
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#![allow(missing_docs)]
|
||||
use reth_libmdbx::*;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
io::Write,
|
||||
sync::{Arc, Barrier},
|
||||
thread::{self, JoinHandle},
|
||||
@@ -274,8 +273,8 @@ fn test_concurrent_writers() {
|
||||
|
||||
for i in 0..n {
|
||||
assert_eq!(
|
||||
Cow::<Vec<u8>>::Owned(format!("{val}{i}").into_bytes()),
|
||||
txn.get(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap()
|
||||
format!("{val}{i}").into_bytes(),
|
||||
txn.get::<Vec<u8>>(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -627,31 +627,30 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
StateWriteConfig {
|
||||
write_receipts: !sf_ctx.write_receipts,
|
||||
write_account_changesets: !sf_ctx.write_account_changesets,
|
||||
write_storage_changesets: !sf_ctx.write_storage_changesets,
|
||||
},
|
||||
)?;
|
||||
timings.write_state += start.elapsed();
|
||||
|
||||
let trie_data = block.trie_data();
|
||||
|
||||
// insert hashes and intermediate merkle nodes
|
||||
let start = Instant::now();
|
||||
self.write_hashed_state(&trie_data.hashed_state)?;
|
||||
timings.write_hashed_state += start.elapsed();
|
||||
}
|
||||
}
|
||||
|
||||
// Write all hashed state and trie updates in single batches.
|
||||
// Write all trie updates in a single batch.
|
||||
// This reduces cursor open/close overhead from N calls to 1.
|
||||
if save_mode.with_state() {
|
||||
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
|
||||
let start = Instant::now();
|
||||
let merged_hashed_state = HashedPostStateSorted::merge_batch(
|
||||
blocks.iter().rev().map(|b| b.trie_data().hashed_state),
|
||||
);
|
||||
if !merged_hashed_state.is_empty() {
|
||||
self.write_hashed_state(&merged_hashed_state)?;
|
||||
}
|
||||
timings.write_hashed_state += start.elapsed();
|
||||
|
||||
let start = Instant::now();
|
||||
let merged_trie =
|
||||
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
|
||||
let merged =
|
||||
TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
|
||||
if !merged_trie.is_empty() {
|
||||
self.write_trie_updates_sorted(&merged_trie)?;
|
||||
|
||||
if !merged.is_empty() {
|
||||
self.write_trie_updates_sorted(&merged)?;
|
||||
}
|
||||
timings.write_trie_updates += start.elapsed();
|
||||
}
|
||||
@@ -1359,7 +1358,7 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
|
||||
self.tx
|
||||
.cursor_dup_read::<tables::StorageChangeSets>()?
|
||||
.walk_range(storage_range)?
|
||||
.map(|r| r.map_err(Into::into))
|
||||
.map(|result| -> ProviderResult<_> { Ok(result?) })
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
@@ -1392,7 +1391,7 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
|
||||
self.tx
|
||||
.cursor_dup_read::<tables::StorageChangeSets>()?
|
||||
.walk_range(BlockNumberAddress::range(range))?
|
||||
.map(|r| r.map_err(Into::into))
|
||||
.map(|result| -> ProviderResult<_> { Ok(result?) })
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
@@ -1449,15 +1448,32 @@ impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
|
||||
&self,
|
||||
range: impl core::ops::RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
|
||||
if self.cached_storage_settings().account_changesets_in_static_files {
|
||||
self.static_file_provider.account_changesets_range(range)
|
||||
let range = to_range(range);
|
||||
let mut changesets = Vec::new();
|
||||
if self.cached_storage_settings().account_changesets_in_static_files &&
|
||||
let Some(highest) = self
|
||||
.static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
|
||||
{
|
||||
let static_end = range.end.min(highest + 1);
|
||||
if range.start < static_end {
|
||||
for block in range.start..static_end {
|
||||
let block_changesets = self.account_block_changeset(block)?;
|
||||
for changeset in block_changesets {
|
||||
changesets.push((block, changeset));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.tx
|
||||
.cursor_read::<tables::AccountChangeSets>()?
|
||||
.walk_range(to_range(range))?
|
||||
.map(|r| r.map_err(Into::into))
|
||||
.collect()
|
||||
// Fetch from database for blocks not in static files
|
||||
let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
|
||||
for entry in cursor.walk_range(range)? {
|
||||
let (block_num, account_before) = entry?;
|
||||
changesets.push((block_num, account_before));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(changesets)
|
||||
}
|
||||
|
||||
fn account_changeset_count(&self) -> ProviderResult<usize> {
|
||||
@@ -2288,55 +2304,52 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
|
||||
config: StateWriteConfig,
|
||||
) -> ProviderResult<()> {
|
||||
// Write storage changes
|
||||
if config.write_storage_changesets {
|
||||
tracing::trace!("Writing storage changes");
|
||||
let mut storages_cursor =
|
||||
self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
|
||||
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
|
||||
let block_number = first_block + block_index as BlockNumber;
|
||||
tracing::trace!("Writing storage changes");
|
||||
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
|
||||
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
|
||||
let block_number = first_block + block_index as BlockNumber;
|
||||
|
||||
tracing::trace!(block_number, "Writing block change");
|
||||
// sort changes by address.
|
||||
storage_changes.par_sort_unstable_by_key(|a| a.address);
|
||||
let total_changes =
|
||||
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
|
||||
let mut changeset = Vec::with_capacity(total_changes);
|
||||
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
|
||||
let mut storage = storage_revert
|
||||
.into_iter()
|
||||
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
|
||||
.collect::<Vec<_>>();
|
||||
// sort storage slots by key.
|
||||
storage.par_sort_unstable_by_key(|a| a.0);
|
||||
tracing::trace!(block_number, "Writing block change");
|
||||
// sort changes by address.
|
||||
storage_changes.par_sort_unstable_by_key(|a| a.address);
|
||||
let total_changes =
|
||||
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
|
||||
let mut changeset = Vec::with_capacity(total_changes);
|
||||
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
|
||||
let mut storage = storage_revert
|
||||
.into_iter()
|
||||
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
|
||||
.collect::<Vec<_>>();
|
||||
// sort storage slots by key.
|
||||
storage.par_sort_unstable_by_key(|a| a.0);
|
||||
|
||||
// If we are writing the primary storage wipe transition, the pre-existing plain
|
||||
// storage state has to be taken from the database and written to storage
|
||||
// history. See [StorageWipe::Primary] for more details.
|
||||
//
|
||||
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
|
||||
// collecting wiped entries into a Vec like this, see
|
||||
// `write_storage_trie_changesets`.
|
||||
let mut wiped_storage = Vec::new();
|
||||
if wiped {
|
||||
tracing::trace!(?address, "Wiping storage");
|
||||
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
|
||||
wiped_storage.push((entry.key, entry.value));
|
||||
while let Some(entry) = storages_cursor.next_dup_val()? {
|
||||
wiped_storage.push((entry.key, entry.value))
|
||||
}
|
||||
// If we are writing the primary storage wipe transition, the pre-existing plain
|
||||
// storage state has to be taken from the database and written to storage history.
|
||||
// See [StorageWipe::Primary] for more details.
|
||||
//
|
||||
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
|
||||
// collecting wiped entries into a Vec like this, see
|
||||
// `write_storage_trie_changesets`.
|
||||
let mut wiped_storage = Vec::new();
|
||||
if wiped {
|
||||
tracing::trace!(?address, "Wiping storage");
|
||||
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
|
||||
wiped_storage.push((entry.key, entry.value));
|
||||
while let Some(entry) = storages_cursor.next_dup_val()? {
|
||||
wiped_storage.push((entry.key, entry.value))
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(?address, ?storage, "Writing storage reverts");
|
||||
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
|
||||
changeset.push(StorageBeforeTx { address, key, value });
|
||||
}
|
||||
}
|
||||
|
||||
let mut storage_changesets_writer =
|
||||
EitherWriter::new_storage_changesets(self, block_number)?;
|
||||
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
|
||||
tracing::trace!(?address, ?storage, "Writing storage reverts");
|
||||
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
|
||||
changeset.push(StorageBeforeTx { address, key, value });
|
||||
}
|
||||
}
|
||||
|
||||
let mut storage_changesets_writer =
|
||||
EitherWriter::new_storage_changesets(self, block_number)?;
|
||||
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
|
||||
}
|
||||
|
||||
if !config.write_account_changesets {
|
||||
@@ -3319,13 +3332,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends blocks with their execution state to the database.
|
||||
///
|
||||
/// **Note:** This function is only used in tests.
|
||||
///
|
||||
/// History indices are written to the appropriate backend based on storage settings:
|
||||
/// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
|
||||
///
|
||||
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
|
||||
fn append_blocks_with_state(
|
||||
&self,
|
||||
@@ -3383,31 +3389,8 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
|
||||
// Use pre-computed transitions for history indices since static file
|
||||
// writes aren't visible until commit.
|
||||
// Note: For MDBX we use insert_*_history_index. For RocksDB we use
|
||||
// append_*_history_shard which handles read-merge-write internally.
|
||||
let storage_settings = self.cached_storage_settings();
|
||||
if storage_settings.account_history_in_rocksdb {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for (address, blocks) in account_transitions {
|
||||
batch.append_account_history_shard(address, blocks)?;
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
} else {
|
||||
self.insert_account_history_index(account_transitions)?;
|
||||
}
|
||||
if storage_settings.storages_history_in_rocksdb {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, key), blocks) in storage_transitions {
|
||||
batch.append_storage_history_shard(address, key, blocks)?;
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
} else {
|
||||
self.insert_storage_history_index(storage_transitions)?;
|
||||
}
|
||||
self.insert_account_history_index(account_transitions)?;
|
||||
self.insert_storage_history_index(storage_transitions)?;
|
||||
durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
|
||||
|
||||
// Update pipeline progress
|
||||
|
||||
@@ -50,7 +50,7 @@ use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWri
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
|
||||
ops::{Deref, Range, RangeBounds, RangeInclusive},
|
||||
path::{Path, PathBuf},
|
||||
sync::{atomic::AtomicU64, mpsc, Arc},
|
||||
thread,
|
||||
@@ -615,13 +615,13 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let block_number = block.recovered_block().number();
|
||||
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
|
||||
|
||||
let changeset: Vec<_> = reverts
|
||||
.accounts
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
|
||||
.collect();
|
||||
w.append_account_changeset(changeset, block_number)?;
|
||||
for account_block_reverts in reverts.accounts {
|
||||
let changeset = account_block_reverts
|
||||
.into_iter()
|
||||
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
|
||||
.collect::<Vec<_>>();
|
||||
w.append_account_changeset(changeset, block_number)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -636,21 +636,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let block_number = block.recovered_block().number();
|
||||
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
|
||||
|
||||
let changeset: Vec<_> = reverts
|
||||
.storage
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.flat_map(|revert| {
|
||||
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
|
||||
StorageBeforeTx {
|
||||
address: revert.address,
|
||||
key: B256::new(key.to_be_bytes()),
|
||||
value: revert_to_slot.to_previous_value(),
|
||||
}
|
||||
for storage_block_reverts in reverts.storage {
|
||||
let changeset = storage_block_reverts
|
||||
.into_iter()
|
||||
.flat_map(|revert| {
|
||||
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
|
||||
StorageBeforeTx {
|
||||
address: revert.address,
|
||||
key: B256::new(key.to_be_bytes()),
|
||||
value: revert_to_slot.to_previous_value(),
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
w.append_storage_changeset(changeset, block_number)?;
|
||||
.collect::<Vec<_>>();
|
||||
w.append_storage_changeset(changeset, block_number)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1879,33 +1879,6 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
self.indexes.read().get(segment).map(|index| index.max_block)
|
||||
}
|
||||
|
||||
/// Converts a range to a bounded `RangeInclusive` capped to the highest static file block.
|
||||
///
|
||||
/// This is necessary because static file iteration beyond the tip would loop forever:
|
||||
/// blocks beyond the static file tip return `Ok(empty)` which is indistinguishable from
|
||||
/// blocks with no changes. We cap the end to the highest available block regardless of
|
||||
/// whether the input was unbounded or an explicit large value like `BlockNumber::MAX`.
|
||||
fn bound_range(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
segment: StaticFileSegment,
|
||||
) -> RangeInclusive<BlockNumber> {
|
||||
let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
|
||||
|
||||
let start = match range.start_bound() {
|
||||
Bound::Included(&n) => n,
|
||||
Bound::Excluded(&n) => n.saturating_add(1),
|
||||
Bound::Unbounded => 0,
|
||||
};
|
||||
let end = match range.end_bound() {
|
||||
Bound::Included(&n) => n.min(highest_block),
|
||||
Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
|
||||
Bound::Unbounded => highest_block,
|
||||
};
|
||||
|
||||
start..=end
|
||||
}
|
||||
|
||||
/// Gets the highest static file transaction.
|
||||
///
|
||||
/// If there is nothing on disk for the given segment, this will return [`None`].
|
||||
@@ -2381,7 +2354,6 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
&self,
|
||||
range: impl core::ops::RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
|
||||
let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
|
||||
self.walk_account_changeset_range(range).collect()
|
||||
}
|
||||
|
||||
@@ -2501,7 +2473,6 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
|
||||
let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
|
||||
self.walk_storage_changeset_range(range).collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -136,16 +136,10 @@ pub struct StateWriteConfig {
|
||||
pub write_receipts: bool,
|
||||
/// Whether to write account changesets.
|
||||
pub write_account_changesets: bool,
|
||||
/// Whether to write storage changesets.
|
||||
pub write_storage_changesets: bool,
|
||||
}
|
||||
|
||||
impl Default for StateWriteConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
write_receipts: true,
|
||||
write_account_changesets: true,
|
||||
write_storage_changesets: true,
|
||||
}
|
||||
Self { write_receipts: true, write_account_changesets: true }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,12 +219,11 @@ where
|
||||
let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
|
||||
|
||||
// Step 5: Collect all account trie nodes that changed in the target block
|
||||
let account_nodes_ref = changesets.account_nodes_ref();
|
||||
let mut account_nodes = Vec::with_capacity(account_nodes_ref.len());
|
||||
let mut account_nodes = Vec::new();
|
||||
let mut account_cursor = cursor_factory.account_trie_cursor()?;
|
||||
|
||||
// Iterate over the account nodes from the changesets
|
||||
for (nibbles, _old_node) in account_nodes_ref {
|
||||
for (nibbles, _old_node) in changesets.account_nodes_ref() {
|
||||
// Look up the current value of this trie node using the overlay cursor
|
||||
let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
|
||||
account_nodes.push((*nibbles, node_value));
|
||||
@@ -236,11 +235,10 @@ where
|
||||
// Iterate over the storage tries from the changesets
|
||||
for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
|
||||
let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
|
||||
let storage_nodes_ref = storage_changeset.storage_nodes_ref();
|
||||
let mut storage_nodes = Vec::with_capacity(storage_nodes_ref.len());
|
||||
let mut storage_nodes = Vec::new();
|
||||
|
||||
// Iterate over the storage nodes for this account
|
||||
for (nibbles, _old_node) in storage_nodes_ref {
|
||||
for (nibbles, _old_node) in storage_changeset.storage_nodes_ref() {
|
||||
// Look up the current value of this storage trie node
|
||||
let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
|
||||
storage_nodes.push((*nibbles, node_value));
|
||||
|
||||
@@ -64,7 +64,7 @@ impl<C> DatabaseAccountTrieCursor<C> {
|
||||
|
||||
impl<C> TrieCursor for DatabaseAccountTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::AccountsTrie> + Send,
|
||||
C: DbCursorRO<tables::AccountsTrie> + Send + Sync,
|
||||
{
|
||||
/// Seeks an exact match for the provided key in the account trie.
|
||||
fn seek_exact(
|
||||
@@ -160,7 +160,7 @@ where
|
||||
|
||||
impl<C> TrieCursor for DatabaseStorageTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send + Sync,
|
||||
{
|
||||
/// Seeks an exact match for the given key in the storage trie.
|
||||
fn seek_exact(
|
||||
@@ -202,7 +202,7 @@ where
|
||||
|
||||
impl<C> TrieStorageCursor for DatabaseStorageTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send + Sync,
|
||||
{
|
||||
fn set_hashed_address(&mut self, hashed_address: B256) {
|
||||
self.hashed_address = hashed_address;
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::{
|
||||
root::ParallelStateRootError,
|
||||
stats::{ParallelTrieStats, ParallelTrieTracker},
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
|
||||
value_encoder::AsyncAccountValueEncoder,
|
||||
StorageRootTargets,
|
||||
};
|
||||
use alloy_primitives::{
|
||||
@@ -65,8 +65,6 @@ use reth_trie_common::{
|
||||
};
|
||||
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
@@ -84,22 +82,6 @@ use crate::proof_task_metrics::{
|
||||
|
||||
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
|
||||
|
||||
/// Type alias for the V2 account proof calculator.
|
||||
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
|
||||
<Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
|
||||
<Provider as HashedCursorFactory>::AccountCursor<'a>,
|
||||
AsyncAccountValueEncoder<
|
||||
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
|
||||
<Provider as HashedCursorFactory>::StorageCursor<'a>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Type alias for the V2 storage proof calculator.
|
||||
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
|
||||
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
|
||||
<Provider as HashedCursorFactory>::StorageCursor<'a>,
|
||||
>;
|
||||
|
||||
/// A handle that provides type-safe access to proof worker pools.
|
||||
///
|
||||
/// The handle stores direct senders to both storage and account worker pools,
|
||||
@@ -561,6 +543,15 @@ where
|
||||
ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
|
||||
storage_node_provider.trie_node(path)
|
||||
}
|
||||
|
||||
/// Process a blinded account node request.
|
||||
///
|
||||
/// Used by account workers to retrieve blinded account trie nodes for proof construction.
|
||||
fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
|
||||
let account_node_provider =
|
||||
ProofBlindedAccountProvider::new(&self.provider, &self.provider);
|
||||
account_node_provider.trie_node(path)
|
||||
}
|
||||
}
|
||||
impl TrieNodeProviderFactory for ProofWorkerHandle {
|
||||
type AccountNodeProvider = ProofTaskTrieNodeProvider;
|
||||
@@ -897,12 +888,7 @@ where
|
||||
// Initially mark this worker as available.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let mut total_idle_time = Duration::ZERO;
|
||||
let mut idle_start = Instant::now();
|
||||
|
||||
while let Ok(job) = self.work_rx.recv() {
|
||||
total_idle_time += idle_start.elapsed();
|
||||
|
||||
// Mark worker as busy.
|
||||
self.available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
@@ -932,8 +918,6 @@ where
|
||||
|
||||
// Mark worker as available again.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
idle_start = Instant::now();
|
||||
}
|
||||
|
||||
trace!(
|
||||
@@ -941,14 +925,12 @@ where
|
||||
worker_id = self.worker_id,
|
||||
storage_proofs_processed,
|
||||
storage_nodes_processed,
|
||||
total_idle_time_us = total_idle_time.as_micros(),
|
||||
"Storage worker shutting down"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
self.metrics.record_storage_nodes(storage_nodes_processed as usize);
|
||||
self.metrics.record_storage_worker_idle_time(total_idle_time);
|
||||
self.cursor_metrics.record(&mut cursor_metrics_cache);
|
||||
}
|
||||
|
||||
@@ -1112,7 +1094,7 @@ struct AccountProofWorker<Factory> {
|
||||
work_rx: CrossbeamReceiver<AccountWorkerJob>,
|
||||
/// Unique identifier for this worker (used for tracing)
|
||||
worker_id: usize,
|
||||
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
|
||||
/// Channel for dispatching storage proof work
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
/// Counter tracking worker availability
|
||||
available_workers: Arc<AtomicUsize>,
|
||||
@@ -1183,7 +1165,9 @@ where
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
/// continue operating and the system degrades gracefully.
|
||||
fn run(mut self) -> ProviderResult<()> {
|
||||
// Create provider from factory
|
||||
let provider = self.task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, self.worker_id);
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -1195,64 +1179,39 @@ where
|
||||
let mut account_nodes_processed = 0u64;
|
||||
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
|
||||
|
||||
// Create both account and storage calculators for V2 proofs.
|
||||
// The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
|
||||
let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled {
|
||||
let account_trie_cursor = provider.account_trie_cursor()?;
|
||||
let account_hashed_cursor = provider.hashed_account_cursor()?;
|
||||
|
||||
let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
|
||||
let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
|
||||
|
||||
(
|
||||
Some(proof_v2::ProofCalculator::<
|
||||
_,
|
||||
_,
|
||||
AsyncAccountValueEncoder<
|
||||
<Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
|
||||
<Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
|
||||
>,
|
||||
>::new(account_trie_cursor, account_hashed_cursor)),
|
||||
Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
|
||||
storage_trie_cursor,
|
||||
storage_hashed_cursor,
|
||||
)))),
|
||||
)
|
||||
let mut v2_calculator = if self.v2_enabled {
|
||||
let trie_cursor = proof_tx.provider.account_trie_cursor()?;
|
||||
let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
|
||||
Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
|
||||
trie_cursor,
|
||||
hashed_cursor,
|
||||
))
|
||||
} else {
|
||||
(None, None)
|
||||
None
|
||||
};
|
||||
|
||||
// Count this worker as available only after successful initialization.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let mut total_idle_time = Duration::ZERO;
|
||||
let mut idle_start = Instant::now();
|
||||
let mut value_encoder_stats_cache = ValueEncoderStats::default();
|
||||
|
||||
while let Ok(job) = self.work_rx.recv() {
|
||||
total_idle_time += idle_start.elapsed();
|
||||
|
||||
// Mark worker as busy.
|
||||
self.available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
AccountWorkerJob::AccountMultiproof { input } => {
|
||||
let value_encoder_stats = self.process_account_multiproof(
|
||||
&provider,
|
||||
v2_account_calculator.as_mut(),
|
||||
v2_storage_calculator.clone(),
|
||||
self.process_account_multiproof(
|
||||
&proof_tx,
|
||||
v2_calculator.as_mut(),
|
||||
*input,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
total_idle_time += value_encoder_stats.storage_wait_time;
|
||||
value_encoder_stats_cache.extend(&value_encoder_stats);
|
||||
}
|
||||
|
||||
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
|
||||
Self::process_blinded_node(
|
||||
self.worker_id,
|
||||
&provider,
|
||||
&proof_tx,
|
||||
path,
|
||||
result_sender,
|
||||
&mut account_nodes_processed,
|
||||
@@ -1262,8 +1221,6 @@ where
|
||||
|
||||
// Mark worker as available again.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
idle_start = Instant::now();
|
||||
}
|
||||
|
||||
trace!(
|
||||
@@ -1271,16 +1228,13 @@ where
|
||||
worker_id=self.worker_id,
|
||||
account_proofs_processed,
|
||||
account_nodes_processed,
|
||||
total_idle_time_us = total_idle_time.as_micros(),
|
||||
"Account worker shutting down"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
self.metrics.record_account_nodes(account_nodes_processed as usize);
|
||||
self.metrics.record_account_worker_idle_time(total_idle_time);
|
||||
self.cursor_metrics.record(&mut cursor_metrics_cache);
|
||||
self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1288,13 +1242,13 @@ where
|
||||
|
||||
fn compute_legacy_account_multiproof<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
targets: MultiProofTargets,
|
||||
mut prefix_sets: TriePrefixSets,
|
||||
collect_branch_node_masks: bool,
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
|
||||
) -> Result<(ProofResult, Duration), ParallelStateRootError>
|
||||
) -> Result<ProofResult, ParallelStateRootError>
|
||||
where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
@@ -1302,7 +1256,6 @@ where
|
||||
target: "trie::proof_task",
|
||||
"Account multiproof calculation",
|
||||
targets = targets.len(),
|
||||
num_slots = targets.values().map(|slots| slots.len()).sum::<usize>(),
|
||||
worker_id=self.worker_id,
|
||||
);
|
||||
let _span_guard = span.enter();
|
||||
@@ -1340,27 +1293,28 @@ where
|
||||
cached_storage_roots: &self.cached_storage_roots,
|
||||
};
|
||||
|
||||
let mut storage_wait_time = Duration::ZERO;
|
||||
let result = build_account_multiproof_with_storage_roots(
|
||||
provider,
|
||||
&proof_tx.provider,
|
||||
ctx,
|
||||
&mut tracker,
|
||||
proof_cursor_metrics,
|
||||
&mut storage_wait_time,
|
||||
)?;
|
||||
);
|
||||
|
||||
let stats = tracker.finish();
|
||||
Ok((ProofResult::Legacy(result, stats), storage_wait_time))
|
||||
result.map(|proof| ProofResult::Legacy(proof, stats))
|
||||
}
|
||||
|
||||
fn compute_v2_account_multiproof<'a, Provider>(
|
||||
fn compute_v2_account_multiproof<Provider>(
|
||||
&self,
|
||||
v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
|
||||
v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
|
||||
v2_calculator: &mut proof_v2::ProofCalculator<
|
||||
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
|
||||
<Provider as HashedCursorFactory>::AccountCursor<'_>,
|
||||
AsyncAccountValueEncoder,
|
||||
>,
|
||||
targets: MultiProofTargetsV2,
|
||||
) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError>
|
||||
) -> Result<ProofResult, ParallelStateRootError>
|
||||
where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
|
||||
|
||||
@@ -1379,75 +1333,64 @@ where
|
||||
dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
|
||||
|
||||
let mut value_encoder = AsyncAccountValueEncoder::new(
|
||||
self.storage_work_tx.clone(),
|
||||
storage_proof_receivers,
|
||||
self.cached_storage_roots.clone(),
|
||||
v2_storage_calculator,
|
||||
);
|
||||
|
||||
let account_proofs =
|
||||
v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
|
||||
let proof = DecodedMultiProofV2 {
|
||||
account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
|
||||
storage_proofs: value_encoder.into_storage_proofs()?,
|
||||
};
|
||||
|
||||
let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
|
||||
|
||||
let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
|
||||
|
||||
Ok((ProofResult::V2(proof), value_encoder_stats))
|
||||
Ok(ProofResult::V2(proof))
|
||||
}
|
||||
|
||||
/// Processes an account multiproof request.
|
||||
///
|
||||
/// Returns stats from the value encoder used during proof computation.
|
||||
fn process_account_multiproof<'a, Provider>(
|
||||
fn process_account_multiproof<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>,
|
||||
v2_storage_calculator: Option<Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>>,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
v2_calculator: Option<
|
||||
&mut proof_v2::ProofCalculator<
|
||||
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
|
||||
<Provider as HashedCursorFactory>::AccountCursor<'_>,
|
||||
AsyncAccountValueEncoder,
|
||||
>,
|
||||
>,
|
||||
input: AccountMultiproofInput,
|
||||
account_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) -> ValueEncoderStats
|
||||
where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
|
||||
) where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
|
||||
let proof_start = Instant::now();
|
||||
|
||||
let (proof_result_sender, result, value_encoder_stats) = match input {
|
||||
let (proof_result_sender, result) = match input {
|
||||
AccountMultiproofInput::Legacy {
|
||||
targets,
|
||||
prefix_sets,
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys,
|
||||
proof_result_sender,
|
||||
} => {
|
||||
let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof(
|
||||
provider,
|
||||
} => (
|
||||
proof_result_sender,
|
||||
self.compute_legacy_account_multiproof(
|
||||
proof_tx,
|
||||
targets,
|
||||
prefix_sets,
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys,
|
||||
&mut proof_cursor_metrics,
|
||||
) {
|
||||
Ok((proof, wait_time)) => (
|
||||
Ok(proof),
|
||||
ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() },
|
||||
),
|
||||
Err(e) => (Err(e), ValueEncoderStats::default()),
|
||||
};
|
||||
(proof_result_sender, result, value_encoder_stats)
|
||||
}
|
||||
AccountMultiproofInput::V2 { targets, proof_result_sender } => {
|
||||
let (result, value_encoder_stats) = match self
|
||||
.compute_v2_account_multiproof::<Provider>(
|
||||
v2_account_calculator.expect("v2 account calculator provided"),
|
||||
v2_storage_calculator.expect("v2 storage calculator provided"),
|
||||
targets,
|
||||
) {
|
||||
Ok((proof, stats)) => (Ok(proof), stats),
|
||||
Err(e) => (Err(e), ValueEncoderStats::default()),
|
||||
};
|
||||
(proof_result_sender, result, value_encoder_stats)
|
||||
}
|
||||
),
|
||||
),
|
||||
AccountMultiproofInput::V2 { targets, proof_result_sender } => (
|
||||
proof_result_sender,
|
||||
self.compute_v2_account_multiproof::<Provider>(
|
||||
v2_calculator.expect("v2 calculator provided"),
|
||||
targets,
|
||||
),
|
||||
),
|
||||
};
|
||||
|
||||
let ProofResultContext {
|
||||
@@ -1500,14 +1443,12 @@ where
|
||||
#[cfg(feature = "metrics")]
|
||||
// Accumulate per-proof metrics into the worker's cache
|
||||
cursor_metrics_cache.extend(&proof_cursor_metrics);
|
||||
|
||||
value_encoder_stats
|
||||
}
|
||||
|
||||
/// Processes a blinded account node lookup request.
|
||||
fn process_blinded_node<Provider>(
|
||||
worker_id: usize,
|
||||
provider: &Provider,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
path: Nibbles,
|
||||
result_sender: Sender<TrieNodeProviderResult>,
|
||||
account_nodes_processed: &mut u64,
|
||||
@@ -1528,8 +1469,7 @@ where
|
||||
);
|
||||
|
||||
let start = Instant::now();
|
||||
let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
|
||||
let result = account_node_provider.trie_node(&path);
|
||||
let result = proof_tx.process_blinded_account_node(&path);
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
*account_nodes_processed += 1;
|
||||
@@ -1560,13 +1500,11 @@ where
|
||||
/// enabling interleaved parallelism between account trie traversal and storage proof computation.
|
||||
///
|
||||
/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
|
||||
/// Also accumulates the time spent waiting for storage proofs into `storage_wait_time`.
|
||||
fn build_account_multiproof_with_storage_roots<P>(
|
||||
provider: &P,
|
||||
ctx: AccountMultiproofParams<'_>,
|
||||
tracker: &mut ParallelTrieTracker,
|
||||
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
|
||||
storage_wait_time: &mut Duration,
|
||||
) -> Result<DecodedMultiProof, ParallelStateRootError>
|
||||
where
|
||||
P: TrieCursorFactory + HashedCursorFactory,
|
||||
@@ -1630,7 +1568,6 @@ where
|
||||
);
|
||||
// Block on this specific storage proof receiver - enables interleaved
|
||||
// parallelism
|
||||
let wait_start = Instant::now();
|
||||
let proof_msg = receiver.recv().map_err(|_| {
|
||||
ParallelStateRootError::StorageRoot(
|
||||
reth_execution_errors::StorageRootError::Database(
|
||||
@@ -1640,7 +1577,6 @@ where
|
||||
),
|
||||
)
|
||||
})?;
|
||||
*storage_wait_time += wait_start.elapsed();
|
||||
|
||||
drop(_guard);
|
||||
|
||||
@@ -1732,9 +1668,7 @@ where
|
||||
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
|
||||
// Done last to allow storage workers more time to complete while we finalized the account trie.
|
||||
for (hashed_address, receiver) in storage_proof_receivers {
|
||||
let wait_start = Instant::now();
|
||||
if let Ok(proof_msg) = receiver.recv() {
|
||||
*storage_wait_time += wait_start.elapsed();
|
||||
let proof_result = proof_msg.result?;
|
||||
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
|
||||
.expect("Partial proofs are not yet supported");
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
use crate::value_encoder::ValueEncoderStats;
|
||||
use reth_metrics::{metrics::Histogram, Metrics};
|
||||
use reth_trie::{
|
||||
hashed_cursor::{HashedCursorMetrics, HashedCursorMetricsCache},
|
||||
trie_cursor::{TrieCursorMetrics, TrieCursorMetricsCache},
|
||||
TrieType,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
/// Metrics for the proof task.
|
||||
#[derive(Clone, Metrics)]
|
||||
@@ -15,17 +13,6 @@ pub struct ProofTaskTrieMetrics {
|
||||
blinded_account_nodes: Histogram,
|
||||
/// A histogram for the number of blinded storage nodes fetched.
|
||||
blinded_storage_nodes: Histogram,
|
||||
/// Histogram for storage worker idle time in seconds (waiting for proof jobs).
|
||||
storage_worker_idle_time_seconds: Histogram,
|
||||
/// Histogram for account worker idle time in seconds (waiting for proof jobs + storage
|
||||
/// results).
|
||||
account_worker_idle_time_seconds: Histogram,
|
||||
/// Histogram for `Dispatched` deferred encoder variant count.
|
||||
deferred_encoder_dispatched: Histogram,
|
||||
/// Histogram for `FromCache` deferred encoder variant count.
|
||||
deferred_encoder_from_cache: Histogram,
|
||||
/// Histogram for `Sync` deferred encoder variant count.
|
||||
deferred_encoder_sync: Histogram,
|
||||
}
|
||||
|
||||
impl ProofTaskTrieMetrics {
|
||||
@@ -38,23 +25,6 @@ impl ProofTaskTrieMetrics {
|
||||
pub fn record_storage_nodes(&self, count: usize) {
|
||||
self.blinded_storage_nodes.record(count as f64);
|
||||
}
|
||||
|
||||
/// Record storage worker idle time.
|
||||
pub fn record_storage_worker_idle_time(&self, duration: Duration) {
|
||||
self.storage_worker_idle_time_seconds.record(duration.as_secs_f64());
|
||||
}
|
||||
|
||||
/// Record account worker idle time.
|
||||
pub fn record_account_worker_idle_time(&self, duration: Duration) {
|
||||
self.account_worker_idle_time_seconds.record(duration.as_secs_f64());
|
||||
}
|
||||
|
||||
/// Record value encoder stats (deferred encoder variant counts).
|
||||
pub(crate) fn record_value_encoder_stats(&self, stats: &ValueEncoderStats) {
|
||||
self.deferred_encoder_dispatched.record(stats.dispatched_count as f64);
|
||||
self.deferred_encoder_from_cache.record(stats.from_cache_count as f64);
|
||||
self.deferred_encoder_sync.record(stats.sync_count as f64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cursor metrics for proof task operations.
|
||||
|
||||
@@ -1,79 +1,36 @@
|
||||
use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
|
||||
use crate::proof_task::{
|
||||
StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob,
|
||||
};
|
||||
use alloy_primitives::{map::B256Map, B256};
|
||||
use alloy_rlp::Encodable;
|
||||
use core::cell::RefCell;
|
||||
use crossbeam_channel::Receiver as CrossbeamReceiver;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use dashmap::DashMap;
|
||||
use reth_execution_errors::trie::StateProofError;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
use reth_trie::{
|
||||
hashed_cursor::HashedStorageCursor,
|
||||
proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
|
||||
trie_cursor::TrieStorageCursor,
|
||||
proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target},
|
||||
ProofTrieNode,
|
||||
};
|
||||
use std::{
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
/// Stats collected by [`AsyncAccountValueEncoder`] during proof computation.
|
||||
///
|
||||
/// Tracks time spent waiting for storage proofs and counts of each deferred encoder variant used.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub(crate) struct ValueEncoderStats {
|
||||
/// Accumulated time spent waiting for storage proof results from dispatched workers.
|
||||
pub(crate) storage_wait_time: Duration,
|
||||
/// Number of times the `Dispatched` variant was used (proof pre-dispatched to workers).
|
||||
pub(crate) dispatched_count: u64,
|
||||
/// Number of times the `FromCache` variant was used (storage root already cached).
|
||||
pub(crate) from_cache_count: u64,
|
||||
/// Number of times the `Sync` variant was used (synchronous computation).
|
||||
pub(crate) sync_count: u64,
|
||||
}
|
||||
|
||||
impl ValueEncoderStats {
|
||||
/// Extends this metrics by adding the values from another.
|
||||
pub(crate) fn extend(&mut self, other: &Self) {
|
||||
self.storage_wait_time += other.storage_wait_time;
|
||||
self.dispatched_count += other.dispatched_count;
|
||||
self.from_cache_count += other.from_cache_count;
|
||||
self.sync_count += other.sync_count;
|
||||
}
|
||||
}
|
||||
use std::{rc::Rc, sync::Arc};
|
||||
|
||||
/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
|
||||
pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
|
||||
/// A storage proof job was dispatched to the worker pool.
|
||||
pub(crate) enum AsyncAccountDeferredValueEncoder {
|
||||
Dispatched {
|
||||
hashed_address: B256,
|
||||
account: Account,
|
||||
proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
|
||||
/// Shared storage proof results.
|
||||
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
|
||||
/// Shared stats for tracking wait time and counts.
|
||||
stats: Rc<RefCell<ValueEncoderStats>>,
|
||||
// None if results shouldn't be retained for this dispatched proof.
|
||||
storage_proof_results: Option<Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>>,
|
||||
},
|
||||
/// The storage root was found in cache.
|
||||
FromCache { account: Account, root: B256 },
|
||||
/// Synchronous storage root computation.
|
||||
Sync {
|
||||
/// Shared storage proof calculator for computing storage roots.
|
||||
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
|
||||
hashed_address: B256,
|
||||
FromCache {
|
||||
account: Account,
|
||||
/// Cache to store computed storage roots for future reuse.
|
||||
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
||||
root: B256,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
|
||||
where
|
||||
TC: TrieStorageCursor,
|
||||
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
|
||||
{
|
||||
impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
|
||||
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
|
||||
let (account, root) = match self {
|
||||
Self::Dispatched {
|
||||
@@ -81,9 +38,7 @@ where
|
||||
account,
|
||||
proof_result_rx,
|
||||
storage_proof_results,
|
||||
stats,
|
||||
} => {
|
||||
let wait_start = Instant::now();
|
||||
let result = proof_result_rx?
|
||||
.recv()
|
||||
.map_err(|_| {
|
||||
@@ -92,27 +47,18 @@ where
|
||||
)))
|
||||
})?
|
||||
.result?;
|
||||
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
|
||||
|
||||
let StorageProofResult::V2 { root: Some(root), proof } = result else {
|
||||
panic!("StorageProofResult is not V2 with root: {result:?}")
|
||||
};
|
||||
|
||||
storage_proof_results.borrow_mut().insert(hashed_address, proof);
|
||||
if let Some(storage_proof_results) = storage_proof_results.as_ref() {
|
||||
storage_proof_results.borrow_mut().insert(hashed_address, proof);
|
||||
}
|
||||
|
||||
(account, root)
|
||||
}
|
||||
Self::FromCache { account, root } => (account, root),
|
||||
Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
|
||||
let mut calculator = storage_calculator.borrow_mut();
|
||||
let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?;
|
||||
let storage_root = calculator
|
||||
.compute_root_hash(&proof)?
|
||||
.expect("storage_proof with dummy target always returns root");
|
||||
|
||||
cached_storage_roots.insert(hashed_address, storage_root);
|
||||
(account, storage_root)
|
||||
}
|
||||
};
|
||||
|
||||
let account = account.into_trie_account(root);
|
||||
@@ -121,15 +67,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the [`LeafValueEncoder`] trait for accounts.
|
||||
///
|
||||
/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are
|
||||
/// being computed asynchronously by worker threads.
|
||||
///
|
||||
/// For accounts without pre-dispatched proofs or cached roots, uses a shared
|
||||
/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across
|
||||
/// multiple accounts.
|
||||
pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
|
||||
/// Implements the [`LeafValueEncoder`] trait for accounts using a [`CrossbeamSender`] to dispatch
|
||||
/// and compute storage roots asynchronously. Can also accept a set of already dispatched account
|
||||
/// storage proofs, for cases where it's possible to determine some necessary accounts ahead of
|
||||
/// time.
|
||||
pub(crate) struct AsyncAccountValueEncoder {
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
/// Storage proof jobs which were dispatched ahead of time.
|
||||
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
|
||||
/// Storage roots which have already been computed. This can be used only if a storage proof
|
||||
@@ -138,59 +81,39 @@ pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
|
||||
/// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
|
||||
/// required because [`DeferredValueEncoder`] cannot have a lifetime.
|
||||
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
|
||||
/// Shared storage proof calculator for synchronous computation. Reuses cursors and internal
|
||||
/// buffers across multiple storage root calculations.
|
||||
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
|
||||
/// Shared stats for tracking wait time and variant counts.
|
||||
stats: Rc<RefCell<ValueEncoderStats>>,
|
||||
}
|
||||
|
||||
impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
|
||||
/// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate
|
||||
/// storage roots synchronously.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `dispatched`: Pre-dispatched storage proof receivers for target accounts
|
||||
/// - `cached_storage_roots`: Shared cache of already-computed storage roots
|
||||
/// - `storage_calculator`: Shared storage proof calculator for synchronous computation
|
||||
impl AsyncAccountValueEncoder {
|
||||
/// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
|
||||
/// roots asynchronously.
|
||||
pub(crate) fn new(
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
|
||||
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
||||
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
storage_work_tx,
|
||||
dispatched,
|
||||
cached_storage_roots,
|
||||
storage_proof_results: Default::default(),
|
||||
storage_calculator,
|
||||
stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume [`Self`] and return all collected storage proofs along with accumulated stats.
|
||||
///
|
||||
/// This method collects any remaining dispatched proofs that weren't consumed during proof
|
||||
/// calculation and includes their wait time in the returned stats.
|
||||
/// Consume [`Self`] and return all collected storage proofs which had been dispatched.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
|
||||
/// been dropped.
|
||||
pub(crate) fn finalize(
|
||||
pub(crate) fn into_storage_proofs(
|
||||
self,
|
||||
) -> Result<(B256Map<Vec<ProofTrieNode>>, ValueEncoderStats), StateProofError> {
|
||||
) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {
|
||||
let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
|
||||
.expect("no deferred encoders are still allocated")
|
||||
.into_inner();
|
||||
|
||||
let mut stats = Rc::into_inner(self.stats)
|
||||
.expect("no deferred encoders are still allocated")
|
||||
.into_inner();
|
||||
|
||||
// Any remaining dispatched proofs need to have their results collected.
|
||||
// These are proofs that were pre-dispatched but not consumed during proof calculation.
|
||||
// Any remaining dispatched proofs need to have their results collected
|
||||
for (hashed_address, rx) in &self.dispatched {
|
||||
let wait_start = Instant::now();
|
||||
let result = rx
|
||||
.recv()
|
||||
.map_err(|_| {
|
||||
@@ -199,7 +122,6 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
|
||||
)))
|
||||
})?
|
||||
.result?;
|
||||
stats.storage_wait_time += wait_start.elapsed();
|
||||
|
||||
let StorageProofResult::V2 { proof, .. } = result else {
|
||||
panic!("StorageProofResult is not V2: {result:?}")
|
||||
@@ -208,17 +130,13 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
|
||||
storage_proof_results.insert(*hashed_address, proof);
|
||||
}
|
||||
|
||||
Ok((storage_proof_results, stats))
|
||||
Ok(storage_proof_results)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
|
||||
where
|
||||
TC: TrieStorageCursor,
|
||||
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
|
||||
{
|
||||
impl LeafValueEncoder for AsyncAccountValueEncoder {
|
||||
type Value = Account;
|
||||
type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
|
||||
type DeferredEncoder = AsyncAccountDeferredValueEncoder;
|
||||
|
||||
fn deferred_encoder(
|
||||
&mut self,
|
||||
@@ -228,13 +146,11 @@ where
|
||||
// If the proof job has already been dispatched for this account then it's not necessary to
|
||||
// dispatch another.
|
||||
if let Some(rx) = self.dispatched.remove(&hashed_address) {
|
||||
self.stats.borrow_mut().dispatched_count += 1;
|
||||
return AsyncAccountDeferredValueEncoder::Dispatched {
|
||||
hashed_address,
|
||||
account,
|
||||
proof_result_rx: Ok(rx),
|
||||
storage_proof_results: self.storage_proof_results.clone(),
|
||||
stats: self.stats.clone(),
|
||||
storage_proof_results: Some(self.storage_proof_results.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,17 +159,25 @@ where
|
||||
|
||||
// If the root is already calculated then just use it directly
|
||||
if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
|
||||
self.stats.borrow_mut().from_cache_count += 1;
|
||||
return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
|
||||
}
|
||||
|
||||
// Compute storage root synchronously using the shared calculator
|
||||
self.stats.borrow_mut().sync_count += 1;
|
||||
AsyncAccountDeferredValueEncoder::Sync {
|
||||
storage_calculator: self.storage_calculator.clone(),
|
||||
// Create a proof input which targets a bogus key, so that we calculate the root as a
|
||||
// side-effect.
|
||||
let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]);
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
let proof_result_rx = self
|
||||
.storage_work_tx
|
||||
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx })
|
||||
.map_err(|_| DatabaseError::Other("storage workers unavailable".to_string()))
|
||||
.map(|_| rx);
|
||||
|
||||
AsyncAccountDeferredValueEncoder::Dispatched {
|
||||
hashed_address,
|
||||
account,
|
||||
cached_storage_roots: self.cached_storage_roots.clone(),
|
||||
proof_result_rx,
|
||||
storage_proof_results: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_trie_common::{
|
||||
use reth_trie_sparse::{
|
||||
provider::{RevealedNode, TrieNodeProvider},
|
||||
LeafLookup, LeafLookupError, RlpNodeStackItem, SparseNode, SparseNodeType, SparseTrie,
|
||||
SparseTrieExt, SparseTrieUpdates,
|
||||
SparseTrieUpdates,
|
||||
};
|
||||
use smallvec::SmallVec;
|
||||
use std::cmp::{Ord, Ordering, PartialOrd};
|
||||
@@ -908,162 +908,6 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
}
|
||||
}
|
||||
|
||||
impl SparseTrieExt for ParallelSparseTrie {
|
||||
/// Returns the count of revealed (non-hash) nodes across all subtries.
|
||||
fn revealed_node_count(&self) -> usize {
|
||||
let upper_count = self.upper_subtrie.nodes.values().filter(|n| !n.is_hash()).count();
|
||||
|
||||
let lower_count: usize = self
|
||||
.lower_subtries
|
||||
.iter()
|
||||
.filter_map(|s| s.as_revealed_ref())
|
||||
.map(|s| s.nodes.values().filter(|n| !n.is_hash()).count())
|
||||
.sum();
|
||||
|
||||
upper_count + lower_count
|
||||
}
|
||||
|
||||
fn prune(&mut self, max_depth: usize) -> usize {
|
||||
// DFS traversal to find nodes at max_depth that can be pruned.
|
||||
// Collects "effective pruned roots" - children of nodes at max_depth with computed hashes.
|
||||
// We replace nodes with Hash stubs inline during traversal.
|
||||
let mut effective_pruned_roots = Vec::<(Nibbles, B256)>::new();
|
||||
let mut stack: SmallVec<[(Nibbles, usize); 32]> = SmallVec::new();
|
||||
stack.push((Nibbles::default(), 0));
|
||||
|
||||
// DFS traversal: pop path and depth, skip if subtrie or node not found.
|
||||
while let Some((path, depth)) = stack.pop() {
|
||||
// Get children to visit from current node (immutable access)
|
||||
let children: SmallVec<[Nibbles; 16]> = {
|
||||
let Some(subtrie) = self.subtrie_for_path(&path) else { continue };
|
||||
let Some(node) = subtrie.nodes.get(&path) else { continue };
|
||||
|
||||
match node {
|
||||
SparseNode::Empty | SparseNode::Hash(_) | SparseNode::Leaf { .. } => {
|
||||
SmallVec::new()
|
||||
}
|
||||
SparseNode::Extension { key, .. } => {
|
||||
let mut child = path;
|
||||
child.extend(key);
|
||||
SmallVec::from_buf_and_len([child; 16], 1)
|
||||
}
|
||||
SparseNode::Branch { state_mask, .. } => {
|
||||
let mut children = SmallVec::new();
|
||||
let mut mask = state_mask.get();
|
||||
while mask != 0 {
|
||||
let nibble = mask.trailing_zeros() as u8;
|
||||
mask &= mask - 1;
|
||||
let mut child = path;
|
||||
child.push_unchecked(nibble);
|
||||
children.push(child);
|
||||
}
|
||||
children
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Process children - either continue traversal or prune
|
||||
for child in children {
|
||||
if depth == max_depth {
|
||||
// Check if child has a computed hash and replace inline
|
||||
let hash = self
|
||||
.subtrie_for_path(&child)
|
||||
.and_then(|s| s.nodes.get(&child))
|
||||
.filter(|n| !n.is_hash())
|
||||
.and_then(|n| n.hash());
|
||||
|
||||
if let Some(hash) = hash {
|
||||
self.subtrie_for_path_mut(&child)
|
||||
.nodes
|
||||
.insert(child, SparseNode::Hash(hash));
|
||||
effective_pruned_roots.push((child, hash));
|
||||
}
|
||||
} else {
|
||||
stack.push((child, depth + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if effective_pruned_roots.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let nodes_converted = effective_pruned_roots.len();
|
||||
|
||||
// Sort roots by subtrie type (upper first), then by path for efficient partitioning.
|
||||
effective_pruned_roots.sort_unstable_by(|(path_a, _), (path_b, _)| {
|
||||
let subtrie_type_a = SparseSubtrieType::from_path(path_a);
|
||||
let subtrie_type_b = SparseSubtrieType::from_path(path_b);
|
||||
subtrie_type_a.cmp(&subtrie_type_b).then(path_a.cmp(path_b))
|
||||
});
|
||||
|
||||
// Split off upper subtrie roots (they come first due to sorting)
|
||||
let num_upper_roots = effective_pruned_roots
|
||||
.iter()
|
||||
.position(|(p, _)| !SparseSubtrieType::path_len_is_upper(p.len()))
|
||||
.unwrap_or(effective_pruned_roots.len());
|
||||
|
||||
let roots_upper = &effective_pruned_roots[..num_upper_roots];
|
||||
let roots_lower = &effective_pruned_roots[num_upper_roots..];
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let mut all_roots: Vec<_> = effective_pruned_roots.iter().map(|(p, _)| p).collect();
|
||||
all_roots.sort_unstable();
|
||||
all_roots.windows(2).all(|w| !w[1].starts_with(w[0]))
|
||||
},
|
||||
"prune roots must be prefix-free"
|
||||
);
|
||||
|
||||
// Upper prune roots that are prefixes of lower subtrie root paths cause the entire
|
||||
// subtrie to be cleared (preserving allocations for reuse).
|
||||
if !roots_upper.is_empty() {
|
||||
for subtrie in &mut self.lower_subtries {
|
||||
let should_clear = subtrie.as_revealed_ref().is_some_and(|s| {
|
||||
let search_idx = roots_upper.partition_point(|(root, _)| root <= &s.path);
|
||||
search_idx > 0 && s.path.starts_with(&roots_upper[search_idx - 1].0)
|
||||
});
|
||||
if should_clear {
|
||||
subtrie.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Upper subtrie: prune nodes and values
|
||||
self.upper_subtrie.nodes.retain(|p, _| !is_strict_descendant_in(roots_upper, p));
|
||||
self.upper_subtrie.inner.values.retain(|p, _| {
|
||||
!starts_with_pruned_in(roots_upper, p) && !starts_with_pruned_in(roots_lower, p)
|
||||
});
|
||||
|
||||
// Process lower subtries using chunk_by to group roots by subtrie
|
||||
for roots_group in roots_lower.chunk_by(|(path_a, _), (path_b, _)| {
|
||||
SparseSubtrieType::from_path(path_a) == SparseSubtrieType::from_path(path_b)
|
||||
}) {
|
||||
let subtrie_idx = path_subtrie_index_unchecked(&roots_group[0].0);
|
||||
|
||||
// Skip unrevealed/blinded subtries - nothing to prune
|
||||
let Some(subtrie) = self.lower_subtries[subtrie_idx].as_revealed_mut() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Retain only nodes/values not descended from any pruned root.
|
||||
subtrie.nodes.retain(|p, _| !is_strict_descendant_in(roots_group, p));
|
||||
subtrie.inner.values.retain(|p, _| !starts_with_pruned_in(roots_group, p));
|
||||
}
|
||||
|
||||
// Branch node masks pruning
|
||||
self.branch_node_masks.retain(|p, _| {
|
||||
if SparseSubtrieType::path_len_is_upper(p.len()) {
|
||||
!starts_with_pruned_in(roots_upper, p)
|
||||
} else {
|
||||
!starts_with_pruned_in(roots_lower, p) && !starts_with_pruned_in(roots_upper, p)
|
||||
}
|
||||
});
|
||||
|
||||
nodes_converted
|
||||
}
|
||||
}
|
||||
|
||||
impl ParallelSparseTrie {
|
||||
/// Sets the thresholds that control when parallelism is used during operations.
|
||||
pub const fn with_parallelism_thresholds(mut self, thresholds: ParallelismThresholds) -> Self {
|
||||
@@ -2810,44 +2654,6 @@ fn path_subtrie_index_unchecked(path: &Nibbles) -> usize {
|
||||
path.get_byte_unchecked(0) as usize
|
||||
}
|
||||
|
||||
/// Checks if `path` is a strict descendant of any root in a sorted slice.
|
||||
///
|
||||
/// Uses binary search to find the candidate root that could be an ancestor.
|
||||
/// Returns `true` if `path` starts with a root and is longer (strict descendant).
|
||||
fn is_strict_descendant_in(roots: &[(Nibbles, B256)], path: &Nibbles) -> bool {
|
||||
if roots.is_empty() {
|
||||
return false;
|
||||
}
|
||||
debug_assert!(roots.windows(2).all(|w| w[0].0 <= w[1].0), "roots must be sorted by path");
|
||||
let idx = roots.partition_point(|(root, _)| root <= path);
|
||||
if idx > 0 {
|
||||
let candidate = &roots[idx - 1].0;
|
||||
if path.starts_with(candidate) && path.len() > candidate.len() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Checks if `path` starts with any root in a sorted slice (inclusive).
|
||||
///
|
||||
/// Uses binary search to find the candidate root that could be a prefix.
|
||||
/// Returns `true` if `path` starts with a root (including exact match).
|
||||
fn starts_with_pruned_in(roots: &[(Nibbles, B256)], path: &Nibbles) -> bool {
|
||||
if roots.is_empty() {
|
||||
return false;
|
||||
}
|
||||
debug_assert!(roots.windows(2).all(|w| w[0].0 <= w[1].0), "roots must be sorted by path");
|
||||
let idx = roots.partition_point(|(root, _)| root <= path);
|
||||
if idx > 0 {
|
||||
let candidate = &roots[idx - 1].0;
|
||||
if path.starts_with(candidate) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Used by lower subtries to communicate updates to the top-level [`SparseTrieUpdates`] set.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
enum SparseTrieUpdatesAction {
|
||||
@@ -2898,8 +2704,7 @@ mod tests {
|
||||
use reth_trie_db::DatabaseTrieCursorFactory;
|
||||
use reth_trie_sparse::{
|
||||
provider::{DefaultTrieNodeProvider, RevealedNode, TrieNodeProvider},
|
||||
LeafLookup, LeafLookupError, SerialSparseTrie, SparseNode, SparseTrie, SparseTrieExt,
|
||||
SparseTrieUpdates,
|
||||
LeafLookup, LeafLookupError, SerialSparseTrie, SparseNode, SparseTrie, SparseTrieUpdates,
|
||||
};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
@@ -2944,17 +2749,6 @@ mod tests {
|
||||
Account { nonce, ..Default::default() }
|
||||
}
|
||||
|
||||
fn large_account_value() -> Vec<u8> {
|
||||
let account = Account {
|
||||
nonce: 0x123456789abcdef,
|
||||
balance: U256::from(0x123456789abcdef0123456789abcdef_u128),
|
||||
..Default::default()
|
||||
};
|
||||
let mut buf = Vec::new();
|
||||
account.into_trie_account(EMPTY_ROOT_HASH).encode(&mut buf);
|
||||
buf
|
||||
}
|
||||
|
||||
fn encode_account_value(nonce: u64) -> Vec<u8> {
|
||||
let account = Account { nonce, ..Default::default() };
|
||||
let trie_account = account.into_trie_account(EMPTY_ROOT_HASH);
|
||||
@@ -7312,372 +7106,4 @@ mod tests {
|
||||
// Value should be retrievable
|
||||
assert_eq!(trie.get_leaf_value(&slot_path), Some(&slot_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_empty_suffix_key_regression() {
|
||||
// Regression test: when a leaf has an empty suffix key (full path == node path),
|
||||
// the value must be removed when that path becomes a pruned root.
|
||||
// This catches the bug where is_strict_descendant fails to remove p == pruned_root.
|
||||
|
||||
use reth_trie_sparse::provider::DefaultTrieNodeProvider;
|
||||
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut parallel = ParallelSparseTrie::default();
|
||||
|
||||
// Large value to ensure nodes have hashes (RLP >= 32 bytes)
|
||||
let value = {
|
||||
let account = Account {
|
||||
nonce: 0x123456789abcdef,
|
||||
balance: U256::from(0x123456789abcdef0123456789abcdef_u128),
|
||||
..Default::default()
|
||||
};
|
||||
let mut buf = Vec::new();
|
||||
account.into_trie_account(EMPTY_ROOT_HASH).encode(&mut buf);
|
||||
buf
|
||||
};
|
||||
|
||||
// Create a trie with multiple leaves to force a branch at root
|
||||
for i in 0..16u8 {
|
||||
parallel
|
||||
.update_leaf(
|
||||
Nibbles::from_nibbles([i, 0x1, 0x2, 0x3, 0x4, 0x5]),
|
||||
value.clone(),
|
||||
&provider,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Compute root to get hashes
|
||||
let root_before = parallel.root();
|
||||
|
||||
// Prune at depth 0: the children of root become pruned roots
|
||||
parallel.prune(0);
|
||||
|
||||
let root_after = parallel.root();
|
||||
assert_eq!(root_before, root_after, "root hash must be preserved");
|
||||
|
||||
// Key assertion: values under pruned paths must be removed
|
||||
// With the bug, values at pruned_root paths (not strict descendants) would remain
|
||||
for i in 0..16u8 {
|
||||
let path = Nibbles::from_nibbles([i, 0x1, 0x2, 0x3, 0x4, 0x5]);
|
||||
assert!(
|
||||
parallel.get_leaf_value(&path).is_none(),
|
||||
"value at {:?} should be removed after prune",
|
||||
path
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_at_various_depths() {
|
||||
for max_depth in [0, 1, 2] {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
for i in 0..4u8 {
|
||||
for j in 0..4u8 {
|
||||
for k in 0..4u8 {
|
||||
trie.update_leaf(
|
||||
Nibbles::from_nibbles([i, j, k, 0x1, 0x2, 0x3]),
|
||||
value.clone(),
|
||||
&provider,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let root_before = trie.root();
|
||||
let nodes_before = trie.revealed_node_count();
|
||||
|
||||
trie.prune(max_depth);
|
||||
|
||||
let root_after = trie.root();
|
||||
assert_eq!(root_before, root_after, "root hash should be preserved after prune");
|
||||
|
||||
let nodes_after = trie.revealed_node_count();
|
||||
assert!(
|
||||
nodes_after < nodes_before,
|
||||
"node count should decrease after prune at depth {max_depth}"
|
||||
);
|
||||
|
||||
if max_depth == 0 {
|
||||
assert_eq!(nodes_after, 1, "only root should be revealed after prune(0)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_empty_trie() {
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
trie.prune(2);
|
||||
let root = trie.root();
|
||||
assert_eq!(root, EMPTY_ROOT_HASH, "empty trie should have empty root hash");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_preserves_root_hash() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
for i in 0..8u8 {
|
||||
for j in 0..4u8 {
|
||||
trie.update_leaf(
|
||||
Nibbles::from_nibbles([i, j, 0x3, 0x4, 0x5, 0x6]),
|
||||
value.clone(),
|
||||
&provider,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let root_before = trie.root();
|
||||
trie.prune(1);
|
||||
let root_after = trie.root();
|
||||
assert_eq!(root_before, root_after, "root hash must be preserved after prune");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_single_leaf_trie() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
trie.update_leaf(Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]), value, &provider).unwrap();
|
||||
|
||||
let root_before = trie.root();
|
||||
let nodes_before = trie.revealed_node_count();
|
||||
|
||||
trie.prune(0);
|
||||
|
||||
let root_after = trie.root();
|
||||
assert_eq!(root_before, root_after, "root hash should be preserved");
|
||||
assert_eq!(trie.revealed_node_count(), nodes_before, "single leaf trie should not change");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_deep_depth_no_effect() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
for i in 0..4u8 {
|
||||
trie.update_leaf(Nibbles::from_nibbles([i, 0x2, 0x3, 0x4]), value.clone(), &provider)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
trie.root();
|
||||
let nodes_before = trie.revealed_node_count();
|
||||
|
||||
trie.prune(100);
|
||||
|
||||
assert_eq!(nodes_before, trie.revealed_node_count(), "deep prune should have no effect");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_extension_node_depth_semantics() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
trie.update_leaf(Nibbles::from_nibbles([0, 1, 2, 3, 0, 5, 6, 7]), value.clone(), &provider)
|
||||
.unwrap();
|
||||
trie.update_leaf(Nibbles::from_nibbles([0, 1, 2, 3, 1, 5, 6, 7]), value, &provider)
|
||||
.unwrap();
|
||||
|
||||
let root_before = trie.root();
|
||||
trie.prune(1);
|
||||
|
||||
assert_eq!(root_before, trie.root(), "root hash should be preserved");
|
||||
assert_eq!(trie.revealed_node_count(), 2, "should have root + extension after prune(1)");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_embedded_node_preserved() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let small_value = vec![0x80];
|
||||
trie.update_leaf(Nibbles::from_nibbles([0x0]), small_value.clone(), &provider).unwrap();
|
||||
trie.update_leaf(Nibbles::from_nibbles([0x1]), small_value, &provider).unwrap();
|
||||
|
||||
let root_before = trie.root();
|
||||
let nodes_before = trie.revealed_node_count();
|
||||
|
||||
trie.prune(0);
|
||||
|
||||
assert_eq!(root_before, trie.root(), "root hash must be preserved");
|
||||
|
||||
if trie.revealed_node_count() == nodes_before {
|
||||
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x0])).is_some());
|
||||
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x1])).is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_mixed_embedded_and_hashed() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let large_value = large_account_value();
|
||||
let small_value = vec![0x80];
|
||||
|
||||
for i in 0..8u8 {
|
||||
let value = if i < 4 { large_value.clone() } else { small_value.clone() };
|
||||
trie.update_leaf(Nibbles::from_nibbles([i, 0x1, 0x2, 0x3]), value, &provider).unwrap();
|
||||
}
|
||||
|
||||
let root_before = trie.root();
|
||||
trie.prune(0);
|
||||
assert_eq!(root_before, trie.root(), "root hash must be preserved");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_many_lower_subtries() {
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
|
||||
let large_value = large_account_value();
|
||||
|
||||
let mut keys = Vec::new();
|
||||
for first in 0..16u8 {
|
||||
for second in 0..16u8 {
|
||||
keys.push(Nibbles::from_nibbles([first, second, 0x1, 0x2, 0x3, 0x4]));
|
||||
}
|
||||
}
|
||||
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
for key in &keys {
|
||||
trie.update_leaf(*key, large_value.clone(), &provider).unwrap();
|
||||
}
|
||||
|
||||
let root_before = trie.root();
|
||||
let pruned = trie.prune(1);
|
||||
|
||||
assert!(pruned > 0, "should have pruned some nodes");
|
||||
assert_eq!(root_before, trie.root(), "root hash should be preserved");
|
||||
|
||||
for key in &keys {
|
||||
assert!(trie.get_leaf_value(key).is_none(), "value should be pruned");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore = "profiling test - run manually"]
|
||||
fn test_prune_profile() {
|
||||
use std::time::Instant;
|
||||
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let large_value = large_account_value();
|
||||
|
||||
// Generate 65536 keys (16^4) for a large trie
|
||||
let mut keys = Vec::with_capacity(65536);
|
||||
for a in 0..16u8 {
|
||||
for b in 0..16u8 {
|
||||
for c in 0..16u8 {
|
||||
for d in 0..16u8 {
|
||||
keys.push(Nibbles::from_nibbles([a, b, c, d, 0x5, 0x6, 0x7, 0x8]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build base trie once
|
||||
let mut base_trie = ParallelSparseTrie::default();
|
||||
for key in &keys {
|
||||
base_trie.update_leaf(*key, large_value.clone(), &provider).unwrap();
|
||||
}
|
||||
base_trie.root(); // ensure hashes computed
|
||||
|
||||
// Pre-clone tries to exclude clone time from profiling
|
||||
let iterations = 100;
|
||||
let mut tries: Vec<_> = (0..iterations).map(|_| base_trie.clone()).collect();
|
||||
|
||||
// Measure only prune()
|
||||
let mut total_pruned = 0;
|
||||
let start = Instant::now();
|
||||
for trie in &mut tries {
|
||||
total_pruned += trie.prune(2);
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
println!(
|
||||
"Prune benchmark: {} iterations, total: {:?}, avg: {:?}, pruned/iter: {}",
|
||||
iterations,
|
||||
elapsed,
|
||||
elapsed / iterations as u32,
|
||||
total_pruned / iterations
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_max_depth_overflow() {
|
||||
// Verify that max_depth > 255 is not truncated (was u8, now usize)
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
for i in 0..4u8 {
|
||||
trie.update_leaf(Nibbles::from_nibbles([i, 0x1, 0x2, 0x3]), value.clone(), &provider)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
trie.root();
|
||||
let nodes_before = trie.revealed_node_count();
|
||||
|
||||
// If depth were truncated to u8, 300 would become 44 and might prune something
|
||||
trie.prune(300);
|
||||
|
||||
assert_eq!(
|
||||
nodes_before,
|
||||
trie.revealed_node_count(),
|
||||
"prune(300) should have no effect on a shallow trie"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_fast_path_case2_update_after() {
|
||||
// Test fast-path Case 2: upper prune root is prefix of lower subtrie.
|
||||
// After pruning, we should be able to update leaves without panic.
|
||||
let provider = DefaultTrieNodeProvider;
|
||||
let mut trie = ParallelSparseTrie::default();
|
||||
|
||||
let value = large_account_value();
|
||||
|
||||
// Create keys that span into lower subtries (path.len() >= UPPER_TRIE_MAX_DEPTH)
|
||||
// UPPER_TRIE_MAX_DEPTH is typically 2, so paths of length 3+ go to lower subtries
|
||||
for first in 0..4u8 {
|
||||
for second in 0..4u8 {
|
||||
trie.update_leaf(
|
||||
Nibbles::from_nibbles([first, second, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]),
|
||||
value.clone(),
|
||||
&provider,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let root_before = trie.root();
|
||||
|
||||
// Prune at depth 0 - upper roots become prefixes of lower subtrie paths
|
||||
trie.prune(0);
|
||||
|
||||
let root_after = trie.root();
|
||||
assert_eq!(root_before, root_after, "root hash should be preserved");
|
||||
|
||||
// Now try to update a leaf - this should not panic even though lower subtries
|
||||
// were replaced with Blind(None)
|
||||
let new_path = Nibbles::from_nibbles([0x5, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]);
|
||||
trie.update_leaf(new_path, value, &provider).unwrap();
|
||||
|
||||
// The trie should still be functional
|
||||
let _ = trie.root();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,6 @@
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
/// Default depth to prune sparse tries to for cross-payload caching.
|
||||
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
|
||||
|
||||
/// Default number of storage tries to preserve across payload validations.
|
||||
pub const DEFAULT_MAX_PRESERVED_STORAGE_TRIES: usize = 100;
|
||||
|
||||
mod state;
|
||||
pub use state::*;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
traits::{SparseTrie as SparseTrieTrait, SparseTrieExt},
|
||||
traits::SparseTrie as SparseTrieTrait,
|
||||
RevealableSparseTrie, SerialSparseTrie,
|
||||
};
|
||||
use alloc::{collections::VecDeque, vec::Vec};
|
||||
@@ -972,117 +972,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, S> SparseStateTrie<A, S>
|
||||
where
|
||||
A: SparseTrieTrait + SparseTrieExt + Default,
|
||||
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
|
||||
{
|
||||
/// Minimum number of storage tries before parallel pruning is enabled.
|
||||
const PARALLEL_PRUNE_THRESHOLD: usize = 16;
|
||||
|
||||
/// Returns true if parallelism should be enabled for pruning the given number of tries.
|
||||
/// Will always return false in `no_std` builds.
|
||||
const fn is_prune_parallelism_enabled(num_tries: usize) -> bool {
|
||||
#[cfg(not(feature = "std"))]
|
||||
return false;
|
||||
|
||||
num_tries >= Self::PARALLEL_PRUNE_THRESHOLD
|
||||
}
|
||||
|
||||
/// Prunes the account trie and selected storage tries to reduce memory usage.
|
||||
///
|
||||
/// Storage tries not in the top `max_storage_tries` by revealed node count are cleared
|
||||
/// entirely.
|
||||
///
|
||||
/// # Preconditions
|
||||
///
|
||||
/// Node hashes must be computed via `root()` before calling this method. Otherwise, nodes
|
||||
/// cannot be converted to hash stubs and pruning will have no effect.
|
||||
///
|
||||
/// # Effects
|
||||
///
|
||||
/// - Clears `revealed_account_paths` and `revealed_paths` for all storage tries
|
||||
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
|
||||
if let Some(trie) = self.state.as_revealed_mut() {
|
||||
trie.prune(max_depth);
|
||||
}
|
||||
self.revealed_account_paths.clear();
|
||||
|
||||
let mut storage_trie_counts: Vec<(B256, usize)> = self
|
||||
.storage
|
||||
.tries
|
||||
.iter()
|
||||
.map(|(hash, trie)| {
|
||||
let count = match trie {
|
||||
RevealableSparseTrie::Revealed(t) => t.revealed_node_count(),
|
||||
RevealableSparseTrie::Blind(_) => 0,
|
||||
};
|
||||
(*hash, count)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Use O(n) selection instead of O(n log n) sort
|
||||
let tries_to_keep: HashSet<B256> = if storage_trie_counts.len() <= max_storage_tries {
|
||||
storage_trie_counts.iter().map(|(hash, _)| *hash).collect()
|
||||
} else {
|
||||
storage_trie_counts
|
||||
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.1.cmp(&a.1));
|
||||
storage_trie_counts[..max_storage_tries].iter().map(|(hash, _)| *hash).collect()
|
||||
};
|
||||
|
||||
// Collect keys to avoid borrow conflict
|
||||
let tries_to_clear: Vec<B256> = self
|
||||
.storage
|
||||
.tries
|
||||
.keys()
|
||||
.filter(|hash| !tries_to_keep.contains(*hash))
|
||||
.copied()
|
||||
.collect();
|
||||
|
||||
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
|
||||
for hash in tries_to_clear {
|
||||
if let Some(trie) = self.storage.tries.remove(&hash) {
|
||||
self.storage.cleared_tries.push(trie.clear());
|
||||
}
|
||||
if let Some(mut paths) = self.storage.revealed_paths.remove(&hash) {
|
||||
paths.clear();
|
||||
self.storage.cleared_revealed_paths.push(paths);
|
||||
}
|
||||
}
|
||||
|
||||
// Prune storage tries that are kept
|
||||
if Self::is_prune_parallelism_enabled(tries_to_keep.len()) {
|
||||
#[cfg(feature = "std")]
|
||||
{
|
||||
use rayon::prelude::*;
|
||||
|
||||
self.storage.tries.par_iter_mut().for_each(|(hash, trie)| {
|
||||
if tries_to_keep.contains(hash) &&
|
||||
let Some(t) = trie.as_revealed_mut()
|
||||
{
|
||||
t.prune(max_depth);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
for hash in &tries_to_keep {
|
||||
if let Some(trie) =
|
||||
self.storage.tries.get_mut(hash).and_then(|t| t.as_revealed_mut())
|
||||
{
|
||||
trie.prune(max_depth);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear revealed_paths for kept tries
|
||||
for hash in &tries_to_keep {
|
||||
if let Some(paths) = self.storage.revealed_paths.get_mut(hash) {
|
||||
paths.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The fields of [`SparseStateTrie`] related to storage tries. This is kept separate from the rest
|
||||
/// of [`SparseStateTrie`] both to help enforce allocation re-use and to allow us to implement
|
||||
/// methods like `get_trie_and_revealed_paths` which return multiple mutable borrows.
|
||||
@@ -1371,7 +1260,7 @@ mod tests {
|
||||
use reth_trie::{updates::StorageTrieUpdates, HashBuilder, MultiProof, EMPTY_ROOT_HASH};
|
||||
use reth_trie_common::{
|
||||
proof::{ProofNodes, ProofRetainer},
|
||||
BranchNode, BranchNodeMasks, BranchNodeMasksMap, LeafNode, StorageMultiProof, TrieMask,
|
||||
BranchNode, BranchNodeMasks, LeafNode, StorageMultiProof, TrieMask,
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -232,36 +232,6 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
|
||||
fn shrink_values_to(&mut self, size: usize);
|
||||
}
|
||||
|
||||
/// Extension trait for sparse tries that support pruning.
|
||||
///
|
||||
/// This trait provides the `prune` method for sparse trie implementations that support
|
||||
/// converting nodes beyond a certain depth into hash stubs. This is useful for reducing
|
||||
/// memory usage when caching tries across payload validations.
|
||||
pub trait SparseTrieExt: SparseTrie {
|
||||
/// Returns the number of revealed (non-Hash) nodes in the trie.
|
||||
fn revealed_node_count(&self) -> usize;
|
||||
|
||||
/// Replaces nodes beyond `max_depth` with hash stubs and removes their descendants.
|
||||
///
|
||||
/// Depth counts nodes traversed (not nibbles), so extension nodes count as 1 depth
|
||||
/// regardless of key length. `max_depth == 0` prunes all children of the root node.
|
||||
///
|
||||
/// # Preconditions
|
||||
///
|
||||
/// Must be called after `root()` to ensure all nodes have computed hashes.
|
||||
/// Calling on a trie without computed hashes will result in no pruning.
|
||||
///
|
||||
/// # Behavior
|
||||
///
|
||||
/// - Embedded nodes (RLP < 32 bytes) are preserved since they have no hash
|
||||
/// - Returns 0 if `max_depth` exceeds trie depth or trie is empty
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// The number of nodes converted to hash stubs.
|
||||
fn prune(&mut self, max_depth: usize) -> usize;
|
||||
}
|
||||
|
||||
/// Tracks modifications to the sparse trie structure.
|
||||
///
|
||||
/// Maintains references to both modified and pruned/removed branches, enabling
|
||||
|
||||
@@ -79,7 +79,7 @@ impl HashedPostStateCursorValue for U256 {
|
||||
type NonZero = Self;
|
||||
|
||||
fn into_option(self) -> Option<Self::NonZero> {
|
||||
(!self.is_zero()).then_some(self)
|
||||
(self != Self::ZERO).then_some(self)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +351,7 @@ where
|
||||
/// [`HashedCursor::next`].
|
||||
fn is_storage_empty(&mut self) -> Result<bool, DatabaseError> {
|
||||
// Storage is not empty if it has non-zero slots.
|
||||
if self.post_state_cursor.has_any(|(_, value)| !value.is_zero()) {
|
||||
if self.post_state_cursor.has_any(|(_, value)| value.into_option().is_some()) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
|
||||
@@ -367,7 +367,7 @@ where
|
||||
|
||||
let target_nibbles = targets.into_iter().map(Nibbles::unpack).collect::<Vec<_>>();
|
||||
let mut prefix_set = self.prefix_set;
|
||||
prefix_set.extend_keys(target_nibbles.iter().copied());
|
||||
prefix_set.extend_keys(target_nibbles.clone());
|
||||
|
||||
let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?;
|
||||
|
||||
|
||||
@@ -109,20 +109,39 @@ where
|
||||
T: TrieCursorFactory,
|
||||
H: HashedCursorFactory,
|
||||
{
|
||||
// Synchronously computes the storage root for this account and RLP-encodes the resulting
|
||||
// `TrieAccount` into `buf`
|
||||
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
|
||||
// Create cursors for storage proof calculation
|
||||
let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?;
|
||||
let hashed_cursor =
|
||||
self.hashed_cursor_factory.hashed_storage_cursor(self.hashed_address)?;
|
||||
|
||||
// Create storage proof calculator with StorageValueEncoder
|
||||
let mut storage_proof_calculator = ProofCalculator::new_storage(trie_cursor, hashed_cursor);
|
||||
|
||||
let proof = storage_proof_calculator
|
||||
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])?;
|
||||
// Compute storage root by calling storage_proof with the root path as a target.
|
||||
// This returns just the root node of the storage trie.
|
||||
let storage_root = storage_proof_calculator
|
||||
.compute_root_hash(&proof)?
|
||||
.expect("storage_proof with dummy target always returns root");
|
||||
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])
|
||||
.map(|nodes| {
|
||||
// Encode the root node to RLP and hash it
|
||||
let root_node =
|
||||
nodes.first().expect("storage_proof always returns at least the root");
|
||||
root_node.node.encode(buf);
|
||||
|
||||
let storage_root = alloy_primitives::keccak256(buf.as_slice());
|
||||
|
||||
// Clear the buffer so we can re-use it to encode the TrieAccount
|
||||
buf.clear();
|
||||
|
||||
storage_root
|
||||
})?;
|
||||
|
||||
// Combine account with storage root to create TrieAccount
|
||||
let trie_account = self.account.into_trie_account(storage_root);
|
||||
|
||||
// Encode the trie account
|
||||
trie_account.encode(buf);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -60,9 +60,8 @@ impl CursorSubNode {
|
||||
let position = node.as_ref().filter(|n| n.root_hash.is_none()).map_or(
|
||||
SubNodePosition::ParentBranch,
|
||||
|n| {
|
||||
let mut child_index_range = CHILD_INDEX_RANGE;
|
||||
SubNodePosition::Child(
|
||||
child_index_range.find(|i| n.state_mask.is_bit_set(*i)).unwrap(),
|
||||
CHILD_INDEX_RANGE.clone().find(|i| n.state_mask.is_bit_set(*i)).unwrap(),
|
||||
)
|
||||
},
|
||||
);
|
||||
|
||||
@@ -115,10 +115,9 @@ where
|
||||
} else {
|
||||
self.get_proof_targets(&state)?
|
||||
};
|
||||
let prefix_sets = core::mem::take(&mut self.prefix_sets);
|
||||
let multiproof =
|
||||
Proof::new(self.trie_cursor_factory.clone(), self.hashed_cursor_factory.clone())
|
||||
.with_prefix_sets_mut(prefix_sets)
|
||||
.with_prefix_sets_mut(self.prefix_sets.clone())
|
||||
.multiproof(proof_targets.clone())?;
|
||||
|
||||
// No need to reconstruct the rest of the trie, we just need to include
|
||||
|
||||
@@ -184,7 +184,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -192,7 +192,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -919,7 +919,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -927,7 +927,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -1001,8 +1001,8 @@ Engine:
|
||||
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
|
||||
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
|
||||
|
||||
--engine.disable-proof-v2
|
||||
Disable V2 storage proofs for state root calculations
|
||||
--engine.enable-proof-v2
|
||||
Enable V2 storage proofs for state root calculations
|
||||
|
||||
--engine.disable-cache-metrics
|
||||
Disable cache metrics recording, which can take up to 50ms with large cached state
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -175,7 +175,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -183,7 +183,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -173,7 +173,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -181,7 +181,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -184,7 +184,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -192,7 +192,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -919,7 +919,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -927,7 +927,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -1001,8 +1001,8 @@ Engine:
|
||||
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
|
||||
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
|
||||
|
||||
--engine.disable-proof-v2
|
||||
Disable V2 storage proofs for state root calculations
|
||||
--engine.enable-proof-v2
|
||||
Enable V2 storage proofs for state root calculations
|
||||
|
||||
--engine.disable-cache-metrics
|
||||
Disable cache metrics recording, which can take up to 50ms with large cached state
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -175,7 +175,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -183,7 +183,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -168,7 +168,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -176,7 +176,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
@@ -173,7 +173,7 @@ RocksDB:
|
||||
--rocksdb.storages-history <STORAGES_HISTORY>
|
||||
Route storages history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
@@ -181,7 +181,7 @@ RocksDB:
|
||||
--rocksdb.account-history <ACCOUNT_HISTORY>
|
||||
Route account history tables to `RocksDB` instead of MDBX.
|
||||
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `false`.
|
||||
This is a genesis-initialization-only flag: changing it after genesis requires a re-sync. Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
|
||||
|
||||
[default: false]
|
||||
[possible values: true, false]
|
||||
|
||||
Reference in New Issue
Block a user