fix: add receive budget (#3361)

This commit is contained in:
Matthias Seitz
2023-06-25 15:12:39 +02:00
committed by GitHub
parent 8dd7a78356
commit 05eada7aa2

View File

@@ -455,7 +455,15 @@ impl Future for ActiveSession {
return this.poll_disconnect(cx)
}
loop {
// The receive loop can be CPU intensive since it involves message decoding which could take
// up a lot of resources and increase latencies for other sessions if not yielded manually.
// If the budget is exhausted we manually yield back control to the (coop) scheduler. This
// manual yield point should prevent situations where polling appears to be frozen. See also <https://tokio.rs/blog/2020-04-preemption>
// And tokio's docs on cooperative scheduling <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
let mut budget = 4;
// The main poll loop that drives the session
'main: loop {
let mut progress = false;
// we prioritize incoming commands sent from the session manager
@@ -529,6 +537,14 @@ impl Future for ActiveSession {
// read incoming messages from the wire
'receive: loop {
// ensure we still have enough budget for another iteration
budget -= 1;
if budget == 0 {
// make sure we're woken up again
cx.waker().wake_by_ref();
break 'main
}
// try to resend the pending message that we could not send because the channel was
// full.
if let Some(msg) = this.pending_message_to_session.take() {
@@ -592,21 +608,23 @@ impl Future for ActiveSession {
}
if !progress {
if this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
let _ = this.internal_request_timeout_interval.poll_tick(cx);
// check for timed out requests
if this.check_timed_out_requests(Instant::now()) {
let _ = this.to_session_manager.clone().try_send(
ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id },
);
}
}
this.shrink_to_fit();
return Poll::Pending
break 'main
}
}
if this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
let _ = this.internal_request_timeout_interval.poll_tick(cx);
// check for timed out requests
if this.check_timed_out_requests(Instant::now()) {
let _ = this.to_session_manager.clone().try_send(
ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id },
);
}
}
this.shrink_to_fit();
Poll::Pending
}
}