mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(cli): add reth db checksum rocksdb command (#21217)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
9662dc5271
commit
ff8f434dcd
@@ -131,4 +131,4 @@ arbitrary = [
|
||||
"reth-ethereum-primitives/arbitrary",
|
||||
]
|
||||
|
||||
edge = ["reth-db-common/edge", "reth-stages/rocksdb"]
|
||||
edge = ["reth-db-common/edge", "reth-stages/rocksdb", "reth-provider/rocksdb"]
|
||||
|
||||
@@ -22,6 +22,9 @@ use std::{
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
#[cfg(all(unix, feature = "edge"))]
|
||||
mod rocksdb;
|
||||
|
||||
/// Interval for logging progress during checksum computation.
|
||||
const PROGRESS_LOG_INTERVAL: usize = 100_000;
|
||||
|
||||
@@ -70,6 +73,17 @@ enum Subcommand {
|
||||
#[arg(long)]
|
||||
limit: Option<usize>,
|
||||
},
|
||||
/// Calculates the checksum of a RocksDB table
|
||||
#[cfg(all(unix, feature = "edge"))]
|
||||
Rocksdb {
|
||||
/// The RocksDB table
|
||||
#[arg(value_enum)]
|
||||
table: rocksdb::RocksDbTable,
|
||||
|
||||
/// The maximum number of records to checksum.
|
||||
#[arg(long)]
|
||||
limit: Option<usize>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@@ -87,6 +101,10 @@ impl Command {
|
||||
Subcommand::StaticFile { segment, start_block, end_block, limit } => {
|
||||
checksum_static_file(tool, segment, start_block, end_block, limit)?;
|
||||
}
|
||||
#[cfg(all(unix, feature = "edge"))]
|
||||
Subcommand::Rocksdb { table, limit } => {
|
||||
rocksdb::checksum_rocksdb(tool, table, limit)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
106
crates/cli/commands/src/db/checksum/rocksdb.rs
Normal file
106
crates/cli/commands/src/db/checksum/rocksdb.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
//! RocksDB checksum implementation.
|
||||
|
||||
use super::{checksum_hasher, PROGRESS_LOG_INTERVAL};
|
||||
use crate::common::CliNodeTypes;
|
||||
use clap::ValueEnum;
|
||||
use reth_chainspec::EthereumHardforks;
|
||||
use reth_db::{tables, DatabaseEnv};
|
||||
use reth_db_api::table::Table;
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDBAdapter;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use std::{hash::Hasher, sync::Arc, time::Instant};
|
||||
use tracing::info;
|
||||
|
||||
/// RocksDB tables that can be checksummed.
|
||||
#[derive(Debug, Clone, Copy, ValueEnum)]
|
||||
pub enum RocksDbTable {
|
||||
/// Transaction hash to transaction number mapping
|
||||
TransactionHashNumbers,
|
||||
/// Account history indices
|
||||
AccountsHistory,
|
||||
/// Storage history indices
|
||||
StoragesHistory,
|
||||
}
|
||||
|
||||
impl RocksDbTable {
|
||||
/// Returns the table name as a string
|
||||
const fn name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::TransactionHashNumbers => tables::TransactionHashNumbers::NAME,
|
||||
Self::AccountsHistory => tables::AccountsHistory::NAME,
|
||||
Self::StoragesHistory => tables::StoragesHistory::NAME,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes a checksum for a RocksDB table.
|
||||
pub fn checksum_rocksdb<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
|
||||
tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
|
||||
table: RocksDbTable,
|
||||
limit: Option<usize>,
|
||||
) -> eyre::Result<()> {
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
let start_time = Instant::now();
|
||||
let limit = limit.unwrap_or(usize::MAX);
|
||||
|
||||
info!(
|
||||
"Computing checksum for RocksDB table `{}`, limit={:?}",
|
||||
table.name(),
|
||||
if limit == usize::MAX { None } else { Some(limit) }
|
||||
);
|
||||
|
||||
let (checksum, total) = match table {
|
||||
RocksDbTable::TransactionHashNumbers => {
|
||||
checksum_rocksdb_table::<tables::TransactionHashNumbers>(&rocksdb, limit)?
|
||||
}
|
||||
RocksDbTable::AccountsHistory => {
|
||||
checksum_rocksdb_table::<tables::AccountsHistory>(&rocksdb, limit)?
|
||||
}
|
||||
RocksDbTable::StoragesHistory => {
|
||||
checksum_rocksdb_table::<tables::StoragesHistory>(&rocksdb, limit)?
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed = start_time.elapsed();
|
||||
|
||||
info!(
|
||||
"Checksum for RocksDB table `{}`: {:#x} ({} entries, elapsed: {:?})",
|
||||
table.name(),
|
||||
checksum,
|
||||
total,
|
||||
elapsed
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Computes checksum for a specific RocksDB table by iterating over rows.
|
||||
fn checksum_rocksdb_table<T: Table>(
|
||||
rocksdb: &reth_provider::providers::RocksDBProvider,
|
||||
limit: usize,
|
||||
) -> eyre::Result<(u64, usize)> {
|
||||
let iter = rocksdb.raw_iter::<T>()?;
|
||||
let mut hasher = checksum_hasher();
|
||||
let mut total = 0usize;
|
||||
|
||||
for entry in iter {
|
||||
let (key_bytes, value_bytes) = entry?;
|
||||
|
||||
hasher.write(&key_bytes);
|
||||
hasher.write(&value_bytes);
|
||||
|
||||
total += 1;
|
||||
|
||||
if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
|
||||
info!("Hashed {total} entries.");
|
||||
}
|
||||
|
||||
if total >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((hasher.finish(), total))
|
||||
}
|
||||
@@ -38,7 +38,7 @@ pub use consistent::ConsistentProvider;
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
|
||||
pub(crate) mod rocksdb;
|
||||
|
||||
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
|
||||
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksTx};
|
||||
|
||||
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
|
||||
/// [`ProviderNodeTypes`].
|
||||
|
||||
@@ -5,4 +5,4 @@ mod metrics;
|
||||
mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
|
||||
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksTx};
|
||||
|
||||
@@ -656,6 +656,15 @@ impl RocksDBProvider {
|
||||
Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
|
||||
}
|
||||
|
||||
/// Creates a raw iterator over all entries in the specified table.
|
||||
///
|
||||
/// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
|
||||
pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
|
||||
let cf = self.get_cf_handle::<T>()?;
|
||||
let iter = self.0.iterator_cf(cf, IteratorMode::Start);
|
||||
Ok(RocksDBRawIter { inner: iter })
|
||||
}
|
||||
|
||||
/// Returns all account history shards for the given address in ascending key order.
|
||||
///
|
||||
/// This is used for unwind operations where we need to scan all shards for an address
|
||||
@@ -1517,6 +1526,33 @@ impl<T: Table> Iterator for RocksDBIter<'_, T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Raw iterator over a `RocksDB` table (non-transactional).
|
||||
///
|
||||
/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
|
||||
pub struct RocksDBRawIter<'db> {
|
||||
inner: RocksDBIterEnum<'db>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksDBRawIter<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for RocksDBRawIter<'_> {
|
||||
type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self.inner.next()? {
|
||||
Ok(kv) => Some(Ok(kv)),
|
||||
Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
})))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over a `RocksDB` table within a transaction.
|
||||
///
|
||||
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
|
||||
|
||||
@@ -116,3 +116,7 @@ impl RocksDBBuilder {
|
||||
/// A stub transaction for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTx;
|
||||
|
||||
/// A stub raw iterator for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBRawIter;
|
||||
|
||||
Reference in New Issue
Block a user