From f29e04dadc55ae69353fc160e342bc52141aac55 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sat, 18 Nov 2023 09:34:57 +0100 Subject: [PATCH] fix: track actually requested transactions (#5483) --- crates/net/network/src/transactions.rs | 46 +++++++++++--------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 24f6f8ffb2..f9ffc138de 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -760,7 +760,7 @@ where } fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) { - trace!(target: "net::tx", ?peer_id, ?kind); + trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change"); self.network.reputation_change(peer_id, kind); self.metrics.reported_bad_transactions.increment(1); } @@ -831,11 +831,10 @@ where while let Poll::Ready(fetch_event) = this.transaction_fetcher.poll(cx) { match fetch_event { FetchEvent::TransactionsFetched { peer_id, transactions } => { - if let Some(txns) = transactions { - this.import_transactions(peer_id, txns, TransactionSource::Response); - } + this.import_transactions(peer_id, transactions, TransactionSource::Response); } FetchEvent::FetchError { peer_id, error } => { + trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed"); this.on_request_error(peer_id, error); } } @@ -857,7 +856,7 @@ where // known that this transaction is bad. (e.g. consensus // rules) if err.is_bad_transaction() && !this.network.is_syncing() { - trace!(target: "net::tx", ?err, "Bad transaction import"); + trace!(target: "net::tx", ?err, "bad pool transaction import"); this.on_bad_import(err.hash); continue } @@ -1008,6 +1007,8 @@ impl TransactionSource { /// An inflight request for `PooledTransactions` from a peer struct GetPooledTxRequest { peer_id: PeerId, + /// Transaction hashes that were requested, for cleanup purposes + requested_hashes: Vec, response: oneshot::Receiver>, } @@ -1026,11 +1027,13 @@ struct GetPooledTxRequestFut { } impl GetPooledTxRequestFut { + #[inline] fn new( peer_id: PeerId, + requested_hashes: Vec, response: oneshot::Receiver>, ) -> Self { - Self { inner: Some(GetPooledTxRequest { peer_id, response }) } + Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } } } @@ -1040,20 +1043,11 @@ impl Future for GetPooledTxRequestFut { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut req = self.as_mut().project().inner.take().expect("polled after completion"); match req.response.poll_unpin(cx) { - Poll::Ready(result) => { - let request_hashes: Vec = match &result { - Ok(Ok(pooled_txs)) => { - pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect() - } - _ => Vec::new(), - }; - - Poll::Ready(GetPooledTxResponse { - peer_id: req.peer_id, - requested_hashes: request_hashes, - result, - }) - } + Poll::Ready(result) => Poll::Ready(GetPooledTxResponse { + peer_id: req.peer_id, + requested_hashes: req.requested_hashes, + result, + }), Poll::Pending => { self.project().inner.set(Some(req)); Poll::Pending @@ -1108,16 +1102,16 @@ impl TransactionFetcher { self.inflight_requests.poll_next_unpin(cx) { return match result { - Ok(Ok(txs)) => { + Ok(Ok(transactions)) => { // clear received hashes - self.remove_inflight_hashes(txs.hashes()); + self.remove_inflight_hashes(transactions.hashes()); // TODO: re-request missing hashes, for now clear all of them self.remove_inflight_hashes(requested_hashes.iter()); Poll::Ready(FetchEvent::TransactionsFetched { peer_id, - transactions: Some(txs.0), + transactions: transactions.0, }) } Ok(Err(req_err)) => { @@ -1189,7 +1183,7 @@ impl TransactionFetcher { let (response, rx) = oneshot::channel(); let req: PeerRequest = PeerRequest::GetPooledTransactions { - request: GetPooledTransactions(announced_hashes), + request: GetPooledTransactions(announced_hashes.clone()), response, }; @@ -1210,7 +1204,7 @@ impl TransactionFetcher { return false } else { //create a new request for it, from that peer - self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx)) + self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, announced_hashes, rx)) } true @@ -1225,7 +1219,7 @@ enum FetchEvent { /// The ID of the peer from which transactions were fetched. peer_id: PeerId, /// The transactions that were fetched, if available. - transactions: Option>, + transactions: Vec, }, /// Triggered when there is an error in fetching transactions. FetchError {