From 1d6e92bf55fb6928fdc7b9ceecc44e2ce8ea2cd7 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 13 May 2022 05:08:11 +0300 Subject: [PATCH] rpc: create function to open channels to feed rpc connection and keep the stream alive --- src/rpc/jsonrpc.rs | 101 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/src/rpc/jsonrpc.rs b/src/rpc/jsonrpc.rs index 53a624414..018672011 100644 --- a/src/rpc/jsonrpc.rs +++ b/src/rpc/jsonrpc.rs @@ -1,5 +1,7 @@ +use async_std::sync::Arc; use std::{env, str, time::Duration}; +use async_executor::Executor; use async_std::io::timeout; use futures::{AsyncReadExt, AsyncWriteExt}; use log::error; @@ -159,6 +161,105 @@ pub fn notification(m: Value, p: Value) -> JsonNotification { JsonNotification { jsonrpc: json!("2.0"), method: m, params: p } } +async fn reqrep_loop( + mut stream: T, + data_receiver: async_channel::Receiver, + result_sender: async_channel::Sender, +) -> Result<()> { + // If we don't get a reply after 30 seconds, we'll fail. + let read_timeout = Duration::from_secs(30); + + loop { + let mut buf = [0; 8192]; + + let data = data_receiver.recv().await?; + let data_str = serde_json::to_string(&data)?; + + stream.write_all(data_str.as_bytes()).await?; + + let bytes_read = timeout(read_timeout, async { stream.read(&mut buf[..]).await }).await?; + + let reply: JsonResult = serde_json::from_slice(&buf[0..bytes_read])?; + + result_sender.send(reply).await?; + } +} + +pub async fn open_channels( + uri: &Url, + executor: Arc>, +) -> Result<(async_channel::Sender, async_channel::Receiver)> { + let (data_sender, data_receiver) = async_channel::unbounded(); + let (result_sender, result_receiver) = async_channel::unbounded(); + + let transport_name = TransportName::try_from(uri.clone())?; + + macro_rules! hanlde_stream { + ($stream:expr, $transport:expr, $upgrade:expr) => { + if let Err(err) = $stream { + error!("RPC Setup for {} failed: {}", uri, err); + return Err(Error::ConnectFailed) + } + + let stream = $stream?.await; + + if let Err(err) = stream { + error!("RPC Connection to {} failed: {}", uri, err); + return Err(Error::ConnectFailed) + } + + let stream = stream?; + + match $upgrade { + None => { + executor.spawn(reqrep_loop(stream, data_receiver, result_sender)).detach(); + } + Some(u) if u == "tls" => { + let stream = $transport.upgrade_dialer(stream)?.await?; + executor.spawn(reqrep_loop(stream, data_receiver, result_sender)).detach(); + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } + }; + } + + match transport_name { + TransportName::Tcp(upgrade) => { + let transport = TcpTransport::new(None, 1024); + let stream = transport.dial(uri.clone()); + + hanlde_stream!(stream, transport, upgrade); + } + TransportName::Tor(upgrade) => { + let socks5_url = Url::parse( + &env::var("DARKFI_TOR_SOCKS5_URL") + .unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()), + )?; + + let transport = TorTransport::new(socks5_url, None)?; + + let stream = transport.clone().dial(uri.clone()); + + hanlde_stream!(stream, transport, upgrade); + } + TransportName::Unix => { + let transport = UnixTransport::new(); + + let stream = transport.dial(uri.clone()).await; + + if let Err(err) = stream { + error!("RPC Connection to {} failed: {}", uri, err); + return Err(Error::ConnectFailed) + } + + executor.spawn(reqrep_loop(stream?, data_receiver, result_sender)).detach(); + } + _ => unimplemented!(), + } + + Ok((data_sender, result_receiver)) +} + pub async fn send_request(uri: &Url, data: Value) -> Result { let data_str = serde_json::to_string(&data)?;