mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
Fix bug, rebuffer hashes that were received over broadcast (#6316)
This commit is contained in:
@@ -84,6 +84,7 @@ impl TransactionFetcher {
|
||||
for hash in hashes {
|
||||
self.unknown_hashes.remove(&hash);
|
||||
self.eth68_meta.remove(&hash);
|
||||
self.buffered_hashes.remove(&hash);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,28 +241,26 @@ impl TransactionFetcher {
|
||||
surplus_hashes
|
||||
}
|
||||
|
||||
pub(super) fn buffer_hashes_for_retry(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
|
||||
pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec<TxHash>) {
|
||||
// It could be that the txns have been received over broadcast in the time being.
|
||||
hashes.retain(|hash| self.unknown_hashes.get(hash).is_some());
|
||||
|
||||
self.buffer_hashes(hashes, None)
|
||||
}
|
||||
|
||||
/// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
|
||||
/// passed as `fallback_peer` parameter! Hashes that have been re-requested
|
||||
/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped.
|
||||
pub(super) fn buffer_hashes(
|
||||
&mut self,
|
||||
hashes: impl IntoIterator<Item = TxHash>,
|
||||
fallback_peer: Option<PeerId>,
|
||||
) {
|
||||
let mut max_retried_hashes = vec![];
|
||||
pub(super) fn buffer_hashes(&mut self, hashes: Vec<TxHash>, fallback_peer: Option<PeerId>) {
|
||||
let mut max_retried_and_evicted_hashes = vec![];
|
||||
|
||||
for hash in hashes {
|
||||
// todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68
|
||||
debug_assert!(
|
||||
self.unknown_hashes.peek(&hash).is_some(),
|
||||
"`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
|
||||
`%hash`: {},
|
||||
`@self`: {:?}",
|
||||
hash, self
|
||||
`%hash`: {hash},
|
||||
`@self`: {self:?}",
|
||||
);
|
||||
|
||||
let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return };
|
||||
@@ -287,18 +286,17 @@ impl TransactionFetcher {
|
||||
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
|
||||
);
|
||||
|
||||
max_retried_hashes.push(hash);
|
||||
max_retried_and_evicted_hashes.push(hash);
|
||||
continue;
|
||||
}
|
||||
*retries += 1;
|
||||
}
|
||||
if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) {
|
||||
_ = self.unknown_hashes.remove(&evicted_hash);
|
||||
_ = self.eth68_meta.remove(&evicted_hash);
|
||||
max_retried_and_evicted_hashes.push(evicted_hash);
|
||||
}
|
||||
}
|
||||
|
||||
self.remove_from_unknown_hashes(max_retried_hashes);
|
||||
self.remove_from_unknown_hashes(max_retried_and_evicted_hashes);
|
||||
}
|
||||
|
||||
/// Removes the provided transaction hashes from the inflight requests set.
|
||||
@@ -433,6 +431,21 @@ impl TransactionFetcher {
|
||||
|
||||
*inflight_count += 1;
|
||||
|
||||
debug_assert!(
|
||||
|| -> bool {
|
||||
for hash in &new_announced_hashes {
|
||||
if self.buffered_hashes.contains(hash) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
true
|
||||
}(),
|
||||
"`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
|
||||
`%new_announced_hashes`: {:?},
|
||||
`@self`: {:?}",
|
||||
new_announced_hashes, self
|
||||
);
|
||||
|
||||
let (response, rx) = oneshot::channel();
|
||||
let req: PeerRequest = PeerRequest::GetPooledTransactions {
|
||||
request: GetPooledTransactions(new_announced_hashes.clone()),
|
||||
@@ -447,28 +460,11 @@ impl TransactionFetcher {
|
||||
// need to do some cleanup so
|
||||
let req = req.into_get_pooled_transactions().expect("is get pooled tx");
|
||||
|
||||
// we know that the peer is the only entry in the map, so we can remove all
|
||||
self.remove_from_unknown_hashes(req.0);
|
||||
metrics_increment_egress_peer_channel_full();
|
||||
return Some(req.0)
|
||||
}
|
||||
}
|
||||
metrics_increment_egress_peer_channel_full();
|
||||
return Some(new_announced_hashes)
|
||||
} else {
|
||||
debug_assert!(
|
||||
|| -> bool {
|
||||
for hash in &new_announced_hashes {
|
||||
if self.buffered_hashes.contains(hash) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
true
|
||||
}(),
|
||||
"`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
|
||||
`%new_announced_hashes`: {:?},
|
||||
`@self`: {:?}",
|
||||
new_announced_hashes, self
|
||||
);
|
||||
|
||||
// stores a new request future for the request
|
||||
self.inflight_requests.push(GetPooledTxRequestFut::new(
|
||||
peer_id,
|
||||
@@ -686,15 +682,16 @@ impl Stream for TransactionFetcher {
|
||||
return match result {
|
||||
Ok(Ok(transactions)) => {
|
||||
// clear received hashes
|
||||
let mut fetched = Vec::with_capacity(transactions.hashes().count());
|
||||
requested_hashes.retain(|requested_hash| {
|
||||
if transactions.hashes().any(|hash| hash == requested_hash) {
|
||||
// hash is now known, stop tracking
|
||||
self.unknown_hashes.remove(requested_hash);
|
||||
self.eth68_meta.remove(requested_hash);
|
||||
fetched.push(*requested_hash);
|
||||
return false
|
||||
}
|
||||
true
|
||||
});
|
||||
self.remove_from_unknown_hashes(fetched);
|
||||
// buffer left over hashes
|
||||
self.buffer_hashes_for_retry(requested_hashes);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user