mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(rocksdb): add missing observability (#21253)
This commit is contained in:
@@ -507,13 +507,13 @@ impl Drop for RocksDBProviderInner {
|
||||
// Flush all memtables if possible. If not, they will be rebuilt from the WAL on
|
||||
// restart
|
||||
if let Err(e) = db.flush_wal(true) {
|
||||
tracing::warn!(target: "storage::rocksdb", ?e, "Failed to flush WAL on drop");
|
||||
tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
|
||||
}
|
||||
for cf_name in ROCKSDB_TABLES {
|
||||
if let Some(cf) = db.cf_handle(cf_name) &&
|
||||
let Err(e) = db.flush_cf(&cf)
|
||||
{
|
||||
tracing::warn!(target: "storage::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
|
||||
tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
|
||||
}
|
||||
}
|
||||
db.cancel_all_background_work(true);
|
||||
@@ -821,6 +821,7 @@ impl RocksDBProvider {
|
||||
/// (i.e., removes the minimum block and all higher blocks).
|
||||
///
|
||||
/// Returns a `WriteBatchWithTransaction` that can be committed later.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
pub fn unwind_account_history_indices(
|
||||
&self,
|
||||
last_indices: &[(Address, BlockNumber)],
|
||||
@@ -846,6 +847,7 @@ impl RocksDBProvider {
|
||||
}
|
||||
|
||||
/// Writes a batch of operations atomically.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
|
||||
where
|
||||
F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
|
||||
@@ -864,6 +866,7 @@ impl RocksDBProvider {
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the provider is in read-only mode.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
|
||||
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
|
||||
self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
@@ -878,7 +881,7 @@ impl RocksDBProvider {
|
||||
/// This handles transaction hash numbers, account history, and storage history based on
|
||||
/// the provided storage settings. Each operation runs in parallel with its own batch,
|
||||
/// pushing to `ctx.pending_batches` for later commit.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
|
||||
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -919,7 +922,7 @@ impl RocksDBProvider {
|
||||
}
|
||||
|
||||
/// Writes transaction hash to number mappings for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -940,7 +943,7 @@ impl RocksDBProvider {
|
||||
}
|
||||
|
||||
/// Writes account history indices for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_account_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -965,7 +968,7 @@ impl RocksDBProvider {
|
||||
}
|
||||
|
||||
/// Writes storage history indices for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_storage_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -1050,6 +1053,7 @@ impl<'a> RocksDBBatch<'a> {
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the provider is in read-only mode.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
@@ -1398,6 +1402,7 @@ impl<'db> RocksTx<'db> {
|
||||
}
|
||||
|
||||
/// Commits the transaction, persisting all changes.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
self.inner.commit().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
@@ -1408,6 +1413,7 @@ impl<'db> RocksTx<'db> {
|
||||
}
|
||||
|
||||
/// Rolls back the transaction, discarding all changes.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
pub fn rollback(self) -> ProviderResult<()> {
|
||||
self.inner.rollback().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
|
||||
|
||||
@@ -528,7 +528,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
|
||||
/// Writes headers for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
fn write_headers(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -541,7 +541,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
|
||||
/// Writes transactions for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
fn write_transactions(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -558,7 +558,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
|
||||
/// Writes transaction senders for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
fn write_transaction_senders(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -575,7 +575,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
|
||||
/// Writes receipts for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
fn write_receipts(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -602,7 +602,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
|
||||
/// Writes account changesets for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
fn write_account_changesets(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
@@ -647,7 +647,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
///
|
||||
/// This spawns separate threads for each segment type and each thread calls `sync_all()` on its
|
||||
/// writer when done.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all)]
|
||||
pub fn write_blocks_data(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
|
||||
Reference in New Issue
Block a user