feat: serio (#24)

* feat: serio

* add IoStreamExt

* fix doc link

* add into_stream

* pin project IntoSink

* add flush method to SinkExt

* add duplex traits

* Apply suggestions from code review

Co-authored-by: dan <themighty1@users.noreply.github.com>

* add Framed test

---------

Co-authored-by: dan <themighty1@users.noreply.github.com>
This commit is contained in:
sinu.eth
2024-03-04 12:44:56 -08:00
committed by GitHub
parent 51f313d153
commit 8f2fc9e540
12 changed files with 1458 additions and 1 deletions

View File

@@ -1,18 +1,23 @@
[workspace]
members = ["utils", "utils-aio", "spansy"]
members = ["utils", "utils-aio", "spansy", "serio"]
[workspace.dependencies]
tlsn-utils = { path = "utils" }
tlsn-utils-aio = { path = "utils-aio" }
spansy = { path = "spansy" }
serio = { path = "serio" }
rand = "0.8"
thiserror = "1"
async-trait = "0.1"
prost = "0.9"
futures = "0.3"
futures-sink = "0.3"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
tokio-util = "0.7"
tokio-serde = "0.8"
tokio = "1.23"
async-tungstenite = "0.16"
prost-build = "0.9"
@@ -20,3 +25,6 @@ bytes = "1"
async-std = "1"
rayon = "1"
serde = "1"
cfg-if = "1"
bincode = "1.3"
pin-project-lite = "0.2"

33
serio/Cargo.toml Normal file
View File

@@ -0,0 +1,33 @@
[package]
name = "serio"
version = "0.1.0"
edition = "2021"
[features]
default = ["compat", "channel", "codec", "bincode"]
compat = ["dep:futures-sink"]
channel = ["dep:futures-channel"]
codec = ["dep:tokio-util"]
bincode = ["dep:bincode"]
[dependencies]
bytes.workspace = true
serde.workspace = true
pin-project-lite.workspace = true
futures-core.workspace = true
futures-channel = { workspace = true, optional = true }
futures-sink = { workspace = true, optional = true }
futures-util = { workspace = true, features = ["bilock", "unstable"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
bincode = { workspace = true, optional = true }
[dev-dependencies]
futures.workspace = true
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["codec"] }
[[example]]
name = "tokio_codec"
path = "examples/tokio_codec.rs"
required-features = ["codec"]

203
serio/LICENSE-APACHE Normal file
View File

@@ -0,0 +1,203 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright (c) 2016 Alex Crichton
Copyright (c) 2017 The Tokio Authors
Copyright (c) 2024 The TLSNotary authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

27
serio/LICENSE-MIT Normal file
View File

@@ -0,0 +1,27 @@
Copyright (c) 2016 Alex Crichton
Copyright (c) 2017 The Tokio Authors
Copyright (c) 2024 The TLSNotary authors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

11
serio/README.md Normal file
View File

@@ -0,0 +1,11 @@
# serio
This crate provides the `Sink` and `Stream` traits which are similar to `futures::Sink` and `futures::Stream` except they work with *any* serializable types.
Much of the functionality from this crate was either adapted or directly copied from the [`futures`](https://github.com/rust-lang/futures-rs/) project. Inspiration was also taken from the [`tokio-serde`](https://crates.io/crates/tokio-serde) crate.
# License
Licensed under either of Apache License, Version 2.0 or MIT license at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

View File

@@ -0,0 +1,47 @@
use serde::{Deserialize, Serialize};
use serio::{
codec::{Bincode, Framed},
IoSink, IoStream, SinkExt as _, StreamExt as _,
};
use std::io::Result;
use tokio::io::duplex;
use tokio_util::codec::LengthDelimitedCodec;
#[derive(Serialize, Deserialize)]
struct Ping(String);
#[derive(Serialize, Deserialize)]
struct Pong(u32);
#[tokio::main]
async fn main() {
let (a, b) = duplex(1024);
let a = LengthDelimitedCodec::builder().new_framed(a);
let b = LengthDelimitedCodec::builder().new_framed(b);
let a = Framed::new(a, Bincode::default());
let b = Framed::new(b, Bincode::default());
tokio::try_join!(alice(a), bob(b)).unwrap();
}
async fn alice<T: IoSink + IoStream + Unpin>(mut io: T) -> Result<()> {
io.send(Ping("hello bob!".to_string())).await?;
println!("alice: sent ping");
let Pong(num) = io.next().await.unwrap()?;
println!("alice: received pong {num}");
Ok(())
}
async fn bob<T: IoSink + IoStream + Unpin>(mut io: T) -> Result<()> {
let Ping(msg) = io.next().await.unwrap()?;
println!("bob: received ping \"{msg}\"");
io.send(Pong(42)).await?;
println!("bob: sent pong");
Ok(())
}

192
serio/src/channel.rs Normal file
View File

@@ -0,0 +1,192 @@
//! Memory channels for sending and receiving serializable types. Useful for testing.
use std::{
any::Any,
io::{Error, ErrorKind},
pin::Pin,
task::{Context, Poll},
};
use futures_channel::mpsc;
use futures_core::Stream as _;
use futures_sink::Sink as _;
use crate::{Deserialize, Serialize, Sink, Stream};
type Item = Box<dyn Any + Send + Sync + 'static>;
/// A memory sink that can be used to send any serializable type to the receiver.
pub struct MemorySink(mpsc::Sender<Item>);
impl Sink for MemorySink {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_ready(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.0)
.start_send(Box::new(item))
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_flush(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.0)
.poll_close(cx)
.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
}
}
/// A memory stream that can be used to receive any deserializable type from the sender.
pub struct MemoryStream(mpsc::Receiver<Item>);
impl Stream for MemoryStream {
type Error = Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.0).poll_next(cx).map(|item| {
item.map(|item| {
item.downcast().map(|item| *item).map_err(|_| {
Error::new(ErrorKind::InvalidData, "sender sent an unexpected type")
})
})
})
}
}
/// Creates a new memory channel with the specified buffer size.
pub fn channel(buffer: usize) -> (MemorySink, MemoryStream) {
let (sender, receiver) = mpsc::channel(buffer);
(MemorySink(sender), MemoryStream(receiver))
}
/// A memory duplex that can be used to send and receive any serializable types.
pub struct MemoryDuplex {
sink: MemorySink,
stream: MemoryStream,
}
impl MemoryDuplex {
/// Returns the inner sink and stream.
pub fn into_inner(self) -> (MemorySink, MemoryStream) {
(self.sink, self.stream)
}
/// Returns a reference to the inner sink.
pub fn sink_mut(&mut self) -> &mut MemorySink {
&mut self.sink
}
/// Returns a reference to the inner stream.
pub fn stream_mut(&mut self) -> &mut MemoryStream {
&mut self.stream
}
}
impl Sink for MemoryDuplex {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_ready(cx)
}
fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_close(cx)
}
}
impl Stream for MemoryDuplex {
type Error = Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}
/// Creates a new memory duplex with the specified buffer size.
pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) {
let (a, b) = channel(buffer);
let (c, d) = channel(buffer);
(
MemoryDuplex { sink: a, stream: d },
MemoryDuplex { sink: c, stream: b },
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{SinkExt, StreamExt};
#[test]
fn test_channel() {
let (mut sink, mut stream) = channel(1);
futures::executor::block_on(async {
sink.send(42u8).await.unwrap();
assert_eq!(stream.next::<u8>().await.unwrap().unwrap(), 42);
})
}
#[test]
#[should_panic]
fn test_channel_type_mismatch() {
let (mut sink, mut stream) = channel(1);
futures::executor::block_on(async {
sink.send(42u16).await.unwrap();
stream.next::<u8>().await.unwrap().unwrap();
})
}
#[test]
fn test_duplex() {
let (mut a, mut b) = duplex(1);
futures::executor::block_on(async {
a.send(42u8).await.unwrap();
assert_eq!(b.next::<u8>().await.unwrap().unwrap(), 42);
})
}
#[test]
#[should_panic]
fn test_duplex_type_mismatch() {
let (mut a, mut b) = duplex(1);
futures::executor::block_on(async {
a.send(42u16).await.unwrap();
b.next::<u8>().await.unwrap().unwrap();
})
}
}

172
serio/src/codec.rs Normal file
View File

@@ -0,0 +1,172 @@
//! Utilities for converting framed transports to streams and sinks using a codec.
use std::{
io::{Error, ErrorKind},
pin::Pin,
task::{ready, Context, Poll},
};
use bytes::{Bytes, BytesMut};
use futures_core::stream::TryStream;
use crate::{Deserialize, Serialize, Sink, Stream};
/// A serializer.
pub trait Serializer {
/// The error type.
type Error;
/// Serializes `item` into a buffer.
fn serialize<T: Serialize>(&mut self, item: &T) -> Result<Bytes, Self::Error>;
}
/// A deserializer.
pub trait Deserializer {
/// The error type.
type Error;
/// Deserializes a buffer into a value.
fn deserialize<T: Deserialize>(&mut self, buf: &BytesMut) -> Result<T, Self::Error>;
}
#[cfg(feature = "bincode")]
mod bincode_impl {
use super::*;
use bincode::{deserialize, serialize};
/// A bincode codec.
#[derive(Default, Clone)]
pub struct Bincode;
impl Serializer for Bincode {
type Error = bincode::Error;
fn serialize<T: Serialize>(&mut self, item: &T) -> Result<Bytes, Self::Error> {
Ok(Bytes::from(serialize(item)?))
}
}
impl Deserializer for Bincode {
type Error = bincode::Error;
fn deserialize<T: Deserialize>(&mut self, buf: &BytesMut) -> Result<T, Self::Error> {
Ok(deserialize(buf)?)
}
}
}
#[cfg(feature = "bincode")]
pub use bincode_impl::Bincode;
/// A framed transport.
pub struct Framed<T, C> {
inner: T,
codec: C,
}
impl<T, C> Framed<T, C> {
/// Creates a new `Framed` with the given transport and codec.
pub fn new(inner: T, codec: C) -> Self {
Self { inner, codec }
}
}
impl<T, C> Sink for Framed<T, C>
where
T: futures_sink::Sink<Bytes, Error = Error> + Unpin,
C: Serializer + Unpin,
<C as Serializer>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn start_send<I: Serialize>(
mut self: std::pin::Pin<&mut Self>,
item: I,
) -> Result<(), Self::Error> {
let buf = self
.codec
.serialize(&item)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
Pin::new(&mut self.inner).start_send(buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}
impl<T, C> Stream for Framed<T, C>
where
T: TryStream<Ok = BytesMut, Error = Error> + Unpin,
C: Deserializer + Unpin,
<C as Deserializer>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Error = Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Error>>> {
let Some(buf) = ready!(Pin::new(&mut self.inner).try_poll_next(cx)) else {
return Poll::Ready(None);
};
let item = self
.codec
.deserialize(&buf?)
.map_err(|e| Error::new(ErrorKind::InvalidData, e));
Poll::Ready(Some(item))
}
}
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use tokio::io::duplex;
use tokio_util::codec::LengthDelimitedCodec;
use crate::{SinkExt, StreamExt};
use super::*;
#[derive(Serialize, Deserialize)]
struct Ping;
#[derive(Serialize, Deserialize)]
struct Pong;
#[test]
fn test_framed() {
let (a, b) = duplex(1024);
let a = LengthDelimitedCodec::builder().new_framed(a);
let b = LengthDelimitedCodec::builder().new_framed(b);
let mut a = Framed::new(a, Bincode::default());
let mut b = Framed::new(b, Bincode::default());
let a = async {
a.send(Ping).await.unwrap();
a.next::<Pong>().await.unwrap().unwrap();
};
let b = async {
b.next::<Ping>().await.unwrap().unwrap();
b.send(Pong).await.unwrap();
};
futures::executor::block_on(async {
futures::join!(a, b);
});
}
}

