diff --git a/Cargo.lock b/Cargo.lock index c2dd0eb5a..083d63033 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3838,7 +3838,7 @@ dependencies = [ "ctrlc-async", "darkfi", "easy-parallel", - "futures", + "futures-lite", "log", "num_cpus", "rand", @@ -3846,6 +3846,8 @@ dependencies = [ "serde_json", "simplelog", "smol", + "structopt", + "structopt-toml", "thiserror", ] diff --git a/bin/tau/taud/Cargo.toml b/bin/tau/taud/Cargo.toml index 337c093d9..9ad0238fd 100644 --- a/bin/tau/taud/Cargo.toml +++ b/bin/tau/taud/Cargo.toml @@ -10,12 +10,12 @@ darkfi = { path = "../../../", features = ["rpc", "raft", "net"]} # Async smol = "1.2.5" -futures = "0.3.21" async-std = {version = "1.11.0", features = ["attributes"]} async-trait = "0.1.53" async-channel = "1.6.1" async-executor = "1.4.1" easy-parallel = "3.2.0" +futures-lite = "1.12.0" # Misc clap = {version = "3.1.12", features = ["derive"]} @@ -30,3 +30,5 @@ ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-st # Encoding and parsing serde = {version = "1.0.136", features = ["derive"]} serde_json = "1.0.79" +structopt = "0.3.26" +structopt-toml = "0.5.0" diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 7bc6a2181..a1d0887ec 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -1,19 +1,23 @@ use async_std::sync::Arc; +use std::path::PathBuf; use async_executor::Executor; -use clap::Parser; +use easy_parallel::Parallel; +use futures_lite::future; use log::{info, warn}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use structopt_toml::StructOptToml; use darkfi::{ + async_daemonize, net::Settings as P2pSettings, raft::Raft, rpc::rpcserver::{listen_and_serve, RpcServerConfig}, util::{ - cli::{log_config, spawn_config, Config}, + cli::{log_config, spawn_config}, path::get_config_path, }, - Error, + Error, Result, }; mod error; @@ -27,24 +31,28 @@ use crate::{ error::TaudResult, jsonrpc::JsonRpcInterface, month_tasks::MonthTasks, - settings::{CliTaud, Settings, TauConfig, CONFIG_FILE_CONTENTS}, + settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, task_info::TaskInfo, }; -async fn start(settings: Settings, executor: Arc>) -> TaudResult<()> { +async_daemonize!(realmain); +async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let p2p_settings = P2pSettings { - inbound: settings.accept_address, - outbound_connections: settings.outbound_connections, - external_addr: settings.accept_address, + inbound: settings.accept, + outbound_connections: settings.slots, + external_addr: settings.accept, peers: settings.connect.clone(), seeds: settings.seeds.clone(), ..Default::default() }; + let datastore_path = PathBuf::from(&settings.datastore); + // //Raft // - let mut raft = Raft::::new(settings.accept_address, settings.datastore_raft.clone())?; + let datastore_raft = datastore_path.join("tau.db"); + let mut raft = Raft::::new(settings.accept, datastore_raft)?; let raft_sender = raft.get_broadcast(); let commits = raft.get_commits(); @@ -54,7 +62,7 @@ async fn start(settings: Settings, executor: Arc>) -> TaudResult<() // RPC // let server_config = RpcServerConfig { - socket_addr: settings.rpc_listener_url, + socket_addr: settings.rpc_listen, use_tls: false, // this is all random filler that is meaningless bc tls is disabled identity_path: Default::default(), @@ -63,34 +71,33 @@ async fn start(settings: Settings, executor: Arc>) -> TaudResult<() let (rpc_snd, rpc_rcv) = async_channel::unbounded::>(); - let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path.clone())); + let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, datastore_path.clone())); - let dataset_path_cloned = settings.dataset_path.clone(); + let datastore_path_cloned = datastore_path.clone(); let recv_update_from_rpc: smol::Task> = executor.spawn(async move { loop { let task_info = rpc_rcv.recv().await.map_err(Error::from)?; if let Some(tk) = task_info { info!(target: "tau", "save the received task {:?}", tk); - tk.save(&dataset_path_cloned)?; + tk.save(&datastore_path_cloned)?; raft_sender.send(tk).await.map_err(Error::from)?; } } }); - let dataset_path_cloned = settings.dataset_path.clone(); + let datastore_path_cloned = datastore_path.clone(); let recv_update_from_raft: smol::Task> = executor.spawn(async move { loop { let task = commits.recv().await.map_err(Error::from)?; info!(target: "tau", "receive update from the commits {:?}", task); - task.save(&dataset_path_cloned)?; + task.save(&datastore_path_cloned)?; } }); - let dataset_path_cloned = settings.dataset_path.clone(); let initial_sync: smol::Task> = executor.spawn(async move { info!(target: "tau", "Start initial sync"); info!(target: "tau", "Upload local tasks"); - let tasks = MonthTasks::load_current_open_tasks(&dataset_path_cloned)?; + let tasks = MonthTasks::load_current_open_tasks(&datastore_path)?; for task in tasks { info!(target: "tau", "send local task {:?}", task); @@ -103,39 +110,20 @@ async fn start(settings: Settings, executor: Arc>) -> TaudResult<() let rpc_listener_taks = executor_cloned.spawn(listen_and_serve(server_config, rpc_interface, executor.clone())); - let stop_signal = async_channel::bounded::<()>(10); - + let (signal, shutdown) = async_channel::bounded::<()>(1); ctrlc_async::set_async_handler(async move { warn!(target: "tau", "taud start() Exit Signal"); // cleaning up tasks running in the background + signal.send(()).await.unwrap(); rpc_listener_taks.cancel().await; - stop_signal.0.send(()).await.expect("send exit signal to raft"); recv_update_from_rpc.cancel().await; recv_update_from_raft.cancel().await; initial_sync.cancel().await; }) - .expect("handle exit signal"); + .unwrap(); // blocking - raft.start(p2p_settings.clone(), executor.clone(), stop_signal.1.clone()).await?; + raft.start(p2p_settings.clone(), executor.clone(), shutdown.clone()).await?; Ok(()) } - -#[async_std::main] -async fn main() -> TaudResult<()> { - let args = CliTaud::parse(); - - let (lvl, conf) = log_config(args.verbose.into())?; - TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto).map_err(Error::from)?; - - let config_path = get_config_path(args.config.clone(), "taud_config.toml")?; - spawn_config(&config_path, CONFIG_FILE_CONTENTS)?; - - let config: TauConfig = Config::::load(config_path)?; - - let settings = Settings::load(args, config)?; - - let ex = Arc::new(Executor::new()); - smol::block_on(ex.run(start(settings, ex.clone()))) -} diff --git a/bin/tau/taud/src/settings.rs b/bin/tau/taud/src/settings.rs index 338bf9f75..a07627f3e 100644 --- a/bin/tau/taud/src/settings.rs +++ b/bin/tau/taud/src/settings.rs @@ -1,160 +1,39 @@ -use std::{fs::create_dir_all, net::SocketAddr, path::PathBuf}; +use std::net::SocketAddr; -use clap::Parser; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use structopt::StructOpt; +use structopt_toml::StructOptToml; -use darkfi::{ - util::{ - cli::UrlConfig, - expand_path, - serial::{SerialDecodable, SerialEncodable}, - }, - Error, Result, -}; - -pub const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../../taud_config.toml"); - -#[derive(Clone, Debug)] -pub struct Settings { - pub dataset_path: PathBuf, - pub datastore_raft: PathBuf, - pub rpc_listener_url: SocketAddr, - pub accept_address: Option, - pub outbound_connections: u32, - pub connect: Vec, - pub seeds: Vec, -} - -impl Settings { - pub fn load(args: CliTaud, config: TauConfig) -> Result { - if config.dataset_path.is_empty() { - return Err(Error::ParseFailed("Failed to parse dataset_path")) - } - - let dataset_path = expand_path(&config.dataset_path)?; - - // mkdir dataset_path if not exists - create_dir_all(dataset_path.join("month"))?; - create_dir_all(dataset_path.join("task"))?; - - if config.datastore_raft.is_empty() { - return Err(Error::ParseFailed("Failed to parse datastore_raft path")) - } - - let datastore_raft = expand_path(&config.datastore_raft)?; - - let rpc_listener_url = SocketAddr::try_from(config.rpc_listener_url)?; - - let accept_address = if args.accept.is_none() { - match config.accept_address { - Some(addr) => { - let socket_addr = SocketAddr::try_from(addr)?; - Some(socket_addr) - } - None => None, - } - } else { - args.accept - }; - - let outbound_connections = if args.slots == 0 { - config.outbound_connections.unwrap_or_default() - } else { - args.slots - }; - - let connect = args.connect; - - let config_seeds = config - .seeds - .map(|addrs| { - addrs.iter().filter_map(|addr| SocketAddr::try_from(addr.clone()).ok()).collect() - }) - .unwrap_or_default(); - - let seeds = if args.seeds.is_empty() { config_seeds } else { args.seeds }; - - Ok(Settings { - dataset_path, - datastore_raft, - rpc_listener_url, - accept_address, - outbound_connections, - connect, - seeds, - }) - } -} - -#[derive( - Clone, Debug, Serialize, Deserialize, SerialEncodable, SerialDecodable, PartialEq, PartialOrd, -)] -pub struct Timestamp(pub i64); +pub const CONFIG_FILE: &str = "taud_config.toml"; +pub const CONFIG_FILE_CONTENTS: &str = include_str!("../../taud_config.toml"); /// taud cli -#[derive(Parser)] -#[clap(name = "taud")] -pub struct CliTaud { +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "taud")] +pub struct Args { /// Sets a custom config file - #[clap(long)] + #[structopt(long)] pub config: Option, + /// JSON-RPC listen URL + #[structopt(long, default_value = "127.0.0.1:8857")] + pub rpc_listen: SocketAddr, + /// Sets Datastore Path + #[structopt(long, default_value = "~/.config/tau")] + pub datastore: String, /// Raft Accept address - #[clap(short, long)] + #[structopt(short, long)] pub accept: Option, /// Raft Seed nodes (repeatable) - #[clap(short, long)] + #[structopt(short, long)] pub seeds: Vec, /// Raft Manual connection (repeatable) - #[clap(short, long)] + #[structopt(short, long)] pub connect: Vec, /// Raft Connection slots - #[clap(long, default_value = "0")] + #[structopt(long, default_value = "0")] pub slots: u32, /// Increase verbosity - #[clap(short, parse(from_occurrences))] + #[structopt(short, parse(from_occurrences))] pub verbose: u8, } - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TauConfig { - /// path to dataset - pub dataset_path: String, - /// path to datastore for raft - pub datastore_raft: String, - /// Path to DER-formatted PKCS#12 archive. (used only with tls listener url) - pub tls_identity_path: String, - /// The address where taud should bind its RPC socket - pub rpc_listener_url: UrlConfig, - /// Accept address for p2p network - pub accept_address: Option, - /// Number of outbound connections for p2p - pub outbound_connections: Option, - /// The seeds for receiving ip addresses from the p2p network - pub seeds: Option>, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn find_free_id_test() -> Result<()> { - let mut ids: Vec = vec![1, 3, 8, 9, 10, 3]; - let ids_empty: Vec = vec![]; - let ids_duplicate: Vec = vec![1; 100]; - - let find_id = find_free_id(&ids); - - assert_eq!(find_id, 2); - - ids.push(find_id); - - assert_eq!(find_free_id(&ids), 4); - - assert_eq!(find_free_id(&ids_empty), 1); - - assert_eq!(find_free_id(&ids_duplicate), 2); - - Ok(()) - } -} diff --git a/bin/tau/taud_config.toml b/bin/tau/taud_config.toml index 8d620e99d..8b1378917 100644 --- a/bin/tau/taud_config.toml +++ b/bin/tau/taud_config.toml @@ -1,32 +1 @@ -## taud configuration file -## -## Please make sure you go through all the settings so you can configure -## your daemon properly. - -# Path to the dataset -dataset_path = "~/.config/tau" -datastore_raft = "~/.config/tau/tau.db" - -# Path to DER-formatted PKCS#12 archive. (used only with tls url) -# This can be created using openssl: -# openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile chain_certs.pem -tls_identity_path = "" - -### Number of outbound connections -#outbound_connections = 5 - -# The address where taud should bind its RPC socket -[rpc_listener_url] -url="127.0.0.1:8875" -password = "FOOBAR" - -### The accept address -#[accept_address] -#url="127.0.0.1:8822" -#password = "FOOBAR" - -### Seed node addresses -#[[seeds]] -#url="127.0.0.1:8811" -#password = "FOOBAR"