diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs index 61b47d44f2..7beb11e8ec 100644 --- a/crates/transaction-pool/src/pool/best.rs +++ b/crates/transaction-pool/src/pool/best.rs @@ -7,7 +7,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashSet}, sync::Arc, }; -use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::{error::TryRecvError, Receiver}; use tracing::debug; /// An iterator that returns transactions that can be executed on the current state (*best* @@ -86,17 +86,24 @@ impl BestTransactions { /// Non-blocking read on the new pending transactions subscription channel fn try_recv(&mut self) -> Option> { - match self.new_transaction_reciever.try_recv() { - Ok(tx) => Some(tx), - // note TryRecvError::Lagged can be returned here, which is an error that attempts to - // correct itself on consecutive try_recv() attempts + loop { + match self.new_transaction_reciever.try_recv() { + Ok(tx) => return Some(tx), + // note TryRecvError::Lagged can be returned here, which is an error that attempts + // to correct itself on consecutive try_recv() attempts - // the cost of ignoring this error is allowing old transactions to get - // overwritten after the chan buffer size is met + // the cost of ignoring this error is allowing old transactions to get + // overwritten after the chan buffer size is met + Err(TryRecvError::Lagged(_)) => { + // Handle the case where the receiver lagged too far behind. + // `num_skipped` indicates the number of messages that were skipped. + continue + } - // this case is still better than the existing iterator behavior where no new - // pending txs are surfaced to consumers - Err(_) => None, + // this case is still better than the existing iterator behavior where no new + // pending txs are surfaced to consumers + Err(_) => return None, + } } }