From 38b0bf6d1aca6aec0e8ad5930147218c65f67d60 Mon Sep 17 00:00:00 2001 From: rakita Date: Mon, 20 Mar 2023 19:16:28 +0100 Subject: [PATCH] feat: Add transaction to block number index (#1866) --- crates/stages/src/stages/bodies.rs | 43 ++++++++++++++++--- crates/storage/db/src/tables/mod.rs | 12 +++++- crates/storage/provider/src/providers/mod.rs | 9 ++++ .../storage/provider/src/test_utils/blocks.rs | 1 + .../storage/provider/src/test_utils/mock.rs | 4 ++ .../storage/provider/src/test_utils/noop.rs | 4 ++ .../provider/src/traits/transactions.rs | 3 ++ crates/storage/provider/src/transaction.rs | 37 ++++++++++++++-- crates/storage/provider/src/utils.rs | 8 ++++ 9 files changed, 109 insertions(+), 12 deletions(-) diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 20b176d05d..8f6aaf87cc 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -44,6 +44,7 @@ pub const BODIES: StageId = StageId("Bodies"); /// - [`BlockOmmers`][reth_db::tables::BlockOmmers] /// - [`BlockBodies`][reth_db::tables::BlockBodies] /// - [`Transactions`][reth_db::tables::Transactions] +/// - [`TransactionBlock`][reth_db::tables::TransactionBlock] /// - [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex] /// - [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex] /// @@ -90,6 +91,7 @@ impl Stage for BodyStage { // Cursors used to write bodies, ommers and transactions let mut body_cursor = tx.cursor_write::()?; let mut tx_cursor = tx.cursor_write::()?; + let mut tx_block_cursor = tx.cursor_write::()?; let mut ommers_cursor = tx.cursor_write::()?; let mut withdrawals_cursor = tx.cursor_write::()?; @@ -117,13 +119,16 @@ impl Stage for BodyStage { let mut has_withdrawals = false; match response { BlockResponse::Full(block) => { - body_cursor.append( - block_number, - StoredBlockBody { - start_tx_id: current_tx_id, - tx_count: block.body.len() as u64, - }, - )?; + let body = StoredBlockBody { + start_tx_id: current_tx_id, + tx_count: block.body.len() as u64, + }; + body_cursor.append(block_number, body.clone())?; + + // write transaction block index + if !body.is_empty() { + tx_block_cursor.append(body.last_tx_index(), block.number)?; + } // Write transactions for transaction in block.body { @@ -208,6 +213,7 @@ impl Stage for BodyStage { // Cursors to unwind transitions let mut block_transition_cursor = tx.cursor_write::()?; let mut tx_transition_cursor = tx.cursor_write::()?; + let mut tx_block_cursor = tx.cursor_write::()?; let mut rev_walker = body_cursor.walk_back(None)?; while let Some((number, body)) = rev_walker.next().transpose()? { @@ -230,6 +236,11 @@ impl Stage for BodyStage { block_transition_cursor.delete_current()?; } + // Delete all transaction to block values. + if !body.is_empty() && tx_block_cursor.seek_exact(body.last_tx_index())?.is_some() { + tx_block_cursor.delete_current()?; + } + // Delete all transactions that belong to this block for tx_id in body.tx_id_range() { // First delete the transaction @@ -555,6 +566,12 @@ mod tests { progress.number, block_transition_id, )?; + if body.tx_count != 0 { + tx.put::( + body.first_tx_index(), + progress.number, + )?; + } tx.put::(progress.number, body)?; if !progress.ommers_hash_is_empty() { tx.put::( @@ -605,6 +622,10 @@ mod tests { last_tx_id, |key| key, )?; + self.tx.ensure_no_entry_above::( + last_tx_id, + |key| key, + )?; } Ok(()) } @@ -640,6 +661,7 @@ mod tests { let mut block_transition_cursor = tx.cursor_read::()?; let mut transaction_cursor = tx.cursor_read::()?; let mut tx_transition_cursor = tx.cursor_read::()?; + let mut tx_block_cursor = tx.cursor_read::()?; let first_body_key = match bodies_cursor.first()? { Some((key, _)) => key, @@ -675,6 +697,13 @@ mod tests { assert!(stored_ommers.is_some(), "Missing ommers entry"); } + let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_index())?.map(|(_,b)| b); + if body.tx_count == 0 { + assert_ne!(tx_block_id,Some(number)); + } else { + assert_eq!(tx_block_id, Some(number)); + } + for tx_id in body.tx_id_range() { let tx_entry = transaction_cursor.seek_exact(tx_id)?; assert!(tx_entry.is_some(), "Transaction is missing."); diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 7d058403df..db21772ad1 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -32,7 +32,7 @@ pub enum TableType { } /// Default tables that should be present inside database. -pub const TABLES: [(TableType, &str); 27] = [ +pub const TABLES: [(TableType, &str); 28] = [ (TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), @@ -40,6 +40,7 @@ pub const TABLES: [(TableType, &str); 27] = [ (TableType::Table, BlockBodies::const_name()), (TableType::Table, BlockOmmers::const_name()), (TableType::Table, BlockWithdrawals::const_name()), + (TableType::Table, TransactionBlock::const_name()), (TableType::Table, Transactions::const_name()), (TableType::Table, TxHashNumber::const_name()), (TableType::Table, Receipts::const_name()), @@ -148,7 +149,7 @@ table!( table!( /// (Canonical only) Stores the transaction body for canonical transactions. - ( Transactions ) TxNumber | TransactionSigned + ( Transactions ) TxNumber | TransactionSigned ); table!( @@ -156,6 +157,13 @@ table!( ( TxHashNumber ) TxHash | TxNumber ); +table!( + /// Stores the mapping of transaction number to the blocks number. + /// + /// The key is the highest transaction ID in the block. + ( TransactionBlock ) TxNumber | BlockNumber +); + table!( /// (Canonical only) Stores transaction receipts. ( Receipts ) TxNumber | Receipt diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 0e1d99857a..d66433eee2 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -181,6 +181,15 @@ impl TransactionsProvider for ShareableDatabase { .map_err(Into::into) } + fn transaction_block(&self, id: TxNumber) -> Result> { + self.db + .view(|tx| { + let mut cursor = tx.cursor_read::()?; + cursor.seek(id).map(|b| b.map(|(_, bn)| bn)) + })? + .map_err(Into::into) + } + fn transactions_by_block(&self, id: BlockId) -> Result>> { if let Some(number) = self.block_number_for_id(id)? { let tx = self.db.tx()?; diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 19d4545eac..9d0a4fa400 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -23,6 +23,7 @@ pub fn assert_genesis_block(tx: &Transaction<'_, DB>, g: SealedBlo assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); + assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 176861b60b..b484ae436e 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -149,6 +149,10 @@ impl TransactionsProvider for MockEthProvider { .find_map(|(_, block)| block.body.iter().find(|tx| tx.hash == hash).cloned())) } + fn transaction_block(&self, _id: TxNumber) -> Result> { + unimplemented!() + } + fn transactions_by_block(&self, id: BlockId) -> Result>> { Ok(self.block(id)?.map(|b| b.body)) } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index b8f151e42e..7a921ccbec 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -56,6 +56,10 @@ impl TransactionsProvider for NoopProvider { Ok(None) } + fn transaction_block(&self, _id: TxNumber) -> Result> { + todo!() + } + fn transactions_by_block(&self, _block_id: BlockId) -> Result>> { Ok(None) } diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs index 7919b9ca26..be34b6552a 100644 --- a/crates/storage/provider/src/traits/transactions.rs +++ b/crates/storage/provider/src/traits/transactions.rs @@ -12,6 +12,9 @@ pub trait TransactionsProvider: BlockIdProvider + Send + Sync { /// Get transaction by transaction hash. fn transaction_by_hash(&self, hash: TxHash) -> Result>; + /// Get transaction block number + fn transaction_block(&self, id: TxNumber) -> Result>; + /// Get transactions by block id. fn transactions_by_block(&self, block: BlockId) -> Result>>; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index a4268c8df9..98426050d0 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -650,6 +650,12 @@ where self.get_or_take::( first_transaction..=last_transaction, )?; + + // rm Transaction block index if there are transaction present + if !transactions.is_empty() { + let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; + self.get_or_take::(tx_id_range)?; + } } // Merge transaction into blocks @@ -721,7 +727,7 @@ where let block_header_hashes_iter = block_header_hashes.into_iter(); let block_tx_iter = block_tx.into_iter(); - // can be not found in tables + // Ommers can be empty for some blocks let mut block_ommers_iter = block_ommers.into_iter(); let mut block_withdrawals_iter = block_withdrawals.into_iter(); let mut block_ommers = block_ommers_iter.next(); @@ -1479,10 +1485,13 @@ pub enum TransactionError { #[cfg(test)] mod test { - use crate::{insert_canonical_block, test_utils::blocks::*, Transaction}; + use crate::{ + insert_canonical_block, test_utils::blocks::*, ShareableDatabase, Transaction, + TransactionsProvider, + }; use reth_db::{mdbx::test_utils::create_test_rw_db, tables, transaction::DbTxMut}; use reth_primitives::{proofs::EMPTY_ROOT, ChainSpecBuilder, TransitionId, MAINNET}; - use std::ops::DerefMut; + use std::{ops::DerefMut, sync::Arc}; #[test] fn insert_get_take() { @@ -1554,6 +1563,28 @@ mod test { ) .unwrap(); + tx.commit().unwrap(); + + // Check that transactions map onto blocks correctly. + { + let provider = ShareableDatabase::new(tx.db, Arc::new(MAINNET.clone())); + assert_eq!( + provider.transaction_block(0).unwrap(), + Some(1), + "Transaction 0 should be in block 1" + ); + assert_eq!( + provider.transaction_block(1).unwrap(), + Some(2), + "Transaction 1 should be in block 2" + ); + assert_eq!( + provider.transaction_block(2).unwrap(), + None, + "Transaction 0 should not exist" + ); + } + // get second block let get = tx.get_block_and_execution_range(&chain_spec, 2..=2).unwrap(); assert_eq!(get, vec![(block2.clone(), exec_res2.clone())]); diff --git a/crates/storage/provider/src/utils.rs b/crates/storage/provider/src/utils.rs index 1262328a81..169fa401e8 100644 --- a/crates/storage/provider/src/utils.rs +++ b/crates/storage/provider/src/utils.rs @@ -70,6 +70,14 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 }, )?; + if !block.body.is_empty() { + // -1 is here as current_tx_id points to the next transaction. + tx.put::( + current_tx_id + block.body.len() as u64 - 1, + block.number, + )?; + } + let senders_len = senders.as_ref().map(|s| s.len()); let tx_iter = if Some(block.body.len()) == senders_len { block.body.into_iter().zip(senders.unwrap().into_iter()).collect::>()