bin/tau: using futures::select for receivng update from raft and rpc

This commit is contained in:
ghassmo
2022-04-24 18:21:43 +03:00
parent 58d96e62c9
commit 6453c5ea37
3 changed files with 25 additions and 28 deletions

2
Cargo.lock generated
View File

@@ -3838,7 +3838,7 @@ dependencies = [
"ctrlc-async",
"darkfi",
"easy-parallel",
"futures-lite",
"futures",
"log",
"num_cpus",
"rand",

View File

@@ -15,7 +15,7 @@ async-trait = "0.1.53"
async-channel = "1.6.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
futures-lite = "1.12.0"
futures = "0.3.21"
# Misc
clap = {version = "3.1.12", features = ["derive"]}

View File

@@ -3,9 +3,10 @@ use std::path::PathBuf;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::future;
use futures::{select, FutureExt};
use log::{info, warn};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
@@ -74,27 +75,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, datastore_path.clone()));
let datastore_path_cloned = datastore_path.clone();
let recv_update_from_rpc: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
let task_info = rpc_rcv.recv().await.map_err(Error::from)?;
if let Some(tk) = task_info {
info!(target: "tau", "save the received task {:?}", tk);
tk.save(&datastore_path_cloned)?;
raft_sender.send(tk).await.map_err(Error::from)?;
}
}
});
let datastore_path_cloned = datastore_path.clone();
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
let task = commits.recv().await.map_err(Error::from)?;
info!(target: "tau", "receive update from the commits {:?}", task);
task.save(&datastore_path_cloned)?;
}
});
let initial_sync: smol::Task<TaudResult<()>> = executor.spawn(async move {
let recv_update: smol::Task<TaudResult<()>> = executor.spawn(async move {
info!(target: "tau", "Start initial sync");
info!(target: "tau", "Upload local tasks");
let tasks = MonthTasks::load_current_open_tasks(&datastore_path)?;
@@ -103,7 +84,25 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
info!(target: "tau", "send local task {:?}", task);
initial_sync_raft_sender.send(task).await.map_err(Error::from)?;
}
Ok(())
loop {
select! {
task = rpc_rcv.recv().fuse() => {
let task = task.map_err(Error::from)?;
if let Some(tk) = task {
info!(target: "tau", "save the received task {:?}", tk);
tk.save(&datastore_path_cloned)?;
raft_sender.send(tk).await.map_err(Error::from)?;
}
}
task = commits.recv().fuse() => {
let task = task.map_err(Error::from)?;
info!(target: "tau", "receive update from the commits {:?}", task);
task.save(&datastore_path_cloned)?;
}
}
}
});
let executor_cloned = executor.clone();
@@ -116,9 +115,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
rpc_listener_taks.cancel().await;
recv_update_from_rpc.cancel().await;
recv_update_from_raft.cancel().await;
initial_sync.cancel().await;
recv_update.cancel().await;
})
.unwrap();