From 75132dabbab6c3aebaa749bf528fc090b8e04aab Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 5 Feb 2024 23:39:45 +0100 Subject: [PATCH] Fix bug hashes buffered for busy peer gave false-positive on eth68 check (#6427) --- crates/net/eth-wire/src/types/broadcast.rs | 144 +++++++++++++++++- .../net/network/src/transactions/fetcher.rs | 115 +++++++------- crates/net/network/src/transactions/mod.rs | 63 ++++---- .../network/src/transactions/validation.rs | 16 +- 4 files changed, 243 insertions(+), 95 deletions(-) diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index bdbfa14565..b7b95d0441 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -4,6 +4,7 @@ use crate::{EthMessage, EthVersion}; use alloy_rlp::{ Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper, }; +use derive_more::{Deref, DerefMut, IntoIterator}; use reth_codecs::derive_arbitrary; use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128}; @@ -447,8 +448,14 @@ pub trait HandleAnnouncement { /// The announcement contains no entries. fn is_empty(&self) -> bool; + /// Returns the number of entries. + fn len(&self) -> usize; + /// Retain only entries for which the hash in the entry, satisfies a given predicate. fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool); + + /// Returns the announcement version, either [`EthVersion::Eth66`] or [`EthVersion::Eth68`]. + fn msg_version(&self) -> EthVersion; } impl HandleAnnouncement for NewPooledTransactionHashes { @@ -456,12 +463,20 @@ impl HandleAnnouncement for NewPooledTransactionHashes { self.is_empty() } + fn len(&self) -> usize { + self.len() + } + fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) { match self { NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f), NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f), } } + + fn msg_version(&self) -> EthVersion { + self.version() + } } impl HandleAnnouncement for NewPooledTransactionHashes68 { @@ -469,6 +484,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 { self.hashes.is_empty() } + fn len(&self) -> usize { + self.hashes.len() + } + fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { let mut indices_to_remove = vec![]; for (i, &hash) in self.hashes.iter().enumerate() { @@ -483,6 +502,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 { self.sizes.remove(index); } } + + fn msg_version(&self) -> EthVersion { + EthVersion::Eth68 + } } impl HandleAnnouncement for NewPooledTransactionHashes66 { @@ -490,6 +513,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 { self.0.is_empty() } + fn len(&self) -> usize { + self.0.len() + } + fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { let mut indices_to_remove = vec![]; for (i, &hash) in self.0.iter().enumerate() { @@ -502,34 +529,139 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 { self.0.remove(index); } } + + fn msg_version(&self) -> EthVersion { + EthVersion::Eth66 + } } /// Announcement data that has been validated according to the configured network. For an eth68 /// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66 /// announcement, values of the map are `None`. -pub type ValidAnnouncementData = HashMap>; +#[derive(Debug, IntoIterator)] +pub struct ValidAnnouncementData { + #[into_iterator] + data: HashMap>, + version: EthVersion, +} + +impl ValidAnnouncementData { + /// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`] + /// announcement data. + pub fn new_eth68(data: HashMap>) -> Self { + Self { data, version: EthVersion::Eth68 } + } + + /// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`] + /// announcement data. + pub fn new_eth66(data: HashMap>) -> Self { + Self { data, version: EthVersion::Eth66 } + } + + /// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth68`] + /// announcement. + pub fn empty_eth68() -> Self { + Self { data: HashMap::new(), version: EthVersion::Eth68 } + } + + /// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth66`] + /// announcement. + pub fn empty_eth66() -> Self { + Self { data: HashMap::new(), version: EthVersion::Eth66 } + } + + /// Destructs returning the validated data. + pub fn into_data(self) -> HashMap> { + self.data + } +} impl HandleAnnouncement for ValidAnnouncementData { fn is_empty(&self) -> bool { - self.is_empty() + self.data.is_empty() + } + + fn len(&self) -> usize { + self.data.len() } fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { - self.retain(|&hash, _| f(hash)) + self.data.retain(|&hash, _| f(hash)) + } + + fn msg_version(&self) -> EthVersion { + self.version } } /// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68 /// metadata should have been cached already. -pub type ValidTxHashes = Vec; +#[derive(Debug, Deref, DerefMut, IntoIterator)] +pub struct ValidTxHashes { + #[deref] + #[deref_mut] + #[into_iterator] + hashes: Vec, + version: EthVersion, +} + +impl ValidTxHashes { + /// Returns a new [`ValidTxHashes`] wrapper around validated hashes. Takes a list of validated + /// hashes as parameter along with the eth version. + pub fn new(hashes: Vec, version: EthVersion) -> Self { + Self { hashes, version } + } + + /// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid + /// [`EthVersion::Eth68`] announcement data. Takes a list of validated hashes as parameter. + pub fn new_eth68(hashes: Vec) -> Self { + Self::new(hashes, EthVersion::Eth68) + } + + /// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid + /// [`EthVersion::Eth66`] announcement data. Takes a list of validated hashes as parameter. + pub fn new_eth66(hashes: Vec) -> Self { + Self::new(hashes, EthVersion::Eth66) + } + + /// Returns a new [`ValidTxHashes`] with empty hashes. + pub fn empty(version: EthVersion) -> Self { + Self { hashes: vec![], version } + } + + /// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth68`] + /// announcement. + pub fn empty_eth68() -> Self { + Self::empty(EthVersion::Eth68) + } + + /// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth66`] + /// announcement. + pub fn empty_eth66() -> Self { + Self::empty(EthVersion::Eth66) + } + + /// Destructs returning the validated hashes. + pub fn into_hashes(self) -> Vec { + self.hashes + } +} impl HandleAnnouncement for ValidTxHashes { fn is_empty(&self) -> bool { - self.is_empty() + self.hashes.is_empty() + } + + fn len(&self) -> usize { + self.hashes.len() } fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { - self.retain(|&hash| f(hash)) + self.hashes.retain(|&hash| f(hash)) + } + + fn msg_version(&self) -> EthVersion { + self.version } } diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 0fe0c0ea84..af9bbe4919 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -4,7 +4,7 @@ use crate::{ }; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use pin_project::pin_project; -use reth_eth_wire::{EthVersion, GetPooledTransactions, HandleAnnouncement}; +use reth_eth_wire::{GetPooledTransactions, HandleAnnouncement, ValidTxHashes}; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; use schnellru::{ByLength, Unlimited}; @@ -138,10 +138,16 @@ impl TransactionFetcher { } /// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes. - pub(super) fn pack_hashes(&mut self, hashes: &mut Vec, peer_id: PeerId) -> Vec { - let Some(hash) = hashes.first() else { return vec![] }; + pub(super) fn pack_hashes( + &mut self, + hashes: &mut ValidTxHashes, + peer_id: PeerId, + ) -> ValidTxHashes { + if hashes.is_empty() { + return ValidTxHashes::empty(hashes.msg_version()) + }; - if self.eth68_meta.get(hash).is_some() { + if hashes.msg_version().is_eth68() { return self.pack_hashes_eth68(hashes, peer_id) } self.pack_hashes_eth66(hashes) @@ -151,11 +157,13 @@ impl TransactionFetcher { /// If necessary, takes hashes from buffer for which peer is listed as fallback peer. /// /// Returns left over hashes. - pub(super) fn pack_hashes_eth66(&mut self, hashes: &mut Vec) -> Vec { + pub(super) fn pack_hashes_eth66(&mut self, hashes: &mut ValidTxHashes) -> ValidTxHashes { if hashes.len() <= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { - return vec![] + return ValidTxHashes::empty_eth66() } - hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES - 1) + let surplus_hashes = hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES - 1); + + ValidTxHashes::new_eth66(surplus_hashes) } /// Evaluates wether or not to include a hash in a `GetPooledTransactions` version eth68 @@ -201,13 +209,14 @@ impl TransactionFetcher { /// 4. Return surplus hashes. pub(super) fn pack_hashes_eth68( &mut self, - hashes: &mut Vec, + hashes: &mut ValidTxHashes, peer_id: PeerId, - ) -> Vec { + ) -> ValidTxHashes { if let Some(hash) = hashes.first() { if let Some(size) = self.eth68_meta.get(hash) { if *size >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE { - return hashes.split_off(1) + let surplus_hashes = hashes.split_off(1); + return ValidTxHashes::new_eth68(surplus_hashes) } } } @@ -233,10 +242,10 @@ impl TransactionFetcher { } }); - surplus_hashes + ValidTxHashes::new_eth68(surplus_hashes) } - pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec) { + pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: ValidTxHashes) { // It could be that the txns have been received over broadcast in the time being. hashes.retain(|hash| self.unknown_hashes.get(hash).is_some()); @@ -246,10 +255,12 @@ impl TransactionFetcher { /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be /// passed as `fallback_peer` parameter! Hashes that have been re-requested /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. - pub(super) fn buffer_hashes(&mut self, hashes: Vec, fallback_peer: Option) { + pub(super) fn buffer_hashes(&mut self, hashes: ValidTxHashes, fallback_peer: Option) { let mut max_retried_and_evicted_hashes = vec![]; - for hash in hashes { + let msg_version = hashes.msg_version(); + + for hash in hashes.into_iter() { // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 debug_assert!( self.unknown_hashes.peek(&hash).is_some(), @@ -267,17 +278,10 @@ impl TransactionFetcher { // peer in caller's context has requested hash and is hence not eligible as // fallback peer. if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { - let msg_version = || { - self.eth68_meta - .peek(&hash) - .map(|_| EthVersion::Eth68) - .unwrap_or(EthVersion::Eth66) - }; - debug!(target: "net::tx", hash=%hash, retries=retries, - msg_version=%msg_version(), + msg_version=%msg_version, "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" ); @@ -305,12 +309,16 @@ impl TransactionFetcher { self.remove_from_unknown_hashes(hashes) } - pub(super) fn filter_unseen_hashes( + pub(super) fn filter_unseen_and_pending_hashes( &mut self, new_announced_hashes: &mut T, peer_id: PeerId, is_session_active: impl Fn(PeerId) -> bool, ) { + #[cfg(debug_assertions)] + let mut previously_unseen_hashes = Vec::with_capacity(new_announced_hashes.len() / 4); + let msg_version = new_announced_hashes.msg_version(); + // filter out inflight hashes, and register the peer as fallback for all inflight hashes new_announced_hashes.retain_by_hash(|hash| { // occupied entry @@ -337,12 +345,13 @@ impl TransactionFetcher { } // vacant entry - let msg_version = || self.eth68_meta.peek(&hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); + #[cfg(debug_assertions)] + previously_unseen_hashes.push(hash); trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hash=%hash, - msg_version=%msg_version(), + msg_version=%msg_version, "new hash seen in announcement by peer" ); @@ -356,7 +365,7 @@ impl TransactionFetcher { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), hash=%hash, - msg_version=%msg_version(), + msg_version=%msg_version, "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" ); @@ -364,6 +373,14 @@ impl TransactionFetcher { } true }); + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + previously_unseen_hashes_len=previously_unseen_hashes.len(), + previously_unseen_hashes=?previously_unseen_hashes, + msg_version=%msg_version, + "received previously unseen hashes in announcement from peer" + ); } /// Requests the missing transactions from the announced hashes of the peer. Returns the @@ -375,28 +392,17 @@ impl TransactionFetcher { /// flight. pub(super) fn request_transactions_from_peer( &mut self, - new_announced_hashes: Vec, + new_announced_hashes: ValidTxHashes, peer: &Peer, metrics_increment_egress_peer_channel_full: impl FnOnce(), - ) -> Option> { + ) -> Option { let peer_id: PeerId = peer.request_tx.peer_id; - let msg_version = || { - new_announced_hashes - .first() - .map(|hash| { - self.eth68_meta - .peek(hash) - .map(|_| EthVersion::Eth68) - .unwrap_or(EthVersion::Eth66) - }) - .expect("`new_announced_hashes` shouldn't be empty") - }; if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - new_announced_hashes=?new_announced_hashes, - msg_version=%msg_version(), + new_announced_hashes=?*new_announced_hashes, + msg_version=%new_announced_hashes.msg_version(), limit=MAX_CONCURRENT_TX_REQUESTS, "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer" ); @@ -406,8 +412,8 @@ impl TransactionFetcher { let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - new_announced_hashes=?new_announced_hashes, - msg_version=%msg_version(), + new_announced_hashes=?*new_announced_hashes, + msg_version=%new_announced_hashes.msg_version(), "failed to cache active peer in schnellru::LruMap, dropping request to peer" ); return Some(new_announced_hashes) @@ -416,8 +422,8 @@ impl TransactionFetcher { if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - new_announced_hashes=?new_announced_hashes, - msg_version=%msg_version(), + new_announced_hashes=?*new_announced_hashes, + msg_version=%new_announced_hashes.msg_version(), limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER, "limit for concurrent `GetPooledTransactions` requests per peer reached" ); @@ -428,7 +434,7 @@ impl TransactionFetcher { debug_assert!( || -> bool { - for hash in &new_announced_hashes { + for hash in new_announced_hashes.iter() { if self.buffered_hashes.contains(hash) { return false } @@ -456,7 +462,7 @@ impl TransactionFetcher { let req = req.into_get_pooled_transactions().expect("is get pooled tx"); metrics_increment_egress_peer_channel_full(); - return Some(req.0) + return Some(ValidTxHashes::new(req.0, new_announced_hashes.msg_version())) } } } else { @@ -753,14 +759,14 @@ pub(super) enum FetchEvent { pub(super) struct GetPooledTxRequest { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: Vec, + requested_hashes: ValidTxHashes, response: oneshot::Receiver>, } pub(super) struct GetPooledTxResponse { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: Vec, + requested_hashes: ValidTxHashes, result: Result, RecvError>, } @@ -775,7 +781,7 @@ impl GetPooledTxRequestFut { #[inline] fn new( peer_id: PeerId, - requested_hashes: Vec, + requested_hashes: ValidTxHashes, response: oneshot::Receiver>, ) -> Self { Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } @@ -846,13 +852,16 @@ mod test { tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]); } - let mut eth68_hashes_to_request = eth68_hashes.clone().to_vec(); + let mut eth68_hashes_to_request = ValidTxHashes::new_eth68(eth68_hashes.clone().to_vec()); let surplus_eth68_hashes = tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id); - assert_eq!(surplus_eth68_hashes, vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5])); assert_eq!( - eth68_hashes_to_request, + surplus_eth68_hashes.into_hashes(), + vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5]) + ); + assert_eq!( + eth68_hashes_to_request.into_hashes(), vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4]) ); } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 3b74cd551b..4d18a602a6 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -36,8 +36,9 @@ use crate::{ }; use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ - EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, - NewPooledTransactionHashes68, PooledTransactions, Transactions, + EthVersion, GetPooledTransactions, HandleAnnouncement, NewPooledTransactionHashes, + NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, + ValidTxHashes, }; use reth_interfaces::{ p2p::error::{RequestError, RequestResult}, @@ -590,9 +591,6 @@ where // 2. filter out invalid entries // - // first get the message version, because this will destruct the message - let msg_version = msg.version(); - // // validates messages with respect to the given network, e.g. allowed tx types // let mut hashes = match msg { @@ -604,7 +602,7 @@ where if let FilterOutcome::ReportPeer = outcome { self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); } - valid_data.into_iter().map(|(hash, metadata)| { + let hashes = valid_data.into_iter().map(|(hash, metadata)| { // cache eth68 metadata if let Some((_ty, size)) = metadata { // check if this peer is announcing a different size for an already seen @@ -624,7 +622,9 @@ where self.transaction_fetcher.eth68_meta.insert(hash, size); } hash - }).collect::>() + }).collect::>(); + + ValidTxHashes::new_eth68(hashes) } NewPooledTransactionHashes::Eth66(eth66_msg) => { // validate eth66 announcement data @@ -635,7 +635,9 @@ where self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); } - valid_data.into_keys().collect::>() + let valid_hashes = valid_data.into_data().into_keys().collect::>(); + + ValidTxHashes::new_eth66(valid_hashes) } }; @@ -646,9 +648,11 @@ where // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx // fetcher, hence they should be valid at this point. // - self.transaction_fetcher.filter_unseen_hashes(&mut hashes, peer_id, |peer_id| { - self.peers.contains_key(&peer_id) - }); + self.transaction_fetcher.filter_unseen_and_pending_hashes( + &mut hashes, + peer_id, + |peer_id| self.peers.contains_key(&peer_id), + ); if hashes.is_empty() { // nothing to request @@ -657,9 +661,10 @@ where debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=?hashes, - msg_version=%msg_version, - "received previously unseen hashes in announcement from peer" + hashes_len=hashes.len(), + hashes=?*hashes, + msg_version=%hashes.msg_version(), + "received previously unseen and pending hashes in announcement from peer" ); // only send request for hashes to idle peer, otherwise buffer hashes storing peer as @@ -667,8 +672,8 @@ where if !self.transaction_fetcher.is_idle(peer_id) { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=?hashes, - msg_version=%msg_version, + hashes=?*hashes, + msg_version=%hashes.msg_version(), "buffering hashes announced by busy peer" ); @@ -682,8 +687,8 @@ where if !surplus_hashes.is_empty() { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - surplus_hashes=?surplus_hashes, - msg_version=%msg_version, + surplus_hashes=?*surplus_hashes, + msg_version=%surplus_hashes.msg_version(), "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" ); @@ -692,8 +697,8 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=?hashes, - msg_version=%msg_version, + hashes=?*hashes, + msg_version=%hashes.msg_version(), "sending hashes in `GetPooledTransactions` request to peer's session" ); @@ -709,8 +714,8 @@ where { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - failed_to_request_hashes=?failed_to_request_hashes, - msg_version=%msg_version, + failed_to_request_hashes=?*failed_to_request_hashes, + msg_version=%failed_to_request_hashes.msg_version(), "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" ); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); @@ -755,26 +760,28 @@ where self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id); } - let msg_version = || eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); + let msg_version = peer.version; trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?hashes, - msg_version=%msg_version(), + msg_version=%msg_version, "requesting buffered hashes from idle peer" ); // request the buffered missing transactions let metrics = &self.metrics; if let Some(failed_to_request_hashes) = - self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || { - metrics.egress_peer_channel_full.increment(1) - }) + self.transaction_fetcher.request_transactions_from_peer( + ValidTxHashes::new(hashes, msg_version), + peer, + || metrics.egress_peer_channel_full.increment(1), + ) { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?failed_to_request_hashes, - msg_version=%msg_version(), + msg_version=%msg_version, "failed sending request to peer's session, buffering hashes" ); diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs index a3212fea7e..0f4c43509c 100644 --- a/crates/net/network/src/transactions/validation.rs +++ b/crates/net/network/src/transactions/validation.rs @@ -237,7 +237,7 @@ impl FilterAnnouncement for EthAnnouncementFilter { network=%Self, "empty eth68 announcement" ); - return (FilterOutcome::ReportPeer, HashMap::new()) + return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth68()) } let mut should_report_peer = false; @@ -284,7 +284,7 @@ impl FilterAnnouncement for EthAnnouncementFilter { ( if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok }, - deduped_data, + ValidAnnouncementData::new_eth68(deduped_data), ) } @@ -306,7 +306,7 @@ impl FilterAnnouncement for EthAnnouncementFilter { network=%Self, "empty eth66 announcement" ); - return (FilterOutcome::ReportPeer, HashMap::new()) + return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66()) } // 2. checks if announcement is spam packed with duplicate hashes @@ -324,7 +324,7 @@ impl FilterAnnouncement for EthAnnouncementFilter { } else { FilterOutcome::Ok }, - deduped_data, + ValidAnnouncementData::new_eth66(deduped_data), ) } } @@ -380,7 +380,7 @@ mod test { let mut expected_data = HashMap::new(); expected_data.insert(hashes[1], Some((types[1], sizes[1]))); - assert_eq!(expected_data, data,) + assert_eq!(expected_data, data.into_data()) } #[test] @@ -416,7 +416,7 @@ mod test { let mut expected_data = HashMap::new(); expected_data.insert(hashes[2], Some((types[2], sizes[2]))); - assert_eq!(expected_data, data,) + assert_eq!(expected_data, data.into_data()) } #[test] @@ -456,7 +456,7 @@ mod test { expected_data.insert(hashes[3], Some((types[3], sizes[3]))); expected_data.insert(hashes[0], Some((types[0], sizes[0]))); - assert_eq!(expected_data, data,) + assert_eq!(expected_data, data.into_data()) } #[test] @@ -500,7 +500,7 @@ mod test { expected_data.insert(hashes[1], None); expected_data.insert(hashes[0], None); - assert_eq!(expected_data, data) + assert_eq!(expected_data, data.into_data()) } #[test]