diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 84fb264fd9..7393ae993d 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -85,7 +85,9 @@ pub use self::constants::{ }; use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE}; -/// The future for inserting a function into the pool +/// The future for importing transactions into the pool. +/// +/// Resolves with the result of each transaction import. pub type PoolImportFuture = Pin>> + Send + 'static>>; /// Api to interact with [TransactionsManager] task. @@ -220,7 +222,17 @@ pub struct TransactionsManager { /// This way we can track incoming transactions and prevent multiple pool imports for the same /// transaction transactions_by_peers: HashMap>, - /// Transactions that are currently imported into the `Pool` + /// Transactions that are currently imported into the `Pool`. + /// + /// The import process includes: + /// - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are + /// valid, or for 4844 transaction the blobs are valid. See also + /// [EthTransactionValidator](reth_transaction_pool::validate::EthTransactionValidator) + /// - if the transaction is valid, it is added into the pool. + /// + /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via + /// [TransactionPool::pending_transactions_listener] and arrive at the `pending_transactions` + /// receiver. pool_imports: FuturesUnordered, /// Stats on pending pool imports that help the node self-monitor. pending_pool_imports_info: PendingPoolImportsInfo, @@ -237,7 +249,14 @@ pub struct TransactionsManager { /// This will only receive commands if a user manually sends a command to the manager through /// the [TransactionsHandle] to interact with this type directly. command_rx: UnboundedReceiverStream, - /// Incoming commands from [`TransactionsHandle`]. + /// A stream that yields new __pending__ transactions. + /// + /// A transaction is considered __pending__ if it is executable on the current state of the + /// chain. In other words, this only yields transactions that satisfy all consensus + /// requirements, these include: + /// - no nonce gaps + /// - all dynamic fee requirements are (currently) met + /// - account has enough balance to cover the transaction's gas pending_transactions: ReceiverStream, /// Incoming events from the [`NetworkManager`](crate::NetworkManager). transaction_events: UnboundedMeteredReceiver, @@ -263,8 +282,8 @@ impl TransactionsManager { &transactions_manager_config.transaction_fetcher_config, ); - // install a listener for new pending transactions that are allowed to be propagated over - // the network + // install a listener for new __pending__ transactions that are allowed to be propagated + // over the network let pending = pool.pending_transactions_listener(); let pending_pool_imports_info = PendingPoolImportsInfo::default(); @@ -385,18 +404,18 @@ where } } - /// Invoked when a new transaction is pending in the local pool. + /// Invoked when transactions in the local mempool are considered __pending__. /// - /// When new transactions appear in the pool, we propagate them to the network using the - /// `Transactions` and `NewPooledTransactionHashes` messages. The Transactions message relays - /// complete transaction objects and is typically sent to a small, random fraction of connected - /// peers. + /// When a transaction in the local mempool is moved to the pending pool, we propagate them to + /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes` + /// messages. The Transactions message relays complete transaction objects and is typically + /// sent to a small, random fraction of connected peers. /// /// All other peers receive a notification of the transaction hash and can request the /// complete transaction object if it is unknown to them. The dissemination of complete /// transactions to a fraction of peers usually ensures that all nodes receive the transaction /// and won't need to request it. - fn on_new_transactions(&mut self, hashes: Vec) { + fn on_new_pending_transactions(&mut self, hashes: Vec) { // Nothing to propagate while initially syncing if self.network.is_initially_syncing() { return @@ -407,8 +426,8 @@ where trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions"); - // This fetches all transaction from the pool, including the blob transactions, which are - // only ever sent as hashes. + // This fetches all transaction from the pool, including the 4844 blob transactions but + // __without__ their sidecar, because 4844 transactions are only ever announced as hashes. let propagated = self.propagate_transactions( self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(), ); @@ -417,7 +436,7 @@ where self.pool.on_propagated(propagated); } - /// Propagate the transactions to all connected peers either as full objects or hashes + /// Propagate the transactions to all connected peers either as full objects or hashes. /// /// The message for new pooled hashes depends on the negotiated version of the stream. /// See [NewPooledTransactionHashes] @@ -856,7 +875,9 @@ where /// Handles a command received from a detached [`TransactionsHandle`] fn on_command(&mut self, cmd: TransactionsCommand) { match cmd { - TransactionsCommand::PropagateHash(hash) => self.on_new_transactions(vec![hash]), + TransactionsCommand::PropagateHash(hash) => { + self.on_new_pending_transactions(vec![hash]) + } TransactionsCommand::PropagateHashesTo(hashes, peer) => { self.propagate_hashes_to(hashes, peer) } @@ -1297,10 +1318,10 @@ where let acc = &mut poll_durations.acc_imported_txns; duration_metered_exec!( { - // drain successful pool insertions, handle and propagate transactions. - // - // higher priority! stream is drained - // + // drain new __pending__ transactions handle and propagate transactions. + // we drain this to batch the transactions in a single message. + // we don't expect this buffer to be large, since only pending transactions are + // emitted here. let mut new_txs = Vec::new(); while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) @@ -1308,7 +1329,7 @@ where new_txs.push(hash); } if !new_txs.is_empty() { - this.on_new_transactions(new_txs); + this.on_new_pending_transactions(new_txs); } }, acc @@ -1545,9 +1566,9 @@ pub enum NetworkTransactionEvent { /// Tracks stats about the [`TransactionsManager`]. #[derive(Debug)] struct PendingPoolImportsInfo { - /// Number of transactions about to be imported into the pool. + /// Number of transactions that are currently being imported into pool. pending_pool_imports: Arc, - /// Max number of transactions about to be imported into the pool. + /// Max number of transactions allowed to be imported concurrently. max_pending_pool_imports: usize, } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 0de5a48450..00500f5e2c 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -114,15 +114,15 @@ pub trait TransactionPool: Send + Sync + Clone { /// inserted into the pool that are allowed to be propagated. /// /// Note: This is intended for networking and will __only__ yield transactions that are allowed - /// to be propagated over the network. + /// to be propagated over the network, see also [TransactionListenerKind]. /// /// Consumer: RPC/P2P fn pending_transactions_listener(&self) -> Receiver { self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly) } - /// Returns a new Stream that yields transactions hashes for new __pending__ transactions - /// inserted into the pool depending on the given [TransactionListenerKind] argument. + /// Returns a new [Receiver] that yields transactions hashes for new __pending__ transactions + /// inserted into the pending pool depending on the given [TransactionListenerKind] argument. fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver; /// Returns a new stream that yields new valid transactions added to the pool. @@ -130,7 +130,7 @@ pub trait TransactionPool: Send + Sync + Clone { self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly) } - /// Returns a new Stream that yields blob "sidecars" (blobs w/ assoc. kzg + /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg /// commitments/proofs) for eip-4844 transactions inserted into the pool fn blob_transaction_sidecars_listener(&self) -> Receiver;