bin/tau: watch changes on tasks files and broadcast accordingly

This commit is contained in:
ghassmo
2022-07-10 01:37:26 +03:00
parent 6675cc2262
commit 92cd0ed4c2
4 changed files with 333 additions and 76 deletions

259
Cargo.lock generated
View File

@@ -59,7 +59,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -170,7 +170,7 @@ dependencies = [
"slab",
"socket2",
"waker-fn",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -219,7 +219,7 @@ dependencies = [
"libc",
"once_cell",
"signal-hook",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -303,7 +303,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -654,7 +654,7 @@ dependencies = [
"num-integer",
"num-traits",
"time 0.1.44",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -756,7 +756,7 @@ dependencies = [
"regex",
"terminal_size",
"unicode-width",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -987,11 +987,11 @@ dependencies = [
"bitflags",
"crossterm_winapi",
"libc",
"mio",
"mio 0.8.4",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1000,7 +1000,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1094,7 +1094,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b37feaa84e6861e00a1f5e5aa8da3ee56d605c9992d33e082786754828e20865"
dependencies = [
"nix 0.24.1",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1105,7 +1105,7 @@ checksum = "598e9d68e769aa1283460a3b0ec0d049ccfb6170277aea37089fa3f58fd721a1"
dependencies = [
"async-std",
"nix 0.23.1",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1411,7 +1411,7 @@ checksum = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901"
dependencies = [
"libc",
"redox_users 0.3.5",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1441,7 +1441,7 @@ checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users 0.4.3",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1452,7 +1452,7 @@ checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users 0.4.3",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1514,7 +1514,7 @@ checksum = "439a1c2ba5611ad3ed731280541d36d2e9c4ac5e7fb818a27b604bdc5a6aa65b"
dependencies = [
"lazy_static",
"libc",
"winapi",
"winapi 0.3.9",
"wio",
]
@@ -1706,6 +1706,18 @@ dependencies = [
"subtle",
]
[[package]]
name = "filetime"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall 0.2.13",
"windows-sys 0.36.1",
]
[[package]]
name = "float-ord"
version = "0.2.0"
@@ -1752,7 +1764,7 @@ dependencies = [
"pathfinder_simd",
"servo-fontconfig",
"walkdir",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -1808,9 +1820,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
name = "fsevent"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6"
dependencies = [
"bitflags",
"fsevent-sys",
]
[[package]]
name = "fsevent-sys"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0"
dependencies = [
"libc",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "funty"
version = "2.0.0"
@@ -2214,6 +2261,26 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "inotify"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -2223,6 +2290,15 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "ircd"
version = "0.3.0"
@@ -2290,6 +2366,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
@@ -2311,6 +2397,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "leb128"
version = "0.2.5"
@@ -2330,7 +2422,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd"
dependencies = [
"cfg-if 1.0.0",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -2454,6 +2546,25 @@ dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.4"
@@ -2466,6 +2577,30 @@ dependencies = [
"windows-sys 0.36.1",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio 0.6.23",
"slab",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "more-asserts"
version = "0.2.2"
@@ -2490,6 +2625,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "net2"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2524,6 +2670,24 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "4.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257"
dependencies = [
"bitflags",
"filetime",
"fsevent",
"fsevent-sys",
"inotify",
"libc",
"mio 0.6.23",
"mio-extras",
"walkdir",
"winapi 0.3.9",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2721,7 +2885,7 @@ dependencies = [
"libc",
"redox_syscall 0.2.13",
"smallvec",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -2907,7 +3071,7 @@ dependencies = [
"libc",
"log",
"wepoll-ffi",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -3241,7 +3405,7 @@ dependencies = [
"bitflags",
"libc",
"mach",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -3250,7 +3414,7 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -3274,7 +3438,7 @@ dependencies = [
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -3590,7 +3754,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"mio 0.8.4",
"signal-hook",
]
@@ -3682,7 +3846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -3967,6 +4131,7 @@ dependencies = [
"futures",
"hex",
"log",
"notify",
"rand",
"serde",
"serde_json",
@@ -3989,7 +4154,7 @@ dependencies = [
"libc",
"redox_syscall 0.2.13",
"remove_dir_all",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4000,7 +4165,7 @@ checksum = "edd106a334b7657c10b7c540a0106114feadeb4dc314513e97df481d5d966f42"
dependencies = [
"byteorder",
"dirs 1.0.5",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4019,7 +4184,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4077,7 +4242,7 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4377,7 +4542,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi 0.3.9",
"winapi-util",
]
@@ -4498,7 +4663,7 @@ dependencies = [
"wasmer-types",
"wasmer-vm",
"wat",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4649,7 +4814,7 @@ dependencies = [
"wasmer-engine-universal-artifact",
"wasmer-types",
"wasmer-vm",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4733,7 +4898,7 @@ dependencies = [
"thiserror",
"wasmer-artifact",
"wasmer-types",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4792,7 +4957,7 @@ dependencies = [
"cfg-if 0.1.10",
"libc",
"memory_units",
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4821,6 +4986,12 @@ dependencies = [
"libc",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
version = "0.3.9"
@@ -4831,6 +5002,12 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
@@ -4843,7 +5020,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@@ -4944,7 +5121,17 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d129932f4644ac2396cb456385cbf9e63b5b30c6e8dc4820bdca4eb082037a5"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]]

View File

@@ -31,3 +31,4 @@ structopt = "0.3.26"
structopt-toml = "0.5.0"
crypto_box = {version = "0.7.2", features = ["std"]}
hex = "0.4.3"
notify = "4.0.17"

View File

@@ -11,7 +11,6 @@ use darkfi::{
server::RequestHandler,
},
util::Timestamp,
Error,
};
use crate::{
@@ -22,7 +21,6 @@ use crate::{
pub struct JsonRpcInterface {
dataset_path: PathBuf,
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
nickname: String,
}
@@ -36,8 +34,6 @@ struct BaseTaskInfo {
rank: Option<f32>,
}
// TODO: Make more like RPC in darkfid, this implies the method categories,
// and function signatures, and safety checks.
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
@@ -47,10 +43,6 @@ impl RequestHandler for JsonRpcInterface {
let params = req.params.as_array().unwrap();
if self.notify_queue_sender.send(None).await.is_err() {
return JsonError::new(ErrorCode::InternalError, None, req.id).into()
}
let rep = match req.method.as_str() {
Some("add") => self.add(params).await,
Some("get_ids") => self.get_ids(params).await,
@@ -66,12 +58,8 @@ impl RequestHandler for JsonRpcInterface {
}
impl JsonRpcInterface {
pub fn new(
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
dataset_path: PathBuf,
nickname: String,
) -> Self {
Self { notify_queue_sender, dataset_path, nickname }
pub fn new(dataset_path: PathBuf, nickname: String) -> Self {
Self { dataset_path, nickname }
}
// RPCAPI:
@@ -104,8 +92,7 @@ impl JsonRpcInterface {
new_task.set_project(&task.project);
new_task.set_assign(&task.assign);
self.notify_queue_sender.send(Some(new_task)).await.map_err(Error::from)?;
new_task.save(&self.dataset_path)?;
Ok(json!(true))
}
@@ -132,9 +119,7 @@ impl JsonRpcInterface {
}
let task = self.check_params_for_update(&params[0], &params[1])?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
task.save(&self.dataset_path)?;
Ok(json!(true))
}
@@ -160,7 +145,7 @@ impl JsonRpcInterface {
task.set_state(&state);
}
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
task.save(&self.dataset_path)?;
Ok(json!(true))
}
@@ -181,7 +166,8 @@ impl JsonRpcInterface {
let mut task: TaskInfo = self.load_task_by_id(&params[0])?;
task.set_comment(Comment::new(&comment_content, &self.nickname));
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
task.save(&self.dataset_path)?;
Ok(json!(true))
}

View File

@@ -1,10 +1,11 @@
use async_std::sync::{Arc, Mutex};
use std::{env, fs::create_dir_all};
use std::{env, fs::create_dir_all, sync::mpsc, time::Duration};
use async_executor::Executor;
use crypto_box::{aead::Aead, Box, SecretKey, KEY_SIZE};
use futures::{select, FutureExt};
use log::{debug, error, info, warn};
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use smol::future;
use structopt_toml::StructOptToml;
@@ -72,8 +73,22 @@ fn decrypt_task(encrypt_task: &EncryptedTask, secret_key: &SecretKey) -> TaudRes
Ok(task)
}
fn load_task_path_from_osstr(task_path: std::path::PathBuf) -> Option<String> {
if task_path.file_name().is_none() {
return None
}
let task_path = task_path.file_name().unwrap().to_str().unwrap_or("");
if task_path.is_empty() {
return None
}
Some(task_path.to_string())
}
async fn start_sync_loop(
rpc_rcv: async_channel::Receiver<Option<TaskInfo>>,
broadcast_rcv: async_channel::Receiver<TaskInfo>,
raft_msgs_sender: async_channel::Sender<EncryptedTask>,
commits_recv: async_channel::Receiver<EncryptedTask>,
datastore_path: std::path::PathBuf,
@@ -84,14 +99,11 @@ async fn start_sync_loop(
loop {
select! {
task = rpc_rcv.recv().fuse() => {
let task = task.map_err(Error::from)?;
if let Some(tk) = task {
info!(target: "tau", "Save the received task {:?}", tk);
let encrypted_task = encrypt_task(&tk, &secret_key,&mut rng)?;
tk.save(&datastore_path)?;
raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?;
}
task = broadcast_rcv.recv().fuse() => {
let tk = task.map_err(Error::from)?;
info!(target: "tau", "Save the received task {:?}", tk);
let encrypted_task = encrypt_task(&tk, &secret_key,&mut rng)?;
raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?;
}
task = commits_recv.recv().fuse() => {
let recv = task.map_err(Error::from)?;
@@ -110,6 +122,67 @@ async fn start_sync_loop(
}
}
async fn watch_files(
broadcast_snd: async_channel::Sender<TaskInfo>,
datastore_path: std::path::PathBuf,
(tx, rx): (mpsc::Sender<DebouncedEvent>, mpsc::Receiver<DebouncedEvent>),
) -> TaudResult<()> {
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1)).unwrap();
let watch_path = datastore_path.join("task");
info!("Start watching local tasks files: {:?}", &watch_path);
watcher.watch(watch_path, RecursiveMode::Recursive).unwrap();
let mut last_write = TaskInfo::new("", "", "", None, 0.0, &datastore_path)?;
loop {
let event = rx.recv();
if let Err(e) = event {
error!("Watch files error: {:?}", e);
continue
}
let event = event.unwrap();
match event {
DebouncedEvent::Write(ev) => {
let task_path = load_task_path_from_osstr(ev);
if task_path.is_none() {
continue
}
if let Ok(task) = TaskInfo::load(&task_path.unwrap(), &datastore_path) {
if last_write == task {
continue
}
last_write = task.clone();
broadcast_snd.send(task).await.map_err(Error::from)?;
}
}
DebouncedEvent::Create(ev) => {
let task_path = load_task_path_from_osstr(ev);
if task_path.is_none() {
continue
}
if let Ok(task) = TaskInfo::load(&task_path.unwrap(), &datastore_path) {
broadcast_snd.send(task).await.map_err(Error::from)?;
}
}
DebouncedEvent::Error(err, _) => {
warn!("Catching files changes: {}", err);
break
}
_ => {}
}
}
Ok(())
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let datastore_path = expand_path(&settings.datastore)?;
@@ -150,19 +223,17 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
SecretKey::try_from(sk_bytes)?
};
let (broadcast_snd, broadcast_rcv) = async_channel::unbounded::<TaskInfo>();
//
// RPC
//
let (rpc_snd, rpc_rcv) = async_channel::unbounded::<Option<TaskInfo>>();
let rpc_interface =
Arc::new(JsonRpcInterface::new(rpc_snd, datastore_path.clone(), nickname.unwrap()));
let rpc_interface = Arc::new(JsonRpcInterface::new(datastore_path.clone(), nickname.unwrap()));
executor.spawn(listen_and_serve(settings.rpc_listen.clone(), rpc_interface)).detach();
//
//Raft
//
let net_settings = settings.net;
let seen_net_msgs = Arc::new(Mutex::new(vec![]));
@@ -175,16 +246,18 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
executor
.spawn(start_sync_loop(
rpc_rcv,
broadcast_rcv,
raft.get_msgs_channel(),
raft.get_commits_channel(),
datastore_path,
datastore_path.clone(),
secret_key,
rng,
))
.detach();
//
// P2p setup
//
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<NetMsg>();
let p2p = net::P2p::new(net_settings.into()).await;
@@ -208,15 +281,25 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
executor.spawn(p2p.clone().run(executor.clone())).detach();
//
// Watch changes in tasks files
//
let (tx, rx) = mpsc::channel();
executor.spawn(watch_files(broadcast_snd, datastore_path.clone(), (tx.clone(), rx))).detach();
//
// Waiting Exit signal
//
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "tau", "taud start() Exit Signal");
warn!(target: "tau", "Catch exit signal");
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
tx.send(DebouncedEvent::Error(notify::Error::Generic("Catch exit signal".into()), None))
.unwrap();
})
.unwrap();
// blocking
raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?;
Ok(())