Improve code and add comments

This commit is contained in:
narodnik
2021-03-06 18:06:16 +01:00
parent b12d38bdc7
commit 5934940f36
13 changed files with 41 additions and 38 deletions

View File

@@ -4,7 +4,7 @@ use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use crate::net::error::{NetError, NetResult};
use crate::net::{Channel, ChannelPtr, SettingsPtr};
use crate::net::{Channel, ChannelPtr};
use crate::system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription};
pub type AcceptorPtr = Arc<Acceptor>;
@@ -12,15 +12,13 @@ pub type AcceptorPtr = Arc<Acceptor>;
pub struct Acceptor {
channel_subscriber: SubscriberPtr<NetResult<ChannelPtr>>,
task: StoppableTaskPtr,
settings: SettingsPtr,
}
impl Acceptor {
pub fn new(settings: SettingsPtr) -> Arc<Self> {
pub fn new() -> Arc<Self> {
Arc::new(Self {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
settings,
})
}
@@ -48,14 +46,14 @@ impl Acceptor {
fn setup(accept_addr: SocketAddr) -> NetResult<Async<TcpListener>> {
let listener = match Async::<TcpListener>::bind(accept_addr) {
Ok(l) => l,
Ok(listener) => listener,
Err(err) => {
error!("Bind listener failed: {}", err);
return Err(NetError::OperationFailed);
}
};
let local_addr = match listener.get_ref().local_addr() {
Ok(a) => a,
Ok(addr) => addr,
Err(err) => {
error!("Failed to get local address: {}", err);
return Err(NetError::OperationFailed);
@@ -104,7 +102,7 @@ impl Acceptor {
};
info!("Accepted client: {}", peer_addr);
let channel = Channel::new(stream, peer_addr, self.settings.clone()).await;
let channel = Channel::new(stream, peer_addr).await;
Ok(channel)
}
}

View File

@@ -13,7 +13,6 @@ use crate::error;
use crate::net::error::{NetError, NetResult};
use crate::net::message_subscriber::{MessageSubscription, MessageSubsystem};
use crate::net::messages;
use crate::net::settings::SettingsPtr;
use crate::system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription};
pub type ChannelPtr = Arc<Channel>;
@@ -26,14 +25,12 @@ pub struct Channel {
stop_subscriber: SubscriberPtr<NetError>,
receive_task: StoppableTaskPtr,
stopped: AtomicBool,
settings: SettingsPtr,
}
impl Channel {
pub async fn new(
stream: Async<TcpStream>,
address: SocketAddr,
settings: SettingsPtr,
) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
@@ -50,7 +47,6 @@ impl Channel {
stop_subscriber: Subscriber::new(),
receive_task: StoppableTask::new(),
stopped: AtomicBool::new(false),
settings,
})
}

View File

@@ -19,7 +19,7 @@ impl Connector {
futures::select! {
stream_result = Async::<TcpStream>::connect(hostaddr).fuse() => {
match stream_result {
Ok(stream) => Ok(Channel::new(stream, hostaddr, self.settings.clone()).await),
Ok(stream) => Ok(Channel::new(stream, hostaddr).await),
Err(_) => Err(NetError::ConnectFailed)
}
}

View File

@@ -3,20 +3,16 @@ use rand::seq::SliceRandom;
use std::net::SocketAddr;
use std::sync::Arc;
use crate::net::SettingsPtr;
pub type HostsPtr = Arc<Hosts>;
pub struct Hosts {
addrs: Mutex<Vec<SocketAddr>>,
settings: SettingsPtr,
}
impl Hosts {
pub fn new(settings: SettingsPtr) -> Arc<Self> {
pub fn new() -> Arc<Self> {
Arc::new(Self {
addrs: Mutex::new(Vec::new()),
settings,
})
}

View File

@@ -207,7 +207,10 @@ impl MessageSubsystem {
}
}
pub async fn doteste() {
// This is a test function for the message subsystem code above
// Normall we would use the #[test] macro but cannot since it is async code
// Instead we call it using smol::block_on() in the unit test code after this func
async fn _do_message_subscriber_test() {
struct MyVersionMessage {
x: u32,
}
@@ -256,6 +259,7 @@ pub async fn doteste() {
// receive
// 1. do a get easy
let msg2 = sub.receive().await.unwrap();
assert_eq!(msg2.x, 110);
println!("{}", msg2.x);
subsystem.trigger_error(NetError::ChannelStopped).await;
@@ -265,3 +269,14 @@ pub async fn doteste() {
sub.unsubscribe().await;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_message_subscriber() {
smol::block_on(_do_message_subscriber_test());
}
}

View File

@@ -4,7 +4,6 @@ use std::io;
use std::net::SocketAddr;
use crate::error::{Error, Result};
pub use crate::net::AsyncTcpStream;
use crate::serial::{Decodable, Encodable, VarInt};
const MAGIC_BYTES: [u8; 4] = [0xd9, 0xef, 0xb6, 0x7d];

View File

@@ -1,11 +1,7 @@
use smol::Async;
use std::net::TcpStream;
pub mod acceptor;
pub mod channel;
pub mod connector;
pub mod error;
#[macro_use]
pub mod message_subscriber;
pub mod hosts;
pub mod messages;
@@ -15,8 +11,6 @@ pub mod sessions;
pub mod settings;
pub mod utility;
pub type AsyncTcpStream = async_dup::Arc<Async<TcpStream>>;
pub use acceptor::{Acceptor, AcceptorPtr};
pub use channel::{Channel, ChannelPtr};
pub use connector::Connector;

View File

@@ -31,7 +31,7 @@ impl P2p {
pending: Mutex::new(HashSet::new()),
channels: Mutex::new(HashMap::new()),
stop_subscriber: Subscriber::new(),
hosts: Hosts::new(settings.clone()),
hosts: Hosts::new(),
settings,
})
}

View File

@@ -6,7 +6,7 @@ use crate::net::error::NetResult;
use crate::net::message_subscriber::MessageSubscription;
use crate::net::messages;
use crate::net::protocols::{ProtocolJobsManager, ProtocolJobsManagerPtr};
use crate::net::{ChannelPtr, HostsPtr, SettingsPtr};
use crate::net::{ChannelPtr, HostsPtr};
pub struct ProtocolAddress {
channel: ChannelPtr,
@@ -15,13 +15,12 @@ pub struct ProtocolAddress {
get_addrs_sub: MessageSubscription<messages::GetAddrsMessage>,
hosts: HostsPtr,
settings: SettingsPtr,
jobsman: ProtocolJobsManagerPtr,
}
impl ProtocolAddress {
pub async fn new(channel: ChannelPtr, hosts: HostsPtr, settings: SettingsPtr) -> Arc<Self> {
pub async fn new(channel: ChannelPtr, hosts: HostsPtr) -> Arc<Self> {
let addrs_sub = channel
.clone()
.subscribe_msg::<messages::AddrsMessage>()
@@ -39,7 +38,6 @@ impl ProtocolAddress {
addrs_sub,
get_addrs_sub,
hosts,
settings,
jobsman: ProtocolJobsManager::new("ProtocolAddress", channel),
})
}

View File

@@ -29,6 +29,7 @@ impl ProtocolJobsManager {
executor.spawn(self.handle_stop()).detach()
}
/// Spawns a new task adding it to the internal queue
pub async fn spawn<'a, F>(&self, future: F, executor: ExecutorPtr<'a>)
where
F: Future<Output = NetResult<()>> + Send + 'a,
@@ -36,6 +37,7 @@ impl ProtocolJobsManager {
self.tasks.lock().await.push(executor.spawn(future))
}
/// This is run in start(). When the channel closes, we also stop all the tasks
async fn handle_stop(self: Arc<Self>) {
let stop_sub = self.channel.clone().subscribe_stop().await;
@@ -52,8 +54,10 @@ impl ProtocolJobsManager {
self.name,
self.channel.address()
);
// Take all the tasks from our internal queue...
let tasks = std::mem::take(&mut *self.tasks.lock().await);
for task in tasks {
// ... and cancel them
let _ = task.cancel().await;
}
}

View File

@@ -18,12 +18,7 @@ pub struct InboundSession {
impl InboundSession {
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
let settings = {
let p2p = p2p.upgrade().unwrap();
p2p.settings()
};
let acceptor = Acceptor::new(settings);
let acceptor = Acceptor::new();
Arc::new(Self {
p2p,
@@ -73,6 +68,7 @@ impl InboundSession {
result
}
/// Wait for all new channels created by the acceptor and call setup_channel() on them.
async fn channel_sub_loop(self: Arc<Self>, executor: Arc<Executor<'_>>) -> NetResult<()> {
let channel_sub = self.acceptor.clone().subscribe().await;
loop {
@@ -108,7 +104,7 @@ impl InboundSession {
let hosts = self.p2p().hosts().clone();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_addr = ProtocolAddress::new(channel, hosts, settings).await;
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;
protocol_addr.start(executor).await;

View File

@@ -95,6 +95,10 @@ impl OutboundSession {
}
}
/// Load a valid address that we can connect to.
/// Valid means we aren't connecting (pending state) or connected (open channel)
/// in another slot, and it isn't our own inbound address.
/// Retry otherwise.
async fn load_address(&self, slot_number: u32) -> NetResult<SocketAddr> {
let p2p = self.p2p();
let hosts = p2p.hosts();
@@ -146,7 +150,7 @@ impl OutboundSession {
let hosts = self.p2p().hosts().clone();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_addr = ProtocolAddress::new(channel, hosts, settings).await;
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;
protocol_addr.start(executor).await;

View File

@@ -36,6 +36,9 @@ impl SeedSession {
tasks.push(executor.spawn(self.clone().start_seed(i, seed.clone(), executor.clone())));
}
// This line loops through all the tasks and waits for them to finish.
// But if the seed_query_timeout_seconds times out before they are finished,
// then it will simply quit and the tasks will get dropped.
futures::select! {
_ = async move {
for (i, task) in tasks.into_iter().enumerate() {