From b713408331ac3faab2629aa61ce19f4c41989cb0 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 22 Jan 2024 16:37:56 +0100 Subject: [PATCH] Bug fix tx fetcher RUST_LOG=trace & add state dump to `debug_assert` error (#6146) --- crates/net/eth-wire/src/types/broadcast.rs | 8 + crates/net/eth-wire/src/types/version.rs | 4 +- crates/net/network/src/cache.rs | 28 ++-- .../net/network/src/transactions/fetcher.rs | 158 ++++++++++++------ crates/net/network/src/transactions/mod.rs | 57 +++++-- 5 files changed, 169 insertions(+), 86 deletions(-) diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index fb9e482841..236924e03a 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -124,6 +124,14 @@ pub enum NewPooledTransactionHashes { // === impl NewPooledTransactionHashes === impl NewPooledTransactionHashes { + /// Returns the message [`EthVersion`]. + pub fn version(&self) -> EthVersion { + match self { + NewPooledTransactionHashes::Eth66(_) => EthVersion::Eth66, + NewPooledTransactionHashes::Eth68(_) => EthVersion::Eth68, + } + } + /// Returns `true` if the payload is valid for the given version pub fn is_valid_for_version(&self, version: EthVersion) -> bool { match self { diff --git a/crates/net/eth-wire/src/types/version.rs b/crates/net/eth-wire/src/types/version.rs index afc093726e..bb1076a034 100644 --- a/crates/net/eth-wire/src/types/version.rs +++ b/crates/net/eth-wire/src/types/version.rs @@ -3,6 +3,8 @@ use std::str::FromStr; +use derive_more::Display; + /// Error thrown when failed to parse a valid [`EthVersion`]. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("Unknown eth protocol version: {0}")] @@ -10,7 +12,7 @@ pub struct ParseVersionError(String); /// The `eth` protocol version. 
#[repr(u8)] -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Display)] pub enum EthVersion { /// The `eth` protocol version 66. Eth66 = 66, diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 4ccf48e601..70fe033d27 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -1,13 +1,9 @@ use core::hash::BuildHasher; use derive_more::{Deref, DerefMut}; +use itertools::Itertools; use linked_hash_set::LinkedHashSet; use schnellru::{self, ByLength, Limiter, RandomState, Unlimited}; -use std::{ - borrow::Borrow, - fmt::{self, Write}, - hash::Hash, - num::NonZeroUsize, -}; +use std::{borrow::Borrow, fmt, hash::Hash, num::NonZeroUsize}; /// A minimal LRU cache based on a `LinkedHashSet` with limited capacity. /// @@ -115,16 +111,22 @@ impl fmt::Debug for LruMap where K: Hash + PartialEq + fmt::Display, V: fmt::Debug, - L: Limiter, + L: Limiter + fmt::Debug, S: BuildHasher, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut debug_struct = f.debug_struct("LruMap"); - for (k, v) in self.0.iter() { - let mut key_str = String::new(); - write!(&mut key_str, "{k}")?; - debug_struct.field(&key_str, &v); - } + + debug_struct.field("limiter", self.limiter()); + + debug_struct.field( + "inner", + &format_args!( + "Iter: {{{}}}", + self.0.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",") + ), + ); + debug_struct.finish() } } @@ -214,6 +216,6 @@ mod test { let value_2 = Value(22); cache.insert(key_2, value_2); - assert_eq!("LruMap { 2: Value(22), 1: Value(11) }", format!("{cache:?}")) + assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, inner: Iter: { 2: Value(22), 1: Value(11)} }", format!("{cache:?}")) } } diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index bf1f0fa104..366f6554e8 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ 
b/crates/net/network/src/transactions/fetcher.rs @@ -3,9 +3,8 @@ use crate::{ message::PeerRequest, }; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; -use itertools::Itertools; use pin_project::pin_project; -use reth_eth_wire::GetPooledTransactions; +use reth_eth_wire::{EthVersion, GetPooledTransactions}; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; use schnellru::{ByLength, Unlimited}; @@ -83,17 +82,10 @@ impl TransactionFetcher { } /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`]. - fn update_peer_activity(&mut self, resp: &GetPooledTxResponse) { - let GetPooledTxResponse { peer_id, .. } = resp; - - debug_assert!( - self.active_peers.get(peer_id).is_some(), - "broken invariant `active-peers` and `inflight-requests`" - ); - + fn decrement_inflight_request_count_for(&mut self, peer_id: PeerId) { let remove = || -> bool { - if let Some(inflight_count) = self.active_peers.get(peer_id) { + if let Some(inflight_count) = self.active_peers.get(&peer_id) { if *inflight_count <= 1 { return true } *inflight_count -= 1; @@ -102,7 +94,7 @@ impl TransactionFetcher { }(); if remove { - self.active_peers.remove(peer_id); + self.active_peers.remove(&peer_id); } } @@ -170,13 +162,16 @@ impl TransactionFetcher { /// /// Returns `true` if hash is included in request. If there is still space in the respective /// response but not enough for the transaction of given hash, `false` is returned. 
- fn include_eth68_hash(&self, acc_size_response: &mut usize, eth68_hash: TxHash) -> bool { + fn include_eth68_hash(&self, acc_size_response: &mut usize, hash: TxHash) -> bool { debug_assert!( - self.eth68_meta.peek(&eth68_hash).is_some(), - "broken invariant `eth68-hash` and `eth68-meta`" + self.eth68_meta.peek(&hash).is_some(), + "can't find eth68 metadata for `%hash` that should be of version eth68, broken invariant `@eth68_meta` and `@self`, +`%hash`: {}, +`@self`: {:?}", + hash, self ); - if let Some(size) = self.eth68_meta.peek(&eth68_hash) { + if let Some(size) = self.eth68_meta.peek(&hash) { let next_acc_size = *acc_size_response + size; if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE { @@ -212,11 +207,10 @@ impl TransactionFetcher { hashes.retain(|&hash| match self.include_eth68_hash(&mut acc_size_response, hash) { true => true, false => { - trace!( - target: "net::tx", + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=format!("{hash:#}"), - size=self.eth68_meta.get(&hash).expect("should find size in `eth68-meta`"), + hash=%hash, + size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"), acc_size_response=acc_size_response, MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, "no space for hash in `GetPooledTransactions` request to peer" @@ -251,10 +245,13 @@ impl TransactionFetcher { let mut max_retried_hashes = vec![]; for hash in hashes { - // todo: enforce by adding new type UnknownTxHash + // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 debug_assert!( self.unknown_hashes.peek(&hash).is_some(), - "only hashes that are confirmed as unknown should be buffered" + "`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, +`%hash`: {}, +`@self`: {:?}", + hash, self ); let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; @@ 
-266,11 +263,20 @@ impl TransactionFetcher { // peer in caller's context has requested hash and is hence not eligible as // fallback peer. if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { + let msg_version = || -> EthVersion { + self.eth68_meta + .peek(&hash) + .map(|_| EthVersion::Eth68) + .unwrap_or(EthVersion::Eth66) + }; + debug!(target: "net::tx", - hash=format!("{hash:#}"), + hash=%hash, retries=retries, + msg_version=%msg_version(), "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" ); + max_retried_hashes.push(hash); continue; } @@ -326,11 +332,14 @@ impl TransactionFetcher { backups.insert(peer_id); return false } + + let msg_version = || -> EthVersion { self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66) }; + // vacant entry - trace!( - target: "net::tx", + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=format!("{hash:#}"), + hash=%hash, + msg_version=%msg_version(), "new hash seen in announcement by peer" ); @@ -343,7 +352,8 @@ impl TransactionFetcher { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=format!("{hash:#}"), + hash=%hash, + msg_version=%msg_version(), "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" ); @@ -368,10 +378,23 @@ impl TransactionFetcher { ) -> Option> { let peer_id: PeerId = peer.request_tx.peer_id; + let msg_version = || -> EthVersion { + new_announced_hashes + .first() + .map(|hash| { + self.eth68_meta + .peek(hash) + .map(|_| EthVersion::Eth68) + .unwrap_or(EthVersion::Eth66) + }) + .expect("`new_announced_hashes` shouldn't be empty") + }; + if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + new_announced_hashes=?new_announced_hashes, + msg_version=%msg_version(), limit=MAX_CONCURRENT_TX_REQUESTS, "limit for concurrent `GetPooledTransactions` requests 
reached, dropping request for hashes to peer" ); @@ -381,7 +404,8 @@ impl TransactionFetcher { let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + new_announced_hashes=?new_announced_hashes, + msg_version=%msg_version(), "failed to cache active peer in schnellru::LruMap, dropping request to peer" ); return Some(new_announced_hashes) @@ -390,7 +414,8 @@ impl TransactionFetcher { if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + new_announced_hashes=?new_announced_hashes, + msg_version=%msg_version(), limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER, "limit for concurrent `GetPooledTransactions` requests per peer reached" ); @@ -420,7 +445,6 @@ impl TransactionFetcher { metrics_increment_egress_peer_channel_full(); return Some(new_announced_hashes) } else { - // remove requested hashes from buffered hashes debug_assert!( || -> bool { for hash in &new_announced_hashes { @@ -430,7 +454,10 @@ impl TransactionFetcher { } true }(), - "broken invariant `buffered-hashes` and `unknown-hashes`" + "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`, +`%new_announced_hashes`: {:?}, +`@self`: {:?}", + new_announced_hashes, self ); // stores a new request future for the request @@ -452,45 +479,46 @@ impl TransactionFetcher { &mut self, hashes: &mut Vec<TxHash>, peer_id: PeerId, - mut acc_eth68_size: Option<usize>, + mut acc_size_eth68_response: Option<usize>, ) { debug_assert!( - acc_eth68_size.is_none() || { + acc_size_eth68_response.is_none() || { let mut acc_size = 0; for &hash in hashes.iter() { _ = self.include_eth68_hash(&mut acc_size, hash); } - Some(acc_size) == acc_eth68_size + Some(acc_size) == 
acc_size_eth68_response }, - "broken invariant `acc-eth68-size` and `hashes`" + "an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_eth68_response` and `%hashes`, +`%acc_size_eth68_response`: {:?}, +`%hashes`: {:?}, +`@self`: {:?}", + acc_size_eth68_response, hashes, self ); for hash in self.buffered_hashes.iter() { // if this request is for eth68 txns... - if let Some(acc_size_response) = acc_eth68_size.as_mut() { - if *acc_size_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE { - trace!( - target: "net::tx", + if let Some(acc_size_eth68_response) = acc_size_eth68_response.as_mut() { + if *acc_size_eth68_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE { + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=format!("{hash:#}"), - size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"), - acc_size_response=acc_size_response, + hash=%hash, + acc_size_eth68_response=acc_size_eth68_response, MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, - "found buffered hash for peer but can't fit it into request" + "request to peer full" ); break } // ...and this buffered hash is for an eth68 tx, check the size metadata if self.eth68_meta.get(hash).is_some() && - !self.include_eth68_hash(acc_size_response, *hash) + !self.include_eth68_hash(acc_size_eth68_response, *hash) { - trace!( - target: "net::tx", + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hash=format!("{hash:#}"), - size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"), - acc_size_response=acc_size_response, + hash=%hash, + size=self.eth68_meta.peek(hash).expect("should find size in `eth68-meta`"), + acc_size_eth68_response=acc_size_eth68_response, MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, "found buffered hash for peer but can't fit it into request" ); @@ -502,9 +530,21 @@ impl TransactionFetcher { break } + 
trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%hash, + size=self.eth68_meta.peek(hash), + acc_size_eth68_response=acc_size_eth68_response, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "found buffered hash for request to peer" + ); + debug_assert!( - self.unknown_hashes.peek(hash).is_some(), - "broken invariant `buffered-hashes` and `unknown-hashes`" + self.unknown_hashes.get(hash).is_some(), + "can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, +`%hash`: {}, +`@self`: {:?}", + hash, self ); if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { @@ -532,7 +572,17 @@ impl Stream for TransactionFetcher { if let Poll::Ready(Some(response)) = res { // update peer activity, requests for buffered hashes can only be made to idle // fallback peers - self.update_peer_activity(&response); + let GetPooledTxResponse { peer_id, .. 
} = response; + + debug_assert!( + self.active_peers.get(&peer_id).is_some(), + "`%peer_id` has been removed from `@active_peers` before inflight request(s) resolved, broken invariant `@active_peers` and `@inflight_requests`, +`%peer_id`: {}, +`@self`: {:?}", + peer_id, self + ); + + self.decrement_inflight_request_count_for(peer_id); let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 79fcff1eff..aac191b170 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -35,7 +35,6 @@ use crate::{ NetworkEvents, NetworkHandle, }; use futures::{stream::FuturesUnordered, Future, StreamExt}; -use itertools::Itertools; use reth_eth_wire::{ EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, @@ -559,13 +558,14 @@ where }; // message version decides how hashes are packed - // if this is a eth68 message, store eth68 tx metadata - if let Some(eth68_msg) = msg.as_eth68() { - for (&hash, (_type, size)) in eth68_msg.metadata_iter() { - self.transaction_fetcher.eth68_meta.insert(hash, size); - } - } - // extract hashes payload + let msg_version = msg.version(); + // extract hashes payload, and sizes if version eth68 + let sizes = msg.as_eth68().map(|eth68_msg| { + eth68_msg + .metadata_iter() + .map(|(&hash, (_type, size))| (hash, size)) + .collect::>() + }); let mut hashes = msg.into_hashes(); // keep track of the transactions the peer knows @@ -593,16 +593,27 @@ where debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", hashes.iter().format(", ")), + hashes=?hashes, + msg_version=?msg_version, "received previously unseen hashes in announcement from peer" ); + if msg_version == EthVersion::Eth68 { + // cache size metadata of unseen hashes + for (hash, size) in 
sizes.expect("should be at least empty map") { + if hashes.contains(&hash) { + self.transaction_fetcher.eth68_meta.insert(hash, size); + } + } + } + // only send request for hashes to idle peer, otherwise buffer hashes storing peer as // fallback if !self.transaction_fetcher.is_idle(peer_id) { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", hashes.iter().format(", ")), + hashes=?hashes, + msg_version=?msg_version, "buffering hashes announced by busy peer" ); @@ -616,7 +627,8 @@ where if !surplus_hashes.is_empty() { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - surplus_hashes=format!("{surplus_hashes:#?}"), + surplus_hashes=?surplus_hashes, + msg_version=?msg_version, "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" ); @@ -625,7 +637,8 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", hashes.iter().format(", ")), + hashes=?hashes, + msg_version=?msg_version, "sending hashes in `GetPooledTransactions` request to peer's session" ); @@ -641,7 +654,8 @@ where { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")), + failed_to_request_hashes=?failed_to_request_hashes, + msg_version=?msg_version, "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" ); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); @@ -668,7 +682,11 @@ where debug_assert!( self.peers.contains_key(&peer_id), - "broken invariant `peers` and `transaction-fetcher`" + "a dead peer has been returned as idle by `@pop_any_idle_peer`, broken invariant `@peers` and `@transaction_fetcher`, +`%peer_id`: {:?}, +`@peers`: {:?}, +`@transaction_fetcher`: {:?}", + peer_id, self.peers, self.transaction_fetcher ); // fill the request with other buffered hashes that have been announced by the peer @@ -676,12 +694,14 @@ where let 
Some(hash) = hashes.first() else { return }; let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied(); + let msg_version = + acc_eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size); - trace!( - target: "net::tx", + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", hashes.iter().format(", ")), + hashes=?hashes, + msg_version=?msg_version, "requesting buffered hashes from idle peer" ); @@ -694,7 +714,8 @@ where { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")), + failed_to_request_hashes=?failed_to_request_hashes, + msg_version=?msg_version, "failed sending request to peer's session, buffering hashes" );