mirror of
https://github.com/AtHeartEngineering/rust-libp2p-dandelion.git
synced 2026-01-10 16:57:56 -05:00
muxers/mplex: Implement AsyncRead and AsyncWrite for Substream (#2706)
This aligns the public API of the `libp2p-mplex` module with the one
from `libp2p-yamux`. This change has two benefits:
1. For standalone users of `libp2p-mplex`, the substreams itself are
now useful, similar to `libp2p-yamux` and don't necessarily need to
be polled via the `StreamMuxer`. The `StreamMuxer` only forwards to
the `Async{Read,Write}` implementations.
2. This will reduce the diff of #2648 because we can chunk the one
giant commit into smaller atomic ones.
This commit is contained in:
@@ -46,6 +46,8 @@
|
||||
# 0.46.0 [unreleased]
|
||||
- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
|
||||
- Added weak dependencies for features. See [PR 2646].
|
||||
- Update individual crates.
|
||||
- Update to [`libp2p-mplex` `v0.34.0`](muxers/mplex/CHANGELOG.md).
|
||||
|
||||
[PR 2646]: https://github.com/libp2p/rust-libp2p/pull/2646
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional =
|
||||
libp2p-identify = { version = "0.36.1", path = "protocols/identify", optional = true }
|
||||
libp2p-kad = { version = "0.37.1", path = "protocols/kad", optional = true }
|
||||
libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true }
|
||||
libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true }
|
||||
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
|
||||
libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true }
|
||||
libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true }
|
||||
libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true }
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
# 0.34.0 [unreleased]
|
||||
|
||||
- `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706].
|
||||
|
||||
[PR 2706]: https://github.com/libp2p/rust-libp2p/pull/2706/
|
||||
|
||||
# 0.33.0
|
||||
|
||||
- Update to `libp2p-core` `v0.33.0`.
|
||||
|
||||
@@ -3,7 +3,7 @@ name = "libp2p-mplex"
|
||||
edition = "2021"
|
||||
rust-version = "1.56.1"
|
||||
description = "Mplex multiplexing protocol for libp2p"
|
||||
version = "0.33.0"
|
||||
version = "0.34.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
|
||||
@@ -33,7 +33,7 @@ use libp2p_core::{
|
||||
StreamMuxer,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use std::{cmp, iter, task::Context, task::Poll};
|
||||
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};
|
||||
|
||||
impl UpgradeInfo for MplexConfig {
|
||||
type Info = &'static [u8];
|
||||
@@ -54,7 +54,7 @@ where
|
||||
|
||||
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
future::ready(Ok(Multiplex {
|
||||
io: Mutex::new(io::Multiplexed::new(socket, self)),
|
||||
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -69,7 +69,7 @@ where
|
||||
|
||||
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
future::ready(Ok(Multiplex {
|
||||
io: Mutex::new(io::Multiplexed::new(socket, self)),
|
||||
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -79,14 +79,14 @@ where
|
||||
/// This implementation isn't capable of detecting when the underlying socket changes its address,
|
||||
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
|
||||
pub struct Multiplex<C> {
|
||||
io: Mutex<io::Multiplexed<C>>,
|
||||
io: Arc<Mutex<io::Multiplexed<C>>>,
|
||||
}
|
||||
|
||||
impl<C> StreamMuxer for Multiplex<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
type Substream = Substream;
|
||||
type Substream = Substream<C>;
|
||||
type OutboundSubstream = OutboundSubstream;
|
||||
type Error = io::Error;
|
||||
|
||||
@@ -95,7 +95,7 @@ where
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
|
||||
let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
|
||||
let stream = Substream::new(stream_id);
|
||||
let stream = Substream::new(stream_id, self.io.clone());
|
||||
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ where
|
||||
_: &mut Self::OutboundSubstream,
|
||||
) -> Poll<Result<Self::Substream, io::Error>> {
|
||||
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
|
||||
Poll::Ready(Ok(Substream::new(stream_id)))
|
||||
Poll::Ready(Ok(Substream::new(stream_id, self.io.clone())))
|
||||
}
|
||||
|
||||
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
|
||||
@@ -122,22 +122,7 @@ where
|
||||
substream: &mut Self::Substream,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
loop {
|
||||
// Try to read from the current (i.e. last received) frame.
|
||||
if !substream.current_data.is_empty() {
|
||||
let len = cmp::min(substream.current_data.len(), buf.len());
|
||||
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
|
||||
return Poll::Ready(Ok(len));
|
||||
}
|
||||
|
||||
// Read the next data frame from the multiplexed stream.
|
||||
match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
|
||||
Some(data) => {
|
||||
substream.current_data = data;
|
||||
}
|
||||
None => return Poll::Ready(Ok(0)),
|
||||
}
|
||||
}
|
||||
Pin::new(substream).poll_read(cx, buf)
|
||||
}
|
||||
|
||||
fn write_substream(
|
||||
@@ -146,7 +131,7 @@ where
|
||||
substream: &mut Self::Substream,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
self.io.lock().poll_write_stream(cx, substream.id, buf)
|
||||
Pin::new(substream).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn flush_substream(
|
||||
@@ -154,7 +139,7 @@ where
|
||||
cx: &mut Context<'_>,
|
||||
substream: &mut Self::Substream,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
self.io.lock().poll_flush_stream(cx, substream.id)
|
||||
Pin::new(substream).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn shutdown_substream(
|
||||
@@ -162,11 +147,11 @@ where
|
||||
cx: &mut Context<'_>,
|
||||
substream: &mut Self::Substream,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
self.io.lock().poll_close_stream(cx, substream.id)
|
||||
Pin::new(substream).poll_close(cx)
|
||||
}
|
||||
|
||||
fn destroy_substream(&self, sub: Self::Substream) {
|
||||
self.io.lock().drop_stream(sub.id);
|
||||
std::mem::drop(sub)
|
||||
}
|
||||
|
||||
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
@@ -177,19 +162,98 @@ where
|
||||
/// Active attempt to open an outbound substream.
|
||||
pub struct OutboundSubstream {}
|
||||
|
||||
impl<C> AsyncRead for Substream<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
loop {
|
||||
// Try to read from the current (i.e. last received) frame.
|
||||
if !this.current_data.is_empty() {
|
||||
let len = cmp::min(this.current_data.len(), buf.len());
|
||||
buf[..len].copy_from_slice(&this.current_data.split_to(len));
|
||||
return Poll::Ready(Ok(len));
|
||||
}
|
||||
|
||||
// Read the next data frame from the multiplexed stream.
|
||||
match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
|
||||
Some(data) => {
|
||||
this.current_data = data;
|
||||
}
|
||||
None => return Poll::Ready(Ok(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> AsyncWrite for Substream<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
this.io.lock().poll_write_stream(cx, this.id, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
this.io.lock().poll_flush_stream(cx, this.id)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let this = self.get_mut();
|
||||
let mut io = this.io.lock();
|
||||
|
||||
ready!(io.poll_close_stream(cx, this.id))?;
|
||||
ready!(io.poll_flush_stream(cx, this.id))?;
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Active substream to the remote.
|
||||
pub struct Substream {
|
||||
pub struct Substream<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
/// The unique, local identifier of the substream.
|
||||
id: LocalStreamId,
|
||||
/// The current data frame the substream is reading from.
|
||||
current_data: Bytes,
|
||||
/// Shared reference to the actual muxer.
|
||||
io: Arc<Mutex<io::Multiplexed<C>>>,
|
||||
}
|
||||
|
||||
impl Substream {
|
||||
fn new(id: LocalStreamId) -> Self {
|
||||
impl<C> Substream<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
current_data: Bytes::new(),
|
||||
io,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Drop for Substream<C>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
self.io.lock().drop_stream(self.id);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user