From 7ca0b132787a31faa0e91c816d6603d40db7e526 Mon Sep 17 00:00:00 2001 From: th4s Date: Mon, 8 Dec 2025 19:27:18 +0100 Subject: [PATCH] feat(futures-plex): add sync `Read` and `Write` support * feat(futures-plex): add sync `Read` and `Write` support * return `WouldBlock` error instead * use `futures::block_on` and add way to inspect buffer * delegate to async methods in sync implementations --- futures-plex/Cargo.toml | 3 +- futures-plex/src/half.rs | 158 +++++++++++++++++++++++++++++++++++++++ futures-plex/src/lib.rs | 80 +++++++++++++++++--- 3 files changed, 228 insertions(+), 13 deletions(-) create mode 100644 futures-plex/src/half.rs diff --git a/futures-plex/Cargo.toml b/futures-plex/Cargo.toml index 0923785..5937875 100644 --- a/futures-plex/Cargo.toml +++ b/futures-plex/Cargo.toml @@ -6,5 +6,4 @@ description = "Port of tokio's `SimplexStream` and `DuplexStream` for the `futur [dependencies] bytes = { version = "1" } -futures-io = { version = "0.3" } -futures-util = { version = "0.3", default-features = false, features = ["io"] } +futures = { version = "0.3", default-features = false, features = ["unstable", "bilock", "executor"] } diff --git a/futures-plex/src/half.rs b/futures-plex/src/half.rs new file mode 100644 index 0000000..a433f8a --- /dev/null +++ b/futures-plex/src/half.rs @@ -0,0 +1,158 @@ +//! Adapted from +//! to support sync operations. + +use futures::{ + AsyncRead, AsyncWrite, + io::{IoSlice, IoSliceMut}, + lock::BiLock, +}; +use std::{ + fmt, + io::{self}, + pin::Pin, + task::{Context, Poll, ready}, +}; + +use crate::SimplexStream; + +/// The readable half of an object. +#[derive(Debug)] +pub struct ReadHalf { + handle: BiLock, +} + +/// The writable half of an object. +#[derive(Debug)] +pub struct WriteHalf { + handle: BiLock, +} + +fn lock_and_then(lock: &BiLock, cx: &mut Context<'_>, f: F) -> Poll> +where + F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, +{ + let mut l = ready!(lock.poll_lock(cx)); + f(l.as_pin_mut(), cx) +} + +pub(crate) fn split(t: T) -> (ReadHalf, WriteHalf) { + let (a, b) = BiLock::new(t); + (ReadHalf { handle: a }, WriteHalf { handle: b }) +} + +impl ReadHalf { + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same + /// stream. + pub fn is_pair_of(&self, other: &WriteHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + +impl ReadHalf { + /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` + /// back together. Succeeds only if the `ReadHalf` and `WriteHalf` + /// are a matching pair originating from the same call to `split`. + pub fn reunite(self, other: WriteHalf) -> Result> { + self.handle + .reunite(other.handle) + .map_err(|err| ReuniteError(Self { handle: err.0 }, WriteHalf { handle: err.1 })) + } +} + +impl WriteHalf { + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same + /// stream. + pub fn is_pair_of(&self, other: &ReadHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + +impl WriteHalf { + /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` + /// back together. Succeeds only if the `ReadHalf` and `WriteHalf` + /// are a matching pair originating from the same call to `split`. + pub fn reunite(self, other: ReadHalf) -> Result> { + other.reunite(self) + } +} + +impl AsyncRead for ReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs)) + } +} + +impl AsyncWrite for WriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx)) + } +} + +impl ReadHalf { + /// Returns the number of bytes that can be read. + pub fn remaining(&self) -> usize { + let handle = futures::executor::block_on(self.handle.lock()); + handle.remaining() + } +} + +impl WriteHalf { + /// Returns the number of bytes that can be written. + pub fn remaining_mut(&self) -> usize { + let handle = futures::executor::block_on(self.handle.lock()); + handle.remaining_mut() + } +} + +/// Error indicating a `ReadHalf` and `WriteHalf` were not two halves +/// of a `AsyncRead + AsyncWrite`, and thus could not be `reunite`d. +pub struct ReuniteError(pub ReadHalf, pub WriteHalf); + +impl fmt::Debug for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("ReuniteError").field(&"...").finish() + } +} + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite a ReadHalf and WriteHalf that don't form a pair" + ) + } +} + +impl std::error::Error for ReuniteError {} diff --git a/futures-plex/src/lib.rs b/futures-plex/src/lib.rs index a4c388d..f2641e1 100644 --- a/futures-plex/src/lib.rs +++ b/futures-plex/src/lib.rs @@ -1,16 +1,16 @@ #![doc = include_str!("../README.md")] use std::{ + io::{Read, Write}, pin::Pin, task::{self, Poll, Waker}, }; use bytes::{Buf, BytesMut}; -use futures_io::{AsyncRead, AsyncWrite}; -use futures_util::{ - AsyncReadExt, - io::{ReadHalf, WriteHalf}, -}; +use futures::{AsyncRead, AsyncWrite, future::poll_fn}; + +mod half; +pub use half::{ReadHalf, WriteHalf}; /// A bidirectional pipe to read and write bytes in memory. /// @@ -30,7 +30,7 @@ use futures_util::{ /// /// ``` /// # async fn ex() -> std::io::Result<()> { -/// # use futures_util::{AsyncReadExt, AsyncWriteExt}; +/// # use futures::{AsyncReadExt, AsyncWriteExt}; /// let (mut client, mut server) = futures_plex::duplex(64); /// /// client.write_all(b"ping").await?; @@ -52,6 +52,18 @@ pub struct DuplexStream { write: WriteHalf, } +impl DuplexStream { + /// Returns the number of bytes that can be read. + pub fn remaining(&self) -> usize { + self.read.remaining() + } + + /// Returns the number of bytes that can be written. + pub fn remaining_mut(&self) -> usize { + self.write.remaining_mut() + } +} + /// A unidirectional pipe to read and write bytes in memory. /// /// It can be constructed by [`simplex`] function which will create a pair of @@ -62,7 +74,7 @@ pub struct DuplexStream { /// /// ``` /// # async fn ex() -> std::io::Result<()> { -/// # use futures_util::{AsyncReadExt, AsyncWriteExt}; +/// # use futures::{AsyncReadExt, AsyncWriteExt}; /// let (mut receiver, mut sender) = futures_plex::simplex(64); /// /// sender.write_all(b"ping").await?; @@ -102,8 +114,8 @@ pub struct SimplexStream { /// The `max_buf_size` argument is the maximum amount of bytes that can be /// written to a side before the write returns `Poll::Pending`. pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { - let (read_0, write_0) = SimplexStream::new_unsplit(max_buf_size).split(); - let (read_1, write_1) = SimplexStream::new_unsplit(max_buf_size).split(); + let (read_0, write_0) = half::split(SimplexStream::new_unsplit(max_buf_size)); + let (read_1, write_1) = half::split(SimplexStream::new_unsplit(max_buf_size)); ( DuplexStream { @@ -168,6 +180,24 @@ impl AsyncWrite for DuplexStream { } } +impl Read for DuplexStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf)); + futures::executor::block_on(fut) + } +} + +impl Write for DuplexStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf)); + futures::executor::block_on(fut) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + // ===== impl SimplexStream ===== /// Creates unidirectional buffer that acts like in memory pipe. @@ -184,7 +214,7 @@ impl AsyncWrite for DuplexStream { /// /// ``` /// # async fn ex() -> std::io::Result<()> { -/// # use futures_util::{AsyncReadExt, AsyncWriteExt}; +/// # use futures::{AsyncReadExt, AsyncWriteExt}; /// let (reader, writer) = futures_plex::simplex(64); /// let mut simplex_stream = reader.reunite(writer).unwrap(); /// simplex_stream.write_all(b"hello").await?; @@ -196,7 +226,7 @@ impl AsyncWrite for DuplexStream { /// # } /// ``` pub fn simplex(max_buf_size: usize) -> (ReadHalf, WriteHalf) { - SimplexStream::new_unsplit(max_buf_size).split() + half::split(SimplexStream::new_unsplit(max_buf_size)) } impl SimplexStream { @@ -216,6 +246,16 @@ impl SimplexStream { } } + /// Returns the number of bytes that can be read from this buffer. + pub fn remaining(&self) -> usize { + self.buffer.remaining() + } + + /// Returns the number of bytes that can be written into this buffer. + pub fn remaining_mut(&self) -> usize { + self.max_buf_size - self.buffer.remaining() + } + fn close_write(&mut self) { self.is_closed = true; // needs to notify any readers that no more data will come @@ -342,3 +382,21 @@ impl AsyncWrite for SimplexStream { Poll::Ready(Ok(())) } } + +impl Read for SimplexStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let fut = poll_fn(|cx| AsyncRead::poll_read(Pin::new(self), cx, buf)); + futures::executor::block_on(fut) + } +} + +impl Write for SimplexStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let fut = poll_fn(|cx| AsyncWrite::poll_write(Pin::new(self), cx, buf)); + futures::executor::block_on(fut) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +}