diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 21e305467f..180096241a 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -455,7 +455,15 @@ impl Future for ActiveSession { return this.poll_disconnect(cx) } - loop { + // The receive loop can be CPU intensive since it involves message decoding which could take + // up a lot of resources and increase latencies for other sessions if not yielded manually. + // If the budget is exhausted we manually yield back control to the (coop) scheduler. This + // manual yield point should prevent situations where polling appears to be frozen. See also + // And tokio's docs on cooperative scheduling + let mut budget = 4; + + // The main poll loop that drives the session + 'main: loop { let mut progress = false; // we prioritize incoming commands sent from the session manager @@ -529,6 +537,14 @@ impl Future for ActiveSession { // read incoming messages from the wire 'receive: loop { + // ensure we still have enough budget for another iteration + budget -= 1; + if budget == 0 { + // make sure we're woken up again + cx.waker().wake_by_ref(); + break 'main + } + // try to resend the pending message that we could not send because the channel was // full. if let Some(msg) = this.pending_message_to_session.take() { @@ -592,21 +608,23 @@ impl Future for ActiveSession { } if !progress { - if this.internal_request_timeout_interval.poll_tick(cx).is_ready() { - let _ = this.internal_request_timeout_interval.poll_tick(cx); - // check for timed out requests - if this.check_timed_out_requests(Instant::now()) { - let _ = this.to_session_manager.clone().try_send( - ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id }, - ); - } - } - - this.shrink_to_fit(); - - return Poll::Pending + break 'main } } + + if this.internal_request_timeout_interval.poll_tick(cx).is_ready() { + let _ = this.internal_request_timeout_interval.poll_tick(cx); + // check for timed out requests + if this.check_timed_out_requests(Instant::now()) { + let _ = this.to_session_manager.clone().try_send( + ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id }, + ); + } + } + + this.shrink_to_fit(); + + Poll::Pending } }