rpc: Clean up and document the server implementation.

This commit is contained in:
parazyd
2022-05-22 18:58:41 +02:00
parent 0beab27e31
commit 82ee1a9331
10 changed files with 85 additions and 82 deletions

View File

@@ -9,7 +9,7 @@ use url::Url;
use darkfi::{
rpc::{
jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode::*, JsonRequest, JsonResult},
rpcserver::{listen_and_serve, RequestHandler},
server::{listen_and_serve, RequestHandler},
},
Result,
};

View File

@@ -34,7 +34,7 @@ use darkfi::{
ErrorCode::{InvalidParams, MethodNotFound},
JsonRequest, JsonResult,
},
rpcserver::{listen_and_serve, RequestHandler},
server::{listen_and_serve, RequestHandler},
},
util::{
cli::{log_config, spawn_config},

View File

@@ -33,7 +33,7 @@ use darkfi::{
ErrorCode::{InternalError, InvalidParams, MethodNotFound},
JsonRequest, JsonResult,
},
rpcserver::{listen_and_serve, RequestHandler},
server::{listen_and_serve, RequestHandler},
},
util::{
cli::{log_config, spawn_config},

View File

@@ -19,7 +19,7 @@ use url::Url;
use darkfi::{
async_daemonize, net,
raft::{NetMsg, ProtocolRaft, Raft},
rpc::rpcserver::listen_and_serve,
rpc::server::listen_and_serve,
util::{
cli::{log_config, spawn_config},
path::{expand_path, get_config_path},

View File

@@ -8,7 +8,7 @@ use darkfi::{
rpc::{
jsonrpc,
jsonrpc::{ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
server::RequestHandler,
},
};

View File

@@ -8,7 +8,7 @@ use serde_json::{json, Value};
use darkfi::{
rpc::{
jsonrpc::{error as jsonerr, ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
server::RequestHandler,
},
util::Timestamp,
Error,

View File

@@ -14,7 +14,7 @@ use url::Url;
use darkfi::{
async_daemonize, net,
raft::{NetMsg, ProtocolRaft, Raft},
rpc::rpcserver::listen_and_serve,
rpc::server::listen_and_serve,
util::{
cli::{log_config, spawn_config},
expand_path,

View File

@@ -156,6 +156,31 @@ impl TorTransport {
}
}
/// Query the environment for Tor variables, or fallback to defaults
pub fn get_env() -> Result<(Url, Url, String)> {
let socks5_url = Url::parse(
&std::env::var("DARKFI_TOR_SOCKS5_URL")
.unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()),
)?;
let torc_url = Url::parse(
&std::env::var("DARKFI_TOR_CONTROL_URL")
.unwrap_or_else(|_| "tcp://127.0.0.1:9051".to_string()),
)?;
let auth_cookie = std::env::var("DARKFI_TOR_COOKIE");
if auth_cookie.is_err() {
return Err(Error::TorError(
"Please set the env var DARKFI_TOR_COOKIE to the configured Tor cookie file.\n\
For example:\n\
\'export DARKFI_TOR_COOKIE\"/var/lib/tor/control_auth_cookie\"\'"
.to_string(),
))
}
Ok((socks5_url, torc_url, auth_cookie.unwrap()))
}
/// Creates an ephemeral hidden service pointing to local address, returns onion address
/// when successful.
///

View File

@@ -1,4 +1,4 @@
pub mod jsonrpc;
pub mod rpcclient;
pub mod rpcserver;
pub mod server;
pub mod websockets;

View File

@@ -1,77 +1,70 @@
use async_std::{
io::{ReadExt, WriteExt},
sync::Arc,
};
use std::{env, fs};
//! JSON-RPC server-side implementation.
use async_std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, info};
use futures::{AsyncReadExt, AsyncWriteExt};
use log::{debug, error, info, warn};
use url::Url;
use super::jsonrpc::{JsonRequest, JsonResult};
use crate::{
net::transport::{
TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream,
UnixTransport,
net::{
transport::Transport, TcpTransport, TorTransport, TransportListener, TransportName,
TransportStream, UnixTransport,
},
Error, Result,
};
/// Asynchronous trait implementing a handler for incoming JSON-RPC requests.
/// Can be used by matching on methods and branching out to functions that
/// handle respective methods.
#[async_trait]
pub trait RequestHandler: Sync + Send {
async fn handle_request(&self, req: JsonRequest) -> JsonResult;
}
async fn run_accept_loop(
listener: Box<dyn TransportListener>,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
while let Ok((stream, peer_addr)) = listener.next().await {
info!(target: "JSON-RPC SERVER", "RPC Accepted connection {}", peer_addr);
accept(stream, rh.clone()).await?;
}
Ok(())
}
/// Internal accept function that runs inside a loop for accepting incoming
/// JSON-RPC requests and passing them to the [`RequestHandler`].
async fn accept(
mut stream: Box<dyn TransportStream>,
peer_addr: Url,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
let mut buf = vec![0; 8192];
loop {
// Nasty size
let mut buf = vec![0; 2048 * 10];
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => {
info!(target: "JSON-RPC SERVER", "Closed connection");
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
Ok(n) => n,
Err(e) => {
error!(target: "JSON-RPC SERVER", "Failed reading from socket: {}", e);
info!(target: "JSON-RPC SERVER", "Closed connection");
error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) {
Ok(r) => {
debug!(target: "JSON-RPC SERVER", "--> {}", String::from_utf8_lossy(&buf));
debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf));
r
}
Err(e) => {
error!(target: "JSON-RPC SERVER", "Received invalid JSON: {:?}", e);
info!(target: "JSON-RPC SERVER", "Closed connection");
warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let reply = rh.handle_request(r).await;
let j = serde_json::to_string(&reply)?;
debug!(target: "JSON-RPC SERVER", "<-- {}", j);
let j = serde_json::to_string(&reply).unwrap();
debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j);
if let Err(e) = stream.write_all(j.as_bytes()).await {
error!(target: "JSON-RPC SERVER", "Failed writing to socket: {}", e);
info!(target: "JSON-RPC SERVER", "Closed connection");
error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
}
@@ -79,36 +72,50 @@ async fn accept(
Ok(())
}
/// Wrapper function around [`accept`] to take the incoming connection and
/// pass it forward.
async fn run_accept_loop(
listener: Box<dyn TransportListener>,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
while let Ok((stream, peer_addr)) = listener.next().await {
info!("JSON-RPC server accepted connection from {}", peer_addr);
accept(stream, peer_addr, rh.clone()).await?;
}
Ok(())
}
/// Start a JSON-RPC server bound to the given accept URL and use the given
/// [`RequestHandler`] to handle incoming requests.
pub async fn listen_and_serve(
accept_url: Url,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
debug!(target: "JSON-RPC SERVER", "Trying to start listener on {}", accept_url);
debug!(target: "jsonrpc-server", "Trying to bind listener on {}", accept_url);
macro_rules! accept {
($listener:expr, $transport:expr, $upgrade:expr) => {{
if let Err(err) = $listener {
error!("Setup for {} failed: {}", accept_url, err);
error!("JSON-RPC server setup for {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = $listener?.await;
if let Err(err) = listener {
error!("Bind listener to {} failed: {}", accept_url, err);
error!("JSON-RPC listener bind to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?;
match $upgrade {
None => {
info!("RPC listening to: {}", accept_url);
info!("JSON-RPC listener bound to {}", accept_url);
run_accept_loop(Box::new(listener), rh).await?;
}
Some(u) if u == "tls" => {
info!("RPC listening to: {}", accept_url);
let tls_listener = $transport.upgrade_listener(listener)?.await?;
info!("JSON-RPC listener bound to {}", accept_url);
run_accept_loop(Box::new(tls_listener), rh).await?;
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
@@ -124,53 +131,24 @@ pub async fn listen_and_serve(
accept!(listener, transport, upgrade);
}
TransportName::Tor(upgrade) => {
let socks5_url = Url::parse(
&env::var("DARKFI_TOR_SOCKS5_URL")
.unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()),
)?;
let torc_url = Url::parse(
&env::var("DARKFI_TOR_CONTROL_URL")
.unwrap_or_else(|_| "tcp://127.0.0.1:9051".to_string()),
)?;
let auth_cookie = env::var("DARKFI_TOR_COOKIE");
if auth_cookie.is_err() {
return Err(Error::TorError(
"Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \
For example: \
\'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'"
.to_string(),
))
}
let auth_cookie = auth_cookie.unwrap();
let auth_cookie = hex::encode(&fs::read(auth_cookie).unwrap());
let (socks5_url, torc_url, auth_cookie) = TorTransport::get_env()?;
let auth_cookie = hex::encode(&std::fs::read(auth_cookie).unwrap());
let transport = TorTransport::new(socks5_url, Some((torc_url, auth_cookie)))?;
// generate EHS pointing to local address
// Generate EHS pointing to local address
let hurl = transport.create_ehs(accept_url.clone())?;
info!("EHS TOR: {}", hurl.to_string());
info!("Created ephemeral hidden service: {}", hurl.to_string());
let listener = transport.clone().listen_on(accept_url.clone());
accept!(listener, transport, upgrade);
}
TransportName::Unix => {
let transport = UnixTransport::new();
let listener = transport.listen(accept_url.clone()).await;
if let Err(err) = listener {
error!("RPC Bind listener to {} failed: {}", accept_url, err);
error!("JSON-RPC Unix socket bind to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
run_accept_loop(Box::new(listener?), rh).await?;
}
_ => unimplemented!(),