From ed239e868c6e94ce51504ce25afbce6118e8c7b8 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Wed, 25 Jan 2023 14:05:50 -0500 Subject: [PATCH] feat(sync): `TxLookup` stage (#972) Co-authored-by: Roman Krasiuk --- crates/stages/src/stages/bodies.rs | 63 ++-- crates/stages/src/stages/headers.rs | 6 +- crates/stages/src/stages/mod.rs | 2 + crates/stages/src/stages/sender_recovery.rs | 114 +++---- crates/stages/src/stages/total_difficulty.rs | 2 +- crates/stages/src/stages/tx_lookup.rs | 304 +++++++++++++++++++ crates/stages/src/test_utils/test_db.rs | 49 ++- 7 files changed, 416 insertions(+), 124 deletions(-) create mode 100644 crates/stages/src/stages/tx_lookup.rs diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 55f1fd26ea..9e06cbb281 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -39,7 +39,6 @@ pub(crate) const BODIES: StageId = StageId("Bodies"); /// /// - [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] /// - [`Transactions`][reth_interfaces::db::tables::Transactions] -/// - [`TransactionHashNumber`][reth_interfaces::db::tables::TransactionHashNumber] /// /// # Genesis /// @@ -49,7 +48,6 @@ pub(crate) const BODIES: StageId = StageId("Bodies"); /// - The [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] table /// - The [`CumulativeTxCount`][reth_interfaces::db::tables::CumulativeTxCount] table /// - The [`Transactions`][reth_interfaces::db::tables::Transactions] table -/// - The [`TransactionHashNumber`][reth_interfaces::db::tables::TransactionHashNumber] table #[derive(Debug)] pub struct BodyStage { /// The body downloader. @@ -128,12 +126,12 @@ impl Stage for BodyStage { // Write transactions for transaction in block.body { - // Insert the transaction hash to number mapping - tx.put::(transaction.hash(), current_tx_id)?; // Append the transaction tx_cursor.append(current_tx_id, transaction)?; tx_transition_cursor.append(current_tx_id, transition_id)?; + // Increment transaction id for each transaction. current_tx_id += 1; + // Increment transition id for each transaction. transition_id += 1; } } @@ -173,11 +171,10 @@ impl Stage for BodyStage { input: UnwindInput, ) -> Result { info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding"); - // Cursors to unwind bodies, ommers, transactions and tx hash to number + // Cursors to unwind bodies, ommers let mut body_cursor = tx.cursor_write::()?; let mut ommers_cursor = tx.cursor_write::()?; let mut transaction_cursor = tx.cursor_write::()?; - let mut tx_hash_number_cursor = tx.cursor_write::()?; // Cursors to unwind transitions let mut block_transition_cursor = tx.cursor_write::()?; let mut tx_transition_cursor = tx.cursor_write::()?; @@ -200,12 +197,9 @@ impl Stage for BodyStage { // Delete all transactions that belong to this block for tx_id in body.tx_id_range() { - // First delete the transaction and hash to id mapping - if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { + // First delete the transaction + if transaction_cursor.seek_exact(tx_id)?.is_some() { transaction_cursor.delete_current()?; - if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { - tx_hash_number_cursor.delete_current()?; - } } // Delete the transaction transition if any if tx_transition_cursor.seek_exact(tx_id)?.is_some() { @@ -367,9 +361,8 @@ mod tests { .tx() .commit(|tx| { let mut tx_cursor = tx.cursor_write::()?; - let (_, transaction) = tx_cursor.last()?.expect("Could not read last transaction"); + tx_cursor.last()?.expect("Could not read last transaction"); tx_cursor.delete_current()?; - tx.delete::(transaction.hash, None)?; Ok(()) }) .expect("Could not delete a transaction"); @@ -512,7 +505,6 @@ mod tests { }; body.tx_id_range().try_for_each(|tx_id| { let transaction = random_signed_tx(); - tx.put::(transaction.hash(), tx_id)?; tx.put::(tx_id, transaction)?; tx.put::(tx_id, tx_id) })?; @@ -550,27 +542,25 @@ mod tests { impl UnwindStageTestRunner for BodyTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.tx.check_no_entry_above::(input.unwind_to, |key| { - key.number() - })?; - self.tx.check_no_entry_above::(input.unwind_to, |key| { - key.number() - })?; - self.tx.check_no_entry_above::( + self.tx + .ensure_no_entry_above::(input.unwind_to, |key| { + key.number() + })?; + self.tx + .ensure_no_entry_above::(input.unwind_to, |key| { + key.number() + })?; + self.tx.ensure_no_entry_above::( input.unwind_to, |key| key, )?; if let Some(last_tx_id) = self.get_last_tx_id()? { self.tx - .check_no_entry_above::(last_tx_id, |key| key)?; - self.tx.check_no_entry_above::( + .ensure_no_entry_above::(last_tx_id, |key| key)?; + self.tx.ensure_no_entry_above::( last_tx_id, |key| key, )?; - self.tx.check_no_entry_above_by_value::( - last_tx_id, - |value| value, - )?; } Ok(()) } @@ -604,7 +594,6 @@ mod tests { let mut ommers_cursor = tx.cursor_read::()?; let mut block_transition_cursor = tx.cursor_read::()?; let mut transaction_cursor = tx.cursor_read::()?; - let mut tx_hash_num_cursor = tx.cursor_read::()?; let mut tx_transition_cursor = tx.cursor_read::()?; let first_body_key = match bodies_cursor.first()? { @@ -613,7 +602,7 @@ mod tests { }; let mut prev_key: Option = None; - let mut current_transition_id = 0; + let mut expected_transition_id = 0; for entry in bodies_cursor.walk(first_body_key)? { let (key, body) = entry?; @@ -646,23 +635,19 @@ mod tests { let tx_entry = transaction_cursor.seek_exact(tx_id)?; assert!(tx_entry.is_some(), "Transaction is missing."); assert_eq!( - tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, current_transition_id - ); - current_transition_id += 1; - assert_matches!( - tx_hash_num_cursor.seek_exact(tx_entry.unwrap().1.hash), - Ok(Some(_)), - "Transaction hash to index mapping is missing." + tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, expected_transition_id ); + // Increment expected id for each transaction transition. + expected_transition_id += 1; } - // for block reward + // Increment expected id for block reward. if self.consensus.has_block_reward(key.number()) { - current_transition_id += 1; + expected_transition_id += 1; } // Validate that block transition exists - assert_eq!(block_transition_cursor.seek_exact(key.number()).expect("To be okay").expect("Block transition to be present").1,current_transition_id); + assert_eq!(block_transition_cursor.seek_exact(key.number()).expect("To be okay").expect("Block transition to be present").1,expected_transition_id); prev_key = Some(key); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 9bdf11b740..40a0d35bf6 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -462,9 +462,9 @@ mod tests { block: BlockNumber, ) -> Result<(), TestRunnerError> { self.tx - .check_no_entry_above_by_value::(block, |val| val)?; - self.tx.check_no_entry_above::(block, |key| key)?; - self.tx.check_no_entry_above::(block, |key| key.number())?; + .ensure_no_entry_above_by_value::(block, |val| val)?; + self.tx.ensure_no_entry_above::(block, |key| key)?; + self.tx.ensure_no_entry_above::(block, |key| key.number())?; Ok(()) } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 88dd5e17a2..521dfa9270 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -14,3 +14,5 @@ pub mod merkle; pub mod sender_recovery; /// The total difficulty stage pub mod total_difficulty; +/// The transaction lookup stage +pub mod tx_lookup; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index d94e7535f2..893a3dab0c 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -127,7 +127,6 @@ impl Stage for SenderRecoveryStage { #[cfg(test)] mod tests { use assert_matches::assert_matches; - use reth_db::models::StoredBlockBody; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_primitives::{BlockNumber, SealedBlock, H256}; @@ -151,18 +150,14 @@ mod tests { stage_progress: Some(stage_progress), }; - let mut current_tx_id = 0; - let stage_progress = input.stage_progress.unwrap_or_default(); // Insert blocks with a single transaction at block `stage_progress + 10` - (stage_progress..input.previous_stage_progress() + 1) - .map(|number| -> Result { - let tx_count = Some((number == stage_progress + 10) as u8); - let block = random_block(number, None, tx_count, None); - current_tx_id = runner.insert_block(current_tx_id, &block, false)?; - Ok(block) + let non_empty_block_number = stage_progress + 10; + let blocks = (stage_progress..input.previous_stage_progress() + 1) + .map(|number| { + random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) - .collect::, _>>() - .expect("failed to insert blocks"); + .collect::>(); + runner.tx.insert_blocks(blocks.iter(), None).expect("failed to insert blocks"); let rx = runner.execute(input); @@ -232,6 +227,29 @@ mod tests { fn set_threshold(&mut self, threshold: u64) { self.threshold = threshold; } + + /// # Panics + /// + /// 1. If there are any entries in the [tables::TxSenders] table above + /// a given block number. + /// + /// 2. If the is no requested block entry in the bodies table, + /// but [tables::TxSenders] is not empty. + fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { + let body_result = self.tx.inner().get_block_body_by_num(block); + match body_result { + Ok(body) => self + .tx + .ensure_no_entry_above::(body.last_tx_index(), |key| { + key + })?, + Err(_) => { + assert!(self.tx.table_is_empty::()?); + } + }; + + Ok(()) + } } impl StageTestRunner for SenderRecoveryTestRunner { @@ -254,12 +272,7 @@ mod tests { let end = input.previous_stage_progress() + 1; let blocks = random_block_range(stage_progress..end, H256::zero(), 0..2); - - let mut current_tx_id = 0; - blocks.iter().try_for_each(|b| -> Result<(), TestRunnerError> { - current_tx_id = self.insert_block(current_tx_id, b, b.number == stage_progress)?; - Ok(()) - })?; + self.tx.insert_blocks(blocks.iter(), None)?; Ok(blocks) } @@ -268,8 +281,8 @@ mod tests { input: ExecInput, output: Option, ) -> Result<(), TestRunnerError> { - if let Some(output) = output { - self.tx.query(|tx| { + match output { + Some(output) => self.tx.query(|tx| { let start_block = input.stage_progress.unwrap_or_default() + 1; let end_block = output.stage_progress; @@ -293,10 +306,11 @@ mod tests { } Ok(()) - })?; - } else { - self.check_no_senders_by_block(input.stage_progress.unwrap_or_default())?; - } + })?, + None => { + self.ensure_no_senders_by_block(input.stage_progress.unwrap_or_default())? + } + }; Ok(()) } @@ -304,59 +318,7 @@ mod tests { impl UnwindStageTestRunner for SenderRecoveryTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.check_no_senders_by_block(input.unwind_to) - } - } - - impl SenderRecoveryTestRunner { - fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner().get_block_body_by_num(block); - match body_result { - Ok(body) => self - .tx - .check_no_entry_above::(body.last_tx_index(), |key| { - key - })?, - Err(_) => { - assert!(self.tx.table_is_empty::()?); - } - }; - - Ok(()) - } - - fn insert_block( - &self, - tx_offset: u64, - block: &SealedBlock, - insert_senders: bool, - ) -> Result { - let mut current_tx_id = tx_offset; - let txs = block.body.clone(); - - self.tx.commit(|tx| { - let numhash = block.header.num_hash().into(); - tx.put::(block.number, block.hash())?; - tx.put::( - numhash, - StoredBlockBody { start_tx_id: current_tx_id, tx_count: txs.len() as u64 }, - )?; - - for body_tx in txs { - // Insert senders for previous stage progress - if insert_senders { - tx.put::( - current_tx_id, - body_tx.recover_signer().expect("failed to recover sender"), - )?; - } - tx.put::(current_tx_id, body_tx)?; - current_tx_id += 1; - } - Ok(()) - })?; - - Ok(current_tx_id) + self.ensure_no_senders_by_block(input.unwind_to) } } } diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index faa2ef4946..387433696f 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -237,7 +237,7 @@ mod tests { impl TotalDifficultyTestRunner { fn check_no_td_above(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - self.tx.check_no_entry_above::(block, |key| key.number())?; + self.tx.ensure_no_entry_above::(block, |key| key.number())?; Ok(()) } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs new file mode 100644 index 0000000000..5c9947feeb --- /dev/null +++ b/crates/stages/src/stages/tx_lookup.rs @@ -0,0 +1,304 @@ +use crate::{ + db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, + UnwindInput, UnwindOutput, +}; +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + database::Database, + tables, + transaction::{DbTx, DbTxMut}, +}; +use tracing::*; + +const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); + +/// The transaction lookup stage. +/// +/// This stage walks over the bodies table, and sets the transaction hash of each transaction in a +/// block to the corresponding `TransitionId` at each block. This is written to the +/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. +#[derive(Debug)] +pub struct TransactionLookupStage { + /// The number of table entries to commit at once + commit_threshold: u64, +} + +impl TransactionLookupStage { + /// Create new instance of [TransactionLookupStage]. + pub fn new(commit_threshold: u64) -> Self { + Self { commit_threshold } + } +} + +#[async_trait::async_trait] +impl Stage for TransactionLookupStage { + /// Return the id of the stage + fn id(&self) -> StageId { + TRANSACTION_LOOKUP + } + + /// Write total difficulty entries + async fn execute( + &mut self, + tx: &mut Transaction<'_, DB>, + input: ExecInput, + ) -> Result { + let ((start_block, end_block), capped) = + exec_or_return!(input, self.commit_threshold, "sync::stages::transaction_lookup"); + + debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync"); + + let mut cursor_bodies = tx.cursor_read::()?; + let mut tx_cursor = tx.cursor_write::()?; + let start_key = tx.get_block_numhash(start_block)?; + + // Walk over block bodies within a specified range. + let bodies = cursor_bodies.walk(start_key)?.take_while(|entry| { + entry + .as_ref() + .map(|(block_num_hash, _)| block_num_hash.number() <= end_block) + .unwrap_or_default() + }); + + // Collect tranasctions for each body and insert the reverse lookup for hash -> tx_id. + for body_entry in bodies { + let (_, body) = body_entry?; + let transactions = tx_cursor.walk(body.start_tx_id)?.take(body.tx_count as usize); + + for tx_entry in transactions { + let (id, transaction) = tx_entry?; + tx.put::(transaction.hash(), id)?; + } + } + + let done = !capped; + info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, done, "Sync iteration finished"); + Ok(ExecOutput { done, stage_progress: end_block }) + } + + /// Unwind the stage. + async fn unwind( + &mut self, + tx: &mut Transaction<'_, DB>, + input: UnwindInput, + ) -> Result { + info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding"); + // Cursors to unwind tx hash to number + let mut body_cursor = tx.cursor_write::()?; + let mut tx_hash_number_cursor = tx.cursor_write::()?; + let mut transaction_cursor = tx.cursor_write::()?; + let mut rev_walker = body_cursor.walk_back(None)?; + while let Some((key, body)) = rev_walker.next().transpose()? { + if key.number() <= input.unwind_to { + break + } + + // Delete all transactions that belong to this block + for tx_id in body.tx_id_range() { + // First delete the transaction and hash to id mapping + if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { + if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { + tx_hash_number_cursor.delete_current()?; + } + } + } + } + + Ok(UnwindOutput { stage_progress: input.unwind_to }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + }; + use assert_matches::assert_matches; + use reth_interfaces::test_utils::generators::{random_block, random_block_range}; + use reth_primitives::{BlockNumber, SealedBlock, H256}; + + // Implement stage test suite. + stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); + + #[tokio::test] + async fn execute_single_transaction_lookup() { + let (previous_stage, stage_progress) = (500, 100); + + // Set up the runner + let runner = TransactionLookupTestRunner::default(); + let input = ExecInput { + previous_stage: Some((PREV_STAGE_ID, previous_stage)), + stage_progress: Some(stage_progress), + }; + + // Insert blocks with a single transaction at block `stage_progress + 10` + let non_empty_block_number = stage_progress + 10; + let blocks = (stage_progress..input.previous_stage_progress() + 1) + .map(|number| { + random_block(number, None, Some((number == non_empty_block_number) as u8), None) + }) + .collect::>(); + runner.tx.insert_blocks(blocks.iter(), None).expect("failed to insert blocks"); + + let rx = runner.execute(input); + + // Assert the successful result + let result = rx.await.unwrap(); + assert_matches!( + result, + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage + ); + + // Validate the stage execution + assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); + } + + /// Execute the stage twice with input range that exceeds the commit threshold + #[tokio::test] + async fn execute_intermediate_commit_transaction_lookup() { + let threshold = 50; + let mut runner = TransactionLookupTestRunner::default(); + runner.set_threshold(threshold); + let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold + let first_input = ExecInput { + previous_stage: Some((PREV_STAGE_ID, previous_stage)), + stage_progress: Some(stage_progress), + }; + + // Seed only once with full input range + runner.seed_execution(first_input).expect("failed to seed execution"); + + // Execute first time + let result = runner.execute(first_input).await.unwrap(); + let expected_progress = stage_progress + threshold; + assert_matches!( + result, + Ok(ExecOutput { done: false, stage_progress }) + if stage_progress == expected_progress + ); + + // Execute second time + let second_input = ExecInput { + previous_stage: Some((PREV_STAGE_ID, previous_stage)), + stage_progress: Some(expected_progress), + }; + let result = runner.execute(second_input).await.unwrap(); + assert_matches!( + result, + Ok(ExecOutput { done: true, stage_progress }) + if stage_progress == previous_stage + ); + + assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); + } + + struct TransactionLookupTestRunner { + tx: TestTransaction, + threshold: u64, + } + + impl Default for TransactionLookupTestRunner { + fn default() -> Self { + Self { threshold: 1000, tx: TestTransaction::default() } + } + } + + impl TransactionLookupTestRunner { + fn set_threshold(&mut self, threshold: u64) { + self.threshold = threshold; + } + + /// # Panics + /// + /// 1. If there are any entries in the [tables::TxHashNumber] table above + /// a given block number. + /// + /// 2. If the is no requested block entry in the bodies table, + /// but [tables::TxHashNumber] is not empty. + fn ensure_no_hash_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { + let body_result = self.tx.inner().get_block_body_by_num(block); + match body_result { + Ok(body) => self.tx.ensure_no_entry_above_by_value::( + body.last_tx_index(), + |key| key, + )?, + Err(_) => { + assert!(self.tx.table_is_empty::()?); + } + }; + + Ok(()) + } + } + + impl StageTestRunner for TransactionLookupTestRunner { + type S = TransactionLookupStage; + + fn tx(&self) -> &TestTransaction { + &self.tx + } + + fn stage(&self) -> Self::S { + TransactionLookupStage { commit_threshold: self.threshold } + } + } + + impl ExecuteStageTestRunner for TransactionLookupTestRunner { + type Seed = Vec; + + fn seed_execution(&mut self, input: ExecInput) -> Result { + let stage_progress = input.stage_progress.unwrap_or_default(); + let end = input.previous_stage_progress() + 1; + + let blocks = random_block_range(stage_progress..end, H256::zero(), 0..2); + self.tx.insert_blocks(blocks.iter(), None)?; + Ok(blocks) + } + + fn validate_execution( + &self, + input: ExecInput, + output: Option, + ) -> Result<(), TestRunnerError> { + match output { + Some(output) => self.tx.query(|tx| { + let start_block = input.stage_progress.unwrap_or_default() + 1; + let end_block = output.stage_progress; + + if start_block > end_block { + return Ok(()) + } + + let start_hash = tx.get::(start_block)?.unwrap(); + let mut body_cursor = tx.cursor_read::()?; + body_cursor.seek_exact((start_block, start_hash).into())?; + + while let Some((_, body)) = body_cursor.next()? { + for tx_id in body.tx_id_range() { + let transaction = tx + .get::(tx_id)? + .expect("no transaction entry"); + assert_eq!( + Some(tx_id), + tx.get::(transaction.hash)?, + ); + } + } + + Ok(()) + })?, + None => self.ensure_no_hash_by_block(input.stage_progress.unwrap_or_default())?, + }; + Ok(()) + } + } + + impl UnwindStageTestRunner for TransactionLookupTestRunner { + fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { + self.ensure_no_hash_by_block(input.unwind_to) + } + } +} diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index ac58ebeb54..0e777a4ad0 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -1,13 +1,13 @@ use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, mdbx::{test_utils::create_test_db, tx::Tx, Env, EnvKind, WriteMap, RW}, - models::BlockNumHash, + models::{BlockNumHash, StoredBlockBody}, table::Table, tables, transaction::{DbTx, DbTxMut}, Error as DbError, }; -use reth_primitives::{BlockNumber, SealedHeader}; +use reth_primitives::{BlockNumber, SealedBlock, SealedHeader}; use std::{borrow::Borrow, sync::Arc}; use crate::db::Transaction; @@ -135,7 +135,7 @@ impl TestTransaction { /// Check that there is no table entry above a given /// number by [Table::Key] - pub(crate) fn check_no_entry_above( + pub(crate) fn ensure_no_entry_above( &self, num: u64, mut selector: F, @@ -155,7 +155,7 @@ impl TestTransaction { /// Check that there is no table entry above a given /// number by [Table::Value] - pub(crate) fn check_no_entry_above_by_value( + pub(crate) fn ensure_no_entry_above_by_value( &self, num: u64, mut selector: F, @@ -184,7 +184,7 @@ impl TestTransaction { let headers = headers.collect::>(); for header in headers { - let key: BlockNumHash = (header.number, header.hash()).into(); + let key: BlockNumHash = header.num_hash().into(); tx.put::(header.number, header.hash())?; tx.put::(header.hash(), header.number)?; @@ -194,4 +194,43 @@ impl TestTransaction { Ok(()) }) } + + /// Insert ordered collection of [SealedBlock] into corresponding tables. + /// Superset functionality of [TestTransaction::insert_headers]. + pub(crate) fn insert_blocks<'a, I>( + &self, + blocks: I, + tx_offset: Option, + ) -> Result<(), DbError> + where + I: Iterator, + { + self.commit(|tx| { + let mut current_tx_id = tx_offset.unwrap_or_default(); + + for block in blocks { + let key: BlockNumHash = block.num_hash().into(); + + // Insert into header tables. + tx.put::(block.number, block.hash())?; + tx.put::(block.hash(), block.number)?; + tx.put::(key, block.header.clone().unseal())?; + + // Insert into body tables. + tx.put::( + key, + StoredBlockBody { + start_tx_id: current_tx_id, + tx_count: block.body.len() as u64, + }, + )?; + for body_tx in block.body.clone() { + tx.put::(current_tx_id, body_tx)?; + current_tx_id += 1; + } + } + + Ok(()) + }) + } }