bin/ircd2: using async_daemonize macro

This commit is contained in:
ghassmo
2022-04-24 18:33:33 +03:00
parent 6453c5ea37
commit 81d80ecfab
5 changed files with 66 additions and 215 deletions

2
Cargo.lock generated
View File

@@ -2194,6 +2194,8 @@ dependencies = [
"serde_json",
"simplelog",
"smol",
"structopt",
"structopt-toml",
]
[[package]]

View File

@@ -32,3 +32,5 @@ ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-st
# Encoding and parsing
serde_json = "1.0.79"
serde = {version = "1.0.136", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.0"

View File

@@ -1,36 +1 @@
## taud configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
# Path to the datastore
datastore_raft = "~/.config/tau/tau.db"
# Path to DER-formatted PKCS#12 archive. (used only with tls url)
# This can be created using openssl:
# openssl pkcs12 -export -out identity.pfx -inkey key.pem -in cert.pem -certfile chain_certs.pem
tls_identity_path = ""
### Number of outbound connections
#outbound_connections = 5
# The address where taud should bind its RPC socket
[rpc_listener_url]
url="127.0.0.1:6667"
password = "FOOBAR"
# The address where taud should bind its RPC socket
[irc_listener_url]
url="127.0.0.1:8000"
password = "FOOBAR"
### The accept address
#[accept_address]
#url="127.0.0.1:8822"
#password = "FOOBAR"
### Seed node addresses
#[[seeds]]
#url="127.0.0.1:8811"
#password = "FOOBAR"

View File

@@ -1,20 +1,21 @@
use async_std::net::{TcpListener, TcpStream};
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use async_channel::Receiver;
use async_executor::Executor;
use clap::Parser;
use easy_parallel::Parallel;
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use log::{debug, error, info, warn};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
net,
async_daemonize, net,
raft::Raft,
rpc::rpcserver::{listen_and_serve, RpcServerConfig},
util::{
cli::{log_config, spawn_config, Config},
cli::{log_config, spawn_config},
path::get_config_path,
},
Error, Result,
@@ -29,7 +30,7 @@ use crate::{
privmsg::Privmsg,
rpc::JsonRpcInterface,
server::IrcServerConnection,
settings::{CliArgs, IrcdConfig, Settings, CONFIG_FILE_CONTENTS},
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
};
async fn process_user_input(
@@ -96,16 +97,29 @@ async fn process(
}
}
async fn start(executor: Arc<Executor<'_>>, settings: Settings) -> Result<()> {
let listener = TcpListener::bind(settings.irc_listener_url).await?;
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let listener = TcpListener::bind(settings.irc_listen).await?;
let local_addr = listener.local_addr()?;
info!("Listening on {}", local_addr);
let p2p_settings = net::Settings {
inbound: settings.accept,
outbound_connections: settings.slots,
external_addr: settings.accept,
peers: settings.connect.clone(),
seeds: settings.seeds.clone(),
..Default::default()
};
let datastore_path = PathBuf::from(&settings.datastore);
//
// Raft
//Raft
//
let mut raft =
Raft::<Privmsg>::new(settings.accept_address, std::path::PathBuf::from("msgs.db"))?;
let datastore_raft = datastore_path.join("ircd.db");
let mut raft = Raft::<Privmsg>::new(settings.accept, datastore_raft)?;
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
@@ -114,14 +128,14 @@ async fn start(executor: Arc<Executor<'_>>, settings: Settings) -> Result<()> {
// RPC interface
//
let rpc_config = RpcServerConfig {
socket_addr: settings.rpc_listener_url,
socket_addr: settings.rpc_listen,
// TODO: Use net/transport:
use_tls: false,
identity_path: Default::default(),
identity_pass: Default::default(),
};
let executor_cloned = executor.clone();
let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listener_url });
let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listen });
let rpc_task = executor.spawn(async move {
listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await
});
@@ -148,58 +162,18 @@ async fn start(executor: Arc<Executor<'_>>, settings: Settings) -> Result<()> {
}
});
let stop_signal = async_channel::bounded::<()>(10);
let net_settings = net::Settings {
inbound: settings.accept_address,
outbound_connections: settings.outbound_connections,
external_addr: settings.accept_address,
peers: settings.connect.clone(),
seeds: settings.seeds.clone(),
..Default::default()
};
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "ircd", "ircd start() Exit Signal");
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
stop_signal.0.send(()).await.expect("send exit signal to raft");
signal.send(()).await.unwrap();
rpc_task.cancel().await;
irc_task.cancel().await;
})
.expect("handle exit signal");
.unwrap();
// blocking
raft.start(net_settings.clone(), executor.clone(), stop_signal.1.clone()).await?;
raft.start(p2p_settings.clone(), executor.clone(), shutdown.clone()).await?;
Ok(())
}
fn main() -> Result<()> {
let args = CliArgs::parse();
let (lvl, conf) = log_config(args.verbose.into())?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;
let config_path = get_config_path(args.config.clone(), "ircd_config.toml")?;
spawn_config(&config_path, CONFIG_FILE_CONTENTS)?;
let config: IrcdConfig = Config::<IrcdConfig>::load(config_path)?;
let settings = Settings::load(args, config)?;
let ex = Arc::new(Executor::new());
let ex_clone = ex.clone();
let (signal, shutdown) = async_channel::unbounded::<()>();
let (_, result) = Parallel::new()
.each(0..4, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
start(ex_clone.clone(), settings).await?;
drop(signal);
Ok::<(), darkfi::Error>(())
})
});
result
}

