diff --git a/src/net/channel.rs b/src/net/channel.rs index 19a0e735a..ca5888604 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -10,6 +10,7 @@ use std::net::{SocketAddr, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use crate::error; use crate::net::error::{NetError, NetResult}; use crate::net::message_subscriber::{ MessageSubscriber, MessageSubscriberPtr, MessageSubscription, @@ -87,6 +88,13 @@ impl Channel { self.stop_subscriber.notify(stop_err).await; } + fn is_eof_error(err: &error::Error) -> bool { + match err { + error::Error::Io(io_err) => io_err.kind() == std::io::ErrorKind::UnexpectedEof, + _ => false + } + } + async fn receive_loop(self: Arc) -> NetResult<()> { let stop_sub = self.clone().subscribe_stop().await; let reader = &mut *self.reader.lock().await; @@ -97,7 +105,11 @@ impl Channel { match message_result { Ok(message) => Ok(Arc::new(message)), Err(err) => { - error!("Read error on channel: {}", err); + if Self::is_eof_error(&err) { + info!("Closing channel {} disconnected", self.address()); + } else { + error!("Read error on channel: {}", err); + } self.stop().await; Err(NetError::ChannelStopped) } diff --git a/src/net/messages.rs b/src/net/messages.rs index c41d686fb..8f53ffb2c 100644 --- a/src/net/messages.rs +++ b/src/net/messages.rs @@ -343,6 +343,7 @@ pub async fn read_packet(stream: &mut R) -> Result // Packets have a 4 byte header of magic digits // This is used for network debugging let mut magic = [0u8; 4]; + //debug!("reading magic..."); stream.read_exact(&mut magic).await?; //debug!("read magic {:?}", magic); if magic != MAGIC_BYTES { diff --git a/src/net/sessions/seed_session.rs b/src/net/sessions/seed_session.rs index cfc187c76..25b3c7d1d 100644 --- a/src/net/sessions/seed_session.rs +++ b/src/net/sessions/seed_session.rs @@ -94,9 +94,11 @@ impl SeedSession { let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); protocol_ping.start(executor.clone()).await; - let protocol_seed = ProtocolSeed::new(channel, hosts, settings.clone()); + let protocol_seed = ProtocolSeed::new(channel.clone(), hosts, settings.clone()); protocol_seed.start(executor.clone()).await?; + channel.stop().await; + Ok(()) } }