feat: switch pending_frames

VecDequeue for an Option to bound it
This commit is contained in:
João Oliveira
2024-02-28 22:59:14 +00:00
parent cf6456f55f
commit 61003c53ce
3 changed files with 48 additions and 48 deletions

View File

@@ -57,7 +57,7 @@ fn concurrent_streams() {
const PAYLOAD_SIZE: usize = 128 * 1024;
let data = Msg(vec![0x42; PAYLOAD_SIZE]);
let n_streams = 1000;
let n_streams = 512;
let mut cfg = Config::default();
cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test.

View File

@@ -31,7 +31,6 @@ use futures::stream::SelectAll;
use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::task::{Context, Waker};
use std::{fmt, sync::Arc, task::Poll};
@@ -286,7 +285,7 @@ struct Active<T> {
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
no_streams_waker: Option<Waker>,
pending_frames: VecDeque<Frame<()>>,
pending_frame: Option<Frame<()>>,
new_outbound_stream_waker: Option<Waker>,
rtt: rtt::Rtt,
@@ -360,7 +359,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Mode::Client => 1,
Mode::Server => 2,
},
pending_frames: VecDeque::default(),
pending_frame: None,
new_outbound_stream_waker: None,
rtt: rtt::Rtt::new(),
accumulated_max_stream_windows: Default::default(),
@@ -369,7 +368,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
/// Gracefully close the connection to the remote.
fn close(self) -> Closing<T> {
Closing::new(self.stream_receivers, self.pending_frames, self.socket)
Closing::new(self.stream_receivers, self.pending_frame, self.socket)
}
/// Cleanup all our resources.
@@ -392,7 +391,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
continue;
}
if let Some(frame) = self.pending_frames.pop_front() {
if let Some(frame) = self.pending_frame.take() {
self.socket.start_send_unpin(frame)?;
continue;
}
@@ -403,36 +402,38 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
Poll::Pending => {}
}
match self.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
self.on_send_frame(frame);
continue;
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
self.on_close_stream(id, ack);
continue;
}
Poll::Ready(Some((id, None))) => {
self.on_drop_stream(id);
continue;
}
Poll::Ready(None) => {
self.no_streams_waker = Some(cx.waker().clone());
}
Poll::Pending => {}
}
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
if let Some(stream) = self.on_frame(frame?)? {
return Poll::Ready(Ok(stream));
if self.pending_frame.is_none() {
match self.socket.poll_next_unpin(cx) {
Poll::Ready(Some(frame)) => {
if let Some(stream) = self.on_frame(frame?)? {
return Poll::Ready(Ok(stream));
}
continue;
}
continue;
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed));
}
Poll::Pending => {}
}
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed));
match self.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
self.on_send_frame(frame);
continue;
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
self.on_close_stream(id, ack);
continue;
}
Poll::Ready(Some((id, None))) => {
self.on_drop_stream(id);
continue;
}
Poll::Ready(None) => {
self.no_streams_waker = Some(cx.waker().clone());
}
Poll::Pending => {}
}
Poll::Pending => {}
}
// If we make it this far, at least one of the above must have registered a waker.
@@ -470,13 +471,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
frame.header().stream_id(),
frame.header()
);
self.pending_frames.push_back(frame.into());
self.pending_frame.replace(frame.into());
}
fn on_close_stream(&mut self, id: StreamId, ack: bool) {
log::trace!("{}/{}: sending close", self.id, id);
self.pending_frames
.push_back(Frame::close_stream(id, ack).into());
self.pending_frame
.replace(Frame::close_stream(id, ack).into());
}
fn on_drop_stream(&mut self, stream_id: StreamId) {
@@ -527,7 +528,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
};
if let Some(f) = frame {
log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header());
self.pending_frames.push_back(f.into());
self.pending_frame.replace(f.into());
}
}
@@ -568,11 +569,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
self.pending_frame.replace(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frames.push_back(f.into());
self.pending_frame.replace(f.into());
}
}

View File

@@ -6,7 +6,6 @@ use crate::{frame, StreamId};
use futures::channel::mpsc;
use futures::stream::{Fuse, SelectAll};
use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -16,7 +15,7 @@ use std::task::{Context, Poll};
pub struct Closing<T> {
state: State,
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
pending_frames: VecDeque<Frame<()>>,
pending_frame: Option<Frame<()>>,
socket: Fuse<frame::Io<T>>,
}
@@ -26,13 +25,13 @@ where
{
pub(crate) fn new(
stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
pending_frames: VecDeque<Frame<()>>,
pending_frame: Option<Frame<()>>,
socket: Fuse<frame::Io<T>>,
) -> Self {
Self {
state: State::ClosingStreamReceiver,
stream_receivers,
pending_frames,
pending_frame,
socket,
}
}
@@ -59,16 +58,16 @@ where
State::DrainingStreamReceiver => {
match this.stream_receivers.poll_next_unpin(cx) {
Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
this.pending_frames.push_back(frame.into())
this.pending_frame.replace(frame.into());
}
Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
this.pending_frames
.push_back(Frame::close_stream(id, ack).into());
this.pending_frame
.replace(Frame::close_stream(id, ack).into());
}
Poll::Ready(Some((_, None))) => {}
Poll::Pending | Poll::Ready(None) => {
// No more frames from streams, append `Term` frame and flush them all.
this.pending_frames.push_back(Frame::term().into());
this.pending_frame.replace(Frame::term().into());
this.state = State::FlushingPendingFrames;
continue;
}
@@ -77,7 +76,7 @@ where
State::FlushingPendingFrames => {
ready!(this.socket.poll_ready_unpin(cx))?;
match this.pending_frames.pop_front() {
match this.pending_frame.take() {
Some(frame) => this.socket.start_send_unpin(frame)?,
None => this.state = State::ClosingSocket,
}