Bug fix, tx fetcher fill_request_for_peer (#6150)

This commit is contained in:
Emilia Hane
2024-01-22 21:03:10 +01:00
committed by GitHub
parent a30d149b05
commit 021d236f8c
3 changed files with 212 additions and 85 deletions

View File

@@ -9,7 +9,7 @@ use std::{borrow::Borrow, fmt, hash::Hash, num::NonZeroUsize};
///
/// If the length exceeds the set capacity, the oldest element will be removed
/// In the limit, for each element inserted the oldest existing element will be removed.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct LruCache<T: Hash + Eq> {
limit: NonZeroUsize,
inner: LinkedHashSet<T>,
@@ -38,7 +38,7 @@ impl<T: Hash + Eq> LruCache<T> {
/// if one was evicted.
pub fn insert_and_get_evicted(&mut self, entry: T) -> (bool, Option<T>) {
if self.inner.insert(entry) {
if self.limit.get() == self.inner.len() {
if self.limit.get() < self.inner.len() {
// remove the oldest element in the set
return (true, self.remove_lru())
}
@@ -70,9 +70,9 @@ impl<T: Hash + Eq> LruCache<T> {
self.inner.contains(value)
}
/// Returns an iterator over all cached entries
/// Returns an iterator over all cached entries in lru order
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
self.inner.iter()
self.inner.iter().rev()
}
/// Returns number of elements currently in cache.
@@ -99,6 +99,24 @@ where
}
}
/// Human-readable representation of the cache: prints the `limit` and, under
/// the `res_fn_iter` label, the cached entries in lru order (see `iter`).
impl<T> fmt::Debug for LruCache<T>
where
    T: fmt::Debug + Hash + Eq,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Builder-chain form of `LruCache { limit: .., res_fn_iter: Iter: { .. } }`.
        f.debug_struct("LruCache")
            .field("limit", &self.limit)
            .field(
                "res_fn_iter",
                &format_args!("Iter: {{{} }}", self.iter().map(|k| format!(" {k:?}")).format(",")),
            )
            .finish()
    }
}
/// Wrapper of [`schnellru::LruMap`] that implements [`fmt::Debug`].
#[derive(Deref, DerefMut)]
pub struct LruMap<K, V, L = ByLength, S = RandomState>(schnellru::LruMap<K, V, L, S>)
@@ -120,10 +138,10 @@ where
debug_struct.field("limiter", self.limiter());
debug_struct.field(
"inner",
"res_fn_iter",
&format_args!(
"Iter: {{{}}}",
self.0.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",")
"Iter: {{{} }}",
self.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",")
),
);
@@ -216,6 +234,24 @@ mod test {
let value_2 = Value(22);
cache.insert(key_2, value_2);
assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, inner: Iter: { 2: Value(22), 1: Value(11)} }", format!("{cache:?}"))
assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, res_fn_iter: Iter: { 2: Value(22), 1: Value(11) } }", format!("{cache:?}"))
}
#[test]
#[allow(dead_code)]
fn test_debug_impl_lru_cache() {
    // Local key type with the traits `LruCache` requires, plus `Debug` for formatting.
    #[derive(Debug, Hash, PartialEq, Eq)]
    struct Key(i8);

    let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.insert(Key(1));
    cache.insert(Key(2));

    // The most recently inserted entry prints first (lru order).
    assert_eq!(
        "LruCache { limit: 2, res_fn_iter: Iter: { Key(2), Key(1) } }",
        format!("{cache:?}")
    )
}
}

View File

