From d001313f99137e3599d74629e79994f20b042bda Mon Sep 17 00:00:00 2001 From: rakita Date: Sat, 15 Jul 2023 21:36:27 +0200 Subject: [PATCH] bug(stages): TxLookup/Sender stages fix range if there is no tx (#3479) --- bin/reth/src/stage/drop.rs | 8 +++ crates/stages/src/stage.rs | 56 +++++++++++++-------- crates/stages/src/stages/sender_recovery.rs | 2 +- crates/stages/src/test_utils/test_db.rs | 20 +++++--- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs index 771dff1d20..33298d287f 100644 --- a/bin/reth/src/stage/drop.rs +++ b/bin/reth/src/stage/drop.rs @@ -155,6 +155,14 @@ impl Command { )?; insert_genesis_header::(tx, self.chain)?; } + StageEnum::TxLookup => { + tx.clear::()?; + tx.put::( + StageId::TransactionLookup.to_string(), + Default::default(), + )?; + insert_genesis_header::(tx, self.chain)?; + } _ => { info!("Nothing to do for stage {:?}", self.stage); return Ok(()) diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 7580c6bab4..e369af0602 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,14 +1,14 @@ use crate::error::StageError; use async_trait::async_trait; -use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; +use reth_db::database::Database; use reth_primitives::{ stage::{StageCheckpoint, StageId}, BlockNumber, TxNumber, }; -use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError}; +use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError, TransactionsProvider}; use std::{ cmp::{max, min}, - ops::RangeInclusive, + ops::{Range, RangeInclusive}, }; /// Stage execution input, see [Stage::execute]. @@ -77,30 +77,46 @@ impl ExecInput { &self, provider: &DatabaseProviderRW<'_, DB>, tx_threshold: u64, - ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { + ) -> Result<(Range, RangeInclusive, bool), StageError> { let start_block = self.next_block(); + let target_block = self.target(); + let start_block_body = provider .block_body_indices(start_block)? .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?; + let first_tx_num = start_block_body.first_tx_num(); - let target_block = self.target(); + let target_block_body = provider + .block_body_indices(target_block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?; - let first_tx_number = start_block_body.first_tx_num(); - let mut last_tx_number = start_block_body.last_tx_num(); - let mut end_block_number = start_block; - let mut body_indices_cursor = - provider.tx_ref().cursor_read::()?; - for entry in body_indices_cursor.walk_range(start_block..=target_block)? { - let (block, body) = entry?; - last_tx_number = body.last_tx_num(); - end_block_number = block; - let tx_count = (first_tx_number..=last_tx_number).count() as u64; - if tx_count > tx_threshold { - break - } + // number of transactions left to execute. + let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num; + + if all_tx_cnt == 0 { + // if there is no more transaction return back. + return Ok((first_tx_num..first_tx_num, start_block..=target_block, true)) } - let is_final_range = end_block_number >= target_block; - Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range)) + + // get block of this tx + let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold { + (target_block, true, target_block_body.next_tx_num()) + } else { + // get tx block number. next_tx_num in this case will be less thean all_tx_cnt. + // So we are sure that transaction must exist. + let end_block_number = provider + .transaction_block(first_tx_num + tx_threshold)? + .expect("block of tx must exist"); + // we want to get range of all transactions of this block, so we are fetching block + // body. + let end_block_body = provider + .block_body_indices(end_block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?; + (end_block_number, false, end_block_body.next_tx_num()) + }; + + let tx_range = first_tx_num..next_tx_num; + Ok((tx_range, start_block..=end_block, is_final_range)) } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index e6294b3a86..8872c8138f 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -85,7 +85,7 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor over the transactions let mut tx_cursor = tx.cursor_read::>()?; // Walk the transactions from start to end index (inclusive) - let raw_tx_range = RawKey::new(*tx_range.start())..=RawKey::new(*tx_range.end()); + let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end); let tx_walker = tx_cursor.walk_range(raw_tx_range)?; // Iterate over transactions in chunks diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index e3ad10d15c..ddab550407 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -246,13 +246,19 @@ impl TestTransaction { blocks.into_iter().try_for_each(|block| { Self::insert_header(tx, &block.header)?; // Insert into body tables. - tx.put::( - block.number, - StoredBlockBodyIndices { - first_tx_num: next_tx_num, - tx_count: block.body.len() as u64, - }, - )?; + let block_body_indices = StoredBlockBodyIndices { + first_tx_num: next_tx_num, + tx_count: block.body.len() as u64, + }; + + if !block.body.is_empty() { + tx.put::( + block_body_indices.last_tx_num(), + block.number, + )?; + } + tx.put::(block.number, block_body_indices)?; + block.body.iter().try_for_each(|body_tx| { tx.put::(next_tx_num, body_tx.clone().into())?; next_tx_num += 1;