From 7fd08432003e3f32dd969435824aa1ee4dfd4ffc Mon Sep 17 00:00:00 2001 From: ghassmo Date: Mon, 9 May 2022 12:03:03 +0300 Subject: [PATCH] rpc: use new network transport for jsonrpc --- src/rpc/jsonrpc.rs | 113 +++++++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/src/rpc/jsonrpc.rs b/src/rpc/jsonrpc.rs index d33db5820..ed36e7212 100644 --- a/src/rpc/jsonrpc.rs +++ b/src/rpc/jsonrpc.rs @@ -1,14 +1,17 @@ -use std::{net::TcpStream, os::unix::net::UnixStream, str, time::Duration}; +use std::{env, str, time::Duration}; use async_std::io::timeout; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncReadExt, AsyncWriteExt}; +use log::error; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use smol::Async; use url::Url; -use crate::{Error, Result}; +use crate::{ + net::{TcpTransport, TorTransport, Transport, TransportName, TransportStream}, + Error, Result, +}; #[derive(Debug, Clone)] pub enum ErrorCode { @@ -159,62 +162,74 @@ pub fn notification(m: Value, p: Value) -> JsonNotification { pub async fn send_request(uri: &Url, data: Value, socks_url: Option) -> Result { let data_str = serde_json::to_string(&data)?; - let socket_addr = uri.socket_addrs(|| None)?[0]; - let host = socket_addr.ip().to_string(); - let port = socket_addr.port(); + let transport_name = TransportName::try_from(uri.clone())?; - match uri.scheme() { - "tcp" | "tls" => { - let mut stream = Async::::connect(socket_addr).await?; + match transport_name { + TransportName::Tcp(upgrade) => { + let transport = TcpTransport::new(None, 1024); + let stream = transport.dial(uri.clone()); - if uri.scheme() == "tls" { - let mut stream = async_native_tls::connect(&host, stream).await?; - get_reply(&mut stream, data_str).await - } else { - get_reply(&mut stream, data_str).await - } - } - "unix" => { - let mut stream = Async::::connect(uri.path()).await?; - get_reply(&mut stream, data_str).await - } - "tor" | "nym" => { - use fast_socks5::client::{Config, Socks5Stream}; - - if socks_url.is_none() { - return Err(Error::NoSocks5UrlFound) + if let Err(err) = stream { + error!("TCP Setup failed: {}", err); + return Err(Error::ConnectFailed) } - let socks_url = socks_url.unwrap(); + let stream = stream?.await; - let config = Config::default(); + if let Err(err) = stream { + error!("TCP Connection failed: {}", err); + return Err(Error::ConnectFailed) + } - let socks_url_str = socks_url.socket_addrs(|| None)?[0].to_string(); - - let mut stream = if !socks_url.username().is_empty() && socks_url.password().is_some() { - Socks5Stream::connect_with_password( - socks_url_str, - host, - port, - socks_url.username().to_string(), - socks_url.password().unwrap().to_string(), - config, - ) - .await? - } else { - Socks5Stream::connect(socks_url_str, host, port, config).await? - }; - - get_reply(&mut stream, data_str).await + match upgrade { + None => get_reply(&mut stream?, data_str).await, + Some(u) if u == "tls" => { + let mut stream = transport.upgrade_dialer(stream?)?.await?; + get_reply(&mut stream, data_str).await + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } } + // TODO + // "unix" => { + // let mut stream = Async::::connect(uri.path()).await?; + // get_reply(&mut stream, data_str).await + // } + TransportName::Tor(upgrade) => { + let socks5_url = Url::parse( + &env::var("DARKFI_TOR_SOCKS5_URL").unwrap_or("socks5://127.0.0.1:9050".to_string()), + )?; + + let transport = TorTransport::new(socks5_url, None)?; + + let stream = transport.clone().dial(uri.clone()); + + if let Err(err) = stream { + error!("TOR Setup failed: {}", err); + return Err(Error::ConnectFailed) + } + + let stream = stream?.await; + + if let Err(err) = stream { + error!("TOR Connection failed: {}", err); + return Err(Error::ConnectFailed) + } + match upgrade { + None => get_reply(&mut stream?, data_str).await, + Some(u) if u == "tls" => { + let mut stream = transport.upgrade_dialer(stream?)?.await?; + get_reply(&mut stream, data_str).await + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } + } + _ => unimplemented!(), } } -async fn get_reply( - stream: &mut T, - data_str: String, -) -> Result { +async fn get_reply(stream: &mut T, data_str: String) -> Result { // If we don't get a reply after 30 seconds, we'll fail. let read_timeout = Duration::from_secs(30);