From 295bfed30faea065f01fb25c10d95c1c9ecf670b Mon Sep 17 00:00:00 2001 From: narodnik Date: Fri, 22 Jan 2021 16:16:30 +0100 Subject: [PATCH] create specialized network error that is copyable and easily sent between threads. --- src/error.rs | 15 ++++++ src/net/acceptor.rs | 72 ++++++++++++++++++--------- src/net/channel.rs | 20 ++++---- src/net/connector.rs | 8 +-- src/net/message_subscriber.rs | 19 ++----- src/net/mod.rs | 1 + src/net/p2p.rs | 10 ++-- src/net/protocols/protocol_ping.rs | 6 +-- src/net/protocols/protocol_pong.rs | 22 -------- src/net/protocols/protocol_seed.rs | 6 +-- src/net/protocols/protocol_version.rs | 12 ++--- src/net/sessions/inbound_session.rs | 35 ++++++++++--- src/net/sessions/seed_session.rs | 12 ++--- src/net/sessions/session.rs | 4 +- src/system/stoppable_task.rs | 12 ++--- 15 files changed, 139 insertions(+), 115 deletions(-) delete mode 100644 src/net/protocols/protocol_pong.rs diff --git a/src/error.rs b/src/error.rs index f70f98817..821b4b58a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ use std::fmt; +use crate::net::error::NetError; use crate::vm::ZKVMError; pub type Result = std::result::Result; @@ -124,3 +125,17 @@ impl From for Error { Error::ParseIntError } } + +impl From for Error { + fn from(err: NetError) -> Error { + match err { + NetError::OperationFailed => Error::OperationFailed, + NetError::ConnectFailed => Error::ConnectFailed, + NetError::ConnectTimeout => Error::ConnectTimeout, + NetError::ChannelStopped => Error::ChannelStopped, + NetError::ChannelTimeout => Error::ChannelTimeout, + NetError::ServiceStopped => Error::ServiceStopped, + } + } +} + diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 60baef592..956a858e7 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -4,14 +4,14 @@ use smol::{Async, Executor}; use std::net::{SocketAddr, TcpListener}; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::{Channel, ChannelPtr, SettingsPtr}; use crate::system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}; pub type AcceptorPtr = Arc; pub struct Acceptor { - channel_subscriber: SubscriberPtr>, + channel_subscriber: SubscriberPtr>, task: StoppableTaskPtr, settings: SettingsPtr, } @@ -25,16 +25,15 @@ impl Acceptor { }) } - pub fn accept( + pub fn start( self: Arc, accept_addr: SocketAddr, executor: Arc>, - ) -> Result<()> { - let listener = Async::::bind(accept_addr)?; - info!("Listening on {}", listener.get_ref().local_addr()?); + ) -> NetResult<()> { + let listener = Self::setup(accept_addr)?; // Start detached task and return instantly - self.accept_or_stop(listener, executor); + self.accept(listener, executor); Ok(()) } @@ -44,30 +43,49 @@ impl Acceptor { self.task.stop().await; } - fn accept_or_stop(self: Arc, listener: Async, executor: Arc>) { + pub async fn subscribe(self: Arc) -> Subscription> { + self.channel_subscriber.clone().subscribe().await + } + + fn setup( + accept_addr: SocketAddr) -> NetResult> { + let listener = match Async::::bind(accept_addr) { + Ok(l) => l, + Err(err) => { + error!("Bind listener failed: {}", err); + return Err(NetError::OperationFailed); + } + }; + let local_addr = match listener.get_ref().local_addr() { + Ok(a) => a, + Err(err) => { + error!("Failed to get local address: {}", err); + return Err(NetError::OperationFailed); + } + }; + info!("Listening on {}", local_addr); + + Ok(listener) + } + + fn accept(self: Arc, listener: Async, executor: Arc>) { self.task.clone().start( - self.clone().run_accept(listener), + self.clone().run_accept_loop(listener), |result| self.handle_stop(result), - executor, + NetError::ServiceStopped, + executor ); } - async fn run_accept(self: Arc, listener: Async) -> Result<()> { + async fn run_accept_loop(self: Arc, listener: Async) -> NetResult<()> { loop { - match self.tick_accept(&listener).await { - Ok(channel) => { - let channel_result = Arc::new(Ok(channel)); - self.channel_subscriber.notify(channel_result).await; - } - Err(err) => { - error!("Error listening for connections: {}", err); - return Err(Error::ServiceStopped); - } - } + let channel = self.tick_accept(&listener).await?; + let channel_result = Arc::new(Ok(channel)); + self.channel_subscriber.notify(channel_result).await; } } - async fn handle_stop(self: Arc, result: Result<()>) { + async fn handle_stop(self: Arc, result: NetResult<()>) { match result { Ok(()) => panic!("Acceptor task should never complete without error status"), Err(err) => { @@ -78,8 +96,14 @@ impl Acceptor { } } - async fn tick_accept(&self, listener: &Async) -> Result { - let (stream, peer_addr) = listener.accept().await?; + async fn tick_accept(&self, listener: &Async) -> NetResult { + let (stream, peer_addr) = match listener.accept().await { + Ok((s, a)) => (s, a), + Err(err) => { + error!("Error listening for connections: {}", err); + return Err(NetError::ServiceStopped); + } + }; info!("Accepted client: {}", peer_addr); let channel = Channel::new(stream, peer_addr, self.settings.clone()); diff --git a/src/net/channel.rs b/src/net/channel.rs index d78ba6266..729aff3d6 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -10,7 +10,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::message_subscriber::{ MessageSubscriber, MessageSubscriberPtr, MessageSubscription, }; @@ -26,7 +26,7 @@ pub struct Channel { writer: Mutex>>, address: SocketAddr, message_subscriber: MessageSubscriberPtr, - stop_subscriber: SubscriberPtr, + stop_subscriber: SubscriberPtr, stopped: AtomicBool, settings: SettingsPtr, } @@ -51,9 +51,9 @@ impl Channel { executor.spawn(self.receive_loop()).detach(); } - pub async fn send(self: Arc, message: messages::Message) -> Result<()> { + pub async fn send(self: Arc, message: messages::Message) -> NetResult<()> { if self.stopped.load(Ordering::Relaxed) { - return Err(Error::ChannelStopped); + return Err(NetError::ChannelStopped); } // Catch failure and stop channel, return a net error @@ -62,7 +62,7 @@ impl Channel { Err(err) => { error!("Channel error {}, closing {}", err, self.address()); self.stop().await; - Err(Error::ChannelStopped) + Err(NetError::ChannelStopped) } } } @@ -78,17 +78,17 @@ impl Channel { self.message_subscriber.clone().subscribe(packet_type).await } - pub async fn subscribe_stop(self: Arc) -> Subscription { + pub async fn subscribe_stop(self: Arc) -> Subscription { self.stop_subscriber.clone().subscribe().await } pub async fn stop(&self) { self.stopped.store(false, Ordering::Relaxed); - let stop_err = Arc::new(Error::ChannelStopped); + let stop_err = Arc::new(NetError::ChannelStopped); self.stop_subscriber.notify(stop_err).await; } - async fn receive_loop(self: Arc) -> Result<()> { + async fn receive_loop(self: Arc) -> NetResult<()> { let stop_sub = self.clone().subscribe_stop().await; let reader = &mut *self.reader.lock().await; @@ -100,12 +100,12 @@ impl Channel { Err(err) => { error!("Read error on channel {}", err); self.stop().await; - Err(Error::ChannelStopped) + Err(NetError::ChannelStopped) } } } stop_err = stop_sub.receive().fuse() => { - Err(clone_net_error(&*stop_err)) + Err(*stop_err) } }; diff --git a/src/net/connector.rs b/src/net/connector.rs index a18f4063a..9bc664404 100644 --- a/src/net/connector.rs +++ b/src/net/connector.rs @@ -4,7 +4,7 @@ use smol::{Async, Executor}; use std::net::{SocketAddr, TcpStream}; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::utility::sleep; use crate::net::{Channel, ChannelPtr, SettingsPtr}; @@ -17,15 +17,15 @@ impl Connector { Self { settings } } - pub async fn connect(&self, hostaddr: SocketAddr) -> Result { + pub async fn connect(&self, hostaddr: SocketAddr) -> NetResult { futures::select! { stream_result = Async::::connect(hostaddr).fuse() => { match stream_result { Ok(stream) => Ok(Channel::new(stream, hostaddr, self.settings.clone())), - Err(_) => Err(Error::ConnectFailed) + Err(_) => Err(NetError::ConnectFailed) } } - _ = sleep(self.settings.connect_timeout_seconds).fuse() => Err(Error::ConnectTimeout) + _ = sleep(self.settings.connect_timeout_seconds).fuse() => Err(NetError::ConnectTimeout) } } } diff --git a/src/net/message_subscriber.rs b/src/net/message_subscriber.rs index fe10ddc62..bbc4ac479 100644 --- a/src/net/message_subscriber.rs +++ b/src/net/message_subscriber.rs @@ -3,13 +3,13 @@ use rand::Rng; use std::collections::HashMap; use std::sync::Arc; -use crate::error::Result; +use crate::net::error::NetResult; use crate::net::messages::{Message, PacketType}; use crate::net::utility::clone_net_error; pub type MessageSubscriberPtr = Arc; -pub type MessageResult = Result>; +pub type MessageResult = NetResult>; pub type MessageSubscriptionID = u64; macro_rules! receive_message { @@ -25,19 +25,6 @@ macro_rules! receive_message { }}; } -trait CloneMessageResult { - fn clone(&self) -> Self; -} - -impl CloneMessageResult for Result> { - fn clone(&self) -> Self { - match self { - Ok(message) => Ok(message.clone()), - Err(err) => Err(clone_net_error(err)), - } - } -} - pub struct MessageSubscription { id: MessageSubscriptionID, filter: PacketType, @@ -119,7 +106,7 @@ impl MessageSubscriber { self.subs.lock().await.remove(&sub_id); } - pub async fn notify(&self, message_result: Result>) { + pub async fn notify(&self, message_result: NetResult>) { for sub in (*self.subs.lock().await).values() { match sub.send(message_result.clone()).await { Ok(()) => {} diff --git a/src/net/mod.rs b/src/net/mod.rs index 289669d54..41f493cf0 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -4,6 +4,7 @@ use std::net::TcpStream; pub mod acceptor; pub mod channel; pub mod connector; +pub mod error; #[macro_use] pub mod message_subscriber; pub mod hosts; diff --git a/src/net/p2p.rs b/src/net/p2p.rs index c48e18686..79caf5815 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use crate::error::Result; +use crate::net::error::NetResult; use crate::net::sessions::{InboundSession, SeedSession}; use crate::net::{Channel, ChannelPtr, Connector, Hosts, HostsPtr, Settings, SettingsPtr}; @@ -31,7 +31,7 @@ impl P2p { } /// Invoke startup and seeding sequence. Call from constructing thread. - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { + pub async fn start(self: Arc, executor: Arc>) -> NetResult<()> { // Start manual connections // Start seed session let seed = SeedSession::new(Arc::downgrade(&self)); @@ -41,10 +41,10 @@ impl P2p { /// Synchronize the blockchain and then begin long running sessions, /// call after start() is invoked. - pub async fn run(self: Arc, executor: Arc>) -> Result<()> { + pub async fn run(self: Arc, executor: Arc>) -> NetResult<()> { let inbound = InboundSession::new(Arc::downgrade(&self)); - let inbound_task = inbound.start(executor.clone()); - inbound_task.await + inbound.start(executor.clone())?; + Ok(()) } pub async fn store(self: Arc, channel: ChannelPtr) { diff --git a/src/net/protocols/protocol_ping.rs b/src/net/protocols/protocol_ping.rs index 516c2b20f..9192a7ae5 100644 --- a/src/net/protocols/protocol_ping.rs +++ b/src/net/protocols/protocol_ping.rs @@ -3,7 +3,7 @@ use rand::Rng; use smol::{Executor, Task}; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::messages; use crate::net::utility::{clone_net_error, sleep}; use crate::net::{ChannelPtr, SettingsPtr}; @@ -18,11 +18,11 @@ impl ProtocolPing { Arc::new(Self { channel, settings }) } - pub fn start(self: Arc, executor: Arc>) -> Task> { + pub fn start(self: Arc, executor: Arc>) -> Task> { executor.spawn(self.run_ping_pong()) } - async fn run_ping_pong(self: Arc) -> Result<()> { + async fn run_ping_pong(self: Arc) -> NetResult<()> { let pong_sub = self .channel .clone() diff --git a/src/net/protocols/protocol_pong.rs b/src/net/protocols/protocol_pong.rs deleted file mode 100644 index 60308b32c..000000000 --- a/src/net/protocols/protocol_pong.rs +++ /dev/null @@ -1,22 +0,0 @@ -use futures::FutureExt; -use smol::Executor; -use std::sync::Arc; - -use crate::error::{Error, Result}; -use crate::net::{ChannelPtr, SettingsPtr}; - -pub struct ProtocolPong { - channel: ChannelPtr, - settings: SettingsPtr, -} - -impl ProtocolPong { - pub fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc { - Arc::new(Self { channel, settings }) - } - - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { - Ok(()) - } -} - diff --git a/src/net/protocols/protocol_seed.rs b/src/net/protocols/protocol_seed.rs index 9fb2fbcd4..a487a12d7 100644 --- a/src/net/protocols/protocol_seed.rs +++ b/src/net/protocols/protocol_seed.rs @@ -3,7 +3,7 @@ use owning_ref::OwningRef; use smol::Executor; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::messages; use crate::net::{ChannelPtr, HostsPtr, SettingsPtr}; @@ -22,7 +22,7 @@ impl ProtocolSeed { }) } - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { + pub async fn start(self: Arc, executor: Arc>) -> NetResult<()> { let addr_sub = self .channel .clone() @@ -43,7 +43,7 @@ impl ProtocolSeed { Ok(()) } - pub async fn send_own_address(&self) -> Result<()> { + pub async fn send_own_address(&self) -> NetResult<()> { match self.settings.external_addr { Some(addr) => { let addr = messages::Message::Addrs(messages::AddrsMessage { addrs: vec![addr] }); diff --git a/src/net/protocols/protocol_version.rs b/src/net/protocols/protocol_version.rs index a8ea9ab82..508288b26 100644 --- a/src/net/protocols/protocol_version.rs +++ b/src/net/protocols/protocol_version.rs @@ -2,7 +2,7 @@ use futures::FutureExt; use smol::Executor; use std::sync::Arc; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::messages; use crate::net::utility::{clone_net_error, sleep}; use crate::net::{ChannelPtr, SettingsPtr}; @@ -17,25 +17,25 @@ impl ProtocolVersion { Arc::new(Self { channel, settings }) } - pub async fn run(self: Arc, executor: Arc>) -> Result<()> { + pub async fn run(self: Arc, executor: Arc>) -> NetResult<()> { // Start timer // Send version, wait for verack // Wait for version, send verack // Fin. futures::select! { _ = self.clone().exchange_versions(executor).fuse() => Ok(()), - _ = sleep(self.settings.channel_handshake_seconds).fuse() => Err(Error::ChannelTimeout) + _ = sleep(self.settings.channel_handshake_seconds).fuse() => Err(NetError::ChannelTimeout) } } - async fn exchange_versions(self: Arc, executor: Arc>) -> Result<()> { + async fn exchange_versions(self: Arc, executor: Arc>) -> NetResult<()> { let send = executor.spawn(self.clone().send_version()); let recv = executor.spawn(self.recv_version()); send.await.and(recv.await) } - async fn send_version(self: Arc) -> Result<()> { + async fn send_version(self: Arc) -> NetResult<()> { let version = messages::Message::Version(messages::VersionMessage {}); self.channel.clone().send(version).await?; @@ -43,7 +43,7 @@ impl ProtocolVersion { Ok(()) } - async fn recv_version(self: Arc) -> Result<()> { + async fn recv_version(self: Arc) -> NetResult<()> { let version_sub = self .channel .clone() diff --git a/src/net/sessions/inbound_session.rs b/src/net/sessions/inbound_session.rs index a85fd893e..d286e692e 100644 --- a/src/net/sessions/inbound_session.rs +++ b/src/net/sessions/inbound_session.rs @@ -3,15 +3,18 @@ use log::*; use std::net::SocketAddr; use std::sync::{Arc, Weak}; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::protocols::{ProtocolPing, ProtocolSeed}; use crate::net::sessions::Session; use crate::net::{Acceptor, AcceptorPtr}; use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr}; +use crate::net::utility::clone_net_error; +use crate::system::{StoppableTask, StoppableTaskPtr}; pub struct InboundSession { p2p: Weak, acceptor: AcceptorPtr, + accept_task: StoppableTaskPtr, } impl InboundSession { @@ -23,19 +26,27 @@ impl InboundSession { let acceptor = Acceptor::new(settings); - Arc::new(Self { p2p, acceptor }) + Arc::new(Self { p2p, acceptor, accept_task: StoppableTask::new() }) } - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { + pub fn start(self: Arc, executor: Arc>) -> NetResult<()> { match self.p2p().settings().inbound { Some(accept_addr) => { - self.start_accept_session(accept_addr, executor).await?; + self.clone().start_accept_session(accept_addr, executor.clone())?; } None => { info!("Not configured for accepting incoming connections."); + return Ok(()); } } + self.accept_task.clone().start( + self.clone().channel_sub_loop(), + // Ignore stop handler + |_| { async {} }, + NetError::ServiceStopped, + executor); + Ok(()) } @@ -43,14 +54,15 @@ impl InboundSession { self.acceptor.stop().await; } - async fn start_accept_session( + fn start_accept_session( self: Arc, accept_addr: SocketAddr, executor: Arc>, - ) -> Result<()> { + ) -> NetResult<()> { info!("Starting inbound session on {}", accept_addr); - match self.acceptor.clone().accept(accept_addr, executor) { - Ok(()) => {} + match self.acceptor.clone().start(accept_addr, executor) { + Ok(()) => { + } Err(err) => { error!("Error starting listener: {}", err); return Err(err); @@ -58,6 +70,13 @@ impl InboundSession { } Ok(()) } + + async fn channel_sub_loop(self: Arc) -> NetResult<()> { + let channel_sub = self.acceptor.clone().subscribe().await; + loop { + //let channel = (*channel_sub.receive().await)?; + } + } } impl Session for InboundSession { diff --git a/src/net/sessions/seed_session.rs b/src/net/sessions/seed_session.rs index b2945ff0f..f5371ad96 100644 --- a/src/net/sessions/seed_session.rs +++ b/src/net/sessions/seed_session.rs @@ -3,7 +3,7 @@ use log::*; use std::net::SocketAddr; use std::sync::{Arc, Weak}; -use crate::error::{Error, Result}; +use crate::net::error::{NetError, NetResult}; use crate::net::protocols::{ProtocolPing, ProtocolSeed}; use crate::net::sessions::Session; use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr}; @@ -17,7 +17,7 @@ impl SeedSession { Arc::new(Self { p2p }) } - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { + pub async fn start(self: Arc, executor: Arc>) -> NetResult<()> { let settings = { let p2p = self.p2p.upgrade().unwrap(); p2p.settings() @@ -28,7 +28,7 @@ impl SeedSession { // if seeds empty then seeding required but empty if settings.seeds.is_empty() { error!("Seeding is required but no seeds are configured."); - return Err(Error::OperationFailed); + return Err(NetError::OperationFailed); } let mut tasks = Vec::new(); @@ -52,7 +52,7 @@ impl SeedSession { self: Arc, seed: SocketAddr, executor: Arc>, - ) -> Result<()> { + ) -> NetResult<()> { let (hosts, settings) = { let p2p = self.p2p.upgrade().unwrap(); (p2p.hosts(), p2p.settings()) @@ -83,7 +83,7 @@ impl SeedSession { self: Arc, channel: ChannelPtr, executor: Arc>, - ) -> Result<()> { + ) -> NetResult<()> { let handshake_task = self.perform_handshake_protocols(channel.clone(), executor.clone()); // start channel @@ -98,7 +98,7 @@ impl SeedSession { hosts: HostsPtr, settings: SettingsPtr, executor: Arc>, - ) -> Result<()> { + ) -> NetResult<()> { let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); let ping_task = protocol_ping.start(executor.clone()); diff --git a/src/net/sessions/session.rs b/src/net/sessions/session.rs index c2f9b9e2d..9f918c65d 100644 --- a/src/net/sessions/session.rs +++ b/src/net/sessions/session.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use smol::Executor; use std::sync::Arc; -use crate::error::Result; +use crate::net::error::NetResult; use crate::net::p2p::P2pPtr; use crate::net::protocols::ProtocolVersion; use crate::net::ChannelPtr; @@ -22,7 +22,7 @@ pub trait Session { &self, channel: ChannelPtr, executor: Arc>, - ) -> Result<()> { + ) -> NetResult<()> { let p2p = self.p2p(); // Perform handshake diff --git a/src/system/stoppable_task.rs b/src/system/stoppable_task.rs index b445f2269..a5d4b8bdd 100644 --- a/src/system/stoppable_task.rs +++ b/src/system/stoppable_task.rs @@ -3,8 +3,6 @@ use futures::Future; use futures::FutureExt; use std::sync::Arc; -use crate::error::{Error, Result}; - pub type StoppableTaskPtr = Arc; pub struct StoppableTask { @@ -26,20 +24,22 @@ impl StoppableTask { let _ = self.stop_send.send(()).await; } - pub fn start<'a, MainFut, StopFut, StopFn>( + pub fn start<'a, MainFut, StopFut, StopFn, Error>( self: Arc, main: MainFut, stop_handler: StopFn, + stop_value: Error, executor: Arc>, ) where - MainFut: Future> + Send + 'a, + MainFut: Future> + Send + 'a, StopFut: Future + Send, - StopFn: FnOnce(Result<()>) -> StopFut + Send + 'a, + StopFn: FnOnce(std::result::Result<(), Error>) -> StopFut + Send + 'a, + Error: std::error::Error + Send + 'a { executor .spawn(async move { let result = futures::select! { - _ = self.stop_recv.recv().fuse() => Err(Error::ServiceStopped), + _ = self.stop_recv.recv().fuse() => Err(stop_value), result = main.fuse() => result };