This commit is contained in:
Sean Matthew
2023-10-24 19:02:12 -04:00
parent 7c9e33ab14
commit ab8a96fb20

View File

@@ -1031,6 +1031,7 @@ struct Peer {
/// The type responsible for fetching missing transactions from peers.
#[derive(Debug, Default)]
#[warn(unused_variables)]
struct TransactionFetcher {
/// All currently active requests for pooled transactions.
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
@@ -1041,6 +1042,25 @@ struct TransactionFetcher {
// === impl TransactionFetcher ===
impl TransactionFetcher {
/// TODO: Implement the re-request logic...
fn re_request_hashes(&mut self, hashes: Vec<TxHash>, failed_peer: PeerId) {
for hash in &hashes {
if let Some(peers) = self.inflight_hash_to_fallback_peers.get_mut(hash) {
peers.retain(|&peer| peer != failed_peer); // Remove the failed peer from fallback peers for this hash.
if let Some(next_peer) = peers.pop() {
// Schedule a re-request using next_peer.
}
}
}
}
/// Removes the specified hashes from inflight tracking.
fn remove_inflight_hashes(&mut self, hashes: &[TxHash]) {
for hash in hashes {
self.inflight_hash_to_fallback_peers.remove(hash);
}
}
/// Advances all inflight requests and returns the next event.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchEvent> {
if let Poll::Ready(Some(GetPooledTxResponse { peer_id, request, result })) =
@@ -1049,12 +1069,19 @@ impl TransactionFetcher {
return match result {
Ok(Ok(txs)) => {
// clear received hashes
for tx in txs.0.iter() {
self.inflight_hash_to_fallback_peers.remove(tx.hash());
}
let received_hashes: Vec<TxHash> =
txs.0.iter().map(|tx| tx.hash().clone()).collect();
self.remove_inflight_hashes(&received_hashes);
// TODO - check if we need to re-request any of the hashes missing from the
// check if we need to re-request any of the hashes missing from the
// response but present in the request
let missing_hashes: Vec<TxHash> = request
.iter()
.filter(|&&hash| !received_hashes.contains(&hash))
.cloned()
.collect();
self.re_request_hashes(missing_hashes, peer_id);
Poll::Ready(FetchEvent::TransactionsFetched {
peer_id,
@@ -1062,15 +1089,13 @@ impl TransactionFetcher {
})
}
Ok(Err(req_err)) => {
// TODO: also clear the hashes from the inflight hashes and schedule a
// re-request for alternatives
self.remove_inflight_hashes(&request);
self.re_request_hashes(request, peer_id);
Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err })
}
Err(_) => {
// TODO: we need to change the error type of `GetPooledTxResponse` so that it
// always returns the request object so we know what hashes were requested and
// can do cleanup
self.remove_inflight_hashes(&request);
self.re_request_hashes(request, peer_id);
// request channel closed/dropped
Poll::Ready(FetchEvent::FetchError {
peer_id,