From 2be37884811971d3caef7cc8ae8d3996063cbb05 Mon Sep 17 00:00:00 2001
From: 0xMars42
Date: Sun, 15 Feb 2026 11:48:11 +0100
Subject: [PATCH] fix(exex): drain notification channel during backfill to
 prevent stall (#22168)

Co-authored-by: Georgios Konstantopoulos
Co-authored-by: Amp
---
 crates/exex/exex/src/notifications.rs | 173 +++++++++++++++++++++++++-
 1 file changed, 171 insertions(+), 2 deletions(-)

diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs
index 9d9e6d9327..b5124e8ce0 100644
--- a/crates/exex/exex/src/notifications.rs
+++ b/crates/exex/exex/src/notifications.rs
@@ -10,6 +10,7 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
 use reth_stages_api::ExecutionStageThresholds;
 use reth_tracing::tracing::debug;
 use std::{
+    collections::VecDeque,
     fmt::Debug,
     pin::Pin,
     sync::Arc,
@@ -286,6 +287,9 @@ where
     backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
     /// Custom thresholds for the backfill job, if set.
     backfill_thresholds: Option<ExecutionStageThresholds>,
+    /// Notifications that arrived during backfill and need to be delivered after it completes.
+    /// These are notifications for blocks beyond the backfill range that we must not drop.
+    pending_notifications: VecDeque<ExExNotification<E::Primitives>>,
 }
 
 impl<P, E> ExExNotificationsWithHead<P, E>
 where
@@ -312,6 +316,7 @@ where
             pending_check_backfill: true,
             backfill_job: None,
             backfill_thresholds: None,
+            pending_notifications: VecDeque::new(),
         }
     }
 
@@ -448,6 +453,34 @@ where
         // 3. If backfill is in progress yield new notifications
         if let Some(backfill_job) = &mut this.backfill_job {
             debug!(target: "exex::notifications", "Polling backfill job");
+
+            // Drain the notification channel to prevent backpressure from stalling the
+            // ExExManager. During backfill, the ExEx is not consuming from the channel,
+            // so the capacity-1 channel fills up, which blocks the manager's PollSender,
+            // which fills the manager's 1024-entry buffer, which blocks all upstream
+            // senders. Notifications for blocks covered by the backfill range are
+            // discarded (they'll be re-delivered by the backfill job), while
+            // notifications beyond the backfill range are buffered for delivery after
+            // the backfill completes.
+            while let Poll::Ready(Some(notification)) = this.notifications.poll_recv(cx) {
+                // Always buffer revert-containing notifications (ChainReverted,
+                // ChainReorged) because the backfill job only re-delivers
+                // ChainCommitted from the database. Discarding a reorg here would
+                // leave the ExEx unaware of the fork switch.
+                if notification.reverted_chain().is_some() {
+                    this.pending_notifications.push_back(notification);
+                    continue;
+                }
+                if let Some(committed) = notification.committed_chain() &&
+                    committed.tip().number() <= this.initial_local_head.number
+                {
+                    // Covered by the backfill range, safe to discard
+                    continue;
+                }
+                // Beyond the backfill range: buffer for delivery after backfill
+                this.pending_notifications.push_back(notification);
+            }
+
             if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
                 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
                 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
                     new: Arc::new(chain),
                 })))
             }
 
             this.backfill_job = None;
         }
 
-        // 4. Otherwise advance the regular event stream
+        // 4. Deliver any notifications that were buffered during backfill
+        if let Some(notification) = this.pending_notifications.pop_front() {
+            return Poll::Ready(Some(Ok(notification)))
+        }
+
+        // 5. Otherwise advance the regular event stream
         loop {
             let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
                 return Poll::Ready(None)
             };
 
-            // 5. In case the exex is ahead of the new tip, we must skip it
+            // 6. In case the exex is ahead of the new tip, we must skip it
             if let Some(committed) = notification.committed_chain() {
                 // inclusive check because we should start with `exex.head + 1`
                 if this.initial_exex_head.block.number >= committed.tip().number() {
@@ -789,4 +827,135 @@ mod tests {
 
         Ok(())
     }
+
+    /// Regression test for the notification-channel stall during backfill (#22168).
+    ///
+    /// During backfill, `poll_next` must drain the notification channel so that
+    /// the upstream `ExExManager` is never blocked by a full channel. Without
+    /// the drain loop the capacity-1 channel stays full for the entire backfill
+    /// duration, which stalls the manager's `PollSender` and eventually blocks
+    /// all upstream senders once the 1024-entry buffer fills up.
+    ///
+    /// The key assertion is the `try_send` after the first `poll_next`: it
+    /// proves the channel was drained during the backfill poll. Without the
+    /// fix this `try_send` fails because the notification is still sitting in
+    /// the channel.
+    #[tokio::test]
+    async fn exex_notifications_backfill_drains_channel() -> eyre::Result<()> {
+        let mut rng = generators::rng();
+
+        let temp_dir = tempfile::tempdir().unwrap();
+        let wal = Wal::new(temp_dir.path()).unwrap();
+
+        let provider_factory = create_test_provider_factory();
+        let genesis_hash = init_genesis(&provider_factory)?;
+        let genesis_block = provider_factory
+            .block(genesis_hash.into())?
+            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
+
+        let provider = BlockchainProvider::new(provider_factory.clone())?;
+
+        // Insert block 1 into the DB so there's something to backfill
+        let node_head_block = random_block(
+            &mut rng,
+            genesis_block.number + 1,
+            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
+        )
+        .try_recover()?;
+        let node_head = node_head_block.num_hash();
+        let provider_rw = provider_factory.provider_rw()?;
+        provider_rw.insert_block(&node_head_block)?;
+        provider_rw.commit()?;
+
+        // ExEx head is at genesis, so the backfill job will cover exactly block 1
+        let exex_head =
+            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
+
+        // Notification for a block AFTER the backfill range (block 2).
+        let post_backfill_notification = ExExNotification::ChainCommitted {
+            new: Arc::new(Chain::new(
+                vec![random_block(
+                    &mut rng,
+                    node_head.number + 1,
+                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
+                )
+                .try_recover()?],
+                Default::default(),
+                BTreeMap::new(),
+            )),
+        };
+
+        // Another notification (block 3) used to probe channel capacity.
+        let probe_notification = ExExNotification::ChainCommitted {
+            new: Arc::new(Chain::new(
+                vec![random_block(
+                    &mut rng,
+                    node_head.number + 2,
+                    BlockParams { parent: None, ..Default::default() },
+                )
+                .try_recover()?],
+                Default::default(),
+                BTreeMap::new(),
+            )),
+        };
+
+        let (notifications_tx, notifications_rx) = mpsc::channel(1);
+
+        // Fill the capacity-1 channel.
+        notifications_tx.send(post_backfill_notification.clone()).await?;
+
+        // Confirm the channel is full. This is the precondition that causes the
+        // stall in production: the ExExManager's PollSender would block here.
+        assert!(
+            notifications_tx.try_send(probe_notification.clone()).is_err(),
+            "channel should be full before backfill poll"
+        );
+
+        let mut notifications = ExExNotificationsWithoutHead::new(
+            node_head,
+            provider,
+            EthEvmConfig::mainnet(),
+            notifications_rx,
+            wal.handle(),
+        )
+        .with_head(exex_head);
+
+        // Poll once; this returns the backfill result for block 1. Crucially,
+        // the drain loop in `poll_next` runs in this same call, consuming the
+        // notification from the channel and buffering it.
+        let backfill_result = notifications.next().await.transpose()?;
+        assert_eq!(
+            backfill_result,
+            Some(ExExNotification::ChainCommitted {
+                new: Arc::new(
+                    BackfillJobFactory::new(
+                        notifications.evm_config.clone(),
+                        notifications.provider.clone()
+                    )
+                    .backfill(1..=1)
+                    .next()
+                    .ok_or_eyre("failed to backfill")??
+                )
+            })
+        );
+
+        // KEY ASSERTION: the channel was drained during the backfill poll above.
+        // Without the drain loop this try_send fails because the original
+        // notification is still occupying the capacity-1 channel.
+        assert!(
+            notifications_tx.try_send(probe_notification.clone()).is_ok(),
+            "channel should have been drained during backfill poll"
+        );
+
+        // The first buffered notification (block 2) was drained from the channel
+        // during backfill and is delivered now.
+        let buffered = notifications.next().await.transpose()?;
+        assert_eq!(buffered, Some(post_backfill_notification));
+
+        // The probe notification (block 3) that we sent after the first poll is
+        // delivered next.
+        let probe = notifications.next().await.transpose()?;
+        assert_eq!(probe, Some(probe_notification));
+
+        Ok(())
+    }
 }
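
Illustration (editor's sketch, not part of the patch): the stall mechanism the
fix addresses can be reproduced with nothing but a capacity-1 tokio mpsc
channel. The real code drains with poll_recv(cx) inside poll_next; this sketch
uses try_recv in plain async code to keep it self-contained, and all names in
it are made up for the example.

    use std::time::Duration;
    use tokio::{sync::mpsc, time::timeout};

    #[tokio::main]
    async fn main() {
        // Capacity-1 channel, mirroring the ExEx notification channel.
        let (tx, mut rx) = mpsc::channel::<u64>(1);

        // Fill the single slot, as the manager does with the first
        // notification sent while the ExEx is busy backfilling.
        tx.send(1).await.unwrap();

        // While the receiver is busy and not consuming, the next send cannot
        // complete: this is the backpressure that stalls the sender.
        let blocked = timeout(Duration::from_millis(50), tx.send(2)).await;
        assert!(blocked.is_err(), "send blocks while the channel is full");

        // The fix, in miniature: the busy task drains the channel itself,
        // buffering whatever it must not drop.
        let mut buffered = Vec::new();
        while let Ok(n) = rx.try_recv() {
            buffered.push(n);
        }
        assert_eq!(buffered, vec![1]);

        // The sender is unblocked again.
        let unblocked = timeout(Duration::from_millis(50), tx.send(2)).await;
        assert!(unblocked.is_ok(), "send succeeds after the drain");
    }

Because the timed-out send(2) never acquired a channel permit, no message is
lost; the value is simply sent again once capacity is available. This mirrors
why the patch can safely discard in-range notifications during the drain: the
backfill job re-delivers the same blocks from the database.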