From b8a6f8788472584844518eb8fb4cb0bd4a05c5e5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 12 Aug 2024 18:28:49 +0200 Subject: [PATCH] fix: fix tx propagation when full (#10251) --- crates/net/network/src/transactions/mod.rs | 344 +++++++++++++++--- .../transaction-pool/src/test_utils/mock.rs | 2 +- crates/transaction-pool/src/traits.rs | 10 + 3 files changed, 297 insertions(+), 59 deletions(-) diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index a81aaeb942..db0a5613fc 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -33,7 +33,7 @@ use std::{ use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ - EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, + DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, RequestTxHashes, Transactions, }; @@ -425,9 +425,12 @@ where // Note: Assuming ~random~ order due to random state of the peers map hasher for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() { - // filter all transactions unknown to the peer - let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); - let mut full_transactions = FullTransactionsBuilder::default(); + // determine whether to send full tx objects or hashes. + let mut builder = if peer_idx > max_num_full { + PropagateTransactionsBuilder::pooled(peer.version) + } else { + PropagateTransactionsBuilder::full(peer.version) + }; // Iterate through the transactions to propagate and fill the hashes and full // transaction lists, before deciding whether or not to send full transactions to the @@ -436,31 +439,19 @@ where // Only proceed if the transaction is not in the peer's list of seen transactions if !peer.seen_transactions.contains(&tx.hash()) { // add transaction to the list of hashes to propagate - hashes.push(tx); - - // Do not send full 4844 transaction hashes to peers. - // - // Nodes MUST NOT automatically broadcast blob transactions to their peers. - // Instead, those transactions are only announced using - // `NewPooledTransactionHashes` messages, and can then be manually requested - // via `GetPooledTransactions`. - // - // From: - if !tx.transaction.is_eip4844() { - full_transactions.push(tx); - } + builder.push(tx); } } - let mut new_pooled_hashes = hashes.build(); - if new_pooled_hashes.is_empty() { + if builder.is_empty() { trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions"); continue } - // determine whether to send full tx objects or hashes. If there are no full - // transactions, try to send hashes. - if peer_idx > max_num_full || full_transactions.is_empty() { + let PropagateTransactions { pooled, full } = builder.build(); + + // send hashes if any + if let Some(mut new_pooled_hashes) = pooled { // enforce tx soft limit per message for the (unlikely) event the number of // hashes exceeds it new_pooled_hashes @@ -476,9 +467,10 @@ where // send hashes of transactions self.network.send_transactions_hashes(*peer_id, new_pooled_hashes); - } else { - let new_full_transactions = full_transactions.build(); + } + // send full transactions, if any + if let Some(new_full_transactions) = full { for tx in &new_full_transactions { propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id)); // mark transaction as seen by peer @@ -498,9 +490,9 @@ where propagated } - /// Propagate the full transactions to a specific peer + /// Propagate the full transactions to a specific peer. /// - /// Returns the propagated transactions + /// Returns the propagated transactions. fn propagate_full_transactions_to_peer( &mut self, txs: Vec, @@ -512,33 +504,45 @@ where let mut propagated = PropagatedTransactions::default(); // filter all transactions unknown to the peer - let mut full_transactions = FullTransactionsBuilder::default(); + let mut full_transactions = FullTransactionsBuilder::new(peer.version); - let to_propagate = self - .pool - .get_all(txs) - .into_iter() - .filter(|tx| !tx.transaction.is_eip4844()) - .map(PropagateTransaction::new); + let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new); // Iterate through the transactions to propagate and fill the hashes and full transaction for tx in to_propagate { - if peer.seen_transactions.insert(tx.hash()) { + if !peer.seen_transactions.contains(&tx.hash()) { full_transactions.push(&tx); } } - if full_transactions.transactions.is_empty() { + if full_transactions.is_empty() { // nothing to propagate return None } - let new_full_transactions = full_transactions.build(); - for tx in &new_full_transactions { - propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id)); + let PropagateTransactions { pooled, full } = full_transactions.build(); + + // send hashes if any + if let Some(new_pooled_hashes) = pooled { + for hash in new_pooled_hashes.iter_hashes().copied() { + propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); + // mark transaction as seen by peer + peer.seen_transactions.insert(hash); + } + // send hashes of transactions + self.network.send_transactions_hashes(peer_id, new_pooled_hashes); + } + + // send full transactions, if any + if let Some(new_full_transactions) = full { + for tx in &new_full_transactions { + propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id)); + // mark transaction as seen by peer + peer.seen_transactions.insert(tx.hash()); + } + // send full transactions + self.network.send_transactions(peer_id, new_full_transactions); } - // send full transactions - self.network.send_transactions(peer_id, new_full_transactions); // Update propagated transactions metrics self.metrics.propagated_transactions.increment(propagated.0.len() as u64); @@ -865,8 +869,8 @@ where let peers = self.peers.keys().copied().collect::>(); tx.send(peers).ok(); } - TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => { - if let Some(propagated) = self.propagate_full_transactions_to_peer(_txs, _peer) { + TransactionsCommand::PropagateTransactionsTo(txs, _peer) => { + if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, _peer) { self.pool.on_propagated(propagated); } } @@ -1368,6 +1372,7 @@ where } /// A transaction that's about to be propagated to multiple peers. +#[derive(Debug, Clone)] struct PropagateTransaction { size: usize, transaction: Arc, @@ -1388,27 +1393,114 @@ impl PropagateTransaction { } } +/// Helper type to construct the appropriate message to send to the peer based on whether the peer +/// should receive them in full or as pooled +#[derive(Debug, Clone)] +enum PropagateTransactionsBuilder { + Pooled(PooledTransactionsHashesBuilder), + Full(FullTransactionsBuilder), +} + +impl PropagateTransactionsBuilder { + /// Create a builder for pooled transactions + fn pooled(version: EthVersion) -> Self { + Self::Pooled(PooledTransactionsHashesBuilder::new(version)) + } + + /// Create a builder that sends transactions in full and records transactions that don't fit. + fn full(version: EthVersion) -> Self { + Self::Full(FullTransactionsBuilder::new(version)) + } + + /// Appends a transaction to the list. + fn push(&mut self, transaction: &PropagateTransaction) { + match self { + Self::Pooled(builder) => builder.push(transaction), + Self::Full(builder) => builder.push(transaction), + } + } + + /// Returns true if no transactions are recorded. + fn is_empty(&self) -> bool { + match self { + Self::Pooled(builder) => builder.is_empty(), + Self::Full(builder) => builder.is_empty(), + } + } + + /// Consumes the type and returns the built messages that should be sent to the peer. + fn build(self) -> PropagateTransactions { + match self { + Self::Pooled(pooled) => { + PropagateTransactions { pooled: Some(pooled.build()), full: None } + } + Self::Full(full) => full.build(), + } + } +} + +/// Represents how the transactions should be sent to a peer if any. +struct PropagateTransactions { + /// The pooled transaction hashes to send. + pooled: Option, + /// The transactions to send in full. + full: Option>>, +} + /// Helper type for constructing the full transaction message that enforces the -/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`]. -#[derive(Default)] +/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast +/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be +/// broadcasted in full. +#[derive(Debug, Clone)] struct FullTransactionsBuilder { + /// The soft limit to enforce for a single broadcast message of full transactions. total_size: usize, + /// All transactions to be broadcasted. transactions: Vec>, + /// Transactions that didn't fit into the broadcast message + pooled: PooledTransactionsHashesBuilder, } // === impl FullTransactionsBuilder === impl FullTransactionsBuilder { - /// Append a transaction to the list if the total message bytes size doesn't exceed the soft - /// maximum target byte size. The limit is soft, meaning if one single transaction goes over - /// the limit, it will be broadcasted in its own [`Transactions`] message. The same pattern is - /// followed in filling a [`GetPooledTransactions`] request in + /// Create a builder for the negotiated version of the peer's session + fn new(version: EthVersion) -> Self { + Self { + total_size: 0, + pooled: PooledTransactionsHashesBuilder::new(version), + transactions: vec![], + } + } + + /// Append a transaction to the list of full transaction if the total message bytes size doesn't + /// exceed the soft maximum target byte size. The limit is soft, meaning if one single + /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`] + /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`]. + /// + /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended + /// to list of pooled transactions, (e.g. 4844 transactions). fn push(&mut self, transaction: &PropagateTransaction) { + // Do not send full 4844 transaction hashes to peers. + // + // Nodes MUST NOT automatically broadcast blob transactions to their peers. + // Instead, those transactions are only announced using + // `NewPooledTransactionHashes` messages, and can then be manually requested + // via `GetPooledTransactions`. + // + // From: + if transaction.transaction.is_eip4844() { + self.pooled.push(transaction); + return + } + let new_size = self.total_size + transaction.size; if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE && self.total_size > 0 { + // transaction does not fit into the message + self.pooled.push(transaction); return } @@ -1418,17 +1510,20 @@ impl FullTransactionsBuilder { /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`]. fn is_empty(&self) -> bool { - self.transactions.is_empty() + self.transactions.is_empty() && self.pooled.is_empty() } - /// returns the list of transactions. - fn build(self) -> Vec> { - self.transactions + /// Returns the messages that should be propagated to the peer. + fn build(self) -> PropagateTransactions { + let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty()); + let full = Some(self.transactions).filter(|full| !full.is_empty()); + PropagateTransactions { pooled, full } } } /// A helper type to create the pooled transactions message based on the negotiated version of the /// session with the peer +#[derive(Debug, Clone)] enum PooledTransactionsHashesBuilder { Eth66(NewPooledTransactionHashes66), Eth68(NewPooledTransactionHashes68), @@ -1449,6 +1544,14 @@ impl PooledTransactionsHashesBuilder { } } + /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`]. + fn is_empty(&self) -> bool { + match self { + Self::Eth66(hashes) => hashes.is_empty(), + Self::Eth68(hashes) => hashes.is_empty(), + } + } + fn push(&mut self, tx: &PropagateTransaction) { match self { Self::Eth66(msg) => msg.0.push(tx.hash()), @@ -1627,13 +1730,20 @@ mod tests { }; use reth_primitives::hex; use reth_provider::test_utils::NoopProvider; - use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; + use reth_transaction_pool::test_utils::{ + testing_pool, MockTransaction, MockTransactionFactory, TestPool, + }; use secp256k1::SecretKey; - use std::{fmt, future::poll_fn, hash}; + use std::{ + fmt, + future::poll_fn, + hash, + net::{IpAddr, Ipv4Addr, SocketAddr}, + }; use tests::fetcher::TxFetchMetadata; use tracing::error; - async fn new_tx_manager() -> TransactionsManager { + async fn new_tx_manager() -> (TransactionsManager, NetworkManager) { let secret_key = SecretKey::new(&mut rand::thread_rng()); let client = NoopProvider::default(); @@ -1646,14 +1756,14 @@ mod tests { let pool = testing_pool(); let transactions_manager_config = config.transactions_manager_config.clone(); - let (_network_handle, _network, transactions, _) = NetworkManager::new(config) + let (_network_handle, network, transactions, _) = NetworkManager::new(config) .await .unwrap() .into_builder() .transactions(pool.clone(), transactions_manager_config) .split_with_handle(); - transactions + (transactions, network) } pub(super) fn default_cache() -> LruCache { @@ -2034,7 +2144,7 @@ mod tests { async fn test_max_retries_tx_request() { reth_tracing::init_test_tracing(); - let mut tx_manager = new_tx_manager().await; + let mut tx_manager = new_tx_manager().await.0; let tx_fetcher = &mut tx_manager.transaction_fetcher; let peer_id_1 = PeerId::new([1; 64]); @@ -2142,4 +2252,122 @@ mod tests { assert!(tx_fetcher.hashes_pending_fetch.is_empty()); assert_eq!(tx_fetcher.active_peers.len(), 0); } + + #[test] + fn test_transaction_builder_empty() { + let mut builder = PropagateTransactionsBuilder::pooled(EthVersion::Eth68); + assert!(builder.is_empty()); + + let mut factory = MockTransactionFactory::default(); + let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559())); + builder.push(&tx); + assert!(!builder.is_empty()); + + let txs = builder.build(); + assert!(txs.full.is_none()); + let txs = txs.pooled.unwrap(); + assert_eq!(txs.len(), 1); + } + + #[test] + fn test_transaction_builder_large() { + let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68); + assert!(builder.is_empty()); + + let mut factory = MockTransactionFactory::default(); + let mut tx = factory.create_eip1559(); + // create a transaction that still fits + tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1); + let tx = Arc::new(tx); + let tx = PropagateTransaction::new(tx); + builder.push(&tx); + assert!(!builder.is_empty()); + + let txs = builder.clone().build(); + assert!(txs.pooled.is_none()); + let txs = txs.full.unwrap(); + assert_eq!(txs.len(), 1); + + builder.push(&tx); + + let txs = builder.clone().build(); + let pooled = txs.pooled.unwrap(); + assert_eq!(pooled.len(), 1); + let txs = txs.full.unwrap(); + assert_eq!(txs.len(), 1); + } + + #[test] + fn test_transaction_builder_eip4844() { + let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68); + assert!(builder.is_empty()); + + let mut factory = MockTransactionFactory::default(); + let tx = PropagateTransaction::new(Arc::new(factory.create_eip4844())); + builder.push(&tx); + assert!(!builder.is_empty()); + + let txs = builder.clone().build(); + assert!(txs.full.is_none()); + let txs = txs.pooled.unwrap(); + assert_eq!(txs.len(), 1); + + let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559())); + builder.push(&tx); + + let txs = builder.clone().build(); + let pooled = txs.pooled.unwrap(); + assert_eq!(pooled.len(), 1); + let txs = txs.full.unwrap(); + assert_eq!(txs.len(), 1); + } + + #[tokio::test] + async fn test_propagate_full() { + reth_tracing::init_test_tracing(); + + let (mut tx_manager, network) = new_tx_manager().await; + let peer_id = PeerId::random(); + + // ensure not syncing + network.handle().update_sync_state(SyncState::Idle); + + // mock a peer + let (tx, _rx) = mpsc::channel(1); + tx_manager.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), + client_version: Arc::from(""), + capabilities: Arc::new(vec![].into()), + messages: PeerRequestSender::new(peer_id, tx), + status: Arc::new(Default::default()), + version: EthVersion::Eth68, + }); + + let mut propagate = vec![]; + let mut factory = MockTransactionFactory::default(); + let eip1559_tx = Arc::new(factory.create_eip1559()); + propagate.push(PropagateTransaction::new(eip1559_tx.clone())); + let eip4844_tx = Arc::new(factory.create_eip4844()); + propagate.push(PropagateTransaction::new(eip4844_tx.clone())); + + let propagated = tx_manager.propagate_transactions(propagate.clone()); + assert_eq!(propagated.0.len(), 2); + let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap(); + assert_eq!(prop_txs.len(), 1); + assert!(prop_txs[0].is_full()); + + let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap(); + assert_eq!(prop_txs.len(), 1); + assert!(prop_txs[0].is_hash()); + + let peer = tx_manager.peers.get(&peer_id).unwrap(); + assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash())); + assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash())); + peer.seen_transactions.contains(eip4844_tx.transaction.hash()); + + // propagate again + let propagated = tx_manager.propagate_transactions(propagate); + assert!(propagated.0.is_empty()); + } } diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index 6687c3e344..339020215b 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -718,7 +718,7 @@ impl PoolTransaction for MockTransaction { /// Returns the encoded length of the transaction. fn encoded_length(&self) -> usize { - 0 + self.size() } /// Returns the chain ID associated with the transaction. diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index a821e9ee57..debb0076b8 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -499,6 +499,16 @@ impl PropagateKind { Self::Full(peer) | Self::Hash(peer) => peer, } } + + /// Returns true if the transaction was sent as a full transaction + pub const fn is_full(&self) -> bool { + matches!(self, Self::Full(_)) + } + + /// Returns true if the transaction was sent as a hash + pub const fn is_hash(&self) -> bool { + matches!(self, Self::Hash(_)) + } } impl From for PeerId {