From 2d7c4203c8c344003e09dc38daf774fba5b2d22d Mon Sep 17 00:00:00 2001 From: "Protocolwhisper.eth" <57886661+protocolwhisper@users.noreply.github.com> Date: Thu, 17 Aug 2023 05:19:39 -0500 Subject: [PATCH] (fix): Impl. Lagged error in pool update channel (#4242) --- crates/transaction-pool/src/pool/best.rs | 27 +++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs index 61b47d44f2..7beb11e8ec 100644 --- a/crates/transaction-pool/src/pool/best.rs +++ b/crates/transaction-pool/src/pool/best.rs @@ -7,7 +7,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashSet}, sync::Arc, }; -use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::{error::TryRecvError, Receiver}; use tracing::debug; /// An iterator that returns transactions that can be executed on the current state (*best* @@ -86,17 +86,24 @@ impl BestTransactions { /// Non-blocking read on the new pending transactions subscription channel fn try_recv(&mut self) -> Option> { - match self.new_transaction_reciever.try_recv() { - Ok(tx) => Some(tx), - // note TryRecvError::Lagged can be returned here, which is an error that attempts to - // correct itself on consecutive try_recv() attempts + loop { + match self.new_transaction_reciever.try_recv() { + Ok(tx) => return Some(tx), + // note TryRecvError::Lagged can be returned here, which is an error that attempts + // to correct itself on consecutive try_recv() attempts - // the cost of ignoring this error is allowing old transactions to get - // overwritten after the chan buffer size is met + // the cost of ignoring this error is allowing old transactions to get + // overwritten after the chan buffer size is met + Err(TryRecvError::Lagged(_)) => { + // Handle the case where the receiver lagged too far behind. + // `num_skipped` indicates the number of messages that were skipped. + continue + } - // this case is still better than the existing iterator behavior where no new - // pending txs are surfaced to consumers - Err(_) => None, + // this case is still better than the existing iterator behavior where no new + // pending txs are surfaced to consumers + Err(_) => return None, + } } }