diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 70fe033d27..1c8daffaad 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -9,7 +9,7 @@ use std::{borrow::Borrow, fmt, hash::Hash, num::NonZeroUsize}; /// /// If the length exceeds the set capacity, the oldest element will be removed /// In the limit, for each element inserted the oldest existing element will be removed. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct LruCache { limit: NonZeroUsize, inner: LinkedHashSet, @@ -38,7 +38,7 @@ impl LruCache { /// if one was evicted. pub fn insert_and_get_evicted(&mut self, entry: T) -> (bool, Option) { if self.inner.insert(entry) { - if self.limit.get() == self.inner.len() { + if self.limit.get() < self.inner.len() { // remove the oldest element in the set return (true, self.remove_lru()) } @@ -70,9 +70,9 @@ impl LruCache { self.inner.contains(value) } - /// Returns an iterator over all cached entries + /// Returns an iterator over all cached entries in lru order pub fn iter(&self) -> impl Iterator + '_ { - self.inner.iter() + self.inner.iter().rev() } /// Returns number of elements currently in cache. @@ -99,6 +99,24 @@ where } } +impl fmt::Debug for LruCache +where + T: fmt::Debug + Hash + Eq, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug_struct = f.debug_struct("LruCache"); + + debug_struct.field("limit", &self.limit); + + debug_struct.field( + "res_fn_iter", + &format_args!("Iter: {{{} }}", self.iter().map(|k| format!(" {k:?}")).format(",")), + ); + + debug_struct.finish() + } +} + /// Wrapper of [`schnellru::LruMap`] that implements [`fmt::Debug`]. #[derive(Deref, DerefMut)] pub struct LruMap(schnellru::LruMap) @@ -120,10 +138,10 @@ where debug_struct.field("limiter", self.limiter()); debug_struct.field( - "inner", + "res_fn_iter", &format_args!( - "Iter: {{{}}}", - self.0.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",") + "Iter: {{{} }}", + self.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",") ), ); @@ -216,6 +234,24 @@ mod test { let value_2 = Value(22); cache.insert(key_2, value_2); - assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, inner: Iter: { 2: Value(22), 1: Value(11)} }", format!("{cache:?}")) + assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, res_fn_iter: Iter: { 2: Value(22), 1: Value(11) } }", format!("{cache:?}")) + } + + #[test] + #[allow(dead_code)] + fn test_debug_impl_lru_cache() { + #[derive(Debug, Hash, PartialEq, Eq)] + struct Key(i8); + + let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap()); + let key_1 = Key(1); + cache.insert(key_1); + let key_2 = Key(2); + cache.insert(key_2); + + assert_eq!( + "LruCache { limit: 2, res_fn_iter: Iter: { Key(2), Key(1) } }", + format!("{cache:?}") + ) } } diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 366f6554e8..9a1e89e833 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -57,7 +57,8 @@ pub(super) struct TransactionFetcher { /// All currently active requests for pooled transactions. #[pin] pub(super) inflight_requests: FuturesUnordered, - /// Hashes that are awaiting fetch from an idle peer. + /// Hashes that are awaiting an idle peer so they can be fetched. + // todo: store buffered eth68 and eth66 hashes separately pub(super) buffered_hashes: LruCache, /// Tracks all hashes that are currently being fetched or are buffered, mapping them to /// request retries and last recently seen fallback peers (max one request try for any peer). @@ -150,7 +151,7 @@ impl TransactionFetcher { peer_id: PeerId, ) -> Vec { if hashes.len() < GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { - self.fill_request_for_peer(hashes, peer_id, None); + self.fill_eth66_request_for_peer(hashes, peer_id); return vec![] } hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES) @@ -224,7 +225,7 @@ impl TransactionFetcher { // all hashes included in request and there is still space // todo: compare free space with min tx size if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE { - self.fill_request_for_peer(hashes, peer_id, Some(acc_size_response)); + self.fill_eth68_request_for_peer(hashes, peer_id, &mut acc_size_response); } surplus_hashes @@ -263,7 +264,7 @@ impl TransactionFetcher { // peer in caller's context has requested hash and is hence not eligible as // fallback peer. if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { - let msg_version = || -> EthVersion { + let msg_version = || { self.eth68_meta .peek(&hash) .map(|_| EthVersion::Eth68) @@ -333,7 +334,7 @@ impl TransactionFetcher { return false } - let msg_version = || -> EthVersion { self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66) }; + let msg_version = || self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); // vacant entry trace!(target: "net::tx", @@ -377,8 +378,7 @@ impl TransactionFetcher { metrics_increment_egress_peer_channel_full: impl FnOnce(), ) -> Option> { let peer_id: PeerId = peer.request_tx.peer_id; - - let msg_version = || -> EthVersion { + let msg_version = || { new_announced_hashes .first() .map(|hash| { @@ -471,73 +471,64 @@ impl TransactionFetcher { None } - /// Tries to fill request so that the respective tx response is at its size limit. It does so - /// by taking buffered hashes for which peer is listed as fallback peer. If this is an eth68 - /// request, the accumulated size of transactions corresponding to parameter hashes, must also - /// be passed as parameter. - pub(super) fn fill_request_for_peer( + /// Tries to fill request with eth68 hashes so that the respective tx response is at its size + /// limit. It does so by taking buffered eth68 hashes for which peer is listed as fallback + /// peer. A mutable reference to a list of hashes to request is passed as parameter. + /// + /// Loops through buffered hashes and does: + /// + /// 1. Check acc size against limit, if so stop looping. + /// 2. Check if this buffered hash is an eth68 hash, else skip to next iteration. + /// 3. Check if hash can be included with respect to size metadata and acc size copy. + /// 4. Check if peer is fallback peer for hash and remove, else skip to next iteration. + /// 4. Add hash to hashes list parameter. + /// 5. Overwrite eth68 acc size with copy. + pub(super) fn fill_eth68_request_for_peer( &mut self, hashes: &mut Vec, peer_id: PeerId, - mut acc_size_eth68_response: Option, + acc_size_response: &mut usize, ) { debug_assert!( - acc_size_eth68_response.is_none() || { + { let mut acc_size = 0; for &hash in hashes.iter() { _ = self.include_eth68_hash(&mut acc_size, hash); } - Some(acc_size) == acc_size_eth68_response + acc_size == *acc_size_response }, - "an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_eth68_response` and `%hashes`, -`%acc_size_eth68_response`: {:?}, + "an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_response` and `%hashes`, +`%acc_size_response`: {:?}, `%hashes`: {:?}, `@self`: {:?}", - acc_size_eth68_response, hashes, self + acc_size_response, hashes, self ); for hash in self.buffered_hashes.iter() { - // if this request is for eth68 txns... - if let Some(acc_size_eth68_response) = acc_size_eth68_response.as_mut() { - if *acc_size_eth68_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - acc_size_eth68_response=acc_size_eth68_response, - MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, - "request to peer full" - ); + // copy acc size + let mut next_acc_size = *acc_size_response; - break - } - // ...and this buffered hash is for an eth68 tx, check the size metadata - if self.eth68_meta.get(hash).is_some() && - !self.include_eth68_hash(acc_size_eth68_response, *hash) - { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - size=self.eth68_meta.peek(hash).expect("should find size in `eth68-meta`"), - acc_size_eth68_response=acc_size_eth68_response, - MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, - "found buffered hash for peer but can't fit it into request" - ); + // 1. Check acc size against limit, if so stop looping. + if next_acc_size >= MAX_FULL_TRANSACTIONS_PACKET_SIZE { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + acc_size_eth68_response=acc_size_response, // no change acc size + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "request to peer full" + ); - continue - } - // otherwise fill request based on hashes count - } else if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { break } - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - size=self.eth68_meta.peek(hash), - acc_size_eth68_response=acc_size_eth68_response, - MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, - "found buffered hash for request to peer" - ); + // 2. Check if this buffered hash is an eth68 hash, else skip to next iteration. + if self.eth68_meta.get(hash).is_none() { + continue + } + // 3. Check if hash can be included with respect to size metadata and acc size copy. + // + // mutates acc size copy + if !self.include_eth68_hash(&mut next_acc_size, *hash) { + continue + } debug_assert!( self.unknown_hashes.get(hash).is_some(), @@ -548,13 +539,91 @@ impl TransactionFetcher { ); if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { - // upgrade this peer from fallback peer + // 4. Check if peer is fallback peer for hash and remove, else skip to next + // iteration. + // + // upgrade this peer from fallback peer, soon to be active peer with inflight + // request. since 1 retry per peer per tx hash on this tx fetcher layer, remove + // peer. if fallback_peers.remove(&peer_id) { - hashes.push(*hash) + // 4. Add hash to hashes list parameter. + hashes.push(*hash); + // 5. Overwrite eth68 acc size with copy. + *acc_size_response = next_acc_size; + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%hash, + acc_size_eth68_response=acc_size_response, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "found buffered hash for request to peer" + ); } } } + // remove hashes that will be included in request from buffer + for hash in hashes { + self.buffered_hashes.remove(hash); + } + } + + /// Tries to fill request with eth66 hashes so that the respective tx response is at its size + /// limit. It does so by taking buffered hashes for which peer is listed as fallback peer. A + /// mutable reference to a list of hashes to request is passed as parameter. + /// + /// Loops through buffered hashes and does: + /// + /// 1. Check if this buffered hash is an eth66 hash, else skip to next iteration. + /// 2. Check hashes count in request, if max reached stop looping. + /// 3. Check if peer is fallback peer for hash and remove, else skip to next iteration. + /// 4. Add hash to hashes list parameter. This increases length i.e. hashes count. + /// + /// Removes hashes included in request from buffer. + pub(super) fn fill_eth66_request_for_peer( + &mut self, + hashes: &mut Vec, + peer_id: PeerId, + ) { + for hash in self.buffered_hashes.iter() { + // 1. Check hashes count in request. + if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { + break + } + // 2. Check if this buffered hash is an eth66 hash. + if self.eth68_meta.get(hash).is_some() { + continue + } + + debug_assert!( + self.unknown_hashes.get(hash).is_some(), + "can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, +`%hash`: {}, +`@self`: {:?}", + hash, self + ); + + if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { + // 3. Check if peer is fallback peer for hash and remove. + // + // upgrade this peer from fallback peer, soon to be active peer with inflight + // request. since 1 retry per peer per tx hash on this tx fetcher layer, remove + // peer. + if fallback_peers.remove(&peer_id) { + // 4. Add hash to hashes list parameter. + hashes.push(*hash); + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%hash, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "found buffered hash for request to peer" + ); + } + } + } + + // remove hashes that will be included in request from buffer for hash in hashes { self.buffered_hashes.remove(hash); } @@ -745,8 +814,10 @@ mod test { 1, ]; - // load unseen hashes - for i in 0..6 { + // load unseen hashes in reverse order so index 0 in seen_eth68_hashes and + // seen_eth68_hashes_sizes is lru! + + for i in (0..6).rev() { tx_fetcher.unknown_hashes.insert(eth68_hashes[i], (0, default_cache())); tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]); } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index aac191b170..2d28f279c6 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -594,7 +594,7 @@ where debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?hashes, - msg_version=?msg_version, + msg_version=%msg_version, "received previously unseen hashes in announcement from peer" ); @@ -613,7 +613,7 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?hashes, - msg_version=?msg_version, + msg_version=%msg_version, "buffering hashes announced by busy peer" ); @@ -628,7 +628,7 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), surplus_hashes=?surplus_hashes, - msg_version=?msg_version, + msg_version=%msg_version, "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" ); @@ -638,7 +638,7 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?hashes, - msg_version=?msg_version, + msg_version=%msg_version, "sending hashes in `GetPooledTransactions` request to peer's session" ); @@ -655,7 +655,7 @@ where debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?failed_to_request_hashes, - msg_version=?msg_version, + msg_version=%msg_version, "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" ); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); @@ -693,15 +693,19 @@ where let Some(peer) = self.peers.get(&peer_id) else { return }; let Some(hash) = hashes.first() else { return }; - let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied(); - let msg_version = - acc_eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); - self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size); + let mut eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied(); + if let Some(ref mut size) = eth68_size { + self.transaction_fetcher.fill_eth68_request_for_peer(&mut hashes, peer_id, size); + } else { + self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id); + } + + let msg_version = || eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?hashes, - msg_version=?msg_version, + msg_version=%msg_version(), "requesting buffered hashes from idle peer" ); @@ -715,7 +719,7 @@ where debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?failed_to_request_hashes, - msg_version=?msg_version, + msg_version=%msg_version(), "failed sending request to peer's session, buffering hashes" ); @@ -1698,14 +1702,14 @@ mod tests { tx_manager.peers.insert(peer_id_1, peer_1); // hashes are seen and currently not inflight, with one fallback peer, and are buffered - // for first retry. + // for first retry in reverse order to make index 0 lru let retries = 1; let mut backups = default_cache(); backups.insert(peer_id_1); - tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups.clone())); - tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups)); - tx_fetcher.buffered_hashes.insert(seen_hashes[0]); + tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups.clone())); + tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups)); tx_fetcher.buffered_hashes.insert(seen_hashes[1]); + tx_fetcher.buffered_hashes.insert(seen_hashes[0]); // peer_1 is idle assert!(tx_fetcher.is_idle(peer_id_1)); @@ -1787,6 +1791,8 @@ mod tests { let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])]; let unseen_eth68_hashes_sizes = [MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4]; + // hashes and sizes to buffer in reverse order so that seen_eth68_hashes[0] and + // seen_eth68_hashes_sizes[0] are lru let seen_eth68_hashes = [B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32])]; let seen_eth68_hashes_sizes = [ @@ -1803,12 +1809,26 @@ mod tests { // for first try to fetch. let mut backups = default_cache(); backups.insert(peer_id); - for i in 0..3 { + + // load in reverse order so index 0 in seen_eth68_hashes and seen_eth68_hashes_sizes is + // lru! + + for i in (0..3).rev() { tx_fetcher.unknown_hashes.insert(seen_eth68_hashes[i], (0, backups.clone())); tx_fetcher.eth68_meta.insert(seen_eth68_hashes[i], seen_eth68_hashes_sizes[i]); tx_fetcher.buffered_hashes.insert(seen_eth68_hashes[i]); } + // insert buffered hash for some other peer too, to verify response size accumulation and + // selection from buffered hashes + let peer_id_other = PeerId::new([2; 64]); + let hash_other = B256::from_slice(&[6; 32]); + let mut backups = default_cache(); + backups.insert(peer_id_other); + tx_fetcher.unknown_hashes.insert(hash_other, (0, backups)); + tx_fetcher.eth68_meta.insert(hash_other, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 2); // a big tx + tx_fetcher.buffered_hashes.insert(hash_other); + let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version); tx_manager.peers.insert(peer_id, peer); @@ -1823,9 +1843,9 @@ mod tests { let tx_fetcher = &mut tx_manager.transaction_fetcher; // since hashes are unseen, length of unknown hashes increases - assert_eq!(tx_fetcher.unknown_hashes.len(), 5); + assert_eq!(tx_fetcher.unknown_hashes.len(), 6); // seen_eth68_hashes[1] should be taken out of buffer and packed into request - assert_eq!(tx_fetcher.buffered_hashes.len(), 2); + assert_eq!(tx_fetcher.buffered_hashes.len(), 3); assert!(tx_fetcher.buffered_hashes.contains(&seen_eth68_hashes[0])); // mock session of peer receives request