fix(rocksdb): flush all column families on drop and show SST/memtable sizes (#21251)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
joshieDo
2026-01-21 12:44:08 +00:00
committed by GitHub
parent 3065a328f9
commit 04d4c9a02f
4 changed files with 35 additions and 12 deletions

View File

@@ -159,18 +159,31 @@ impl Command {
fn rocksdb_stats_table<N: NodeTypesWithDB>(&self, tool: &DbTool<N>) -> ComfyTable {
let mut table = ComfyTable::new();
table.load_preset(comfy_table::presets::ASCII_MARKDOWN);
table.set_header(["RocksDB Table Name", "# Entries", "Total Size", "Pending Compaction"]);
table.set_header([
"RocksDB Table Name",
"# Entries",
"SST Size",
"Memtable Size",
"Total Size",
"Pending Compaction",
]);
let stats = tool.provider_factory.rocksdb_provider().table_stats();
let mut total_sst: u64 = 0;
let mut total_memtable: u64 = 0;
let mut total_size: u64 = 0;
let mut total_pending: u64 = 0;
for stat in &stats {
total_sst += stat.sst_size_bytes;
total_memtable += stat.memtable_size_bytes;
total_size += stat.estimated_size_bytes;
total_pending += stat.pending_compaction_bytes;
let mut row = Row::new();
row.add_cell(Cell::new(&stat.name))
.add_cell(Cell::new(stat.estimated_num_keys))
.add_cell(Cell::new(human_bytes(stat.sst_size_bytes as f64)))
.add_cell(Cell::new(human_bytes(stat.memtable_size_bytes as f64)))
.add_cell(Cell::new(human_bytes(stat.estimated_size_bytes as f64)))
.add_cell(Cell::new(human_bytes(stat.pending_compaction_bytes as f64)));
table.add_row(row);
@@ -187,6 +200,8 @@ impl Command {
let mut row = Row::new();
row.add_cell(Cell::new("RocksDB Total"))
.add_cell(Cell::new(""))
.add_cell(Cell::new(human_bytes(total_sst as f64)))
.add_cell(Cell::new(human_bytes(total_memtable as f64)))
.add_cell(Cell::new(human_bytes(total_size as f64)))
.add_cell(Cell::new(human_bytes(total_pending as f64)));
table.add_row(row);

View File

@@ -6,7 +6,7 @@ use reth_db::Tables;
use reth_metrics::Metrics;
use strum::{EnumIter, IntoEnumIterator};
const ROCKSDB_TABLES: &[&str] = &[
pub(super) const ROCKSDB_TABLES: &[&str] = &[
Tables::TransactionHashNumbers.name(),
Tables::StoragesHistory.name(),
Tables::AccountsHistory.name(),

View File

@@ -1,4 +1,4 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
@@ -41,6 +41,10 @@ pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<
/// Statistics for a single `RocksDB` table (column family).
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
/// Size of SST files on disk in bytes.
pub sst_size_bytes: u64,
/// Size of memtables in memory in bytes.
pub memtable_size_bytes: u64,
/// Name of the table/column family.
pub name: String,
/// Estimated number of keys in the table.
@@ -421,17 +425,11 @@ impl RocksDBProviderInner {
/// Returns statistics for all column families in the database.
fn table_stats(&self) -> Vec<RocksDBTableStats> {
let cf_names = [
tables::TransactionHashNumbers::NAME,
tables::AccountsHistory::NAME,
tables::StoragesHistory::NAME,
];
let mut stats = Vec::new();
macro_rules! collect_stats {
($db:expr) => {
for cf_name in cf_names {
for cf_name in ROCKSDB_TABLES {
if let Some(cf) = $db.cf_handle(cf_name) {
let estimated_num_keys = $db
.property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
@@ -464,6 +462,8 @@ impl RocksDBProviderInner {
.unwrap_or(0);
stats.push(RocksDBTableStats {
sst_size_bytes: sst_size,
memtable_size_bytes: memtable_size,
name: cf_name.to_string(),
estimated_num_keys,
estimated_size_bytes,
@@ -509,8 +509,12 @@ impl Drop for RocksDBProviderInner {
if let Err(e) = db.flush_wal(true) {
tracing::warn!(target: "storage::rocksdb", ?e, "Failed to flush WAL on drop");
}
if let Err(e) = db.flush() {
tracing::warn!(target: "storage::rocksdb", ?e, "Failed to flush memtables on drop");
for cf_name in ROCKSDB_TABLES {
if let Some(cf) = db.cf_handle(cf_name) &&
let Err(e) = db.flush_cf(&cf)
{
tracing::warn!(target: "storage::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
}
}
db.cancel_all_background_work(true);
}

View File

@@ -17,6 +17,10 @@ pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Statistics for a single `RocksDB` table (column family) - stub.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
/// Size of SST files on disk in bytes.
pub sst_size_bytes: u64,
/// Size of memtables in memory in bytes.
pub memtable_size_bytes: u64,
/// Name of the table/column family.
pub name: String,
/// Estimated number of keys in the table.