10
serio/src/future.rs Normal file
View File

@@ -0,0 +1,10 @@
use std::future::Future;
// Just a helper function to ensure the futures we're returning all have the
// right implementations.
pub(crate) fn assert_future<T, F>(future: F) -> F
where
F: Future<Output = T>,
{
future
}

36
serio/src/lib.rs Normal file
View File

@@ -0,0 +1,36 @@
#![doc = include_str!("../README.md")]
#![deny(missing_docs, unreachable_pub, unused_must_use)]
#![deny(clippy::all)]
#[cfg(feature = "channel")]
pub mod channel;
#[cfg(feature = "codec")]
pub mod codec;
pub(crate) mod future;
pub mod sink;
pub mod stream;
#[cfg(feature = "codec")]
pub use codec::{Deserializer, Framed, Serializer};
pub use sink::{IoSink, Sink, SinkExt};
pub use stream::{IoStream, Stream, StreamExt};
/// A serializable type.
pub trait Serialize: serde::Serialize + Send + Sync + Unpin + 'static {}
impl<T> Serialize for T where T: serde::Serialize + Send + Sync + Unpin + 'static {}
/// A deserializable type.
pub trait Deserialize: serde::de::DeserializeOwned + Send + Sync + Unpin + 'static {}
impl<T> Deserialize for T where T: serde::de::DeserializeOwned + Send + Sync + Unpin + 'static {}
/// A duplex.
pub trait Duplex<T, E>: Sink<Error = E> + Stream<Error = E> {}
impl<T, U, E> Duplex<U, E> for T where T: Sink<Error = E> + Stream<Error = E> {}
/// A duplex with a `std::io::Error` error type.
pub trait IoDuplex<T>: Duplex<T, std::io::Error> {}
impl<T, U> IoDuplex<U> for T where T: Duplex<U, std::io::Error> {}

