mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
rpc: use new network transport for jsonrpc
This commit is contained in:
@@ -1,14 +1,17 @@
|
||||
use std::{net::TcpStream, os::unix::net::UnixStream, str, time::Duration};
|
||||
use std::{env, str, time::Duration};
|
||||
|
||||
use async_std::io::timeout;
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use futures::{AsyncReadExt, AsyncWriteExt};
|
||||
use log::error;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use smol::Async;
|
||||
use url::Url;
|
||||
|
||||
use crate::{Error, Result};
|
||||
use crate::{
|
||||
net::{TcpTransport, TorTransport, Transport, TransportName, TransportStream},
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ErrorCode {
|
||||
@@ -159,62 +162,74 @@ pub fn notification(m: Value, p: Value) -> JsonNotification {
|
||||
pub async fn send_request(uri: &Url, data: Value, socks_url: Option<Url>) -> Result<JsonResult> {
|
||||
let data_str = serde_json::to_string(&data)?;
|
||||
|
||||
let socket_addr = uri.socket_addrs(|| None)?[0];
|
||||
let host = socket_addr.ip().to_string();
|
||||
let port = socket_addr.port();
|
||||
let transport_name = TransportName::try_from(uri.clone())?;
|
||||
|
||||
match uri.scheme() {
|
||||
"tcp" | "tls" => {
|
||||
let mut stream = Async::<TcpStream>::connect(socket_addr).await?;
|
||||
match transport_name {
|
||||
TransportName::Tcp(upgrade) => {
|
||||
let transport = TcpTransport::new(None, 1024);
|
||||
let stream = transport.dial(uri.clone());
|
||||
|
||||
if uri.scheme() == "tls" {
|
||||
let mut stream = async_native_tls::connect(&host, stream).await?;
|
||||
get_reply(&mut stream, data_str).await
|
||||
} else {
|
||||
get_reply(&mut stream, data_str).await
|
||||
}
|
||||
}
|
||||
"unix" => {
|
||||
let mut stream = Async::<UnixStream>::connect(uri.path()).await?;
|
||||
get_reply(&mut stream, data_str).await
|
||||
}
|
||||
"tor" | "nym" => {
|
||||
use fast_socks5::client::{Config, Socks5Stream};
|
||||
|
||||
if socks_url.is_none() {
|
||||
return Err(Error::NoSocks5UrlFound)
|
||||
if let Err(err) = stream {
|
||||
error!("TCP Setup failed: {}", err);
|
||||
return Err(Error::ConnectFailed)
|
||||
}
|
||||
|
||||
let socks_url = socks_url.unwrap();
|
||||
let stream = stream?.await;
|
||||
|
||||
let config = Config::default();
|
||||
if let Err(err) = stream {
|
||||
error!("TCP Connection failed: {}", err);
|
||||
return Err(Error::ConnectFailed)
|
||||
}
|
||||
|
||||
let socks_url_str = socks_url.socket_addrs(|| None)?[0].to_string();
|
||||
|
||||
let mut stream = if !socks_url.username().is_empty() && socks_url.password().is_some() {
|
||||
Socks5Stream::connect_with_password(
|
||||
socks_url_str,
|
||||
host,
|
||||
port,
|
||||
socks_url.username().to_string(),
|
||||
socks_url.password().unwrap().to_string(),
|
||||
config,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
Socks5Stream::connect(socks_url_str, host, port, config).await?
|
||||
};
|
||||
|
||||
get_reply(&mut stream, data_str).await
|
||||
match upgrade {
|
||||
None => get_reply(&mut stream?, data_str).await,
|
||||
Some(u) if u == "tls" => {
|
||||
let mut stream = transport.upgrade_dialer(stream?)?.await?;
|
||||
get_reply(&mut stream, data_str).await
|
||||
}
|
||||
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
|
||||
}
|
||||
}
|
||||
// TODO
|
||||
// "unix" => {
|
||||
// let mut stream = Async::<UnixStream>::connect(uri.path()).await?;
|
||||
// get_reply(&mut stream, data_str).await
|
||||
// }
|
||||
TransportName::Tor(upgrade) => {
|
||||
let socks5_url = Url::parse(
|
||||
&env::var("DARKFI_TOR_SOCKS5_URL").unwrap_or("socks5://127.0.0.1:9050".to_string()),
|
||||
)?;
|
||||
|
||||
let transport = TorTransport::new(socks5_url, None)?;
|
||||
|
||||
let stream = transport.clone().dial(uri.clone());
|
||||
|
||||
if let Err(err) = stream {
|
||||
error!("TOR Setup failed: {}", err);
|
||||
return Err(Error::ConnectFailed)
|
||||
}
|
||||
|
||||
let stream = stream?.await;
|
||||
|
||||
if let Err(err) = stream {
|
||||
error!("TOR Connection failed: {}", err);
|
||||
return Err(Error::ConnectFailed)
|
||||
}
|
||||
match upgrade {
|
||||
None => get_reply(&mut stream?, data_str).await,
|
||||
Some(u) if u == "tls" => {
|
||||
let mut stream = transport.upgrade_dialer(stream?)?.await?;
|
||||
get_reply(&mut stream, data_str).await
|
||||
}
|
||||
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
|
||||
}
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_reply<T: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream: &mut T,
|
||||
data_str: String,
|
||||
) -> Result<JsonResult> {
|
||||
async fn get_reply<T: TransportStream>(stream: &mut T, data_str: String) -> Result<JsonResult> {
|
||||
// If we don't get a reply after 30 seconds, we'll fail.
|
||||
let read_timeout = Duration::from_secs(30);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user