net: add/use macros connect! and accept! for connector.rs and acceptor.rs

This commit is contained in:
ghassmo
2022-05-13 07:31:53 +03:00
parent 69fe7485b7
commit 9692ebf29d
2 changed files with 30 additions and 64 deletions

View File

@@ -37,17 +37,15 @@ impl Acceptor {
executor: Arc<Executor<'_>>,
) -> Result<()> {
let transport_name = TransportName::try_from(accept_url.clone())?;
match transport_name {
TransportName::Tcp(upgrade) => {
let transport = TcpTransport::new(None, 1024);
let listener = transport.listen_on(accept_url.clone());
if let Err(err) = listener {
macro_rules! accept {
($listener:expr, $transport:expr, $upgrade:expr) => {{
if let Err(err) = $listener {
error!("Setup for {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?.await;
let listener = $listener?.await;
if let Err(err) = listener {
error!("Bind listener to {} failed: {}", accept_url, err);
@@ -56,18 +54,25 @@ impl Acceptor {
let listener = listener?;
match upgrade {
match $upgrade {
None => {
self.accept(Box::new(listener), executor);
}
Some(u) if u == "tls" => {
let tls_listener = transport.upgrade_listener(listener)?.await?;
let tls_listener = $transport.upgrade_listener(listener)?.await?;
self.accept(Box::new(tls_listener), executor);
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
}
}
}};
}
match transport_name {
TransportName::Tcp(upgrade) => {
let transport = TcpTransport::new(None, 1024);
let listener = transport.listen_on(accept_url.clone());
accept!(listener, transport, upgrade);
}
TransportName::Tor(upgrade) => {
let socks5_url = Url::parse(
&env::var("DARKFI_TOR_SOCKS5_URL")
@@ -102,30 +107,7 @@ impl Acceptor {
let listener = transport.clone().listen_on(accept_url.clone());
if let Err(err) = listener {
error!("Setup for {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?.await;
if let Err(err) = listener {
error!("Bind listener to {} failed: {}", accept_url, err);
return Err(Error::BindFailed(accept_url.as_str().into()))
}
let listener = listener?;
match upgrade {
None => {
self.accept(Box::new(listener), executor);
}
Some(u) if u == "tls" => {
let tls_listener = transport.upgrade_listener(listener)?.await?;
self.accept(Box::new(tls_listener), executor);
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
}
accept!(listener, transport, upgrade);
}
_ => unimplemented!(),
}

View File

@@ -37,33 +37,38 @@ impl Connector {
connect_url: Url,
transport_name: TransportName,
) -> Result<Arc<Channel>> {
match transport_name {
TransportName::Tcp(upgrade) => {
let transport = TcpTransport::new(None, 1024);
let stream = transport.dial(connect_url.clone());
if let Err(err) = stream {
macro_rules! connect {
($stream:expr, $transport:expr, $upgrade:expr) => {{
if let Err(err) = $stream {
error!("Setup for {} failed: {}", connect_url, err);
return Err(Error::ConnectFailed)
}
let stream = stream?.await;
let stream = $stream?.await;
if let Err(err) = stream {
error!("Connection to {} failed: {}", connect_url, err);
return Err(Error::ConnectFailed)
}
let channel = match upgrade {
let channel = match $upgrade {
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
Some(u) if u == "tls" => {
let stream = transport.upgrade_dialer(stream?)?.await;
let stream = $transport.upgrade_dialer(stream?)?.await;
Channel::new(Box::new(stream?), connect_url).await
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
};
Ok(channel)
}};
}
match transport_name {
TransportName::Tcp(upgrade) => {
let transport = TcpTransport::new(None, 1024);
let stream = transport.dial(connect_url.clone());
connect!(stream, transport, upgrade)
}
TransportName::Tor(upgrade) => {
let socks5_url = Url::parse(
@@ -75,28 +80,7 @@ impl Connector {
let stream = transport.clone().dial(connect_url.clone());
if let Err(err) = stream {
error!("Setup for {} failed: {}", connect_url, err);
return Err(Error::ConnectFailed)
}
let stream = stream?.await;
if let Err(err) = stream {
error!("Connection to {} failed: {}", connect_url, err);
return Err(Error::ConnectFailed)
}
let channel = match upgrade {
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
Some(u) if u == "tls" => {
let stream = transport.upgrade_dialer(stream?)?.await;
Channel::new(Box::new(stream?), connect_url).await
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
};
Ok(channel)
connect!(stream, transport, upgrade)
}
_ => unimplemented!(),
}