Implement async Websocket client using smol.

This commit is contained in:
parazyd
2021-09-24 13:45:35 +02:00
parent d968741394
commit 72ae436249
6 changed files with 136 additions and 50 deletions

53
Cargo.lock generated
View File

@@ -314,6 +314,19 @@ dependencies = [
"syn 1.0.76",
]
[[package]]
name = "async-tungstenite"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "742cc7dcb20b2f84a42f4691aa999070ec7e78f8e7e7438bf14be7017b44907e"
dependencies = [
"futures-io",
"futures-util",
"log",
"pin-project-lite",
"tungstenite 0.15.0",
]
[[package]]
name = "asynchronous-codec"
version = "0.6.0"
@@ -1170,6 +1183,7 @@ dependencies = [
"async-native-tls",
"async-std",
"async-trait",
"async-tungstenite",
"bellman",
"bimap",
"bitcoin",
@@ -1206,8 +1220,8 @@ dependencies = [
"solana-client",
"solana-sdk",
"spl-token",
"tokio-tungstenite",
"toml",
"tungstenite 0.15.0",
"url",
"zcash_primitives",
"zcash_proofs",
@@ -2724,26 +2738,6 @@ dependencies = [
"ucd-trie",
]
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@@ -4317,19 +4311,6 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-tungstenite"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8"
dependencies = [
"futures-util",
"log",
"pin-project",
"tokio",
"tungstenite 0.14.0",
]
[[package]]
name = "tokio-util"
version = "0.6.8"
@@ -4407,9 +4388,9 @@ dependencies = [
[[package]]
name = "tungstenite"
version = "0.14.0"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5"
checksum = "983d40747bce878d2fb67d910dcb8bd3eca2b2358540c3cc1b98c027407a3ae3"
dependencies = [
"base64 0.13.0",
"byteorder",

View File

@@ -44,6 +44,9 @@ async-native-tls = "0.3.3"
native-tls = "0.2.8"
easy-parallel = "3.1.0"
tungstenite = "0.15.0"
async-tungstenite = "0.15.0"
simple_logger = "1.13.0"
log = "0.4.14"
clap = "2.33.3"
@@ -62,7 +65,6 @@ rocksdb = {version = "0.16.0", default-features = false, features = ["lz4"]}
spl-token = {version = "3.2.0", optional = true}
solana-sdk = {version = "1.7.11", optional = true}
solana-client = {version = "1.7.11", optional = true}
tokio-tungstenite = {version = "0.15.0", optional = true}
## Cashier Bitcoin Dependencies
bitcoin = {version = "0.27.0", optional = true }
@@ -80,4 +82,4 @@ features = ["async-std-runtime", "all-transport"]
[features]
btc = ["bitcoin", "secp256k1", "electrum-client"]
sol = ["solana-sdk", "solana-client", "spl-token", "tokio-tungstenite"]
sol = ["solana-sdk", "solana-client", "spl-token"]

View File

@@ -64,6 +64,7 @@ pub enum Error {
ConfigNotFound,
SetLoggerError,
TokenParseError,
TungsteniteError,
}
impl std::error::Error for Error {}
@@ -129,6 +130,7 @@ impl fmt::Display for Error {
}
Error::SetLoggerError => f.write_str("SetLoggerError"),
Error::TokenParseError => f.write_str("Could not parse token list"),
Error::TungsteniteError => f.write_str("TungsteniteError"),
}
}
}
@@ -284,3 +286,9 @@ impl From<log::SetLoggerError> for Error {
Error::SetLoggerError
}
}
impl From<tungstenite::Error> for Error {
fn from(_err: tungstenite::Error) -> Error {
Error::TungsteniteError
}
}

View File

@@ -1,3 +1,3 @@
// pub mod adapter;
pub mod jsonrpc;
pub mod rpcserver;
pub mod websockets;

95
src/rpc/websockets.rs Normal file
View File

@@ -0,0 +1,95 @@
use async_native_tls::{TlsConnector, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::sink::Sink;
use smol::{prelude::*, Async};
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};
use tungstenite::handshake::client::Response;
use tungstenite::Message;
use url::Url;
use crate::{Error, Result as DrkResult};
pub enum WsStream {
Tcp(WebSocketStream<Async<TcpStream>>),
Tls(WebSocketStream<TlsStream<Async<TcpStream>>>),
}
impl Sink<Message> for WsStream {
type Error = tungstenite::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Tcp(s) => Pin::new(s).poll_ready(cx),
WsStream::Tls(s) => Pin::new(s).poll_ready(cx),
}
}
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match &mut *self {
WsStream::Tcp(s) => Pin::new(s).start_send(item),
WsStream::Tls(s) => Pin::new(s).start_send(item),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Tcp(s) => Pin::new(s).poll_flush(cx),
WsStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Tcp(s) => Pin::new(s).poll_close(cx),
WsStream::Tls(s) => Pin::new(s).poll_close(cx),
}
}
}
impl Stream for WsStream {
type Item = tungstenite::Result<Message>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut *self {
WsStream::Tcp(s) => Pin::new(s).poll_next(cx),
WsStream::Tls(s) => Pin::new(s).poll_next(cx),
}
}
}
/// Connects to a WebSocket address (optionally secured by TLS).
pub async fn connect(addr: &str, tls: TlsConnector) -> DrkResult<(WsStream, Response)> {
let url = Url::parse(addr)?;
let host = url
.host_str()
.ok_or_else(|| Error::UrlParseError)?
.to_string();
let port = url
.port_or_known_default()
.ok_or_else(|| Error::UrlParseError)?;
let socket_addr = {
let host = host.clone();
smol::unblock(move || (host.as_str(), port).to_socket_addrs())
.await?
.next()
.ok_or_else(|| Error::UrlParseError)?
};
match url.scheme() {
"ws" => {
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Tcp(stream), resp))
}
"wss" => {
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = tls.connect(host, stream).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Tls(stream), resp))
}
_scheme => Err(Error::UrlParseError),
}
}

View File

@@ -1,12 +1,13 @@
use crate::rpc::{jsonrpc, jsonrpc::JsonResult};
use crate::rpc::{jsonrpc, jsonrpc::JsonResult, websockets::connect};
use crate::serial::{deserialize, serialize, Decodable, Encodable};
use crate::{Error, Result};
use super::bridge::{TokenClient, TokenNotification, TokenSubscribtion};
use async_trait::async_trait;
use async_executor::Executor;
use async_native_tls::TlsConnector;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use log::*;
use rand::rngs::OsRng;
@@ -17,12 +18,10 @@ use solana_sdk::{
pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction,
transaction::Transaction,
};
use tokio_tungstenite::{connect_async, tungstenite, tungstenite::protocol::Message};
use async_std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str::FromStr;
use tungstenite::Message;
//const RPC_SERVER: &str = "https://api.mainnet-beta.solana.com";
//const WSS_SERVER: &str = "wss://api.mainnet-beta.solana.com";
@@ -71,9 +70,10 @@ impl SolClient {
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> SolResult<()> {
// WebSocket handshake/connect
let (ws_stream, _) = connect_async(WSS_SERVER).await?;
let (mut write, read) = ws_stream.split();
let builder = native_tls::TlsConnector::builder();
let tls = TlsConnector::from(builder);
let (stream, _) = connect(WSS_SERVER, tls).await?;
let (mut write, read) = stream.split();
let self2 = self.clone();
let _: async_executor::Task<Result<()>> = executor.spawn(async move {
@@ -83,13 +83,13 @@ impl SolClient {
// write the request to websocket
write
.send(Message::Text(serde_json::to_string(&sub_msg)?))
.send(Message::text(serde_json::to_string(&sub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
}
});
read.for_each(|message| async {
futures::StreamExt::for_each(read, |message| async {
// read ws msg
self.clone()
.read_ws_msg(message)