From 82ee1a9331db844ff6262f371c7ebc1ea9112cd3 Mon Sep 17 00:00:00 2001 From: parazyd Date: Sun, 22 May 2022 18:58:41 +0200 Subject: [PATCH] rpc: Clean up and document the server implementation. --- bin/daod/src/main.rs | 2 +- bin/darkfid/src/main.rs | 2 +- bin/faucetd/src/main.rs | 2 +- bin/ircd/src/main.rs | 2 +- bin/ircd/src/rpc.rs | 2 +- bin/tau/taud/src/jsonrpc.rs | 2 +- bin/tau/taud/src/main.rs | 2 +- src/net/transport/tor.rs | 25 ++++++ src/rpc/mod.rs | 2 +- src/rpc/{rpcserver.rs => server.rs} | 126 ++++++++++++---------------- 10 files changed, 85 insertions(+), 82 deletions(-) rename src/rpc/{rpcserver.rs => server.rs} (55%) diff --git a/bin/daod/src/main.rs b/bin/daod/src/main.rs index 95036db75..1d89fcc38 100644 --- a/bin/daod/src/main.rs +++ b/bin/daod/src/main.rs @@ -9,7 +9,7 @@ use url::Url; use darkfi::{ rpc::{ jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode::*, JsonRequest, JsonResult}, - rpcserver::{listen_and_serve, RequestHandler}, + server::{listen_and_serve, RequestHandler}, }, Result, }; diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 589d04db8..654bc3108 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -34,7 +34,7 @@ use darkfi::{ ErrorCode::{InvalidParams, MethodNotFound}, JsonRequest, JsonResult, }, - rpcserver::{listen_and_serve, RequestHandler}, + server::{listen_and_serve, RequestHandler}, }, util::{ cli::{log_config, spawn_config}, diff --git a/bin/faucetd/src/main.rs b/bin/faucetd/src/main.rs index 05854d016..3d4f61423 100644 --- a/bin/faucetd/src/main.rs +++ b/bin/faucetd/src/main.rs @@ -33,7 +33,7 @@ use darkfi::{ ErrorCode::{InternalError, InvalidParams, MethodNotFound}, JsonRequest, JsonResult, }, - rpcserver::{listen_and_serve, RequestHandler}, + server::{listen_and_serve, RequestHandler}, }, util::{ cli::{log_config, spawn_config}, diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index fec4c6e41..1440f97eb 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -19,7 +19,7 @@ use url::Url; use darkfi::{ async_daemonize, net, raft::{NetMsg, ProtocolRaft, Raft}, - rpc::rpcserver::listen_and_serve, + rpc::server::listen_and_serve, util::{ cli::{log_config, spawn_config}, path::{expand_path, get_config_path}, diff --git a/bin/ircd/src/rpc.rs b/bin/ircd/src/rpc.rs index 8fdf5ec8b..44a5dc555 100644 --- a/bin/ircd/src/rpc.rs +++ b/bin/ircd/src/rpc.rs @@ -8,7 +8,7 @@ use darkfi::{ rpc::{ jsonrpc, jsonrpc::{ErrorCode, JsonRequest, JsonResult}, - rpcserver::RequestHandler, + server::RequestHandler, }, }; diff --git a/bin/tau/taud/src/jsonrpc.rs b/bin/tau/taud/src/jsonrpc.rs index eae703bc2..a1d0f585b 100644 --- a/bin/tau/taud/src/jsonrpc.rs +++ b/bin/tau/taud/src/jsonrpc.rs @@ -8,7 +8,7 @@ use serde_json::{json, Value}; use darkfi::{ rpc::{ jsonrpc::{error as jsonerr, ErrorCode, JsonRequest, JsonResult}, - rpcserver::RequestHandler, + server::RequestHandler, }, util::Timestamp, Error, diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 58cdc7379..bb71170b5 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -14,7 +14,7 @@ use url::Url; use darkfi::{ async_daemonize, net, raft::{NetMsg, ProtocolRaft, Raft}, - rpc::rpcserver::listen_and_serve, + rpc::server::listen_and_serve, util::{ cli::{log_config, spawn_config}, expand_path, diff --git a/src/net/transport/tor.rs b/src/net/transport/tor.rs index e699a2758..064f95012 100644 --- a/src/net/transport/tor.rs +++ b/src/net/transport/tor.rs @@ -156,6 +156,31 @@ impl TorTransport { } } + /// Query the environment for Tor variables, or fallback to defaults + pub fn get_env() -> Result<(Url, Url, String)> { + let socks5_url = Url::parse( + &std::env::var("DARKFI_TOR_SOCKS5_URL") + .unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()), + )?; + + let torc_url = Url::parse( + &std::env::var("DARKFI_TOR_CONTROL_URL") + .unwrap_or_else(|_| "tcp://127.0.0.1:9051".to_string()), + )?; + + let auth_cookie = std::env::var("DARKFI_TOR_COOKIE"); + if auth_cookie.is_err() { + return Err(Error::TorError( + "Please set the env var DARKFI_TOR_COOKIE to the configured Tor cookie file.\n\ + For example:\n\ + \'export DARKFI_TOR_COOKIE\"/var/lib/tor/control_auth_cookie\"\'" + .to_string(), + )) + } + + Ok((socks5_url, torc_url, auth_cookie.unwrap())) + } + /// Creates an ephemeral hidden service pointing to local address, returns onion address /// when successful. /// diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d559839c2..3c153a54b 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,4 +1,4 @@ pub mod jsonrpc; pub mod rpcclient; -pub mod rpcserver; +pub mod server; pub mod websockets; diff --git a/src/rpc/rpcserver.rs b/src/rpc/server.rs similarity index 55% rename from src/rpc/rpcserver.rs rename to src/rpc/server.rs index 25c7a0582..6dca85819 100644 --- a/src/rpc/rpcserver.rs +++ b/src/rpc/server.rs @@ -1,77 +1,70 @@ -use async_std::{ - io::{ReadExt, WriteExt}, - sync::Arc, -}; -use std::{env, fs}; - +//! JSON-RPC server-side implementation. +use async_std::sync::Arc; use async_trait::async_trait; -use log::{debug, error, info}; +use futures::{AsyncReadExt, AsyncWriteExt}; +use log::{debug, error, info, warn}; use url::Url; use super::jsonrpc::{JsonRequest, JsonResult}; use crate::{ - net::transport::{ - TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream, - UnixTransport, + net::{ + transport::Transport, TcpTransport, TorTransport, TransportListener, TransportName, + TransportStream, UnixTransport, }, Error, Result, }; +/// Asynchronous trait implementing a handler for incoming JSON-RPC requests. +/// Can be used by matching on methods and branching out to functions that +/// handle respective methods. #[async_trait] pub trait RequestHandler: Sync + Send { async fn handle_request(&self, req: JsonRequest) -> JsonResult; } -async fn run_accept_loop( - listener: Box, - rh: Arc, -) -> Result<()> { - while let Ok((stream, peer_addr)) = listener.next().await { - info!(target: "JSON-RPC SERVER", "RPC Accepted connection {}", peer_addr); - accept(stream, rh.clone()).await?; - } - Ok(()) -} - +/// Internal accept function that runs inside a loop for accepting incoming +/// JSON-RPC requests and passing them to the [`RequestHandler`]. async fn accept( mut stream: Box, + peer_addr: Url, rh: Arc, ) -> Result<()> { - let mut buf = vec![0; 8192]; - loop { + // Nasty size + let mut buf = vec![0; 2048 * 10]; + let n = match stream.read(&mut buf).await { Ok(n) if n == 0 => { - info!(target: "JSON-RPC SERVER", "Closed connection"); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } Ok(n) => n, Err(e) => { - error!(target: "JSON-RPC SERVER", "Failed reading from socket: {}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); + error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } }; let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { Ok(r) => { - debug!(target: "JSON-RPC SERVER", "--> {}", String::from_utf8_lossy(&buf)); + debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf)); r } Err(e) => { - error!(target: "JSON-RPC SERVER", "Received invalid JSON: {:?}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); + warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } }; let reply = rh.handle_request(r).await; - let j = serde_json::to_string(&reply)?; - debug!(target: "JSON-RPC SERVER", "<-- {}", j); + let j = serde_json::to_string(&reply).unwrap(); + debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j); if let Err(e) = stream.write_all(j.as_bytes()).await { - error!(target: "JSON-RPC SERVER", "Failed writing to socket: {}", e); - info!(target: "JSON-RPC SERVER", "Closed connection"); + error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } } @@ -79,36 +72,50 @@ async fn accept( Ok(()) } +/// Wrapper function around [`accept`] to take the incoming connection and +/// pass it forward. +async fn run_accept_loop( + listener: Box, + rh: Arc, +) -> Result<()> { + while let Ok((stream, peer_addr)) = listener.next().await { + info!("JSON-RPC server accepted connection from {}", peer_addr); + accept(stream, peer_addr, rh.clone()).await?; + } + + Ok(()) +} + +/// Start a JSON-RPC server bound to the given accept URL and use the given +/// [`RequestHandler`] to handle incoming requests. pub async fn listen_and_serve( accept_url: Url, rh: Arc, ) -> Result<()> { - debug!(target: "JSON-RPC SERVER", "Trying to start listener on {}", accept_url); + debug!(target: "jsonrpc-server", "Trying to bind listener on {}", accept_url); macro_rules! accept { ($listener:expr, $transport:expr, $upgrade:expr) => {{ if let Err(err) = $listener { - error!("Setup for {} failed: {}", accept_url, err); + error!("JSON-RPC server setup for {} failed: {}", accept_url, err); return Err(Error::BindFailed(accept_url.as_str().into())) } let listener = $listener?.await; - if let Err(err) = listener { - error!("Bind listener to {} failed: {}", accept_url, err); + error!("JSON-RPC listener bind to {} failed: {}", accept_url, err); return Err(Error::BindFailed(accept_url.as_str().into())) } let listener = listener?; - match $upgrade { None => { - info!("RPC listening to: {}", accept_url); + info!("JSON-RPC listener bound to {}", accept_url); run_accept_loop(Box::new(listener), rh).await?; } Some(u) if u == "tls" => { - info!("RPC listening to: {}", accept_url); let tls_listener = $transport.upgrade_listener(listener)?.await?; + info!("JSON-RPC listener bound to {}", accept_url); run_accept_loop(Box::new(tls_listener), rh).await?; } Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), @@ -124,53 +131,24 @@ pub async fn listen_and_serve( accept!(listener, transport, upgrade); } TransportName::Tor(upgrade) => { - let socks5_url = Url::parse( - &env::var("DARKFI_TOR_SOCKS5_URL") - .unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()), - )?; - - let torc_url = Url::parse( - &env::var("DARKFI_TOR_CONTROL_URL") - .unwrap_or_else(|_| "tcp://127.0.0.1:9051".to_string()), - )?; - - let auth_cookie = env::var("DARKFI_TOR_COOKIE"); - - if auth_cookie.is_err() { - return Err(Error::TorError( - "Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \ - For example: \ - \'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'" - .to_string(), - )) - } - - let auth_cookie = auth_cookie.unwrap(); - - let auth_cookie = hex::encode(&fs::read(auth_cookie).unwrap()); - + let (socks5_url, torc_url, auth_cookie) = TorTransport::get_env()?; + let auth_cookie = hex::encode(&std::fs::read(auth_cookie).unwrap()); let transport = TorTransport::new(socks5_url, Some((torc_url, auth_cookie)))?; - // generate EHS pointing to local address + // Generate EHS pointing to local address let hurl = transport.create_ehs(accept_url.clone())?; - - info!("EHS TOR: {}", hurl.to_string()); + info!("Created ephemeral hidden service: {}", hurl.to_string()); let listener = transport.clone().listen_on(accept_url.clone()); - accept!(listener, transport, upgrade); } - TransportName::Unix => { let transport = UnixTransport::new(); - let listener = transport.listen(accept_url.clone()).await; - if let Err(err) = listener { - error!("RPC Bind listener to {} failed: {}", accept_url, err); + error!("JSON-RPC Unix socket bind to {} failed: {}", accept_url, err); return Err(Error::BindFailed(accept_url.as_str().into())) } - run_accept_loop(Box::new(listener?), rh).await?; } _ => unimplemented!(),