mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
net: fix race condition bug in channel
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use futures::{
|
||||
io::{ReadHalf, WriteHalf},
|
||||
@@ -63,7 +62,7 @@ pub struct Channel {
|
||||
message_subsystem: MessageSubsystem,
|
||||
stop_subscriber: SubscriberPtr<Error>,
|
||||
receive_task: StoppableTaskPtr,
|
||||
stopped: AtomicBool,
|
||||
stopped: Mutex<bool>,
|
||||
info: Mutex<ChannelInfo>,
|
||||
}
|
||||
|
||||
@@ -86,7 +85,7 @@ impl Channel {
|
||||
message_subsystem,
|
||||
stop_subscriber: Subscriber::new(),
|
||||
receive_task: StoppableTask::new(),
|
||||
stopped: AtomicBool::new(false),
|
||||
stopped: Mutex::new(false),
|
||||
info: Mutex::new(ChannelInfo::new()),
|
||||
})
|
||||
}
|
||||
@@ -115,13 +114,15 @@ impl Channel {
|
||||
/// the channel has been closed.
|
||||
pub async fn stop(&self) {
|
||||
debug!(target: "net", "Channel::stop() [START, address={}]", self.address());
|
||||
assert!(!self.stopped.load(Ordering::Relaxed));
|
||||
// Changes memory ordering to relaxed. We don't need strict thread locking here.
|
||||
self.stopped.store(false, Ordering::Relaxed);
|
||||
self.stop_subscriber.notify(Error::ChannelStopped).await;
|
||||
self.receive_task.stop().await;
|
||||
self.message_subsystem.trigger_error(Error::ChannelStopped).await;
|
||||
debug!(target: "net", "Channel::stop() [END, address={}]", self.address());
|
||||
let mut stopped = self.stopped.lock().await;
|
||||
if !*stopped {
|
||||
*stopped = true;
|
||||
self.stop_subscriber.notify(Error::ChannelStopped).await;
|
||||
self.receive_task.stop().await;
|
||||
self.message_subsystem.trigger_error(Error::ChannelStopped).await;
|
||||
debug!(target: "net", "Channel::stop() [END, address={}]", self.address());
|
||||
}
|
||||
drop(stopped);
|
||||
}
|
||||
|
||||
/// Creates a subscription to a stopped signal.
|
||||
@@ -149,8 +150,13 @@ impl Channel {
|
||||
M::name(),
|
||||
self.address()
|
||||
);
|
||||
if self.stopped.load(Ordering::Relaxed) {
|
||||
return Err(Error::ChannelStopped)
|
||||
|
||||
// TODO can we use RwLock here instead of Mutex
|
||||
{
|
||||
let stopped = *self.stopped.lock().await;
|
||||
if stopped {
|
||||
return Err(Error::ChannelStopped)
|
||||
}
|
||||
}
|
||||
|
||||
// Catch failure and stop channel, return a net error
|
||||
|
||||
Reference in New Issue
Block a user