feat(stages): normalized sender recovery commits (#3013)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Roman Krasiuk
2023-06-06 23:51:19 +03:00
committed by GitHub
parent 7eeecf0835
commit 0bf360e460
10 changed files with 132 additions and 70 deletions

View File

@@ -178,13 +178,13 @@ impl From<BodiesConfig> for BodiesDownloaderBuilder {
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
pub struct SenderRecoveryConfig {
/// The maximum number of blocks to process before committing progress to the database.
/// The maximum number of transactions to process before committing progress to the database.
pub commit_threshold: u64,
}
impl Default for SenderRecoveryConfig {
fn default() -> Self {
Self { commit_threshold: 5_000 }
Self { commit_threshold: 50_000 }
}
}

View File

@@ -1,11 +1,11 @@
use crate::error::StageError;
use async_trait::async_trait;
use reth_db::database::Database;
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber,
BlockNumber, TxNumber,
};
use reth_provider::Transaction;
use reth_provider::{ProviderError, Transaction};
use std::{
cmp::{max, min},
ops::RangeInclusive,
@@ -26,6 +26,18 @@ impl ExecInput {
self.checkpoint.unwrap_or_default()
}
/// Returns the number of the block that immediately follows the current checkpoint.
///
/// The checkpoint block itself is skipped, so the result is always the checkpoint block
/// number plus one; processing therefore starts from block number 1, not 0.
pub fn next_block(&self) -> BlockNumber {
    self.checkpoint().block_number + 1
}
/// Reports whether the checkpoint has caught up with the target block.
///
/// Evaluates to `true` once the checkpoint block number is at or beyond the block number
/// of the previous stage's checkpoint.
pub fn target_reached(&self) -> bool {
    let current = self.checkpoint().block_number;
    let target = self.previous_stage_checkpoint_block_number();
    current >= target
}
/// Return the progress of the previous stage or default.
pub fn previous_stage_checkpoint_block_number(&self) -> BlockNumber {
self.previous_stage.map(|(_, block_number)| block_number).unwrap_or_default()
@@ -49,7 +61,6 @@ impl ExecInput {
threshold: u64,
) -> (RangeInclusive<BlockNumber>, bool) {
let current_block = self.checkpoint();
// +1 is to skip present block and always start from block number 1, not 0.
let start = current_block.block_number + 1;
let target = self.previous_stage_checkpoint_block_number();
@@ -58,6 +69,38 @@ impl ExecInput {
let is_final_range = end == target;
(start..=end, is_final_range)
}
/// Computes the next block range to process, bounded by a transaction count.
///
/// Starting at [`Self::next_block`], the block body indices are walked up to the block number
/// of the previous stage's checkpoint. The walk stops early as soon as the number of
/// transactions spanned since the first block exceeds `tx_threshold`; the block that crosses
/// the threshold is still part of the returned range.
///
/// Returns the transaction number range, the block number range, and a flag that is `true`
/// when the block range ends at or beyond the target block.
///
/// # Errors
///
/// Fails with [`ProviderError::BlockBodyIndicesNotFound`] when the first block has no body
/// indices entry, and propagates any error raised while reading from the database.
pub fn next_block_range_with_transaction_threshold<DB: Database>(
    &self,
    tx: &Transaction<'_, DB>,
    tx_threshold: u64,
) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
    let first_block = self.next_block();
    let first_body = tx
        .get::<tables::BlockBodyIndices>(first_block)?
        .ok_or(ProviderError::BlockBodyIndicesNotFound(first_block))?;
    let target = self.previous_stage_checkpoint_block_number();

    let tx_start = first_body.first_tx_num();
    // Seed the range end with the first block so the result is well-formed even if the walk
    // below yields no entries.
    let (mut last_block, mut tx_end) = (first_block, first_body.last_tx_num());

    let mut cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
    let walker = cursor.walk_range(first_block..=target)?;
    for item in walker {
        let (number, indices) = item?;
        tx_end = indices.last_tx_num();
        last_block = number;

        // Stop once the transactions spanned so far go past the threshold.
        let spanned = (tx_start..=tx_end).count() as u64;
        if spanned > tx_threshold {
            break
        }
    }

    Ok((tx_start..=tx_end, first_block..=last_block, last_block >= target))
}
}
/// Stage unwind input, see [Stage::unwind].

View File

@@ -138,7 +138,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let start_block = input.checkpoint().block_number + 1;
let start_block = input.next_block();
let max_block = input.previous_stage_checkpoint_block_number();
// Build executor

View File

@@ -552,7 +552,7 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.checkpoint().block_number + 1;
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())

View File

@@ -486,7 +486,7 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.checkpoint().block_number + 1;
let stage_progress = input.next_block();
let end = input.previous_stage_checkpoint_block_number();
let n_accounts = 31;

View File

@@ -216,10 +216,7 @@ mod tests {
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number + 1,
to: run_to
},
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),

View File

@@ -229,10 +229,7 @@ mod tests {
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number + 1,
to: run_to
},
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),

View File

