rpc: create function to open channels to feed rpc connection and keep the stream alive

This commit is contained in:
ghassmo
2022-05-13 05:08:11 +03:00
parent a1c54329e2
commit 1d6e92bf55

View File

@@ -1,5 +1,7 @@
use async_std::sync::Arc;
use std::{env, str, time::Duration};
use async_executor::Executor;
use async_std::io::timeout;
use futures::{AsyncReadExt, AsyncWriteExt};
use log::error;
@@ -159,6 +161,105 @@ pub fn notification(m: Value, p: Value) -> JsonNotification {
JsonNotification { jsonrpc: json!("2.0"), method: m, params: p }
}
async fn reqrep_loop<T: TransportStream>(
mut stream: T,
data_receiver: async_channel::Receiver<Value>,
result_sender: async_channel::Sender<JsonResult>,
) -> Result<()> {
// If we don't get a reply after 30 seconds, we'll fail.
let read_timeout = Duration::from_secs(30);
loop {
let mut buf = [0; 8192];
let data = data_receiver.recv().await?;
let data_str = serde_json::to_string(&data)?;
stream.write_all(data_str.as_bytes()).await?;
let bytes_read = timeout(read_timeout, async { stream.read(&mut buf[..]).await }).await?;
let reply: JsonResult = serde_json::from_slice(&buf[0..bytes_read])?;
result_sender.send(reply).await?;
}
}
pub async fn open_channels(
uri: &Url,
executor: Arc<Executor<'_>>,
) -> Result<(async_channel::Sender<Value>, async_channel::Receiver<JsonResult>)> {
let (data_sender, data_receiver) = async_channel::unbounded();
let (result_sender, result_receiver) = async_channel::unbounded();
let transport_name = TransportName::try_from(uri.clone())?;
macro_rules! hanlde_stream {
($stream:expr, $transport:expr, $upgrade:expr) => {
if let Err(err) = $stream {
error!("RPC Setup for {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
let stream = $stream?.await;
if let Err(err) = stream {
error!("RPC Connection to {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
let stream = stream?;
match $upgrade {
None => {
executor.spawn(reqrep_loop(stream, data_receiver, result_sender)).detach();
}
Some(u) if u == "tls" => {
let stream = $transport.upgrade_dialer(stream)?.await?;
executor.spawn(reqrep_loop(stream, data_receiver, result_sender)).detach();
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
}
};
}
match transport_name {
TransportName::Tcp(upgrade) => {
let transport = TcpTransport::new(None, 1024);
let stream = transport.dial(uri.clone());
hanlde_stream!(stream, transport, upgrade);
}
TransportName::Tor(upgrade) => {
let socks5_url = Url::parse(
&env::var("DARKFI_TOR_SOCKS5_URL")
.unwrap_or_else(|_| "socks5://127.0.0.1:9050".to_string()),
)?;
let transport = TorTransport::new(socks5_url, None)?;
let stream = transport.clone().dial(uri.clone());
hanlde_stream!(stream, transport, upgrade);
}
TransportName::Unix => {
let transport = UnixTransport::new();
let stream = transport.dial(uri.clone()).await;
if let Err(err) = stream {
error!("RPC Connection to {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
executor.spawn(reqrep_loop(stream?, data_receiver, result_sender)).detach();
}
_ => unimplemented!(),
}
Ok((data_sender, result_receiver))
}
pub async fn send_request(uri: &Url, data: Value) -> Result<JsonResult> {
let data_str = serde_json::to_string(&data)?;