diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index 1e60222af..4132402da 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -34,11 +34,9 @@ use darkfi::{ ErrorCode::{InvalidParams, MethodNotFound}, JsonError, JsonRequest, JsonResponse, JsonResult, }, - server::{listen_and_serve, NotifyHandler, RequestHandler}, + server::{listen_and_serve, RequestHandler}, }, - system::{Subscriber, SubscriberPtr}, util::{ - async_util::sleep, file::{load_file, save_file}, path::{expand_path, get_config_path}, }, @@ -259,28 +257,6 @@ fn save_hosts(path: &Path, spawns: FxHashMap>) { } } -// Auxillary function generating all configured JSON-RPC notification handlers for Lilith -async fn setup_dummy_notification_handlers(mut accept_url: Url, ex: Arc>) { - info!("Generating JSON-RPC notification handlers"); - - // Creating a strings notification handler listening on accept_url:18999 - if accept_url.set_port(Some(18999)).is_err() { - error!("Error while setting port of notification url."); - return - } - let subscriber: SubscriberPtr = Subscriber::new(); - let notify_handler = NotifyHandler::new(subscriber.clone()).await; - // Handler starts listening for connections - ex.spawn(listen_and_serve(accept_url, notify_handler)).detach(); - - // Notifications simulation - let message = String::from("INSPIRACIJA"); - loop { - subscriber.notify(message.clone()).await; - sleep(10).await; - } -} - async_daemonize!(realmain); async fn realmain(args: Args, ex: Arc>) -> Result<()> { // We use this handler to block this function after detaching all @@ -332,11 +308,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // JSON-RPC server info!("Starting JSON-RPC server"); - ex.spawn(listen_and_serve(args.rpc_listen.clone(), lilith.clone())).detach(); - - // JSON-RPC notifications - let _ex = ex.clone(); - ex.spawn(setup_dummy_notification_handlers(args.rpc_listen, _ex)).detach(); + ex.spawn(listen_and_serve(args.rpc_listen, lilith.clone())).detach(); // Wait for SIGINT shutdown.recv().await?; diff --git a/src/rpc/server.rs b/src/rpc/server.rs index 0fc76a708..24c699661 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -21,7 +21,6 @@ use async_std::sync::Arc; use async_trait::async_trait; use futures::{AsyncReadExt, AsyncWriteExt}; use log::{debug, error, info, warn}; -use serde::Serialize; use url::Url; use super::jsonrpc::{JsonRequest, JsonResult}; @@ -30,22 +29,9 @@ use crate::{ TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream, UnixTransport, }, - system::SubscriberPtr, Error, Result, }; -/// Asynchronous trait implementing an internal accept function -/// that runs inside a loop for accepting incoming JSON-RPC requests -/// and passing them to the handler trait. -#[async_trait] -pub trait AcceptTrait { - async fn accept( - self: Arc, - mut stream: Box, - peer_addr: Url, - ) -> Result<()>; -} - /// Asynchronous trait implementing a handler for incoming JSON-RPC requests. /// Can be used by matching on methods and branching out to functions that /// handle respective methods. @@ -54,107 +40,65 @@ pub trait RequestHandler: Sync + Send { async fn handle_request(&self, req: JsonRequest) -> JsonResult; } -#[async_trait] -impl AcceptTrait for T { - async fn accept( - self: Arc, - mut stream: Box, - peer_addr: Url, - ) -> Result<()> { - loop { - // FIXME: Nasty size. 8M - let mut buf = vec![0; 1024 * 8192]; +/// Internal accept function that runs inside a loop for accepting incoming +/// JSON-RPC requests and passing them to the [`RequestHandler`]. +async fn accept( + mut stream: Box, + peer_addr: Url, + rh: Arc, +) -> Result<()> { + loop { + // FIXME: Nasty size. 8M + let mut buf = vec![0; 1024 * 8192]; - let n = match stream.read(&mut buf).await { - Ok(n) if n == 0 => { - debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); - break - } - Ok(n) => n, - Err(e) => { - error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e); - debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); - break - } - }; - - let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { - Ok(r) => { - debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf)); - r - } - Err(e) => { - warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e); - debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); - break - } - }; - - let reply = self.handle_request(r).await; - let j = serde_json::to_string(&reply).unwrap(); - debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j); - - if let Err(e) = stream.write_all(j.as_bytes()).await { - error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e); + let n = match stream.read(&mut buf).await { + Ok(n) if n == 0 => { debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } - } - - Ok(()) - } -} - -/// Wrapper struct to notify an incoming connection about subscription items. -pub struct NotifyHandler { - subscriber: SubscriberPtr, -} - -impl NotifyHandler { - pub async fn new(subscriber: SubscriberPtr) -> Arc { - Arc::new(Self { subscriber }) - } -} - -#[async_trait] -impl AcceptTrait for NotifyHandler { - async fn accept( - self: Arc, - mut stream: Box, - peer_addr: Url, - ) -> Result<()> { - let subscription = self.subscriber.clone().subscribe().await; - - loop { - // Listen subscription for notifications - let notification = subscription.receive().await; - - // Push notification - let j = serde_json::to_string(¬ification).unwrap(); - debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j); - - if let Err(e) = stream.write_all(j.as_bytes()).await { - debug!(target: "jsonrpc-server", "JSON-RPC server failed writing to {} socket: {}", peer_addr, e); + Ok(n) => n, + Err(e) => { + error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e); debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); break } + }; + + let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { + Ok(r) => { + debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf)); + r + } + Err(e) => { + warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); + break + } + }; + + let reply = rh.handle_request(r).await; + let j = serde_json::to_string(&reply).unwrap(); + debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j); + + if let Err(e) = stream.write_all(j.as_bytes()).await { + error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e); + debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr); + break } - - subscription.unsubscribe().await; - - Ok(()) } + + Ok(()) } /// Wrapper function around [`accept()`] to take the incoming connection and /// pass it forward. async fn run_accept_loop( listener: Box, - handler: Arc, + rh: Arc, ) -> Result<()> { while let Ok((stream, peer_addr)) = listener.next().await { info!("JSON-RPC server accepted connection from {}", peer_addr); - handler.clone().accept(stream, peer_addr).await?; + accept(stream, peer_addr, rh.clone()).await?; } Ok(()) @@ -164,7 +108,7 @@ async fn run_accept_loop( /// [`RequestHandler`] to handle incoming requests. pub async fn listen_and_serve( accept_url: Url, - handler: Arc, + rh: Arc, ) -> Result<()> { debug!(target: "jsonrpc-server", "Trying to bind listener on {}", accept_url); @@ -185,12 +129,12 @@ pub async fn listen_and_serve( match $upgrade { None => { info!("JSON-RPC listener bound to {}", accept_url); - run_accept_loop(Box::new(listener), handler).await?; + run_accept_loop(Box::new(listener), rh).await?; } Some(u) if u == "tls" => { let tls_listener = $transport.upgrade_listener(listener)?.await?; info!("JSON-RPC listener bound to {}", accept_url); - run_accept_loop(Box::new(tls_listener), handler).await?; + run_accept_loop(Box::new(tls_listener), rh).await?; } Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), } @@ -223,7 +167,7 @@ pub async fn listen_and_serve( error!("JSON-RPC Unix socket bind to {} failed: {}", accept_url, err); return Err(Error::BindFailed(accept_url.as_str().into())) } - run_accept_loop(Box::new(listener?), handler).await?; + run_accept_loop(Box::new(listener?), rh).await?; } _ => unimplemented!(), }