From 9452b3658bc66ddd7a45f5dd6dec53747d82d5b7 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 21 Apr 2023 17:15:52 +0800 Subject: [PATCH] feat(db): Don't store `TxHash` when writing to the table (#2279) --- crates/consensus/auto-seal/src/task.rs | 2 +- crates/net/network/src/transactions.rs | 6 +- crates/primitives/src/lib.rs | 4 +- crates/primitives/src/transaction/mod.rs | 227 +++++++++++++++--- .../primitives/src/transaction/signature.rs | 31 ++- crates/rpc/rpc-types/src/eth/block.rs | 2 +- .../rpc/rpc-types/src/eth/transaction/mod.rs | 2 +- crates/rpc/rpc/src/eth/filter.rs | 4 +- crates/rpc/rpc/src/trace.rs | 2 +- crates/stages/src/stages/bodies.rs | 4 +- crates/stages/src/stages/hashing_account.rs | 7 +- crates/stages/src/stages/hashing_storage.rs | 14 +- crates/stages/src/stages/sender_recovery.rs | 29 ++- crates/stages/src/stages/tx_lookup.rs | 83 +++++-- crates/stages/src/test_utils/test_db.rs | 2 +- .../codecs/derive/src/compact/generator.rs | 2 +- .../storage/codecs/derive/src/compact/mod.rs | 6 +- .../storage/db/src/tables/codecs/compact.rs | 2 +- crates/storage/db/src/tables/mod.rs | 4 +- crates/storage/provider/src/providers/mod.rs | 12 +- .../storage/provider/src/test_utils/mock.rs | 2 +- crates/storage/provider/src/transaction.rs | 23 +- crates/storage/provider/src/utils.rs | 2 +- .../transaction-pool/src/test_utils/mock.rs | 2 +- crates/transaction-pool/src/traits.rs | 2 +- 25 files changed, 358 insertions(+), 118 deletions(-) diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 73da2c9702..0173503b97 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -176,7 +176,7 @@ where let Block { mut header, body, .. } = block; // clear all transactions from pool - pool.remove_transactions(body.iter().map(|tx| tx.hash)); + pool.remove_transactions(body.iter().map(|tx| tx.hash())); header.receipts_root = if post_state.receipts().is_empty() { EMPTY_RECEIPTS diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 7d7f89f7c0..3794eab1e6 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -420,11 +420,11 @@ where // If we received the transactions as the response to our GetPooledTransactions // requests (based on received `NewPooledTransactionHashes`) then we already // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] - if source.is_broadcast() && !peer.transactions.insert(tx.hash) { + if source.is_broadcast() && !peer.transactions.insert(tx.hash()) { num_already_seen += 1; } - match self.transactions_by_peers.entry(tx.hash) { + match self.transactions_by_peers.entry(tx.hash()) { Entry::Occupied(mut entry) => { // transaction was already inserted entry.get_mut().push(peer_id); @@ -566,7 +566,7 @@ struct PropagateTransaction { impl PropagateTransaction { fn hash(&self) -> TxHash { - self.transaction.hash + self.transaction.hash() } fn new(transaction: Arc) -> Self { diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 25f2d5dbba..99b7c8d361 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -72,8 +72,8 @@ pub use transaction::{ util::secp256k1::sign_message, AccessList, AccessListItem, AccessListWithGasUsed, FromRecoveredTransaction, IntoRecoveredTransaction, InvalidTransactionError, Signature, Transaction, TransactionKind, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, - TxEip1559, TxEip2930, TxLegacy, TxType, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, - LEGACY_TX_TYPE_ID, + TransactionSignedNoHash, TxEip1559, TxEip2930, TxLegacy, TxType, EIP1559_TX_TYPE_ID, + EIP2930_TX_TYPE_ID, LEGACY_TX_TYPE_ID, }; pub use withdrawal::Withdrawal; diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index fdc5bc1275..b47f40cf78 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -4,10 +4,11 @@ use bytes::{Buf, BytesMut}; use derive_more::{AsRef, Deref}; pub use error::InvalidTransactionError; pub use meta::TransactionMeta; -use reth_codecs::{add_arbitrary_tests, main_codec, Compact}; +use reth_codecs::{add_arbitrary_tests, derive_arbitrary, main_codec, Compact}; use reth_rlp::{ length_of_length, Decodable, DecodeError, Encodable, Header, EMPTY_LIST_CODE, EMPTY_STRING_CODE, }; +use serde::{Deserialize, Serialize}; pub use signature::Signature; pub use tx_type::{TxType, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, LEGACY_TX_TYPE_ID}; @@ -167,8 +168,8 @@ pub struct TxEip1559 { /// A raw transaction. /// /// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718). -#[main_codec] -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive_arbitrary(compact)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Transaction { /// Legacy transaction. Legacy(TxLegacy), @@ -178,6 +179,90 @@ pub enum Transaction { Eip1559(TxEip1559), } +impl Transaction { + /// This encodes the transaction _without_ the signature, and is only suitable for creating a + /// hash intended for signing. + pub fn encode_without_signature(&self, out: &mut dyn bytes::BufMut) { + Encodable::encode(self, out); + } + + /// Inner encoding function that is used for both rlp [`Encodable`] trait and for calculating + /// hash that for eip2718 does not require rlp header + pub fn encode_with_signature( + &self, + signature: &Signature, + out: &mut dyn bytes::BufMut, + with_header: bool, + ) { + match self { + Transaction::Legacy(TxLegacy { chain_id, .. }) => { + // do nothing w/ with_header + let payload_length = + self.fields_len() + signature.payload_len_with_eip155_chain_id(*chain_id); + let header = Header { list: true, payload_length }; + header.encode(out); + self.encode_fields(out); + signature.encode_with_eip155_chain_id(out, *chain_id); + } + _ => { + let payload_length = self.fields_len() + signature.payload_len(); + if with_header { + Header { + list: false, + payload_length: 1 + length_of_length(payload_length) + payload_length, + } + .encode(out); + } + out.put_u8(self.tx_type() as u8); + let header = Header { list: true, payload_length }; + header.encode(out); + self.encode_fields(out); + signature.encode(out); + } + } + } +} + +impl Compact for Transaction { + fn to_compact(self, buf: &mut B) -> usize + where + B: bytes::BufMut + AsMut<[u8]>, + { + match self { + Transaction::Legacy(tx) => { + tx.to_compact(buf); + 0 + } + Transaction::Eip2930(tx) => { + tx.to_compact(buf); + 1 + } + Transaction::Eip1559(tx) => { + tx.to_compact(buf); + 2 + } + } + } + + fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) { + match identifier { + 0 => { + let (tx, buf) = TxLegacy::from_compact(buf, buf.len()); + (Transaction::Legacy(tx), buf) + } + 1 => { + let (tx, buf) = TxEip2930::from_compact(buf, buf.len()); + (Transaction::Eip2930(tx), buf) + } + 2 => { + let (tx, buf) = TxEip1559::from_compact(buf, buf.len()); + (Transaction::Eip1559(tx), buf) + } + _ => unreachable!(), + } + } +} + // === impl Transaction === impl Transaction { @@ -549,8 +634,8 @@ impl Encodable for Transaction { } /// Whether or not the transaction is a contract creation. -#[main_codec] -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +#[derive_arbitrary(compact, rlp)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] pub enum TransactionKind { /// A transaction that creates a contract. #[default] @@ -559,6 +644,32 @@ pub enum TransactionKind { Call(Address), } +impl Compact for TransactionKind { + fn to_compact(self, buf: &mut B) -> usize + where + B: bytes::BufMut + AsMut<[u8]>, + { + match self { + TransactionKind::Create => 0, + TransactionKind::Call(address) => { + address.to_compact(buf); + 1 + } + } + } + + fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) { + match identifier { + 0 => (TransactionKind::Create, buf), + 1 => { + let (addr, buf) = Address::from_compact(buf, buf.len()); + (TransactionKind::Call(addr), buf) + } + _ => unreachable!(), + } + } +} + impl Encodable for TransactionKind { fn encode(&self, out: &mut dyn reth_rlp::BufMut) { match self { @@ -590,10 +701,77 @@ impl Decodable for TransactionKind { } } +/// Signed transaction without its Hash. Used type for inserting into the DB. +#[derive_arbitrary(compact)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)] +pub struct TransactionSignedNoHash { + /// The transaction signature values + pub signature: Signature, + /// Raw transaction info + #[deref] + #[as_ref] + pub transaction: Transaction, +} + +impl TransactionSignedNoHash { + /// Calculates the transaction hash. If used more than once, it's better to convert it to + /// [`TransactionSigned`] first. + pub fn hash(&self) -> H256 { + let mut buf = Vec::new(); + self.transaction.encode_with_signature(&self.signature, &mut buf, false); + keccak256(&buf) + } + + /// Converts into a transaction type with its hash: [`TransactionSigned`]. + pub fn with_hash(self) -> TransactionSigned { + self.into() + } +} + +impl Compact for TransactionSignedNoHash { + fn to_compact(self, buf: &mut B) -> usize + where + B: bytes::BufMut + AsMut<[u8]>, + { + let before = buf.as_mut().len(); + + // placeholder for bitflags + buf.put_u8(0); + + let sig_bit = self.signature.to_compact(buf) as u8; + let tx_bit = self.transaction.to_compact(buf) as u8; + + // replace with actual flags + buf.as_mut()[before] = sig_bit | (tx_bit << 1); + + buf.as_mut().len() - before + } + + fn from_compact(mut buf: &[u8], _: usize) -> (Self, &[u8]) { + let prefix = buf.get_u8() as usize; + + let (signature, buf) = Signature::from_compact(buf, prefix & 1); + let (transaction, buf) = Transaction::from_compact(buf, prefix >> 1); + + (TransactionSignedNoHash { signature, transaction }, buf) + } +} + +impl From for TransactionSigned { + fn from(tx: TransactionSignedNoHash) -> Self { + TransactionSigned::from_transaction_and_signature(tx.transaction, tx.signature) + } +} + +impl From for TransactionSignedNoHash { + fn from(tx: TransactionSigned) -> Self { + TransactionSignedNoHash { signature: tx.signature, transaction: tx.transaction } + } +} + /// Signed transaction. -#[main_codec(no_arbitrary)] -#[add_arbitrary_tests(rlp, compact)] -#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)] +#[add_arbitrary_tests(rlp)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)] pub struct TransactionSigned { /// Transaction hash pub hash: TxHash, @@ -624,6 +802,11 @@ impl TransactionSigned { self.hash } + /// Reference to transaction hash. Used to identify transaction. + pub fn hash_ref(&self) -> &TxHash { + &self.hash + } + /// Recover signer from signature and hash. /// /// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer]. @@ -668,32 +851,7 @@ impl TransactionSigned { /// Inner encoding function that is used for both rlp [`Encodable`] trait and for calculating /// hash that for eip2718 does not require rlp header pub(crate) fn encode_inner(&self, out: &mut dyn bytes::BufMut, with_header: bool) { - match self.transaction { - Transaction::Legacy(TxLegacy { chain_id, .. }) => { - // do nothing w/ with_header - let payload_length = self.transaction.fields_len() + - self.signature.payload_len_with_eip155_chain_id(chain_id); - let header = Header { list: true, payload_length }; - header.encode(out); - self.transaction.encode_fields(out); - self.signature.encode_with_eip155_chain_id(out, chain_id); - } - _ => { - let payload_length = self.transaction.fields_len() + self.signature.payload_len(); - if with_header { - Header { - list: false, - payload_length: 1 + length_of_length(payload_length) + payload_length, - } - .encode(out); - } - out.put_u8(self.transaction.tx_type() as u8); - let header = Header { list: true, payload_length }; - header.encode(out); - self.transaction.encode_fields(out); - self.signature.encode(out); - } - } + self.transaction.encode_with_signature(&self.signature, out, with_header); } /// Output the length of the encode_inner(out, true). Note to assume that `with_header` is only @@ -920,7 +1078,6 @@ impl<'a> arbitrary::Arbitrary<'a> for TransactionSigned { } /// Signed transaction with recovered signer. -#[main_codec] #[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)] pub struct TransactionSignedEcRecovered { /// Signer of the transaction diff --git a/crates/primitives/src/transaction/signature.rs b/crates/primitives/src/transaction/signature.rs index 38a5a9cc72..04ab978a81 100644 --- a/crates/primitives/src/transaction/signature.rs +++ b/crates/primitives/src/transaction/signature.rs @@ -1,12 +1,14 @@ use crate::{transaction::util::secp256k1, Address, H256, U256}; -use reth_codecs::{main_codec, Compact}; +use bytes::Buf; +use reth_codecs::{derive_arbitrary, Compact}; use reth_rlp::{Decodable, DecodeError, Encodable}; +use serde::{Deserialize, Serialize}; /// r, s: Values corresponding to the signature of the /// transaction and used to determine the sender of /// the transaction; formally Tr and Ts. This is expanded in Appendix F of yellow paper. -#[main_codec] -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] +#[derive_arbitrary(compact)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] pub struct Signature { /// The R field of the signature; the point on the curve. pub r: U256, @@ -16,6 +18,27 @@ pub struct Signature { pub odd_y_parity: bool, } +impl Compact for Signature { + fn to_compact(self, buf: &mut B) -> usize + where + B: bytes::BufMut + AsMut<[u8]>, + { + buf.put_slice(self.r.as_le_bytes().as_ref()); + buf.put_slice(self.s.as_le_bytes().as_ref()); + self.odd_y_parity as usize + } + + fn from_compact(mut buf: &[u8], identifier: usize) -> (Self, &[u8]) { + let r = U256::try_from_le_slice(&buf[..32]).expect("qed"); + buf.advance(32); + + let s = U256::try_from_le_slice(&buf[..32]).expect("qed"); + buf.advance(32); + + (Signature { r, s, odd_y_parity: identifier != 0 }, buf) + } +} + impl Signature { /// Output the length of the signature without the length of the RLP header, using the legacy /// scheme with EIP-155 support depends on chain_id. @@ -91,7 +114,7 @@ impl Signature { } /// Recover signature from hash. - pub(crate) fn recover_signer(&self, hash: H256) -> Option
{ + pub fn recover_signer(&self, hash: H256) -> Option
{ let mut sig: [u8; 65] = [0; 65]; sig[0..32].copy_from_slice(&self.r.to_be_bytes::<32>()); diff --git a/crates/rpc/rpc-types/src/eth/block.rs b/crates/rpc/rpc-types/src/eth/block.rs index c0ee10532e..c6f01a9e29 100644 --- a/crates/rpc/rpc-types/src/eth/block.rs +++ b/crates/rpc/rpc-types/src/eth/block.rs @@ -115,7 +115,7 @@ impl Block { block_hash: Option, ) -> Self { let block_hash = block_hash.unwrap_or_else(|| block.header.hash_slow()); - let transactions = block.body.iter().map(|tx| tx.hash).collect(); + let transactions = block.body.iter().map(|tx| tx.hash()).collect(); Self::from_block_with_transactions( block_hash, diff --git a/crates/rpc/rpc-types/src/eth/transaction/mod.rs b/crates/rpc/rpc-types/src/eth/transaction/mod.rs index 199525e76f..6522dbe281 100644 --- a/crates/rpc/rpc-types/src/eth/transaction/mod.rs +++ b/crates/rpc/rpc-types/src/eth/transaction/mod.rs @@ -132,7 +132,7 @@ impl Transaction { }; Self { - hash: signed_tx.hash, + hash: signed_tx.hash(), nonce: U256::from(signed_tx.nonce()), block_hash: None, block_number: None, diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index b32ec5dcfd..1fa530b537 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -195,7 +195,7 @@ where &mut all_logs, &filter, (block_hash, block.number).into(), - block.body.into_iter().map(|tx| tx.hash).zip(receipts), + block.body.into_iter().map(|tx| tx.hash()).zip(receipts), false, ); } @@ -269,7 +269,7 @@ where &mut all_logs, &filter_params, (block_number, block_hash).into(), - block.body.into_iter().map(|tx| tx.hash).zip(receipts), + block.body.into_iter().map(|tx| tx.hash()).zip(receipts), false, ); diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index a8e50492d5..fb0e8a8e91 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -233,7 +233,7 @@ where for (idx, tx) in transactions.into_iter().enumerate() { let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; let tx_info = TransactionInfo { - hash: Some(tx.hash), + hash: Some(tx.hash()), index: Some(idx as u64), block_hash: Some(block_hash), block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 4d1a26fb27..12348c4a56 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -119,7 +119,7 @@ impl Stage for BodyStage { // Write transactions for transaction in block.body { // Append the transaction - tx_cursor.append(next_tx_num, transaction)?; + tx_cursor.append(next_tx_num, transaction.into())?; // Increment transaction id for each transaction. next_tx_num += 1; } @@ -510,7 +510,7 @@ mod tests { }; body.tx_num_range().try_for_each(|tx_num| { let transaction = random_signed_tx(); - tx.put::(tx_num, transaction) + tx.put::(tx_num, transaction.into()) })?; if body.tx_count != 0 { diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 8bbfc0b215..360cdff546 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -346,7 +346,7 @@ mod tests { let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 10); + assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 10); assert_eq!(runner.tx.table::().unwrap().len(), 5); let fifth_address = runner .tx @@ -354,8 +354,7 @@ mod tests { let (address, _) = tx .cursor_read::()? .walk(None)? - .skip(5) - .next() + .nth(5) .unwrap() .unwrap(); Ok(address) @@ -373,7 +372,7 @@ mod tests { let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == true && stage_progress == 20); + assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 20); assert_eq!(runner.tx.table::().unwrap().len(), 10); // Validate the stage execution diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 875c6cf099..d01f056526 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -293,7 +293,7 @@ mod tests { // first run, hash first half of storages. let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 100); + assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100); assert_eq!(runner.tx.table::().unwrap().len(), 500); let (progress_address, progress_key) = runner .tx @@ -301,8 +301,7 @@ mod tests { let (address, entry) = tx .cursor_read::()? .walk(None)? - .skip(500) - .next() + .nth(500) .unwrap() .unwrap(); Ok((address, entry.key)) @@ -325,7 +324,7 @@ mod tests { runner.set_commit_threshold(2); let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 100); + assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100); assert_eq!(runner.tx.table::().unwrap().len(), 502); let (progress_address, progress_key) = runner .tx @@ -333,8 +332,7 @@ mod tests { let (address, entry) = tx .cursor_read::()? .walk(None)? - .skip(502) - .next() + .nth(502) .unwrap() .unwrap(); Ok((address, entry.key)) @@ -359,7 +357,7 @@ mod tests { let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == true && stage_progress == 500); + assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 500); assert_eq!( runner.tx.table::().unwrap().len(), runner.tx.table::().unwrap().len() @@ -420,7 +418,7 @@ mod tests { self.tx.commit(|tx| { progress.body.iter().try_for_each(|transaction| { tx.put::(transaction.hash(), next_tx_num)?; - tx.put::(next_tx_num, transaction.clone())?; + tx.put::(next_tx_num, transaction.clone().into())?; let (addr, _) = accounts .get_mut(rand::random::() % n_accounts as usize) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 3fa65f8449..2726e0a341 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -7,7 +7,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, RawValue, }; -use reth_primitives::{TransactionSigned, TxNumber, H160}; +use reth_primitives::{keccak256, TransactionSignedNoHash, TxNumber, H160}; use reth_provider::Transaction; use std::fmt::Debug; use thiserror::Error; @@ -100,16 +100,20 @@ impl Stage for SenderRecoveryStage { // closure that would recover signer. Used as utility to wrap result let recover = |entry: Result< - (RawKey, RawValue), + (RawKey, RawValue), reth_db::Error, - >| + >, + rlp_buf: &mut Vec| -> Result<(u64, H160), Box> { let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?; let tx_id = tx_id.key().expect("key to be formated"); - let transaction = transaction.value().expect("value to be formated"); - let sender = transaction.recover_signer().ok_or(StageError::from( - SenderRecoveryStageError::SenderRecovery { tx: tx_id }, - ))?; + + let tx = transaction.value().expect("value to be formated"); + tx.transaction.encode_without_signature(rlp_buf); + + let sender = tx.signature.recover_signer(keccak256(rlp_buf)).ok_or( + StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }), + )?; Ok((tx_id, sender)) }; @@ -117,8 +121,10 @@ impl Stage for SenderRecoveryStage { // Spawn the sender recovery task onto the global rayon pool // This task will send the results through the channel after it recovered the senders. rayon::spawn(move || { + let mut rlp_buf = Vec::with_capacity(128); for entry in chunk { - let _ = tx.send(recover(entry)); + rlp_buf.truncate(0); + let _ = tx.send(recover(entry, &mut rlp_buf)); } }); } @@ -165,7 +171,7 @@ impl From for StageError { mod tests { use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; - use reth_primitives::{BlockNumber, SealedBlock, H256}; + use reth_primitives::{BlockNumber, SealedBlock, TransactionSigned, H256}; use super::*; use crate::test_utils::{ @@ -330,9 +336,10 @@ mod tests { while let Some((_, body)) = body_cursor.next()? { for tx_id in body.tx_num_range() { - let transaction = tx + let transaction: TransactionSigned = tx .get::(tx_id)? - .expect("no transaction entry"); + .expect("no transaction entry") + .into(); let signer = transaction.recover_signer().expect("failed to recover signer"); assert_eq!(Some(signer), tx.get::(tx_id)?); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index db7930d60e..aa5ea497f6 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,4 +1,5 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use itertools::Itertools; use rayon::prelude::*; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -6,7 +7,10 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; +use reth_primitives::{rpc_utils::keccak256, BlockNumber, TransactionSignedNoHash, TxNumber, H256}; use reth_provider::Transaction; +use thiserror::Error; +use tokio::sync::mpsc; use tracing::*; /// The [`StageId`] of the transaction lookup stage. @@ -19,7 +23,7 @@ pub const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); /// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. #[derive(Debug, Clone)] pub struct TransactionLookupStage { - /// The number of table entries to commit at once + /// The number of blocks to commit at once commit_threshold: u64, } @@ -43,7 +47,7 @@ impl Stage for TransactionLookupStage { TRANSACTION_LOOKUP } - /// Write total difficulty entries + /// Write transaction hash -> id entries async fn execute( &mut self, tx: &mut Transaction<'_, DB>, @@ -58,26 +62,57 @@ impl Stage for TransactionLookupStage { debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync"); let mut block_meta_cursor = tx.cursor_read::()?; + + let (_, first_block) = block_meta_cursor.seek_exact(start_block)?.ok_or( + StageError::from(TransactionLookupStageError::TransactionLookup { block: start_block }), + )?; + + let (_, last_block) = block_meta_cursor.seek_exact(end_block)?.ok_or(StageError::from( + TransactionLookupStageError::TransactionLookup { block: end_block }, + ))?; + let mut tx_cursor = tx.cursor_read::()?; + let tx_walker = + tx_cursor.walk_range(first_block.first_tx_num()..=last_block.last_tx_num())?; - // Walk over block bodies within a specified range. - let bodies = block_meta_cursor.walk_range(start_block..=end_block)?; + let mut channels = Vec::new(); - let mut total_transactions = 0; - let mut tx_ranges = Vec::with_capacity((end_block - start_block) as usize); + for chunk in &tx_walker.chunks(100_000 / rayon::current_num_threads()) { + let (tx, rx) = mpsc::unbounded_channel(); + channels.push(rx); - for block_meta_entry in bodies { - let (_, block_meta) = block_meta_entry?; - total_transactions += block_meta.tx_count; - tx_ranges.push(block_meta.tx_num_range()); + // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send) + let chunk: Vec<_> = chunk.collect(); + + // closure that will calculate the TxHash + let calculate_hash = + |entry: Result<(TxNumber, TransactionSignedNoHash), reth_db::Error>, + rlp_buf: &mut Vec| + -> Result<(H256, u64), Box> { + let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?; + tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false); + Ok((H256(keccak256(rlp_buf)), tx_id)) + }; + + // Spawn the task onto the global rayon pool + // This task will send the results through the channel after it has calculated the hash. + rayon::spawn(move || { + let mut rlp_buf = Vec::with_capacity(128); + for entry in chunk { + rlp_buf.truncate(0); + let _ = tx.send(calculate_hash(entry, &mut rlp_buf)); + } + }); } - // Collect transactions for each body - let mut tx_list = Vec::with_capacity(total_transactions as usize); - for tx_num_range in tx_ranges { - for tx_entry in tx_cursor.walk_range(tx_num_range)? { - let (id, transaction) = tx_entry?; - tx_list.push((transaction.hash(), id)); + let mut tx_list = + Vec::with_capacity((last_block.last_tx_num() - first_block.first_tx_num()) as usize); + + // Iterate over channels and append the tx hashes to be sorted out later + for mut channel in channels { + while let Some(tx) = channel.recv().await { + let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?; + tx_list.push((tx_hash, tx_id)); } } @@ -131,7 +166,7 @@ impl Stage for TransactionLookupStage { for tx_id in body.tx_num_range() { // First delete the transaction and hash to id mapping if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { - if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { + if tx_hash_number_cursor.seek_exact(transaction.hash())?.is_some() { tx_hash_number_cursor.delete_current()?; } } @@ -142,6 +177,18 @@ impl Stage for TransactionLookupStage { } } +#[derive(Error, Debug)] +enum TransactionLookupStageError { + #[error("Transaction lookup failed to find block {block}.")] + TransactionLookup { block: BlockNumber }, +} + +impl From for StageError { + fn from(error: TransactionLookupStageError) -> Self { + StageError::Fatal(Box::new(error)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -316,7 +363,7 @@ mod tests { .expect("no transaction entry"); assert_eq!( Some(tx_id), - tx.get::(transaction.hash)?, + tx.get::(transaction.hash())?, ); } } diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 26d9d3210d..20e0cf772a 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -243,7 +243,7 @@ impl TestTransaction { }, )?; block.body.iter().try_for_each(|body_tx| { - tx.put::(next_tx_num, body_tx.clone())?; + tx.put::(next_tx_num, body_tx.clone().into())?; next_tx_num += 1; Ok(()) }) diff --git a/crates/storage/codecs/derive/src/compact/generator.rs b/crates/storage/codecs/derive/src/compact/generator.rs index 95181ee5aa..e112842a78 100644 --- a/crates/storage/codecs/derive/src/compact/generator.rs +++ b/crates/storage/codecs/derive/src/compact/generator.rs @@ -53,7 +53,7 @@ pub fn generate_from_to(ident: &Ident, fields: &FieldList) -> TokenStream2 { /// Generates code to implement the `Compact` trait method `to_compact`. fn generate_from_compact(fields: &FieldList, ident: &Ident) -> Vec { let mut lines = vec![]; - let mut known_types = vec!["H256", "H160", "Address", "Bloom", "Vec"]; + let mut known_types = vec!["H256", "H160", "Address", "Bloom", "Vec", "TxHash"]; // Only types without `bytes::Bytes` should be added here. It's currently manually added, since // it's hard to figure out with derive_macro which types have bytes::Bytes fields. diff --git a/crates/storage/codecs/derive/src/compact/mod.rs b/crates/storage/codecs/derive/src/compact/mod.rs index fddcbd6de7..32c0635427 100644 --- a/crates/storage/codecs/derive/src/compact/mod.rs +++ b/crates/storage/codecs/derive/src/compact/mod.rs @@ -144,7 +144,7 @@ fn should_use_alt_impl(ftype: &String, segment: &syn::PathSegment) -> bool { if let (Some(path), 1) = (arg_path.path.segments.first(), arg_path.path.segments.len()) { - if ["H256", "H160", "Address", "Bloom"] + if ["H256", "H160", "Address", "Bloom", "TxHash"] .contains(&path.ident.to_string().as_str()) { return true @@ -160,11 +160,11 @@ fn should_use_alt_impl(ftype: &String, segment: &syn::PathSegment) -> bool { /// length. pub fn get_bit_size(ftype: &str) -> u8 { match ftype { - "bool" | "Option" => 1, + "TransactionKind" | "bool" | "Option" | "Signature" => 1, "TxType" => 2, "u64" | "BlockNumber" | "TxNumber" | "ChainId" | "NumTransactions" => 4, "u128" => 5, - "U256" | "TxHash" => 6, + "U256" => 6, _ => 0, } } diff --git a/crates/storage/db/src/tables/codecs/compact.rs b/crates/storage/db/src/tables/codecs/compact.rs index 46c3833fc1..b34ada2835 100644 --- a/crates/storage/db/src/tables/codecs/compact.rs +++ b/crates/storage/db/src/tables/codecs/compact.rs @@ -48,7 +48,7 @@ impl_compression_for_compact!( Bytecode, ProofCheckpoint ); -impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); +impl_compression_for_compact!(AccountBeforeTx, TransactionSignedNoHash); impl_compression_for_compact!(CompactU256); macro_rules! impl_compression_fixed_compact { diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index d40da2478a..2b69da7d73 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -35,7 +35,7 @@ use crate::{ use reth_primitives::{ trie::{BranchNodeCompact, StorageTrieEntry, StoredNibbles, StoredNibblesSubKey}, Account, Address, BlockHash, BlockNumber, Bytecode, Header, IntegerList, Receipt, StorageEntry, - TransactionSigned, TxHash, TxNumber, H256, + TransactionSignedNoHash, TxHash, TxNumber, H256, }; /// Enum for the types of tables present in libmdbx. @@ -169,7 +169,7 @@ table!( table!( /// (Canonical only) Stores the transaction body for canonical transactions. - ( Transactions ) TxNumber | TransactionSigned + ( Transactions ) TxNumber | TransactionSignedNoHash ); table!( diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 4c2a338b7c..5a2452594b 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -169,7 +169,10 @@ impl TransactionsProvider for ShareableDatabase { } fn transaction_by_id(&self, id: TxNumber) -> Result> { - self.db.view(|tx| tx.get::(id))?.map_err(Into::into) + self.db + .view(|tx| tx.get::(id))? + .map_err(Into::into) + .map(|tx| tx.map(Into::into)) } fn transaction_by_hash(&self, hash: TxHash) -> Result> { @@ -182,6 +185,7 @@ impl TransactionsProvider for ShareableDatabase { } })? .map_err(Into::into) + .map(|tx| tx.map(Into::into)) } fn transaction_by_hash_with_meta( @@ -216,7 +220,7 @@ impl TransactionsProvider for ShareableDatabase { block_number, }; - return Ok(Some((transaction, meta))) + return Ok(Some((transaction.into(), meta))) } } } @@ -248,7 +252,7 @@ impl TransactionsProvider for ShareableDatabase { let mut tx_cursor = tx.cursor_read::()?; let transactions = tx_cursor .walk_range(tx_range)? - .map(|result| result.map(|(_, tx)| tx)) + .map(|result| result.map(|(_, tx)| tx.into())) .collect::, _>>()?; Ok(Some(transactions)) } @@ -274,7 +278,7 @@ impl TransactionsProvider for ShareableDatabase { results.push( tx_cursor .walk_range(tx_num_range)? - .map(|result| result.map(|(_, tx)| tx)) + .map(|result| result.map(|(_, tx)| tx.into())) .collect::, _>>()?, ); } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 1bd6347eea..7aa44366de 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -152,7 +152,7 @@ impl TransactionsProvider for MockEthProvider { .blocks .lock() .iter() - .find_map(|(_, block)| block.body.iter().find(|tx| tx.hash == hash).cloned())) + .find_map(|(_, block)| block.body.iter().find(|tx| tx.hash() == hash).cloned())) } fn transaction_by_hash_with_meta( diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index b043e97a24..a331975fdc 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -20,7 +20,8 @@ use reth_db::{ use reth_interfaces::{db::Error as DbError, provider::ProviderError}; use reth_primitives::{ keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, - SealedBlockWithSenders, StorageEntry, TransactionSignedEcRecovered, TxNumber, H256, U256, + SealedBlockWithSenders, StorageEntry, TransactionSigned, TransactionSignedEcRecovered, + TxNumber, H256, U256, }; use reth_trie::{StateRoot, StateRootError}; use std::{ @@ -637,8 +638,12 @@ where } // Get transactions and senders - let transactions = - self.get_or_take::(first_transaction..=last_transaction)?; + let transactions = self + .get_or_take::(first_transaction..=last_transaction)? + .into_iter() + .map(|(id, tx)| (id, tx.into())) + .collect::>(); + let senders = self.get_or_take::(first_transaction..=last_transaction)?; @@ -1454,8 +1459,8 @@ mod test { let get = tx.get_block_and_execution_range(&chain_spec, 1..=1).unwrap(); let get_block = get[0].0.clone(); let get_state = get[0].1.clone(); - assert_eq!(get_block, block1.clone()); - assert_eq!(get_state, exec_res1.clone()); + assert_eq!(get_block, block1); + assert_eq!(get_state, exec_res1); // take one block let take = tx.take_block_and_execution_range(&chain_spec, 1..=1).unwrap(); @@ -1492,10 +1497,10 @@ mod test { // get two blocks let get = tx.get_block_and_execution_range(&chain_spec, 1..=2).unwrap(); - assert_eq!(get[0].0, block1.clone()); - assert_eq!(get[1].0, block2.clone()); - assert_eq!(get[0].1, exec_res1.clone()); - assert_eq!(get[1].1, exec_res2.clone()); + assert_eq!(get[0].0, block1); + assert_eq!(get[1].0, block2); + assert_eq!(get[0].1, exec_res1); + assert_eq!(get[1].1, exec_res2); // take two blocks let get = tx.take_block_and_execution_range(&chain_spec, 1..=2).unwrap(); diff --git a/crates/storage/provider/src/utils.rs b/crates/storage/provider/src/utils.rs index cafbb6568c..ae3d70f343 100644 --- a/crates/storage/provider/src/utils.rs +++ b/crates/storage/provider/src/utils.rs @@ -71,7 +71,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( for (transaction, sender) in tx_iter { let hash = transaction.hash(); tx.put::(next_tx_num, sender)?; - tx.put::(next_tx_num, transaction)?; + tx.put::(next_tx_num, transaction.into())?; tx.put::(hash, next_tx_num)?; next_tx_num += 1; } diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index 47fc930627..dc67a310f1 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -367,7 +367,7 @@ impl FromRecoveredTransaction for MockTransaction { fn from_recovered_transaction(tx: TransactionSignedEcRecovered) -> Self { let sender = tx.signer(); let transaction = tx.into_signed(); - let hash = transaction.hash; + let hash = transaction.hash(); match transaction.transaction { Transaction::Legacy(TxLegacy { chain_id, diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 8df307dfa7..c81294515b 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -352,7 +352,7 @@ impl PooledTransaction { impl PoolTransaction for PooledTransaction { /// Returns hash of the transaction. fn hash(&self) -> &TxHash { - &self.transaction.hash + self.transaction.hash_ref() } /// Returns the Sender of the transaction.