@@ -57,7 +57,8 @@ pub(super) struct TransactionFetcher {
/// All currently active requests for pooled transactions.
#[pin]
pub(super) inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Hashes that are awaiting fetch from an idle peer.
/// Hashes that are awaiting an idle peer so they can be fetched.
// todo: store buffered eth68 and eth66 hashes separately
pub(super) buffered_hashes: LruCache<TxHash>,
/// Tracks all hashes that are currently being fetched or are buffered, mapping them to
/// request retries and last recently seen fallback peers (max one request try for any peer).
@@ -150,7 +151,7 @@ impl TransactionFetcher {
peer_id: PeerId,
) -> Vec<TxHash> {
if hashes.len() < GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
self.fill_request_for_peer(hashes, peer_id, None);
self.fill_eth66_request_for_peer(hashes, peer_id);
return vec![]
}
hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES)
@@ -224,7 +225,7 @@ impl TransactionFetcher {
// all hashes included in request and there is still space
// todo: compare free space with min tx size
if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE {
self.fill_request_for_peer(hashes, peer_id, Some(acc_size_response));
self.fill_eth68_request_for_peer(hashes, peer_id, &mut acc_size_response);
}
surplus_hashes
@@ -263,7 +264,7 @@ impl TransactionFetcher {
// peer in caller's context has requested hash and is hence not eligible as
// fallback peer.
if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
let msg_version = || -> EthVersion {
let msg_version = || {
self.eth68_meta
.peek(&hash)
.map(|_| EthVersion::Eth68)
@@ -333,7 +334,7 @@ impl TransactionFetcher {
return false
}
let msg_version = || -> EthVersion { self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66) };
let msg_version = || self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);
// vacant entry
trace!(target: "net::tx",
@@ -377,8 +378,7 @@ impl TransactionFetcher {
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<Vec<TxHash>> {
let peer_id: PeerId = peer.request_tx.peer_id;
let msg_version = || -> EthVersion {
let msg_version = || {
new_announced_hashes
.first()
.map(|hash| {
@@ -471,73 +471,64 @@ impl TransactionFetcher {
None
}
/// Tries to fill request so that the respective tx response is at its size limit. It does so
/// by taking buffered hashes for which peer is listed as fallback peer. If this is an eth68
/// request, the accumulated size of transactions corresponding to parameter hashes, must also
/// be passed as parameter.
pub(super) fn fill_request_for_peer(
/// Tries to fill request with eth68 hashes so that the respective tx response is at its size
/// limit. It does so by taking buffered eth68 hashes for which peer is listed as fallback
/// peer. A mutable reference to a list of hashes to request is passed as parameter.
///
/// Loops through buffered hashes and does:
///
/// 1. Check acc size against limit; if the limit is reached, stop looping.
/// 2. Check if this buffered hash is an eth68 hash, else skip to next iteration.
/// 3. Check if hash can be included with respect to size metadata and acc size copy.
/// 4. Check if peer is fallback peer for hash and remove, else skip to next iteration.
/// 5. Add hash to hashes list parameter.
/// 6. Overwrite eth68 acc size with copy.
pub(super) fn fill_eth68_request_for_peer(
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
mut acc_size_eth68_response: Option<usize>,
acc_size_response: &mut usize,
) {
debug_assert!(
acc_size_eth68_response.is_none() || {
{
let mut acc_size = 0;
for &hash in hashes.iter() {
_ = self.include_eth68_hash(&mut acc_size, hash);
}
Some(acc_size) == acc_size_eth68_response
acc_size == *acc_size_response
},
"an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_eth68_response` and `%hashes`,
`%acc_size_eth68_response`: {:?},
"an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_response` and `%hashes`,
`%acc_size_response`: {:?},
`%hashes`: {:?},
`@self`: {:?}",
acc_size_eth68_response, hashes, self
acc_size_response, hashes, self
);
for hash in self.buffered_hashes.iter() {
// if this request is for eth68 txns...
if let Some(acc_size_eth68_response) = acc_size_eth68_response.as_mut() {
if *acc_size_eth68_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"request to peer full"
);
// copy acc size
let mut next_acc_size = *acc_size_response;
break
}
// ...and this buffered hash is for an eth68 tx, check the size metadata
if self.eth68_meta.get(hash).is_some() &&
!self.include_eth68_hash(acc_size_eth68_response, *hash)
{
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
size=self.eth68_meta.peek(hash).expect("should find size in `eth68-meta`"),
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for peer but can't fit it into request"
);
// 1. Check acc size against limit, if so stop looping.
if next_acc_size >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
acc_size_eth68_response=acc_size_response, // no change acc size
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"request to peer full"
);
continue
}
// otherwise fill request based on hashes count
} else if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
break
}
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
size=self.eth68_meta.peek(hash),
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for request to peer"
);
// 2. Check if this buffered hash is an eth68 hash, else skip to next iteration.
if self.eth68_meta.get(hash).is_none() {
continue
}
// 3. Check if hash can be included with respect to size metadata and acc size copy.
//
// mutates acc size copy
if !self.include_eth68_hash(&mut next_acc_size, *hash) {
continue
}
debug_assert!(
self.unknown_hashes.get(hash).is_some(),
@@ -548,13 +539,91 @@ impl TransactionFetcher {
);
if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) {
// upgrade this peer from fallback peer
// 4. Check if peer is fallback peer for hash and remove, else skip to next
// iteration.
//
// upgrade this peer from fallback peer, soon to be active peer with inflight
// request. since 1 retry per peer per tx hash on this tx fetcher layer, remove
// peer.
if fallback_peers.remove(&peer_id) {
hashes.push(*hash)
// 4. Add hash to hashes list parameter.
hashes.push(*hash);
// 5. Overwrite eth68 acc size with copy.
*acc_size_response = next_acc_size;
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
acc_size_eth68_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for request to peer"
);
}
}
}
// remove hashes that will be included in request from buffer
for hash in hashes {
self.buffered_hashes.remove(hash);
}
}
/// Tries to fill request with eth66 hashes so that the respective tx response is at its size
/// limit. It does so by taking buffered hashes for which peer is listed as fallback peer. A
/// mutable reference to a list of hashes to request is passed as parameter.
///
/// Loops through buffered hashes and does:
///
/// 1. Check hashes count in request; if max reached, stop looping.
/// 2. Check if this buffered hash is an eth66 hash, else skip to next iteration.
/// 3. Check if peer is fallback peer for hash and remove, else skip to next iteration.
/// 4. Add hash to hashes list parameter. This increases length i.e. hashes count.
///
/// Removes hashes included in request from buffer.
pub(super) fn fill_eth66_request_for_peer(
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
) {
for hash in self.buffered_hashes.iter() {
// 1. Check hashes count in request.
if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
break
}
// 2. Check if this buffered hash is an eth66 hash.
if self.eth68_meta.get(hash).is_some() {
continue
}
debug_assert!(
self.unknown_hashes.get(hash).is_some(),
"can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
`%hash`: {},
`@self`: {:?}",
hash, self
);
if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) {
// 3. Check if peer is fallback peer for hash and remove.
//
// upgrade this peer from fallback peer, soon to be active peer with inflight
// request. since 1 retry per peer per tx hash on this tx fetcher layer, remove
// peer.
if fallback_peers.remove(&peer_id) {
// 4. Add hash to hashes list parameter.
hashes.push(*hash);
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for request to peer"
);
}
}
}
// remove hashes that will be included in request from buffer
for hash in hashes {
self.buffered_hashes.remove(hash);
}
@@ -745,8 +814,10 @@ mod test {
1,
];
// load unseen hashes
for i in 0..6 {
// load unseen hashes in reverse order so index 0 in eth68_hashes and
// eth68_hashes_sizes is lru!
for i in (0..6).rev() {
tx_fetcher.unknown_hashes.insert(eth68_hashes[i], (0, default_cache()));
tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]);
}

View File

@@ -594,7 +594,7 @@ where
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?hashes,
msg_version=?msg_version,
msg_version=%msg_version,
"received previously unseen hashes in announcement from peer"
);
@@ -613,7 +613,7 @@ where
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?hashes,
msg_version=?msg_version,
msg_version=%msg_version,
"buffering hashes announced by busy peer"
);
@@ -628,7 +628,7 @@ where
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=?surplus_hashes,
msg_version=?msg_version,
msg_version=%msg_version,
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
@@ -638,7 +638,7 @@ where
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?hashes,
msg_version=?msg_version,
msg_version=%msg_version,
"sending hashes in `GetPooledTransactions` request to peer's session"
);
@@ -655,7 +655,7 @@ where
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes,
msg_version=?msg_version,
msg_version=%msg_version,
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
@@ -693,15 +693,19 @@ where
let Some(peer) = self.peers.get(&peer_id) else { return };
let Some(hash) = hashes.first() else { return };
let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied();
let msg_version =
acc_eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);
self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size);
let mut eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied();
if let Some(ref mut size) = eth68_size {
self.transaction_fetcher.fill_eth68_request_for_peer(&mut hashes, peer_id, size);
} else {
self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id);
}
let msg_version = || eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?hashes,
msg_version=?msg_version,
msg_version=%msg_version(),
"requesting buffered hashes from idle peer"
);
@@ -715,7 +719,7 @@ where
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes,
msg_version=?msg_version,
msg_version=%msg_version(),
"failed sending request to peer's session, buffering hashes"
);
@@ -1698,14 +1702,14 @@ mod tests {
tx_manager.peers.insert(peer_id_1, peer_1);
// hashes are seen and currently not inflight, with one fallback peer, and are buffered
// for first retry.
// for first retry in reverse order to make index 0 lru
let retries = 1;
let mut backups = default_cache();
backups.insert(peer_id_1);
tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups.clone()));
tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups));
tx_fetcher.buffered_hashes.insert(seen_hashes[0]);
tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups.clone()));
tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups));
tx_fetcher.buffered_hashes.insert(seen_hashes[1]);
tx_fetcher.buffered_hashes.insert(seen_hashes[0]);
// peer_1 is idle
assert!(tx_fetcher.is_idle(peer_id_1));
@@ -1787,6 +1791,8 @@ mod tests {
let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
let unseen_eth68_hashes_sizes =
[MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4];
// hashes and sizes to buffer in reverse order so that seen_eth68_hashes[0] and
// seen_eth68_hashes_sizes[0] are lru
let seen_eth68_hashes =
[B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32])];
let seen_eth68_hashes_sizes = [
@@ -1803,12 +1809,26 @@ mod tests {
// for first try to fetch.
let mut backups = default_cache();
backups.insert(peer_id);
for i in 0..3 {
// load in reverse order so index 0 in seen_eth68_hashes and seen_eth68_hashes_sizes is
// lru!
for i in (0..3).rev() {
tx_fetcher.unknown_hashes.insert(seen_eth68_hashes[i], (0, backups.clone()));
tx_fetcher.eth68_meta.insert(seen_eth68_hashes[i], seen_eth68_hashes_sizes[i]);
tx_fetcher.buffered_hashes.insert(seen_eth68_hashes[i]);
}
// insert buffered hash for some other peer too, to verify response size accumulation and
// selection from buffered hashes
let peer_id_other = PeerId::new([2; 64]);
let hash_other = B256::from_slice(&[6; 32]);
let mut backups = default_cache();
backups.insert(peer_id_other);
tx_fetcher.unknown_hashes.insert(hash_other, (0, backups));
tx_fetcher.eth68_meta.insert(hash_other, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 2); // a big tx
tx_fetcher.buffered_hashes.insert(hash_other);
let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version);
tx_manager.peers.insert(peer_id, peer);
@@ -1823,9 +1843,9 @@ mod tests {
let tx_fetcher = &mut tx_manager.transaction_fetcher;
// since hashes are unseen, length of unknown hashes increases
assert_eq!(tx_fetcher.unknown_hashes.len(), 5);
assert_eq!(tx_fetcher.unknown_hashes.len(), 6);
// seen_eth68_hashes[1] should be taken out of buffer and packed into request
assert_eq!(tx_fetcher.buffered_hashes.len(), 2);
assert_eq!(tx_fetcher.buffered_hashes.len(), 3);
assert!(tx_fetcher.buffered_hashes.contains(&seen_eth68_hashes[0]));
// mock session of peer receives request