From 6453c5ea3784ee91da2df7fe731c1283774192d3 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sun, 24 Apr 2022 18:21:43 +0300 Subject: [PATCH] bin/tau: using futures::select for receivng update from raft and rpc --- Cargo.lock | 2 +- bin/tau/taud/Cargo.toml | 2 +- bin/tau/taud/src/main.rs | 49 +++++++++++++++++++--------------------- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 083d63033..aaf97a0c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3838,7 +3838,7 @@ dependencies = [ "ctrlc-async", "darkfi", "easy-parallel", - "futures-lite", + "futures", "log", "num_cpus", "rand", diff --git a/bin/tau/taud/Cargo.toml b/bin/tau/taud/Cargo.toml index 9ad0238fd..c322cb6b9 100644 --- a/bin/tau/taud/Cargo.toml +++ b/bin/tau/taud/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.53" async-channel = "1.6.1" async-executor = "1.4.1" easy-parallel = "3.2.0" -futures-lite = "1.12.0" +futures = "0.3.21" # Misc clap = {version = "3.1.12", features = ["derive"]} diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index a1d0887ec..9476cdd9e 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -3,9 +3,10 @@ use std::path::PathBuf; use async_executor::Executor; use easy_parallel::Parallel; -use futures_lite::future; +use futures::{select, FutureExt}; use log::{info, warn}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use smol::future; use structopt_toml::StructOptToml; use darkfi::{ @@ -74,27 +75,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, datastore_path.clone())); let datastore_path_cloned = datastore_path.clone(); - let recv_update_from_rpc: smol::Task> = executor.spawn(async move { - loop { - let task_info = rpc_rcv.recv().await.map_err(Error::from)?; - if let Some(tk) = task_info { - info!(target: "tau", "save the received task {:?}", tk); - tk.save(&datastore_path_cloned)?; - raft_sender.send(tk).await.map_err(Error::from)?; - } - } - }); - - let datastore_path_cloned = datastore_path.clone(); - let recv_update_from_raft: smol::Task> = executor.spawn(async move { - loop { - let task = commits.recv().await.map_err(Error::from)?; - info!(target: "tau", "receive update from the commits {:?}", task); - task.save(&datastore_path_cloned)?; - } - }); - - let initial_sync: smol::Task> = executor.spawn(async move { + let recv_update: smol::Task> = executor.spawn(async move { info!(target: "tau", "Start initial sync"); info!(target: "tau", "Upload local tasks"); let tasks = MonthTasks::load_current_open_tasks(&datastore_path)?; @@ -103,7 +84,25 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { info!(target: "tau", "send local task {:?}", task); initial_sync_raft_sender.send(task).await.map_err(Error::from)?; } - Ok(()) + + loop { + select! { + task = rpc_rcv.recv().fuse() => { + let task = task.map_err(Error::from)?; + if let Some(tk) = task { + info!(target: "tau", "save the received task {:?}", tk); + tk.save(&datastore_path_cloned)?; + raft_sender.send(tk).await.map_err(Error::from)?; + } + } + task = commits.recv().fuse() => { + let task = task.map_err(Error::from)?; + info!(target: "tau", "receive update from the commits {:?}", task); + task.save(&datastore_path_cloned)?; + } + + } + } }); let executor_cloned = executor.clone(); @@ -116,9 +115,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { // cleaning up tasks running in the background signal.send(()).await.unwrap(); rpc_listener_taks.cancel().await; - recv_update_from_rpc.cancel().await; - recv_update_from_raft.cancel().await; - initial_sync.cancel().await; + recv_update.cancel().await; }) .unwrap();