From 7c18ba8ed3282fc27d3801f265dac5253a277340 Mon Sep 17 00:00:00 2001 From: rakita Date: Wed, 5 Apr 2023 14:46:10 +0200 Subject: [PATCH] perf(db): RawTable, decode/encode only if needed (#2081) --- crates/stages/src/stages/hashing_account.rs | 18 +-- crates/stages/src/stages/sender_recovery.rs | 17 ++- crates/storage/db/src/abstraction/table.rs | 2 +- crates/storage/db/src/tables/mod.rs | 3 + crates/storage/db/src/tables/raw.rs | 117 ++++++++++++++++++++ 5 files changed, 143 insertions(+), 14 deletions(-) create mode 100644 crates/storage/db/src/tables/raw.rs diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 0546ecac36..8415e0fc4c 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -7,6 +7,7 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, + RawKey, RawTable, }; use reth_primitives::{keccak256, AccountHashingCheckpoint}; use reth_provider::Transaction; @@ -197,14 +198,15 @@ impl Stage for AccountHashingStage { self.save_checkpoint(tx, checkpoint)?; } - let start_address = checkpoint.address.take(); + let start_address = checkpoint.address.take().map(RawKey::new); let next_address = { - let mut accounts_cursor = tx.cursor_read::()?; + let mut accounts_cursor = + tx.cursor_read::>()?; // channels used to return result of account hashing let mut channels = Vec::new(); for chunk in &accounts_cursor - .walk(start_address)? + .walk(start_address.clone())? .take(self.commit_threshold as usize) .chunks(self.commit_threshold as usize / rayon::current_num_threads()) { @@ -216,7 +218,8 @@ impl Stage for AccountHashingStage { // Spawn the hashing task onto the global rayon pool rayon::spawn(move || { for (address, account) in chunk.into_iter() { - let _ = tx.send((keccak256(address), account)); + let address = address.key().unwrap(); + let _ = tx.send((RawKey::new(keccak256(address)), account)); } }); } @@ -228,11 +231,11 @@ impl Stage for AccountHashingStage { hashed_batch.push(hashed); } } - // sort it all in parallel hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0)); - let mut hashed_account_cursor = tx.cursor_write::()?; + let mut hashed_account_cursor = + tx.cursor_write::>()?; // iterate and put presorted hashed accounts if start_address.is_none() { @@ -244,13 +247,12 @@ impl Stage for AccountHashingStage { .into_iter() .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?; } - // next key of iterator accounts_cursor.next()? }; if let Some((next_address, _)) = &next_address { - checkpoint.address = Some(*next_address); + checkpoint.address = Some(next_address.key().unwrap()); checkpoint.from = from_transition; checkpoint.to = to_transition; } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 4896bc08c2..dcf4b4a573 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -8,6 +8,7 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, + RawKey, RawTable, RawValue, }; use reth_primitives::{TransactionSigned, TxNumber, H160}; use reth_provider::Transaction; @@ -31,7 +32,7 @@ pub struct SenderRecoveryStage { impl Default for SenderRecoveryStage { fn default() -> Self { - Self { commit_threshold: 10000 } + Self { commit_threshold: 500_000 } } } @@ -72,9 +73,10 @@ impl Stage for SenderRecoveryStage { let mut senders_cursor = tx.cursor_write::()?; // Acquire the cursor over the transactions - let mut tx_cursor = tx.cursor_read::()?; + let mut tx_cursor = tx.cursor_read::>()?; // Walk the transactions from start to end index (inclusive) - let tx_walker = tx_cursor.walk_range(first_tx_num..=last_tx_num)?; + let tx_walker = + tx_cursor.walk_range(RawKey::new(first_tx_num)..=RawKey::new(last_tx_num))?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders"); @@ -98,8 +100,14 @@ impl Stage for SenderRecoveryStage { let chunk: Vec<_> = chunk.collect(); // closure that would recover signer. Used as utility to wrap result - let recover = |entry: Result<(TxNumber, TransactionSigned), reth_db::Error>| -> Result<(u64, H160), Box> { + let recover = |entry: Result< + (RawKey, RawValue), + reth_db::Error, + >| + -> Result<(u64, H160), Box> { let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?; + let tx_id = tx_id.key().expect("key to be formated"); + let transaction = transaction.value().expect("value to be formated"); let sender = transaction.recover_signer().ok_or(StageError::from( SenderRecoveryStageError::SenderRecovery { tx: tx_id }, ))?; @@ -115,7 +123,6 @@ impl Stage for SenderRecoveryStage { } }); } - // Iterate over channels and append the sender in the order that they are received. for mut channel in channels { while let Some(recovered) = channel.recv().await { diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs index 34458bef6e..68421967f0 100644 --- a/crates/storage/db/src/abstraction/table.rs +++ b/crates/storage/db/src/abstraction/table.rs @@ -37,7 +37,7 @@ pub trait Encode: Send + Sync + Sized + Debug { /// Trait that will transform the data to be read from the DB. pub trait Decode: Send + Sync + Sized + Debug { /// Decodes data coming from the database. - fn decode>(value: B) -> Result; + fn decode>(key: B) -> Result; } /// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`]. diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 3da4d6b76f..ead0861677 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -14,8 +14,11 @@ pub mod codecs; pub mod models; +mod raw; pub(crate) mod utils; +pub use raw::{RawDubSort, RawKey, RawTable, RawValue}; + /// Declaration of all Database tables. use crate::{ table::DupSort, diff --git a/crates/storage/db/src/tables/raw.rs b/crates/storage/db/src/tables/raw.rs new file mode 100644 index 0000000000..b23298288a --- /dev/null +++ b/crates/storage/db/src/tables/raw.rs @@ -0,0 +1,117 @@ +use crate::{ + table::{Compress, Decode, Decompress, DupSort, Encode, Key, Table, Value}, + Error, +}; +use serde::Serialize; + +/// Raw table that can be used to access any table and its data in raw mode. +/// This is useful for delayed decoding/encoding of data. +#[derive(Default, Copy, Clone, Debug)] +pub struct RawTable { + phantom: std::marker::PhantomData, +} + +impl Table for RawTable { + const NAME: &'static str = T::NAME; + + type Key = RawKey; + + type Value = RawValue; +} + +/// Raw DubSort table that can be used to access any table and its data in raw mode. +/// This is useful for delayed decoding/encoding of data. +#[derive(Default, Copy, Clone, Debug)] +pub struct RawDubSort { + phantom: std::marker::PhantomData, +} + +impl Table for RawDubSort { + const NAME: &'static str = T::NAME; + + type Key = RawKey; + + type Value = RawValue; +} + +impl DupSort for RawDubSort { + type SubKey = RawKey; +} + +/// Raw table key. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RawKey { + key: Vec, + _phantom: std::marker::PhantomData, +} + +impl RawKey { + /// Create new raw key. + pub fn new(key: K) -> Self { + Self { key: K::encode(key).as_ref().to_vec(), _phantom: std::marker::PhantomData } + } + /// Returns the raw key. + pub fn key(&self) -> Result { + K::decode(&self.key) + } +} + +impl AsRef<[u8]> for RawKey> { + fn as_ref(&self) -> &[u8] { + &self.key + } +} + +// Encode +impl Encode for RawKey { + type Encoded = Vec; + + fn encode(self) -> Self::Encoded { + self.key + } +} + +// Decode +impl Decode for RawKey { + fn decode>(key: B) -> Result { + Ok(Self { key: key.as_ref().to_vec(), _phantom: std::marker::PhantomData }) + } +} + +/// Raw table value. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize, Ord, Hash)] +pub struct RawValue { + value: Vec, + _phantom: std::marker::PhantomData, +} + +impl RawValue { + /// Create new raw value. + pub fn new(value: V) -> Self { + Self { value: V::compress(value).as_ref().to_vec(), _phantom: std::marker::PhantomData } + } + /// Returns the raw value. + pub fn value(&self) -> Result { + V::decompress(&self.value) + } +} + +impl AsRef<[u8]> for RawValue> { + fn as_ref(&self) -> &[u8] { + &self.value + } +} + +impl Compress for RawValue { + type Compressed = Vec; + + fn compress(self) -> Self::Compressed { + self.value + } +} + +impl Decompress for RawValue { + fn decompress>(value: B) -> Result { + Ok(Self { value: value.as_ref().to_vec(), _phantom: std::marker::PhantomData }) + } +}