bin/taud: do initial sync on the startup and boradcast the local changes

This commit is contained in:
ghassmo
2022-04-11 11:30:10 +04:00
parent d51ac9ad8a
commit 6074736a30
7 changed files with 159 additions and 157 deletions

View File

@@ -56,7 +56,7 @@ impl RequestHandler for JsonRpcInterface {
Some("get_state") => self.get_state(req.params).await,
Some("set_state") => self.set_state(req.params).await,
Some("set_comment") => self.set_comment(req.params).await,
Some("show") => self.show(req.params).await,
Some("get_by_id") => self.get_by_id(req.params).await,
Some(_) | None => {
return JsonResult::Err(jsonerr(ErrorCode::MethodNotFound, None, req.id))
}
@@ -189,10 +189,10 @@ impl JsonRpcInterface {
}
// RPCAPI:
// Show a task by id.
// --> {"jsonrpc": "2.0", "method": "show", "params": [task_id], "id": 1}
// Get a task by id.
// --> {"jsonrpc": "2.0", "method": "get_by_id", "params": [task_id], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
async fn show(&self, params: Value) -> TaudResult<Value> {
async fn get_by_id(&self, params: Value) -> TaudResult<Value> {
let args = params.as_array().unwrap();
if args.len() != 1 {

View File

@@ -1,7 +1,8 @@
use std::sync::Arc;
use async_std::sync::Arc;
use async_executor::Executor;
use clap::Parser;
use log::info;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use darkfi::{
@@ -13,7 +14,7 @@ use darkfi::{
path::get_config_path,
sleep,
},
Error, Result,
Error,
};
mod error;
@@ -25,11 +26,12 @@ mod util;
use crate::{
error::TaudResult,
jsonrpc::JsonRpcInterface,
month_tasks::MonthTasks,
task_info::TaskInfo,
util::{CliTaud, Settings, TauConfig, CONFIG_FILE_CONTENTS},
};
async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> Result<()> {
async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()> {
let p2p_settings = P2pSettings {
inbound: settings.accept_address,
outbound_connections: settings.outbound_connections,
@@ -46,6 +48,8 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> Result<()> {
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
let initial_sync_commits = raft.get_commits().clone();
let initial_sync_raft_sender = raft.get_broadcast().clone();
//
// RPC
@@ -60,48 +64,62 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> Result<()> {
let (rpc_snd, rpc_rcv) = async_channel::unbounded::<Option<TaskInfo>>();
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path));
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path.clone()));
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
let task_info = rpc_rcv.recv().await.map_err(Error::from)?;
if let Some(tk) = task_info {
tk.save()?;
raft_sender.send(tk).await.map_err(Error::from)?;
}
// XXX THIS FOR DEBUGING
sleep(1).await;
let recv_commits = commits.lock().await;
for task_info in recv_commits.iter() {
task_info.save()?;
if task_info.get_state() == "open" {
task_info.activate()?;
} else {
let mut mt = task_info.get_month_task()?;
mt.remove(&task_info.get_ref_id());
}
}
}
});
let initial_sync: smol::Task<TaudResult<()>> = executor.spawn(async move {
info!("Start initial sync waiting the network for 5 seconds");
sleep(5).await;
info!("Save received tasks");
let recv_commits = initial_sync_commits.lock().await;
for task_info in recv_commits.iter() {
task_info.save()?;
}
info!("Upload local tasks");
let tasks = MonthTasks::load_current_open_tasks(&settings.dataset_path)?;
for task in tasks {
initial_sync_raft_sender.send(task).await.map_err(Error::from)?;
}
Ok(())
});
let ex2 = executor.clone();
ex2.spawn(listen_and_serve(server_config, rpc_interface, executor.clone())).detach();
// blocking
raft.start(p2p_settings.clone(), executor.clone()).await?;
recv_update_from_raft.cancel().await;
initial_sync.cancel().await;
Ok(())
}
#[async_std::main]
async fn main() -> Result<()> {
async fn main() -> TaudResult<()> {
let args = CliTaud::parse();
let (lvl, conf) = log_config(args.verbose.into())?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto).map_err(Error::from)?;
let config_path = get_config_path(args.config.clone(), "taud_config.toml")?;
spawn_config(&config_path, CONFIG_FILE_CONTENTS)?;
@@ -113,95 +131,3 @@ async fn main() -> Result<()> {
let ex = Arc::new(Executor::new());
smol::block_on(ex.run(start(settings, ex.clone())))
}
#[cfg(test)]
mod tests {
use std::{
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
};
use super::*;
use crate::{
error::TaudResult, month_tasks::MonthTasks, task_info::TaskInfo, util::get_current_time,
};
const TEST_DATA_PATH: &str = "/tmp/test_tau_data";
fn get_path() -> Result<PathBuf> {
remove_dir_all(TEST_DATA_PATH).ok();
let path = PathBuf::from(TEST_DATA_PATH);
// mkdir dataset_path if not exists
create_dir_all(path.join("month"))?;
create_dir_all(path.join("task"))?;
Ok(path)
}
#[test]
fn load_and_save_tasks() -> TaudResult<()> {
let dataset_path = get_path()?;
// load and save TaskInfo
///////////////////////
let mut task = TaskInfo::new("test_title", "test_desc", None, 0.0, &dataset_path)?;
task.save()?;
let t_load = TaskInfo::load(&task.get_ref_id(), &dataset_path)?;
assert_eq!(task, t_load);
task.set_title("test_title_2");
task.save()?;
let t_load = TaskInfo::load(&task.get_ref_id(), &dataset_path)?;
assert_eq!(task, t_load);
// load and save MonthTasks
///////////////////////
let task_tks = vec![];
let mut mt = MonthTasks::new(&task_tks, &dataset_path);
mt.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert_eq!(mt, mt_load);
mt.add(&task.get_ref_id());
mt.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert_eq!(mt, mt_load);
// activate task
///////////////////////
let task = TaskInfo::new("test_title_3", "test_desc", None, 0.0, &dataset_path)?;
task.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert!(!mt_load.get_task_tks().contains(&task.get_ref_id()));
task.activate()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert!(mt_load.get_task_tks().contains(&task.get_ref_id()));
remove_dir_all(TEST_DATA_PATH).ok();
Ok(())
}
}

View File

@@ -53,10 +53,6 @@ impl MonthTasks {
self.created_at = date.clone();
}
pub fn get_task_tks(&self) -> Vec<String> {
self.task_tks.clone()
}
fn get_path(date: &Timestamp, dataset_path: &Path) -> PathBuf {
dataset_path.join("month").join(Utc.timestamp(date.0, 0).format("%m%y").to_string())
}
@@ -86,3 +82,93 @@ impl MonthTasks {
Ok(mt.objects()?.into_iter().filter(|t| t.get_state() != "stop").collect())
}
}
#[cfg(test)]
mod tests {
use std::{
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
};
use super::*;
use darkfi::Result;
const TEST_DATA_PATH: &str = "/tmp/test_tau_data";
fn get_path() -> Result<PathBuf> {
remove_dir_all(TEST_DATA_PATH).ok();
let path = PathBuf::from(TEST_DATA_PATH);
// mkdir dataset_path if not exists
create_dir_all(path.join("month"))?;
create_dir_all(path.join("task"))?;
Ok(path)
}
#[test]
fn load_and_save_tasks() -> TaudResult<()> {
let dataset_path = get_path()?;
// load and save TaskInfo
///////////////////////
let mut task = TaskInfo::new("test_title", "test_desc", None, 0.0, &dataset_path)?;
task.save()?;
let t_load = TaskInfo::load(&task.ref_id, &dataset_path)?;
assert_eq!(task, t_load);
task.set_title("test_title_2");
task.save()?;
let t_load = TaskInfo::load(&task.ref_id, &dataset_path)?;
assert_eq!(task, t_load);
// load and save MonthTasks
///////////////////////
let task_tks = vec![];
let mut mt = MonthTasks::new(&task_tks, &dataset_path);
mt.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert_eq!(mt, mt_load);
mt.add(&task.ref_id);
mt.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert_eq!(mt, mt_load);
// activate task
///////////////////////
let task = TaskInfo::new("test_title_3", "test_desc", None, 0.0, &dataset_path)?;
task.save()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert!(!mt_load.task_tks.contains(&task.ref_id));
task.activate()?;
let mt_load = MonthTasks::load_or_create(&get_current_time(), &dataset_path)?;
assert!(mt_load.task_tks.contains(&task.ref_id));
remove_dir_all(TEST_DATA_PATH).ok();
Ok(())
}
}

View File

@@ -51,7 +51,7 @@ pub struct TaskAssigns(Vec<String>);
#[derive(Clone, Debug, Serialize, Deserialize, SerialEncodable, SerialDecodable, PartialEq)]
pub struct TaskInfo {
ref_id: String,
pub(crate) ref_id: String,
id: u32,
title: String,
desc: String,
@@ -114,7 +114,15 @@ impl TaskInfo {
pub fn save(&self) -> TaudResult<()> {
crate::util::save::<Self>(&Self::get_path(&self.ref_id, &self.dataset_path), self)
.map_err(TaudError::Darkfi)
.map_err(TaudError::Darkfi)?;
if self.get_state() != "open" {
self.deactivate()?;
} else {
self.activate()?;
}
Ok(())
}
pub fn activate(&self) -> TaudResult<()> {
@@ -123,8 +131,10 @@ impl TaskInfo {
mt.save()
}
pub fn get_month_task(&self) -> TaudResult<MonthTasks> {
MonthTasks::load_or_create(&self.created_at, &self.dataset_path)
pub fn deactivate(&self) -> TaudResult<()> {
let mut mt = MonthTasks::load_or_create(&self.created_at, &self.dataset_path)?;
mt.remove(&self.ref_id);
mt.save()
}
pub fn get_state(&self) -> String {
@@ -143,10 +153,6 @@ impl TaskInfo {
self.id
}
pub fn get_ref_id(&self) -> String {
self.ref_id.clone()
}
pub fn set_title(&mut self, title: &str) {
self.title = title.into();
}

View File

@@ -19,7 +19,7 @@ use darkfi::{
Error, Result,
};
pub const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../taud_config.toml");
pub const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../../taud_config.toml");
pub fn random_ref_id() -> String {
thread_rng().sample_iter(&Alphanumeric).take(30).map(char::from).collect()

View File

@@ -1,31 +0,0 @@
## taud configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
# Path to the dataset
dataset_path = "~/.config/tau"
datastore_raft = "~/.config/tau.db"
# Path to DER-formatted PKCS#12 archive. (used only with tls url)
# This can be created using openssl:
# openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile chain_certs.pem
tls_identity_path = ""
# The address where taud should bind its RPC socket
[rpc_listener_url]
url="127.0.0.1:8875"
# Password for the created TLS identity or tor password
password = "FOOBAR"
[accept]
url="127.0.0.1:8875"
password = "FOOBAR"
outbound_connections = 5
[[seeds]]
url="127.0.0.1:8875"
password = "FOOBAR"
# one or more seed

View File

@@ -15,6 +15,21 @@ tls_identity_path = ""
# The address where taud should bind its RPC socket
[rpc_listener_url]
url="127.0.0.1:8875"
# Password for the created TLS identity or tor password
password = "FOOBAR"
##
## P2P network
##
### The accept address
#[accept]
#url="127.0.0.1:8822"
#password = "FOOBAR"
### Number of outbound connections
#outbound_connections = 5
### Seed node addresses
#[[seeds]]
#url="127.0.0.1:8811"
#password = "FOOBAR"