feat: Add transaction to block number index (#1866)

This commit is contained in:
rakita
2023-03-20 19:16:28 +01:00
committed by GitHub
parent c5dbdab2e4
commit 38b0bf6d1a
9 changed files with 109 additions and 12 deletions

View File

@@ -44,6 +44,7 @@ pub const BODIES: StageId = StageId("Bodies");
/// - [`BlockOmmers`][reth_db::tables::BlockOmmers]
/// - [`BlockBodies`][reth_db::tables::BlockBodies]
/// - [`Transactions`][reth_db::tables::Transactions]
/// - [`TransactionBlock`][reth_db::tables::TransactionBlock]
/// - [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex]
/// - [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex]
///
@@ -90,6 +91,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Cursors used to write bodies, ommers and transactions
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlock>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
@@ -117,13 +119,16 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut has_withdrawals = false;
match response {
BlockResponse::Full(block) => {
body_cursor.append(
block_number,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
},
)?;
let body = StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
};
body_cursor.append(block_number, body.clone())?;
// write transaction block index
if !body.is_empty() {
tx_block_cursor.append(body.last_tx_index(), block.number)?;
}
// Write transactions
for transaction in block.body {
@@ -208,6 +213,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Cursors to unwind transitions
let mut block_transition_cursor = tx.cursor_write::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlock>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((number, body)) = rev_walker.next().transpose()? {
@@ -230,6 +236,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
block_transition_cursor.delete_current()?;
}
// Delete all transaction to block values.
if !body.is_empty() && tx_block_cursor.seek_exact(body.last_tx_index())?.is_some() {
tx_block_cursor.delete_current()?;
}
// Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
// First delete the transaction
@@ -555,6 +566,12 @@ mod tests {
progress.number,
block_transition_id,
)?;
if body.tx_count != 0 {
tx.put::<tables::TransactionBlock>(
body.first_tx_index(),
progress.number,
)?;
}
tx.put::<tables::BlockBodies>(progress.number, body)?;
if !progress.ommers_hash_is_empty() {
tx.put::<tables::BlockOmmers>(
@@ -605,6 +622,10 @@ mod tests {
last_tx_id,
|key| key,
)?;
self.tx.ensure_no_entry_above::<tables::TransactionBlock, _>(
last_tx_id,
|key| key,
)?;
}
Ok(())
}
@@ -640,6 +661,7 @@ mod tests {
let mut block_transition_cursor = tx.cursor_read::<tables::BlockTransitionIndex>()?;
let mut transaction_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut tx_transition_cursor = tx.cursor_read::<tables::TxTransitionIndex>()?;
let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlock>()?;
let first_body_key = match bodies_cursor.first()? {
Some((key, _)) => key,
@@ -675,6 +697,13 @@ mod tests {
assert!(stored_ommers.is_some(), "Missing ommers entry");
}
let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_index())?.map(|(_,b)| b);
if body.tx_count == 0 {
assert_ne!(tx_block_id,Some(number));
} else {
assert_eq!(tx_block_id, Some(number));
}
for tx_id in body.tx_id_range() {
let tx_entry = transaction_cursor.seek_exact(tx_id)?;
assert!(tx_entry.is_some(), "Transaction is missing.");

View File

@@ -32,7 +32,7 @@ pub enum TableType {
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 27] = [
pub const TABLES: [(TableType, &str); 28] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
@@ -40,6 +40,7 @@ pub const TABLES: [(TableType, &str); 27] = [
(TableType::Table, BlockBodies::const_name()),
(TableType::Table, BlockOmmers::const_name()),
(TableType::Table, BlockWithdrawals::const_name()),
(TableType::Table, TransactionBlock::const_name()),
(TableType::Table, Transactions::const_name()),
(TableType::Table, TxHashNumber::const_name()),
(TableType::Table, Receipts::const_name()),
@@ -148,7 +149,7 @@ table!(
table!(
/// (Canonical only) Stores the transaction body for canonical transactions.
( Transactions ) TxNumber | TransactionSigned
( Transactions ) TxNumber | TransactionSigned
);
table!(
@@ -156,6 +157,13 @@ table!(
( TxHashNumber ) TxHash | TxNumber
);
table!(
/// Stores the mapping of transaction number to the blocks number.
///
/// The key is the highest transaction ID in the block.
( TransactionBlock ) TxNumber | BlockNumber
);
table!(
/// (Canonical only) Stores transaction receipts.
( Receipts ) TxNumber | Receipt

View File

@@ -181,6 +181,15 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
.map_err(Into::into)
}
fn transaction_block(&self, id: TxNumber) -> Result<Option<BlockNumber>> {
self.db
.view(|tx| {
let mut cursor = tx.cursor_read::<tables::TransactionBlock>()?;
cursor.seek(id).map(|b| b.map(|(_, bn)| bn))
})?
.map_err(Into::into)
}
fn transactions_by_block(&self, id: BlockId) -> Result<Option<Vec<TransactionSigned>>> {
if let Some(number) = self.block_number_for_id(id)? {
let tx = self.db.tx()?;

View File

@@ -23,6 +23,7 @@ pub fn assert_genesis_block<DB: Database>(tx: &Transaction<'_, DB>, g: SealedBlo
assert_eq!(tx.table::<tables::BlockOmmers>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::BlockWithdrawals>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::Transactions>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::TransactionBlock>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::TxHashNumber>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::Receipts>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::PlainAccountState>().unwrap(), vec![]);

View File

@@ -149,6 +149,10 @@ impl TransactionsProvider for MockEthProvider {
.find_map(|(_, block)| block.body.iter().find(|tx| tx.hash == hash).cloned()))
}
fn transaction_block(&self, _id: TxNumber) -> Result<Option<BlockNumber>> {
unimplemented!()
}
fn transactions_by_block(&self, id: BlockId) -> Result<Option<Vec<TransactionSigned>>> {
Ok(self.block(id)?.map(|b| b.body))
}

View File

@@ -56,6 +56,10 @@ impl TransactionsProvider for NoopProvider {
Ok(None)
}
fn transaction_block(&self, _id: TxNumber) -> Result<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(&self, _block_id: BlockId) -> Result<Option<Vec<TransactionSigned>>> {
Ok(None)
}

View File

@@ -12,6 +12,9 @@ pub trait TransactionsProvider: BlockIdProvider + Send + Sync {
/// Get transaction by transaction hash.
fn transaction_by_hash(&self, hash: TxHash) -> Result<Option<TransactionSigned>>;
/// Get transaction block number
fn transaction_block(&self, id: TxNumber) -> Result<Option<BlockNumber>>;
/// Get transactions by block id.
fn transactions_by_block(&self, block: BlockId) -> Result<Option<Vec<TransactionSigned>>>;

View File

@@ -650,6 +650,12 @@ where
self.get_or_take::<tables::TxTransitionIndex, TAKE>(
first_transaction..=last_transaction,
)?;
// rm Transaction block index if there are transaction present
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
self.get_or_take::<tables::TransactionBlock, TAKE>(tx_id_range)?;
}
}
// Merge transaction into blocks
@@ -721,7 +727,7 @@ where
let block_header_hashes_iter = block_header_hashes.into_iter();
let block_tx_iter = block_tx.into_iter();
// can be not found in tables
// Ommers can be empty for some blocks
let mut block_ommers_iter = block_ommers.into_iter();
let mut block_withdrawals_iter = block_withdrawals.into_iter();
let mut block_ommers = block_ommers_iter.next();
@@ -1479,10 +1485,13 @@ pub enum TransactionError {
#[cfg(test)]
mod test {
use crate::{insert_canonical_block, test_utils::blocks::*, Transaction};
use crate::{
insert_canonical_block, test_utils::blocks::*, ShareableDatabase, Transaction,
TransactionsProvider,
};
use reth_db::{mdbx::test_utils::create_test_rw_db, tables, transaction::DbTxMut};
use reth_primitives::{proofs::EMPTY_ROOT, ChainSpecBuilder, TransitionId, MAINNET};
use std::ops::DerefMut;
use std::{ops::DerefMut, sync::Arc};
#[test]
fn insert_get_take() {
@@ -1554,6 +1563,28 @@ mod test {
)
.unwrap();
tx.commit().unwrap();
// Check that transactions map onto blocks correctly.
{
let provider = ShareableDatabase::new(tx.db, Arc::new(MAINNET.clone()));
assert_eq!(
provider.transaction_block(0).unwrap(),
Some(1),
"Transaction 0 should be in block 1"
);
assert_eq!(
provider.transaction_block(1).unwrap(),
Some(2),
"Transaction 1 should be in block 2"
);
assert_eq!(
provider.transaction_block(2).unwrap(),
None,
"Transaction 0 should not exist"
);
}
// get second block
let get = tx.get_block_and_execution_range(&chain_spec, 2..=2).unwrap();
assert_eq!(get, vec![(block2.clone(), exec_res2.clone())]);

View File

@@ -70,6 +70,14 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 },
)?;
if !block.body.is_empty() {
// -1 is here as current_tx_id points to the next transaction.
tx.put::<tables::TransactionBlock>(
current_tx_id + block.body.len() as u64 - 1,
block.number,
)?;
}
let senders_len = senders.as_ref().map(|s| s.len());
let tx_iter = if Some(block.body.len()) == senders_len {
block.body.into_iter().zip(senders.unwrap().into_iter()).collect::<Vec<(_, _)>>()