View File

@@ -1,134 +1,42 @@
use std::{net::SocketAddr, path::PathBuf};
use std::net::SocketAddr;
use clap::Parser;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use darkfi::{
cli_desc,
util::{cli::UrlConfig, path::expand_path},
Error, Result,
};
pub const CONFIG_FILE: &str = "ircd_config.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../ircd_config.toml");
pub const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../ircd_config.toml");
#[derive(Clone, Debug)]
pub struct Settings {
pub datastore_raft: PathBuf,
pub rpc_listener_url: SocketAddr,
pub irc_listener_url: SocketAddr,
pub accept_address: Option<SocketAddr>,
pub outbound_connections: u32,
pub connect: Vec<SocketAddr>,
pub seeds: Vec<SocketAddr>,
}
impl Settings {
pub fn load(args: CliArgs, config: IrcdConfig) -> Result<Self> {
if config.datastore_raft.is_empty() {
return Err(Error::ParseFailed("Failed to parse datastore_raft path"))
}
let datastore_raft = expand_path(&config.datastore_raft)?;
let rpc_listener_url = if args.rpc.is_none() {
SocketAddr::try_from(config.rpc_listener_url)?
} else {
args.rpc.unwrap()
};
let irc_listener_url = if args.irc.is_none() {
SocketAddr::try_from(config.irc_listener_url)?
} else {
args.irc.unwrap()
};
let accept_address = if args.accept.is_none() {
match config.accept_address {
Some(addr) => {
let socket_addr = SocketAddr::try_from(addr)?;
Some(socket_addr)
}
None => None,
}
} else {
args.accept
};
let outbound_connections = if args.slots == 0 {
config.outbound_connections.unwrap_or_default()
} else {
args.slots
};
let connect = args.connect;
let config_seeds = config
.seeds
.map(|addrs| {
addrs.iter().filter_map(|addr| SocketAddr::try_from(addr.clone()).ok()).collect()
})
.unwrap_or_default();
let seeds = if args.seeds.is_empty() { config_seeds } else { args.seeds };
Ok(Settings {
datastore_raft,
rpc_listener_url,
irc_listener_url,
accept_address,
outbound_connections,
connect,
seeds,
})
}
}
#[derive(Parser)]
#[clap(name = "ircd", about = cli_desc!(), version)]
pub struct CliArgs {
/// ircd cli
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "ircd")]
pub struct Args {
/// Sets a custom config file
#[clap(long)]
#[structopt(long)]
pub config: Option<String>,
/// Accept address
#[clap(short, long)]
/// JSON-RPC listen URL
#[structopt(long, default_value = "127.0.0.1:8857")]
pub rpc_listen: SocketAddr,
/// IRC listen URL
#[structopt(long, default_value = "127.0.0.1:8855")]
pub irc_listen: SocketAddr,
/// Sets Datastore Path
#[structopt(long, default_value = "~/.config/tau")]
pub datastore: String,
/// Raft Accept address
#[structopt(short, long)]
pub accept: Option<SocketAddr>,
/// Seed node (repeatable)
#[clap(short, long)]
/// Raft Seed nodes (repeatable)
#[structopt(short, long)]
pub seeds: Vec<SocketAddr>,
/// Manual connection (repeatable)
#[clap(short, long)]
/// Raft Manual connection (repeatable)
#[structopt(short, long)]
pub connect: Vec<SocketAddr>,
/// Connection slots
#[clap(long, default_value = "0")]
/// Raft Connection slots
#[structopt(long, default_value = "0")]
pub slots: u32,
/// External address
#[clap(short, long)]
pub external: Option<SocketAddr>,
/// IRC listen address
#[clap(short = 'r', long)]
pub irc: Option<SocketAddr>,
/// RPC listen address
#[clap(long)]
pub rpc: Option<SocketAddr>,
/// Verbosity level
#[clap(short, parse(from_occurrences))]
/// Increase verbosity
#[structopt(short, parse(from_occurrences))]
pub verbose: u8,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IrcdConfig {
/// path to datastore for raft
pub datastore_raft: String,
/// Path to DER-formatted PKCS#12 archive. (used only with tls listener url)
pub tls_identity_path: String,
/// The address where taud should bind its RPC socket
pub rpc_listener_url: UrlConfig,
/// IRC listen address
pub irc_listener_url: UrlConfig,
/// Accept address for p2p network
pub accept_address: Option<UrlConfig>,
/// Number of outbound connections for p2p
pub outbound_connections: Option<u32>,
/// The seeds for receiving ip addresses from the p2p network
pub seeds: Option<Vec<UrlConfig>>,
}