rpc: add support for unix socket

This commit is contained in:
ghassmo
2022-05-11 06:03:23 +03:00
committed by parazyd
parent 525efd7dd9
commit 47257c022e
2 changed files with 42 additions and 17 deletions

View File

@@ -9,7 +9,7 @@ use serde_json::{json, Value};
use url::Url;
use crate::{
net::{TcpTransport, TorTransport, Transport, TransportName, TransportStream},
net::{TcpTransport, TorTransport, Transport, TransportName, TransportStream, UnixTransport},
Error, Result,
};
@@ -170,14 +170,14 @@ pub async fn send_request(uri: &Url, data: Value) -> Result<JsonResult> {
let stream = transport.dial(uri.clone());
if let Err(err) = stream {
error!("RPC TCP Setup failed: {}", err);
error!("RPC Setup for {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
let stream = stream?.await;
if let Err(err) = stream {
error!("RPC TCP Connection failed: {}", err);
error!("RPC Connection to {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
@@ -200,16 +200,17 @@ pub async fn send_request(uri: &Url, data: Value) -> Result<JsonResult> {
let stream = transport.clone().dial(uri.clone());
if let Err(err) = stream {
error!("RPC TOR Setup failed: {}", err);
error!("RPC Setup for {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
let stream = stream?.await;
if let Err(err) = stream {
error!("RPC TOR Connection failed: {}", err);
error!("RPC Connection to {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
match upgrade {
None => get_reply(&mut stream?, data_str).await,
Some(u) if u == "tls" => {
@@ -219,7 +220,18 @@ pub async fn send_request(uri: &Url, data: Value) -> Result<JsonResult> {
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
}
}
TransportName::Unix => {
let transport = UnixTransport::new();
let stream = transport.dial(uri.clone()).await;
if let Err(err) = stream {
error!("RPC Connection to {} failed: {}", uri, err);
return Err(Error::ConnectFailed)
}
get_reply(&mut stream?, data_str).await
}
_ => unimplemented!(),
}
}

View File

@@ -12,6 +12,7 @@ use super::jsonrpc::{JsonRequest, JsonResult};
use crate::{
net::transport::{
TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream,
UnixTransport,
},
Error, Result,
};
@@ -91,26 +92,26 @@ pub async fn listen_and_serve(
let listener = transport.listen_on(accept_url.clone());
if let Err(err) = listener {
error!("TCP Setup failed: {}", err);
return Err(Error::BindFailed(accept_url.clone().to_string()))
error!("RPC Setup for {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?.await;
if let Err(err) = listener {
error!("TCP Bind listener failed: {}", err);
return Err(Error::BindFailed(accept_url.to_string()))
error!("RPC Bind listener to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?;
match upgrade {
None => {
info!("RPC TCP listening to: {}", accept_url);
info!("RPC listening to: {}", accept_url);
run_accept_loop(Box::new(listener), rh).await?;
}
Some(u) if u == "tls" => {
info!("RPC TCP+TLS listening to: {}", accept_url);
info!("RPC listening to: {}", accept_url);
let tls_listener = transport.upgrade_listener(listener)?.await?;
run_accept_loop(Box::new(tls_listener), rh).await?;
}
@@ -151,15 +152,15 @@ pub async fn listen_and_serve(
let listener = transport.clone().listen_on(accept_url.clone());
if let Err(err) = listener {
error!("TOR Setup failed: {}", err);
return Err(Error::BindFailed(accept_url.clone().to_string()))
error!("RPC Setup for {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?.await;
if let Err(err) = listener {
error!("TOR Bind listener failed: {}", err);
return Err(Error::BindFailed(accept_url.to_string()))
error!("RPC Bind listener to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?;
@@ -167,10 +168,10 @@ pub async fn listen_and_serve(
match upgrade {
None => {
run_accept_loop(Box::new(listener), rh).await?;
info!("RPC TOR listening to: {}", accept_url);
info!("RPC listening to: {}", accept_url);
}
Some(u) if u == "tls" => {
info!("RPC TOR+TLS listening to: {}", accept_url);
info!("RPC listening to: {}", accept_url);
let tls_listener = transport.upgrade_listener(listener)?.await?;
run_accept_loop(Box::new(tls_listener), rh).await?;
}
@@ -178,6 +179,18 @@ pub async fn listen_and_serve(
}
}
TransportName::Unix => {
let transport = UnixTransport::new();
let listener = transport.listen(accept_url.clone()).await;
if let Err(err) = listener {
error!("RPC Bind listener to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
run_accept_loop(Box::new(listener?), rh).await?;
}
_ => unimplemented!(),
}