Async backend general (#206)

* Added rayon backend for OTSender

* Added new SpawnRayon trait

* Remove &self reference

* Typo

* Improved implementation and added a test

* Undo changes in mpc

* Backend is now a global static ZST

- OT setup now uses backend

* Replaced old RayonBackend for Garbling with new Backend

* Move tokio to dev-dependencies

* Added feedback

* Added correct feature flags to dev-dependency tokio in
actor-share-conversion

* WIP: Repairing rebase...
This commit is contained in:
th4s
2023-03-24 11:28:55 +00:00
committed by GitHub
parent 66da186abc
commit 617a4f3d14
4 changed files with 65 additions and 0 deletions

View File

@@ -14,3 +14,4 @@ async-tungstenite = "0.16"
prost-build = "0.9"
bytes = "1"
async-std = "1"
rayon = "1"

View File

@@ -23,6 +23,7 @@ futures-util.workspace = true
async-trait.workspace = true
thiserror.workspace = true
async-std.workspace = true
rayon.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = [

View File

@@ -7,6 +7,7 @@ pub mod expect_msg;
pub mod factory;
#[cfg(feature = "mux")]
pub mod mux;
pub mod non_blocking_backend;
pub trait Channel<T>: futures::Stream<Item = T> + futures::Sink<T> + Send + Unpin {}

View File

@@ -0,0 +1,62 @@
use async_trait::async_trait;
use futures::channel::oneshot;
pub type Backend = RayonBackend;
/// Allows to spawn a closure on a thread outside of the async runtime
///
/// This allows to perform CPU-intensive tasks without blocking the runtime.
#[async_trait]
pub trait NonBlockingBackend {
/// Spawn the closure in a separate thread and await the result
async fn spawn<
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: std::error::Error + From<oneshot::Canceled> + Send + 'static,
>(
closure: F,
) -> Result<T, E>;
}
/// A CPU backend that uses Rayon
pub struct RayonBackend;
#[async_trait]
impl NonBlockingBackend for RayonBackend {
async fn spawn<
F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: std::error::Error + From<oneshot::Canceled> + Send + 'static,
>(
closure: F,
) -> Result<T, E> {
let (sender, receiver) = oneshot::channel();
rayon::spawn(move || {
_ = sender.send(closure());
});
receiver.await?
}
}
#[cfg(test)]
mod tests {
use super::{Backend, NonBlockingBackend};
use futures::channel::oneshot::Canceled;
#[tokio::test]
async fn test_spawn() {
let sum = Backend::spawn(compute_sum).await.unwrap();
assert_eq!(sum, 4950);
}
#[derive(thiserror::Error, Debug)]
enum TestError {
#[error("")]
Foo(#[from] Canceled),
}
fn compute_sum() -> Result<u32, TestError> {
Ok((0..100).sum())
}
}