Optimize Sender Recovery Process (#11385)

This commit is contained in:
Ayodeji Akinola
2024-10-18 16:17:11 +01:00
committed by GitHub
parent 9c8f5d89d8
commit 587c91f1cf

View File

@@ -29,6 +29,9 @@ const BATCH_SIZE: usize = 100_000;
/// Maximum number of senders to recover per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;
/// Type alias for a sender that transmits the result of sender recovery.
type RecoveryResultSender = mpsc::Sender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
/// in [`TransactionSenders`][reth_db::tables::TransactionSenders] table.
@@ -100,8 +103,10 @@ where
.map(|start| start..std::cmp::min(start + BATCH_SIZE as u64, tx_range.end))
.collect::<Vec<Range<u64>>>();
let tx_batch_sender = setup_range_recovery(provider);
for range in batch {
recover_range(range, provider, &mut senders_cursor)?;
recover_range(range, provider, tx_batch_sender.clone(), &mut senders_cursor)?;
}
Ok(ExecOutput {
@@ -136,15 +141,16 @@ where
fn recover_range<Provider, CURSOR>(
tx_range: Range<u64>,
provider: &Provider,
tx_batch_sender: mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>,
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");
// Preallocate channels
// Preallocate channels for each chunks in the batch
let (chunks, receivers): (Vec<_>, Vec<_>) = tx_range
.clone()
.step_by(WORKER_CHUNK_SIZE)
@@ -156,62 +162,9 @@ where
})
.unzip();
let static_file_provider = provider.static_file_provider();
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
// additional blocking tasks. This would cause this function to return
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = match static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
) {
Ok(chunk) => chunk,
Err(err) => {
// We exit early since we could not process this chunk.
let _ = recovered_senders_tx
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
break
}
};
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
// and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
let res = tx
.value()
.map_err(|err| Box::new(SenderRecoveryStageError::StageError(err.into())))
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));
let is_err = res.is_err();
let _ = recovered_senders_tx.send(res);
// Finish early
if is_err {
break
}
}
});
}
});
if let Some(err) = tx_batch_sender.send(chunks).err() {
return Err(StageError::Fatal(err.into()));
}
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database");
@@ -235,6 +188,7 @@ where
provider.sealed_header(block_number)?.ok_or_else(|| {
ProviderError::HeaderNotFound(block_number.into())
})?;
Err(StageError::Block {
block: Box::new(sealed_header),
error: BlockErrorKind::Validation(
@@ -269,10 +223,82 @@ where
.into(),
));
}
Ok(())
}
/// Spawns a thread to handle the recovery of transaction senders for
/// specified chunks of a given batch. It processes incoming ranges, fetching and recovering
/// transactions in parallel using global rayon pool
fn setup_range_recovery<Provider>(
provider: &Provider,
) -> mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
{
let (tx_sender, tx_receiver) = mpsc::channel::<Vec<(Range<u64>, RecoveryResultSender)>>();
let static_file_provider = provider.static_file_provider();
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
// additional blocking tasks. This would cause this function to return
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
while let Ok(chunks) = tx_receiver.recv() {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = match static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
) {
Ok(chunk) => chunk,
Err(err) => {
// We exit early since we could not process this chunk.
let _ = recovered_senders_tx
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
break
}
};
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the
// transaction and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
let res = tx
.value()
.map_err(|err| {
Box::new(SenderRecoveryStageError::StageError(err.into()))
})
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));
let is_err = res.is_err();
let _ = recovered_senders_tx.send(res);
// Finish early
if is_err {
break
}
}
});
}
}
});
tx_sender
}
#[inline]
fn recover_sender(
(tx_id, tx): (TxNumber, TransactionSignedNoHash),