feat(sync): TxLookup stage (#972)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Georgios Konstantopoulos
2023-01-25 14:05:50 -05:00
committed by GitHub
parent 99c52e55ba
commit ed239e868c
7 changed files with 416 additions and 124 deletions

View File

@@ -39,7 +39,6 @@ pub(crate) const BODIES: StageId = StageId("Bodies");
///
/// - [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers]
/// - [`Transactions`][reth_interfaces::db::tables::Transactions]
/// - [`TransactionHashNumber`][reth_interfaces::db::tables::TransactionHashNumber]
///
/// # Genesis
///
@@ -49,7 +48,6 @@ pub(crate) const BODIES: StageId = StageId("Bodies");
/// - The [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] table
/// - The [`CumulativeTxCount`][reth_interfaces::db::tables::CumulativeTxCount] table
/// - The [`Transactions`][reth_interfaces::db::tables::Transactions] table
/// - The [`TransactionHashNumber`][reth_interfaces::db::tables::TransactionHashNumber] table
#[derive(Debug)]
pub struct BodyStage<D: BodyDownloader> {
/// The body downloader.
@@ -128,12 +126,12 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Write transactions
for transaction in block.body {
// Insert the transaction hash to number mapping
tx.put::<tables::TxHashNumber>(transaction.hash(), current_tx_id)?;
// Append the transaction
tx_cursor.append(current_tx_id, transaction)?;
tx_transition_cursor.append(current_tx_id, transition_id)?;
// Increment transaction id for each transaction.
current_tx_id += 1;
// Increment transition id for each transaction.
transition_id += 1;
}
}
@@ -173,11 +171,10 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding");
// Cursors to unwind bodies, ommers, transactions and tx hash to number
// Cursors to unwind bodies, ommers
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut transaction_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
// Cursors to unwind transitions
let mut block_transition_cursor = tx.cursor_write::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
@@ -200,12 +197,9 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
// First delete the transaction and hash to id mapping
if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? {
// First delete the transaction
if transaction_cursor.seek_exact(tx_id)?.is_some() {
transaction_cursor.delete_current()?;
if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() {
tx_hash_number_cursor.delete_current()?;
}
}
// Delete the transaction transition if any
if tx_transition_cursor.seek_exact(tx_id)?.is_some() {
@@ -367,9 +361,8 @@ mod tests {
.tx()
.commit(|tx| {
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let (_, transaction) = tx_cursor.last()?.expect("Could not read last transaction");
tx_cursor.last()?.expect("Could not read last transaction");
tx_cursor.delete_current()?;
tx.delete::<tables::TxHashNumber>(transaction.hash, None)?;
Ok(())
})
.expect("Could not delete a transaction");
@@ -512,7 +505,6 @@ mod tests {
};
body.tx_id_range().try_for_each(|tx_id| {
let transaction = random_signed_tx();
tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(tx_id, transaction)?;
tx.put::<tables::TxTransitionIndex>(tx_id, tx_id)
})?;
@@ -550,27 +542,25 @@ mod tests {
impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.tx.check_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
key.number()
})?;
self.tx.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number()
})?;
self.tx.check_no_entry_above::<tables::BlockTransitionIndex, _>(
self.tx
.ensure_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
key.number()
})?;
self.tx
.ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number()
})?;
self.tx.ensure_no_entry_above::<tables::BlockTransitionIndex, _>(
input.unwind_to,
|key| key,
)?;
if let Some(last_tx_id) = self.get_last_tx_id()? {
self.tx
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
self.tx.check_no_entry_above::<tables::TxTransitionIndex, _>(
.ensure_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
self.tx.ensure_no_entry_above::<tables::TxTransitionIndex, _>(
last_tx_id,
|key| key,
)?;
self.tx.check_no_entry_above_by_value::<tables::TxHashNumber, _>(
last_tx_id,
|value| value,
)?;
}
Ok(())
}
@@ -604,7 +594,6 @@ mod tests {
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let mut block_transition_cursor = tx.cursor_read::<tables::BlockTransitionIndex>()?;
let mut transaction_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut tx_hash_num_cursor = tx.cursor_read::<tables::TxHashNumber>()?;
let mut tx_transition_cursor = tx.cursor_read::<tables::TxTransitionIndex>()?;
let first_body_key = match bodies_cursor.first()? {
@@ -613,7 +602,7 @@ mod tests {
};
let mut prev_key: Option<BlockNumHash> = None;
let mut current_transition_id = 0;
let mut expected_transition_id = 0;
for entry in bodies_cursor.walk(first_body_key)? {
let (key, body) = entry?;
@@ -646,23 +635,19 @@ mod tests {
let tx_entry = transaction_cursor.seek_exact(tx_id)?;
assert!(tx_entry.is_some(), "Transaction is missing.");
assert_eq!(
tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, current_transition_id
);
current_transition_id += 1;
assert_matches!(
tx_hash_num_cursor.seek_exact(tx_entry.unwrap().1.hash),
Ok(Some(_)),
"Transaction hash to index mapping is missing."
tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, expected_transition_id
);
// Increment expected id for each transaction transition.
expected_transition_id += 1;
}
// for block reward
// Increment expected id for block reward.
if self.consensus.has_block_reward(key.number()) {
current_transition_id += 1;
expected_transition_id += 1;
}
// Validate that block transition exists
assert_eq!(block_transition_cursor.seek_exact(key.number()).expect("To be okay").expect("Block transition to be present").1,current_transition_id);
assert_eq!(block_transition_cursor.seek_exact(key.number()).expect("To be okay").expect("Block transition to be present").1,expected_transition_id);
prev_key = Some(key);

View File

@@ -462,9 +462,9 @@ mod tests {
block: BlockNumber,
) -> Result<(), TestRunnerError> {
self.tx
.check_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
self.tx.check_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
self.tx.check_no_entry_above::<tables::Headers, _>(block, |key| key.number())?;
.ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
self.tx.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
self.tx.ensure_no_entry_above::<tables::Headers, _>(block, |key| key.number())?;
Ok(())
}
}

View File

@@ -14,3 +14,5 @@ pub mod merkle;
pub mod sender_recovery;
/// The total difficulty stage
pub mod total_difficulty;
/// The transaction lookup stage
pub mod tx_lookup;

View File

@@ -127,7 +127,6 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use reth_db::models::StoredBlockBody;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{BlockNumber, SealedBlock, H256};
@@ -151,18 +150,14 @@ mod tests {
stage_progress: Some(stage_progress),
};
let mut current_tx_id = 0;
let stage_progress = input.stage_progress.unwrap_or_default();
// Insert blocks with a single transaction at block `stage_progress + 10`
(stage_progress..input.previous_stage_progress() + 1)
.map(|number| -> Result<SealedBlock, TestRunnerError> {
let tx_count = Some((number == stage_progress + 10) as u8);
let block = random_block(number, None, tx_count, None);
current_tx_id = runner.insert_block(current_tx_id, &block, false)?;
Ok(block)
let non_empty_block_number = stage_progress + 10;
let blocks = (stage_progress..input.previous_stage_progress() + 1)
.map(|number| {
random_block(number, None, Some((number == non_empty_block_number) as u8), None)
})
.collect::<Result<Vec<_>, _>>()
.expect("failed to insert blocks");
.collect::<Vec<_>>();
runner.tx.insert_blocks(blocks.iter(), None).expect("failed to insert blocks");
let rx = runner.execute(input);
@@ -232,6 +227,29 @@ mod tests {
fn set_threshold(&mut self, threshold: u64) {
self.threshold = threshold;
}
/// # Panics
///
/// 1. If there are any entries in the [tables::TxSenders] table above
/// a given block number.
///
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxSenders] is not empty.
fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {
Ok(body) => self
.tx
.ensure_no_entry_above::<tables::TxSenders, _>(body.last_tx_index(), |key| {
key
})?,
Err(_) => {
assert!(self.tx.table_is_empty::<tables::TxSenders>()?);
}
};
Ok(())
}
}
impl StageTestRunner for SenderRecoveryTestRunner {
@@ -254,12 +272,7 @@ mod tests {
let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(stage_progress..end, H256::zero(), 0..2);
let mut current_tx_id = 0;
blocks.iter().try_for_each(|b| -> Result<(), TestRunnerError> {
current_tx_id = self.insert_block(current_tx_id, b, b.number == stage_progress)?;
Ok(())
})?;
self.tx.insert_blocks(blocks.iter(), None)?;
Ok(blocks)
}
@@ -268,8 +281,8 @@ mod tests {
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
self.tx.query(|tx| {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
@@ -293,10 +306,11 @@ mod tests {
}
Ok(())
})?;
} else {
self.check_no_senders_by_block(input.stage_progress.unwrap_or_default())?;
}
})?,
None => {
self.ensure_no_senders_by_block(input.stage_progress.unwrap_or_default())?
}
};
Ok(())
}
@@ -304,59 +318,7 @@ mod tests {
impl UnwindStageTestRunner for SenderRecoveryTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.check_no_senders_by_block(input.unwind_to)
}
}
impl SenderRecoveryTestRunner {
fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {
Ok(body) => self
.tx
.check_no_entry_above::<tables::TxSenders, _>(body.last_tx_index(), |key| {
key
})?,
Err(_) => {
assert!(self.tx.table_is_empty::<tables::TxSenders>()?);
}
};
Ok(())
}
fn insert_block(
&self,
tx_offset: u64,
block: &SealedBlock,
insert_senders: bool,
) -> Result<u64, TestRunnerError> {
let mut current_tx_id = tx_offset;
let txs = block.body.clone();
self.tx.commit(|tx| {
let numhash = block.header.num_hash().into();
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
tx.put::<tables::BlockBodies>(
numhash,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: txs.len() as u64 },
)?;
for body_tx in txs {
// Insert senders for previous stage progress
if insert_senders {
tx.put::<tables::TxSenders>(
current_tx_id,
body_tx.recover_signer().expect("failed to recover sender"),
)?;
}
tx.put::<tables::Transactions>(current_tx_id, body_tx)?;
current_tx_id += 1;
}
Ok(())
})?;
Ok(current_tx_id)
self.ensure_no_senders_by_block(input.unwind_to)
}
}
}

