diff --git a/src/net/channel.rs b/src/net/channel.rs index c569b3552..5a4efe00e 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -1,5 +1,4 @@ use async_std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicBool, Ordering}; use futures::{ io::{ReadHalf, WriteHalf}, @@ -63,7 +62,7 @@ pub struct Channel { message_subsystem: MessageSubsystem, stop_subscriber: SubscriberPtr, receive_task: StoppableTaskPtr, - stopped: AtomicBool, + stopped: Mutex, info: Mutex, } @@ -86,7 +85,7 @@ impl Channel { message_subsystem, stop_subscriber: Subscriber::new(), receive_task: StoppableTask::new(), - stopped: AtomicBool::new(false), + stopped: Mutex::new(false), info: Mutex::new(ChannelInfo::new()), }) } @@ -115,13 +114,15 @@ impl Channel { /// the channel has been closed. pub async fn stop(&self) { debug!(target: "net", "Channel::stop() [START, address={}]", self.address()); - assert!(!self.stopped.load(Ordering::Relaxed)); - // Changes memory ordering to relaxed. We don't need strict thread locking here. - self.stopped.store(false, Ordering::Relaxed); - self.stop_subscriber.notify(Error::ChannelStopped).await; - self.receive_task.stop().await; - self.message_subsystem.trigger_error(Error::ChannelStopped).await; - debug!(target: "net", "Channel::stop() [END, address={}]", self.address()); + let mut stopped = self.stopped.lock().await; + if !*stopped { + *stopped = true; + self.stop_subscriber.notify(Error::ChannelStopped).await; + self.receive_task.stop().await; + self.message_subsystem.trigger_error(Error::ChannelStopped).await; + debug!(target: "net", "Channel::stop() [END, address={}]", self.address()); + } + drop(stopped); } /// Creates a subscription to a stopped signal. @@ -149,8 +150,13 @@ impl Channel { M::name(), self.address() ); - if self.stopped.load(Ordering::Relaxed) { - return Err(Error::ChannelStopped) + + // TODO can we use RwLock here instead of Mutex + { + let stopped = *self.stopped.lock().await; + if stopped { + return Err(Error::ChannelStopped) + } } // Catch failure and stop channel, return a net error