From b0149f0b9f71b4112d7b11c19ba6f02fd00e583c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 9 Dec 2022 12:39:08 +0100 Subject: [PATCH] fix(net): prevent bad loop if no peers available (#359) * fix(net): prevent bad loop if no peers available * test: add poll fetcher test --- crates/net/network/src/fetch/mod.rs | 53 ++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 594a3c6707..0f6de5763e 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -90,26 +90,33 @@ impl StateFetcher { } /// Returns the next action to return - fn poll_action(&mut self) -> Option { + fn poll_action(&mut self) -> PollAction { + // we only check and not pop here since we don't know yet whether a peer is available. if self.queued_requests.is_empty() { - return None + return PollAction::NoRequests } - let peer_id = *self.next_peer()?.0; + let peer_id = if let Some(peer) = self.next_peer() { + *peer.0 + } else { + return PollAction::NoPeersAvailable + }; let request = self.queued_requests.pop_front().expect("not empty; qed"); let request = self.prepare_block_request(peer_id, request); - Some(FetchAction::BlockRequest { peer_id, request }) + PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) } /// Advance the state the syncer pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { // drain buffered actions first loop { - if let Some(action) = self.poll_action() { - return Poll::Ready(action) - } + let no_peers_available = match self.poll_action() { + PollAction::Ready(action) => return Poll::Ready(action), + PollAction::NoRequests => false, + PollAction::NoPeersAvailable => true, + }; if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) { return Poll::Ready(FetchAction::StatusUpdate(status)) @@ -128,7 +135,7 @@ impl StateFetcher { } } - if self.queued_requests.is_empty() { + if self.queued_requests.is_empty() || no_peers_available { return Poll::Pending } } @@ -244,6 +251,13 @@ impl Default for StateFetcher { } } +/// The outcome of [`StateFetcher::poll_action`] +enum PollAction { + Ready(FetchAction), + NoRequests, + NoPeersAvailable, +} + /// Represents a connected peer struct Peer { /// The state this peer currently resides in. @@ -352,3 +366,26 @@ pub(crate) enum BlockResponseOutcome { /// How to handle a bad response and the reputation change to apply. BadResponse(PeerId, ReputationChangeKind), } + +#[cfg(test)] +mod tests { + use super::*; + use std::future::poll_fn; + + #[tokio::test(flavor = "multi_thread")] + async fn test_poll_fetcher() { + let mut fetcher = StateFetcher::default(); + + poll_fn(move |cx| { + assert!(fetcher.poll(cx).is_pending()); + let (tx, _rx) = oneshot::channel(); + fetcher + .queued_requests + .push_back(DownloadRequest::GetBlockBodies { request: vec![], response: tx }); + assert!(fetcher.poll(cx).is_pending()); + + Poll::Ready(()) + }) + .await; + } +}