From ffee60228c3632910243bfd247c20fcb18b9a74b Mon Sep 17 00:00:00 2001 From: ghassmo Date: Mon, 9 May 2022 14:09:38 +0300 Subject: [PATCH] rpc: WIP use new transport for rpcserver2 --- src/rpc/jsonrpc.rs | 12 +-- src/rpc/rpcserver2.rs | 242 ++++++++++++++++++++++++++++-------------- 2 files changed, 165 insertions(+), 89 deletions(-) diff --git a/src/rpc/jsonrpc.rs b/src/rpc/jsonrpc.rs index ed36e7212..7ff960822 100644 --- a/src/rpc/jsonrpc.rs +++ b/src/rpc/jsonrpc.rs @@ -159,7 +159,7 @@ pub fn notification(m: Value, p: Value) -> JsonNotification { JsonNotification { jsonrpc: json!("2.0"), method: m, params: p } } -pub async fn send_request(uri: &Url, data: Value, socks_url: Option) -> Result { +pub async fn send_request(uri: &Url, data: Value) -> Result { let data_str = serde_json::to_string(&data)?; let transport_name = TransportName::try_from(uri.clone())?; @@ -170,14 +170,14 @@ pub async fn send_request(uri: &Url, data: Value, socks_url: Option) -> Res let stream = transport.dial(uri.clone()); if let Err(err) = stream { - error!("TCP Setup failed: {}", err); + error!("RPC TCP Setup failed: {}", err); return Err(Error::ConnectFailed) } let stream = stream?.await; if let Err(err) = stream { - error!("TCP Connection failed: {}", err); + error!("RPC TCP Connection failed: {}", err); return Err(Error::ConnectFailed) } @@ -205,14 +205,14 @@ pub async fn send_request(uri: &Url, data: Value, socks_url: Option) -> Res let stream = transport.clone().dial(uri.clone()); if let Err(err) = stream { - error!("TOR Setup failed: {}", err); + error!("RPC TOR Setup failed: {}", err); return Err(Error::ConnectFailed) } let stream = stream?.await; if let Err(err) = stream { - error!("TOR Connection failed: {}", err); + error!("RPC TOR Connection failed: {}", err); return Err(Error::ConnectFailed) } match upgrade { @@ -224,8 +224,6 @@ pub async fn send_request(uri: &Url, data: Value, socks_url: Option) -> Res Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), } } - - _ => unimplemented!(), } } diff --git a/src/rpc/rpcserver2.rs b/src/rpc/rpcserver2.rs index 1d4bf5376..b88462e52 100644 --- a/src/rpc/rpcserver2.rs +++ b/src/rpc/rpcserver2.rs @@ -1,15 +1,18 @@ use async_std::{ io::{ReadExt, WriteExt}, - stream::StreamExt, sync::Arc, }; +use std::{env, fs}; + use async_trait::async_trait; use log::{debug, error, info}; use url::Url; use super::jsonrpc::{JsonRequest, JsonResult}; use crate::{ - net::transport::{TcpTransport, TlsTransport, Transport}, + net::transport::{ + TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream, + }, Error, Result, }; @@ -18,85 +21,160 @@ pub trait RequestHandler: Sync + Send { async fn handle_request(&self, req: JsonRequest) -> JsonResult; } -pub async fn listen_and_serve(url: Url, rh: Arc) -> Result<()> { - debug!(target: "JSON-RPC SERVER", "Trying to start listener on {}", url); - - macro_rules! handle_stream { - ($stream:expr) => { - let mut buf = vec![0; 8192]; - - loop { - let n = match $stream.read(&mut buf).await { - Ok(n) if n == 0 => { - info!(target: "JSON-RPC SERVER", "Closed connection"); - break; - } - Ok(n) => n, - Err(e) => { - error!(target: "JSON-RPC SERVER", "Failed reading from socket: {}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); - break; - } - }; - - let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { - Ok(r) => { - debug!(target: "JSON-RPC SERVER", "--> {}", String::from_utf8_lossy(&buf)); - r - } - Err(e) => { - error!(target: "JSON-RPC SERVER", "Received invalid JSON: {:?}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); - break; - } - }; - - let reply = rh.handle_request(r).await; - let j = serde_json::to_string(&reply)?; - debug!(target: "JSON-RPC SERVER", "<-- {}", j); - - if let Err(e) = $stream.write_all(j.as_bytes()).await { - error!(target: "JSON-RPC SERVER", "Failed writing to socket: {}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); - break; - } - } - } - } - - match url.scheme() { - "tcp" => { - let transport = TcpTransport::new(None, 1024); - let listener = transport.listen_on(url).unwrap().await.unwrap(); - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - info!(target: "JSON-RPC SERVER", "Accepted TCP connection"); - let mut stream = stream.unwrap(); - handle_stream!(stream); - } - unreachable!() - } - - "tls" => { - let transport = TlsTransport::new(None, 1024); - let (acceptor, listener) = transport.listen_on(url).unwrap().await.unwrap(); - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - info!(target: "JSON-RPC SERVER", "Accepted TLS connection"); - let stream = stream.unwrap(); - let mut stream = acceptor.accept(stream).await.unwrap(); - handle_stream!(stream); - } - unreachable!() - } - - "tor" => { - todo!() - } - - x => { - error!(target: "JSON-RPC SERVER", "Transport protocol '{}' isn't implemented", x); - Err(Error::UnsupportedTransport(x.to_string())) - } +async fn run_accept_loop( + listener: Box, + rh: Arc, +) -> Result<()> { + // TODO can we spawn new task here ? + while let Ok((stream, peer_addr)) = listener.next().await { + info!(target: "JSON-RPC SERVER", "RPC Accepted connection {}", peer_addr); + accept(stream, rh.clone()).await?; } + Ok(()) +} + +async fn accept( + mut stream: Box, + rh: Arc, +) -> Result<()> { + let mut buf = vec![0; 8192]; + + loop { + let n = match stream.read(&mut buf).await { + Ok(n) if n == 0 => { + info!(target: "JSON-RPC SERVER", "Closed connection"); + break + } + Ok(n) => n, + Err(e) => { + error!(target: "JSON-RPC SERVER", "Failed reading from socket: {}", e); + info!(target: "JSON-RPC SERVER", "Closed connection"); + break + } + }; + + let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { + Ok(r) => { + debug!(target: "JSON-RPC SERVER", "--> {}", String::from_utf8_lossy(&buf)); + r + } + Err(e) => { + error!(target: "JSON-RPC SERVER", "Received invalid JSON: {:?}", e); + info!(target: "JSON-RPC SERVER", "Closed connection"); + break + } + }; + + let reply = rh.handle_request(r).await; + let j = serde_json::to_string(&reply)?; + debug!(target: "JSON-RPC SERVER", "<-- {}", j); + + if let Err(e) = stream.write_all(j.as_bytes()).await { + error!(target: "JSON-RPC SERVER", "Failed writing to socket: {}", e); + info!(target: "JSON-RPC SERVER", "Closed connection"); + break + } + } + + Ok(()) +} + +pub async fn listen_and_serve( + accept_url: Url, + rh: Arc, +) -> Result<()> { + debug!(target: "JSON-RPC SERVER", "Trying to start listener on {}", accept_url); + + let transport_name = TransportName::try_from(accept_url.clone())?; + match transport_name { + TransportName::Tcp(upgrade) => { + let transport = TcpTransport::new(None, 1024); + let listener = transport.listen_on(accept_url.clone()); + + if let Err(err) = listener { + error!("TCP Setup failed: {}", err); + return Err(Error::BindFailed(accept_url.clone().to_string())) + } + + let listener = listener?.await; + + if let Err(err) = listener { + error!("TCP Bind listener failed: {}", err); + return Err(Error::BindFailed(accept_url.to_string())) + } + + let listener = listener?; + + match upgrade { + None => { + run_accept_loop(Box::new(listener), rh).await?; + } + Some(u) if u == "tls" => { + let tls_listener = transport.upgrade_listener(listener)?.await?; + run_accept_loop(Box::new(tls_listener), rh).await?; + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } + } + TransportName::Tor(upgrade) => { + let socks5_url = Url::parse( + &env::var("DARKFI_TOR_SOCKS5_URL").unwrap_or("socks5://127.0.0.1:9050".to_string()), + )?; + + let torc_url = Url::parse( + &env::var("DARKFI_TOR_CONTROL_URL").unwrap_or("tcp://127.0.0.1:9051".to_string()), + )?; + + let auth_cookie = env::var("DARKFI_TOR_COOKIE"); + + if auth_cookie.is_err() { + return Err(Error::TorError( + "Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \ + For example: \ + \'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'" + .to_string(), + )) + } + + let auth_cookie = auth_cookie.unwrap(); + + let auth_cookie = hex::encode(&fs::read(auth_cookie).unwrap()); + + let transport = TorTransport::new(socks5_url, Some((torc_url, auth_cookie)))?; + + // generate EHS pointing to local address + let hurl = transport.create_ehs(accept_url.clone())?; + + info!("EHS TOR: {}", hurl.to_string()); + + let listener = transport.clone().listen_on(accept_url.clone()); + + if let Err(err) = listener { + error!("TOR Setup failed: {}", err); + return Err(Error::BindFailed(accept_url.clone().to_string())) + } + + let listener = listener?.await; + + if let Err(err) = listener { + error!("TOR Bind listener failed: {}", err); + return Err(Error::BindFailed(accept_url.to_string())) + } + + let listener = listener?; + + match upgrade { + None => { + run_accept_loop(Box::new(listener), rh).await?; + } + Some(u) if u == "tls" => { + let tls_listener = transport.upgrade_listener(listener)?.await?; + run_accept_loop(Box::new(tls_listener), rh).await?; + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } + } + } + + Ok(()) }