View File

@@ -237,7 +237,7 @@ mod tests {
impl TotalDifficultyTestRunner {
fn check_no_td_above(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
self.tx.check_no_entry_above::<tables::HeaderTD, _>(block, |key| key.number())?;
self.tx.ensure_no_entry_above::<tables::HeaderTD, _>(block, |key| key.number())?;
Ok(())
}

View File

@@ -0,0 +1,304 @@
use crate::{
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use tracing::*;
const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup");
/// The transaction lookup stage.
///
/// This stage walks over the bodies table, and sets the transaction hash of each transaction in a
/// block to the corresponding `TransitionId` at each block. This is written to the
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
#[derive(Debug)]
pub struct TransactionLookupStage {
/// The number of table entries to commit at once
commit_threshold: u64,
}
impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for TransactionLookupStage {
/// Return the id of the stage
fn id(&self) -> StageId {
TRANSACTION_LOOKUP
}
/// Write total difficulty entries
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::transaction_lookup");
debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync");
let mut cursor_bodies = tx.cursor_read::<tables::BlockBodies>()?;
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let start_key = tx.get_block_numhash(start_block)?;
// Walk over block bodies within a specified range.
let bodies = cursor_bodies.walk(start_key)?.take_while(|entry| {
entry
.as_ref()
.map(|(block_num_hash, _)| block_num_hash.number() <= end_block)
.unwrap_or_default()
});
// Collect tranasctions for each body and insert the reverse lookup for hash -> tx_id.
for body_entry in bodies {
let (_, body) = body_entry?;
let transactions = tx_cursor.walk(body.start_tx_id)?.take(body.tx_count as usize);
for tx_entry in transactions {
let (id, transaction) = tx_entry?;
tx.put::<tables::TxHashNumber>(transaction.hash(), id)?;
}
}
let done = !capped;
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, done, "Sync iteration finished");
Ok(ExecOutput { done, stage_progress: end_block })
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding");
// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
let mut transaction_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((key, body)) = rev_walker.next().transpose()? {
if key.number() <= input.unwind_to {
break
}
// Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
// First delete the transaction and hash to id mapping
if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? {
if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() {
tx_hash_number_cursor.delete_current()?;
}
}
}
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{BlockNumber, SealedBlock, H256};
// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
#[tokio::test]
async fn execute_single_transaction_lookup() {
let (previous_stage, stage_progress) = (500, 100);
// Set up the runner
let runner = TransactionLookupTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
// Insert blocks with a single transaction at block `stage_progress + 10`
let non_empty_block_number = stage_progress + 10;
let blocks = (stage_progress..input.previous_stage_progress() + 1)
.map(|number| {
random_block(number, None, Some((number == non_empty_block_number) as u8), None)
})
.collect::<Vec<_>>();
runner.tx.insert_blocks(blocks.iter(), None).expect("failed to insert blocks");
let rx = runner.execute(input);
// Assert the successful result
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
/// Execute the stage twice with input range that exceeds the commit threshold
#[tokio::test]
async fn execute_intermediate_commit_transaction_lookup() {
let threshold = 50;
let mut runner = TransactionLookupTestRunner::default();
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
// Seed only once with full input range
runner.seed_execution(first_input).expect("failed to seed execution");
// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { done: false, stage_progress })
if stage_progress == expected_progress
);
// Execute second time
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(expected_progress),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == previous_stage
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
struct TransactionLookupTestRunner {
tx: TestTransaction,
threshold: u64,
}
impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { threshold: 1000, tx: TestTransaction::default() }
}
}
impl TransactionLookupTestRunner {
fn set_threshold(&mut self, threshold: u64) {
self.threshold = threshold;
}
/// # Panics
///
/// 1. If there are any entries in the [tables::TxHashNumber] table above
/// a given block number.
///
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxHashNumber] is not empty.
fn ensure_no_hash_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {
Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>(
body.last_tx_index(),
|key| key,
)?,
Err(_) => {
assert!(self.tx.table_is_empty::<tables::TxHashNumber>()?);
}
};
Ok(())
}
}
impl StageTestRunner for TransactionLookupTestRunner {
type S = TransactionLookupStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
TransactionLookupStage { commit_threshold: self.threshold }
}
}
impl ExecuteStageTestRunner for TransactionLookupTestRunner {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(stage_progress..end, H256::zero(), 0..2);
self.tx.insert_blocks(blocks.iter(), None)?;
Ok(blocks)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
if start_block > end_block {
return Ok(())
}
let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
body_cursor.seek_exact((start_block, start_hash).into())?;
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_id_range() {
let transaction = tx
.get::<tables::Transactions>(tx_id)?
.expect("no transaction entry");
assert_eq!(
Some(tx_id),
tx.get::<tables::TxHashNumber>(transaction.hash)?,
);
}
}
Ok(())
})?,
None => self.ensure_no_hash_by_block(input.stage_progress.unwrap_or_default())?,
};
Ok(())
}
}
impl UnwindStageTestRunner for TransactionLookupTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.ensure_no_hash_by_block(input.unwind_to)
}
}
}

