mirror of
https://github.com/tlsnotary/tlsn-utils.git
synced 2026-01-08 20:28:06 -05:00
refactor(futures-plex): ringbuffer and slice apis (#83)
* feat(futures-plex): replace BytesMut with fixed-size buffer and add zero-copy APIs - Remove bytes crate dependency - Use Box<[u8]> with ptr/len tracking instead of BytesMut - Add AsyncBufRead implementation for SimplexStream - Add poll_get, poll_mut, get, get_mut for direct buffer access - Add advance, advance_mut for cursor management - Add poll_read_to, poll_write_from for zero-copy transfers - Add ReadGuard/WriteGuard and poll_lock methods for DuplexStream - Add unit tests for new functionality Co-Authored-By: sinu <65924192+sinui0@users.noreply.github.com> * make it a ring buffer --------- Co-authored-by: sinu <65924192+sinui0@users.noreply.github.com>
This commit is contained in:
@@ -5,5 +5,4 @@ edition = "2024"
|
||||
description = "Port of tokio's `SimplexStream` and `DuplexStream` for the `futures` ecosystem."
|
||||
|
||||
[dependencies]
|
||||
bytes = { version = "1" }
|
||||
futures = { version = "0.3", default-features = false, features = ["unstable", "bilock", "executor"] }
|
||||
|
||||
@@ -4,21 +4,57 @@
|
||||
use futures::{
|
||||
AsyncRead, AsyncWrite,
|
||||
io::{IoSlice, IoSliceMut},
|
||||
lock::BiLock,
|
||||
lock::{BiLock, BiLockGuard},
|
||||
};
|
||||
|
||||
use std::{
|
||||
fmt,
|
||||
io::{self},
|
||||
ops::{Deref, DerefMut},
|
||||
pin::Pin,
|
||||
task::{Context, Poll, ready},
|
||||
};
|
||||
|
||||
use crate::SimplexStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReadGuard<'a, T>(pub(crate) BiLockGuard<'a, T>);
|
||||
|
||||
impl<'a, T> Deref for ReadGuard<'a, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Unpin> DerefMut for ReadGuard<'a, T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_pin_mut().get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
/// The readable half of an object.
|
||||
#[derive(Debug)]
|
||||
pub struct ReadHalf<T> {
|
||||
handle: BiLock<T>,
|
||||
pub(crate) handle: BiLock<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WriteGuard<'a, T>(pub(crate) BiLockGuard<'a, T>);
|
||||
|
||||
impl<'a, T> Deref for WriteGuard<'a, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Unpin> DerefMut for WriteGuard<'a, T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_pin_mut().get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
/// The writable half of an object.
|
||||
@@ -41,6 +77,12 @@ pub(crate) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<
|
||||
}
|
||||
|
||||
impl<T> ReadHalf<T> {
|
||||
/// Attempt to acquire a lock on the read side, returning `Poll::Pending` if
|
||||
/// it can't be acquired.
|
||||
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<ReadGuard<'_, T>> {
|
||||
self.handle.poll_lock(cx).map(ReadGuard)
|
||||
}
|
||||
|
||||
/// Checks if this `ReadHalf` and some `WriteHalf` were split from the same
|
||||
/// stream.
|
||||
pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool {
|
||||
@@ -60,6 +102,12 @@ impl<T: Unpin> ReadHalf<T> {
|
||||
}
|
||||
|
||||
impl<T> WriteHalf<T> {
|
||||
/// Attempt to acquire a lock on the write side, returning `Poll::Pending`
|
||||
/// if it can't be acquired.
|
||||
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<WriteGuard<'_, T>> {
|
||||
self.handle.poll_lock(cx).map(WriteGuard)
|
||||
}
|
||||
|
||||
/// Checks if this `WriteHalf` and some `ReadHalf` were split from the same
|
||||
/// stream.
|
||||
pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
|
||||
|
||||
@@ -2,15 +2,15 @@
|
||||
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
pin::Pin,
|
||||
task::{self, Poll, Waker},
|
||||
pin::{Pin, pin},
|
||||
task::{self, Poll, Waker, ready},
|
||||
};
|
||||
|
||||
use bytes::{Buf, BytesMut};
|
||||
use futures::{AsyncRead, AsyncWrite, future::poll_fn};
|
||||
use futures::{AsyncBufRead, AsyncRead, AsyncWrite, future::poll_fn, io};
|
||||
|
||||
mod half;
|
||||
pub use half::{ReadHalf, WriteHalf};
|
||||
|
||||
pub use half::{ReadGuard, ReadHalf, WriteGuard, WriteHalf};
|
||||
|
||||
/// A bidirectional pipe to read and write bytes in memory.
|
||||
///
|
||||
@@ -53,57 +53,38 @@ pub struct DuplexStream {
|
||||
}
|
||||
|
||||
impl DuplexStream {
|
||||
/// Returns the number of bytes that can be read.
|
||||
pub fn remaining(&self) -> usize {
|
||||
self.read.remaining()
|
||||
/// Read data from this duplex into the provided writer.
|
||||
pub fn poll_read_to<W: AsyncWrite + Unpin>(
|
||||
&self,
|
||||
cx: &mut task::Context<'_>,
|
||||
wr: W,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.poll_lock_read(cx)).poll_read_to(cx, wr)
|
||||
}
|
||||
|
||||
/// Returns the number of bytes that can be written.
|
||||
pub fn remaining_mut(&self) -> usize {
|
||||
self.write.remaining_mut()
|
||||
/// Write data from the provided reader into this duplex.
|
||||
pub fn poll_write_from<R: AsyncRead + Unpin>(
|
||||
&self,
|
||||
cx: &mut task::Context<'_>,
|
||||
rd: R,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.poll_lock_write(cx)).poll_write_from(cx, rd)
|
||||
}
|
||||
}
|
||||
|
||||
/// A unidirectional pipe to read and write bytes in memory.
|
||||
///
|
||||
/// It can be constructed by [`simplex`] function which will create a pair of
|
||||
/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will
|
||||
/// create a handle for both reading and writing.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # async fn ex() -> std::io::Result<()> {
|
||||
/// # use futures::{AsyncReadExt, AsyncWriteExt};
|
||||
/// let (mut receiver, mut sender) = futures_plex::simplex(64);
|
||||
///
|
||||
/// sender.write_all(b"ping").await?;
|
||||
///
|
||||
/// let mut buf = [0u8; 4];
|
||||
/// receiver.read_exact(&mut buf).await?;
|
||||
/// assert_eq!(&buf, b"ping");
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct SimplexStream {
|
||||
/// The buffer storing the bytes written, also read from.
|
||||
///
|
||||
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
|
||||
/// functionality already. Additionally, it can try to copy data in the
|
||||
/// same buffer if there read index has advanced far enough.
|
||||
buffer: BytesMut,
|
||||
/// Determines if the write side has been closed.
|
||||
is_closed: bool,
|
||||
/// The maximum amount of bytes that can be written before returning
|
||||
/// `Poll::Pending`.
|
||||
max_buf_size: usize,
|
||||
/// If the `read` side has been polled and is pending, this is the waker
|
||||
/// for that parked task.
|
||||
read_waker: Option<Waker>,
|
||||
/// If the `write` side has filled the `max_buf_size` and returned
|
||||
/// `Poll::Pending`, this is the waker for that parked task.
|
||||
write_waker: Option<Waker>,
|
||||
/// Attempt to acquire a lock on the read side, returning `Poll::Pending`
|
||||
/// if it can't be acquired.
|
||||
pub fn poll_lock_read(&self, cx: &mut task::Context<'_>) -> Poll<ReadGuard<'_, SimplexStream>> {
|
||||
self.read.poll_lock(cx)
|
||||
}
|
||||
|
||||
/// Attempt to acquire a lock on the write side, returning `Poll::Pending`
|
||||
/// if it can't be acquired.
|
||||
pub fn poll_lock_write(
|
||||
&self,
|
||||
cx: &mut task::Context<'_>,
|
||||
) -> Poll<WriteGuard<'_, SimplexStream>> {
|
||||
self.write.poll_lock(cx)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl DuplexStream =====
|
||||
@@ -140,7 +121,7 @@ impl AsyncRead for DuplexStream {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.read).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
@@ -151,7 +132,7 @@ impl AsyncWrite for DuplexStream {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.write).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
@@ -164,40 +145,73 @@ impl AsyncWrite for DuplexStream {
|
||||
}
|
||||
|
||||
#[allow(unused_mut)]
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.write).poll_flush(cx)
|
||||
}
|
||||
|
||||
#[allow(unused_mut)]
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.write).poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for DuplexStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf));
|
||||
futures::executor::block_on(fut)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for DuplexStream {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf));
|
||||
futures::executor::block_on(fut)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A unidirectional pipe to read and write bytes in memory.
|
||||
///
|
||||
/// It can be constructed by [`simplex`] function which will create a pair of
|
||||
/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will
|
||||
/// create a handle for both reading and writing.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # async fn ex() -> std::io::Result<()> {
|
||||
/// # use futures::{AsyncReadExt, AsyncWriteExt};
|
||||
/// let (mut receiver, mut sender) = futures_plex::simplex(64);
|
||||
///
|
||||
/// sender.write_all(b"ping").await?;
|
||||
///
|
||||
/// let mut buf = [0u8; 4];
|
||||
/// receiver.read_exact(&mut buf).await?;
|
||||
/// assert_eq!(&buf, b"ping");
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct SimplexStream {
|
||||
// The buffer storing the bytes written, also read from.
|
||||
buffer: Box<[u8]>,
|
||||
// Pointer to the next byte in the buffer.
|
||||
ptr: usize,
|
||||
// Number of bytes in the buffer.
|
||||
len: usize,
|
||||
// Determines if the write side has been closed.
|
||||
is_closed: bool,
|
||||
// If the `read` side has been polled and is pending, this is the waker
|
||||
// for that parked task.
|
||||
read_waker: Option<Waker>,
|
||||
// If the `write` side has filled the `max_buf_size` and returned
|
||||
// `Poll::Pending`, this is the waker for that parked task.
|
||||
write_waker: Option<Waker>,
|
||||
}
|
||||
|
||||
// ===== impl SimplexStream =====
|
||||
|
||||
/// Creates unidirectional buffer that acts like in memory pipe.
|
||||
@@ -234,13 +248,14 @@ impl SimplexStream {
|
||||
/// split version with separate reader and writer you can use
|
||||
/// [`simplex`] function.
|
||||
///
|
||||
/// The `max_buf_size` argument is the maximum amount of bytes that can be
|
||||
/// The `buf_size` argument is the maximum amount of bytes that can be
|
||||
/// written to a buffer before the it returns `Poll::Pending`.
|
||||
pub fn new_unsplit(max_buf_size: usize) -> SimplexStream {
|
||||
pub fn new_unsplit(buf_size: usize) -> SimplexStream {
|
||||
SimplexStream {
|
||||
buffer: BytesMut::new(),
|
||||
buffer: vec![0; buf_size].into_boxed_slice(),
|
||||
ptr: 0,
|
||||
len: 0,
|
||||
is_closed: false,
|
||||
max_buf_size,
|
||||
read_waker: None,
|
||||
write_waker: None,
|
||||
}
|
||||
@@ -248,12 +263,158 @@ impl SimplexStream {
|
||||
|
||||
/// Returns the number of bytes that can be read from this buffer.
|
||||
pub fn remaining(&self) -> usize {
|
||||
self.buffer.remaining()
|
||||
self.len
|
||||
}
|
||||
|
||||
/// Returns the number of bytes that can be written into this buffer.
|
||||
pub fn remaining_mut(&self) -> usize {
|
||||
self.max_buf_size - self.buffer.remaining()
|
||||
self.buffer.len() - self.len
|
||||
}
|
||||
|
||||
/// Returns a reference to the first contiguous chunk of data in the buffer.
|
||||
///
|
||||
/// For a ring buffer, data may wrap around. This returns only the first
|
||||
/// contiguous chunk. Call again after `advance()` to get remaining data.
|
||||
pub fn get(&self) -> &[u8] {
|
||||
if self.len == 0 {
|
||||
return &[];
|
||||
}
|
||||
let cap = self.buffer.len();
|
||||
let end = self.ptr + self.len;
|
||||
if end <= cap {
|
||||
&self.buffer[self.ptr..end]
|
||||
} else {
|
||||
// Data wraps, return first contiguous chunk
|
||||
&self.buffer[self.ptr..cap]
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the data in the buffer when data is available.
|
||||
pub fn poll_get(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
if self.remaining() > 0 {
|
||||
Poll::Ready(Ok(self.get()))
|
||||
} else if self.is_closed {
|
||||
Poll::Ready(Ok(&[]))
|
||||
} else {
|
||||
self.read_waker = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a mutable slice to the first contiguous chunk of available
|
||||
/// capacity in the buffer.
|
||||
///
|
||||
/// For a ring buffer, available space may wrap around. This returns only
|
||||
/// the first contiguous chunk. Call again after `advance_mut()` to get
|
||||
/// remaining space.
|
||||
pub fn get_mut(&mut self) -> &mut [u8] {
|
||||
let cap = self.buffer.len();
|
||||
let avail = cap - self.len;
|
||||
if avail == 0 {
|
||||
return &mut [];
|
||||
}
|
||||
let tail = (self.ptr + self.len) % cap;
|
||||
if tail < self.ptr {
|
||||
// Tail wrapped around, contiguous space is tail..ptr
|
||||
&mut self.buffer[tail..self.ptr]
|
||||
} else {
|
||||
// Tail is at or after ptr, contiguous space is tail..cap
|
||||
&mut self.buffer[tail..cap]
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the available space in the buffer when
|
||||
/// there is space available.
|
||||
pub fn poll_mut(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<&mut [u8]>> {
|
||||
if self.is_closed {
|
||||
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
|
||||
}
|
||||
let avail = self.remaining_mut();
|
||||
if avail == 0 {
|
||||
self.write_waker = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(Ok(self.get_mut()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Advances the read cursor.
|
||||
///
|
||||
/// This method should be called after reading from the simplex using
|
||||
/// [`get`](Self::get) or [`poll_get`](Self::poll_get) to advance the read
|
||||
/// cursor.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the provided amount exceeds the data in the buffer.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `amt` - The number of bytes read.
|
||||
pub fn advance(&mut self, amt: usize) {
|
||||
if amt == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
assert!(amt <= self.len, "out of bounds");
|
||||
self.ptr = (self.ptr + amt) % self.buffer.len();
|
||||
self.len -= amt;
|
||||
|
||||
// Wake the writer side now that space is available.
|
||||
if let Some(waker) = self.write_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Advances the write cursor.
|
||||
///
|
||||
/// This method should be called after writing to the simplex using
|
||||
/// [`get_mut`](Self::get_mut) or [`poll_mut`](Self::poll_mut) to advance
|
||||
/// the write cursor.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the provided amount exceeds the spare capacity.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `amt` - The number of bytes written.
|
||||
pub fn advance_mut(&mut self, amt: usize) {
|
||||
if amt == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
assert!(self.len + amt <= self.buffer.len(), "out of bounds");
|
||||
self.len += amt;
|
||||
|
||||
// Wake the read side now that data is available.
|
||||
if let Some(waker) = self.read_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Read data from this simplex into the provided writer.
|
||||
pub fn poll_read_to<W: AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
cx: &mut task::Context<'_>,
|
||||
wr: W,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let buf = ready!(self.poll_get(cx))?;
|
||||
let len = ready!(pin!(wr).poll_write(cx, buf))?;
|
||||
self.advance(len);
|
||||
Poll::Ready(Ok(len))
|
||||
}
|
||||
|
||||
/// Write data from the provided reader into this simplex.
|
||||
pub fn poll_write_from<R: AsyncRead + Unpin>(
|
||||
&mut self,
|
||||
cx: &mut task::Context<'_>,
|
||||
rd: R,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let buf = ready!(self.poll_mut(cx))?;
|
||||
let len = ready!(pin!(rd).poll_read(cx, buf))?;
|
||||
self.advance_mut(len);
|
||||
Poll::Ready(Ok(len))
|
||||
}
|
||||
|
||||
fn close_write(&mut self) {
|
||||
@@ -268,46 +429,35 @@ impl SimplexStream {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
if self.buffer.has_remaining() {
|
||||
let len = self.buffer.remaining().min(buf.len());
|
||||
buf[..len].copy_from_slice(&self.buffer[..len]);
|
||||
self.buffer.advance(len);
|
||||
if len > 0 {
|
||||
// The passed `buf` might have been empty, don't wake up if
|
||||
// no bytes have been moved.
|
||||
if let Some(waker) = self.write_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(len))
|
||||
} else if self.is_closed {
|
||||
Poll::Ready(Ok(0))
|
||||
} else {
|
||||
self.read_waker = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let src = ready!(self.poll_get(cx))?;
|
||||
|
||||
if src.is_empty() {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
let len = buf.len().min(src.len());
|
||||
buf[..len].copy_from_slice(&src[..len]);
|
||||
self.advance(len);
|
||||
|
||||
Poll::Ready(Ok(len))
|
||||
}
|
||||
|
||||
fn poll_write_internal(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
if self.is_closed {
|
||||
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
|
||||
}
|
||||
let avail = self.max_buf_size - self.buffer.len();
|
||||
if avail == 0 {
|
||||
self.write_waker = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
let dest = ready!(self.poll_mut(cx))?;
|
||||
debug_assert!(
|
||||
!dest.is_empty(),
|
||||
"returned ready when no space is available"
|
||||
);
|
||||
|
||||
let len = buf.len().min(dest.len());
|
||||
dest[..len].copy_from_slice(&buf[..len]);
|
||||
self.advance_mut(len);
|
||||
|
||||
let len = buf.len().min(avail);
|
||||
self.buffer.extend_from_slice(&buf[..len]);
|
||||
if let Some(waker) = self.read_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
Poll::Ready(Ok(len))
|
||||
}
|
||||
|
||||
@@ -316,30 +466,29 @@ impl SimplexStream {
|
||||
cx: &mut task::Context<'_>,
|
||||
bufs: &[std::io::IoSlice<'_>],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
if self.is_closed {
|
||||
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
|
||||
}
|
||||
let avail = self.max_buf_size - self.buffer.len();
|
||||
if avail == 0 {
|
||||
self.write_waker = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
let mut dest = ready!(self.poll_mut(cx))?;
|
||||
debug_assert!(
|
||||
!dest.is_empty(),
|
||||
"returned ready when no space is available"
|
||||
);
|
||||
|
||||
let mut rem = avail;
|
||||
let avail = dest.len();
|
||||
let mut amt = 0;
|
||||
for buf in bufs {
|
||||
if rem == 0 {
|
||||
if amt >= avail {
|
||||
break;
|
||||
}
|
||||
|
||||
let len = buf.len().min(rem);
|
||||
self.buffer.extend_from_slice(&buf[..len]);
|
||||
rem -= len;
|
||||
let len = buf.len().min(dest.len());
|
||||
dest[..len].copy_from_slice(&buf[..len]);
|
||||
|
||||
dest = &mut dest[len..];
|
||||
amt += len;
|
||||
}
|
||||
|
||||
if let Some(waker) = self.read_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
Poll::Ready(Ok(avail - rem))
|
||||
self.advance_mut(amt);
|
||||
|
||||
Poll::Ready(Ok(amt))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,17 +497,27 @@ impl AsyncRead for SimplexStream {
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_read_internal(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncBufRead for SimplexStream {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
Pin::get_mut(self).poll_get(cx)
|
||||
}
|
||||
|
||||
fn consume(mut self: Pin<&mut Self>, amt: usize) {
|
||||
self.advance(amt);
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for SimplexStream {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_write_internal(cx, buf)
|
||||
}
|
||||
|
||||
@@ -366,37 +525,213 @@ impl AsyncWrite for SimplexStream {
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
bufs: &[std::io::IoSlice<'_>],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_write_vectored_internal(cx, bufs)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut task::Context<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
fn poll_close(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.close_write();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for SimplexStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf));
|
||||
futures::executor::block_on(fut)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for SimplexStream {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf));
|
||||
futures::executor::block_on(fut)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_simplex_no_capacity() {
|
||||
let mut s = SimplexStream::new_unsplit(0);
|
||||
assert!(s.get().is_empty());
|
||||
assert!(s.get_mut().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplex() {
|
||||
let mut s = SimplexStream::new_unsplit(8);
|
||||
assert!(s.get().is_empty());
|
||||
assert_eq!(s.get_mut().len(), 8);
|
||||
assert_eq!(s.get_mut(), &vec![0; 8]);
|
||||
|
||||
s.get_mut().copy_from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
|
||||
// write 2 bytes
|
||||
s.advance_mut(2);
|
||||
assert_eq!(s.remaining(), 2);
|
||||
assert_eq!(s.remaining_mut(), 6);
|
||||
assert_eq!(s.get(), &[0, 1]);
|
||||
assert_eq!(s.get_mut(), &[2, 3, 4, 5, 6, 7]);
|
||||
|
||||
// read 1 byte
|
||||
s.advance(1);
|
||||
assert_eq!(s.remaining(), 1);
|
||||
// space is reclaimed immediately with ring buffer
|
||||
assert_eq!(s.remaining_mut(), 7);
|
||||
|
||||
// write the rest of the bytes
|
||||
s.advance_mut(6);
|
||||
assert_eq!(s.get(), &[1, 2, 3, 4, 5, 6, 7]);
|
||||
// read everything out
|
||||
s.advance(7);
|
||||
|
||||
assert!(s.get().is_empty());
|
||||
assert_eq!(s.get_mut().len(), 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_to() {
|
||||
let mut s0 = SimplexStream::new_unsplit(16);
|
||||
let mut s1 = SimplexStream::new_unsplit(8);
|
||||
|
||||
s0.advance_mut(8);
|
||||
assert_eq!(s0.remaining(), 8);
|
||||
|
||||
let waker = Waker::noop();
|
||||
let mut cx = task::Context::from_waker(waker);
|
||||
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready());
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
|
||||
s1.advance(8);
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
|
||||
s0.advance_mut(4);
|
||||
assert_eq!(s0.remaining(), 4);
|
||||
|
||||
assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready());
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 4);
|
||||
|
||||
s1.advance(4);
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
|
||||
// With ring buffer, data wraps around and may need multiple reads
|
||||
s0.advance_mut(16);
|
||||
assert_eq!(s0.remaining(), 16);
|
||||
|
||||
// Transfer in chunks due to ring buffer wrap-around
|
||||
while s0.remaining() > 0 && s1.remaining_mut() > 0 {
|
||||
assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready());
|
||||
}
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
s1.advance(8);
|
||||
|
||||
while s0.remaining() > 0 && s1.remaining_mut() > 0 {
|
||||
assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready());
|
||||
}
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_from() {
|
||||
let mut s0 = SimplexStream::new_unsplit(16);
|
||||
let mut s1 = SimplexStream::new_unsplit(8);
|
||||
|
||||
s0.advance_mut(8);
|
||||
assert_eq!(s0.remaining(), 8);
|
||||
|
||||
let waker = Waker::noop();
|
||||
let mut cx = task::Context::from_waker(waker);
|
||||
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready());
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
|
||||
s1.advance(8);
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
|
||||
s0.advance_mut(4);
|
||||
assert_eq!(s0.remaining(), 4);
|
||||
|
||||
assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready());
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 4);
|
||||
|
||||
s1.advance(4);
|
||||
assert_eq!(s1.remaining(), 0);
|
||||
|
||||
// With ring buffer, data wraps around and may need multiple writes
|
||||
s0.advance_mut(16);
|
||||
assert_eq!(s0.remaining(), 16);
|
||||
|
||||
// Transfer in chunks due to ring buffer wrap-around
|
||||
while s0.remaining() > 0 && s1.remaining_mut() > 0 {
|
||||
assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready());
|
||||
}
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
s1.advance(8);
|
||||
|
||||
while s0.remaining() > 0 && s1.remaining_mut() > 0 {
|
||||
assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready());
|
||||
}
|
||||
assert_eq!(s0.remaining(), 0);
|
||||
assert_eq!(s1.remaining(), 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ring_buffer_wrap() {
|
||||
let mut s = SimplexStream::new_unsplit(8);
|
||||
|
||||
// Fill buffer completely
|
||||
s.get_mut().copy_from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
s.advance_mut(8);
|
||||
assert_eq!(s.remaining(), 8);
|
||||
assert_eq!(s.remaining_mut(), 0);
|
||||
|
||||
// Read 4 bytes - this frees space at the front
|
||||
assert_eq!(s.get(), &[0, 1, 2, 3, 4, 5, 6, 7]);
|
||||
s.advance(4);
|
||||
// ptr=4, len=4
|
||||
assert_eq!(s.remaining(), 4);
|
||||
assert_eq!(s.remaining_mut(), 4);
|
||||
assert_eq!(s.get(), &[4, 5, 6, 7]);
|
||||
|
||||
// Write space wraps to the front
|
||||
// tail = (4+4) % 8 = 0, so get_mut returns buffer[0..4]
|
||||
assert_eq!(s.get_mut().len(), 4);
|
||||
s.get_mut().copy_from_slice(&[8, 9, 10, 11]);
|
||||
s.advance_mut(4);
|
||||
// ptr=4, len=8
|
||||
assert_eq!(s.remaining(), 8);
|
||||
assert_eq!(s.remaining_mut(), 0);
|
||||
|
||||
// Read wraps around - first chunk is [4,5,6,7]
|
||||
assert_eq!(s.get(), &[4, 5, 6, 7]);
|
||||
s.advance(4);
|
||||
// ptr=0, len=4
|
||||
|
||||
// Second chunk is [8,9,10,11]
|
||||
assert_eq!(s.get(), &[8, 9, 10, 11]);
|
||||
s.advance(4);
|
||||
// ptr=4, len=0
|
||||
|
||||
assert!(s.get().is_empty());
|
||||
assert_eq!(s.remaining_mut(), 8);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user