mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
refactor(rocksdb): deduplicate iterator next() implementations (#21737)
This commit is contained in:
@@ -40,6 +40,9 @@ use tracing::instrument;
|
||||
/// Pending `RocksDB` batches type alias.
|
||||
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
|
||||
|
||||
/// Raw key-value result from a `RocksDB` iterator.
|
||||
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
|
||||
|
||||
/// Statistics for a single `RocksDB` table (column family).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBTableStats {
|
||||
@@ -2147,29 +2150,7 @@ impl<T: Table> Iterator for RocksDBIter<'_, T> {
|
||||
type Item = ProviderResult<(T::Key, T::Value)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (key_bytes, value_bytes) = match self.inner.next()? {
|
||||
Ok(kv) => kv,
|
||||
Err(e) => {
|
||||
return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))))
|
||||
}
|
||||
};
|
||||
|
||||
// Decode key
|
||||
let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
|
||||
Ok(k) => k,
|
||||
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
|
||||
};
|
||||
|
||||
// Decompress value
|
||||
let value = match T::Value::decompress(&value_bytes) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
|
||||
};
|
||||
|
||||
Some(Ok((key, value)))
|
||||
Some(decode_iter_item::<T>(self.inner.next()?))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2218,32 +2199,31 @@ impl<T: Table> Iterator for RocksTxIter<'_, T> {
|
||||
type Item = ProviderResult<(T::Key, T::Value)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (key_bytes, value_bytes) = match self.inner.next()? {
|
||||
Ok(kv) => kv,
|
||||
Err(e) => {
|
||||
return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))))
|
||||
}
|
||||
};
|
||||
|
||||
// Decode key
|
||||
let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
|
||||
Ok(k) => k,
|
||||
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
|
||||
};
|
||||
|
||||
// Decompress value
|
||||
let value = match T::Value::decompress(&value_bytes) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
|
||||
};
|
||||
|
||||
Some(Ok((key, value)))
|
||||
Some(decode_iter_item::<T>(self.inner.next()?))
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
|
||||
///
|
||||
/// Handles both error propagation from the underlying iterator and
|
||||
/// decoding/decompression of the key and value bytes.
|
||||
fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
|
||||
let (key_bytes, value_bytes) = result.map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
|
||||
let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
let value = T::Value::decompress(&value_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
Ok((key, value))
|
||||
}
|
||||
|
||||
/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
|
||||
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
|
||||
match level {
|
||||
|
||||
Reference in New Issue
Block a user