feat(serio): unbounded memory channels (#37)

* Added unbounded memory channels

* Add Clone to channel senders

* Changelog

* Apply suggestions from code review

Co-authored-by: th4s <th4s@metavoid.xyz>

* UnboundedMemoryDuplex + comments

---------

Co-authored-by: th4s <th4s@metavoid.xyz>
This commit is contained in:
Nicolas Le Bel
2024-11-25 18:14:45 +02:00
committed by GitHub
parent 09f1abe261
commit 280bb38dac
2 changed files with 135 additions and 1 deletions

View File

@@ -5,3 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
`UnboundedMemorySink` and `UnboundedMemoryStream` mirroring the `MemorySink` and `MemoryStream` wrappers of `futures::mpsc::{Sender, Receiver}` for `futures::mpsc::{UnboundedSender, UnboundedReceiver}`.

View File

@@ -16,7 +16,7 @@ use crate::{Deserialize, Serialize, Sink, Stream};
type Item = Box<dyn Any + Send + Sync + 'static>;
/// A memory sink that can be used to send any serializable type to the receiver.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemorySink(mpsc::Sender<Item>);
impl Sink for MemorySink {
@@ -77,6 +77,68 @@ pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) {
(MemorySink(sender), MemoryStream(receiver))
}
/// An unbounded memory sink that can be used to send any serializable type to the receiver.
#[derive(Debug, Clone)]
pub struct UnboundedMemorySink(mpsc::UnboundedSender<Item>);
impl Sink for UnboundedMemorySink {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_ready(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.0)
.start_send(Box::new(item))
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_flush(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_close(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
}
/// An unbounded memory stream that can be used to receive any deserializable type from the sender.
#[derive(Debug)]
pub struct UnboundedMemoryStream(mpsc::UnboundedReceiver<Item>);
impl Stream for UnboundedMemoryStream {
type Error = Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.0).poll_next(cx).map(|item| {
item.map(|item| {
item.downcast().map(|item| *item).map_err(|_| {
Error::new(ErrorKind::InvalidData, "sender sent an unexpected type")
})
})
})
}
}
/// Creates a new unbounded memory channel.
pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) {
let (sender, receiver) = mpsc::unbounded();
(UnboundedMemorySink(sender), UnboundedMemoryStream(receiver))
}
/// A memory duplex that can be used to send and receive any serializable types.
#[derive(Debug)]
pub struct MemoryDuplex {
@@ -145,6 +207,74 @@ pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) {
)
}
/// An unbounded memory duplex that can be used to send and receive any serializable types.
#[derive(Debug)]
pub struct UnboundedMemoryDuplex {
sink: UnboundedMemorySink,
stream: UnboundedMemoryStream,
}
impl UnboundedMemoryDuplex {
/// Returns the inner sink and stream.
pub fn into_inner(self) -> (UnboundedMemorySink, UnboundedMemoryStream) {
(self.sink, self.stream)
}
/// Returns a reference to the inner sink.
pub fn sink_mut(&mut self) -> &mut UnboundedMemorySink {
&mut self.sink
}
/// Returns a reference to the inner stream.
pub fn stream_mut(&mut self) -> &mut UnboundedMemoryStream {
&mut self.stream
}
}
impl Sink for UnboundedMemoryDuplex {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_ready(cx)
}
fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_close(cx)
}
}
impl Stream for UnboundedMemoryDuplex {
type Error = Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}
/// Creates a new unbounded memory duplex.
pub fn unbounded_duplex() -> (UnboundedMemoryDuplex, UnboundedMemoryDuplex) {
let (a, b) = unbounded();
let (c, d) = unbounded();
(
UnboundedMemoryDuplex { sink: a, stream: d },
UnboundedMemoryDuplex { sink: c, stream: b },
)
}
#[cfg(test)]
mod tests {
use super::*;