bin/tau: catching errors and exit signals

This commit is contained in:
ghassmo
2022-07-10 16:20:39 +03:00
parent 41803ebc37
commit eb101f22e2
2 changed files with 63 additions and 63 deletions

View File

@@ -96,8 +96,6 @@ async fn start_sync_loop(
secret_key: SecretKey,
mut rng: crypto_box::rand_core::OsRng,
) -> TaudResult<()> {
info!(target: "tau", "Start sync loop");
loop {
select! {
task = broadcast_rcv.recv().fuse() => {
@@ -175,7 +173,7 @@ async fn watch_files(
broadcast_snd.send(task).await.map_err(Error::from)?;
}
DebouncedEvent::Error(err, _) => {
warn!("Catching files changes: {}", err);
debug!("Watching files Error: {}", err);
break
}
_ => {}
@@ -305,13 +303,19 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
ctrlc_async::set_async_handler(async move {
warn!(target: "tau", "Catch exit signal");
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
tx.send(DebouncedEvent::Error(notify::Error::Generic("Catch exit signal".into()), None))
.unwrap();
if let Err(e) = signal.send(()).await {
error!("Error on sending exit signal: {}", e);
}
})
.unwrap();
raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?;
if let Ok(_) =
tx.send(DebouncedEvent::Error(notify::Error::Generic("Catch exit signal".into()), None))
{
warn!(target: "tau", "Terminating..");
}
Ok(())
}

View File

@@ -28,9 +28,9 @@ use super::{
const HEARTBEATTIMEOUT: u64 = 500;
const TIMEOUT: u64 = 6000;
const TIMEOUT_NODES: u64 = 1000;
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000;
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 1000;
const SYNC_ATTEMPTS: u64 = 5;
const SYNC_ATTEMPTS: u64 = 8;
async fn load_node_ids_loop(
nodes: Arc<Mutex<HashMap<NodeId, Url>>>,
@@ -156,37 +156,33 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let load_ips_task =
executor.spawn(load_node_ids_loop(self.nodes.clone(), p2p.clone(), self.role.clone()));
let mut synced = false;
// Sync listener node
if self.role == Role::Listener {
let synced = Arc::new(Mutex::new(false));
let last_term =
if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 };
let sync_request = SyncRequest { logs_len: self.logs.len(), last_term };
info!("Start Syncing...");
for i in 0..SYNC_ATTEMPTS {
if *synced.lock().await {
info!("Synced Successfully!!");
for _ in 0..SYNC_ATTEMPTS {
if synced {
break
}
info!("Send sync request {}", i);
self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?;
self.waiting_for_sync(
executor.clone(),
p2p_recv_channel.clone(),
stop_signal.clone(),
synced.clone(),
)
.await?;
synced = self
.waiting_for_sync(
executor.clone(),
p2p_recv_channel.clone(),
stop_signal.clone(),
)
.await?;
}
if !(*synced.lock().await) {
error!("Syncing failed");
return Ok(())
if synced {
info!("SYNCED SUCCESSFULLY!!");
}
}
@@ -194,35 +190,38 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let broadcast_msg_rv = self.msgs_channel.1.clone();
loop {
let timeout: Duration = if self.role == Role::Leader {
Duration::from_millis(HEARTBEATTIMEOUT)
} else {
Duration::from_millis(rng.gen_range(0..HEARTBEATTIMEOUT) + TIMEOUT)
};
if !synced && self.role == Role::Listener {
error!("SYNCING FAILED!!");
} else {
loop {
let timeout: Duration = if self.role == Role::Leader {
Duration::from_millis(HEARTBEATTIMEOUT)
} else {
Duration::from_millis(rng.gen_range(0..HEARTBEATTIMEOUT) + TIMEOUT)
};
let result: Result<()>;
let result: Result<()>;
select! {
m = p2p_recv_channel.recv().fuse() => result = self.handle_method(m?).await,
m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await,
_ = task::sleep(timeout).fuse() => {
result = if self.role == Role::Leader {
self.send_heartbeat().await
}else {
self.send_vote_request().await
};
},
_ = stop_signal.recv().fuse() => break,
}
select! {
m = p2p_recv_channel.recv().fuse() => result = self.handle_method(m?).await,
m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await,
_ = task::sleep(timeout).fuse() => {
result = if self.role == Role::Leader {
self.send_heartbeat().await
}else {
self.send_vote_request().await
};
},
_ = stop_signal.recv().fuse() => break,
}
match result {
Ok(_) => {}
Err(e) => warn!(target: "raft", "warn: {}", e),
match result {
Ok(_) => {}
Err(e) => warn!(target: "raft", "warn: {}", e),
}
}
}
warn!(target: "raft", "Raft start() Exit Signal");
warn!(target: "raft", "Raft Terminating...");
load_ips_task.cancel().await;
p2p_send_task.cancel().await;
self.datastore.flush().await?;
@@ -255,7 +254,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
.await?;
}
info!(target: "raft", "Role: {:?}, broadcast a msg id: {:?} ", self.role, msg_id);
debug!(target: "raft", "Role: {:?}, broadcast a msg id: {:?} ", self.role, msg_id);
Ok(())
}
@@ -284,7 +283,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.broadcast_msg(&d, Some(msg.id)).await?;
}
NetMsgMethod::SyncRequest => {
info!("Receive sync request");
debug!(target: "raft", "Receive sync request");
let sr: SyncRequest = deserialize(&msg.payload)?;
self.receive_sync_request(&sr, msg.id).await?;
}
@@ -292,7 +291,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
debug!(target: "raft", "Role: {:?} receive msg id: {} recipient_id: {:?} method: {:?} ",
self.role, msg.id, &msg.recipient_id.is_some(), &msg.method);
self.role, msg.id, &msg.recipient_id.is_some(), &msg.method);
Ok(())
}
@@ -318,7 +317,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
wipe,
};
info!("Send sync response");
debug!(target: "raft", "Send sync response");
self.send(None, &serialize(&sync_response), NetMsgMethod::SyncResponse, None).await?;
} else {
self.send(
@@ -334,7 +333,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> {
info!("Receive sync response");
debug!(target: "raft", "Receive sync response");
if sr.wipe {
self.set_commit_length(&0)?;
self.push_logs(&sr.logs)?;
@@ -369,7 +368,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let random_id = if msg_id.is_some() { msg_id.unwrap() } else { OsRng.next_u64() };
debug!(target: "raft","Role: {:?} send a msg id: {} recipient_id: {:?} method: {:?} ",
self.role, random_id, &recipient_id.is_some(), &method);
self.role, random_id, &recipient_id.is_some(), &method);
let net_msg = NetMsg { id: random_id, recipient_id, payload: payload.to_vec(), method };
self.seen_msgs.lock().await.push(random_id);
@@ -383,8 +382,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
executor: Arc<Executor<'_>>,
p2p_recv_channel: async_channel::Receiver<NetMsg>,
stop_signal: async_channel::Receiver<()>,
synced: Arc<Mutex<bool>>,
) -> Result<()> {
) -> Result<bool> {
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
executor
.spawn(async move {
@@ -393,7 +391,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
})
.detach();
info!("Waiting for sync...");
loop {
select! {
msg = p2p_recv_channel.recv().fuse() => {
@@ -401,14 +398,13 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
if msg.method == NetMsgMethod::SyncResponse {
let sr: SyncResponse = deserialize(&msg.payload)?;
self.receive_sync_response(&sr).await?;
*synced.lock().await = true;
break
return Ok(true)
}},
_ = stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
_ = stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
}
}
Ok(())
Ok(false)
}
async fn send_heartbeat(&self) -> Result<()> {