diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 77d1510c6b..1008a92502 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -239,17 +239,27 @@ where } /// Request handler for an incoming `NewPooledTransactionHashes` - fn on_new_pooled_transactions(&mut self, peer_id: PeerId, msg: NewPooledTransactionHashes) { + fn on_new_pooled_transaction_hashes( + &mut self, + peer_id: PeerId, + msg: NewPooledTransactionHashes, + ) { // If the node is currently syncing, ignore transactions if self.network.is_syncing() { return } + let mut num_already_seen = 0; + if let Some(peer) = self.peers.get_mut(&peer_id) { let mut transactions = msg.0; // keep track of the transactions the peer knows - peer.transactions.extend(transactions.clone()); + for tx in transactions.iter().copied() { + if !peer.transactions.insert(tx) { + num_already_seen += 1; + } + } self.pool.retain_unknown(&mut transactions); @@ -269,16 +279,20 @@ where self.inflight_requests.push(GetPooledTxRequest { peer_id, response: rx }) } } + + if num_already_seen > 0 { + self.report_bad_message(peer_id); + } } /// Handles dedicated transaction events related tot the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { - self.import_transactions(peer_id, msg.0); + self.import_transactions(peer_id, msg.0, TransactionSource::Broadcast); } NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => { - self.on_new_pooled_transactions(peer_id, msg) + self.on_new_pooled_transaction_hashes(peer_id, msg) } NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => { self.on_get_pooled_transactions(peer_id, request, response) @@ -330,13 +344,21 @@ where } /// Starts the import process for the given transactions. - fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec) { + fn import_transactions( + &mut self, + peer_id: PeerId, + transactions: Vec, + source: TransactionSource, + ) { // If the node is currently syncing, ignore transactions if self.network.is_syncing() { return } + // tracks the quality of the given transactions let mut has_bad_transactions = false; + let mut num_already_seen = 0; + if let Some(peer) = self.peers.get_mut(&peer_id) { for tx in transactions { // recover transaction @@ -347,8 +369,13 @@ where continue }; - // track that the peer knows this transaction - peer.transactions.insert(tx.hash); + // track that the peer knows this transaction, but only if this is a new broadcast. + // If we received the transactions as the response to our GetPooledTransactions + // requests (based on received `NewPooledTransactionHashes`) then we already + // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] + if source.is_broadcast() && !peer.transactions.insert(tx.hash) { + num_already_seen += 1; + } match self.transactions_by_peers.entry(tx.hash) { Entry::Occupied(mut entry) => { @@ -371,7 +398,7 @@ where } } - if has_bad_transactions { + if has_bad_transactions || num_already_seen > 0 { self.report_bad_message(peer_id); } } @@ -430,7 +457,7 @@ where this.inflight_requests.push(req); } Poll::Ready(Ok(Ok(txs))) => { - this.import_transactions(req.peer_id, txs.0); + this.import_transactions(req.peer_id, txs.0, TransactionSource::Response); } Poll::Ready(Ok(Err(_))) => { this.report_bad_message(req.peer_id); @@ -468,6 +495,23 @@ where } } +/// How we received the transactions. +enum TransactionSource { + /// Transactions were broadcast to us via [`Transactions`] message. + Broadcast, + /// Transactions were sent as the response of [`GetPooledTxRequest`] issued by us. + Response, +} + +// === impl TransactionSource === + +impl TransactionSource { + /// Whether the transaction were sent as broadcast. + fn is_broadcast(&self) -> bool { + matches!(self, TransactionSource::Broadcast) + } +} + /// An inflight request for `PooledTransactions` from a peer #[allow(missing_docs)] struct GetPooledTxRequest {