Revert "rpc: implemented notify handler"

This reverts commit 5325268d14.
This commit is contained in:
aggstam
2022-11-23 13:14:45 +02:00
parent 60f028e3ac
commit 5f32bd58b4
2 changed files with 47 additions and 131 deletions

View File

@@ -34,11 +34,9 @@ use darkfi::{
ErrorCode::{InvalidParams, MethodNotFound},
JsonError, JsonRequest, JsonResponse, JsonResult,
},
server::{listen_and_serve, NotifyHandler, RequestHandler},
server::{listen_and_serve, RequestHandler},
},
system::{Subscriber, SubscriberPtr},
util::{
async_util::sleep,
file::{load_file, save_file},
path::{expand_path, get_config_path},
},
@@ -259,28 +257,6 @@ fn save_hosts(path: &Path, spawns: FxHashMap<String, Vec<String>>) {
}
}
// Auxillary function generating all configured JSON-RPC notification handlers for Lilith
async fn setup_dummy_notification_handlers(mut accept_url: Url, ex: Arc<smol::Executor<'_>>) {
info!("Generating JSON-RPC notification handlers");
// Creating a strings notification handler listening on accept_url:18999
if accept_url.set_port(Some(18999)).is_err() {
error!("Error while setting port of notification url.");
return
}
let subscriber: SubscriberPtr<String> = Subscriber::new();
let notify_handler = NotifyHandler::new(subscriber.clone()).await;
// Handler starts listening for connections
ex.spawn(listen_and_serve(accept_url, notify_handler)).detach();
// Notifications simulation
let message = String::from("INSPIRACIJA");
loop {
subscriber.notify(message.clone()).await;
sleep(10).await;
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
// We use this handler to block this function after detaching all
@@ -332,11 +308,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen.clone(), lilith.clone())).detach();
// JSON-RPC notifications
let _ex = ex.clone();
ex.spawn(setup_dummy_notification_handlers(args.rpc_listen, _ex)).detach();
ex.spawn(listen_and_serve(args.rpc_listen, lilith.clone())).detach();
// Wait for SIGINT
shutdown.recv().await?;

View File