@@ -38,7 +38,7 @@ impl SenderRecoveryStage {
impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { commit_threshold: 500_000 }
Self { commit_threshold: 50_000 }
}
}
@@ -59,21 +59,17 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint().block_number))
}
let (start_block, end_block) = range.clone().into_inner();
// Look up the start index for the transaction range
let first_tx_num = tx.block_body_indices(start_block)?.first_tx_num();
// Look up the end index for transaction range (inclusive)
let last_tx_num = tx.block_body_indices(end_block)?.last_tx_num();
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
let end_block = *block_range.end();
// No transactions to walk over
if first_tx_num > last_tx_num {
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached");
if tx_range.is_empty() {
info!(target: "sync::stages::sender_recovery", ?tx_range, "Target transaction already reached");
return Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
@@ -87,11 +83,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<RawTable<tables::Transactions>>()?;
// Walk the transactions from start to end index (inclusive)
let tx_walker =
tx_cursor.walk_range(RawKey::new(first_tx_num)..=RawKey::new(last_tx_num))?;
let raw_tx_range = RawKey::new(*tx_range.start())..=RawKey::new(*tx_range.end());
let tx_walker = tx_cursor.walk_range(raw_tx_range)?;
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders");
info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders");
// channels used to return result of sender recovery.
let mut channels = Vec::new();
@@ -102,12 +98,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// We try to evenly divide the transactions to recover across all threads in the threadpool.
// Chunks are submitted instead of individual transactions to reduce the overhead of work
// stealing in the threadpool workers.
for chunk in
&tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads())
{
let chunk_size = self.commit_threshold as usize / rayon::current_num_threads();
for chunk in &tx_walker.chunks(chunk_size) {
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
channels.push(rx);
let (recovered_senders_tx, recovered_senders_rx) = mpsc::unbounded_channel();
channels.push(recovered_senders_rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();
@@ -140,7 +135,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(recover(entry, &mut rlp_buf));
let recovery_result = recover_sender(entry, &mut rlp_buf);
let _ = recovered_senders_tx.send(recovery_result);
}
});
}
@@ -204,6 +200,24 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
/// Recovers the sender of a single entry read from the raw transactions table.
///
/// `entry` is one item produced by walking the raw transactions table. `rlp_buf` is a scratch
/// buffer that receives the signature-less encoding of the transaction; this function does
/// not clear it, so the caller is expected to pass it in empty.
///
/// Returns the transaction number together with the recovered sender address.
///
/// # Errors
///
/// Returns the boxed database error if `entry` is an `Err`, or a boxed
/// [`SenderRecoveryStageError::SenderRecovery`] if no signer can be recovered from the
/// transaction signature.
///
/// # Panics
///
/// Panics if the raw key or the raw value cannot be decoded.
fn recover_sender(
    entry: Result<(RawKey<TxNumber>, RawValue<TransactionSignedNoHash>), DatabaseError>,
    rlp_buf: &mut Vec<u8>,
) -> Result<(u64, H160), Box<StageError>> {
    let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?;
    let tx_id = tx_id.key().expect("key to be formated");
    let tx = transaction.value().expect("value to be formated");

    tx.transaction.encode_without_signature(rlp_buf);

    // Build the error lazily: it is only needed when signer recovery fails, which keeps the
    // per-transaction success path free of error construction.
    let sender = tx
        .signature
        .recover_signer(keccak256(rlp_buf))
        .ok_or_else(|| {
            StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id })
        })?;

    Ok((tx_id, sender))
}
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
@@ -289,53 +303,63 @@ mod tests {
/// Execute the stage twice with input range that exceeds the commit threshold
#[tokio::test]
async fn execute_intermediate_commit() {
let threshold = 50;
let threshold = 10;
let mut runner = SenderRecoveryTestRunner::default();
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
// Manually seed once with full input range
let seed = random_block_range(stage_progress + 1..=previous_stage, H256::zero(), 0..4); // set tx count range high enough to hit the threshold
runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution");
let total_transactions = runner.tx.table::<tables::Transactions>().unwrap().len() as u64;
let first_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Seed only once with full input range
runner.seed_execution(first_input).expect("failed to seed execution");
let total_transactions = runner.tx.table::<tables::Transactions>().unwrap().len() as u64;
// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number == expected_progress &&
processed == runner.tx.table::<tables::TxSenders>().unwrap().len() as u64 &&
total == total_transactions
let mut tx_count = 0;
let expected_progress = seed
.iter()
.find(|x| {
tx_count += x.body.len();
tx_count as u64 > threshold
})
.map(|x| x.number)
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.tx.table::<tables::TxSenders>().unwrap().len() as u64,
total: total_transactions
}
),
done: false
}
);
// Execute second time
// Execute second time to completion
runner.set_threshold(u64::MAX);
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage &&
processed == runner.tx.table::<tables::TxSenders>().unwrap().len() as u64 &&
total == total_transactions
assert_matches!(result, Ok(_));
assert_eq!(
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_transactions, total: total_transactions }
),
done: true
}
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
@@ -410,7 +434,7 @@ mod tests {
) -> Result<(), TestRunnerError> {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.checkpoint().block_number + 1;
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;
if start_block > end_block {

View File

@@ -389,7 +389,7 @@ mod tests {
) -> Result<(), TestRunnerError> {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.checkpoint().block_number + 1;
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;
if start_block > end_block {

View File

@@ -19,6 +19,7 @@ use reth_provider::Transaction;
use std::{
borrow::Borrow,
collections::BTreeMap,
ops::RangeInclusive,
path::{Path, PathBuf},
sync::Arc,
};