View File

@@ -1,13 +1,13 @@
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
mdbx::{test_utils::create_test_db, tx::Tx, Env, EnvKind, WriteMap, RW},
models::BlockNumHash,
models::{BlockNumHash, StoredBlockBody},
table::Table,
tables,
transaction::{DbTx, DbTxMut},
Error as DbError,
};
use reth_primitives::{BlockNumber, SealedHeader};
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader};
use std::{borrow::Borrow, sync::Arc};
use crate::db::Transaction;
@@ -135,7 +135,7 @@ impl TestTransaction {
/// Check that there is no table entry above a given
/// number by [Table::Key]
pub(crate) fn check_no_entry_above<T, F>(
pub(crate) fn ensure_no_entry_above<T, F>(
&self,
num: u64,
mut selector: F,
@@ -155,7 +155,7 @@ impl TestTransaction {
/// Check that there is no table entry above a given
/// number by [Table::Value]
pub(crate) fn check_no_entry_above_by_value<T, F>(
pub(crate) fn ensure_no_entry_above_by_value<T, F>(
&self,
num: u64,
mut selector: F,
@@ -184,7 +184,7 @@ impl TestTransaction {
let headers = headers.collect::<Vec<_>>();
for header in headers {
let key: BlockNumHash = (header.number, header.hash()).into();
let key: BlockNumHash = header.num_hash().into();
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
@@ -194,4 +194,43 @@ impl TestTransaction {
Ok(())
})
}
/// Insert ordered collection of [SealedBlock] into corresponding tables.
/// Superset functionality of [TestTransaction::insert_headers].
pub(crate) fn insert_blocks<'a, I>(
&self,
blocks: I,
tx_offset: Option<u64>,
) -> Result<(), DbError>
where
I: Iterator<Item = &'a SealedBlock>,
{
self.commit(|tx| {
let mut current_tx_id = tx_offset.unwrap_or_default();
for block in blocks {
let key: BlockNumHash = block.num_hash().into();
// Insert into header tables.
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
tx.put::<tables::Headers>(key, block.header.clone().unseal())?;
// Insert into body tables.
tx.put::<tables::BlockBodies>(
key,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
},
)?;
for body_tx in block.body.clone() {
tx.put::<tables::Transactions>(current_tx_id, body_tx)?;
current_tx_id += 1;
}
}
Ok(())
})
}
}