diff --git a/futures-plex/Cargo.toml b/futures-plex/Cargo.toml index 5937875..3c1f5f6 100644 --- a/futures-plex/Cargo.toml +++ b/futures-plex/Cargo.toml @@ -5,5 +5,4 @@ edition = "2024" description = "Port of tokio's `SimplexStream` and `DuplexStream` for the `futures` ecosystem." [dependencies] -bytes = { version = "1" } futures = { version = "0.3", default-features = false, features = ["unstable", "bilock", "executor"] } diff --git a/futures-plex/src/half.rs b/futures-plex/src/half.rs index a433f8a..d4f6946 100644 --- a/futures-plex/src/half.rs +++ b/futures-plex/src/half.rs @@ -4,21 +4,57 @@ use futures::{ AsyncRead, AsyncWrite, io::{IoSlice, IoSliceMut}, - lock::BiLock, + lock::{BiLock, BiLockGuard}, }; + use std::{ fmt, io::{self}, + ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll, ready}, }; use crate::SimplexStream; +#[derive(Debug)] +pub struct ReadGuard<'a, T>(pub(crate) BiLockGuard<'a, T>); + +impl<'a, T> Deref for ReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Unpin> DerefMut for ReadGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_pin_mut().get_mut() + } +} + /// The readable half of an object. #[derive(Debug)] pub struct ReadHalf { - handle: BiLock, + pub(crate) handle: BiLock, +} + +#[derive(Debug)] +pub struct WriteGuard<'a, T>(pub(crate) BiLockGuard<'a, T>); + +impl<'a, T> Deref for WriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Unpin> DerefMut for WriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_pin_mut().get_mut() + } } /// The writable half of an object. @@ -41,6 +77,12 @@ pub(crate) fn split(t: T) -> (ReadHalf, WriteHalf< } impl ReadHalf { + /// Attempt to acquire a lock on the read side, returning `Poll::Pending` if + /// it can't be acquired. + pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { + self.handle.poll_lock(cx).map(ReadGuard) + } + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &WriteHalf) -> bool { @@ -60,6 +102,12 @@ impl ReadHalf { } impl WriteHalf { + /// Attempt to acquire a lock on the write side, returning `Poll::Pending` + /// if it can't be acquired. + pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { + self.handle.poll_lock(cx).map(WriteGuard) + } + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &ReadHalf) -> bool { diff --git a/futures-plex/src/lib.rs b/futures-plex/src/lib.rs index f2641e1..117a7ef 100644 --- a/futures-plex/src/lib.rs +++ b/futures-plex/src/lib.rs @@ -2,15 +2,15 @@ use std::{ io::{Read, Write}, - pin::Pin, - task::{self, Poll, Waker}, + pin::{Pin, pin}, + task::{self, Poll, Waker, ready}, }; -use bytes::{Buf, BytesMut}; -use futures::{AsyncRead, AsyncWrite, future::poll_fn}; +use futures::{AsyncBufRead, AsyncRead, AsyncWrite, future::poll_fn, io}; mod half; -pub use half::{ReadHalf, WriteHalf}; + +pub use half::{ReadGuard, ReadHalf, WriteGuard, WriteHalf}; /// A bidirectional pipe to read and write bytes in memory. /// @@ -53,57 +53,38 @@ pub struct DuplexStream { } impl DuplexStream { - /// Returns the number of bytes that can be read. - pub fn remaining(&self) -> usize { - self.read.remaining() + /// Read data from this duplex into the provided writer. + pub fn poll_read_to( + &self, + cx: &mut task::Context<'_>, + wr: W, + ) -> Poll> { + ready!(self.poll_lock_read(cx)).poll_read_to(cx, wr) } - /// Returns the number of bytes that can be written. - pub fn remaining_mut(&self) -> usize { - self.write.remaining_mut() + /// Write data from the provided reader into this duplex. + pub fn poll_write_from( + &self, + cx: &mut task::Context<'_>, + rd: R, + ) -> Poll> { + ready!(self.poll_lock_write(cx)).poll_write_from(cx, rd) } -} -/// A unidirectional pipe to read and write bytes in memory. -/// -/// It can be constructed by [`simplex`] function which will create a pair of -/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will -/// create a handle for both reading and writing. -/// -/// # Example -/// -/// ``` -/// # async fn ex() -> std::io::Result<()> { -/// # use futures::{AsyncReadExt, AsyncWriteExt}; -/// let (mut receiver, mut sender) = futures_plex::simplex(64); -/// -/// sender.write_all(b"ping").await?; -/// -/// let mut buf = [0u8; 4]; -/// receiver.read_exact(&mut buf).await?; -/// assert_eq!(&buf, b"ping"); -/// # Ok(()) -/// # } -/// ``` -#[derive(Debug)] -pub struct SimplexStream { - /// The buffer storing the bytes written, also read from. - /// - /// Using a `BytesMut` because it has efficient `Buf` and `BufMut` - /// functionality already. Additionally, it can try to copy data in the - /// same buffer if there read index has advanced far enough. - buffer: BytesMut, - /// Determines if the write side has been closed. - is_closed: bool, - /// The maximum amount of bytes that can be written before returning - /// `Poll::Pending`. - max_buf_size: usize, - /// If the `read` side has been polled and is pending, this is the waker - /// for that parked task. - read_waker: Option, - /// If the `write` side has filled the `max_buf_size` and returned - /// `Poll::Pending`, this is the waker for that parked task. - write_waker: Option, + /// Attempt to acquire a lock on the read side, returning `Poll::Pending` + /// if it can't be acquired. + pub fn poll_lock_read(&self, cx: &mut task::Context<'_>) -> Poll> { + self.read.poll_lock(cx) + } + + /// Attempt to acquire a lock on the write side, returning `Poll::Pending` + /// if it can't be acquired. + pub fn poll_lock_write( + &self, + cx: &mut task::Context<'_>, + ) -> Poll> { + self.write.poll_lock(cx) + } } // ===== impl DuplexStream ===== @@ -140,7 +121,7 @@ impl AsyncRead for DuplexStream { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.read).poll_read(cx, buf) } } @@ -151,7 +132,7 @@ impl AsyncWrite for DuplexStream { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.write).poll_write(cx, buf) } @@ -164,40 +145,73 @@ impl AsyncWrite for DuplexStream { } #[allow(unused_mut)] - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.write).poll_flush(cx) } #[allow(unused_mut)] - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.write).poll_close(cx) } } impl Read for DuplexStream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + fn read(&mut self, buf: &mut [u8]) -> io::Result { let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf)); futures::executor::block_on(fut) } } impl Write for DuplexStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { + fn write(&mut self, buf: &[u8]) -> io::Result { let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf)); futures::executor::block_on(fut) } - fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> io::Result<()> { Ok(()) } } +/// A unidirectional pipe to read and write bytes in memory. +/// +/// It can be constructed by [`simplex`] function which will create a pair of +/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will +/// create a handle for both reading and writing. +/// +/// # Example +/// +/// ``` +/// # async fn ex() -> std::io::Result<()> { +/// # use futures::{AsyncReadExt, AsyncWriteExt}; +/// let (mut receiver, mut sender) = futures_plex::simplex(64); +/// +/// sender.write_all(b"ping").await?; +/// +/// let mut buf = [0u8; 4]; +/// receiver.read_exact(&mut buf).await?; +/// assert_eq!(&buf, b"ping"); +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug)] +pub struct SimplexStream { + // The buffer storing the bytes written, also read from. + buffer: Box<[u8]>, + // Pointer to the next byte in the buffer. + ptr: usize, + // Number of bytes in the buffer. + len: usize, + // Determines if the write side has been closed. + is_closed: bool, + // If the `read` side has been polled and is pending, this is the waker + // for that parked task. + read_waker: Option, + // If the `write` side has filled the `max_buf_size` and returned + // `Poll::Pending`, this is the waker for that parked task. + write_waker: Option, +} + // ===== impl SimplexStream ===== /// Creates unidirectional buffer that acts like in memory pipe. @@ -234,13 +248,14 @@ impl SimplexStream { /// split version with separate reader and writer you can use /// [`simplex`] function. /// - /// The `max_buf_size` argument is the maximum amount of bytes that can be + /// The `buf_size` argument is the maximum amount of bytes that can be /// written to a buffer before the it returns `Poll::Pending`. - pub fn new_unsplit(max_buf_size: usize) -> SimplexStream { + pub fn new_unsplit(buf_size: usize) -> SimplexStream { SimplexStream { - buffer: BytesMut::new(), + buffer: vec![0; buf_size].into_boxed_slice(), + ptr: 0, + len: 0, is_closed: false, - max_buf_size, read_waker: None, write_waker: None, } @@ -248,12 +263,158 @@ impl SimplexStream { /// Returns the number of bytes that can be read from this buffer. pub fn remaining(&self) -> usize { - self.buffer.remaining() + self.len } /// Returns the number of bytes that can be written into this buffer. pub fn remaining_mut(&self) -> usize { - self.max_buf_size - self.buffer.remaining() + self.buffer.len() - self.len + } + + /// Returns a reference to the first contiguous chunk of data in the buffer. + /// + /// For a ring buffer, data may wrap around. This returns only the first + /// contiguous chunk. Call again after `advance()` to get remaining data. + pub fn get(&self) -> &[u8] { + if self.len == 0 { + return &[]; + } + let cap = self.buffer.len(); + let end = self.ptr + self.len; + if end <= cap { + &self.buffer[self.ptr..end] + } else { + // Data wraps, return first contiguous chunk + &self.buffer[self.ptr..cap] + } + } + + /// Returns a reference to the data in the buffer when data is available. + pub fn poll_get(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if self.remaining() > 0 { + Poll::Ready(Ok(self.get())) + } else if self.is_closed { + Poll::Ready(Ok(&[])) + } else { + self.read_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + /// Returns a mutable slice to the first contiguous chunk of available + /// capacity in the buffer. + /// + /// For a ring buffer, available space may wrap around. This returns only + /// the first contiguous chunk. Call again after `advance_mut()` to get + /// remaining space. + pub fn get_mut(&mut self) -> &mut [u8] { + let cap = self.buffer.len(); + let avail = cap - self.len; + if avail == 0 { + return &mut []; + } + let tail = (self.ptr + self.len) % cap; + if tail < self.ptr { + // Tail wrapped around, contiguous space is tail..ptr + &mut self.buffer[tail..self.ptr] + } else { + // Tail is at or after ptr, contiguous space is tail..cap + &mut self.buffer[tail..cap] + } + } + + /// Returns a mutable reference to the available space in the buffer when + /// there is space available. + pub fn poll_mut(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if self.is_closed { + return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); + } + let avail = self.remaining_mut(); + if avail == 0 { + self.write_waker = Some(cx.waker().clone()); + Poll::Pending + } else { + Poll::Ready(Ok(self.get_mut())) + } + } + + /// Advances the read cursor. + /// + /// This method should be called after reading from the simplex using + /// [`get`](Self::get) or [`poll_get`](Self::poll_get) to advance the read + /// cursor. + /// + /// # Panics + /// + /// Panics if the provided amount exceeds the data in the buffer. + /// + /// # Arguments + /// + /// * `amt` - The number of bytes read. + pub fn advance(&mut self, amt: usize) { + if amt == 0 { + return; + } + + assert!(amt <= self.len, "out of bounds"); + self.ptr = (self.ptr + amt) % self.buffer.len(); + self.len -= amt; + + // Wake the writer side now that space is available. + if let Some(waker) = self.write_waker.take() { + waker.wake(); + } + } + + /// Advances the write cursor. + /// + /// This method should be called after writing to the simplex using + /// [`get_mut`](Self::get_mut) or [`poll_mut`](Self::poll_mut) to advance + /// the write cursor. + /// + /// # Panics + /// + /// Panics if the provided amount exceeds the spare capacity. + /// + /// # Arguments + /// + /// * `amt` - The number of bytes written. + pub fn advance_mut(&mut self, amt: usize) { + if amt == 0 { + return; + } + + assert!(self.len + amt <= self.buffer.len(), "out of bounds"); + self.len += amt; + + // Wake the read side now that data is available. + if let Some(waker) = self.read_waker.take() { + waker.wake(); + } + } + + /// Read data from this simplex into the provided writer. + pub fn poll_read_to( + &mut self, + cx: &mut task::Context<'_>, + wr: W, + ) -> Poll> { + let buf = ready!(self.poll_get(cx))?; + let len = ready!(pin!(wr).poll_write(cx, buf))?; + self.advance(len); + Poll::Ready(Ok(len)) + } + + /// Write data from the provided reader into this simplex. + pub fn poll_write_from( + &mut self, + cx: &mut task::Context<'_>, + rd: R, + ) -> Poll> { + let buf = ready!(self.poll_mut(cx))?; + let len = ready!(pin!(rd).poll_read(cx, buf))?; + self.advance_mut(len); + Poll::Ready(Ok(len)) } fn close_write(&mut self) { @@ -268,46 +429,35 @@ impl SimplexStream { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], - ) -> Poll> { - if self.buffer.has_remaining() { - let len = self.buffer.remaining().min(buf.len()); - buf[..len].copy_from_slice(&self.buffer[..len]); - self.buffer.advance(len); - if len > 0 { - // The passed `buf` might have been empty, don't wake up if - // no bytes have been moved. - if let Some(waker) = self.write_waker.take() { - waker.wake(); - } - } - Poll::Ready(Ok(len)) - } else if self.is_closed { - Poll::Ready(Ok(0)) - } else { - self.read_waker = Some(cx.waker().clone()); - Poll::Pending + ) -> Poll> { + let src = ready!(self.poll_get(cx))?; + + if src.is_empty() { + return Poll::Ready(Ok(0)); } + + let len = buf.len().min(src.len()); + buf[..len].copy_from_slice(&src[..len]); + self.advance(len); + + Poll::Ready(Ok(len)) } fn poll_write_internal( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], - ) -> Poll> { - if self.is_closed { - return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); - } - let avail = self.max_buf_size - self.buffer.len(); - if avail == 0 { - self.write_waker = Some(cx.waker().clone()); - return Poll::Pending; - } + ) -> Poll> { + let dest = ready!(self.poll_mut(cx))?; + debug_assert!( + !dest.is_empty(), + "returned ready when no space is available" + ); + + let len = buf.len().min(dest.len()); + dest[..len].copy_from_slice(&buf[..len]); + self.advance_mut(len); - let len = buf.len().min(avail); - self.buffer.extend_from_slice(&buf[..len]); - if let Some(waker) = self.read_waker.take() { - waker.wake(); - } Poll::Ready(Ok(len)) } @@ -316,30 +466,29 @@ impl SimplexStream { cx: &mut task::Context<'_>, bufs: &[std::io::IoSlice<'_>], ) -> Poll> { - if self.is_closed { - return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); - } - let avail = self.max_buf_size - self.buffer.len(); - if avail == 0 { - self.write_waker = Some(cx.waker().clone()); - return Poll::Pending; - } + let mut dest = ready!(self.poll_mut(cx))?; + debug_assert!( + !dest.is_empty(), + "returned ready when no space is available" + ); - let mut rem = avail; + let avail = dest.len(); + let mut amt = 0; for buf in bufs { - if rem == 0 { + if amt >= avail { break; } - let len = buf.len().min(rem); - self.buffer.extend_from_slice(&buf[..len]); - rem -= len; + let len = buf.len().min(dest.len()); + dest[..len].copy_from_slice(&buf[..len]); + + dest = &mut dest[len..]; + amt += len; } - if let Some(waker) = self.read_waker.take() { - waker.wake(); - } - Poll::Ready(Ok(avail - rem)) + self.advance_mut(amt); + + Poll::Ready(Ok(amt)) } } @@ -348,17 +497,27 @@ impl AsyncRead for SimplexStream { self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { self.poll_read_internal(cx, buf) } } +impl AsyncBufRead for SimplexStream { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + Pin::get_mut(self).poll_get(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.advance(amt); + } +} + impl AsyncWrite for SimplexStream { fn poll_write( self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { self.poll_write_internal(cx, buf) } @@ -366,37 +525,213 @@ impl AsyncWrite for SimplexStream { self: Pin<&mut Self>, cx: &mut task::Context<'_>, bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { + ) -> Poll> { self.poll_write_vectored_internal(cx, bufs) } - fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut task::Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { self.close_write(); Poll::Ready(Ok(())) } } impl Read for SimplexStream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + fn read(&mut self, buf: &mut [u8]) -> io::Result { let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf)); futures::executor::block_on(fut) } } impl Write for SimplexStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { + fn write(&mut self, buf: &[u8]) -> io::Result { let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf)); futures::executor::block_on(fut) } - fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> io::Result<()> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_simplex_no_capacity() { + let mut s = SimplexStream::new_unsplit(0); + assert!(s.get().is_empty()); + assert!(s.get_mut().is_empty()); + } + + #[test] + fn test_simplex() { + let mut s = SimplexStream::new_unsplit(8); + assert!(s.get().is_empty()); + assert_eq!(s.get_mut().len(), 8); + assert_eq!(s.get_mut(), &vec![0; 8]); + + s.get_mut().copy_from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]); + + // write 2 bytes + s.advance_mut(2); + assert_eq!(s.remaining(), 2); + assert_eq!(s.remaining_mut(), 6); + assert_eq!(s.get(), &[0, 1]); + assert_eq!(s.get_mut(), &[2, 3, 4, 5, 6, 7]); + + // read 1 byte + s.advance(1); + assert_eq!(s.remaining(), 1); + // space is reclaimed immediately with ring buffer + assert_eq!(s.remaining_mut(), 7); + + // write the rest of the bytes + s.advance_mut(6); + assert_eq!(s.get(), &[1, 2, 3, 4, 5, 6, 7]); + // read everything out + s.advance(7); + + assert!(s.get().is_empty()); + assert_eq!(s.get_mut().len(), 8); + } + + #[test] + fn test_read_to() { + let mut s0 = SimplexStream::new_unsplit(16); + let mut s1 = SimplexStream::new_unsplit(8); + + s0.advance_mut(8); + assert_eq!(s0.remaining(), 8); + + let waker = Waker::noop(); + let mut cx = task::Context::from_waker(waker); + + assert_eq!(s1.remaining(), 0); + assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready()); + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 8); + + s1.advance(8); + assert_eq!(s1.remaining(), 0); + + s0.advance_mut(4); + assert_eq!(s0.remaining(), 4); + + assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready()); + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 4); + + s1.advance(4); + assert_eq!(s1.remaining(), 0); + + // With ring buffer, data wraps around and may need multiple reads + s0.advance_mut(16); + assert_eq!(s0.remaining(), 16); + + // Transfer in chunks due to ring buffer wrap-around + while s0.remaining() > 0 && s1.remaining_mut() > 0 { + assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready()); + } + assert_eq!(s1.remaining(), 8); + s1.advance(8); + + while s0.remaining() > 0 && s1.remaining_mut() > 0 { + assert!(s0.poll_read_to(&mut cx, &mut s1).is_ready()); + } + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 8); + } + + #[test] + fn test_write_from() { + let mut s0 = SimplexStream::new_unsplit(16); + let mut s1 = SimplexStream::new_unsplit(8); + + s0.advance_mut(8); + assert_eq!(s0.remaining(), 8); + + let waker = Waker::noop(); + let mut cx = task::Context::from_waker(waker); + + assert_eq!(s1.remaining(), 0); + assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready()); + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 8); + + s1.advance(8); + assert_eq!(s1.remaining(), 0); + + s0.advance_mut(4); + assert_eq!(s0.remaining(), 4); + + assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready()); + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 4); + + s1.advance(4); + assert_eq!(s1.remaining(), 0); + + // With ring buffer, data wraps around and may need multiple writes + s0.advance_mut(16); + assert_eq!(s0.remaining(), 16); + + // Transfer in chunks due to ring buffer wrap-around + while s0.remaining() > 0 && s1.remaining_mut() > 0 { + assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready()); + } + assert_eq!(s1.remaining(), 8); + s1.advance(8); + + while s0.remaining() > 0 && s1.remaining_mut() > 0 { + assert!(s1.poll_write_from(&mut cx, &mut s0).is_ready()); + } + assert_eq!(s0.remaining(), 0); + assert_eq!(s1.remaining(), 8); + } + + #[test] + fn test_ring_buffer_wrap() { + let mut s = SimplexStream::new_unsplit(8); + + // Fill buffer completely + s.get_mut().copy_from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]); + s.advance_mut(8); + assert_eq!(s.remaining(), 8); + assert_eq!(s.remaining_mut(), 0); + + // Read 4 bytes - this frees space at the front + assert_eq!(s.get(), &[0, 1, 2, 3, 4, 5, 6, 7]); + s.advance(4); + // ptr=4, len=4 + assert_eq!(s.remaining(), 4); + assert_eq!(s.remaining_mut(), 4); + assert_eq!(s.get(), &[4, 5, 6, 7]); + + // Write space wraps to the front + // tail = (4+4) % 8 = 0, so get_mut returns buffer[0..4] + assert_eq!(s.get_mut().len(), 4); + s.get_mut().copy_from_slice(&[8, 9, 10, 11]); + s.advance_mut(4); + // ptr=4, len=8 + assert_eq!(s.remaining(), 8); + assert_eq!(s.remaining_mut(), 0); + + // Read wraps around - first chunk is [4,5,6,7] + assert_eq!(s.get(), &[4, 5, 6, 7]); + s.advance(4); + // ptr=0, len=4 + + // Second chunk is [8,9,10,11] + assert_eq!(s.get(), &[8, 9, 10, 11]); + s.advance(4); + // ptr=4, len=0 + + assert!(s.get().is_empty()); + assert_eq!(s.remaining_mut(), 8); + } +}