From eb101f22e2cd4b96f46aa6a8ab5e8cf1e3455ff1 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sun, 10 Jul 2022 16:20:39 +0300 Subject: [PATCH] bin/tau: catching errors and exit signals --- bin/tau/taud/src/main.rs | 16 +++--- src/raft/consensus.rs | 110 +++++++++++++++++++-------------------- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 942633e20..29d18cd20 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -96,8 +96,6 @@ async fn start_sync_loop( secret_key: SecretKey, mut rng: crypto_box::rand_core::OsRng, ) -> TaudResult<()> { - info!(target: "tau", "Start sync loop"); - loop { select! { task = broadcast_rcv.recv().fuse() => { @@ -175,7 +173,7 @@ async fn watch_files( broadcast_snd.send(task).await.map_err(Error::from)?; } DebouncedEvent::Error(err, _) => { - warn!("Catching files changes: {}", err); + debug!("Watching files Error: {}", err); break } _ => {} @@ -305,13 +303,19 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { ctrlc_async::set_async_handler(async move { warn!(target: "tau", "Catch exit signal"); // cleaning up tasks running in the background - signal.send(()).await.unwrap(); - tx.send(DebouncedEvent::Error(notify::Error::Generic("Catch exit signal".into()), None)) - .unwrap(); + if let Err(e) = signal.send(()).await { + error!("Error on sending exit signal: {}", e); + } }) .unwrap(); raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?; + if let Ok(_) = + tx.send(DebouncedEvent::Error(notify::Error::Generic("Catch exit signal".into()), None)) + { + warn!(target: "tau", "Terminating.."); + } + Ok(()) } diff --git a/src/raft/consensus.rs b/src/raft/consensus.rs index f8ad415bb..28366966f 100644 --- a/src/raft/consensus.rs +++ b/src/raft/consensus.rs @@ -28,9 +28,9 @@ use super::{ const HEARTBEATTIMEOUT: u64 = 500; const TIMEOUT: u64 = 6000; const TIMEOUT_NODES: u64 = 1000; -const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000; +const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 1000; -const SYNC_ATTEMPTS: u64 = 5; +const SYNC_ATTEMPTS: u64 = 8; async fn load_node_ids_loop( nodes: Arc>>, @@ -156,37 +156,33 @@ impl Raft { let load_ips_task = executor.spawn(load_node_ids_loop(self.nodes.clone(), p2p.clone(), self.role.clone())); + let mut synced = false; + // Sync listener node if self.role == Role::Listener { - let synced = Arc::new(Mutex::new(false)); - let last_term = if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 }; let sync_request = SyncRequest { logs_len: self.logs.len(), last_term }; info!("Start Syncing..."); - for i in 0..SYNC_ATTEMPTS { - if *synced.lock().await { - info!("Synced Successfully!!"); + for _ in 0..SYNC_ATTEMPTS { + if synced { break } - info!("Send sync request {}", i); self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?; - self.waiting_for_sync( - executor.clone(), - p2p_recv_channel.clone(), - stop_signal.clone(), - synced.clone(), - ) - .await?; + synced = self + .waiting_for_sync( + executor.clone(), + p2p_recv_channel.clone(), + stop_signal.clone(), + ) + .await?; } - - if !(*synced.lock().await) { - error!("Syncing failed"); - return Ok(()) + if synced { + info!("SYNCED SUCCESSFULLY!!"); } } @@ -194,35 +190,38 @@ impl Raft { let broadcast_msg_rv = self.msgs_channel.1.clone(); - loop { - let timeout: Duration = if self.role == Role::Leader { - Duration::from_millis(HEARTBEATTIMEOUT) - } else { - Duration::from_millis(rng.gen_range(0..HEARTBEATTIMEOUT) + TIMEOUT) - }; + if !synced && self.role == Role::Listener { + error!("SYNCING FAILED!!"); + } else { + loop { + let timeout: Duration = if self.role == Role::Leader { + Duration::from_millis(HEARTBEATTIMEOUT) + } else { + Duration::from_millis(rng.gen_range(0..HEARTBEATTIMEOUT) + TIMEOUT) + }; - let result: Result<()>; + let result: Result<()>; - select! { - m = p2p_recv_channel.recv().fuse() => result = self.handle_method(m?).await, - m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await, - _ = task::sleep(timeout).fuse() => { - result = if self.role == Role::Leader { - self.send_heartbeat().await - }else { - self.send_vote_request().await - }; - }, - _ = stop_signal.recv().fuse() => break, - } + select! { + m = p2p_recv_channel.recv().fuse() => result = self.handle_method(m?).await, + m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await, + _ = task::sleep(timeout).fuse() => { + result = if self.role == Role::Leader { + self.send_heartbeat().await + }else { + self.send_vote_request().await + }; + }, + _ = stop_signal.recv().fuse() => break, + } - match result { - Ok(_) => {} - Err(e) => warn!(target: "raft", "warn: {}", e), + match result { + Ok(_) => {} + Err(e) => warn!(target: "raft", "warn: {}", e), + } } } - - warn!(target: "raft", "Raft start() Exit Signal"); + warn!(target: "raft", "Raft Terminating..."); load_ips_task.cancel().await; p2p_send_task.cancel().await; self.datastore.flush().await?; @@ -255,7 +254,7 @@ impl Raft { .await?; } - info!(target: "raft", "Role: {:?}, broadcast a msg id: {:?} ", self.role, msg_id); + debug!(target: "raft", "Role: {:?}, broadcast a msg id: {:?} ", self.role, msg_id); Ok(()) } @@ -284,7 +283,7 @@ impl Raft { self.broadcast_msg(&d, Some(msg.id)).await?; } NetMsgMethod::SyncRequest => { - info!("Receive sync request"); + debug!(target: "raft", "Receive sync request"); let sr: SyncRequest = deserialize(&msg.payload)?; self.receive_sync_request(&sr, msg.id).await?; } @@ -292,7 +291,7 @@ impl Raft { } debug!(target: "raft", "Role: {:?} receive msg id: {} recipient_id: {:?} method: {:?} ", - self.role, msg.id, &msg.recipient_id.is_some(), &msg.method); + self.role, msg.id, &msg.recipient_id.is_some(), &msg.method); Ok(()) } @@ -318,7 +317,7 @@ impl Raft { wipe, }; - info!("Send sync response"); + debug!(target: "raft", "Send sync response"); self.send(None, &serialize(&sync_response), NetMsgMethod::SyncResponse, None).await?; } else { self.send( @@ -334,7 +333,7 @@ impl Raft { } async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> { - info!("Receive sync response"); + debug!(target: "raft", "Receive sync response"); if sr.wipe { self.set_commit_length(&0)?; self.push_logs(&sr.logs)?; @@ -369,7 +368,7 @@ impl Raft { let random_id = if msg_id.is_some() { msg_id.unwrap() } else { OsRng.next_u64() }; debug!(target: "raft","Role: {:?} send a msg id: {} recipient_id: {:?} method: {:?} ", - self.role, random_id, &recipient_id.is_some(), &method); + self.role, random_id, &recipient_id.is_some(), &method); let net_msg = NetMsg { id: random_id, recipient_id, payload: payload.to_vec(), method }; self.seen_msgs.lock().await.push(random_id); @@ -383,8 +382,7 @@ impl Raft { executor: Arc>, p2p_recv_channel: async_channel::Receiver, stop_signal: async_channel::Receiver<()>, - synced: Arc>, - ) -> Result<()> { + ) -> Result { let (timeout_s, timeout_r) = async_channel::unbounded::<()>(); executor .spawn(async move { @@ -393,7 +391,6 @@ impl Raft { }) .detach(); - info!("Waiting for sync..."); loop { select! { msg = p2p_recv_channel.recv().fuse() => { @@ -401,14 +398,13 @@ impl Raft { if msg.method == NetMsgMethod::SyncResponse { let sr: SyncResponse = deserialize(&msg.payload)?; self.receive_sync_response(&sr).await?; - *synced.lock().await = true; - break + return Ok(true) }}, - _ = stop_signal.recv().fuse() => break, - _ = timeout_r.recv().fuse() => break, + _ = stop_signal.recv().fuse() => break, + _ = timeout_r.recv().fuse() => break, } } - Ok(()) + Ok(false) } async fn send_heartbeat(&self) -> Result<()> {