mirror of
https://github.com/th4s/mpz-play.git
synced 2026-01-08 01:53:54 -05:00
WIP: Simplify lessons connect and oblivious-transfer.
This commit is contained in:
@@ -1,21 +1,19 @@
|
||||
use common::{tcp_mux, FramedUidMux, Role, DEFAULT_LOCAL};
|
||||
use serio::{stream::IoStreamExt, SinkExt};
|
||||
use common::{tcp_connect, Role, DEFAULT_LOCAL};
|
||||
use serio::{
|
||||
codec::{Bincode, Codec},
|
||||
stream::IoStreamExt,
|
||||
SinkExt,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Open connection and poll it in the background.
|
||||
let (future, mut ctrl) = tcp_mux(Role::Alice, DEFAULT_LOCAL).await.unwrap();
|
||||
let join_handle = tokio::spawn(future);
|
||||
|
||||
// Open a channel.
|
||||
let mut channel = ctrl.open_framed(b"1").await.unwrap();
|
||||
// Open a connection.
|
||||
let tcp = tcp_connect(Role::Alice, DEFAULT_LOCAL).await.unwrap();
|
||||
let mut channel = Bincode::default().new_framed(tcp);
|
||||
|
||||
// Send a number to Bob and wait for Bob's number.
|
||||
channel.send(42_u32).await.unwrap();
|
||||
let received: u32 = channel.expect_next().await.unwrap();
|
||||
println!("Alice received: {received}");
|
||||
|
||||
// Properly close the connection.
|
||||
ctrl.mux_mut().close();
|
||||
join_handle.await.unwrap().unwrap();
|
||||
println!("Alice received: {received}");
|
||||
}
|
||||
|
||||
@@ -1,22 +1,20 @@
|
||||
use common::{tcp_mux, FramedUidMux, Role, DEFAULT_LOCAL};
|
||||
use serio::{stream::IoStreamExt, SinkExt};
|
||||
use common::{tcp_connect, Role, DEFAULT_LOCAL};
|
||||
use serio::{
|
||||
codec::{Bincode, Codec},
|
||||
stream::IoStreamExt,
|
||||
SinkExt,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Open connection and poll it in the background.
|
||||
let (future, mut ctrl) = tcp_mux(Role::Bob, DEFAULT_LOCAL).await.unwrap();
|
||||
let join_handle = tokio::spawn(future);
|
||||
|
||||
// Open a channel.
|
||||
let mut channel = ctrl.open_framed(b"1").await.unwrap();
|
||||
// Open a connection.
|
||||
let tcp = tcp_connect(Role::Bob, DEFAULT_LOCAL).await.unwrap();
|
||||
let mut channel = Bincode::default().new_framed(tcp);
|
||||
|
||||
// Wait for Alice to send her number, then increment and send it back.
|
||||
let mut received: u32 = channel.expect_next().await.unwrap();
|
||||
println!("Bob received: {received}");
|
||||
|
||||
received += 1;
|
||||
channel.send(received).await.unwrap();
|
||||
|
||||
// Properly close the connection.
|
||||
ctrl.mux_mut().close();
|
||||
join_handle.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
@@ -1,17 +1,13 @@
|
||||
//! In this unit we want to run a simple connection test. Alice wants to open a multiplexed TCP
|
||||
//! connection with Bob that automatically de-/serializes sent/received messages. Bob is just
|
||||
//! listening on some port and awaiting Alice's incoming connection.
|
||||
//! In this unit we want to run a simple connection test. Alice wants to open a TCP connection with
|
||||
//! Bob that automatically de-/serializes sent/received messages. Bob is just listening on some
|
||||
//! port and awaiting Alice's incoming connection.
|
||||
//!
|
||||
//! Luckily we already prepared some tooling to simplify the IO setup for you. In
|
||||
//! [`common::tcp_mux`] you can find a function to easily set up said TCP connection. Running the
|
||||
//! returned future with [`tokio::spawn`] in the background allows you to open a channel with
|
||||
//! [`common::FramedUidMux::open_framed`] which is implemented by [`common::MuxControl`].
|
||||
//! [`common::tcp_connect`] you can find a function to easily set up said TCP connection.
|
||||
//! Instantiating a [`serio::codec::Bincode`] and wrapping the TCP connection with
|
||||
//! [serio::codec::Codec::new_framed] allows you to open a channel.
|
||||
//!
|
||||
//! To check that you set up the connection correctly, Alice should send some number to Bob. Bob
|
||||
//! increments it by one and sends it back to Alice. For sending you can use [`serio::SinkExt::send`]
|
||||
//! and for receiving it is [`serio::stream::IoStreamExt::expect_next`] which is both implemented by
|
||||
//! the channel you just opened.
|
||||
//!
|
||||
//! After you are done you should make sure to properly close the connection on both sides. This
|
||||
//! can be done by calling [`common::MuxControl::mux_mut`] and then [`common::YamuxCtrl::close`].
|
||||
//! Also make sure in the end to await the join handle returned by [`tokio::spawn`].
|
||||
|
||||
@@ -1,34 +1,31 @@
|
||||
use common::{tcp_mux, Role, DEFAULT_LOCAL};
|
||||
use mpz_common::executor::MTExecutor;
|
||||
use common::{tcp_connect, Role, DEFAULT_LOCAL};
|
||||
use mpz_common::executor::STExecutor;
|
||||
use mpz_core::Block;
|
||||
use mpz_ot::{
|
||||
chou_orlandi::{Sender, SenderConfig},
|
||||
OTSender, OTSetup,
|
||||
};
|
||||
use serio::codec::{Bincode, Codec};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Open connection and poll it in the background.
|
||||
let (future, mut ctrl) = tcp_mux(Role::Alice, DEFAULT_LOCAL).await.unwrap();
|
||||
let join_handle = tokio::spawn(future);
|
||||
// Open a connection.
|
||||
let tcp = tcp_connect(Role::Alice, DEFAULT_LOCAL).await.unwrap();
|
||||
let channel = Bincode::default().new_framed(tcp);
|
||||
|
||||
// Create an executor and spawn a context.
|
||||
let mut executor = MTExecutor::new(ctrl.clone(), 32);
|
||||
let mut context = executor.new_thread().await.unwrap();
|
||||
// Create an executor.
|
||||
let mut executor = STExecutor::new(channel);
|
||||
|
||||
// Create an OT sender and set it up.
|
||||
let sender_config = SenderConfig::builder().build().unwrap();
|
||||
let sender_config = SenderConfig::default();
|
||||
let mut sender = Sender::new(sender_config);
|
||||
sender.setup(&mut context).await.unwrap();
|
||||
|
||||
// Create a message.
|
||||
sender.setup(&mut executor).await.unwrap();
|
||||
|
||||
// Create messages.
|
||||
let zero = Block::ZERO;
|
||||
let one = Block::ONE;
|
||||
|
||||
// Send OTs to Bob.
|
||||
sender.send(&mut context, &[[zero, one]]).await.unwrap();
|
||||
|
||||
// Properly close the connection.
|
||||
ctrl.mux_mut().close();
|
||||
join_handle.await.unwrap().unwrap();
|
||||
sender.send(&mut executor, &[[zero, one]]).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -1,33 +1,30 @@
|
||||
use common::{tcp_mux, Role, DEFAULT_LOCAL};
|
||||
use mpz_common::executor::MTExecutor;
|
||||
use common::{tcp_connect, Role, DEFAULT_LOCAL};
|
||||
use mpz_common::executor::STExecutor;
|
||||
use mpz_ot::{
|
||||
chou_orlandi::{Receiver, ReceiverConfig},
|
||||
OTReceiver, OTSetup,
|
||||
};
|
||||
use serio::codec::{Bincode, Codec};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Open connection and poll it in the background.
|
||||
let (future, mut ctrl) = tcp_mux(Role::Bob, DEFAULT_LOCAL).await.unwrap();
|
||||
let join_handle = tokio::spawn(future);
|
||||
// Open a connection.
|
||||
let tcp = tcp_connect(Role::Bob, DEFAULT_LOCAL).await.unwrap();
|
||||
let channel = Bincode::default().new_framed(tcp);
|
||||
|
||||
// Create an executor and spawn a context.
|
||||
let mut executor = MTExecutor::new(ctrl.clone(), 32);
|
||||
let mut context = executor.new_thread().await.unwrap();
|
||||
// Create an executor.
|
||||
let mut executor = STExecutor::new(channel);
|
||||
|
||||
// Create an OT receiver and set it up.
|
||||
let receiver_config = ReceiverConfig::builder().build().unwrap();
|
||||
let receiver_config = ReceiverConfig::default();
|
||||
let mut receiver = Receiver::new(receiver_config);
|
||||
receiver.setup(&mut context).await.unwrap();
|
||||
|
||||
// Create a choice.
|
||||
receiver.setup(&mut executor).await.unwrap();
|
||||
|
||||
// Make a choice.
|
||||
let choice = true;
|
||||
|
||||
// Receive OTs from Alice.
|
||||
let output = receiver.receive(&mut context, &[choice]).await.unwrap();
|
||||
let output = receiver.receive(&mut executor, &[choice]).await.unwrap();
|
||||
println!("Received from Alice: {:?}", output.msgs.first().unwrap());
|
||||
|
||||
// Properly close the connection.
|
||||
ctrl.mux_mut().close();
|
||||
join_handle.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
@@ -1,16 +1,11 @@
|
||||
//! In this unit we want to do an oblivious transfer (OT). Alice will be the OT sender and Bob will
|
||||
//! be the OT receiver.
|
||||
//!
|
||||
//! We start again by opening a connection and polling it in the background, but this time there is
|
||||
//! no need to open a channel. This is because we use an executor that abstracts the creation of
|
||||
//! channels and threads for us. Simply create a new executor with
|
||||
//! [`mpz_common::executor::MTExecutor::new`] by injecting a cloned [`common::MuxControl`] (so that
|
||||
//! we still have one control to close the connection in the end). Then create a new thread with
|
||||
//! [`mpz_common::executor::MTExecutor::new_thread`], which will be used as the context for all IO
|
||||
//! between Alice and Bob.
|
||||
//! We start again by opening a connection. To be able use the connection with out OT API you need
|
||||
//! to wrap it in a [`mpz_common::executor::STExecutor`].
|
||||
//!
|
||||
//! Now either create an [`mpz_ot::OTSender`] or an [`mpz_ot::OTReceiver`] and set it up by calling
|
||||
//! [`mpz_ot::OTSetup::setup`]. You can use [`mpz_ot::chou_orlandi::Sender`] and
|
||||
//! [`mpz_ot::chou_orlandi::Receiver`] for this. Then perform the OT by calling
|
||||
//! [`mpz_ot::OTSender::send`] or [`mpz_ot::OTReceiver::receive`]. For creating messages that can be
|
||||
//! sent, you can use [`mpz_core::Block`].
|
||||
//! [`mpz_ot::OTSender::send`] or [`mpz_ot::OTReceiver::receive`]. For creating messages that can
|
||||
//! be sent, you can use [`mpz_core::Block`].
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
use anyhow::Error as Anyhow;
|
||||
use mux::attach_mux;
|
||||
use tokio::net::TcpSocket;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use futures::{AsyncRead, AsyncWrite};
|
||||
use tokio::{
|
||||
net::TcpSocket,
|
||||
time::{sleep, Duration},
|
||||
};
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||
|
||||
mod mux;
|
||||
|
||||
pub use mux::{MuxControl, MuxFuture};
|
||||
pub use uid_mux::{yamux::YamuxCtrl, FramedUidMux};
|
||||
|
||||
/// The default address we use for all examples.
|
||||
pub const DEFAULT_LOCAL: &str = "127.0.0.1:8083";
|
||||
|
||||
@@ -19,14 +16,14 @@ pub enum Role {
|
||||
Bob,
|
||||
}
|
||||
|
||||
/// Opens a multiplexed TCP connection.
|
||||
/// Opens a TCP connection.
|
||||
///
|
||||
/// Depending on the `role` either listens or connects to `address`. Returns a [`MuxFuture`] which
|
||||
/// has to be continuously polled and a [`MuxControl`], which allows to open new channels.
|
||||
pub async fn tcp_mux(
|
||||
/// Depending on the `role` either listens or connects to `address`.
|
||||
/// Returns a tcp stream that implements [`AsyncRead`] and [`AsyncWrite`].
|
||||
pub async fn tcp_connect(
|
||||
role: Role,
|
||||
address: impl AsRef<str>,
|
||||
) -> Result<(MuxFuture, MuxControl), Anyhow> {
|
||||
) -> Result<impl AsyncRead + AsyncWrite, Anyhow> {
|
||||
let addr = address.as_ref().parse()?;
|
||||
|
||||
let tcp_stream = match role {
|
||||
@@ -47,11 +44,11 @@ pub async fn tcp_mux(
|
||||
}
|
||||
};
|
||||
|
||||
Ok(attach_mux(tcp_stream.compat(), role))
|
||||
Ok(tcp_stream.compat())
|
||||
}
|
||||
|
||||
/// Opens a multiplexed WebRTC datachannel.
|
||||
pub async fn webrtc_mux(_role: Role) -> Result<(MuxFuture, MuxControl), Anyhow> {
|
||||
/// Opens a WebRTC datachannel.
|
||||
pub async fn webrtc(_role: Role) -> Result<(), Anyhow> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
//! Multiplexer used in the TLSNotary protocol.
|
||||
|
||||
use crate::Role;
|
||||
use futures::{
|
||||
future::{FusedFuture, FutureExt},
|
||||
AsyncRead, AsyncWrite, Future,
|
||||
};
|
||||
use serio::codec::Bincode;
|
||||
use std::future::IntoFuture;
|
||||
use uid_mux::{yamux, FramedMux};
|
||||
|
||||
/// Multiplexer supporting unique deterministic stream IDs.
|
||||
pub type Mux<Io> = yamux::Yamux<Io>;
|
||||
/// Multiplexer controller providing streams with a codec attached.
|
||||
pub type MuxControl = FramedMux<yamux::YamuxCtrl, Bincode>;
|
||||
|
||||
/// Multiplexer future which must be polled for the muxer to make progress.
|
||||
pub struct MuxFuture(
|
||||
Box<dyn FusedFuture<Output = Result<(), yamux::ConnectionError>> + Send + Unpin>,
|
||||
);
|
||||
|
||||
impl MuxFuture {
|
||||
/// Returns true if the muxer is complete.
|
||||
pub fn is_complete(&self) -> bool {
|
||||
self.0.is_terminated()
|
||||
}
|
||||
|
||||
/// Awaits a future, polling the muxer future concurrently.
|
||||
pub async fn poll_with<F, R>(&mut self, fut: F) -> R
|
||||
where
|
||||
F: Future<Output = R>,
|
||||
{
|
||||
let mut fut = Box::pin(fut.fuse());
|
||||
// Poll the future concurrently with the muxer future.
|
||||
// If the muxer returns an error, continue polling the future
|
||||
// until it completes.
|
||||
loop {
|
||||
futures::select! {
|
||||
res = fut => return res,
|
||||
res = &mut self.0 => if let Err(e) = res {
|
||||
eprintln!("mux error: {:?}", e);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for MuxFuture {
|
||||
type Output = Result<(), yamux::ConnectionError>;
|
||||
|
||||
fn poll(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
self.0.as_mut().poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Attaches a multiplexer to the provided socket.
|
||||
///
|
||||
/// Returns the multiplexer and a controller for creating streams with a codec attached.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `socket` - The socket to attach the multiplexer to.
|
||||
/// * `role` - The role of the party using the multiplexer.
|
||||
pub fn attach_mux<T: AsyncWrite + AsyncRead + Send + Unpin + 'static>(
|
||||
socket: T,
|
||||
role: Role,
|
||||
) -> (MuxFuture, MuxControl) {
|
||||
let mut mux_config = yamux::Config::default();
|
||||
mux_config.set_max_num_streams(64);
|
||||
|
||||
let mux_role = match role {
|
||||
Role::Alice => yamux::Mode::Client,
|
||||
Role::Bob => yamux::Mode::Server,
|
||||
};
|
||||
|
||||
let mux = Mux::new(socket, mux_config, mux_role);
|
||||
let ctrl = FramedMux::new(mux.control(), Bincode);
|
||||
|
||||
if let Role::Alice = role {
|
||||
ctrl.mux().alloc(64);
|
||||
}
|
||||
|
||||
(MuxFuture(Box::new(mux.into_future().fuse())), ctrl)
|
||||
}
|
||||
Reference in New Issue
Block a user