mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
bin/taud: using async_daemonize macro
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -3838,7 +3838,7 @@ dependencies = [
|
||||
"ctrlc-async",
|
||||
"darkfi",
|
||||
"easy-parallel",
|
||||
"futures",
|
||||
"futures-lite",
|
||||
"log",
|
||||
"num_cpus",
|
||||
"rand",
|
||||
@@ -3846,6 +3846,8 @@ dependencies = [
|
||||
"serde_json",
|
||||
"simplelog",
|
||||
"smol",
|
||||
"structopt",
|
||||
"structopt-toml",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
|
||||
@@ -10,12 +10,12 @@ darkfi = { path = "../../../", features = ["rpc", "raft", "net"]}
|
||||
|
||||
# Async
|
||||
smol = "1.2.5"
|
||||
futures = "0.3.21"
|
||||
async-std = {version = "1.11.0", features = ["attributes"]}
|
||||
async-trait = "0.1.53"
|
||||
async-channel = "1.6.1"
|
||||
async-executor = "1.4.1"
|
||||
easy-parallel = "3.2.0"
|
||||
futures-lite = "1.12.0"
|
||||
|
||||
# Misc
|
||||
clap = {version = "3.1.12", features = ["derive"]}
|
||||
@@ -30,3 +30,5 @@ ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-st
|
||||
# Encoding and parsing
|
||||
serde = {version = "1.0.136", features = ["derive"]}
|
||||
serde_json = "1.0.79"
|
||||
structopt = "0.3.26"
|
||||
structopt-toml = "0.5.0"
|
||||
|
||||
@@ -1,19 +1,23 @@
|
||||
use async_std::sync::Arc;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use async_executor::Executor;
|
||||
use clap::Parser;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use log::{info, warn};
|
||||
use simplelog::{ColorChoice, TermLogger, TerminalMode};
|
||||
use structopt_toml::StructOptToml;
|
||||
|
||||
use darkfi::{
|
||||
async_daemonize,
|
||||
net::Settings as P2pSettings,
|
||||
raft::Raft,
|
||||
rpc::rpcserver::{listen_and_serve, RpcServerConfig},
|
||||
util::{
|
||||
cli::{log_config, spawn_config, Config},
|
||||
cli::{log_config, spawn_config},
|
||||
path::get_config_path,
|
||||
},
|
||||
Error,
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
mod error;
|
||||
@@ -27,24 +31,28 @@ use crate::{
|
||||
error::TaudResult,
|
||||
jsonrpc::JsonRpcInterface,
|
||||
month_tasks::MonthTasks,
|
||||
settings::{CliTaud, Settings, TauConfig, CONFIG_FILE_CONTENTS},
|
||||
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
|
||||
task_info::TaskInfo,
|
||||
};
|
||||
|
||||
async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()> {
|
||||
async_daemonize!(realmain);
|
||||
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
let p2p_settings = P2pSettings {
|
||||
inbound: settings.accept_address,
|
||||
outbound_connections: settings.outbound_connections,
|
||||
external_addr: settings.accept_address,
|
||||
inbound: settings.accept,
|
||||
outbound_connections: settings.slots,
|
||||
external_addr: settings.accept,
|
||||
peers: settings.connect.clone(),
|
||||
seeds: settings.seeds.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let datastore_path = PathBuf::from(&settings.datastore);
|
||||
|
||||
//
|
||||
//Raft
|
||||
//
|
||||
let mut raft = Raft::<TaskInfo>::new(settings.accept_address, settings.datastore_raft.clone())?;
|
||||
let datastore_raft = datastore_path.join("tau.db");
|
||||
let mut raft = Raft::<TaskInfo>::new(settings.accept, datastore_raft)?;
|
||||
|
||||
let raft_sender = raft.get_broadcast();
|
||||
let commits = raft.get_commits();
|
||||
@@ -54,7 +62,7 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()
|
||||
// RPC
|
||||
//
|
||||
let server_config = RpcServerConfig {
|
||||
socket_addr: settings.rpc_listener_url,
|
||||
socket_addr: settings.rpc_listen,
|
||||
use_tls: false,
|
||||
// this is all random filler that is meaningless bc tls is disabled
|
||||
identity_path: Default::default(),
|
||||
@@ -63,34 +71,33 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()
|
||||
|
||||
let (rpc_snd, rpc_rcv) = async_channel::unbounded::<Option<TaskInfo>>();
|
||||
|
||||
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path.clone()));
|
||||
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, datastore_path.clone()));
|
||||
|
||||
let dataset_path_cloned = settings.dataset_path.clone();
|
||||
let datastore_path_cloned = datastore_path.clone();
|
||||
let recv_update_from_rpc: smol::Task<TaudResult<()>> = executor.spawn(async move {
|
||||
loop {
|
||||
let task_info = rpc_rcv.recv().await.map_err(Error::from)?;
|
||||
if let Some(tk) = task_info {
|
||||
info!(target: "tau", "save the received task {:?}", tk);
|
||||
tk.save(&dataset_path_cloned)?;
|
||||
tk.save(&datastore_path_cloned)?;
|
||||
raft_sender.send(tk).await.map_err(Error::from)?;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let dataset_path_cloned = settings.dataset_path.clone();
|
||||
let datastore_path_cloned = datastore_path.clone();
|
||||
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
|
||||
loop {
|
||||
let task = commits.recv().await.map_err(Error::from)?;
|
||||
info!(target: "tau", "receive update from the commits {:?}", task);
|
||||
task.save(&dataset_path_cloned)?;
|
||||
task.save(&datastore_path_cloned)?;
|
||||
}
|
||||
});
|
||||
|
||||
let dataset_path_cloned = settings.dataset_path.clone();
|
||||
let initial_sync: smol::Task<TaudResult<()>> = executor.spawn(async move {
|
||||
info!(target: "tau", "Start initial sync");
|
||||
info!(target: "tau", "Upload local tasks");
|
||||
let tasks = MonthTasks::load_current_open_tasks(&dataset_path_cloned)?;
|
||||
let tasks = MonthTasks::load_current_open_tasks(&datastore_path)?;
|
||||
|
||||
for task in tasks {
|
||||
info!(target: "tau", "send local task {:?}", task);
|
||||
@@ -103,39 +110,20 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()
|
||||
let rpc_listener_taks =
|
||||
executor_cloned.spawn(listen_and_serve(server_config, rpc_interface, executor.clone()));
|
||||
|
||||
let stop_signal = async_channel::bounded::<()>(10);
|
||||
|
||||
let (signal, shutdown) = async_channel::bounded::<()>(1);
|
||||
ctrlc_async::set_async_handler(async move {
|
||||
warn!(target: "tau", "taud start() Exit Signal");
|
||||
// cleaning up tasks running in the background
|
||||
signal.send(()).await.unwrap();
|
||||
rpc_listener_taks.cancel().await;
|
||||
stop_signal.0.send(()).await.expect("send exit signal to raft");
|
||||
recv_update_from_rpc.cancel().await;
|
||||
recv_update_from_raft.cancel().await;
|
||||
initial_sync.cancel().await;
|
||||
})
|
||||
.expect("handle exit signal");
|
||||
.unwrap();
|
||||
|
||||
// blocking
|
||||
raft.start(p2p_settings.clone(), executor.clone(), stop_signal.1.clone()).await?;
|
||||
raft.start(p2p_settings.clone(), executor.clone(), shutdown.clone()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> TaudResult<()> {
|
||||
let args = CliTaud::parse();
|
||||
|
||||
let (lvl, conf) = log_config(args.verbose.into())?;
|
||||
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto).map_err(Error::from)?;
|
||||
|
||||
let config_path = get_config_path(args.config.clone(), "taud_config.toml")?;
|
||||
spawn_config(&config_path, CONFIG_FILE_CONTENTS)?;
|
||||
|
||||
let config: TauConfig = Config::<TauConfig>::load(config_path)?;
|
||||
|
||||
let settings = Settings::load(args, config)?;
|
||||
|
||||
let ex = Arc::new(Executor::new());
|
||||
smol::block_on(ex.run(start(settings, ex.clone())))
|
||||
}
|
||||
|
||||
@@ -1,160 +1,39 @@
|
||||
use std::{fs::create_dir_all, net::SocketAddr, path::PathBuf};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Deserialize;
|
||||
use structopt::StructOpt;
|
||||
use structopt_toml::StructOptToml;
|
||||
|
||||
use darkfi::{
|
||||
util::{
|
||||
cli::UrlConfig,
|
||||
expand_path,
|
||||
serial::{SerialDecodable, SerialEncodable},
|
||||
},
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
pub const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../../taud_config.toml");
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Settings {
|
||||
pub dataset_path: PathBuf,
|
||||
pub datastore_raft: PathBuf,
|
||||
pub rpc_listener_url: SocketAddr,
|
||||
pub accept_address: Option<SocketAddr>,
|
||||
pub outbound_connections: u32,
|
||||
pub connect: Vec<SocketAddr>,
|
||||
pub seeds: Vec<SocketAddr>,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
pub fn load(args: CliTaud, config: TauConfig) -> Result<Self> {
|
||||
if config.dataset_path.is_empty() {
|
||||
return Err(Error::ParseFailed("Failed to parse dataset_path"))
|
||||
}
|
||||
|
||||
let dataset_path = expand_path(&config.dataset_path)?;
|
||||
|
||||
// mkdir dataset_path if not exists
|
||||
create_dir_all(dataset_path.join("month"))?;
|
||||
create_dir_all(dataset_path.join("task"))?;
|
||||
|
||||
if config.datastore_raft.is_empty() {
|
||||
return Err(Error::ParseFailed("Failed to parse datastore_raft path"))
|
||||
}
|
||||
|
||||
let datastore_raft = expand_path(&config.datastore_raft)?;
|
||||
|
||||
let rpc_listener_url = SocketAddr::try_from(config.rpc_listener_url)?;
|
||||
|
||||
let accept_address = if args.accept.is_none() {
|
||||
match config.accept_address {
|
||||
Some(addr) => {
|
||||
let socket_addr = SocketAddr::try_from(addr)?;
|
||||
Some(socket_addr)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
args.accept
|
||||
};
|
||||
|
||||
let outbound_connections = if args.slots == 0 {
|
||||
config.outbound_connections.unwrap_or_default()
|
||||
} else {
|
||||
args.slots
|
||||
};
|
||||
|
||||
let connect = args.connect;
|
||||
|
||||
let config_seeds = config
|
||||
.seeds
|
||||
.map(|addrs| {
|
||||
addrs.iter().filter_map(|addr| SocketAddr::try_from(addr.clone()).ok()).collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let seeds = if args.seeds.is_empty() { config_seeds } else { args.seeds };
|
||||
|
||||
Ok(Settings {
|
||||
dataset_path,
|
||||
datastore_raft,
|
||||
rpc_listener_url,
|
||||
accept_address,
|
||||
outbound_connections,
|
||||
connect,
|
||||
seeds,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone, Debug, Serialize, Deserialize, SerialEncodable, SerialDecodable, PartialEq, PartialOrd,
|
||||
)]
|
||||
pub struct Timestamp(pub i64);
|
||||
pub const CONFIG_FILE: &str = "taud_config.toml";
|
||||
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../../taud_config.toml");
|
||||
|
||||
/// taud cli
|
||||
#[derive(Parser)]
|
||||
#[clap(name = "taud")]
|
||||
pub struct CliTaud {
|
||||
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
|
||||
#[serde(default)]
|
||||
#[structopt(name = "taud")]
|
||||
pub struct Args {
|
||||
/// Sets a custom config file
|
||||
#[clap(long)]
|
||||
#[structopt(long)]
|
||||
pub config: Option<String>,
|
||||
/// JSON-RPC listen URL
|
||||
#[structopt(long, default_value = "127.0.0.1:8857")]
|
||||
pub rpc_listen: SocketAddr,
|
||||
/// Sets Datastore Path
|
||||
#[structopt(long, default_value = "~/.config/tau")]
|
||||
pub datastore: String,
|
||||
/// Raft Accept address
|
||||
#[clap(short, long)]
|
||||
#[structopt(short, long)]
|
||||
pub accept: Option<SocketAddr>,
|
||||
/// Raft Seed nodes (repeatable)
|
||||
#[clap(short, long)]
|
||||
#[structopt(short, long)]
|
||||
pub seeds: Vec<SocketAddr>,
|
||||
/// Raft Manual connection (repeatable)
|
||||
#[clap(short, long)]
|
||||
#[structopt(short, long)]
|
||||
pub connect: Vec<SocketAddr>,
|
||||
/// Raft Connection slots
|
||||
#[clap(long, default_value = "0")]
|
||||
#[structopt(long, default_value = "0")]
|
||||
pub slots: u32,
|
||||
/// Increase verbosity
|
||||
#[clap(short, parse(from_occurrences))]
|
||||
#[structopt(short, parse(from_occurrences))]
|
||||
pub verbose: u8,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TauConfig {
|
||||
/// path to dataset
|
||||
pub dataset_path: String,
|
||||
/// path to datastore for raft
|
||||
pub datastore_raft: String,
|
||||
/// Path to DER-formatted PKCS#12 archive. (used only with tls listener url)
|
||||
pub tls_identity_path: String,
|
||||
/// The address where taud should bind its RPC socket
|
||||
pub rpc_listener_url: UrlConfig,
|
||||
/// Accept address for p2p network
|
||||
pub accept_address: Option<UrlConfig>,
|
||||
/// Number of outbound connections for p2p
|
||||
pub outbound_connections: Option<u32>,
|
||||
/// The seeds for receiving ip addresses from the p2p network
|
||||
pub seeds: Option<Vec<UrlConfig>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn find_free_id_test() -> Result<()> {
|
||||
let mut ids: Vec<u32> = vec![1, 3, 8, 9, 10, 3];
|
||||
let ids_empty: Vec<u32> = vec![];
|
||||
let ids_duplicate: Vec<u32> = vec![1; 100];
|
||||
|
||||
let find_id = find_free_id(&ids);
|
||||
|
||||
assert_eq!(find_id, 2);
|
||||
|
||||
ids.push(find_id);
|
||||
|
||||
assert_eq!(find_free_id(&ids), 4);
|
||||
|
||||
assert_eq!(find_free_id(&ids_empty), 1);
|
||||
|
||||
assert_eq!(find_free_id(&ids_duplicate), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,32 +1 @@
|
||||
## taud configuration file
|
||||
##
|
||||
## Please make sure you go through all the settings so you can configure
|
||||
## your daemon properly.
|
||||
|
||||
# Path to the dataset
|
||||
dataset_path = "~/.config/tau"
|
||||
datastore_raft = "~/.config/tau/tau.db"
|
||||
|
||||
# Path to DER-formatted PKCS#12 archive. (used only with tls url)
|
||||
# This can be created using openssl:
|
||||
# openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile chain_certs.pem
|
||||
tls_identity_path = ""
|
||||
|
||||
### Number of outbound connections
|
||||
#outbound_connections = 5
|
||||
|
||||
# The address where taud should bind its RPC socket
|
||||
[rpc_listener_url]
|
||||
url="127.0.0.1:8875"
|
||||
password = "FOOBAR"
|
||||
|
||||
### The accept address
|
||||
#[accept_address]
|
||||
#url="127.0.0.1:8822"
|
||||
#password = "FOOBAR"
|
||||
|
||||
### Seed node addresses
|
||||
#[[seeds]]
|
||||
#url="127.0.0.1:8811"
|
||||
#password = "FOOBAR"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user