create specialized network error that is copyable and easily sent between threads.

This commit is contained in:
narodnik
2021-01-22 16:16:30 +01:00
parent 235099e440
commit 295bfed30f
15 changed files with 139 additions and 115 deletions

View File

@@ -1,5 +1,6 @@
use std::fmt;
use crate::net::error::NetError;
use crate::vm::ZKVMError;
pub type Result<T> = std::result::Result<T, Error>;
@@ -124,3 +125,17 @@ impl From<std::num::ParseIntError> for Error {
Error::ParseIntError
}
}
impl From<NetError> for Error {
fn from(err: NetError) -> Error {
match err {
NetError::OperationFailed => Error::OperationFailed,
NetError::ConnectFailed => Error::ConnectFailed,
NetError::ConnectTimeout => Error::ConnectTimeout,
NetError::ChannelStopped => Error::ChannelStopped,
NetError::ChannelTimeout => Error::ChannelTimeout,
NetError::ServiceStopped => Error::ServiceStopped,
}
}
}

View File

@@ -4,14 +4,14 @@ use smol::{Async, Executor};
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::{Channel, ChannelPtr, SettingsPtr};
use crate::system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription};
pub type AcceptorPtr = Arc<Acceptor>;
pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
channel_subscriber: SubscriberPtr<NetResult<ChannelPtr>>,
task: StoppableTaskPtr,
settings: SettingsPtr,
}
@@ -25,16 +25,15 @@ impl Acceptor {
})
}
pub fn accept(
pub fn start(
self: Arc<Self>,
accept_addr: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let listener = Async::<TcpListener>::bind(accept_addr)?;
info!("Listening on {}", listener.get_ref().local_addr()?);
) -> NetResult<()> {
let listener = Self::setup(accept_addr)?;
// Start detached task and return instantly
self.accept_or_stop(listener, executor);
self.accept(listener, executor);
Ok(())
}
@@ -44,30 +43,49 @@ impl Acceptor {
self.task.stop().await;
}
fn accept_or_stop(self: Arc<Self>, listener: Async<TcpListener>, executor: Arc<Executor<'_>>) {
pub async fn subscribe(self: Arc<Self>) -> Subscription<NetResult<ChannelPtr>> {
self.channel_subscriber.clone().subscribe().await
}
fn setup(
accept_addr: SocketAddr) -> NetResult<Async<TcpListener>> {
let listener = match Async::<TcpListener>::bind(accept_addr) {
Ok(l) => l,
Err(err) => {
error!("Bind listener failed: {}", err);
return Err(NetError::OperationFailed);
}
};
let local_addr = match listener.get_ref().local_addr() {
Ok(a) => a,
Err(err) => {
error!("Failed to get local address: {}", err);
return Err(NetError::OperationFailed);
}
};
info!("Listening on {}", local_addr);
Ok(listener)
}
fn accept(self: Arc<Self>, listener: Async<TcpListener>, executor: Arc<Executor<'_>>) {
self.task.clone().start(
self.clone().run_accept(listener),
self.clone().run_accept_loop(listener),
|result| self.handle_stop(result),
executor,
NetError::ServiceStopped,
executor
);
}
async fn run_accept(self: Arc<Self>, listener: Async<TcpListener>) -> Result<()> {
async fn run_accept_loop(self: Arc<Self>, listener: Async<TcpListener>) -> NetResult<()> {
loop {
match self.tick_accept(&listener).await {
Ok(channel) => {
let channel_result = Arc::new(Ok(channel));
self.channel_subscriber.notify(channel_result).await;
}
Err(err) => {
error!("Error listening for connections: {}", err);
return Err(Error::ServiceStopped);
}
}
let channel = self.tick_accept(&listener).await?;
let channel_result = Arc::new(Ok(channel));
self.channel_subscriber.notify(channel_result).await;
}
}
async fn handle_stop(self: Arc<Self>, result: Result<()>) {
async fn handle_stop(self: Arc<Self>, result: NetResult<()>) {
match result {
Ok(()) => panic!("Acceptor task should never complete without error status"),
Err(err) => {
@@ -78,8 +96,14 @@ impl Acceptor {
}
}
async fn tick_accept(&self, listener: &Async<TcpListener>) -> Result<ChannelPtr> {
let (stream, peer_addr) = listener.accept().await?;
async fn tick_accept(&self, listener: &Async<TcpListener>) -> NetResult<ChannelPtr> {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(err) => {
error!("Error listening for connections: {}", err);
return Err(NetError::ServiceStopped);
}
};
info!("Accepted client: {}", peer_addr);
let channel = Channel::new(stream, peer_addr, self.settings.clone());

View File

@@ -10,7 +10,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::message_subscriber::{
MessageSubscriber, MessageSubscriberPtr, MessageSubscription,
};
@@ -26,7 +26,7 @@ pub struct Channel {
writer: Mutex<WriteHalf<Async<TcpStream>>>,
address: SocketAddr,
message_subscriber: MessageSubscriberPtr,
stop_subscriber: SubscriberPtr<Error>,
stop_subscriber: SubscriberPtr<NetError>,
stopped: AtomicBool,
settings: SettingsPtr,
}
@@ -51,9 +51,9 @@ impl Channel {
executor.spawn(self.receive_loop()).detach();
}
pub async fn send(self: Arc<Self>, message: messages::Message) -> Result<()> {
pub async fn send(self: Arc<Self>, message: messages::Message) -> NetResult<()> {
if self.stopped.load(Ordering::Relaxed) {
return Err(Error::ChannelStopped);
return Err(NetError::ChannelStopped);
}
// Catch failure and stop channel, return a net error
@@ -62,7 +62,7 @@ impl Channel {
Err(err) => {
error!("Channel error {}, closing {}", err, self.address());
self.stop().await;
Err(Error::ChannelStopped)
Err(NetError::ChannelStopped)
}
}
}
@@ -78,17 +78,17 @@ impl Channel {
self.message_subscriber.clone().subscribe(packet_type).await
}
pub async fn subscribe_stop(self: Arc<Self>) -> Subscription<Error> {
pub async fn subscribe_stop(self: Arc<Self>) -> Subscription<NetError> {
self.stop_subscriber.clone().subscribe().await
}
pub async fn stop(&self) {
self.stopped.store(false, Ordering::Relaxed);
let stop_err = Arc::new(Error::ChannelStopped);
let stop_err = Arc::new(NetError::ChannelStopped);
self.stop_subscriber.notify(stop_err).await;
}
async fn receive_loop(self: Arc<Self>) -> Result<()> {
async fn receive_loop(self: Arc<Self>) -> NetResult<()> {
let stop_sub = self.clone().subscribe_stop().await;
let reader = &mut *self.reader.lock().await;
@@ -100,12 +100,12 @@ impl Channel {
Err(err) => {
error!("Read error on channel {}", err);
self.stop().await;
Err(Error::ChannelStopped)
Err(NetError::ChannelStopped)
}
}
}
stop_err = stop_sub.receive().fuse() => {
Err(clone_net_error(&*stop_err))
Err(*stop_err)
}
};

View File

@@ -4,7 +4,7 @@ use smol::{Async, Executor};
use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::utility::sleep;
use crate::net::{Channel, ChannelPtr, SettingsPtr};
@@ -17,15 +17,15 @@ impl Connector {
Self { settings }
}
pub async fn connect(&self, hostaddr: SocketAddr) -> Result<ChannelPtr> {
pub async fn connect(&self, hostaddr: SocketAddr) -> NetResult<ChannelPtr> {
futures::select! {
stream_result = Async::<TcpStream>::connect(hostaddr).fuse() => {
match stream_result {
Ok(stream) => Ok(Channel::new(stream, hostaddr, self.settings.clone())),
Err(_) => Err(Error::ConnectFailed)
Err(_) => Err(NetError::ConnectFailed)
}
}
_ = sleep(self.settings.connect_timeout_seconds).fuse() => Err(Error::ConnectTimeout)
_ = sleep(self.settings.connect_timeout_seconds).fuse() => Err(NetError::ConnectTimeout)
}
}
}

View File

@@ -3,13 +3,13 @@ use rand::Rng;
use std::collections::HashMap;
use std::sync::Arc;
use crate::error::Result;
use crate::net::error::NetResult;
use crate::net::messages::{Message, PacketType};
use crate::net::utility::clone_net_error;
pub type MessageSubscriberPtr = Arc<MessageSubscriber>;
pub type MessageResult = Result<Arc<Message>>;
pub type MessageResult = NetResult<Arc<Message>>;
pub type MessageSubscriptionID = u64;
macro_rules! receive_message {
@@ -25,19 +25,6 @@ macro_rules! receive_message {
}};
}
trait CloneMessageResult {
fn clone(&self) -> Self;
}
impl CloneMessageResult for Result<Arc<Message>> {
fn clone(&self) -> Self {
match self {
Ok(message) => Ok(message.clone()),
Err(err) => Err(clone_net_error(err)),
}
}
}
pub struct MessageSubscription {
id: MessageSubscriptionID,
filter: PacketType,
@@ -119,7 +106,7 @@ impl MessageSubscriber {
self.subs.lock().await.remove(&sub_id);
}
pub async fn notify(&self, message_result: Result<Arc<Message>>) {
pub async fn notify(&self, message_result: NetResult<Arc<Message>>) {
for sub in (*self.subs.lock().await).values() {
match sub.send(message_result.clone()).await {
Ok(()) => {}

View File

@@ -4,6 +4,7 @@ use std::net::TcpStream;
pub mod acceptor;
pub mod channel;
pub mod connector;
pub mod error;
#[macro_use]
pub mod message_subscriber;
pub mod hosts;

View File

@@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use crate::error::Result;
use crate::net::error::NetResult;
use crate::net::sessions::{InboundSession, SeedSession};
use crate::net::{Channel, ChannelPtr, Connector, Hosts, HostsPtr, Settings, SettingsPtr};
@@ -31,7 +31,7 @@ impl P2p {
}
/// Invoke startup and seeding sequence. Call from constructing thread.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
// Start manual connections
// Start seed session
let seed = SeedSession::new(Arc::downgrade(&self));
@@ -41,10 +41,10 @@ impl P2p {
/// Synchronize the blockchain and then begin long running sessions,
/// call after start() is invoked.
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
let inbound = InboundSession::new(Arc::downgrade(&self));
let inbound_task = inbound.start(executor.clone());
inbound_task.await
inbound.start(executor.clone())?;
Ok(())
}
pub async fn store(self: Arc<Self>, channel: ChannelPtr) {

View File

@@ -3,7 +3,7 @@ use rand::Rng;
use smol::{Executor, Task};
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::messages;
use crate::net::utility::{clone_net_error, sleep};
use crate::net::{ChannelPtr, SettingsPtr};
@@ -18,11 +18,11 @@ impl ProtocolPing {
Arc::new(Self { channel, settings })
}
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Task<Result<()>> {
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Task<NetResult<()>> {
executor.spawn(self.run_ping_pong())
}
async fn run_ping_pong(self: Arc<Self>) -> Result<()> {
async fn run_ping_pong(self: Arc<Self>) -> NetResult<()> {
let pong_sub = self
.channel
.clone()

View File

@@ -1,22 +0,0 @@
use futures::FutureExt;
use smol::Executor;
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::{ChannelPtr, SettingsPtr};
pub struct ProtocolPong {
channel: ChannelPtr,
settings: SettingsPtr,
}
impl ProtocolPong {
pub fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc<Self> {
Arc::new(Self { channel, settings })
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
Ok(())
}
}

View File

@@ -3,7 +3,7 @@ use owning_ref::OwningRef;
use smol::Executor;
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::messages;
use crate::net::{ChannelPtr, HostsPtr, SettingsPtr};
@@ -22,7 +22,7 @@ impl ProtocolSeed {
})
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
let addr_sub = self
.channel
.clone()
@@ -43,7 +43,7 @@ impl ProtocolSeed {
Ok(())
}
pub async fn send_own_address(&self) -> Result<()> {
pub async fn send_own_address(&self) -> NetResult<()> {
match self.settings.external_addr {
Some(addr) => {
let addr = messages::Message::Addrs(messages::AddrsMessage { addrs: vec![addr] });

View File

@@ -2,7 +2,7 @@ use futures::FutureExt;
use smol::Executor;
use std::sync::Arc;
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::messages;
use crate::net::utility::{clone_net_error, sleep};
use crate::net::{ChannelPtr, SettingsPtr};
@@ -17,25 +17,25 @@ impl ProtocolVersion {
Arc::new(Self { channel, settings })
}
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
// Start timer
// Send version, wait for verack
// Wait for version, send verack
// Fin.
futures::select! {
_ = self.clone().exchange_versions(executor).fuse() => Ok(()),
_ = sleep(self.settings.channel_handshake_seconds).fuse() => Err(Error::ChannelTimeout)
_ = sleep(self.settings.channel_handshake_seconds).fuse() => Err(NetError::ChannelTimeout)
}
}
async fn exchange_versions(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
async fn exchange_versions(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
let send = executor.spawn(self.clone().send_version());
let recv = executor.spawn(self.recv_version());
send.await.and(recv.await)
}
async fn send_version(self: Arc<Self>) -> Result<()> {
async fn send_version(self: Arc<Self>) -> NetResult<()> {
let version = messages::Message::Version(messages::VersionMessage {});
self.channel.clone().send(version).await?;
@@ -43,7 +43,7 @@ impl ProtocolVersion {
Ok(())
}
async fn recv_version(self: Arc<Self>) -> Result<()> {
async fn recv_version(self: Arc<Self>) -> NetResult<()> {
let version_sub = self
.channel
.clone()

View File

@@ -3,15 +3,18 @@ use log::*;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::protocols::{ProtocolPing, ProtocolSeed};
use crate::net::sessions::Session;
use crate::net::{Acceptor, AcceptorPtr};
use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr};
use crate::net::utility::clone_net_error;
use crate::system::{StoppableTask, StoppableTaskPtr};
pub struct InboundSession {
p2p: Weak<P2p>,
acceptor: AcceptorPtr,
accept_task: StoppableTaskPtr,
}
impl InboundSession {
@@ -23,19 +26,27 @@ impl InboundSession {
let acceptor = Acceptor::new(settings);
Arc::new(Self { p2p, acceptor })
Arc::new(Self { p2p, acceptor, accept_task: StoppableTask::new() })
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
match self.p2p().settings().inbound {
Some(accept_addr) => {
self.start_accept_session(accept_addr, executor).await?;
self.clone().start_accept_session(accept_addr, executor.clone())?;
}
None => {
info!("Not configured for accepting incoming connections.");
return Ok(());
}
}
self.accept_task.clone().start(
self.clone().channel_sub_loop(),
// Ignore stop handler
|_| { async {} },
NetError::ServiceStopped,
executor);
Ok(())
}
@@ -43,14 +54,15 @@ impl InboundSession {
self.acceptor.stop().await;
}
async fn start_accept_session(
fn start_accept_session(
self: Arc<Self>,
accept_addr: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
) -> NetResult<()> {
info!("Starting inbound session on {}", accept_addr);
match self.acceptor.clone().accept(accept_addr, executor) {
Ok(()) => {}
match self.acceptor.clone().start(accept_addr, executor) {
Ok(()) => {
}
Err(err) => {
error!("Error starting listener: {}", err);
return Err(err);
@@ -58,6 +70,13 @@ impl InboundSession {
}
Ok(())
}
async fn channel_sub_loop(self: Arc<Self>) -> NetResult<()> {
let channel_sub = self.acceptor.clone().subscribe().await;
loop {
//let channel = (*channel_sub.receive().await)?;
}
}
}
impl Session for InboundSession {

View File

@@ -3,7 +3,7 @@ use log::*;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use crate::error::{Error, Result};
use crate::net::error::{NetError, NetResult};
use crate::net::protocols::{ProtocolPing, ProtocolSeed};
use crate::net::sessions::Session;
use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr};
@@ -17,7 +17,7 @@ impl SeedSession {
Arc::new(Self { p2p })
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
let settings = {
let p2p = self.p2p.upgrade().unwrap();
p2p.settings()
@@ -28,7 +28,7 @@ impl SeedSession {
// if seeds empty then seeding required but empty
if settings.seeds.is_empty() {
error!("Seeding is required but no seeds are configured.");
return Err(Error::OperationFailed);
return Err(NetError::OperationFailed);
}
let mut tasks = Vec::new();
@@ -52,7 +52,7 @@ impl SeedSession {
self: Arc<Self>,
seed: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
) -> NetResult<()> {
let (hosts, settings) = {
let p2p = self.p2p.upgrade().unwrap();
(p2p.hosts(), p2p.settings())
@@ -83,7 +83,7 @@ impl SeedSession {
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
) -> NetResult<()> {
let handshake_task = self.perform_handshake_protocols(channel.clone(), executor.clone());
// start channel
@@ -98,7 +98,7 @@ impl SeedSession {
hosts: HostsPtr,
settings: SettingsPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
) -> NetResult<()> {
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let ping_task = protocol_ping.start(executor.clone());

View File

@@ -2,7 +2,7 @@ use async_trait::async_trait;
use smol::Executor;
use std::sync::Arc;
use crate::error::Result;
use crate::net::error::NetResult;
use crate::net::p2p::P2pPtr;
use crate::net::protocols::ProtocolVersion;
use crate::net::ChannelPtr;
@@ -22,7 +22,7 @@ pub trait Session {
&self,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
) -> NetResult<()> {
let p2p = self.p2p();
// Perform handshake

View File

@@ -3,8 +3,6 @@ use futures::Future;
use futures::FutureExt;
use std::sync::Arc;
use crate::error::{Error, Result};
pub type StoppableTaskPtr = Arc<StoppableTask>;
pub struct StoppableTask {
@@ -26,20 +24,22 @@ impl StoppableTask {
let _ = self.stop_send.send(()).await;
}
pub fn start<'a, MainFut, StopFut, StopFn>(
pub fn start<'a, MainFut, StopFut, StopFn, Error>(
self: Arc<Self>,
main: MainFut,
stop_handler: StopFn,
stop_value: Error,
executor: Arc<Executor<'a>>,
) where
MainFut: Future<Output = Result<()>> + Send + 'a,
MainFut: Future<Output = std::result::Result<(), Error>> + Send + 'a,
StopFut: Future<Output = ()> + Send,
StopFn: FnOnce(Result<()>) -> StopFut + Send + 'a,
StopFn: FnOnce(std::result::Result<(), Error>) -> StopFut + Send + 'a,
Error: std::error::Error + Send + 'a
{
executor
.spawn(async move {
let result = futures::select! {
_ = self.stop_recv.recv().fuse() => Err(Error::ServiceStopped),
_ = self.stop_recv.recv().fuse() => Err(stop_value),
result = main.fuse() => result
};