387
serio/src/sink.rs Normal file
View File

@@ -0,0 +1,387 @@
//! Sink types and traits.
use std::{
future::Future,
marker::PhantomData,
ops::DerefMut,
pin::Pin,
task::{ready, Context, Poll},
};
use crate::{future::assert_future, Serialize};
/// A sink with an error type of `std::io::Error`.
pub trait IoSink: Sink<Error = std::io::Error> {}
impl<T: ?Sized> IoSink for T where T: Sink<Error = std::io::Error> {}
/// A sink which accepts any item which implements `Serialize`.
///
/// This trait is similar to [`futures::Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html),
/// but facilitates sending of any serializable type instead of a single type.
#[must_use = "sinks do nothing unless polled"]
pub trait Sink {
/// The type of value produced by the sink when an error occurs.
type Error;
/// Attempts to prepare the `Sink` to receive a value.
///
/// This method must be called and return `Poll::Ready(Ok(()))` prior to
/// each call to `start_send`.
///
/// This method returns `Poll::Ready` once the underlying sink is ready to
/// receive data. If this method returns `Poll::Pending`, the current task
/// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
/// should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
/// permanently be unable to receive items.
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Begin the process of sending a value to the sink.
/// Each call to this function must be preceded by a successful call to
/// `poll_ready` which returned `Poll::Ready(Ok(()))`.
///
/// As the name suggests, this method only *begins* the process of sending
/// the item. If the sink employs buffering, the item isn't fully processed
/// until the buffer is fully flushed. Since sinks are designed to work with
/// asynchronous I/O, the process of actually writing out the data to an
/// underlying object takes place asynchronously. **You *must* use
/// `poll_flush` or `poll_close` in order to guarantee completion of a
/// send**.
///
/// Implementations of `poll_ready` and `start_send` will usually involve
/// flushing behind the scenes in order to make room for new messages.
/// It is only necessary to call `poll_flush` if you need to guarantee that
/// *all* of the items placed into the `Sink` have been sent.
///
/// In most cases, if the sink encounters an error, the sink will
/// permanently be unable to receive items.
fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
/// Flush any remaining output from this sink.
///
/// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this
/// value is returned then it is guaranteed that all previous values sent
/// via `start_send` have been flushed.
///
/// Returns `Poll::Pending` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_flush` should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
/// permanently be unable to receive items.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Flush any remaining output and close this sink, if necessary.
///
/// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink
/// has been successfully closed.
///
/// Returns `Poll::Pending` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_close` should be called again.
///
/// If this function encounters an error, the sink should be considered to
/// have failed permanently, and no more `Sink` methods should be called.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
impl<S: ?Sized + Sink + Unpin> Sink for &mut S {
type Error = S::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut **self).poll_ready(cx)
}
fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut **self).start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut **self).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut **self).poll_close(cx)
}
}
impl<P> Sink for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Sink,
{
type Error = <P::Target as Sink>::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().as_mut().poll_ready(cx)
}
fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
self.get_mut().as_mut().start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().as_mut().poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().as_mut().poll_close(cx)
}
}
/// An extension trait for Sinks that provides a variety of convenient functions.
pub trait SinkExt: Sink {
/// Close the sink.
fn close(&mut self) -> Close<'_, Self>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Close::new(self))
}
/// A future that completes after the given item has been fully processed
/// into the sink, including flushing.
///
/// Note that, **because of the flushing requirement, it is usually better
/// to batch together items to send via `feed` or `send_all`,
/// rather than flushing between each item.**
fn send<Item: Serialize>(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
}
/// A future that completes after the given item has been received
/// by the sink.
///
/// Unlike `send`, the returned future does not flush the sink.
/// It is the caller's responsibility to ensure all pending items
/// are processed, which can be done via `flush` or `close`.
fn feed<Item: Serialize>(&mut self, item: Item) -> Feed<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
}
/// A future that completes after the sink has been flushed.
fn flush(&mut self) -> Flush<'_, Self>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
}
/// Convert this sink into a `futures::Sink`.
#[cfg(feature = "compat")]
fn into_sink<Item: Serialize>(self) -> IntoSink<Self, Item>
where
Self: Sized,
{
assert_futures_sink(IntoSink::new(self))
}
}
impl<S: Sink + ?Sized> SinkExt for S {}
/// Future for the [`close`](SinkExt::close) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Close<'a, Si: ?Sized> {
sink: &'a mut Si,
}
impl<Si: Unpin + ?Sized> Unpin for Close<'_, Si> {}
/// A future that completes when the sink has finished closing.
///
/// The sink itself is returned after closing is complete.
impl<'a, Si: Sink + Unpin + ?Sized> Close<'a, Si> {
fn new(sink: &'a mut Si) -> Self {
Self { sink }
}
}
impl<Si: Sink + Unpin + ?Sized> Future for Close<'_, Si> {
type Output = Result<(), Si::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.sink).poll_close(cx)
}
}
/// Future for the [`send`](SinkExt::send) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
feed: Feed<'a, Si, Item>,
}
// Pinning is never projected to children.
impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}
impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> {
fn new(sink: &'a mut Si, item: Item) -> Self {
Self {
feed: Feed::new(sink, item),
}
}
}
impl<Si: Sink + Unpin + ?Sized, Item: Serialize> Future for Send<'_, Si, Item> {
type Output = Result<(), Si::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if this.feed.is_item_pending() {
ready!(Pin::new(&mut this.feed).poll(cx))?;
debug_assert!(!this.feed.is_item_pending());
}
// We're done sending the item, but want to block on flushing the
// sink.
ready!(this.feed.sink_pin_mut().poll_flush(cx))?;
Poll::Ready(Ok(()))
}
}
/// Future for the [`feed`](SinkExt::feed) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Feed<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
item: Option<Item>,
}
// Pinning is never projected to children.
impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {}
impl<'a, Si: Sink + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
fn new(sink: &'a mut Si, item: Item) -> Self {
Feed {
sink,
item: Some(item),
}
}
fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
Pin::new(self.sink)
}
fn is_item_pending(&self) -> bool {
self.item.is_some()
}
}
impl<Si: Sink + Unpin + ?Sized, Item: Serialize> Future for Feed<'_, Si, Item> {
type Output = Result<(), Si::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut sink = Pin::new(&mut this.sink);
ready!(sink.as_mut().poll_ready(cx))?;
let item = this.item.take().expect("polled Feed after completion");
sink.as_mut().start_send(item)?;
Poll::Ready(Ok(()))
}
}
/// Future for the [`flush`](SinkExt::flush) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flush<'a, Si: ?Sized> {
sink: &'a mut Si,
}
impl<Si: Unpin + ?Sized> Unpin for Flush<'_, Si> {}
impl<'a, Si: Sink + Unpin + ?Sized> Flush<'a, Si> {
fn new(sink: &'a mut Si) -> Self {
Self { sink }
}
}
impl<Si: Sink + Unpin + ?Sized> Future for Flush<'_, Si> {
type Output = Result<(), Si::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.sink).poll_flush(cx)
}
}
#[cfg(feature = "compat")]
mod compat {
use super::*;
pin_project_lite::pin_project! {
/// Wraps a sink and provides a `futures::Sink` implementation.
pub struct IntoSink<Si, Item> {
#[pin]
sink: Si,
_pd: PhantomData<Item>,
}
}
impl<Si, Item> IntoSink<Si, Item> {
pub(super) fn new(sink: Si) -> Self {
Self {
sink,
_pd: PhantomData,
}
}
/// Returns a reference to the inner sink.
pub fn sink(&self) -> &Si {
&self.sink
}
/// Returns a mutable reference to the inner sink.
pub fn sink_mut(&mut self) -> &mut Si {
&mut self.sink
}
/// Returns the inner sink.
pub fn into_inner(self) -> Si {
self.sink
}
}
impl<Si, Item> futures_sink::Sink<Item> for IntoSink<Si, Item>
where
Si: Sink,
Item: Serialize,
{
type Error = Si::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().sink.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
self.project().sink.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().sink.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().sink.poll_close(cx)
}
}
}
#[cfg(feature = "compat")]
pub use compat::IntoSink;
pub(crate) fn assert_futures_sink<S: futures_sink::Sink<Item>, Item>(sink: S) -> S {
sink
}

