mirror of
https://github.com/tlsnotary/ws_stream_wasm.git
synced 2026-01-09 21:17:56 -05:00
Split wasm into separate crate
This commit is contained in:
2
.cargo/config
Normal file
2
.cargo/config
Normal file
@@ -0,0 +1,2 @@
|
||||
[target.wasm32-unknown-unknown]
|
||||
runner = 'wasm-bindgen-test-runner'
|
||||
43
.travis.yml
Normal file
43
.travis.yml
Normal file
@@ -0,0 +1,43 @@
|
||||
language: rust
|
||||
rust: nightly
|
||||
|
||||
addons:
|
||||
firefox: latest
|
||||
chrome: stable
|
||||
|
||||
install:
|
||||
- rustup target add wasm32-unknown-unknown
|
||||
|
||||
# Downloads a `wasm-bindgen` release binary from https://github.com/rustwasm/wasm-bindgen/releases.
|
||||
# - curl -OL https://github.com/rustwasm/wasm-bindgen/releases/download/0.2.21/wasm-bindgen-0.2.21-x86_64-unknown-linux-musl.tar.gz
|
||||
# - tar xf wasm-bindgen-0.2.21-x86_64-unknown-linux-musl.tar.gz
|
||||
# - chmod +x wasm-bindgen-0.2.21-x86_64-unknown-linux-musl/wasm-bindgen
|
||||
|
||||
# Alternatively, use `wasm-pack` to manage `wasm-bindgen` binaries for you
|
||||
- curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
|
||||
# Moves the binaries to a directory that is in your PATH
|
||||
# - mv wasm-bindgen-0.2.21-x86_64-unknown-linux-musl/wasm-bindgen* ~/.cargo/bin
|
||||
# Install node.js with nvm.
|
||||
- curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.8/install.sh | bash
|
||||
- source ~/.nvm/nvm.sh
|
||||
- nvm install v10.5
|
||||
# Install chromedriver.
|
||||
- curl --retry 5 -LO https://chromedriver.storage.googleapis.com/2.41/chromedriver_linux64.zip
|
||||
- unzip chromedriver_linux64.zip
|
||||
# Install geckodriver.
|
||||
- curl --retry 5 -LO https://github.com/mozilla/geckodriver/releases/download/v0.21.0/geckodriver-v0.21.0-linux64.tar.gz
|
||||
- tar xf geckodriver-v0.21.0-linux64.tar.gz
|
||||
|
||||
script:
|
||||
|
||||
# turn on server
|
||||
- cd server
|
||||
- cargo run > /dev/null 2>&1 &
|
||||
- cd ../
|
||||
|
||||
- wasm-pack test --firefox --headless
|
||||
|
||||
# Test in Chrome.
|
||||
# - CHROMEDRIVER=$(pwd)/chromedriver cargo test --target wasm32-unknown-unknown
|
||||
# Test in Firefox.
|
||||
# - GECKODRIVER=$(pwd)/geckodriver cargo test --target wasm32-unknown-unknown
|
||||
66
Cargo.toml
Normal file
66
Cargo.toml
Normal file
@@ -0,0 +1,66 @@
|
||||
# Auto-generated from "Cargo.yml"
|
||||
[dependencies]
|
||||
console_error_panic_hook = "^0.1"
|
||||
console_log = "^0.1"
|
||||
failure = "^0.1"
|
||||
log = "^0.4"
|
||||
|
||||
[dependencies.futures-preview]
|
||||
features = ["io-compat", "compat"]
|
||||
version = "=0.3.0-alpha.16"
|
||||
|
||||
[dependencies.futures_01]
|
||||
package = "futures"
|
||||
version = "^0.1"
|
||||
|
||||
[dependencies.js-sys]
|
||||
version = "^0.3"
|
||||
|
||||
[dependencies.tokio]
|
||||
default-features = false
|
||||
features = ["codec"]
|
||||
version = "^0.1"
|
||||
|
||||
[dependencies.wasm-bindgen]
|
||||
version = "^0.2"
|
||||
|
||||
[dependencies.web-sys]
|
||||
features = ["BinaryType", "Blob", "console", "MessageEvent", "WebSocket"]
|
||||
version = "^0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "^0.4"
|
||||
failure = "^0.1"
|
||||
flexi_logger = "^0.11"
|
||||
pretty_assertions = "^0.6"
|
||||
rand = "^0.6"
|
||||
rand_xoshiro = "^0.1"
|
||||
serde_cbor = "0.9.0"
|
||||
tokio-serde-cbor = "0.3.1"
|
||||
wasm-bindgen-test = "^0.2"
|
||||
|
||||
[dev-dependencies.async_runtime]
|
||||
package = "naja_async_runtime"
|
||||
version = "=0.1.6"
|
||||
|
||||
[dev-dependencies.serde]
|
||||
features = ["derive"]
|
||||
version = "1.0.87"
|
||||
|
||||
[package]
|
||||
authors = ["Naja Melan <najamelan@autistici.org>"]
|
||||
categories = ["api-bindings", "wasm", "web-programming", "web-programming::websocket"]
|
||||
description = "Provide AsyncRead/AsyncWrite over WebSockets (both tokio and 0.3) in WASM"
|
||||
documentation = "https://docs.rs/ws_stream_wasm"
|
||||
edition = "2018"
|
||||
homepage = "https://github.com/najamelan/wasm_websocket_stream"
|
||||
keywords = ["wasm", "websocket", "tokio", "stream", "async"]
|
||||
license = "Unlicense"
|
||||
name = "ws_stream_wasm"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/najamelan/wasm_websocket_stream"
|
||||
version = "0.1.0"
|
||||
|
||||
[profile]
|
||||
[profile.release]
|
||||
debug = true
|
||||
61
Cargo.yml
Normal file
61
Cargo.yml
Normal file
@@ -0,0 +1,61 @@
|
||||
package:
|
||||
|
||||
name : ws_stream_wasm
|
||||
version : 0.1.0 # remember to update html_root_url in lib.rs
|
||||
edition : '2018'
|
||||
authors : [ Naja Melan <najamelan@autistici.org> ]
|
||||
description : Provide AsyncRead/AsyncWrite over WebSockets (both tokio and 0.3) in WASM
|
||||
license : Unlicense
|
||||
documentation : https://docs.rs/ws_stream_wasm
|
||||
homepage : https://github.com/najamelan/wasm_websocket_stream
|
||||
repository : https://github.com/najamelan/wasm_websocket_stream
|
||||
readme : README.md
|
||||
keywords : [ wasm, websocket, tokio, stream, async ]
|
||||
categories : [ api-bindings, wasm, web-programming, "web-programming::websocket" ]
|
||||
|
||||
|
||||
dependencies:
|
||||
|
||||
console_error_panic_hook: ^0.1
|
||||
console_log : ^0.1
|
||||
failure : ^0.1
|
||||
futures-preview : { version: =0.3.0-alpha.16, features: [io-compat, compat] }
|
||||
log : ^0.4
|
||||
futures_01 : { version: ^0.1, package: futures }
|
||||
|
||||
js-sys : { version: ^0.3 }
|
||||
wasm-bindgen : { version: ^0.2 }
|
||||
tokio : { version: ^0.1, default-features: false, features: [codec] }
|
||||
|
||||
web-sys:
|
||||
version : ^0.3
|
||||
|
||||
features:
|
||||
|
||||
- BinaryType
|
||||
- Blob
|
||||
- console
|
||||
- MessageEvent
|
||||
- WebSocket
|
||||
|
||||
dev-dependencies:
|
||||
|
||||
failure : ^0.1
|
||||
|
||||
pretty_assertions: ^0.6
|
||||
rand_xoshiro : ^0.1
|
||||
rand : ^0.6
|
||||
tokio-serde-cbor : 0.3.1
|
||||
serde_cbor : 0.9.0
|
||||
serde : { version: 1.0.87, features: [ derive ] }
|
||||
|
||||
bytes : ^0.4
|
||||
flexi_logger : ^0.11
|
||||
async_runtime : { version: =0.1.6, package: naja_async_runtime }
|
||||
# futures_codec : { path: ../../futures-codec }
|
||||
wasm-bindgen-test: ^0.2
|
||||
|
||||
|
||||
profile:
|
||||
release:
|
||||
debug: true
|
||||
24
LICENSE
Normal file
24
LICENSE
Normal file
@@ -0,0 +1,24 @@
|
||||
This is free and unencumbered software released into the public domain.
|
||||
|
||||
Anyone is free to copy, modify, publish, use, compile, sell, or
|
||||
distribute this software, either in source code form or as a compiled
|
||||
binary, for any purpose, commercial or non-commercial, and by any
|
||||
means.
|
||||
|
||||
In jurisdictions that recognize copyright laws, the author or authors
|
||||
of this software dedicate any and all copyright interest in the
|
||||
software to the public domain. We make this dedication for the benefit
|
||||
of the public at large and to the detriment of our heirs and
|
||||
successors. We intend this dedication to be an overt act of
|
||||
relinquishment in perpetuity of all present and future rights to this
|
||||
software under copyright law.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
|
||||
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
|
||||
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
For more information, please refer to <http://unlicense.org>
|
||||
41
README.md
Normal file
41
README.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# wasm_websocket_stream
|
||||
|
||||
A convenience layer for using WebSockets from WebAssembly.
|
||||
|
||||
It implements a futures Stream/Sink and tokio AsyncRead/AsyncWrite on top of the web-sys interface [WebSocket](https://docs.rs/web-sys/0.3.22/web_sys/struct.WebSocket.html). This allows you to communicate between your server and a browser wasm module transparently without worrying about the underlying protocol. You can use tokio codec to get framed messages of any type that implements [serde::Serialize](https://docs.rs/serde/1.0.89/serde/trait.Serialize.html).
|
||||
|
||||
There are basic wrapper types around the JavaScript types MessageEvent and MessageEventData but these are not feature-complete.
|
||||
This library tries to work with [async_await] wherever possible, with the exemption of WsStream because tokio is on futures 0.1. It requires a nightly compiler for now.
|
||||
|
||||
# Examples
|
||||
|
||||
For examples please run `cargo doc --open` or look at the unit tests.
|
||||
|
||||
# Tests
|
||||
|
||||
Do not forget to turn on the server:
|
||||
|
||||
```
|
||||
cd server
|
||||
cargo run
|
||||
```
|
||||
|
||||
|
||||
## TODO
|
||||
|
||||
In order to run the tests, go into the server crate and run `cargo run`. In another tab run `wasm-pack test --firefox --headless`.
|
||||
|
||||
little note on performance:
|
||||
- `wasm-pack test --firefox --headless --release`: 13.3s
|
||||
- `wasm-pack test --chrome --headless --release`: 10.4s
|
||||
|
||||
- all `FIXME` in `src/` and `test/`
|
||||
|
||||
- callback_future is not something that belongs in wasm_websocket_stream. Maybe a functionality like that can be added to wasm_bindgen? It should also be possible to do this for callbacks that need to take parameters.
|
||||
|
||||
- the sink? It's always ready to write, it's always flushed? The websocket browser api does not really give any info about the state here... Need better unit testing to verify what happens under stress I suppose.
|
||||
|
||||
- We implement AsyncRead for WsStream. This required that we implement std::io::Read, returning WouldBlock. However when calling next on the stream we get from BytesCodec, only read gets called, not poll_read. In principle tokio provides a default implementation of poll_read. It's not clear whether it would ever get called and in what circumstances. Since it is important that the current task gets woken up when a new message arrives, I have implemented poll_read so it wakes up the task. Is this necessary? It doesn't cost much to leave the implementation there now it's written.
|
||||
|
||||
- We don't have Clone or Eq on JsWebSocket or WsStream... Is that ok?
|
||||
|
||||
47
src/callback_future.rs
Normal file
47
src/callback_future.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use
|
||||
{
|
||||
crate :: { import::* },
|
||||
};
|
||||
|
||||
|
||||
/// Turn a JavaScript callback type interface into a future. The future will resolve when the callback gets called.
|
||||
/// There is currently no support for calbacks that need to take parameters.
|
||||
///
|
||||
/// This uses futures channels under the hood.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// This example shows how to set the [`on_open`](https://docs.rs/web-sys/0.3.22/web_sys/struct.WebSocket.html#method.set_onopen) callback for [web_sys::WebSocket](https://docs.rs/web-sys/0.3.22/web_sys/struct.WebSocket.html)
|
||||
///
|
||||
/// ```
|
||||
/// use wasm_websocket_stream::future_event;
|
||||
///
|
||||
/// pub async fn connect( &self )
|
||||
/// {
|
||||
/// future_event( |cb| self.ws.set_onopen( cb ) ).await;
|
||||
///
|
||||
/// trace!( "WebSocket connection opened!" );
|
||||
///
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
pub async fn future_event( setter: impl Fn( Option<&js_sys::Function> ) )
|
||||
{
|
||||
// We give the user a closure they can pass to js functions requiring a callback, and when our
|
||||
// closure gets called, the future resolves.
|
||||
//
|
||||
// The js closure wants an fn mut, so we can't use onshot here
|
||||
//
|
||||
let (onready, ready) = unbounded::<()>();
|
||||
|
||||
let on_ready = Closure::wrap( Box::new( move ||
|
||||
{
|
||||
onready.unbounded_send(()).expect_throw( "unbounded channel failed" );
|
||||
onready.close_channel();
|
||||
|
||||
}) as Box< dyn FnMut() > );
|
||||
|
||||
setter( Some( on_ready.as_ref().unchecked_ref() ));
|
||||
|
||||
ready.into_future().await;
|
||||
}
|
||||
128
src/chunk_stream.rs
Normal file
128
src/chunk_stream.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
use crate::import::*;
|
||||
|
||||
|
||||
/// Turn a stream/sink of binary messages into a stream of bytes (AsyncRead/AsyncWrite).
|
||||
//
|
||||
#[ derive( Debug, Clone ) ]
|
||||
//
|
||||
pub struct ChunkStream
|
||||
{
|
||||
queue: VecDeque<Uint8Array>,
|
||||
|
||||
// The u32 is the pointer into the first element on the queue. Since the buffer fed
|
||||
// to read is not necessarily of the same size of the messages we receive, this
|
||||
// pointer allows to keep track of the data we have already read.
|
||||
//
|
||||
already_read: u32,
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl ChunkStream
|
||||
{
|
||||
/// Create a new ChunkStream
|
||||
//
|
||||
pub fn new() -> Self
|
||||
{
|
||||
Self { queue: VecDeque::new(), already_read: 0 }
|
||||
}
|
||||
|
||||
|
||||
/// Whether the queue is empty (no data left to read)
|
||||
//
|
||||
#[ inline( always ) ]
|
||||
//
|
||||
pub fn is_empty( &self ) -> bool
|
||||
{
|
||||
self.queue.is_empty()
|
||||
}
|
||||
|
||||
|
||||
/// Add a new message to the queue
|
||||
//
|
||||
#[ inline( always ) ]
|
||||
//
|
||||
pub fn push( &mut self, msg: Uint8Array )
|
||||
{
|
||||
self.queue.push_back( msg );
|
||||
}
|
||||
|
||||
/// Try to read some bytes.
|
||||
/// FIXME? Do not call this method if the queue is empty!
|
||||
//
|
||||
// We have a queue of Uint8Arrays with incoming data, and a buf &mut [u8] to copy it to. Ideally we would like to consider
|
||||
// the queue as one long buffer, but it's not, so we keep track of:
|
||||
// - already_read: the offset in the first array on the queue until where we have already read
|
||||
// - as soon as we completely read an array, we pop it from the queue and set already_read to 0
|
||||
// - if we can't fit an entire array in the buffer, we set already_read to the correct offset and return
|
||||
//
|
||||
//
|
||||
// this method returns the number of bytes that where copied.
|
||||
//
|
||||
#[ allow( clippy::needless_return )]
|
||||
//
|
||||
pub fn read( &mut self, mut copied: u32, buf: &mut [u8] ) -> u32
|
||||
{
|
||||
|
||||
let space = buf.len() as u32 ;
|
||||
let data = self.queue[0].length() - self.already_read ;
|
||||
let to_copy = min( data, space ) ;
|
||||
let end = self.already_read + to_copy ;
|
||||
|
||||
|
||||
// Buffer is exactly the right size, just copy it over.
|
||||
//
|
||||
if data == space
|
||||
{
|
||||
if self.already_read == 0 { self.queue[0] .copy_to( &mut buf[ ..(to_copy as usize) ] ); }
|
||||
else { self.queue[0].subarray( self.already_read, end ).copy_to( &mut buf[ ..(to_copy as usize) ] ); }
|
||||
|
||||
self.pop();
|
||||
|
||||
return copied + to_copy;
|
||||
}
|
||||
|
||||
|
||||
// The first message has less data than the buffer can hold.
|
||||
// We will copy the first message entirely and then see if there are any more messges to copy.
|
||||
//
|
||||
else if data < space
|
||||
{
|
||||
if self.already_read == 0 { self.queue[0] .copy_to( &mut buf[ ..(to_copy as usize) ] ); }
|
||||
else { self.queue[0].slice( self.already_read, end ).copy_to( &mut buf[ ..(to_copy as usize) ] ); }
|
||||
|
||||
self.pop();
|
||||
copied += to_copy;
|
||||
|
||||
// Only recurse if there are more arrays waiting to be copied
|
||||
//
|
||||
if self.queue.is_empty() { return copied }
|
||||
|
||||
else
|
||||
{
|
||||
return self.read( copied, &mut buf[ (to_copy as usize).. ] )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// The first message has more data than the buffer can hold.
|
||||
//
|
||||
else // if data > space
|
||||
{
|
||||
self.queue[0].slice( self.already_read, end ).copy_to( &mut buf[ ..(to_copy as usize) ] );
|
||||
|
||||
self.already_read = end;
|
||||
|
||||
return copied + to_copy;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[ inline( always ) ]
|
||||
//
|
||||
fn pop( &mut self )
|
||||
{
|
||||
self.queue.pop_front();
|
||||
self.already_read = 0;
|
||||
}
|
||||
}
|
||||
123
src/error.rs
Normal file
123
src/error.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use crate::{ import::* };
|
||||
|
||||
|
||||
/// The error type for errors happening in `async_runtime`.
|
||||
///
|
||||
/// Use [`WsErr::kind()`] to know which kind of error happened. [WsErrKind] implements [Eq],
|
||||
/// so you can do the following if all you want to know is the kind of error:
|
||||
///
|
||||
/// ```ignore
|
||||
/// use async_runtime::*;
|
||||
///
|
||||
/// rt::init( RtConfig::Local ).expect( "Set default executor" );
|
||||
///
|
||||
/// match rt::init( RtConfig::Pool )
|
||||
/// {
|
||||
/// Err(e) =>
|
||||
/// {
|
||||
/// if let WsErrKind::DoubleExecutorInit = e.kind()
|
||||
/// {
|
||||
/// println!( "{}", e );
|
||||
/// }
|
||||
///
|
||||
/// // This also works:
|
||||
/// //
|
||||
/// match e.kind()
|
||||
/// {
|
||||
/// WsErrKind::DoubleExecutorInit => println!( "{}", e ),
|
||||
/// _ => {},
|
||||
/// }
|
||||
/// },
|
||||
///
|
||||
/// Ok(_) => {}
|
||||
/// }
|
||||
/// ```
|
||||
//
|
||||
#[ derive( Debug ) ]
|
||||
//
|
||||
pub struct WsErr
|
||||
{
|
||||
inner: FailContext<WsErrKind>,
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// The different kind of errors that can happen when you use the `async_runtime` API.
|
||||
//
|
||||
#[ derive( Clone, PartialEq, Eq, Debug, Fail ) ]
|
||||
//
|
||||
pub enum WsErrKind
|
||||
{
|
||||
/// This is an error from tokio-tungstenite.
|
||||
//
|
||||
#[ fail( display = "The WebSocket handshake failed" ) ]
|
||||
//
|
||||
WsHandshake,
|
||||
|
||||
/// Invalid input to `WsReadyState::try_from( u16 )`
|
||||
///
|
||||
#[ fail( display = "Invalid input to conversion to WsReadyState: {}", _0 ) ]
|
||||
//
|
||||
InvalidReadyState( u16 ),
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl Fail for WsErr
|
||||
{
|
||||
fn cause( &self ) -> Option< &dyn Fail >
|
||||
{
|
||||
self.inner.cause()
|
||||
}
|
||||
|
||||
fn backtrace( &self ) -> Option< &Backtrace >
|
||||
{
|
||||
self.inner.backtrace()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl fmt::Display for WsErr
|
||||
{
|
||||
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
|
||||
{
|
||||
fmt::Display::fmt( &self.inner, f )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl WsErr
|
||||
{
|
||||
/// Allows matching on the error kind
|
||||
//
|
||||
pub fn kind( &self ) -> &WsErrKind
|
||||
{
|
||||
self.inner.get_context()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WsErrKind> for WsErr
|
||||
{
|
||||
fn from( kind: WsErrKind ) -> WsErr
|
||||
{
|
||||
WsErr { inner: FailContext::new( kind ) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From< FailContext<WsErrKind> > for WsErr
|
||||
{
|
||||
fn from( inner: FailContext<WsErrKind> ) -> WsErr
|
||||
{
|
||||
WsErr { inner }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TODO: this no longer compiles. It compiles fine in thespis, but not in this crate even though this
|
||||
// file is largely copy/paste. The problem is that there is a blanket impl for Fail in failure for every
|
||||
// E: std::error::Error + 'static + Send + Sync
|
||||
//
|
||||
// impl std::error::Error for WsErr {}
|
||||
|
||||
|
||||
100
src/js_msg_event.rs
Normal file
100
src/js_msg_event.rs
Normal file
@@ -0,0 +1,100 @@
|
||||
use
|
||||
{
|
||||
crate :: { import::* },
|
||||
};
|
||||
|
||||
|
||||
/// A wrapper around the [`web_sys::MessageEvent`](https://docs.rs/web-sys/0.3.17/web_sys/struct.MessageEvent.html) for convenience.
|
||||
/// Allows to extract data as a [JsMsgEvtData].
|
||||
///
|
||||
#[ derive( Debug, Clone ) ]
|
||||
//
|
||||
pub struct JsMsgEvent
|
||||
{
|
||||
/// The wrapped web_sys::MessageEvent if you need it
|
||||
///
|
||||
pub msg_evt: MessageEvent
|
||||
}
|
||||
|
||||
|
||||
impl JsMsgEvent
|
||||
{
|
||||
/// The data contained by the message.
|
||||
///
|
||||
pub fn data( &self ) -> JsMsgEvtData
|
||||
{
|
||||
JsMsgEvtData::from( self )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// [Data](https://docs.rs/web-sys/0.3.17/web_sys/struct.MessageEvent.html#method.data) contained in a MessageEvent. See:
|
||||
/// [Html5 specs](https://html.spec.whatwg.org/multipage/web-sockets.html#feedback-from-the-protocol)
|
||||
///
|
||||
/// Data can be a string or binary.
|
||||
///
|
||||
#[ derive( Debug, Clone, PartialEq, Eq, Hash ) ]
|
||||
//
|
||||
pub enum JsMsgEvtData
|
||||
{
|
||||
/// The data of the message is a string
|
||||
///
|
||||
Text ( String ),
|
||||
|
||||
/// The message contains binary data
|
||||
///
|
||||
Binary( Vec<u8> ),
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl From< &JsMsgEvent > for JsMsgEvtData
|
||||
{
|
||||
fn from( evt: &JsMsgEvent ) -> Self
|
||||
{
|
||||
let data = evt.msg_evt.data();
|
||||
|
||||
if data.is_instance_of::< ArrayBuffer >()
|
||||
{
|
||||
trace!( "JsWebSocket received binary message" );
|
||||
|
||||
let buf = data.dyn_into::< ArrayBuffer >().unwrap_throw();
|
||||
|
||||
let buffy = Uint8Array::new( buf.as_ref() );
|
||||
let mut v : Vec<u8> = vec![ 0; buffy.length() as usize ];
|
||||
|
||||
buffy.copy_to( &mut v ); // FIXME: get rid of this copy
|
||||
|
||||
JsMsgEvtData::Binary( v )
|
||||
}
|
||||
|
||||
|
||||
else if data.is_string()
|
||||
{
|
||||
let text = data.as_string().unwrap_throw();
|
||||
|
||||
JsMsgEvtData::Text( text )
|
||||
}
|
||||
|
||||
|
||||
// We have set the binary mode to array buffer, so normally this shouldn't happen. That is as long
|
||||
// as this is used within the context of the websocket library.
|
||||
//
|
||||
// FIXME: find a way to convert a blob...
|
||||
//
|
||||
else if data.is_instance_of::< Blob >()
|
||||
{
|
||||
error!( "JsWebSocket received a blob...unimplemented!" );
|
||||
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
|
||||
else
|
||||
{
|
||||
error!( "JsWebSocket received data that is not string, nor binary, nor blob, bailing..." );
|
||||
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
}
|
||||
346
src/js_web_socket.rs
Normal file
346
src/js_web_socket.rs
Normal file
@@ -0,0 +1,346 @@
|
||||
use
|
||||
{
|
||||
crate :: { import::*, WsErr, WsErrKind, JsMsgEvent, JsMsgEvtData, future_event },
|
||||
};
|
||||
|
||||
|
||||
/// A wrapper around [web_sys::WebSocket](https://docs.rs/web-sys/0.3.17/web_sys/struct.WebSocket.html) to make it more rust idiomatic.
|
||||
/// It does not provide any extra functionality over the wrapped WebSocket object.
|
||||
///
|
||||
/// It turns the callback based mechanisms into futures Sink and Stream. The stream yields [JsMsgEvent], which is a wrapper
|
||||
/// around [`web_sys::MessageEvent`](https://docs.rs/web-sys/0.3.17/web_sys/struct.MessageEvent.html) and the sink takes a
|
||||
/// [JsMsgEvtData] which is a wrapper around [`web_sys::MessageEvent.data()`](https://docs.rs/web-sys/0.3.17/web_sys/struct.MessageEvent.html#method.data).
|
||||
/// There is no error when the server is not running, and no timeout mechanism provided here to detect that connection
|
||||
/// never happens. The connect future will just never resolve.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// ```
|
||||
/// #![ feature( async_await, await_macro, futures_api )]
|
||||
///
|
||||
/// use
|
||||
/// {
|
||||
/// futures::prelude ::* ,
|
||||
/// wasm_bindgen::prelude ::* ,
|
||||
/// wasm_bindgen_futures ::* ,
|
||||
/// wasm_websocket_stream ::* ,
|
||||
/// log ::* ,
|
||||
/// };
|
||||
///
|
||||
/// let fut = async
|
||||
/// {
|
||||
/// let ws = JsWebSocket::new( URL ).expect_throw( "Could not create websocket" );
|
||||
///
|
||||
/// ws.connect().await;
|
||||
///
|
||||
/// let (mut tx, mut rx) = ws.split();
|
||||
/// let message = "Hello from browser".to_string();
|
||||
///
|
||||
///
|
||||
/// tx.send( JsMsgEvtData::Text( message.clone() )).await
|
||||
///
|
||||
/// .expect_throw( "Failed to write to websocket" );
|
||||
///
|
||||
///
|
||||
/// let msg = rx.next().await;
|
||||
/// let result = &msg.expect_throw( "Stream closed" );
|
||||
///
|
||||
/// assert_eq!( JsMsgEvtData::Text( message ), result.data() );
|
||||
///
|
||||
/// Ok(())
|
||||
///
|
||||
/// }.boxed().compat();
|
||||
///
|
||||
/// spawn_local( fut );
|
||||
/// ```
|
||||
///
|
||||
#[ allow( dead_code ) ] // we keep the closure to keep it form being dropped
|
||||
//
|
||||
pub struct JsWebSocket
|
||||
{
|
||||
ws : WebSocket ,
|
||||
on_mesg: Closure< dyn FnMut( MessageEvent ) + 'static > ,
|
||||
queue : Rc<RefCell< VecDeque<JsMsgEvent> >> ,
|
||||
task : Rc<RefCell< Option<task::Task> >> ,
|
||||
}
|
||||
|
||||
|
||||
impl JsWebSocket
|
||||
{
|
||||
/// Create a new JsWebSocket. Can fail if there is a
|
||||
/// [security error](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#Exceptions_thrown).
|
||||
///
|
||||
pub fn new< T: AsRef<str> >( url: T ) -> Result< Self, JsValue >
|
||||
{
|
||||
let task: Rc<RefCell<Option<task::Task>>> = Rc::new( RefCell::new( None ) );
|
||||
|
||||
let queue = Rc::new( RefCell::new( VecDeque::new() ) );
|
||||
let q2 = queue.clone();
|
||||
let t2 = task .clone();
|
||||
let ws = WebSocket::new( url.as_ref() )?;
|
||||
|
||||
|
||||
// Send the incoming ws messages to the WsStream object
|
||||
//
|
||||
let on_mesg = Closure::wrap( Box::new( move |msg_evt: MessageEvent|
|
||||
{
|
||||
trace!( "WsStream: message received!" );
|
||||
|
||||
#[ cfg( debug_assertions )]
|
||||
//
|
||||
dbg( &msg_evt );
|
||||
|
||||
q2.borrow_mut().push_back( JsMsgEvent{ msg_evt } );
|
||||
|
||||
if let Some( ref t ) = *t2.borrow()
|
||||
{
|
||||
trace!( "WsStream: waking up task" );
|
||||
t.notify()
|
||||
}
|
||||
|
||||
}) as Box< dyn FnMut( MessageEvent ) > );
|
||||
|
||||
|
||||
// Install callback
|
||||
//
|
||||
ws.set_onmessage ( Some( on_mesg.as_ref().unchecked_ref() ) );
|
||||
ws.set_binary_type( BinaryType::Arraybuffer );
|
||||
|
||||
|
||||
Ok( Self
|
||||
{
|
||||
ws ,
|
||||
queue ,
|
||||
on_mesg ,
|
||||
task ,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Create a new JsWebSocket with the callback for received messages. Can fail if there is a
|
||||
/// [security error](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#Exceptions_thrown).
|
||||
///
|
||||
pub fn with_on_message< T: AsRef<str> >( url: T, onmesg: Box< dyn FnMut( MessageEvent ) > ) -> Result< Self, JsValue >
|
||||
{
|
||||
// Internal note, in this case self.queue and self.task wont be used.
|
||||
//
|
||||
let task: Rc<RefCell<Option<task::Task>>> = Rc::new( RefCell::new( None ) );
|
||||
|
||||
let queue = Rc::new( RefCell::new( VecDeque::new() ) );
|
||||
let ws = WebSocket::new( url.as_ref() )?;
|
||||
|
||||
let on_mesg = Closure::wrap( onmesg );
|
||||
|
||||
// Install callback
|
||||
//
|
||||
ws.set_onmessage ( Some( on_mesg.as_ref().unchecked_ref() ) );
|
||||
ws.set_binary_type( BinaryType::Arraybuffer );
|
||||
|
||||
Ok( Self
|
||||
{
|
||||
ws ,
|
||||
queue ,
|
||||
on_mesg ,
|
||||
task ,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Connect to the server. The future will resolve when the connection has been established. There is currently
|
||||
/// no timeout mechanism here in case of failure. You should implement that yourself.
|
||||
///
|
||||
pub async fn connect( &self )
|
||||
{
|
||||
// FIXME: Can we just pass the function without having to run a closure here? When I tried this didn't work, because
|
||||
// rustc complained: `Attempted to take value of method 'set_onopen' on type 'web_sys::WebSocket'`
|
||||
//
|
||||
future_event( |cb| self.ws.set_onopen( cb ) ).await;
|
||||
|
||||
trace!( "WebSocket connection opened!" );
|
||||
|
||||
}
|
||||
|
||||
|
||||
/// Close the socket. The future will resolve once the socket's state has become `WsReadyState::CLOSED`.
|
||||
///
|
||||
pub async fn close( &self )
|
||||
{
|
||||
// This can not throw normally, because the only errors the api
|
||||
// can return is if we use a code or a reason string, which we don't.
|
||||
// See [mdn](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#Exceptions_thrown).
|
||||
//
|
||||
self.ws.close().unwrap_throw();
|
||||
|
||||
future_event( |cb| self.ws.set_onclose( cb ) ).await;
|
||||
|
||||
trace!( "WebSocket connection closed!" );
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Verify the [WsReadyState] of the connection.
|
||||
///
|
||||
pub fn ready_state( &self ) -> WsReadyState
|
||||
{
|
||||
self.ws.ready_state().try_into().map_err( |e| error!( "{}", e ) ).unwrap_throw()
|
||||
}
|
||||
|
||||
|
||||
/// Access the wrapped [web_sys::WebSocket](https://docs.rs/web-sys/0.3.17/web_sys/struct.WebSocket.html).
|
||||
///
|
||||
pub fn wrapped( &self ) -> &WebSocket
|
||||
{
|
||||
&self.ws
|
||||
}
|
||||
|
||||
|
||||
/// Retrieve the address to which this socket is connected.
|
||||
///
|
||||
pub fn url( &self ) -> String
|
||||
{
|
||||
self.ws.url()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl fmt::Debug for JsWebSocket
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
|
||||
{
|
||||
write!( f, "JsWebSocket connected to: {}", self.url() )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl fmt::Display for JsWebSocket
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
|
||||
{
|
||||
write!( f, "JsWebSocket connected to: {}", self.url() )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl Stream for JsWebSocket
|
||||
{
|
||||
type Item = JsMsgEvent;
|
||||
|
||||
// Forward the call to the channel on which we are listening.
|
||||
//
|
||||
// Currently requires an unfortunate copy from Js memory to Wasm memory. Hopefully one
|
||||
// day we will be able to receive the JsMsgEvent directly in Wasm.
|
||||
//
|
||||
fn poll_next( mut self: Pin<&mut Self>, _: &mut std::task::Context ) -> Poll<Option< Self::Item >>
|
||||
{
|
||||
trace!( "JsWebSocket as Stream gets polled" );
|
||||
|
||||
if self.queue.borrow().is_empty()
|
||||
{
|
||||
*self.task.borrow_mut() = Some( task::current() );
|
||||
|
||||
match self.ready_state()
|
||||
{
|
||||
WsReadyState::OPEN => Poll::Pending ,
|
||||
WsReadyState::CONNECTING => Poll::Pending ,
|
||||
_ => Poll::Ready ( None ),
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
else { Poll::Ready( self.queue.borrow_mut().pop_front() ) }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
impl Sink<JsMsgEvtData> for JsWebSocket
|
||||
{
|
||||
type SinkError = JsValue;
|
||||
|
||||
|
||||
fn poll_ready( self: Pin<&mut Self>, _: &mut std::task::Context ) -> Poll<Result<(), Self::SinkError>>
|
||||
{
|
||||
trace!( "Sink: Websocket ready to poll" );
|
||||
|
||||
Poll::Ready( Ok(()) )
|
||||
}
|
||||
|
||||
|
||||
fn start_send( self: Pin<&mut Self>, item: JsMsgEvtData ) -> Result<(), Self::SinkError>
|
||||
{
|
||||
trace!( "Sink: start_send" );
|
||||
|
||||
match item
|
||||
{
|
||||
JsMsgEvtData::Binary( mut d ) => self.ws.send_with_u8_array( &mut d ),
|
||||
JsMsgEvtData::Text ( s ) => self.ws.send_with_str ( & s ),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
fn poll_flush( self: Pin<&mut Self>, _: &mut std::task::Context ) -> Poll<Result<(), Self::SinkError>>
|
||||
{
|
||||
trace!( "Sink: poll_flush" );
|
||||
|
||||
Poll::Ready( Ok(()) )
|
||||
}
|
||||
|
||||
|
||||
|
||||
fn poll_close( self: Pin<&mut Self>, _: &mut std::task::Context ) -> Poll<Result<(), Self::SinkError>>
|
||||
{
|
||||
trace!( "Sink: poll_close" );
|
||||
|
||||
Poll::Ready( self.ws.close() )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/// Indicates the state of a Websocket connection. The only state in which it's valid to send and receive messages
|
||||
/// is OPEN.
|
||||
///
|
||||
/// See [MDN](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState) for the ready state values.
|
||||
///
|
||||
#[ allow( missing_docs ) ]
|
||||
//
|
||||
#[ derive( Debug, Clone, Copy, PartialEq, Eq ) ]
|
||||
//
|
||||
pub enum WsReadyState
|
||||
{
|
||||
CONNECTING,
|
||||
OPEN ,
|
||||
CLOSING ,
|
||||
CLOSED ,
|
||||
}
|
||||
|
||||
|
||||
/// Internally ready state is a u16, so it's possible to create one from a u16. Only 0-3 are valid values.
|
||||
///
|
||||
/// See [MDN](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState) for the ready state values.
|
||||
///
|
||||
impl TryFrom<u16> for WsReadyState
|
||||
{
|
||||
type Error = WsErr;
|
||||
|
||||
fn try_from( state: u16 ) -> Result< Self, Self::Error >
|
||||
{
|
||||
match state
|
||||
{
|
||||
0 => Ok ( WsReadyState::CONNECTING ) ,
|
||||
1 => Ok ( WsReadyState::OPEN ) ,
|
||||
2 => Ok ( WsReadyState::CLOSING ) ,
|
||||
3 => Ok ( WsReadyState::CLOSED ) ,
|
||||
_ => Err( WsErrKind::InvalidReadyState( state ).into() ) ,
|
||||
}
|
||||
}
|
||||
}
|
||||
55
src/lib.rs
Normal file
55
src/lib.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
//! A convenience layer for using WebSockets from WebAssembly.
|
||||
//! It implements a futures Stream/Sink and tokio AsyncRead/AsyncWrite on top of the web-sys interface
|
||||
//! [WebSocket](https://docs.rs/web-sys/0.3.17/web_sys/struct.WebSocket.html).
|
||||
//!
|
||||
//! This allows you to communicate between your server and a browser wasm module transparently without worrying about
|
||||
//! the underlying protocol. You can use tokio codec to get framed messages of any type that implements [serde::Serialize](https://docs.rs/serde/1.0.89/serde/trait.Serialize.html).
|
||||
//!
|
||||
//! This library tries to work with [async_await] wherever possible, with the exemption of WsStream because tokio is on futures 0.1.
|
||||
//! It requires a nightly compiler for now.
|
||||
//!
|
||||
//! For examples please have a look at [JsWebSocket] and [WsStream].
|
||||
//!
|
||||
#![ doc ( html_root_url = "https://docs.rs/wasm_websocket_stream/0.1.0" ) ]
|
||||
#![ feature( async_await ) ]
|
||||
#![ deny ( missing_docs ) ]
|
||||
#![ forbid ( unsafe_code ) ]
|
||||
#![ allow ( clippy::suspicious_else_formatting ) ]
|
||||
|
||||
mod chunk_stream ;
|
||||
mod error ;
|
||||
mod js_msg_event ;
|
||||
mod js_web_socket ;
|
||||
mod ws_stream ;
|
||||
mod callback_future ;
|
||||
|
||||
pub use
|
||||
{
|
||||
callback_future :: { future_event } ,
|
||||
chunk_stream :: { ChunkStream } ,
|
||||
error :: { WsErr , WsErrKind } ,
|
||||
js_msg_event :: { JsMsgEvent , JsMsgEvtData } ,
|
||||
js_web_socket :: { JsWebSocket, WsReadyState } ,
|
||||
ws_stream :: { WsStream } ,
|
||||
};
|
||||
|
||||
|
||||
|
||||
mod import
|
||||
{
|
||||
pub use
|
||||
{
|
||||
failure :: { Backtrace, Fail, Context as FailContext } ,
|
||||
futures :: { channel::{ oneshot, mpsc::unbounded }, Poll } ,
|
||||
futures :: { compat ::{ Compat01As03Sink, Stream01CompatExt, Sink01CompatExt, Future01CompatExt, AsyncWrite01CompatExt, AsyncRead01CompatExt } } ,
|
||||
futures :: { prelude::{ Stream, Sink, AsyncRead, AsyncWrite }, task::Waker, stream::StreamExt } ,
|
||||
tokio :: { io::{ AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01 }, prelude::{ Async, task } } ,
|
||||
std :: { cmp::{ self, min }, io::{ self, Read, ErrorKind::WouldBlock }, collections::VecDeque, future::Future } ,
|
||||
std :: { sync::{ Mutex, Arc }, rc::Rc, cell::{ RefCell, RefMut }, pin::Pin, convert::{ TryFrom, TryInto }, fmt } ,
|
||||
log :: { debug, info, warn, trace, error } ,
|
||||
// async_runtime:: { rt },
|
||||
js_sys :: { ArrayBuffer, Uint8Array } ,
|
||||
wasm_bindgen :: { closure::Closure, JsCast, JsValue, UnwrapThrowExt } ,
|
||||
web_sys :: { *, console::debug_1 as dbg, BinaryType, Blob } ,
|
||||
};
|
||||
}
|
||||
288
src/ws_stream.rs
Normal file
288
src/ws_stream.rs
Normal file
@@ -0,0 +1,288 @@
|
||||
use
|
||||
{
|
||||
crate :: { import::*, ChunkStream, JsWebSocket, WsReadyState },
|
||||
};
|
||||
|
||||
|
||||
type AsyncIoResult = Result< Async<usize>, io::Error >;
|
||||
type AsyncTokioResult = Result< Async<()> , tokio::io::Error >;
|
||||
|
||||
|
||||
/// A tokio AsyncRead/AsyncWrite representing a WebSocket connection. It only supports binary mode. Contrary to the rest of this library,
|
||||
/// this will work on types from [futures 0.1](https://docs.rs/futures/0.1.25/futures/) instead of [0.3](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/index.html). This is because tokio currently is on futures 0.1, so the stream returned from
|
||||
/// a codec will be 0.1.
|
||||
///
|
||||
/// Currently !Sync and !Send.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// This example if from the integration tests. Uses [tokio-serde-cbor](https://docs.rs/tokio-serde-cbor/0.3.1/tokio_serde_cbor/) to send arbitrary data that implements [serde::Serialize](https://docs.rs/serde/1.0.89/serde/trait.Serialize.html) over a websocket.
|
||||
///
|
||||
/// ```
|
||||
/// #[ derive( Debug, Clone, Serialize, Deserialize, PartialEq, Eq ) ]
|
||||
/// //
|
||||
/// struct Data
|
||||
/// {
|
||||
/// hello: String ,
|
||||
/// data : Vec<u32> ,
|
||||
/// num : u64 ,
|
||||
/// }
|
||||
///
|
||||
/// let dataset: Vec<Data> = vec!
|
||||
/// [
|
||||
/// Data{ hello: "Hello CBOR - basic" .to_string(), data: vec![ 0, 33245, 3, 36 ], num: 3948594 },
|
||||
/// Data{ hello: "Hello CBOR - 4MB data".to_string(), data: vec![ 1; 1_024_000 ], num: 3948594 },
|
||||
/// ];
|
||||
///
|
||||
/// let fut = async move
|
||||
/// {
|
||||
/// for data in dataset
|
||||
/// {
|
||||
/// echo_cbor( data ).await;
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
///
|
||||
/// }.boxed().compat();
|
||||
///
|
||||
/// spawn_local( fut );
|
||||
///
|
||||
///
|
||||
///
|
||||
/// // Send data to an echo server and verify that what returns is exactly the same
|
||||
/// //
|
||||
/// async fn echo_cbor( data: Data )
|
||||
/// {
|
||||
/// console_log!( " Enter echo_cbor: {}", &data.hello );
|
||||
///
|
||||
/// let ws = connect().await;
|
||||
///
|
||||
/// let codec: Codec<Data, Data> = Codec::new().packed( true );
|
||||
/// let (tx, mut rx) = codec.framed( ws ).split();
|
||||
///
|
||||
/// tx.send( data.clone() ).await.expect_throw( "Failed to write to websocket" );
|
||||
///
|
||||
/// let msg = rx.next().await;
|
||||
/// let result = &mut msg.unwrap_throw().unwrap_throw();
|
||||
///
|
||||
/// assert_eq!( &data, result );
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
#[ allow( dead_code ) ] // for the on_mesg...
|
||||
//
|
||||
#[ derive( Debug ) ]
|
||||
//
|
||||
pub struct WsStream
|
||||
{
|
||||
// RefCell because we want to be able to read on a not mutable reference of WsStream.
|
||||
// And because the closure also accesses this property.
|
||||
//
|
||||
incoming : Rc<RefCell< ChunkStream >> ,
|
||||
ws : JsWebSocket ,
|
||||
|
||||
// The task to be notified if we receive new data
|
||||
//
|
||||
task : Rc<RefCell< Option<task::Task> >> ,
|
||||
}
|
||||
|
||||
|
||||
impl WsStream
|
||||
{
|
||||
/// Create a new WsStream. The future resolves when the connection is established.
|
||||
///
|
||||
pub async fn connect< T: AsRef<str> >( url: T ) -> Result< WsStream, JsValue >
|
||||
{
|
||||
let task: Rc<RefCell<Option<task::Task>>> = Rc::new( RefCell::new( None ) ) ;
|
||||
let t2 = task.clone();
|
||||
|
||||
let incoming = Rc::new( RefCell::new( ChunkStream::new() ) );
|
||||
let i2 = incoming.clone();
|
||||
|
||||
|
||||
|
||||
let cb = Box::new( move |msg_evt: MessageEvent|
|
||||
{
|
||||
trace!( "WsStream: message received!" );
|
||||
|
||||
|
||||
let data = msg_evt.data();
|
||||
|
||||
if data.is_instance_of::< ArrayBuffer >()
|
||||
{
|
||||
let raw = data.dyn_into::< ArrayBuffer >().unwrap_throw();
|
||||
|
||||
i2.borrow_mut().push( Uint8Array::new( raw.as_ref() ) );
|
||||
}
|
||||
|
||||
else if data.is_string()
|
||||
{
|
||||
i2.borrow_mut().push( Uint8Array::new(&data) )
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
error!( "WsStream: Invalid data format" );
|
||||
};
|
||||
|
||||
|
||||
if let Some( ref t ) = *t2.borrow()
|
||||
{
|
||||
trace!( "WsStream: waking up task" );
|
||||
t.notify()
|
||||
}
|
||||
});
|
||||
|
||||
let ws = JsWebSocket::with_on_message( url.as_ref(), cb )? ;
|
||||
|
||||
|
||||
ws.connect().await;
|
||||
|
||||
Ok( Self { ws, task, incoming } )
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Verify the [WsReadyState] of the connection.
|
||||
///
|
||||
pub fn state( &self ) -> WsReadyState { self.ws.ready_state() }
|
||||
|
||||
|
||||
|
||||
/// Get the url of the server we are connected to.
|
||||
///
|
||||
pub fn url ( &self ) -> String { self.ws.url() }
|
||||
|
||||
|
||||
|
||||
//---------- impl io::Read
|
||||
//
|
||||
fn io_read( &self, buf: &mut [u8] ) -> Result< usize, io::Error >
|
||||
{
|
||||
trace!( "WsStream: read called" );
|
||||
|
||||
let mut inc = self.incoming.borrow_mut();
|
||||
|
||||
if inc.is_empty()
|
||||
{
|
||||
trace!( "WsStream: read would block" );
|
||||
|
||||
*self.task.borrow_mut() = Some( task::current() );
|
||||
|
||||
Err( io::Error::from( WouldBlock ) )
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
Ok( inc.read( 0, buf ) as usize )
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// -------io:Write impl
|
||||
//
|
||||
fn io_write( &self, buf: &[u8] ) -> io::Result< usize >
|
||||
{
|
||||
// FIXME: avoid extra copy? Probably use one of the other methods on WebSocket, like
|
||||
// send_with_array_buffer_view. We have to figure out how to create those from a &[u8]
|
||||
//
|
||||
let result = self.ws.wrapped().send_with_u8_array( &mut Vec::from( buf ) );
|
||||
|
||||
match result
|
||||
{
|
||||
Ok (_ ) => Ok ( buf.len() ) ,
|
||||
Err(_e) => Err( io::Error::from( io::ErrorKind::NotConnected ) ) ,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn io_flush( &self ) -> io::Result<()>
|
||||
{
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
|
||||
// -------AsyncRead impl
|
||||
//
|
||||
fn async_pread( &self, buf: &mut [u8] ) -> AsyncIoResult
|
||||
{
|
||||
trace!( "WsStream: poll_read called" );
|
||||
|
||||
match self.io_read( buf )
|
||||
{
|
||||
Ok ( t ) => Ok ( Async::Ready(t) ),
|
||||
Err( ref e ) if e.kind() == WouldBlock =>
|
||||
{
|
||||
trace!( "WsStream: registering interested task" );
|
||||
|
||||
*self.task.borrow_mut() = Some( task::current() );
|
||||
Ok ( Async::NotReady )
|
||||
},
|
||||
|
||||
// Order is important here, don't move this above the other error match
|
||||
//
|
||||
Err( e ) => Err( e ),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// -------AsyncWrite impl
|
||||
//
|
||||
fn async_shutdown( &self ) -> AsyncTokioResult
|
||||
{
|
||||
// This can not throw normally, because the only errors the api
|
||||
// can return is if we use a code or a reason string, which we don't.
|
||||
//
|
||||
self.ws.wrapped().close().unwrap_throw();
|
||||
|
||||
Ok(().into())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl Drop for WsStream
|
||||
{
|
||||
fn drop( &mut self )
|
||||
{
|
||||
// This can not throw normally, because the only errors the api
|
||||
// can return is if we use a code or a reason string, which we don't.
|
||||
//
|
||||
self.ws.wrapped().close().unwrap_throw();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// FIXME: Can we not do this by just implementing Deref? It seems dumb to have all this boilerplate.
|
||||
// When I checked tokio code for (Tcp, Uds, ...) they all seem to have 2 copies for each impl,
|
||||
// not at all keeping it DRY. At least here we refactor those into impl WsStream... I'm a bit
|
||||
// confused by this impls for references.
|
||||
|
||||
impl io::Read for WsStream { fn read( &mut self, buf: &mut [u8] ) -> Result< usize, io::Error > { self.io_read( buf ) } }
|
||||
impl<'a> io::Read for &'a WsStream { fn read( &mut self, buf: &mut [u8] ) -> Result< usize, io::Error > { self.io_read( buf ) } }
|
||||
|
||||
|
||||
impl io::Write for WsStream
|
||||
{
|
||||
fn write( &mut self, buf: &[u8] ) -> io::Result< usize > { self.io_write( buf ) }
|
||||
fn flush( &mut self ) -> io::Result< () > { self.io_flush( ) }
|
||||
}
|
||||
|
||||
impl<'a> io::Write for &'a WsStream
|
||||
{
|
||||
fn write( &mut self, buf: &[u8] ) -> io::Result< usize > { self.io_write( buf ) }
|
||||
fn flush( &mut self ) -> io::Result< () > { self.io_flush( ) }
|
||||
}
|
||||
|
||||
|
||||
impl AsyncRead01 for WsStream { fn poll_read( &mut self, buf: &mut [u8] ) -> AsyncIoResult { self.async_pread( buf ) } }
|
||||
impl<'a> AsyncRead01 for &'a WsStream { fn poll_read( &mut self, buf: &mut [u8] ) -> AsyncIoResult { self.async_pread( buf ) } }
|
||||
|
||||
|
||||
impl AsyncWrite01 for WsStream { fn shutdown( &mut self ) -> AsyncTokioResult { self.async_shutdown() } }
|
||||
impl<'a> AsyncWrite01 for &'a WsStream { fn shutdown( &mut self ) -> AsyncTokioResult { self.async_shutdown() } }
|
||||
157
tests/js_web_socket.rs
Normal file
157
tests/js_web_socket.rs
Normal file
@@ -0,0 +1,157 @@
|
||||
#![ cfg( target_arch = "wasm32" ) ]
|
||||
#![ feature( async_await, trait_alias )]
|
||||
wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
|
||||
use
|
||||
{
|
||||
futures_01 :: Future as Future01,
|
||||
futures::prelude :: * ,
|
||||
wasm_bindgen::prelude :: * ,
|
||||
wasm_bindgen_test :: * ,
|
||||
ws_stream_wasm :: * ,
|
||||
log :: * ,
|
||||
};
|
||||
|
||||
|
||||
const URL: &str = "ws://localhost:3212/";
|
||||
|
||||
|
||||
|
||||
// Verify that a round trip to an echo server generates identical textual data.
|
||||
//
|
||||
#[ wasm_bindgen_test(async) ]
|
||||
//
|
||||
pub fn round_trip_text() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
info!( "starting test: round_trip" );
|
||||
|
||||
async
|
||||
{
|
||||
let ws = JsWebSocket::new( URL ).expect_throw( "Could not create websocket" );
|
||||
|
||||
ws.connect().await;
|
||||
|
||||
let (mut tx, mut rx) = ws.split();
|
||||
let message = "Hello from browser".to_string();
|
||||
|
||||
|
||||
tx.send( JsMsgEvtData::Text( message.clone() ) ).await
|
||||
|
||||
.expect_throw( "Failed to write to websocket" );
|
||||
|
||||
|
||||
let msg = rx.next().await;
|
||||
let result = &msg.expect_throw( "Stream closed" );
|
||||
|
||||
assert_eq!( JsMsgEvtData::Text( message ), result.data() );
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Verify that a round trip to an echo server generates identical binary data.
|
||||
//
|
||||
#[ wasm_bindgen_test(async) ]
|
||||
//
|
||||
pub fn round_trip_binary() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
info!( "starting test: round_trip" );
|
||||
|
||||
async
|
||||
{
|
||||
let ws = JsWebSocket::new( URL ).expect_throw( "Could not create websocket" );
|
||||
|
||||
ws.connect().await;
|
||||
|
||||
let (mut tx, mut rx) = ws.split();
|
||||
let message = b"Hello from browser".to_vec();
|
||||
|
||||
|
||||
tx.send( JsMsgEvtData::Binary( message.clone() ) ).await
|
||||
|
||||
.expect_throw( "Failed to write to websocket" );
|
||||
|
||||
|
||||
let msg = rx.next().await;
|
||||
let result = &msg.unwrap();
|
||||
|
||||
assert_eq!( JsMsgEvtData::Binary( message ), result.data() );
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Verify url method.
|
||||
//
|
||||
#[ wasm_bindgen_test(async) ]
|
||||
//
|
||||
pub fn url() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
info!( "starting test: url" );
|
||||
|
||||
async
|
||||
{
|
||||
let ws = JsWebSocket::new( URL ).expect_throw( "Could not create websocket" );
|
||||
|
||||
ws.connect().await;
|
||||
|
||||
assert_eq!( URL, ws.url() );
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Verify state method.
|
||||
//
|
||||
#[ wasm_bindgen_test(async) ]
|
||||
//
|
||||
pub fn state() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
info!( "starting test: state" );
|
||||
|
||||
async
|
||||
{
|
||||
let ws = JsWebSocket::new( URL ).expect_throw( "Could not create websocket" );
|
||||
|
||||
assert_eq!( WsReadyState::CONNECTING, ws.ready_state() );
|
||||
|
||||
ws.connect().await;
|
||||
|
||||
assert_eq!( WsReadyState::OPEN, ws.ready_state() );
|
||||
|
||||
ws.close().await;
|
||||
|
||||
assert_eq!( WsReadyState::CLOSED, ws.ready_state() );
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
213
tests/wasm_ws_stream.rs
Normal file
213
tests/wasm_ws_stream.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
#![ cfg( target_arch = "wasm32" ) ]
|
||||
#![ feature( async_await, trait_alias )]
|
||||
wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
|
||||
|
||||
use
|
||||
{
|
||||
wasm_bindgen::prelude :: { * } ,
|
||||
wasm_bindgen_test :: { * } ,
|
||||
ws_stream_wasm :: { * } ,
|
||||
log :: { * } ,
|
||||
rand_xoshiro :: { * } ,
|
||||
tokio :: { codec::{ BytesCodec, Decoder } } ,
|
||||
bytes :: { Bytes } ,
|
||||
futures_01 :: { Future as Future01 } ,
|
||||
futures :: { prelude::{ FutureExt, TryFutureExt, StreamExt, SinkExt } } ,
|
||||
futures::compat :: { Stream01CompatExt, Sink01CompatExt } ,
|
||||
|
||||
rand :: { RngCore, SeedableRng } ,
|
||||
tokio::prelude :: { Stream } ,
|
||||
web_sys :: { console::log_1 as dbg } ,
|
||||
serde :: { Serialize, Deserialize } ,
|
||||
tokio_serde_cbor :: { Codec } ,
|
||||
|
||||
};
|
||||
|
||||
const URL: &str = "ws://localhost:3212";
|
||||
|
||||
|
||||
|
||||
|
||||
async fn connect() -> WsStream
|
||||
{
|
||||
WsStream::connect( URL )
|
||||
|
||||
.await
|
||||
.map_err ( |e| { dbg( &e ); e } )
|
||||
.expect_throw( "Couldn't create websocket connection" )
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Verify that a round trip to an echo server generates identical data.
|
||||
//
|
||||
#[ wasm_bindgen_test( async ) ]
|
||||
//
|
||||
pub fn data_integrity() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
console_log!( "starting test: data_integrity" );
|
||||
|
||||
let big_size = 10240;
|
||||
let mut random = vec![ 0; big_size ];
|
||||
let mut rng = Xoshiro256Plus::from_seed( [ 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0 ] );
|
||||
|
||||
rng.fill_bytes( &mut random );
|
||||
|
||||
let dataset: Vec<(&str, usize, Bytes)> = vec!
|
||||
[
|
||||
( "basic" , 18, Bytes::from_static( b"Hello from browser" ) ),
|
||||
|
||||
// 20 random bytes, not valid unicode
|
||||
//
|
||||
( "random bytes", 20, Bytes::from( vec![ 72, 31, 238, 236, 85, 240, 197, 235, 149, 238, 245, 206, 227, 201, 139, 63, 173, 214, 158, 134 ] ) ),
|
||||
|
||||
// Test with something big:
|
||||
//
|
||||
( "big random" , big_size, Bytes::from( random ) ),
|
||||
];
|
||||
|
||||
async move
|
||||
{
|
||||
for data in dataset
|
||||
{
|
||||
echo( data.0, data.1, data.2 ).await;
|
||||
}
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Send data to an echo server and verify that what returns is exactly the same
|
||||
// We run 2 connections in parallel, the second one we verify that we can use a reference
|
||||
// to a WsStream
|
||||
//
|
||||
async fn echo( name: &str, size: usize, data: Bytes )
|
||||
{
|
||||
console_log!( " Enter echo: {}", name );
|
||||
|
||||
let ws = connect().await;
|
||||
let ws2 = connect().await;
|
||||
|
||||
let ( tx , rx ) = BytesCodec::new().framed( ws ).split();
|
||||
let ( tx2, rx2 ) = BytesCodec::new().framed( &ws2 ).split(); // This is where we verify reference works
|
||||
|
||||
let mut tx = tx .sink_compat();
|
||||
let mut tx2 = tx2.sink_compat();
|
||||
|
||||
let mut rx = rx .compat();
|
||||
let mut rx2 = rx2.compat();
|
||||
|
||||
|
||||
tx .send( data.clone() ).await.expect_throw( "Failed to write to websocket" );
|
||||
tx2.send( data.clone() ).await.expect_throw( "Failed to write to websocket" );
|
||||
|
||||
let mut result: Vec<u8> = Vec::new();
|
||||
|
||||
while &result.len() < &size
|
||||
{
|
||||
let msg = rx.next().await.unwrap_throw();
|
||||
let buf: &[u8] = msg.as_ref().unwrap_throw();
|
||||
result.extend( buf );
|
||||
}
|
||||
|
||||
let mut result2: Vec<u8> = Vec::new();
|
||||
|
||||
while &result2.len() < &size
|
||||
{
|
||||
let msg = rx2.next().await.unwrap_throw();
|
||||
let buf: &[u8] = msg.as_ref().unwrap_throw();
|
||||
result2.extend( buf );
|
||||
}
|
||||
|
||||
assert_eq!( &data, &Bytes::from( result ) );
|
||||
assert_eq!( &data, &Bytes::from( result2 ) );
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/////////////////////
|
||||
// With serde-cbor //
|
||||
/////////////////////
|
||||
|
||||
|
||||
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq, Eq ) ]
|
||||
//
|
||||
struct Data
|
||||
{
|
||||
hello: String ,
|
||||
data : Vec<u32> ,
|
||||
num : u64 ,
|
||||
}
|
||||
|
||||
|
||||
// Verify that a round trip to an echo server generates identical data.
|
||||
//
|
||||
#[ wasm_bindgen_test( async ) ]
|
||||
//
|
||||
pub fn data_integrity_cbor() -> impl Future01<Item = (), Error = JsValue>
|
||||
{
|
||||
let _ = console_log::init_with_level( Level::Trace );
|
||||
|
||||
console_log!( "starting test: data_integrity_cbor" );
|
||||
|
||||
let dataset: Vec<Data> = vec!
|
||||
[
|
||||
Data{ hello: "Hello CBOR - basic".to_string(), data: vec![ 0, 33245, 3, 36 ], num: 3948594 },
|
||||
|
||||
// Test with something big
|
||||
//
|
||||
Data{ hello: "Hello CBOR - 4MB data".to_string(), data: vec![ 1; 1_024_000 ], num: 3948594 },
|
||||
];
|
||||
|
||||
async move
|
||||
{
|
||||
for data in dataset
|
||||
{
|
||||
echo_cbor( data ).await;
|
||||
}
|
||||
|
||||
let r: Result<(), wasm_bindgen::JsValue> = Ok(());
|
||||
|
||||
r
|
||||
|
||||
}.boxed_local().compat()
|
||||
}
|
||||
|
||||
|
||||
// Send data to an echo server and verify that what returns is exactly the same
|
||||
//
|
||||
async fn echo_cbor( data: Data )
|
||||
{
|
||||
console_log!( " Enter echo_cbor: {}", &data.hello );
|
||||
|
||||
let ws = connect().await;
|
||||
|
||||
let codec: Codec<Data, Data> = Codec::new().packed( true );
|
||||
let (tx, rx) = codec.framed( ws ).split();
|
||||
|
||||
let mut tx = tx .sink_compat();
|
||||
let mut rx = rx .compat();
|
||||
|
||||
|
||||
tx.send( data.clone() ).await.expect_throw( "Failed to write to websocket" );
|
||||
|
||||
let msg = rx.next().await;
|
||||
let result = &mut msg.unwrap_throw().unwrap_throw();
|
||||
|
||||
assert_eq!( &data, result );
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user