use pending_read_frame and pending_write_frame separately

This commit is contained in:
João Oliveira
2024-03-13 14:24:56 +00:00
parent 297e9eba1f
commit 286728026d
2 changed files with 36 additions and 19 deletions

View File

@@ -31,6 +31,7 @@ use futures::stream::SelectAll;
use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::task::{Context, Waker};
use std::{fmt, sync::Arc, task::Poll};
@@ -285,7 +286,8 @@ struct Active<T> {
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
no_streams_waker: Option<Waker>,
pending_frame: Option<Frame<()>>,
pending_read_frame: Option<Frame<()>>,
pending_write_frame: Option<Frame<()>>,
new_outbound_stream_waker: Option<Waker>,
rtt: rtt::Rtt,
@@ -359,7 +361,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Mode::Client => 1,
Mode::Server => 2,
},
pending_frame: None,
pending_read_frame: None,
pending_write_frame: None,
new_outbound_stream_waker: None,
rtt: rtt::Rtt::new(),
accumulated_max_stream_windows: Default::default(),
@@ -368,7 +371,12 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
/// Gracefully close the connection to the remote.
fn close(self) -> Closing<T> {
Closing::new(self.stream_receivers, self.pending_frame, self.socket)
let pending_frames = self
.pending_read_frame
.into_iter()
.chain(self.pending_write_frame)
.collect::<VecDeque<Frame<()>>>();
Closing::new(self.stream_receivers, pending_frames, self.socket)
}
/// Cleanup all our resources.
@@ -391,7 +399,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
continue;
}
if let Some(frame) = self.pending_frame.take() {
// Privilege pending `Pong` and `GoAway` `Frame`s
// over `Frame`s from the receivers.
if let Some(frame) = self.pending_read_frame.take() {
self.socket.start_send_unpin(frame)?;
continue;
}
if let Some(frame) = self.pending_write_frame.take() {
self.socket.start_send_unpin(frame)?;
continue;
}
@@ -402,7 +417,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Poll::Pending => {}
}
if self.pending_frame.is_none() {
if self.pending_read_frame.is_none() {
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
match self.on_frame(frame?)? {
@@ -413,11 +428,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frame.replace(f.into());
self.pending_read_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frame.replace(f.into());
self.pending_read_frame.replace(f.into());
}
}
continue;
@@ -427,7 +442,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
Poll::Pending => {}
}
}
if self.pending_write_frame.is_none() {
match self.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
log::trace!(
@@ -436,19 +452,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
frame.header().stream_id(),
frame.header()
);
self.pending_frame.replace(frame.into());
self.pending_write_frame.replace(frame.into());
continue;
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
log::trace!("{}/{}: sending close", self.id, id);
self.pending_frame
self.pending_write_frame
.replace(Frame::close_stream(id, ack).into());
continue;
}
Poll::Ready(Some((id, None))) => {
if let Some(frame) = self.on_drop_stream(id) {
log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
self.pending_frame.replace(frame);
self.pending_write_frame.replace(frame);
};
continue;
}

View File

@@ -6,6 +6,7 @@ use crate::{frame, StreamId};
use futures::channel::mpsc;
use futures::stream::{Fuse, SelectAll};
use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -15,7 +16,7 @@ use std::task::{Context, Poll};
pub struct Closing<T> {
state: State,
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
pending_frame: Option<Frame<()>>,
pending_frames: VecDeque<Frame<()>>,
socket: Fuse<frame::Io<T>>,
}
@@ -25,13 +26,13 @@ where
{
pub(crate) fn new(
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
pending_frame: Option<Frame<()>>,
pending_frames: VecDeque<Frame<()>>,
socket: Fuse<frame::Io<T>>,
) -> Self {
Self {
state: State::FlushingPendingFrames,
stream_receivers,
pending_frame,
pending_frames,
socket,
}
}
@@ -58,16 +59,16 @@ where
State::DrainingStreamReceiver => {
match this.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
this.pending_frame.replace(frame.into());
this.pending_frames.push_back(frame.into());
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
this.pending_frame
.replace(Frame::close_stream(id, ack).into());
this.pending_frames
.push_back(Frame::close_stream(id, ack).into());
}
Poll::Ready(Some((_, None))) => {}
Poll::Pending | Poll::Ready(None) => {
// No more frames from streams, append `Term` frame and flush them all.
this.pending_frame.replace(Frame::term().into());
this.pending_frames.push_back(Frame::term().into());
this.state = State::ClosingSocket;
continue;
}
@@ -76,7 +77,7 @@ where
State::FlushingPendingFrames => {
ready!(this.socket.poll_ready_unpin(cx))?;
match this.pending_frame.take() {
match this.pending_frames.pop_front() {
Some(frame) => this.socket.start_send_unpin(frame)?,
None => this.state = State::ClosingStreamReceiver,
}