@@ -21,7 +21,6 @@ use async_std::sync::Arc;
use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncWriteExt};
use log::{debug, error, info, warn};
use serde::Serialize;
use url::Url;
use super::jsonrpc::{JsonRequest, JsonResult};
@@ -30,22 +29,9 @@ use crate::{
TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream,
UnixTransport,
},
system::SubscriberPtr,
Error, Result,
};
/// Asynchronous trait implementing an internal accept function
/// that runs inside a loop for accepting incoming JSON-RPC requests
/// and passing them to the handler trait.
#[async_trait]
pub trait AcceptTrait {
async fn accept(
self: Arc<Self>,
mut stream: Box<dyn TransportStream>,
peer_addr: Url,
) -> Result<()>;
}
/// Asynchronous trait implementing a handler for incoming JSON-RPC requests.
/// Can be used by matching on methods and branching out to functions that
/// handle respective methods.
@@ -54,107 +40,65 @@ pub trait RequestHandler: Sync + Send {
async fn handle_request(&self, req: JsonRequest) -> JsonResult;
}
#[async_trait]
impl<T: RequestHandler> AcceptTrait for T {
async fn accept(
self: Arc<Self>,
mut stream: Box<dyn TransportStream>,
peer_addr: Url,
) -> Result<()> {
loop {
// FIXME: Nasty size. 8M
let mut buf = vec![0; 1024 * 8192];
/// Internal accept function that runs inside a loop for accepting incoming
/// JSON-RPC requests and passing them to the [`RequestHandler`].
async fn accept(
mut stream: Box<dyn TransportStream>,
peer_addr: Url,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
loop {
// FIXME: Nasty size. 8M
let mut buf = vec![0; 1024 * 8192];
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => {
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
Ok(n) => n,
Err(e) => {
error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) {
Ok(r) => {
debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf));
r
}
Err(e) => {
warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let reply = self.handle_request(r).await;
let j = serde_json::to_string(&reply).unwrap();
debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j);
if let Err(e) = stream.write_all(j.as_bytes()).await {
error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e);
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => {
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
}
Ok(())
}
}
/// Wrapper struct to notify an incoming connection about subscription items.
pub struct NotifyHandler<T> {
subscriber: SubscriberPtr<T>,
}
impl<T: Clone + Serialize> NotifyHandler<T> {
pub async fn new(subscriber: SubscriberPtr<T>) -> Arc<Self> {
Arc::new(Self { subscriber })
}
}
#[async_trait]
impl<T: Clone + Send + Serialize> AcceptTrait for NotifyHandler<T> {
async fn accept(
self: Arc<Self>,
mut stream: Box<dyn TransportStream>,
peer_addr: Url,
) -> Result<()> {
let subscription = self.subscriber.clone().subscribe().await;
loop {
// Listen subscription for notifications
let notification = subscription.receive().await;
// Push notification
let j = serde_json::to_string(&notification).unwrap();
debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j);
if let Err(e) = stream.write_all(j.as_bytes()).await {
debug!(target: "jsonrpc-server", "JSON-RPC server failed writing to {} socket: {}", peer_addr, e);
Ok(n) => n,
Err(e) => {
error!("JSON-RPC server failed reading from {} socket: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) {
Ok(r) => {
debug!(target: "jsonrpc-server", "{} --> {}", peer_addr, String::from_utf8_lossy(&buf));
r
}
Err(e) => {
warn!("JSON-RPC server received invalid JSON from {}: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
};
let reply = rh.handle_request(r).await;
let j = serde_json::to_string(&reply).unwrap();
debug!(target: "jsonrpc-server", "{} <-- {}", peer_addr, j);
if let Err(e) = stream.write_all(j.as_bytes()).await {
error!("JSON-RPC server failed writing to {} socket: {}", peer_addr, e);
debug!(target: "jsonrpc-server", "Closed connection for {}", peer_addr);
break
}
subscription.unsubscribe().await;
Ok(())
}
Ok(())
}
/// Wrapper function around [`accept()`] to take the incoming connection and
/// pass it forward.
async fn run_accept_loop(
listener: Box<dyn TransportListener>,
handler: Arc<impl AcceptTrait + 'static>,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
while let Ok((stream, peer_addr)) = listener.next().await {
info!("JSON-RPC server accepted connection from {}", peer_addr);
handler.clone().accept(stream, peer_addr).await?;
accept(stream, peer_addr, rh.clone()).await?;
}
Ok(())
@@ -164,7 +108,7 @@ async fn run_accept_loop(
/// [`RequestHandler`] to handle incoming requests.
pub async fn listen_and_serve(
accept_url: Url,
handler: Arc<impl AcceptTrait + 'static>,
rh: Arc<impl RequestHandler + 'static>,
) -> Result<()> {
debug!(target: "jsonrpc-server", "Trying to bind listener on {}", accept_url);
@@ -185,12 +129,12 @@ pub async fn listen_and_serve(
match $upgrade {
None => {
info!("JSON-RPC listener bound to {}", accept_url);
run_accept_loop(Box::new(listener), handler).await?;
run_accept_loop(Box::new(listener), rh).await?;
}
Some(u) if u == "tls" => {
let tls_listener = $transport.upgrade_listener(listener)?.await?;
info!("JSON-RPC listener bound to {}", accept_url);
run_accept_loop(Box::new(tls_listener), handler).await?;
run_accept_loop(Box::new(tls_listener), rh).await?;
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
}
@@ -223,7 +167,7 @@ pub async fn listen_and_serve(
error!("JSON-RPC Unix socket bind to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
run_accept_loop(Box::new(listener?), handler).await?;
run_accept_loop(Box::new(listener?), rh).await?;
}
_ => unimplemented!(),
}