diff --git a/Cargo.lock b/Cargo.lock index 44103cb71..085ad3fa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,6 +314,19 @@ dependencies = [ "syn 1.0.76", ] +[[package]] +name = "async-tungstenite" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "742cc7dcb20b2f84a42f4691aa999070ec7e78f8e7e7438bf14be7017b44907e" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tungstenite 0.15.0", +] + [[package]] name = "asynchronous-codec" version = "0.6.0" @@ -1170,6 +1183,7 @@ dependencies = [ "async-native-tls", "async-std", "async-trait", + "async-tungstenite", "bellman", "bimap", "bitcoin", @@ -1206,8 +1220,8 @@ dependencies = [ "solana-client", "solana-sdk", "spl-token", - "tokio-tungstenite", "toml", + "tungstenite 0.15.0", "url", "zcash_primitives", "zcash_proofs", @@ -2724,26 +2738,6 @@ dependencies = [ "ucd-trie", ] -[[package]] -name = "pin-project" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" -dependencies = [ - "proc-macro2 1.0.29", - "quote 1.0.9", - "syn 1.0.76", -] - [[package]] name = "pin-project-lite" version = "0.2.7" @@ -4317,19 +4311,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "tokio-tungstenite" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" -dependencies = [ - "futures-util", - "log", - "pin-project", - "tokio", - "tungstenite 0.14.0", -] - [[package]] name = "tokio-util" version = "0.6.8" @@ -4407,9 +4388,9 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" +checksum = "983d40747bce878d2fb67d910dcb8bd3eca2b2358540c3cc1b98c027407a3ae3" dependencies = [ "base64 0.13.0", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 8316d5810..f25b479cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,9 @@ async-native-tls = "0.3.3" native-tls = "0.2.8" easy-parallel = "3.1.0" +tungstenite = "0.15.0" +async-tungstenite = "0.15.0" + simple_logger = "1.13.0" log = "0.4.14" clap = "2.33.3" @@ -62,7 +65,6 @@ rocksdb = {version = "0.16.0", default-features = false, features = ["lz4"]} spl-token = {version = "3.2.0", optional = true} solana-sdk = {version = "1.7.11", optional = true} solana-client = {version = "1.7.11", optional = true} -tokio-tungstenite = {version = "0.15.0", optional = true} ## Cashier Bitcoin Dependencies bitcoin = {version = "0.27.0", optional = true } @@ -80,4 +82,4 @@ features = ["async-std-runtime", "all-transport"] [features] btc = ["bitcoin", "secp256k1", "electrum-client"] -sol = ["solana-sdk", "solana-client", "spl-token", "tokio-tungstenite"] +sol = ["solana-sdk", "solana-client", "spl-token"] diff --git a/src/error.rs b/src/error.rs index 1016b4152..e28938ebb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -64,6 +64,7 @@ pub enum Error { ConfigNotFound, SetLoggerError, TokenParseError, + TungsteniteError, } impl std::error::Error for Error {} @@ -129,6 +130,7 @@ impl fmt::Display for Error { } Error::SetLoggerError => f.write_str("SetLoggerError"), Error::TokenParseError => f.write_str("Could not parse token list"), + Error::TungsteniteError => f.write_str("TungsteniteError"), } } } @@ -284,3 +286,9 @@ impl From for Error { Error::SetLoggerError } } + +impl From for Error { + fn from(_err: tungstenite::Error) -> Error { + Error::TungsteniteError + } +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 6c02b9bbc..63bf26870 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,3 +1,3 @@ -// pub mod adapter; pub mod jsonrpc; pub mod rpcserver; +pub mod websockets; diff --git a/src/rpc/websockets.rs b/src/rpc/websockets.rs new file mode 100644 index 000000000..3750f799e --- /dev/null +++ b/src/rpc/websockets.rs @@ -0,0 +1,95 @@ +use async_native_tls::{TlsConnector, TlsStream}; +use async_tungstenite::WebSocketStream; +use futures::sink::Sink; +use smol::{prelude::*, Async}; +use std::net::{TcpStream, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tungstenite::handshake::client::Response; +use tungstenite::Message; +use url::Url; + +use crate::{Error, Result as DrkResult}; + +pub enum WsStream { + Tcp(WebSocketStream>), + Tls(WebSocketStream>>), +} + +impl Sink for WsStream { + type Error = tungstenite::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + WsStream::Tcp(s) => Pin::new(s).poll_ready(cx), + WsStream::Tls(s) => Pin::new(s).poll_ready(cx), + } + } + + fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + match &mut *self { + WsStream::Tcp(s) => Pin::new(s).start_send(item), + WsStream::Tls(s) => Pin::new(s).start_send(item), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + WsStream::Tcp(s) => Pin::new(s).poll_flush(cx), + WsStream::Tls(s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + WsStream::Tcp(s) => Pin::new(s).poll_close(cx), + WsStream::Tls(s) => Pin::new(s).poll_close(cx), + } + } +} + +impl Stream for WsStream { + type Item = tungstenite::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + WsStream::Tcp(s) => Pin::new(s).poll_next(cx), + WsStream::Tls(s) => Pin::new(s).poll_next(cx), + } + } +} + +/// Connects to a WebSocket address (optionally secured by TLS). +pub async fn connect(addr: &str, tls: TlsConnector) -> DrkResult<(WsStream, Response)> { + let url = Url::parse(addr)?; + let host = url + .host_str() + .ok_or_else(|| Error::UrlParseError)? + .to_string(); + let port = url + .port_or_known_default() + .ok_or_else(|| Error::UrlParseError)?; + + let socket_addr = { + let host = host.clone(); + smol::unblock(move || (host.as_str(), port).to_socket_addrs()) + .await? + .next() + .ok_or_else(|| Error::UrlParseError)? + }; + + match url.scheme() { + "ws" => { + let stream = Async::::connect(socket_addr).await?; + let (stream, resp) = async_tungstenite::client_async(addr, stream).await?; + Ok((WsStream::Tcp(stream), resp)) + } + "wss" => { + let stream = Async::::connect(socket_addr).await?; + let stream = tls.connect(host, stream).await?; + let (stream, resp) = async_tungstenite::client_async(addr, stream).await?; + Ok((WsStream::Tls(stream), resp)) + } + _scheme => Err(Error::UrlParseError), + } +} diff --git a/src/service/sol.rs b/src/service/sol.rs index 668a8d14d..a10b0d374 100644 --- a/src/service/sol.rs +++ b/src/service/sol.rs @@ -1,12 +1,13 @@ -use crate::rpc::{jsonrpc, jsonrpc::JsonResult}; +use crate::rpc::{jsonrpc, jsonrpc::JsonResult, websockets::connect}; use crate::serial::{deserialize, serialize, Decodable, Encodable}; use crate::{Error, Result}; use super::bridge::{TokenClient, TokenNotification, TokenSubscribtion}; -use async_trait::async_trait; - use async_executor::Executor; +use async_native_tls::TlsConnector; +use async_std::sync::{Arc, Mutex}; +use async_trait::async_trait; use futures::{SinkExt, StreamExt}; use log::*; use rand::rngs::OsRng; @@ -17,12 +18,10 @@ use solana_sdk::{ pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction, transaction::Transaction, }; -use tokio_tungstenite::{connect_async, tungstenite, tungstenite::protocol::Message}; - -use async_std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::convert::TryFrom; use std::str::FromStr; +use tungstenite::Message; //const RPC_SERVER: &str = "https://api.mainnet-beta.solana.com"; //const WSS_SERVER: &str = "wss://api.mainnet-beta.solana.com"; @@ -71,9 +70,10 @@ impl SolClient { pub async fn run(self: Arc, executor: Arc>) -> SolResult<()> { // WebSocket handshake/connect - let (ws_stream, _) = connect_async(WSS_SERVER).await?; - - let (mut write, read) = ws_stream.split(); + let builder = native_tls::TlsConnector::builder(); + let tls = TlsConnector::from(builder); + let (stream, _) = connect(WSS_SERVER, tls).await?; + let (mut write, read) = stream.split(); let self2 = self.clone(); let _: async_executor::Task> = executor.spawn(async move { @@ -83,13 +83,13 @@ impl SolClient { // write the request to websocket write - .send(Message::Text(serde_json::to_string(&sub_msg)?)) + .send(Message::text(serde_json::to_string(&sub_msg)?)) .await .map_err(|err| SolFailed::from(err))?; } }); - read.for_each(|message| async { + futures::StreamExt::for_each(read, |message| async { // read ws msg self.clone() .read_ws_msg(message)