feat(futures-plex): sync trait impls

This commit is contained in:
sinu
2025-09-02 15:02:55 -07:00
parent dea3db250c
commit eb4841348c

View File

@@ -1,6 +1,7 @@
#![doc = include_str!("../README.md")]
use std::{
io::{Read, Write},
pin::Pin,
task::{self, Poll, Waker},
};
@@ -216,7 +217,28 @@ impl SimplexStream {
}
}
fn close_write(&mut self) {
/// Returns true if there is no data in the buffer.
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
/// Returns the number of bytes in the buffer.
pub fn len(&self) -> usize {
self.buffer.len()
}
/// Returns the capacity of the buffer.
pub fn capacity(&self) -> usize {
self.max_buf_size
}
/// Returns the maximum number of bytes that can be written before blocking.
pub fn remaining_capacity(&self) -> usize {
self.max_buf_size - self.buffer.len()
}
/// Closes the write side of the simplex.
pub fn close_write(&mut self) {
self.is_closed = true;
// needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
@@ -224,6 +246,26 @@ impl SimplexStream {
}
}
fn read_internal(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.has_remaining() {
let len = self.buffer.remaining().min(buf.len());
buf[..len].copy_from_slice(&self.buffer[..len]);
self.buffer.advance(len);
if len > 0 {
// The passed `buf` might have been empty, don't wake up if
// no bytes have been moved.
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
}
Ok(len)
} else if self.is_closed {
Ok(0)
} else {
Err(std::io::ErrorKind::WouldBlock.into())
}
}
fn poll_read_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
@@ -249,6 +291,23 @@ impl SimplexStream {
}
}
fn write_internal(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.is_closed {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
let avail = self.max_buf_size - self.buffer.len();
if avail == 0 {
return Err(std::io::ErrorKind::WouldBlock.into());
}
let len = buf.len().min(avail);
self.buffer.extend_from_slice(&buf[..len]);
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
Ok(len)
}
fn poll_write_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
@@ -271,6 +330,32 @@ impl SimplexStream {
Poll::Ready(Ok(len))
}
fn write_vectored_internal(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
if self.is_closed {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
let avail = self.max_buf_size - self.buffer.len();
if avail == 0 {
return Err(std::io::ErrorKind::WouldBlock.into());
}
let mut rem = avail;
for buf in bufs {
if rem == 0 {
break;
}
let len = buf.len().min(rem);
self.buffer.extend_from_slice(&buf[..len]);
rem -= len;
}
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
Ok(avail - rem)
}
fn poll_write_vectored_internal(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
@@ -313,6 +398,12 @@ impl AsyncRead for SimplexStream {
}
}
impl Read for SimplexStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.read_internal(buf)
}
}
impl AsyncWrite for SimplexStream {
fn poll_write(
self: Pin<&mut Self>,
@@ -342,3 +433,17 @@ impl AsyncWrite for SimplexStream {
Poll::Ready(Ok(()))
}
}
impl Write for SimplexStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.write_internal(buf)
}
fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
self.write_vectored_internal(bufs)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}