mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
fix(payload): avoid dropping rebuild ticks while payload build is in progress (#22654)
Signed-off-by: Delweng <delweng@gmail.com>
This commit is contained in:
@@ -376,48 +376,56 @@ where
|
||||
return Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
// check if the interval is reached
|
||||
while this.interval.poll_tick(cx).is_ready() {
|
||||
// start a new job if there is no pending block, we haven't reached the deadline,
|
||||
// and the payload isn't frozen
|
||||
if this.pending_block.is_none() && !this.best_payload.is_frozen() {
|
||||
this.spawn_build_job();
|
||||
}
|
||||
}
|
||||
|
||||
// poll the pending block
|
||||
if let Some(mut fut) = this.pending_block.take() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(outcome)) => match outcome {
|
||||
BuildOutcome::Better { payload, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
|
||||
this.best_payload = PayloadState::Best(payload);
|
||||
loop {
|
||||
// Wait for any pending build to complete before polling the next tick.
|
||||
//
|
||||
// This avoids consuming interval ticks while a build is still in-flight,
|
||||
// which would delay the follow-up build by a full interval even though
|
||||
// the current attempt has already finished.
|
||||
if let Some(mut fut) = this.pending_block.take() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(outcome)) => match outcome {
|
||||
BuildOutcome::Better { payload, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
|
||||
this.best_payload = PayloadState::Best(payload);
|
||||
}
|
||||
BuildOutcome::Freeze(payload) => {
|
||||
debug!(target: "payload_builder", "payload frozen, no further building will occur");
|
||||
this.best_payload = PayloadState::Frozen(payload);
|
||||
}
|
||||
BuildOutcome::Aborted { fees, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
|
||||
}
|
||||
BuildOutcome::Cancelled => {
|
||||
unreachable!("the cancel signal never fired")
|
||||
}
|
||||
},
|
||||
Poll::Ready(Err(error)) => {
|
||||
// job failed, but we simply try again next interval
|
||||
debug!(target: "payload_builder", %error, "payload build attempt failed");
|
||||
this.metrics.inc_failed_payload_builds();
|
||||
}
|
||||
BuildOutcome::Freeze(payload) => {
|
||||
debug!(target: "payload_builder", "payload frozen, no further building will occur");
|
||||
this.best_payload = PayloadState::Frozen(payload);
|
||||
Poll::Pending => {
|
||||
this.pending_block = Some(fut);
|
||||
return Poll::Pending
|
||||
}
|
||||
BuildOutcome::Aborted { fees, cached_reads } => {
|
||||
this.cached_reads = Some(cached_reads);
|
||||
trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
|
||||
}
|
||||
BuildOutcome::Cancelled => {
|
||||
unreachable!("the cancel signal never fired")
|
||||
}
|
||||
},
|
||||
Poll::Ready(Err(error)) => {
|
||||
// job failed, but we simply try again next interval
|
||||
debug!(target: "payload_builder", %error, "payload build attempt failed");
|
||||
this.metrics.inc_failed_payload_builds();
|
||||
}
|
||||
Poll::Pending => {
|
||||
this.pending_block = Some(fut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
if this.best_payload.is_frozen() {
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
// Wait for the next build interval tick.
|
||||
//
|
||||
// The loop is needed because `poll_tick` does not register a waker
|
||||
// when it returns `Ready`, so we must loop back after spawning a job
|
||||
// to reach a point that *does* register one (the pending block poll above).
|
||||
ready!(this.interval.poll_tick(cx));
|
||||
this.spawn_build_job()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1042,4 +1042,76 @@ mod tests {
|
||||
assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
|
||||
}
|
||||
}
|
||||
|
||||
/// Reproduces the "Blob Transaction Ordering, Multiple Clients" Hive scenario.
|
||||
///
|
||||
/// Sender A contributes 5-blob transactions while sender B contributes 1-blob transactions.
|
||||
/// A single payload build should be able to fill the block with 6 blobs total (5+1).
|
||||
#[test]
|
||||
fn test_blob_transaction_ordering_multiple_clients_shape() {
|
||||
let mut pool = PendingPool::new(MockOrdering::default());
|
||||
let mut f = MockTransactionFactory::default();
|
||||
|
||||
let base_fee: u64 = 10;
|
||||
let base_fee_per_blob_gas: u64 = 1;
|
||||
let max_blob_count: u64 = 6;
|
||||
|
||||
let sender_a = MockTransaction::eip4844()
|
||||
.with_blob_hashes(5)
|
||||
.with_max_fee(base_fee as u128 + 20)
|
||||
.with_priority_fee(base_fee as u128 + 20)
|
||||
.with_blob_fee(120);
|
||||
for nonce in 0..5u64 {
|
||||
let tx = sender_a.clone().rng_hash().with_nonce(nonce);
|
||||
pool.add_transaction(Arc::new(f.validated(tx)), 0);
|
||||
}
|
||||
|
||||
let sender_b = MockTransaction::eip4844()
|
||||
.with_blob_hashes(1)
|
||||
.with_max_fee(base_fee as u128 + 20)
|
||||
.with_priority_fee(base_fee as u128 + 20)
|
||||
.with_blob_fee(100);
|
||||
for nonce in 0..5u64 {
|
||||
let tx = sender_b.clone().rng_hash().with_nonce(nonce);
|
||||
pool.add_transaction(Arc::new(f.validated(tx)), 0);
|
||||
}
|
||||
|
||||
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
|
||||
let mut block_blob_count = 0u64;
|
||||
let mut included_txs = 0u64;
|
||||
|
||||
while let Some(tx) = best.next() {
|
||||
if let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() {
|
||||
let tx_blob_count = blob_hashes.len() as u64;
|
||||
|
||||
if block_blob_count + tx_blob_count > max_blob_count {
|
||||
crate::traits::BestTransactions::mark_invalid(
|
||||
&mut best,
|
||||
&tx,
|
||||
&InvalidPoolTransactionError::Eip4844(
|
||||
Eip4844PoolTransactionError::TooManyEip4844Blobs {
|
||||
have: block_blob_count + tx_blob_count,
|
||||
permitted: max_blob_count,
|
||||
},
|
||||
),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
block_blob_count += tx_blob_count;
|
||||
included_txs += 1;
|
||||
|
||||
if block_blob_count == max_blob_count {
|
||||
best.skip_blobs();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
block_blob_count, max_blob_count,
|
||||
"expected a full blob block (5+1 blobs across senders)"
|
||||
);
|
||||
assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,6 +428,14 @@ impl MockTransaction {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the number of blob versioned hashes for EIP-4844 transactions.
|
||||
pub fn with_blob_hashes(mut self, count: usize) -> Self {
|
||||
if let Self::Eip4844 { blob_versioned_hashes, .. } = &mut self {
|
||||
*blob_versioned_hashes = (0..count).map(|_| B256::random()).collect();
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the max fee per blob gas for EIP-4844 transactions,
|
||||
pub const fn set_blob_fee(&mut self, val: u128) -> &mut Self {
|
||||
if let Self::Eip4844 { max_fee_per_blob_gas, .. } = self {
|
||||
|
||||
Reference in New Issue
Block a user