mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(engine): guard receipt streaming against duplicate indices (#21512)
This commit is contained in:
@@ -77,8 +77,22 @@ impl<R: Receipt> ReceiptRootTaskHandle<R> {
|
||||
receipt_with_bloom.encode_2718(&mut encode_buf);
|
||||
|
||||
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
|
||||
builder.push_unchecked(indexed_receipt.index, &encode_buf);
|
||||
received_count += 1;
|
||||
match builder.push(indexed_receipt.index, &encode_buf) {
|
||||
Ok(()) => {
|
||||
received_count += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
// If a duplicate or out-of-bounds index is streamed, skip it and
|
||||
// fall back to computing the receipt root from the full receipts
|
||||
// vector later.
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
index = indexed_receipt.index,
|
||||
?err,
|
||||
"Receipt root task received invalid receipt index, skipping"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Ok(root) = builder.finalize() else {
|
||||
|
||||
@@ -792,6 +792,11 @@ where
|
||||
// Execute transactions
|
||||
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
|
||||
let mut transactions = transactions.into_iter();
|
||||
// Some executors may execute transactions that do not append receipts during the
|
||||
// main loop (e.g., system transactions whose receipts are added during finalization).
|
||||
// In that case, invoking the callback on every transaction would resend the previous
|
||||
// receipt with the same index and can panic the ordered root builder.
|
||||
let mut last_sent_len = 0usize;
|
||||
loop {
|
||||
// Measure time spent waiting for next transaction from iterator
|
||||
// (e.g., parallel signature recovery)
|
||||
@@ -818,10 +823,14 @@ where
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
// Send the latest receipt to the background task for incremental root computation
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = executor.receipts().len() - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
// Send the latest receipt to the background task for incremental root computation.
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = current_len - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
enter.record("gas_used", gas_used);
|
||||
|
||||
Reference in New Issue
Block a user