From 521ed07ea941e8238d8a023612bb9e1b8fd5697d Mon Sep 17 00:00:00 2001 From: ghassmo Date: Tue, 26 Apr 2022 01:39:35 +0300 Subject: [PATCH] net3: errors handling and clean up --- src/net3/acceptor.rs | 24 +++++++++++++----------- src/net3/connector.rs | 23 ++++++++++------------- src/net3/mod.rs | 2 +- src/net3/transport.rs | 5 +---- src/net3/transport/tcp.rs | 10 ++-------- 5 files changed, 27 insertions(+), 37 deletions(-) diff --git a/src/net3/acceptor.rs b/src/net3/acceptor.rs index 5c9dd7526..916ec6f27 100644 --- a/src/net3/acceptor.rs +++ b/src/net3/acceptor.rs @@ -11,7 +11,7 @@ use crate::{ }; use super::{ - transport::{TcpTransport, TlsTransport}, + TcpTransport, TlsTransport, Channel, ChannelPtr, Transport, }; @@ -68,29 +68,31 @@ impl Acceptor { let mut url = Url::parse(&accept_addr.to_string())?; url.set_host(Some("tcp"))?; - match url.scheme() { + match url.scheme() { "tcp" => { let transport = TcpTransport::new(None, 1024); - let listener = transport.listen_on(url).unwrap().await.unwrap(); + let listener = transport.listen_on(url)?.await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { - let stream = stream.unwrap(); - let peer_addr = stream.peer_addr().unwrap(); + let stream = stream?; + let peer_addr = stream.peer_addr()?; let channel = Channel::new(Box::new(stream), peer_addr).await; + self.channel_subscriber.notify(Ok(channel)).await; } } "tls" => { let transport = TlsTransport::new(None, 1024); - let (acceptor, listener) = transport.listen_on(url).unwrap().await.unwrap(); + let (acceptor, listener) = transport.listen_on(url)?.await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { - let stream = stream.unwrap(); - let peer_addr = stream.peer_addr().unwrap(); - let mut stream = acceptor.accept(stream).await.unwrap(); - let channel = - Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await; + let stream = stream?; + let peer_addr = stream.peer_addr()?; + let stream = acceptor.accept(stream).await?; + let channel = Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await; + self.channel_subscriber.notify(Ok(channel)).await; } } + "tor" => todo!(), _ => unimplemented!(), } Ok(()) diff --git a/src/net3/connector.rs b/src/net3/connector.rs index dd8a43ff7..7357fcbd5 100644 --- a/src/net3/connector.rs +++ b/src/net3/connector.rs @@ -1,13 +1,10 @@ -use async_std::{future::timeout, net::TcpStream}; +use async_std::future::timeout; use std::{net::SocketAddr, time::Duration}; use url::Url; -use crate::{Error, Result}; +use crate::Result; -use super::{ - transport::{TcpTransport, TlsTransport}, - Channel, ChannelPtr, SettingsPtr, Transport, -}; +use super::{Channel, ChannelPtr, SettingsPtr, TcpTransport, TlsTransport, Transport}; /// Create outbound socket connections. pub struct Connector { @@ -29,19 +26,19 @@ impl Connector { match url.scheme() { "tcp" => { let transport = TcpTransport::new(None, 1024); - let stream = transport.dial(url).unwrap().await.unwrap(); - Channel::new(Box::new(stream), hosturl).await + let stream = transport.dial(url)?.await?; + Ok(Channel::new(Box::new(stream), hosturl).await) } "tls" => { let transport = TlsTransport::new(None, 1024); - let stream = transport.dial(url).unwrap().await.unwrap(); - Channel::new(Box::new(stream), hosturl).await + let stream = transport.dial(url)?.await?; + Ok(Channel::new(Box::new(stream), hosturl).await) } + "tor" => todo!(), _ => unimplemented!(), } }) - .await - .unwrap(); - Ok(result) + .await?; + result } } diff --git a/src/net3/mod.rs b/src/net3/mod.rs index 91ff3fe73..894b69053 100644 --- a/src/net3/mod.rs +++ b/src/net3/mod.rs @@ -98,4 +98,4 @@ pub use p2p::{P2p, P2pPtr}; pub use protocol::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr}; pub use session::{SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED}; pub use settings::{Settings, SettingsPtr}; -pub use transport::Transport; +pub use transport::{TcpTransport, TlsTransport, TorTransport, Transport}; diff --git a/src/net3/transport.rs b/src/net3/transport.rs index e854daae3..9ac39b88b 100644 --- a/src/net3/transport.rs +++ b/src/net3/transport.rs @@ -1,10 +1,7 @@ use std::error::Error; use async_trait::async_trait; -use futures::{ - io::{ReadHalf, WriteHalf}, - prelude::*, -}; +use futures::prelude::*; use url::Url; mod tcp; diff --git a/src/net3/transport/tcp.rs b/src/net3/transport/tcp.rs index e5fb9d9d4..96175f9c7 100644 --- a/src/net3/transport/tcp.rs +++ b/src/net3/transport/tcp.rs @@ -1,13 +1,7 @@ -use async_std::{ - net::{TcpListener, TcpStream}, - sync::{Arc, Mutex}, -}; +use async_std::net::{TcpListener, TcpStream}; use std::{io, net::SocketAddr, pin::Pin}; -use futures::{ - io::{ReadHalf, WriteHalf}, - prelude::*, -}; +use futures::prelude::*; use log::debug; use socket2::{Domain, Socket, Type}; use url::Url;