diff --git a/src/net/channel.rs b/src/net/channel.rs index 18111d639..113dfda08 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -20,8 +20,9 @@ pub type ChannelPtr = Arc; /// Async channel interface that handles the sending of messages across the /// network. Public interface is used to create new channels, to stop and start -/// a channel, send messages. Also implements message functionality. Implements -/// the message subscriber subsystem. +/// a channel, and to send messages. +/// +/// Implements message functionality and the message subscriber subsystem. pub struct Channel { reader: Mutex>>, writer: Mutex>>, @@ -33,7 +34,9 @@ pub struct Channel { } impl Channel { - /// Create a new channel. + /// Sets up a new channel. Creates a reader and writer TCP stream and + /// summons the message subscriber subsystem. Performs a network + /// handshake on the subsystem dispatchers. pub async fn new(stream: Async, address: SocketAddr) -> Arc { let (reader, writer) = stream.split(); let reader = Mutex::new(reader); @@ -53,7 +56,8 @@ impl Channel { }) } - /// Start the channel. + /// Starts the channel. Runs a receive loop to start receiving messages or + /// handles a network failure. pub fn start(self: Arc, executor: Arc>) { debug!(target: "net", "Channel::start() [START, address={}]", self.address()); let self2 = self.clone(); @@ -67,10 +71,13 @@ impl Channel { debug!(target: "net", "Channel::start() [END, address={}]", self.address()); } - /// Stop the channel. + /// Stops the channel. Steps through each component of the channel + /// connection and sends a stop signal. Notifies all subscribers that + /// the channel has been closed. pub async fn stop(&self) { debug!(target: "net", "Channel::stop() [START, address={}]", self.address()); assert_eq!(self.stopped.load(Ordering::Relaxed), false); + // Changes memory ordering to relaxed. We don't need strict thread locking here. self.stopped.store(false, Ordering::Relaxed); self.stop_subscriber.notify(NetError::ChannelStopped).await; self.receive_task.stop().await; @@ -80,7 +87,7 @@ impl Channel { debug!(target: "net", "Channel::stop() [END, address={}]", self.address()); } - /// Stop the channel and create a new sub. + /// Creates a subscription to a stopped signal. pub async fn subscribe_stop(&self) -> Subscription { debug!(target: "net", "Channel::subscribe_stop() [START, address={}]", @@ -96,7 +103,9 @@ impl Channel { sub } - /// Send a message across a channel. + /// Sends a message across a channel. Calls function 'send_message' that + /// creates a new payload and sends it over the TCP connection as a + /// packet. Returns an error if something goes wrong. pub async fn send(&self, message: M) -> NetResult<()> { debug!(target: "net", "Channel::send() [START, command={:?}, address={}]", @@ -124,7 +133,10 @@ impl Channel { result } - /// Implements send message functionality. + /// Implements send message functionality. Creates a new payload and encodes + /// it. Then creates a message packet- the base type of the network- and + /// copies the payload into it. Then we send the packet over the TCP + /// stream. async fn send_message(&self, message: M) -> error::Result<()> { let mut payload = Vec::new(); message.encode(&mut payload)?; @@ -137,7 +149,7 @@ impl Channel { messages::send_packet(stream, packet).await } - /// Subscribe to a message type. + /// Subscribe to a messages on the message subsystem. pub async fn subscribe_msg(&self) -> NetResult> { debug!(target: "net", "Channel::subscribe_msg() [START, command={:?}, address={}]",