331
serio/src/stream.rs Normal file
View File

@@ -0,0 +1,331 @@
//! Stream types and traits.
use std::{
future::Future,
marker::PhantomData,
ops::DerefMut,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_core::FusedFuture;
use crate::{future::assert_future, Deserialize};
/// A stream with an error type of `std::io::Error`.
pub trait IoStream: Stream<Error = std::io::Error> {}
impl<T: ?Sized> IoStream for T where T: Stream<Error = std::io::Error> {}
/// A stream producing any kind of value which implements `Deserialize`.
///
/// This trait is similar to [`futures::Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html),
/// but facilitates receiving of any deserializable type instead of a single type.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
/// The type of value produced by the stream when an error occurs.
type Error;
/// Attempt to pull out the next value of this stream, registering the
/// current task for wakeup if the value is not yet available, and returning
/// `None` if the stream is exhausted.
///
/// # Return value
///
/// There are several possible return values, each indicating a distinct
/// stream state:
///
/// - `Poll::Pending` means that this stream's next value is not ready
/// yet. Implementations will ensure that the current task will be notified
/// when the next value may be ready.
///
/// - `Poll::Ready(Some(val))` means that the stream has successfully
/// produced a value, `val`, and may produce further values on subsequent
/// `poll_next` calls.
///
/// - `Poll::Ready(None)` means that the stream has terminated, and
/// `poll_next` should not be invoked again.
fn poll_next<Item: Deserialize>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>;
/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Error = S::Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
S::poll_next(Pin::new(&mut **self), cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Stream,
{
type Error = <P::Target as Stream>::Error;
fn poll_next<Item: Deserialize>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
self.get_mut().as_mut().poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream: Stream {
/// Returns `true` if the stream should no longer be polled.
fn is_terminated(&self) -> bool;
}
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedStream>::is_terminated(&**self)
}
}
impl<P> FusedStream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: FusedStream,
{
fn is_terminated(&self) -> bool {
<P::Target as FusedStream>::is_terminated(&**self)
}
}
/// An extension trait for Streams that provides a variety of convenient functions.
pub trait StreamExt: Stream {
/// Creates a future that resolves to the next item in the stream.
///
/// Note that because `next` doesn't take ownership over the stream,
/// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
/// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
/// be done by boxing the stream using [`Box::pin`] or
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=3);
///
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn next<Item: Deserialize>(&mut self) -> Next<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Option<Result<Item, Self::Error>>, _>(Next::new(self))
}
/// Converts this stream into a `futures::Stream`.
#[cfg(feature = "compat")]
fn into_stream<Item: Deserialize>(self) -> IntoStream<Self, Item>
where
Self: Sized,
{
assert_futures_stream(IntoStream::new(self))
}
/// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
/// stream types.
fn poll_next_unpin<Item: Deserialize>(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>
where
Self: Unpin,
{
Pin::new(self).poll_next(cx)
}
}
impl<S: Stream + ?Sized> StreamExt for S {}
/// Future for the [`next`](StreamExt::next) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, St: ?Sized, Item> {
stream: &'a mut St,
_pd: PhantomData<Item>,
}
impl<St: ?Sized + Unpin, Item> Unpin for Next<'_, St, Item> {}
impl<'a, St: ?Sized + Stream + Unpin, Item> Next<'a, St, Item> {
pub(super) fn new(stream: &'a mut St) -> Self {
Self {
stream,
_pd: PhantomData,
}
}
}
impl<St: ?Sized + FusedStream + Unpin, Item: Deserialize> FusedFuture for Next<'_, St, Item> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
impl<St: ?Sized + Stream + Unpin, Item: Deserialize> Future for Next<'_, St, Item> {
type Output = Option<Result<Item, St::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.stream.poll_next_unpin(cx)
}
}
#[cfg(feature = "compat")]
mod compat {
use super::*;
pin_project_lite::pin_project! {
/// Wraps a stream and provides a `futures::Stream` implementation.
pub struct IntoStream<St, Item> {
#[pin]
stream: St,
_pd: PhantomData<Item>,
}
}
impl<St, Item> IntoStream<St, Item> {
pub(super) fn new(stream: St) -> Self {
Self {
stream,
_pd: PhantomData,
}
}
/// Returns a reference to the inner stream.
pub fn stream(&self) -> &St {
&self.stream
}
/// Returns a mutable reference to the inner stream.
pub fn stream_mut(&mut self) -> &mut St {
&mut self.stream
}
/// Returns the inner stream.
pub fn into_inner(self) -> St {
self.stream
}
}
impl<St, Item> futures_core::Stream for IntoStream<St, Item>
where
St: Stream,
Item: Deserialize,
{
type Item = Result<Item, St::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}
}
#[cfg(feature = "compat")]
pub use compat::IntoStream;
/// An extension trait for [`IoStream`] which provides a variety of convenient functions.
pub trait IoStreamExt: IoStream {
/// Creates a future that resolves to the next item in the stream, returning
/// an error if the stream is exhausted.
fn expect_next<Item: Deserialize>(&mut self) -> ExpectNext<'_, Self, Item>
where
Self: Unpin,
{
ExpectNext {
next: self.next(),
_pd: PhantomData,
}
}
}
impl<S: ?Sized> IoStreamExt for S where S: IoStream {}
/// Future for the [`expect_next`](IoStreamExt::expect_next) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ExpectNext<'a, St: ?Sized, Item> {
next: Next<'a, St, Item>,
_pd: PhantomData<Item>,
}
impl<St: ?Sized + Unpin, Item> Unpin for ExpectNext<'_, St, Item> {}
impl<'a, St: ?Sized + IoStream + Unpin, Item: Deserialize> Future for ExpectNext<'a, St, Item> {
type Output = Result<Item, St::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.next).poll(cx)) {
Some(Ok(item)) => Poll::Ready(Ok(item)),
Some(Err(err)) => Poll::Ready(Err(err)),
None => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
}
}
}
pub(crate) fn assert_futures_stream<S: futures_core::Stream>(stream: S) -> S {
stream
}