set pending_frame on main poll,

- so that it's clear we are replacing a None `pending_frame`

change the order of the closing state to allow flushing the pending frames first,
and not override a pending frame with a closing one from the receiver streams
This commit is contained in:
João Oliveira
2024-03-04 12:18:10 +00:00
parent 61003c53ce
commit 297e9eba1f
2 changed files with 35 additions and 47 deletions

View File

@@ -405,8 +405,20 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if self.pending_frame.is_none() {
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
if let Some(stream) = self.on_frame(frame?)? {
return Poll::Ready(Ok(stream));
match self.on_frame(frame?)? {
Action::None => {}
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
return Poll::Ready(Ok(stream));
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frame.replace(f.into());
}
}
continue;
}
@@ -418,15 +430,26 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
match self.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
self.on_send_frame(frame);
log::trace!(
"{}/{}: sending: {}",
self.id,
frame.header().stream_id(),
frame.header()
);
self.pending_frame.replace(frame.into());
continue;
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
self.on_close_stream(id, ack);
log::trace!("{}/{}: sending close", self.id, id);
self.pending_frame
.replace(Frame::close_stream(id, ack).into());
continue;
}
Poll::Ready(Some((id, None))) => {
self.on_drop_stream(id);
if let Some(frame) = self.on_drop_stream(id) {
log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
self.pending_frame.replace(frame);
};
continue;
}
Poll::Ready(None) => {
@@ -464,23 +487,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Poll::Ready(Ok(stream))
}
fn on_send_frame(&mut self, frame: Frame<Either<Data, WindowUpdate>>) {
log::trace!(
"{}/{}: sending: {}",
self.id,
frame.header().stream_id(),
frame.header()
);
self.pending_frame.replace(frame.into());
}
fn on_close_stream(&mut self, id: StreamId, ack: bool) {
log::trace!("{}/{}: sending close", self.id, id);
self.pending_frame
.replace(Frame::close_stream(id, ack).into());
}
fn on_drop_stream(&mut self, stream_id: StreamId) {
fn on_drop_stream(&mut self, stream_id: StreamId) -> Option<Frame<()>> {
let s = self.streams.remove(&stream_id).expect("stream not found");
log::trace!("{}: removing dropped stream {}", self.id, stream_id);
@@ -526,10 +533,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
frame
};
if let Some(f) = frame {
log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header());
self.pending_frame.replace(f.into());
}
frame.map(Into::into)
}
/// Process the result of reading from the socket.
@@ -538,7 +542,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
/// and return a corresponding error, which terminates the connection.
/// Otherwise we process the frame and potentially return a new `Stream`
/// if one was opened by the remote.
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
fn on_frame(&mut self, frame: Frame<()>) -> Result<Action> {
log::trace!("{}: received: {}", self.id, frame.header());
if frame.header().flags().contains(header::ACK)
@@ -561,23 +565,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Tag::Ping => self.on_ping(&frame.into_ping()),
Tag::GoAway => return Err(ConnectionError::Closed),
};
match action {
Action::None => {}
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
return Ok(Some(stream));
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frame.replace(f.into());
}
}
Ok(None)
Ok(action)
}
fn on_data(&mut self, frame: Frame<Data>) -> Action {

View File

@@ -29,7 +29,7 @@ where
socket: Fuse<frame::Io<T>>,
) -> Self {
Self {
state: State::ClosingStreamReceiver,
state: State::FlushingPendingFrames,
stream_receivers,
pending_frame,
socket,
@@ -68,7 +68,7 @@ where
Poll::Pending | Poll::Ready(None) => {
// No more frames from streams, append `Term` frame and flush them all.
this.pending_frame.replace(Frame::term().into());
this.state = State::FlushingPendingFrames;
this.state = State::ClosingSocket;
continue;
}
}
@@ -78,7 +78,7 @@ where
match this.pending_frame.take() {
Some(frame) => this.socket.start_send_unpin(frame)?,
None => this.state = State::ClosingSocket,
None => this.state = State::ClosingStreamReceiver,
}
}
State::ClosingSocket => {