mirror of
https://github.com/tlsnotary/tlsn-utils.git
synced 2026-01-07 20:03:55 -05:00
feat(web-spawn): first version (#50)
* feat(web-spawn): first version * Update web-spawn/Cargo.toml Co-authored-by: Hendrik Eeckhaut <hendrik@eeckhaut.org> --------- Co-authored-by: Hendrik Eeckhaut <hendrik@eeckhaut.org>
This commit is contained in:
@@ -6,7 +6,8 @@ members = [
|
||||
"utils",
|
||||
"utils-aio",
|
||||
"utils/fuzz",
|
||||
"websocket-relay"
|
||||
"websocket-relay",
|
||||
"web-spawn",
|
||||
]
|
||||
|
||||
[workspace.dependencies]
|
||||
@@ -39,4 +40,4 @@ tokio = "1.23"
|
||||
tokio-serde = "0.8"
|
||||
tokio-util = "0.7"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
tracing-subscriber = "0.3"
|
||||
|
||||
2
rustfmt.toml
Normal file
2
rustfmt.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
imports_granularity = "Crate"
|
||||
wrap_comments = true
|
||||
8
web-spawn/.cargo/config.toml
Normal file
8
web-spawn/.cargo/config.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[build]
|
||||
target = "wasm32-unknown-unknown"
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")']
|
||||
rustflags = ["-C", "target-feature=+atomics,+bulk-memory,+mutable-globals"]
|
||||
|
||||
[unstable]
|
||||
build-std = ["panic_abort", "std"]
|
||||
7
web-spawn/CHANGELOG.md
Normal file
7
web-spawn/CHANGELOG.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Changelog
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
23
web-spawn/Cargo.toml
Normal file
23
web-spawn/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "web-spawn"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
description = "`std` spawn replacement for WASM in the browser."
|
||||
repository = "https://github.com/tlsnotary/tlsn-utils"
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
wasm-bindgen = { version = "0.2" }
|
||||
wasm-bindgen-futures = { version = "0.4" }
|
||||
crossbeam-channel = { version = "0.5" }
|
||||
futures = { version = "0.3" }
|
||||
js-sys = { version = "0.3" }
|
||||
web-sys = { version = "0.3", features = [
|
||||
"WorkerOptions",
|
||||
"WorkerType",
|
||||
"Blob",
|
||||
"Url",
|
||||
] }
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
|
||||
wasm-bindgen-test = { version = "0.3" }
|
||||
45
web-spawn/README.md
Normal file
45
web-spawn/README.md
Normal file
@@ -0,0 +1,45 @@
|
||||
# web-spawn
|
||||
|
||||
[](https://crates.io/crates/web-spawn)
|
||||
[](https://docs.rs/web-spawn)
|
||||
|
||||
This crate provides a `std::thread` shim for WASM builds targeting web browsers.
|
||||
|
||||
It borrows from and is heavily inspired by both [`wasm-bindgen-rayon`](https://crates.io/crates/wasm-bindgen-rayon) and [`wasm_thread`](https://crates.io/crates/wasm_thread) but makes a couple different design choices.
|
||||
|
||||
Most notably, spawning is explicitly delegated to run in a background task and must be initialized at the start of the program. This task can either run on the main browser thread, or be moved to a dedicated worker to avoid potential interference from other loads.
|
||||
|
||||
# Usage
|
||||
|
||||
Add `web-spawn` as a dependency in your `Cargo.toml`:
|
||||
|
||||
```toml
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
web-spawn = { version = "0.1" }
|
||||
```
|
||||
|
||||
Then **you must ensure that spawning is initialized**. One way to do this is to re-export the following function:
|
||||
|
||||
```rust,ignore
|
||||
pub use web_spawn::start_spawner;
|
||||
```
|
||||
|
||||
On the javascript side this can be awaited:
|
||||
|
||||
```javascript
|
||||
import init, { startSpawner } from /* your package */;
|
||||
|
||||
await init();
|
||||
|
||||
// Runs the spawner on a dedicated web worker.
|
||||
await startSpawner();
|
||||
```
|
||||
|
||||
Now, in the rest of your Rust code you can conditionally use `web-spawn` anywhere you would otherwise use `std::thread::spawn`:
|
||||
|
||||
```rust,ignore
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use web_spawn as thread;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::thread;
|
||||
```
|
||||
7
web-spawn/run_tests.sh
Executable file
7
web-spawn/run_tests.sh
Executable file
@@ -0,0 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Test from main browser thread
|
||||
WASM_BINDGEN_USE_BROWSER=1 wasm-pack test --headless --chrome --firefox
|
||||
|
||||
# Test from worker thread
|
||||
WASM_BINDGEN_USE_DEDICATED_WORKER=1 wasm-pack test --headless --chrome --firefox
|
||||
2
web-spawn/rust-toolchain
Normal file
2
web-spawn/rust-toolchain
Normal file
@@ -0,0 +1,2 @@
|
||||
[toolchain]
|
||||
channel = "nightly"
|
||||
11
web-spawn/src/js/script_path.js
Normal file
11
web-spawn/src/js/script_path.js
Normal file
@@ -0,0 +1,11 @@
|
||||
/// Extracts current script file path from artificially generated stack trace
|
||||
function script_path() {
|
||||
try {
|
||||
throw new Error();
|
||||
} catch (e) {
|
||||
let parts = e.stack.match(/(?:\(|@)(\S+):\d+:\d+/);
|
||||
return parts[1];
|
||||
}
|
||||
}
|
||||
|
||||
script_path()
|
||||
23
web-spawn/src/js/spawner.js
Normal file
23
web-spawn/src/js/spawner.js
Normal file
@@ -0,0 +1,23 @@
|
||||
import init, { web_spawn_recover_spawner } from "WASM_BINDGEN_SHIM_URL";
|
||||
|
||||
console.log('spawner spawned');
|
||||
|
||||
self.onmessage = event => {
|
||||
const [module_or_path, memory, spawner] = event.data;
|
||||
init({ module_or_path, memory })
|
||||
.catch(err => {
|
||||
console.error(err);
|
||||
// Propagate to main `onerror`:
|
||||
setTimeout(() => {
|
||||
throw err;
|
||||
});
|
||||
throw err;
|
||||
})
|
||||
.then(async () => {
|
||||
self.postMessage('ready');
|
||||
|
||||
await web_spawn_recover_spawner(spawner).run();
|
||||
|
||||
close();
|
||||
});
|
||||
};
|
||||
21
web-spawn/src/js/worker.js
Normal file
21
web-spawn/src/js/worker.js
Normal file
@@ -0,0 +1,21 @@
|
||||
import init, { web_spawn_start_worker } from "WASM_BINDGEN_SHIM_URL";
|
||||
|
||||
self.onmessage = event => {
|
||||
const [module_or_path, memory, worker] = event.data;
|
||||
init({ module_or_path, memory })
|
||||
.catch(err => {
|
||||
console.error(err);
|
||||
// Propagate to main `onerror`:
|
||||
setTimeout(() => {
|
||||
throw err;
|
||||
});
|
||||
throw err;
|
||||
})
|
||||
.then(() => {
|
||||
self.postMessage('ready');
|
||||
|
||||
web_spawn_start_worker(worker);
|
||||
|
||||
close();
|
||||
});
|
||||
};
|
||||
7
web-spawn/src/lib.rs
Normal file
7
web-spawn/src/lib.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
#![doc = include_str!("../README.md")]
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
mod wasm;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub use wasm::*;
|
||||
63
web-spawn/src/wasm.rs
Normal file
63
web-spawn/src/wasm.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
#[cfg(all(target_arch = "wasm32", not(doc), not(target_feature = "atomics")))]
|
||||
compile_error!("web-spawn requires the `atomics` and `bulk-memory` features to be enabled");
|
||||
|
||||
mod spawner;
|
||||
mod thread;
|
||||
pub(crate) mod utils;
|
||||
mod worker;
|
||||
|
||||
pub use spawner::Spawner;
|
||||
pub use thread::Builder;
|
||||
|
||||
use std::{any::Any, sync::OnceLock};
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
use futures::channel::mpsc::UnboundedSender;
|
||||
use js_sys::Promise;
|
||||
use wasm_bindgen::prelude::*;
|
||||
|
||||
pub(crate) type Closure = dyn FnOnce() + Send;
|
||||
|
||||
/// Global sender channel for spawning threads.
|
||||
pub(crate) static SENDER: OnceLock<UnboundedSender<(Builder, Box<Closure>)>> = OnceLock::new();
|
||||
|
||||
/// Initializes the thread spawner.
|
||||
#[wasm_bindgen(js_name = initSpawner)]
|
||||
pub fn init_spawner() -> Spawner {
|
||||
Spawner::new()
|
||||
}
|
||||
|
||||
/// Starts the thread spawner on a dedicated worker thread.
|
||||
#[wasm_bindgen(js_name = startSpawner)]
|
||||
pub fn start_spawner() -> Promise {
|
||||
Spawner::new().spawn()
|
||||
}
|
||||
|
||||
/// Spawns a closure onto a new thread.
|
||||
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
Builder::new()
|
||||
.spawn(f)
|
||||
.expect("spawner should be initialized")
|
||||
}
|
||||
|
||||
/// A join handle for a spawned thread.
|
||||
#[derive(Debug)]
|
||||
pub struct JoinHandle<T>(Receiver<std::thread::Result<T>>);
|
||||
|
||||
impl<T> JoinHandle<T> {
|
||||
/// Returns `true` if the thread has finished.
|
||||
pub fn is_finished(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
/// Waits for the thread to finish and returns the result.
|
||||
pub fn join(self) -> std::thread::Result<T> {
|
||||
self.0
|
||||
.recv()
|
||||
.map_err(|_| Box::new("worker thread dropped return channel") as Box<dyn Any + Send>)?
|
||||
}
|
||||
}
|
||||
83
web-spawn/src/wasm/spawner.rs
Normal file
83
web-spawn/src/wasm/spawner.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use futures::{
|
||||
StreamExt,
|
||||
channel::mpsc::{UnboundedReceiver, unbounded},
|
||||
};
|
||||
use js_sys::Promise;
|
||||
use wasm_bindgen::prelude::*;
|
||||
|
||||
use crate::wasm::{
|
||||
Closure, SENDER,
|
||||
thread::Builder,
|
||||
utils::{callback, encode_script, get_shim_url},
|
||||
worker::WorkerData,
|
||||
};
|
||||
|
||||
/// Global spawner which spawns closures into web workers.
|
||||
#[wasm_bindgen]
|
||||
pub struct Spawner {
|
||||
shim_url: String,
|
||||
worker_url: String,
|
||||
receiver: UnboundedReceiver<(Builder, Box<Closure>)>,
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
impl Spawner {
|
||||
/// Creates a new spawner.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the spawner is already initialized.
|
||||
pub(crate) fn new() -> Self {
|
||||
let (sender, receiver) = unbounded();
|
||||
|
||||
if let Err(_) = SENDER.set(sender) {
|
||||
panic!("spawner already initialized");
|
||||
}
|
||||
|
||||
let shim_url = get_shim_url();
|
||||
let worker_url = encode_script(&shim_url, include_str!("../js/worker.js"));
|
||||
|
||||
Self {
|
||||
shim_url,
|
||||
worker_url,
|
||||
receiver,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns the spawner into a dedicated web worker.
|
||||
pub fn spawn(self) -> Promise {
|
||||
let options = web_sys::WorkerOptions::new();
|
||||
options.set_type(web_sys::WorkerType::Module);
|
||||
options.set_name("web_spawn_spawner");
|
||||
|
||||
let script_url = encode_script(&self.shim_url, include_str!("../js/spawner.js"));
|
||||
let worker = web_sys::Worker::new_with_options(&script_url, &options).unwrap_throw();
|
||||
|
||||
let data = js_sys::Array::new();
|
||||
data.push(&wasm_bindgen::module());
|
||||
data.push(&wasm_bindgen::memory());
|
||||
data.push(&JsValue::from(Box::into_raw(Box::new(self))));
|
||||
|
||||
worker.post_message(&data).unwrap_throw();
|
||||
|
||||
callback(&worker)
|
||||
}
|
||||
|
||||
/// Runs the spawner.
|
||||
pub async fn run(mut self) {
|
||||
// Spawn a new worker for every closure.
|
||||
while let Some((builder, f)) = self.receiver.next().await {
|
||||
WorkerData::new(f).spawn(builder, &self.worker_url);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
#[doc(hidden)]
|
||||
pub fn web_spawn_recover_spawner(spawner: *mut Spawner) -> Spawner {
|
||||
// # Safety
|
||||
// This is safe because we know the spawner was allocated on the heap with
|
||||
// `Box`. Afterwhich, it was converted to a raw pointer using
|
||||
// `Box::into_raw` which prevents it from being deallocated.
|
||||
unsafe { *Box::from_raw(spawner) }
|
||||
}
|
||||
53
web-spawn/src/wasm/thread.rs
Normal file
53
web-spawn/src/wasm/thread.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use std::panic::{AssertUnwindSafe, catch_unwind};
|
||||
|
||||
use crossbeam_channel::bounded;
|
||||
|
||||
use crate::wasm::{JoinHandle, SENDER};
|
||||
|
||||
/// Builder for a thread.
|
||||
pub struct Builder {
|
||||
pub(crate) name: Option<String>,
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
/// Creates a new thread builder.
|
||||
pub fn new() -> Self {
|
||||
Self { name: None }
|
||||
}
|
||||
|
||||
/// Names the thread-to-be.
|
||||
pub fn name(mut self, name: String) -> Self {
|
||||
self.name = Some(name);
|
||||
self
|
||||
}
|
||||
|
||||
/// Spawns a new thread, running the provided function.
|
||||
pub fn spawn<F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
|
||||
where
|
||||
F: FnOnce() -> T + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let (sender, receiver) = bounded(1);
|
||||
|
||||
let f = move || {
|
||||
let result = catch_unwind(AssertUnwindSafe(f));
|
||||
// Ignore if the join handle is dropped.
|
||||
let _ = sender.send(result);
|
||||
};
|
||||
|
||||
SENDER
|
||||
.get()
|
||||
.ok_or_else(|| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"spawner has not been initialized",
|
||||
)
|
||||
})?
|
||||
.unbounded_send((self, Box::new(f)))
|
||||
.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "spawner has been stopped")
|
||||
})?;
|
||||
|
||||
Ok(JoinHandle(receiver))
|
||||
}
|
||||
}
|
||||
46
web-spawn/src/wasm/utils.rs
Normal file
46
web-spawn/src/wasm/utils.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use js_sys::Promise;
|
||||
use wasm_bindgen::prelude::*;
|
||||
use web_sys::{Blob, MessageEvent, Url, Worker};
|
||||
|
||||
/// Returns the URL for the wasm bindgen shim.
|
||||
pub(crate) fn get_shim_url() -> String {
|
||||
js_sys::eval(include_str!("../js/script_path.js"))
|
||||
.unwrap_throw()
|
||||
.as_string()
|
||||
.unwrap_throw()
|
||||
}
|
||||
|
||||
/// Generates worker script as URL encoded blob
|
||||
pub(crate) fn encode_script(wasm_bindgen_shim_url: &str, template: &str) -> String {
|
||||
let script = template.replace("WASM_BINDGEN_SHIM_URL", &wasm_bindgen_shim_url);
|
||||
|
||||
// Create url encoded blob
|
||||
let arr = js_sys::Array::new();
|
||||
arr.set(0, JsValue::from_str(&script));
|
||||
let blob = Blob::new_with_str_sequence(&arr).unwrap();
|
||||
let url = Url::create_object_url_with_blob(
|
||||
&blob
|
||||
.slice_with_f64_and_f64_and_content_type(0.0, blob.size(), "text/javascript")
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
url
|
||||
}
|
||||
|
||||
pub(crate) fn callback(worker: &Worker) -> Promise {
|
||||
Promise::new(&mut |resolve, _reject| {
|
||||
// Create a one-time closure that resolves the promise when a message is
|
||||
// received.
|
||||
let callback = Closure::once(move |event: MessageEvent| {
|
||||
// Resolve the promise with the event's data.
|
||||
resolve.call1(&JsValue::NULL, &event.data()).unwrap();
|
||||
});
|
||||
|
||||
// Attach the callback to the worker's onmessage event.
|
||||
worker.set_onmessage(Some(callback.as_ref().unchecked_ref()));
|
||||
|
||||
// Ensure the callback isn't dropped prematurely.
|
||||
callback.forget();
|
||||
})
|
||||
}
|
||||
45
web-spawn/src/wasm/worker.rs
Normal file
45
web-spawn/src/wasm/worker.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use wasm_bindgen::prelude::*;
|
||||
|
||||
use crate::wasm::{Closure, thread::Builder};
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub struct WorkerData {
|
||||
f: Box<Closure>,
|
||||
}
|
||||
|
||||
impl WorkerData {
|
||||
pub(crate) fn new(f: Box<Closure>) -> Self {
|
||||
WorkerData { f }
|
||||
}
|
||||
|
||||
/// Spawns this worker in a new web worker.
|
||||
pub(crate) fn spawn(self, builder: Builder, script_url: &str) {
|
||||
let options = web_sys::WorkerOptions::new();
|
||||
options.set_type(web_sys::WorkerType::Module);
|
||||
|
||||
if let Some(name) = builder.name {
|
||||
options.set_name(&name);
|
||||
}
|
||||
|
||||
let worker = web_sys::Worker::new_with_options(script_url, &options).unwrap_throw();
|
||||
|
||||
let data = js_sys::Array::new();
|
||||
data.push(&wasm_bindgen::module());
|
||||
data.push(&wasm_bindgen::memory());
|
||||
data.push(&JsValue::from(Box::into_raw(Box::new(self))));
|
||||
|
||||
worker.post_message(&data).unwrap_throw();
|
||||
}
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
#[doc(hidden)]
|
||||
pub fn web_spawn_start_worker(worker: *mut WorkerData) {
|
||||
// # Safety
|
||||
// This is safe because we know the worker was allocated on the heap with
|
||||
// `Box`. Afterwhich, it was converted to a raw pointer using
|
||||
// `Box::into_raw` which prevents it from being deallocated.
|
||||
let WorkerData { f } = unsafe { *Box::from_raw(worker) };
|
||||
|
||||
f();
|
||||
}
|
||||
45
web-spawn/tests/test.rs
Normal file
45
web-spawn/tests/test.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
#![cfg(target_arch = "wasm32")]
|
||||
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use wasm_bindgen_futures::JsFuture;
|
||||
use wasm_bindgen_test::*;
|
||||
use web_spawn::{spawn, start_spawner};
|
||||
|
||||
static INIT: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
async fn init() {
|
||||
// If it is set return immediately.
|
||||
if INIT.swap(true, std::sync::atomic::Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
JsFuture::from(start_spawner()).await.unwrap();
|
||||
}
|
||||
|
||||
#[wasm_bindgen_test]
|
||||
async fn test_pass() {
|
||||
init().await;
|
||||
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
spawn(|| sender.send(42).unwrap());
|
||||
let value = receiver.await.unwrap();
|
||||
|
||||
assert_eq!(value, 42);
|
||||
}
|
||||
|
||||
#[wasm_bindgen_test]
|
||||
async fn test_join() {
|
||||
init().await;
|
||||
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
// Blocking join only works on spawned threads.
|
||||
spawn(|| {
|
||||
let handle = spawn(|| 42);
|
||||
assert_eq!(handle.join().unwrap(), 42);
|
||||
sender.send(()).unwrap();
|
||||
});
|
||||
|
||||
receiver.await.unwrap();
|
||||
}
|
||||
Reference in New Issue
Block a user