migrate client_protocol to Arc<Self> model.

This commit is contained in:
narodnik
2020-12-20 17:58:33 +01:00
parent 939bc52072
commit 3af393cf78
6 changed files with 59 additions and 93 deletions

View File

@@ -1,7 +1,7 @@
#[macro_use]
extern crate clap;
use async_channel::unbounded;
use async_dup::Arc;
use std::sync::Arc;
use async_executor::Executor;
use async_std::sync::Mutex;
use easy_parallel::Parallel;
@@ -60,13 +60,13 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
let accept_addr = options.accept_addr.clone();
let mut client_slots: Vec<ClientProtocol> = vec![];
let mut client_slots = vec![];
for i in 0..options.connection_slots {
debug!("Starting connection slot {}", i);
let mut client = ClientProtocol::new(connections.clone());
client
.start(accept_addr.clone(), stored_addrs.clone(), executor.clone())
let mut client = ClientProtocol::new(connections.clone(), accept_addr.clone(), stored_addrs.clone());
client.clone()
.start(executor.clone())
.await;
client_slots.push(client);
}
@@ -74,12 +74,11 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
for remote_addr in options.manual_connects {
debug!("Starting connection (manual) to {}", remote_addr);
let mut client = ClientProtocol::new(connections.clone());
client
let mut client = ClientProtocol::new(connections.clone(), accept_addr.clone(),
stored_addrs.clone());
client.clone()
.start_manual(
remote_addr,
accept_addr.clone(),
stored_addrs.clone(),
executor.clone(),
)
.await;

View File

@@ -1,4 +1,4 @@
use async_dup::Arc;
use std::sync::Arc;
use futures::prelude::*;
use log::*;
use num_enum::{IntoPrimitive, TryFromPrimitive};

View File

@@ -1,4 +1,5 @@
use async_dup::Arc;
use async_std::sync::Mutex;
use std::sync::Arc;
use log::*;
use rand::seq::SliceRandom;
use smol::{Async, Executor};
@@ -14,18 +15,25 @@ pub struct ClientProtocol {
send_sx: async_channel::Sender<net::Message>,
send_rx: async_channel::Receiver<net::Message>,
connections: ConnectionsMap,
main_process: Option<smol::Task<()>>,
main_process: Mutex<Option<smol::Task<()>>>,
accept_addr: Option<SocketAddr>,
stored_addrs: AddrsStorage,
}
impl ClientProtocol {
pub fn new(connections: ConnectionsMap) -> Self {
pub fn new(connections: ConnectionsMap, accept_addr: Option<SocketAddr>,
stored_addrs: AddrsStorage,
) -> Arc<Self> {
let (send_sx, send_rx) = async_channel::unbounded::<net::Message>();
Self {
Arc::new(Self {
send_sx,
send_rx,
connections,
main_process: None,
}
main_process: Mutex::new(None),
accept_addr,
stored_addrs
})
}
pub fn get_send_pipe(&self) -> async_channel::Sender<net::Message> {
@@ -33,6 +41,7 @@ impl ClientProtocol {
}
async fn fetch_random_addr(
self: Arc<Self>,
accept_addr: &Option<SocketAddr>,
stored_addrs: &AddrsStorage,
connections: &ConnectionsMap,
@@ -58,19 +67,15 @@ impl ClientProtocol {
}
pub async fn start(
&mut self,
accept_addr: Option<SocketAddr>,
stored_addrs: AddrsStorage,
self: Arc<Self>,
executor: Arc<Executor<'_>>,
) {
let connections = self.connections.clone();
let (send_sx, send_rx) = (self.send_sx.clone(), self.send_rx.clone());
let executor2 = executor.clone();
let self2 = self.clone();
self.main_process = Some(executor.spawn(async move {
*self2.main_process.lock().await = Some(executor.spawn(async move {
loop {
let addr = match stored_addrs.lock().await.choose(&mut rand_core::OsRng) {
let addr = match self.stored_addrs.lock().await.choose(&mut rand_core::OsRng) {
Some(addr) => addr.clone(),
None => {
debug!("No addresses in store. Sleeping for 2 secs before retrying...");
@@ -78,10 +83,10 @@ impl ClientProtocol {
continue;
}
};
if connections.lock().await.contains_key(&addr) {
if self.connections.lock().await.contains_key(&addr) {
continue;
}
if let Some(accept_addr) = accept_addr {
if let Some(accept_addr) = self.accept_addr {
if addr == accept_addr {
continue;
}
@@ -89,12 +94,8 @@ impl ClientProtocol {
debug!("Attempting connect to {}", addr);
Self::try_connect_process(
self.try_connect_process(
addr,
connections.clone(),
accept_addr.clone(),
stored_addrs.clone(),
(send_sx.clone(), send_rx.clone()),
executor2.clone(),
)
.await;
@@ -106,28 +107,20 @@ impl ClientProtocol {
}
pub async fn start_manual(
&mut self,
self: Arc<Self>,
remote_addr: SocketAddr,
accept_addr: Option<SocketAddr>,
stored_addrs: AddrsStorage,
executor: Arc<Executor<'_>>,
) {
let connections = self.connections.clone();
let (send_sx, send_rx) = (self.send_sx.clone(), self.send_rx.clone());
let executor2 = executor.clone();
let self2 = self.clone();
self.main_process = Some(executor.spawn(async move {
*self2.main_process.lock().await = Some(executor.spawn(async move {
loop {
for _ in 0..4 {
debug!("Attempting connect to {}", remote_addr);
Self::try_connect_process(
self.try_connect_process(
remote_addr,
connections.clone(),
accept_addr.clone(),
stored_addrs.clone(),
(send_sx.clone(), send_rx.clone()),
executor2.clone(),
)
.await;
@@ -138,25 +131,15 @@ impl ClientProtocol {
}
pub async fn try_connect_process(
&self,
address: SocketAddr,
connections: ConnectionsMap,
accept_addr: Option<SocketAddr>,
stored_addrs: AddrsStorage,
(send_sx, send_rx): (
async_channel::Sender<net::Message>,
async_channel::Receiver<net::Message>,
),
executor: Arc<Executor<'_>>,
) {
match Async::<TcpStream>::connect(address.clone()).await {
Ok(stream) => {
let _ = Self::handle_connect(
let _ = self.handle_connect(
stream,
stored_addrs.clone(),
connections,
address,
(send_sx.clone(), send_rx.clone()),
accept_addr,
executor,
)
.await;
@@ -168,32 +151,22 @@ impl ClientProtocol {
}
async fn handle_connect(
&self,
stream: Async<TcpStream>,
stored_addrs: AddrsStorage,
connections: ConnectionsMap,
address: SocketAddr,
(send_sx, send_rx): (
async_channel::Sender<net::Message>,
async_channel::Receiver<net::Message>,
),
accept_addr: Option<SocketAddr>,
executor: Arc<Executor<'_>>,
) -> Result<()> {
debug!("Connected to {}", address);
let stream = async_dup::Arc::new(stream);
connections
self.connections
.lock()
.await
.insert(address.clone(), send_sx.clone());
.insert(address.clone(), self.send_sx.clone());
// Run event loop
match Self::event_loop_process(
match self.event_loop_process(
stream,
stored_addrs,
(send_sx, send_rx),
accept_addr,
connections.clone(),
executor,
)
.await
@@ -205,7 +178,7 @@ impl ClientProtocol {
warn!("Server disconnected: {}", err);
}
}
connections.lock().await.remove(&address);
self.connections.lock().await.remove(&address);
Ok(())
}
@@ -225,30 +198,24 @@ impl ClientProtocol {
}
pub async fn event_loop_process(
&self,
mut stream: net::AsyncTcpStream,
stored_addrs: AddrsStorage,
(send_sx, send_rx): (
async_channel::Sender<net::Message>,
async_channel::Receiver<net::Message>,
),
accept_addr: Option<SocketAddr>,
connections: ConnectionsMap,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let inactivity_timer = net::InactivityTimer::new(executor.clone());
let clock = Arc::new(AtomicU64::new(0));
let send_sx2 = send_sx.clone();
let send_sx2 = self.send_sx.clone();
let clock2 = clock.clone();
let ping_task = executor.spawn(protocol_base::repeat_ping(send_sx2, clock2));
let mut send_addr_task = None;
if let Some(accept_addr) = accept_addr {
send_addr_task = Some(executor.spawn(Self::send_addr(send_sx.clone(), accept_addr)));
if let Some(accept_addr) = self.accept_addr {
send_addr_task = Some(executor.spawn(Self::send_addr(self.send_sx.clone(), accept_addr.clone())));
}
loop {
let event = net::select_event(&mut stream, &send_rx, &inactivity_timer).await?;
let event = net::select_event(&mut stream, &self.send_rx, &inactivity_timer).await?;
match event {
net::Event::Send(message) => {
@@ -258,10 +225,10 @@ impl ClientProtocol {
inactivity_timer.reset().await?;
protocol_base::protocol(
message,
&stored_addrs,
&send_sx,
&self.stored_addrs,
&self.send_sx,
Some(&clock),
connections.clone(),
self.connections.clone(),
)
.await?;
}

View File

@@ -1,4 +1,4 @@
use async_dup::Arc;
use std::sync::Arc;
use log::*;
use smol::{Async, Executor};
use std::net::{SocketAddr, TcpStream};
@@ -92,7 +92,7 @@ impl SeedProtocol {
.send(net::Message::GetAddrs(net::GetAddrsMessage {}))
.await?;
let stream = Arc::new(stream);
let stream = async_dup::Arc::new(stream);
// Run event loop
match Self::event_loop_process(stream, stored_addrs.clone(), (send_sx, send_rx), executor)

View File

@@ -1,4 +1,4 @@
use async_dup::Arc;
use std::sync::Arc;
use log::*;
use smol::{Async, Executor};
use std::net::{SocketAddr, TcpListener};
@@ -33,7 +33,7 @@ impl ServerProtocol {
&mut self,
address: SocketAddr,
stored_addrs: AddrsStorage,
executor: async_dup::Arc<Executor<'_>>,
executor: std::sync::Arc<Executor<'_>>,
) -> Result<()> {
let listener = Async::<TcpListener>::bind(address)?;
info!("Listening on {}", listener.get_ref().local_addr()?);
@@ -41,7 +41,7 @@ impl ServerProtocol {
loop {
let (stream, peer_addr) = listener.accept().await?;
info!("Accepted client: {}", peer_addr);
let stream = Arc::new(stream);
let stream = async_dup::Arc::new(stream);
let (send_sx, send_rx) = (self.send_sx.clone(), self.send_rx.clone());

View File

@@ -1,4 +1,4 @@
use async_dup::Arc;
use std::sync::Arc;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::prelude::*;
@@ -12,13 +12,13 @@ use smol::{Executor, Task};
//use crate::{net, serial, Channel, ClientProtocol, Result, SlabsManagerSafe};
use crate::{net::net, serial, Result};
pub type ConnectionsMap = async_dup::Arc<
pub type ConnectionsMap = std::sync::Arc<
async_std::sync::Mutex<HashMap<SocketAddr, async_channel::Sender<net::Message>>>,
>;
pub type AddrsStorage = async_dup::Arc<async_std::sync::Mutex<Vec<SocketAddr>>>;
pub type AddrsStorage = std::sync::Arc<async_std::sync::Mutex<Vec<SocketAddr>>>;
pub type Clock = async_dup::Arc<AtomicU64>;
pub type Clock = std::sync::Arc<AtomicU64>;
pub fn get_current_time() -> u64 {
let start = SystemTime::now();