perf(db): RawTable, decode/encode only if needed (#2081)

This commit is contained in:
rakita
2023-04-05 14:46:10 +02:00
committed by GitHub
parent 77a41e5edf
commit 7c18ba8ed3
5 changed files with 143 additions and 14 deletions

View File

@@ -7,6 +7,7 @@ use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
RawKey, RawTable,
};
use reth_primitives::{keccak256, AccountHashingCheckpoint};
use reth_provider::Transaction;
@@ -197,14 +198,15 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
self.save_checkpoint(tx, checkpoint)?;
}
let start_address = checkpoint.address.take();
let start_address = checkpoint.address.take().map(RawKey::new);
let next_address = {
let mut accounts_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
let mut accounts_cursor =
tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
// channels used to return result of account hashing
let mut channels = Vec::new();
for chunk in &accounts_cursor
.walk(start_address)?
.walk(start_address.clone())?
.take(self.commit_threshold as usize)
.chunks(self.commit_threshold as usize / rayon::current_num_threads())
{
@@ -216,7 +218,8 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
for (address, account) in chunk.into_iter() {
let _ = tx.send((keccak256(address), account));
let address = address.key().unwrap();
let _ = tx.send((RawKey::new(keccak256(address)), account));
}
});
}
@@ -228,11 +231,11 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
hashed_batch.push(hashed);
}
}
// sort it all in parallel
hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
let mut hashed_account_cursor =
tx.cursor_write::<RawTable<tables::HashedAccount>>()?;
// iterate and put presorted hashed accounts
if start_address.is_none() {
@@ -244,13 +247,12 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
.into_iter()
.try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
}
// next key of iterator
accounts_cursor.next()?
};
if let Some((next_address, _)) = &next_address {
checkpoint.address = Some(*next_address);
checkpoint.address = Some(next_address.key().unwrap());
checkpoint.from = from_transition;
checkpoint.to = to_transition;
}

View File

@@ -8,6 +8,7 @@ use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
RawKey, RawTable, RawValue,
};
use reth_primitives::{TransactionSigned, TxNumber, H160};
use reth_provider::Transaction;
@@ -31,7 +32,7 @@ pub struct SenderRecoveryStage {
impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { commit_threshold: 10000 }
Self { commit_threshold: 500_000 }
}
}
@@ -72,9 +73,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let mut senders_cursor = tx.cursor_write::<tables::TxSenders>()?;
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut tx_cursor = tx.cursor_read::<RawTable<tables::Transactions>>()?;
// Walk the transactions from start to end index (inclusive)
let tx_walker = tx_cursor.walk_range(first_tx_num..=last_tx_num)?;
let tx_walker =
tx_cursor.walk_range(RawKey::new(first_tx_num)..=RawKey::new(last_tx_num))?;
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders");
@@ -98,8 +100,14 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let chunk: Vec<_> = chunk.collect();
// closure that would recover signer. Used as utility to wrap result
let recover = |entry: Result<(TxNumber, TransactionSigned), reth_db::Error>| -> Result<(u64, H160), Box<StageError>> {
let recover = |entry: Result<
(RawKey<TxNumber>, RawValue<TransactionSigned>),
reth_db::Error,
>|
-> Result<(u64, H160), Box<StageError>> {
let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?;
let tx_id = tx_id.key().expect("key to be formated");
let transaction = transaction.value().expect("value to be formated");
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;
@@ -115,7 +123,6 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
});
}
// Iterate over channels and append the sender in the order that they are received.
for mut channel in channels {
while let Some(recovered) = channel.recv().await {

View File

@@ -37,7 +37,7 @@ pub trait Encode: Send + Sync + Sized + Debug {
/// Trait that will transform the data to be read from the DB.
pub trait Decode: Send + Sync + Sized + Debug {
/// Decodes data coming from the database.
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, Error>;
fn decode<B: AsRef<[u8]>>(key: B) -> Result<Self, Error>;
}
/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].

View File

@@ -14,8 +14,11 @@
pub mod codecs;
pub mod models;
mod raw;
pub(crate) mod utils;
pub use raw::{RawDubSort, RawKey, RawTable, RawValue};
/// Declaration of all Database tables.
use crate::{
table::DupSort,

View File

@@ -0,0 +1,117 @@
use crate::{
table::{Compress, Decode, Decompress, DupSort, Encode, Key, Table, Value},
Error,
};
use serde::Serialize;
/// Raw table that can be used to access any table and its data in raw mode.
/// This is useful for delayed decoding/encoding of data.
#[derive(Default, Copy, Clone, Debug)]
pub struct RawTable<T: Table> {
phantom: std::marker::PhantomData<T>,
}
impl<T: Table> Table for RawTable<T> {
const NAME: &'static str = T::NAME;
type Key = RawKey<T::Key>;
type Value = RawValue<T::Value>;
}
/// Raw DubSort table that can be used to access any table and its data in raw mode.
/// This is useful for delayed decoding/encoding of data.
#[derive(Default, Copy, Clone, Debug)]
pub struct RawDubSort<T: DupSort> {
phantom: std::marker::PhantomData<T>,
}
impl<T: DupSort> Table for RawDubSort<T> {
const NAME: &'static str = T::NAME;
type Key = RawKey<T::Key>;
type Value = RawValue<T::Value>;
}
impl<T: DupSort> DupSort for RawDubSort<T> {
type SubKey = RawKey<T::SubKey>;
}
/// Raw table key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RawKey<K: Key> {
key: Vec<u8>,
_phantom: std::marker::PhantomData<K>,
}
impl<K: Key> RawKey<K> {
/// Create new raw key.
pub fn new(key: K) -> Self {
Self { key: K::encode(key).as_ref().to_vec(), _phantom: std::marker::PhantomData }
}
/// Returns the raw key.
pub fn key(&self) -> Result<K, Error> {
K::decode(&self.key)
}
}
impl AsRef<[u8]> for RawKey<Vec<u8>> {
fn as_ref(&self) -> &[u8] {
&self.key
}
}
// Encode
impl<K: Key> Encode for RawKey<K> {
type Encoded = Vec<u8>;
fn encode(self) -> Self::Encoded {
self.key
}
}
// Decode
impl<K: Key> Decode for RawKey<K> {
fn decode<B: AsRef<[u8]>>(key: B) -> Result<Self, Error> {
Ok(Self { key: key.as_ref().to_vec(), _phantom: std::marker::PhantomData })
}
}
/// Raw table value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize, Ord, Hash)]
pub struct RawValue<V: Value> {
value: Vec<u8>,
_phantom: std::marker::PhantomData<V>,
}
impl<V: Value> RawValue<V> {
/// Create new raw value.
pub fn new(value: V) -> Self {
Self { value: V::compress(value).as_ref().to_vec(), _phantom: std::marker::PhantomData }
}
/// Returns the raw value.
pub fn value(&self) -> Result<V, Error> {
V::decompress(&self.value)
}
}
impl AsRef<[u8]> for RawValue<Vec<u8>> {
fn as_ref(&self) -> &[u8] {
&self.value
}
}
impl<V: Value> Compress for RawValue<V> {
type Compressed = Vec<u8>;
fn compress(self) -> Self::Compressed {
self.value
}
}
impl<V: Value> Decompress for RawValue<V> {
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, Error> {
Ok(Self { value: value.as_ref().to_vec(), _phantom: std::marker::PhantomData })
}
}