diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 84587f6a7..5dc629d63 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -12,7 +12,6 @@ use darkfi::{ util::{ cli::{log_config, spawn_config, Config}, path::get_config_path, - sleep, }, Error, }; @@ -48,7 +47,6 @@ async fn start(settings: Settings, executor: Arc>) -> TaudResult<() let raft_sender = raft.get_broadcast(); let commits = raft.get_commits(); - let initial_sync_commits = commits.clone(); let initial_sync_raft_sender = raft_sender.clone(); // @@ -67,53 +65,48 @@ async fn start(settings: Settings, executor: Arc>) -> TaudResult<() let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path.clone())); let dataset_path_cloned = settings.dataset_path.clone(); - let recv_update_from_raft: smol::Task> = executor.spawn(async move { + let recv_update_from_rpc: smol::Task> = executor.spawn(async move { loop { let task_info = rpc_rcv.recv().await.map_err(Error::from)?; - if let Some(tk) = task_info { info!(target: "tau", "save the received task {:?}", tk); tk.save(&dataset_path_cloned)?; raft_sender.send(tk).await.map_err(Error::from)?; } + } + }); - let recv_commits = commits.lock().await; - for task_info in recv_commits.iter() { - info!(target: "tau", "update from the commits"); - task_info.save(&dataset_path_cloned)?; - } + let dataset_path_cloned = settings.dataset_path.clone(); + let recv_update_from_raft: smol::Task> = executor.spawn(async move { + loop { + let task = commits.recv().await.map_err(Error::from)?; + info!(target: "tau", "update from the commits"); + task.save(&dataset_path_cloned)?; } }); let dataset_path_cloned = settings.dataset_path.clone(); let initial_sync: smol::Task> = executor.spawn(async move { - info!(target: "tau", "Start initial sync waiting the network for 5 seconds"); - sleep(5).await; - - let recv_commits = initial_sync_commits.lock().await; - for task_info in recv_commits.iter() { - info!(target: "tau", "Save received tasks {:?}", task_info); - task_info.save(&dataset_path_cloned)?; - } - + info!(target: "tau", "Start initial sync"); info!(target: "tau", "Upload local tasks"); - let tasks = MonthTasks::load_current_open_tasks(&dataset_path_cloned)?; for task in tasks { info!(target: "tau", "send local task {:?}", task); initial_sync_raft_sender.send(task).await.map_err(Error::from)?; } - Ok(()) }); - let ex2 = executor.clone(); - ex2.spawn(listen_and_serve(server_config, rpc_interface, executor.clone())).detach(); + let executor_cloned = executor.clone(); + executor_cloned + .spawn(listen_and_serve(server_config, rpc_interface, executor.clone())) + .detach(); // blocking raft.start(p2p_settings.clone(), executor.clone()).await?; + recv_update_from_rpc.cancel().await; recv_update_from_raft.cancel().await; initial_sync.cancel().await; Ok(()) diff --git a/src/raft/raft.rs b/src/raft/raft.rs index 0737f216e..a5b2390ae 100644 --- a/src/raft/raft.rs +++ b/src/raft/raft.rs @@ -24,7 +24,7 @@ const HEARTBEATTIMEOUT: u64 = 100; const TIMEOUT: u64 = 300; const TIMEOUT_NODES: u64 = 300; -pub type BroadcastMsg = (async_channel::Sender, async_channel::Receiver); +pub type Broadcast = (async_channel::Sender, async_channel::Receiver); type Sender = (async_channel::Sender, async_channel::Receiver); pub struct Raft { @@ -38,8 +38,6 @@ pub struct Raft { voted_for: Option, logs: Logs, commit_length: u64, - // the log will be added to this vector if it's committed by the majority of nodes - commits: Arc>>, role: Role, @@ -56,7 +54,9 @@ pub struct Raft { sender: Sender, - broadcast_msg: BroadcastMsg, + broadcast_msg: Broadcast, + + broadcast_commits: Broadcast, datastore: DataStore, } @@ -74,7 +74,6 @@ impl Raft { let mut voted_for = None; let mut logs = Logs(vec![]); let mut commit_length = 0; - let mut commits = Arc::new(Mutex::new(vec![])); let datastore = if db_path.exists() { let datastore = DataStore::new(db_path_str)?; @@ -82,13 +81,13 @@ impl Raft { voted_for = datastore.voted_for.get_last()?.flatten(); logs = Logs(datastore.logs.get_all()?); commit_length = datastore.commits_length.get_last()?.unwrap_or(0); - commits = Arc::new(Mutex::new(datastore.commits.get_all()?)); datastore } else { DataStore::new(db_path_str)? }; let broadcast_msg = async_channel::unbounded::(); + let broadcast_commits = async_channel::unbounded::(); let sender = async_channel::unbounded::(); Ok(Self { @@ -97,7 +96,6 @@ impl Raft { voted_for, logs, commit_length, - commits, role: Role::Follower, current_leader: None, votes_received: vec![], @@ -107,6 +105,7 @@ impl Raft { last_term: 0, sender, broadcast_msg, + broadcast_commits, datastore, }) } @@ -167,8 +166,6 @@ impl Raft { if self_id.is_none() { return } - // wait the network - task::sleep(Duration::from_secs(5)).await; loop { debug!(target: "raft", "load node ids from p2p hosts ips"); task::sleep(Duration::from_millis(TIMEOUT_NODES * 10)).await; @@ -211,8 +208,8 @@ impl Raft { } } - pub fn get_commits(&self) -> Arc>> { - self.commits.clone() + pub fn get_commits(&self) -> async_channel::Receiver { + self.broadcast_commits.1.clone() } pub fn get_broadcast(&self) -> async_channel::Sender { @@ -572,7 +569,7 @@ impl Raft { } async fn push_commit(&mut self, commit: &[u8]) -> Result<()> { let commit: T = deserialize(commit)?; - self.commits.lock().await.push(commit.clone()); + self.broadcast_commits.0.send(commit.clone()).await?; self.datastore.commits.insert(&commit) } fn push_log(&mut self, i: &Log) -> Result<()> {