diff --git a/Cargo.lock b/Cargo.lock index 792b89bc9..42161d3af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,12 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "array-init" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6945cc5422176fc5e602e590c2878d2c2acd9a4fe20a4baa7c28022521698ec6" + [[package]] name = "arrayref" version = "0.3.6" @@ -2232,6 +2238,35 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "irc-raft" +version = "0.3.0" +dependencies = [ + "async-channel", + "async-executor", + "async-std", + "async-trait", + "bs58", + "clap 3.1.18", + "crypto_box", + "ctrlc-async", + "darkfi", + "easy-parallel", + "futures", + "futures-rustls", + "fxhash", + "log", + "rand", + "serde", + "serde_json", + "simplelog", + "smol", + "structopt", + "structopt-toml", + "toml", + "url", +] + [[package]] name = "ircd" version = "0.3.0" @@ -2251,6 +2286,7 @@ dependencies = [ "fxhash", "log", "rand", + "ringbuffer", "serde", "serde_json", "simplelog", @@ -3284,6 +3320,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "ringbuffer" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b30a00730a27595dcf899dce512aa031dd650f86aafcb132fd8dd9f409b369d0" +dependencies = [ + "array-init", +] + [[package]] name = "rkyv" version = "0.7.38" diff --git a/Cargo.toml b/Cargo.toml index fa5b235da..238a0f8f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "bin/drk", "bin/faucetd", "bin/ircd", + "bin/irc-raft", "bin/dnetview", "bin/daod", "bin/dao-cli", diff --git a/bin/irc-raft/Cargo.toml b/bin/irc-raft/Cargo.toml new file mode 100644 index 000000000..39166eabe --- /dev/null +++ b/bin/irc-raft/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "irc-raft" +version = "0.3.0" +homepage = "https://dark.fi" +description = "P2P IRC daemon With Raft Consensus" +authors = ["darkfi "] +repository = "https://github.com/darkrenaissance/darkfi" +license = "AGPL-3.0-only" +edition = "2021" + +[dependencies] +darkfi = {path = "../../", features = ["net", "rpc", "raft"]} +# Async +smol = "1.2.5" +futures = "0.3.21" +futures-rustls = "0.22.1" +async-std = "1.11.0" +async-trait = "0.1.53" +async-channel = "1.6.1" +async-executor = "1.4.1" +easy-parallel = "3.2.0" + +# Crypto +crypto_box = "0.7.2" +rand = "0.8.5" + +# Misc +clap = {version = "3.1.18", features = ["derive"]} +log = "0.4.17" +simplelog = "0.12.0" +fxhash = "0.2.1" +ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} +url = "2.2.2" + +# Encoding and parsing +serde_json = "1.0.81" +serde = {version = "1.0.137", features = ["derive"]} +structopt = "0.3.26" +structopt-toml = "0.5.0" +bs58 = "0.4.0" +toml = "0.5.9" diff --git a/bin/irc-raft/README.md b/bin/irc-raft/README.md new file mode 100644 index 000000000..afb3e54e1 --- /dev/null +++ b/bin/irc-raft/README.md @@ -0,0 +1,4 @@ +# ircd + +see [Darkfi Book](https://darkrenaissance.github.io/darkfi/misc/ircd.html) for the installation guide. + diff --git a/bin/irc-raft/config/inbound.toml b/bin/irc-raft/config/inbound.toml new file mode 100644 index 000000000..2ce669ed4 --- /dev/null +++ b/bin/irc-raft/config/inbound.toml @@ -0,0 +1,33 @@ +## JSON-RPC listen URL +rpc_listen="127.0.0.1:1234" + +## IRC listen URL +irc_listen="127.0.0.1:11067" + +## Sets Datastore Path +datastore="~/.config/ircd-inbound" + +## Raft net settings +[net] +## P2P accept address +inbound="127.0.0.1:11002" + +## Connection slots +#outbound_connections=5 + +## P2P external address +external_addr="127.0.0.1:11004" + +## Peers to connect to +#peers=["127.0.0.1:11003"] + +## Seed nodes to connect to +seeds=["127.0.0.1:11001"] + +## these are the default configuration for the p2p network +#manual_attempt_limit=0 +#seed_query_timeout_seconds=8 +#connect_timeout_seconds=10 +#channel_handshake_seconds=4 +#channel_heartbeat_seconds=10 + diff --git a/bin/irc-raft/config/outbound.toml b/bin/irc-raft/config/outbound.toml new file mode 100644 index 000000000..ca11eec8e --- /dev/null +++ b/bin/irc-raft/config/outbound.toml @@ -0,0 +1,33 @@ +## JSON-RPC listen URL +rpc_listen="127.0.0.1:7777" + +## IRC listen URL +irc_listen="127.0.0.1:11066" + +## Sets Datastore Path +datastore="~/.config/ircd-outbound" + +## Raft net settings +[net] +## P2P accept address +# inbound="127.0.0.1:11002" + +## Connection slots +outbound_connections=5 + +## P2P external address +#external_addr="127.0.0.1:11002" + +## Peers to connect to +#peers=["127.0.0.1:11003"] + +## Seed nodes to connect to +seeds=["127.0.0.1:11001"] + +## these are the default configuration for the p2p network +#manual_attempt_limit=0 +#seed_query_timeout_seconds=8 +#connect_timeout_seconds=10 +#channel_handshake_seconds=4 +#channel_heartbeat_seconds=10 + diff --git a/bin/irc-raft/config/seed.toml b/bin/irc-raft/config/seed.toml new file mode 100644 index 000000000..a405e06f9 --- /dev/null +++ b/bin/irc-raft/config/seed.toml @@ -0,0 +1,33 @@ +## JSON-RPC listen URL +rpc_listen="127.0.0.1:8000" + +## IRC listen URL +irc_listen="127.0.0.1:11065" + +## Sets Datastore Path +datastore="~/.config/ircd-seed" + +## Raft net settings +[net] +## P2P accept address +inbound="127.0.0.1:11001" + +## Connection slots +# outbound_connections=5 + +## P2P external address +# external_addr="127.0.0.1:11001" + +## Peers to connect to +# peers=["127.0.0.1:11001"] + +## Seed nodes to connect to +# seeds=["127.0.0.1:11002"] + +## these are the default configuration for the p2p network +#manual_attempt_limit=0 +#seed_query_timeout_seconds=8 +#connect_timeout_seconds=10 +#channel_handshake_seconds=4 +#channel_heartbeat_seconds=10 + diff --git a/bin/irc-raft/ircd_config.toml b/bin/irc-raft/ircd_config.toml new file mode 100644 index 000000000..4c9acb0df --- /dev/null +++ b/bin/irc-raft/ircd_config.toml @@ -0,0 +1,42 @@ +## JSON-RPC listen URL +#rpc_listen="tcp://127.0.0.1:11055" + +## IRC listen URL +#irc_listen="tcp://127.0.0.1:11066" + +## Sets Datastore Path +#datastore="~/.config/darkfi/ircd" + +## List of channels to autojoin for new client connections +autojoin = ["#dev"] + +## Raft net settings +[net] +## P2P accept address +#inbound="tls://127.0.0.1:11002" + +## Connection slots +outbound_connections=5 + +## P2P external address +#external_addr="tls://127.0.0.1:11002" + +## Peers to connect to +#peers=["tls://127.0.0.1:11003"] + +## Seed nodes to connect to +seeds=["tls://irc0.dark.fi:11001", "tls://irc1.dark.fi:11001"] + +## these are the default configuration for the p2p network +#manual_attempt_limit=0 +#seed_query_timeout_seconds=8 +#connect_timeout_seconds=10 +#channel_handshake_seconds=4 +#channel_heartbeat_seconds=10 + +## Per-channel settings +#[channel."#dev"] +## Create with `ircd --gen-secret` +#secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM" +## Topic to set for the channel +#topic = "DarkFi Development HQ" diff --git a/bin/irc-raft/script/meeting_bot.py b/bin/irc-raft/script/meeting_bot.py new file mode 100644 index 000000000..17b8da06c --- /dev/null +++ b/bin/irc-raft/script/meeting_bot.py @@ -0,0 +1,80 @@ +import asyncio + + +async def start(): + host = "127.0.0.1" + port = 11066 + channel = "#dev" + nickname = "meeting" + + print(f"Start a connection {host}:{port}") + reader, writer = await asyncio.open_connection(host, port) + + print("Send NICK msg") + nick_msg = f"NICK {nickname} \r\n" + writer.write(nick_msg.encode('utf8')) + + print(f"Send JOIN msg: {channel}") + join_msg = f"JOIN {channel} \r\n" + writer.write(join_msg.encode('utf8')) + + topics = [] + + while True: + msg = await reader.read(350) + msg = msg.decode('utf8').strip() + + if not msg: + print("Error: Receive empty msg") + break + + command = msg.split(" ")[1] + + if command == "PRIVMSG": + + msg_title = msg.split(" ")[3][1:] + + if not msg_title: + continue + + reply = None + + if msg_title == "#m_start": + reply = f"PRIVMSG {channel} :meeting started \r\n" + msg_title = "#m_list" + + if msg_title == "#m_end": + reply = f"PRIVMSG {channel} :meeting end \r\n" + topics = [] + + if msg_title == "#m_topic": + topic = msg.split(" ", 4) + if len(topic) != 5: + continue + topic = topic[4] + topics.append(topic) + reply = f"PRIVMSG {channel} :add topic: {topic} \r\n" + + if msg_title == "#m_list": + tp = " ".join( + [f"{i}-{topic}" for i, topic in enumerate(topics, 1)]) + + reply = f"PRIVMSG {channel} :topics: {tp} \r\n" + + if msg_title == "#m_next": + if len(topics) == 0: + reply = f"PRIVMSG {channel} :no topics \r\n" + else: + tp = topics.pop(0) + reply = f"PRIVMSG {channel} :current topic: {tp} \r\n" + + if reply != None: + writer.write(reply.encode('utf8')) + await writer.drain() + + if command == "QUIT": + break + + writer.close() + +asyncio.run(start()) diff --git a/bin/irc-raft/script/run_node.sh b/bin/irc-raft/script/run_node.sh new file mode 100755 index 000000000..2c994367d --- /dev/null +++ b/bin/irc-raft/script/run_node.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# Change this value to the hostname of the seed server +SEED_HOSTNAME=XXX + +LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}') +SEED_IP=$(getent hosts $SEED_HOSTNAME.local | awk '{print $1}' | head -n 1) +cargo run -- --accept 0.0.0.0:11004 --slots 5 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667 + diff --git a/bin/irc-raft/script/run_seed_node.sh b/bin/irc-raft/script/run_seed_node.sh new file mode 100755 index 000000000..c782a5bf1 --- /dev/null +++ b/bin/irc-raft/script/run_seed_node.sh @@ -0,0 +1,4 @@ +#!/bin/bash +LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}') +cargo run -- --accept 0.0.0.0:9999 --irc 127.0.0.1:6688 + diff --git a/bin/irc-raft/src/crypto.rs b/bin/irc-raft/src/crypto.rs new file mode 100644 index 000000000..8b18ceb05 --- /dev/null +++ b/bin/irc-raft/src/crypto.rs @@ -0,0 +1,43 @@ +use crypto_box::aead::Aead; +use rand::rngs::OsRng; + +/// Try decrypting a message given a NaCl box and a base58 string. +/// The format we're using is nonce+ciphertext, where nonce is 24 bytes. +pub fn try_decrypt_message(salt_box: &crypto_box::Box, ciphertext: &str) -> Option { + let bytes = match bs58::decode(ciphertext).into_vec() { + Ok(v) => v, + Err(_) => return None, + }; + + if bytes.len() < 25 { + return None + } + + // Try extracting the nonce + let nonce = match bytes[0..24].try_into() { + Ok(v) => v, + Err(_) => return None, + }; + + // Take the remaining ciphertext + let message = &bytes[24..]; + + // Try decrypting the message + match salt_box.decrypt(nonce, message) { + Ok(v) => Some(String::from_utf8_lossy(&v).to_string()), + Err(_) => None, + } +} + +/// Encrypt a message given a NaCl box and a plaintext string. +/// The format we're using is nonce+ciphertext, where nonce is 24 bytes. +pub fn encrypt_message(salt_box: &crypto_box::Box, plaintext: &str) -> String { + let nonce = crypto_box::generate_nonce(&mut OsRng); + let mut ciphertext = salt_box.encrypt(&nonce, plaintext.as_bytes()).unwrap(); + + let mut concat = vec![]; + concat.append(&mut nonce.as_slice().to_vec()); + concat.append(&mut ciphertext); + + bs58::encode(concat).into_string() +} diff --git a/bin/irc-raft/src/main.rs b/bin/irc-raft/src/main.rs new file mode 100644 index 000000000..5c63f82df --- /dev/null +++ b/bin/irc-raft/src/main.rs @@ -0,0 +1,261 @@ +use std::{net::SocketAddr, sync::atomic::Ordering}; + +use async_channel::{Receiver, Sender}; +use async_executor::Executor; +use async_std::{ + net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, +}; +use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; +use fxhash::FxHashMap; +use log::{debug, error, info, warn}; +use rand::rngs::OsRng; +use smol::future; +use structopt_toml::StructOptToml; + +use darkfi::{ + async_daemonize, net, + raft::{NetMsg, ProtocolRaft, Raft}, + rpc::server::listen_and_serve, + util::{ + cli::{get_log_config, get_log_level, spawn_config}, + path::{expand_path, get_config_path}, + }, + Error, Result, +}; + +pub mod crypto; +pub mod privmsg; +pub mod rpc; +pub mod server; +pub mod settings; + +use crate::{ + crypto::try_decrypt_message, + privmsg::Privmsg, + rpc::JsonRpcInterface, + server::IrcServerConnection, + settings::{parse_configured_channels, Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS}, +}; + +pub type SeenMsgIds = Arc>>; + +fn build_irc_msg(msg: &Privmsg) -> String { + debug!("ABOUT TO SEND: {:?}", msg); + let irc_msg = + format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", msg.nickname, msg.channel, msg.message); + irc_msg +} + +fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { + if line.is_empty() { + warn!("Received empty line from {}. ", peer_addr); + warn!("Closing connection."); + return Err(Error::ChannelStopped) + } + + if &line[(line.len() - 2)..] != "\r\n" { + warn!("Closing connection."); + return Err(Error::ChannelStopped) + } + // Remove CRLF + line.pop(); + line.pop(); + + Ok(line) +} + +async fn broadcast_msg( + irc_msg: String, + peer_addr: SocketAddr, + conn: &mut IrcServerConnection, +) -> Result<()> { + info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); + + if let Err(e) = conn.update(irc_msg).await { + warn!("Connection error: {} for {}", e, peer_addr); + return Err(Error::ChannelStopped) + } + + Ok(()) +} + +async fn process( + raft_receiver: Receiver, + stream: TcpStream, + peer_addr: SocketAddr, + raft_sender: Sender, + seen_msg_id: SeenMsgIds, + autojoin_chans: Vec, + configured_chans: FxHashMap, +) -> Result<()> { + let (reader, writer) = stream.split(); + + let mut reader = BufReader::new(reader); + let mut conn = IrcServerConnection::new( + writer, + seen_msg_id.clone(), + raft_sender, + autojoin_chans, + configured_chans, + ); + + loop { + let mut line = String::new(); + futures::select! { + privmsg = raft_receiver.recv().fuse() => { + let mut msg = privmsg?; + info!("Received msg from Raft: {:?}", msg); + + let mut smi = seen_msg_id.lock().await; + if smi.contains(&msg.id) { + continue + } + smi.push(msg.id); + drop(smi); + + // Try to potentially decrypt the incoming message. + if conn.configured_chans.contains_key(&msg.channel) { + let chan_info = conn.configured_chans.get(&msg.channel).unwrap(); + if !chan_info.joined.load(Ordering::Relaxed) { + continue + } + if let Some(salt_box) = &chan_info.salt_box { + if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) { + msg.message = decrypted_msg; + info!("Decrypted received message: {:?}", msg); + } + } + } + + let irc_msg = build_irc_msg(&msg); + conn.reply(&irc_msg).await?; + } + err = reader.read_line(&mut line).fuse() => { + if let Err(e) = err { + warn!("Read line error. Closing stream for {}: {}", peer_addr, e); + return Ok(()) + } + info!("Received msg from IRC client: {:?}", line); + let irc_msg = match clean_input(line, &peer_addr) { + Ok(m) => m, + Err(e) => return Err(e) + }; + broadcast_msg(irc_msg, peer_addr,&mut conn).await?; + } + }; + } +} + +async_daemonize!(realmain); +async fn realmain(settings: Args, executor: Arc>) -> Result<()> { + if settings.gen_secret { + let secret_key = crypto_box::SecretKey::generate(&mut OsRng); + let encoded = bs58::encode(secret_key.as_bytes()); + println!("{}", encoded.into_string()); + return Ok(()) + } + + let seen_msg_id: SeenMsgIds = Arc::new(Mutex::new(vec![])); + + // Pick up channel settings from the TOML configuration + let cfg_path = get_config_path(settings.config, CONFIG_FILE)?; + let configured_chans = parse_configured_channels(&cfg_path)?; + + // + //Raft + // + let datastore_path = expand_path(&settings.datastore)?; + let net_settings = settings.net; + let datastore_raft = datastore_path.join("ircd.db"); + let mut raft = Raft::::new(net_settings.inbound.clone(), datastore_raft)?; + let raft_sender = raft.get_broadcast(); + let raft_receiver = raft.get_commits(); + + // P2p setup + let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::(); + + let p2p = net::P2p::new(net_settings.into()).await; + let p2p = p2p.clone(); + + let registry = p2p.protocol_registry(); + + let seen_net_msg = Arc::new(Mutex::new(vec![])); + let raft_node_id = raft.id.clone(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let raft_node_id = raft_node_id.clone(); + let sender = p2p_send_channel.clone(); + let seen_net_msg_cloned = seen_net_msg.clone(); + async move { + ProtocolRaft::init(raft_node_id, channel, sender, p2p, seen_net_msg_cloned).await + } + }) + .await; + + p2p.clone().start(executor.clone()).await?; + + let executor_cloned = executor.clone(); + let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone())); + + // + // RPC interface + // + let rpc_listen_addr = settings.rpc_listen.clone(); + let rpc_interface = + Arc::new(JsonRpcInterface { addr: rpc_listen_addr.clone(), p2p: p2p.clone() }); + let rpc_task = + executor.spawn(async move { listen_and_serve(rpc_listen_addr, rpc_interface).await }); + + // + // IRC instance + // + let irc_listen_addr = settings.irc_listen.socket_addrs(|| None)?[0]; + let listener = TcpListener::bind(irc_listen_addr).await?; + let local_addr = listener.local_addr()?; + info!("IRC listening on {}", local_addr); + let executor_cloned = executor.clone(); + let raft_receiver_cloned = raft_receiver.clone(); + let irc_task: smol::Task> = executor.spawn(async move { + loop { + let (stream, peer_addr) = match listener.accept().await { + Ok((s, a)) => (s, a), + Err(e) => { + error!("Failed listening for connections: {}", e); + return Err(Error::ServiceStopped) + } + }; + + info!("IRC Accepted client: {}", peer_addr); + + executor_cloned + .spawn(process( + raft_receiver_cloned.clone(), + stream, + peer_addr, + raft_sender.clone(), + seen_msg_id.clone(), + settings.autojoin.clone(), + configured_chans.clone(), + )) + .detach(); + } + }); + + // Run once receive exit signal + let (signal, shutdown) = async_channel::bounded::<()>(1); + ctrlc_async::set_async_handler(async move { + warn!(target: "ircd", "ircd start Exit Signal"); + // cleaning up tasks running in the background + signal.send(()).await.unwrap(); + rpc_task.cancel().await; + irc_task.cancel().await; + p2p_run_task.cancel().await; + }) + .unwrap(); + + // blocking + raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?; + + Ok(()) +} diff --git a/bin/irc-raft/src/privmsg.rs b/bin/irc-raft/src/privmsg.rs new file mode 100644 index 000000000..f7c31951c --- /dev/null +++ b/bin/irc-raft/src/privmsg.rs @@ -0,0 +1,11 @@ +use darkfi::util::serial::{SerialDecodable, SerialEncodable}; + +pub type PrivmsgId = u32; + +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct Privmsg { + pub id: PrivmsgId, + pub nickname: String, + pub channel: String, + pub message: String, +} diff --git a/bin/irc-raft/src/rpc.rs b/bin/irc-raft/src/rpc.rs new file mode 100644 index 000000000..76cb184ec --- /dev/null +++ b/bin/irc-raft/src/rpc.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; +use log::debug; +use serde_json::{json, Value}; +use url::Url; + +use darkfi::{ + net, + rpc::{ + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, + server::RequestHandler, + }, +}; + +pub struct JsonRpcInterface { + pub addr: Url, + pub p2p: net::P2pPtr, +} + +#[async_trait] +impl RequestHandler for JsonRpcInterface { + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + if req.params.as_array().is_none() { + return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into() + } + + debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap()); + + match req.method.as_str() { + Some("ping") => self.pong(req.id, req.params).await, + Some("get_info") => self.get_info(req.id, req.params).await, + Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + } + } +} + +impl JsonRpcInterface { + // RPCAPI: + // Replies to a ping method. + // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} + // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} + async fn pong(&self, id: Value, _params: Value) -> JsonResult { + JsonResponse::new(json!("pong"), id).into() + } + + // RPCAPI: + // Retrieves P2P network information. + // --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42} + // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} + async fn get_info(&self, id: Value, _params: Value) -> JsonResult { + let resp = self.p2p.get_info().await; + JsonResponse::new(resp, id).into() + } +} diff --git a/bin/irc-raft/src/server.rs b/bin/irc-raft/src/server.rs new file mode 100644 index 000000000..54e9dcc2f --- /dev/null +++ b/bin/irc-raft/src/server.rs @@ -0,0 +1,226 @@ +use std::sync::atomic::Ordering; + +use async_std::net::TcpStream; +use futures::{io::WriteHalf, AsyncWriteExt}; +use fxhash::FxHashMap; +use log::{debug, info, warn}; +use rand::{rngs::OsRng, RngCore}; + +use darkfi::{Error, Result}; + +use crate::{crypto::encrypt_message, privmsg::Privmsg, ChannelInfo, SeenMsgIds}; + +const RPL_NOTOPIC: u32 = 331; +const RPL_TOPIC: u32 = 332; + +pub struct IrcServerConnection { + write_stream: WriteHalf, + is_nick_init: bool, + is_user_init: bool, + is_registered: bool, + nickname: String, + seen_msg_id: SeenMsgIds, + p2p_sender: async_channel::Sender, + auto_channels: Vec, + pub configured_chans: FxHashMap, +} + +impl IrcServerConnection { + pub fn new( + write_stream: WriteHalf, + seen_msg_id: SeenMsgIds, + p2p_sender: async_channel::Sender, + auto_channels: Vec, + configured_chans: FxHashMap, + ) -> Self { + Self { + write_stream, + is_nick_init: false, + is_user_init: false, + is_registered: false, + nickname: "anon".to_string(), + seen_msg_id, + p2p_sender, + auto_channels, + configured_chans, + } + } + + pub async fn update(&mut self, line: String) -> Result<()> { + let mut tokens = line.split_ascii_whitespace(); + // Commands can begin with :garbage but we will reject clients doing + // that for now to keep the protocol simple and focused. + let command = tokens.next().ok_or(Error::MalformedPacket)?; + + info!("IRC server received command: {}", command.to_uppercase()); + + match command.to_uppercase().as_str() { + "USER" => { + // We can stuff any extra things like public keys in here. + // Ignore it for now. + self.is_user_init = true; + } + "NICK" => { + let nickname = tokens.next().ok_or(Error::MalformedPacket)?; + self.is_nick_init = true; + let old_nick = std::mem::replace(&mut self.nickname, nickname.to_string()); + + let nick_reply = format!(":{}!anon@dark.fi NICK {}\r\n", old_nick, self.nickname); + self.reply(&nick_reply).await?; + } + "JOIN" => { + let channels = tokens.next().ok_or(Error::MalformedPacket)?; + for chan in channels.split(',') { + if !chan.starts_with('#') { + warn!("{} is not a valid name for channel", chan); + continue + } + let join_reply = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, chan); + self.reply(&join_reply).await?; + if !self.configured_chans.contains_key(chan) { + self.configured_chans.insert(chan.to_string(), ChannelInfo::new()?); + } else { + let chan_info = self.configured_chans.get_mut(chan).unwrap(); + chan_info.joined.store(true, Ordering::Relaxed); + } + } + } + "PART" => { + let channels = tokens.next().ok_or(Error::MalformedPacket)?; + for chan in channels.split(',') { + let part_reply = format!(":{}!anon@dark.fi PART {}\r\n", self.nickname, chan); + self.reply(&part_reply).await?; + if self.configured_chans.contains_key(chan) { + let chan_info = self.configured_chans.get_mut(chan).unwrap(); + chan_info.joined.store(false, Ordering::Relaxed); + } + } + } + "TOPIC" => { + let channel = tokens.next().ok_or(Error::MalformedPacket)?; + if let Some(substr_idx) = line.find(':') { + // Client is setting the topic + if substr_idx >= line.len() { + return Err(Error::MalformedPacket) + } + + let topic = &line[substr_idx + 1..]; + let chan_info = self.configured_chans.get_mut(channel).unwrap(); + chan_info.topic = Some(topic.to_string()); + + let topic_reply = + format!(":{}!anon@dark.fi TOPIC {} :{}\r\n", self.nickname, channel, topic); + self.reply(&topic_reply).await?; + } else { + // Client is asking or the topic + let chan_info = self.configured_chans.get(channel).unwrap(); + let topic_reply = if let Some(topic) = &chan_info.topic { + format!("{} {} {} :{}\r\n", RPL_TOPIC, self.nickname, channel, topic) + } else { + const TOPIC: &str = "No topic is set"; + format!("{} {} {} :{}\r\n", RPL_NOTOPIC, self.nickname, channel, TOPIC) + }; + self.reply(&topic_reply).await?; + } + } + "PING" => { + let line_clone = line.clone(); + let split_line: Vec<&str> = line_clone.split_whitespace().collect(); + if split_line.len() > 1 { + let pong = format!("PONG {}\r\n", split_line[1]); + self.reply(&pong).await?; + } + } + "PRIVMSG" => { + let channel = tokens.next().ok_or(Error::MalformedPacket)?; + let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?; + + if substr_idx >= line.len() { + return Err(Error::MalformedPacket) + } + + let message = &line[substr_idx + 1..]; + info!("(Plain) PRIVMSG {} :{}", channel, message); + + if self.configured_chans.contains_key(channel) { + let channel_info = self.configured_chans.get(channel).unwrap(); + if channel_info.joined.load(Ordering::Relaxed) { + let message = if let Some(salt_box) = &channel_info.salt_box { + let encrypted = encrypt_message(salt_box, message); + info!("(Encrypted) PRIVMSG {} :{}", channel, encrypted); + encrypted + } else { + message.to_string() + }; + + let random_id = OsRng.next_u32(); + + let protocol_msg = Privmsg { + id: random_id, + nickname: self.nickname.clone(), + channel: channel.to_string(), + message, + }; + + let mut smi = self.seen_msg_id.lock().await; + smi.push(random_id); + drop(smi); + + debug!(target: "ircd", "PRIVMSG to be sent: {:?}", protocol_msg); + self.p2p_sender.send(protocol_msg).await?; + } + } + } + "QUIT" => { + // Close the connection + return Err(Error::ServiceStopped) + } + _ => { + warn!("Unimplemented `{}` command", command); + } + } + + if !self.is_registered && self.is_nick_init && self.is_user_init { + debug!("Initializing peer connection"); + let register_reply = format!(":darkfi 001 {} :Let there be dark\r\n", self.nickname); + self.reply(®ister_reply).await?; + self.is_registered = true; + + // Auto-joins + macro_rules! autojoin { + ($channel:expr,$topic:expr) => { + let j = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, $channel); + let t = format!(":DarkFi TOPIC {} :{}\r\n", $channel, $topic); + self.reply(&j).await?; + self.reply(&t).await?; + }; + } + + for chan in self.auto_channels.clone() { + if self.configured_chans.contains_key(&chan) { + let chan_info = self.configured_chans.get_mut(&chan).unwrap(); + let topic = if let Some(topic) = chan_info.topic.clone() { + topic + } else { + "n/a".to_string() + }; + chan_info.topic = Some(topic.to_string()); + autojoin!(chan, topic); + } else { + let mut chan_info = ChannelInfo::new()?; + chan_info.topic = Some("n/a".to_string()); + self.configured_chans.insert(chan.clone(), chan_info); + autojoin!(chan, "n/a"); + } + } + } + + Ok(()) + } + + pub async fn reply(&mut self, message: &str) -> Result<()> { + self.write_stream.write_all(message.as_bytes()).await?; + debug!("Sent {}", message); + Ok(()) + } +} diff --git a/bin/irc-raft/src/settings.rs b/bin/irc-raft/src/settings.rs new file mode 100644 index 000000000..e0513d468 --- /dev/null +++ b/bin/irc-raft/src/settings.rs @@ -0,0 +1,120 @@ +use std::{ + path::PathBuf, + sync::{atomic::AtomicBool, Arc}, +}; + +use fxhash::FxHashMap; +use log::info; +use serde::Deserialize; +use structopt::StructOpt; +use structopt_toml::StructOptToml; +use toml::Value; +use url::Url; + +use darkfi::{net::settings::SettingsOpt, Result}; + +pub const CONFIG_FILE: &str = "ircd_config.toml"; +pub const CONFIG_FILE_CONTENTS: &str = include_str!("../ircd_config.toml"); + +/// ircd cli +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "ircd")] +pub struct Args { + /// Sets a custom config file + #[structopt(long)] + pub config: Option, + + /// JSON-RPC listen URL + #[structopt(long = "rpc", default_value = "tcp://127.0.0.1:11055")] + pub rpc_listen: Url, + + /// IRC listen URL + #[structopt(long = "irc", default_value = "tcp://127.0.0.1:11066")] + pub irc_listen: Url, + + /// Sets Datastore Path + #[structopt(long, default_value = "~/.config/darkfi/ircd")] + pub datastore: String, + + /// Generate a new NaCl secret and exit + #[structopt(long)] + pub gen_secret: bool, + + /// Autojoin channels + #[structopt(long)] + pub autojoin: Vec, + + #[structopt(flatten)] + pub net: SettingsOpt, + + /// Increase verbosity + #[structopt(short, parse(from_occurrences))] + pub verbose: u8, +} + +/// This struct holds information about preconfigured channels. +/// In the TOML configuration file, we can configure channels as such: +/// ```toml +/// [channel."#dev"] +/// secret = "GvH4kno3kUu6dqPrZ8zjMhqxTUDZ2ev16EdprZiZJgj1" +/// topic = "DarkFi Development Channel" +/// ``` +/// Having a secret will enable a NaCl box that is able to encrypt and +/// decrypt messages in this channel using this set shared secret. +/// The secret should be shared OOB, via a secure channel. +/// Having a topic set is useful if one wants to have a topic in the +/// configured channel. It is not shared with others, but it is useful +/// for personal reference. +#[derive(Clone)] +pub struct ChannelInfo { + /// Optional topic for the channel + pub topic: Option, + /// Optional NaCl box for the channel, used for {en,de}cryption. + pub salt_box: Option, + /// Flag indicates whether the user has joined the channel or not + pub joined: Arc, +} + +impl ChannelInfo { + pub fn new() -> Result { + Ok(Self { topic: None, salt_box: None, joined: Arc::new(AtomicBool::new(true)) }) + } +} + +/// Parse the configuration file for any configured channels and return +/// a map containing said configurations. +pub fn parse_configured_channels(config_file: &PathBuf) -> Result> { + let toml_contents = std::fs::read_to_string(config_file)?; + let mut ret = FxHashMap::default(); + + if let Value::Table(map) = toml::from_str(&toml_contents)? { + if map.contains_key("channel") && map["channel"].is_table() { + for chan in map["channel"].as_table().unwrap() { + info!("Found configuration for channel {}", chan.0); + let mut channel_info = ChannelInfo::new()?; + + if chan.1.as_table().unwrap().contains_key("topic") { + let topic = chan.1["topic"].as_str().unwrap().to_string(); + info!("Found topic for channel {}: {}", chan.0, topic); + channel_info.topic = Some(topic); + } + + if chan.1.as_table().unwrap().contains_key("secret") { + // Build the NaCl box + let s = chan.1["secret"].as_str().unwrap(); + let bytes: [u8; 32] = bs58::decode(s).into_vec()?.try_into().unwrap(); + let secret = crypto_box::SecretKey::from(bytes); + let public = secret.public_key(); + let msg_box = crypto_box::Box::new(&public, &secret); + channel_info.salt_box = Some(msg_box); + info!("Instantiated NaCl box for channel {}", chan.0); + } + + ret.insert(chan.0.to_string(), channel_info); + } + } + }; + + Ok(ret) +} diff --git a/bin/ircd/Cargo.toml b/bin/ircd/Cargo.toml index 8e2920c1d..0afbe7ca1 100644 --- a/bin/ircd/Cargo.toml +++ b/bin/ircd/Cargo.toml @@ -9,7 +9,7 @@ license = "AGPL-3.0-only" edition = "2021" [dependencies] -darkfi = {path = "../../", features = ["net", "rpc", "raft"]} +darkfi = {path = "../../", features = ["net", "rpc"]} # Async smol = "1.2.5" futures = "0.3.21" @@ -31,6 +31,7 @@ simplelog = "0.12.0" fxhash = "0.2.1" ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} url = "2.2.2" +ringbuffer = "0.8.4" # Encoding and parsing serde_json = "1.0.81" diff --git a/bin/ircd/ircd_config.toml b/bin/ircd/ircd_config.toml index 4c9acb0df..0a26cbf0a 100644 --- a/bin/ircd/ircd_config.toml +++ b/bin/ircd/ircd_config.toml @@ -4,13 +4,10 @@ ## IRC listen URL #irc_listen="tcp://127.0.0.1:11066" -## Sets Datastore Path -#datastore="~/.config/darkfi/ircd" - ## List of channels to autojoin for new client connections autojoin = ["#dev"] -## Raft net settings +## P2P net settings [net] ## P2P accept address #inbound="tls://127.0.0.1:11002" diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 5c63f82df..dae7d63fc 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -1,44 +1,47 @@ -use std::{net::SocketAddr, sync::atomic::Ordering}; - -use async_channel::{Receiver, Sender}; -use async_executor::Executor; use async_std::{ net::{TcpListener, TcpStream}, sync::{Arc, Mutex}, }; +use std::{net::SocketAddr, sync::atomic::Ordering}; + +use async_channel::Receiver; +use async_executor::Executor; + use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; use fxhash::FxHashMap; use log::{debug, error, info, warn}; use rand::rngs::OsRng; +use ringbuffer::RingBufferWrite; use smol::future; use structopt_toml::StructOptToml; use darkfi::{ async_daemonize, net, - raft::{NetMsg, ProtocolRaft, Raft}, rpc::server::listen_and_serve, util::{ cli::{get_log_config, get_log_level, spawn_config}, - path::{expand_path, get_config_path}, + path::get_config_path, }, Error, Result, }; pub mod crypto; pub mod privmsg; +pub mod protocol_privmsg; pub mod rpc; pub mod server; pub mod settings; use crate::{ crypto::try_decrypt_message, - privmsg::Privmsg, + privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds}, + protocol_privmsg::ProtocolPrivmsg, rpc::JsonRpcInterface, server::IrcServerConnection, settings::{parse_configured_channels, Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS}, }; -pub type SeenMsgIds = Arc>>; +const SIZE_OF_MSGS_BUFFER: usize = 4096; fn build_irc_msg(msg: &Privmsg) -> String { debug!("ABOUT TO SEND: {:?}", msg); @@ -65,90 +68,113 @@ fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { Ok(line) } -async fn broadcast_msg( - irc_msg: String, - peer_addr: SocketAddr, - conn: &mut IrcServerConnection, -) -> Result<()> { - info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); - - if let Err(e) = conn.update(irc_msg).await { - warn!("Connection error: {} for {}", e, peer_addr); - return Err(Error::ChannelStopped) - } - - Ok(()) -} - -async fn process( - raft_receiver: Receiver, - stream: TcpStream, - peer_addr: SocketAddr, - raft_sender: Sender, - seen_msg_id: SeenMsgIds, +struct Ircd { + // msgs + seen_msg_ids: SeenMsgIds, + privmsgs_buffer: PrivmsgsBuffer, + // channels autojoin_chans: Vec, configured_chans: FxHashMap, -) -> Result<()> { - let (reader, writer) = stream.split(); + // p2p + p2p: net::P2pPtr, + p2p_receiver: Receiver, +} - let mut reader = BufReader::new(reader); - let mut conn = IrcServerConnection::new( - writer, - seen_msg_id.clone(), - raft_sender, - autojoin_chans, - configured_chans, - ); +impl Ircd { + fn new( + seen_msg_ids: SeenMsgIds, + autojoin_chans: Vec, + configured_chans: FxHashMap, + p2p: net::P2pPtr, + p2p_receiver: Receiver, + ) -> Self { + let privmsgs_buffer: PrivmsgsBuffer = + Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER))); - loop { - let mut line = String::new(); - futures::select! { - privmsg = raft_receiver.recv().fuse() => { - let mut msg = privmsg?; - info!("Received msg from Raft: {:?}", msg); + Self { seen_msg_ids, privmsgs_buffer, autojoin_chans, configured_chans, p2p, p2p_receiver } + } - let mut smi = seen_msg_id.lock().await; - if smi.contains(&msg.id) { - continue - } - smi.push(msg.id); - drop(smi); + async fn process( + &self, + executor: Arc>, + stream: TcpStream, + peer_addr: SocketAddr, + ) -> Result<()> { + let (reader, writer) = stream.split(); - // Try to potentially decrypt the incoming message. - if conn.configured_chans.contains_key(&msg.channel) { - let chan_info = conn.configured_chans.get(&msg.channel).unwrap(); - if !chan_info.joined.load(Ordering::Relaxed) { - continue + let mut reader = BufReader::new(reader); + let mut conn = IrcServerConnection::new( + writer, + self.seen_msg_ids.clone(), + self.privmsgs_buffer.clone(), + self.autojoin_chans.clone(), + self.configured_chans.clone(), + self.p2p.clone(), + ); + + let p2p_receiver = self.p2p_receiver.clone(); + let privmsgs_buffer = self.privmsgs_buffer.clone(); + + executor.spawn(async move { + loop { + let mut line = String::new(); + futures::select! { + privmsg = p2p_receiver.recv().fuse() => { + let mut msg = privmsg?; + info!("Received msg from P2p network: {:?}", msg); + + // Try to potentially decrypt the incoming message. + if conn.configured_chans.contains_key(&msg.channel) { + let chan_info = conn.configured_chans.get(&msg.channel).unwrap(); + if !chan_info.joined.load(Ordering::Relaxed) { + continue + } + if let Some(salt_box) = &chan_info.salt_box { + if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) { + msg.message = decrypted_msg; + info!("Decrypted received message: {:?}", msg); + } + } + } + + // add the msg to buffer + { + (*privmsgs_buffer.lock().await).push(msg.clone()); + } + + let irc_msg = build_irc_msg(&msg); + conn.reply(&irc_msg).await?; } - if let Some(salt_box) = &chan_info.salt_box { - if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) { - msg.message = decrypted_msg; - info!("Decrypted received message: {:?}", msg); + err = reader.read_line(&mut line).fuse() => { + if let Err(e) = err { + warn!("Read line error. Closing stream for {}: {}", peer_addr, e); + return Ok(()) + } + info!("Received msg from IRC client: {:?}", line); + let irc_msg = match clean_input(line, &peer_addr) { + Ok(m) => m, + Err(e) => return Err(e) + }; + + info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); + + if let Err(e) = conn.update(irc_msg).await { + warn!("Connection error: {} for {}", e, peer_addr); + return Err(Error::ChannelStopped) } } - } - - let irc_msg = build_irc_msg(&msg); - conn.reply(&irc_msg).await?; - } - err = reader.read_line(&mut line).fuse() => { - if let Err(e) = err { - warn!("Read line error. Closing stream for {}: {}", peer_addr, e); - return Ok(()) - } - info!("Received msg from IRC client: {:?}", line); - let irc_msg = match clean_input(line, &peer_addr) { - Ok(m) => m, - Err(e) => return Err(e) }; - broadcast_msg(irc_msg, peer_addr,&mut conn).await?; } - }; + }).detach(); + + Ok(()) } } async_daemonize!(realmain); async fn realmain(settings: Args, executor: Arc>) -> Result<()> { + let seen_msg_ids = Arc::new(Mutex::new(vec![])); + if settings.gen_secret { let secret_key = crypto_box::SecretKey::generate(&mut OsRng); let encoded = bs58::encode(secret_key.as_bytes()); @@ -156,47 +182,34 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { return Ok(()) } - let seen_msg_id: SeenMsgIds = Arc::new(Mutex::new(vec![])); - // Pick up channel settings from the TOML configuration let cfg_path = get_config_path(settings.config, CONFIG_FILE)?; let configured_chans = parse_configured_channels(&cfg_path)?; // - //Raft - // - let datastore_path = expand_path(&settings.datastore)?; - let net_settings = settings.net; - let datastore_raft = datastore_path.join("ircd.db"); - let mut raft = Raft::::new(net_settings.inbound.clone(), datastore_raft)?; - let raft_sender = raft.get_broadcast(); - let raft_receiver = raft.get_commits(); - // P2p setup - let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::(); + // + let net_settings = settings.net; + let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::(); let p2p = net::P2p::new(net_settings.into()).await; let p2p = p2p.clone(); let registry = p2p.protocol_registry(); - let seen_net_msg = Arc::new(Mutex::new(vec![])); - let raft_node_id = raft.id.clone(); + let seen_msg_ids_cloned = seen_msg_ids.clone(); registry .register(net::SESSION_ALL, move |channel, p2p| { - let raft_node_id = raft_node_id.clone(); let sender = p2p_send_channel.clone(); - let seen_net_msg_cloned = seen_net_msg.clone(); - async move { - ProtocolRaft::init(raft_node_id, channel, sender, p2p, seen_net_msg_cloned).await - } + let seen_msg_ids_cloned = seen_msg_ids_cloned.clone(); + async move { ProtocolPrivmsg::init(channel, sender, p2p, seen_msg_ids_cloned).await } }) .await; p2p.clone().start(executor.clone()).await?; let executor_cloned = executor.clone(); - let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone())); + executor_cloned.spawn(p2p.clone().run(executor.clone())).detach(); // // RPC interface @@ -204,8 +217,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let rpc_listen_addr = settings.rpc_listen.clone(); let rpc_interface = Arc::new(JsonRpcInterface { addr: rpc_listen_addr.clone(), p2p: p2p.clone() }); - let rpc_task = - executor.spawn(async move { listen_and_serve(rpc_listen_addr, rpc_interface).await }); + executor.spawn(async move { listen_and_serve(rpc_listen_addr, rpc_interface).await }).detach(); // // IRC instance @@ -214,33 +226,38 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let listener = TcpListener::bind(irc_listen_addr).await?; let local_addr = listener.local_addr()?; info!("IRC listening on {}", local_addr); + let executor_cloned = executor.clone(); - let raft_receiver_cloned = raft_receiver.clone(); - let irc_task: smol::Task> = executor.spawn(async move { - loop { - let (stream, peer_addr) = match listener.accept().await { - Ok((s, a)) => (s, a), - Err(e) => { - error!("Failed listening for connections: {}", e); - return Err(Error::ServiceStopped) - } - }; + executor + .spawn(async move { + let ircd = Ircd::new( + seen_msg_ids.clone(), + settings.autojoin.clone(), + configured_chans.clone(), + p2p.clone(), + p2p_recv_channel.clone(), + ); - info!("IRC Accepted client: {}", peer_addr); + loop { + let (stream, peer_addr) = match listener.accept().await { + Ok((s, a)) => (s, a), + Err(e) => { + error!("failed accepting new connections: {}", e); + continue + } + }; - executor_cloned - .spawn(process( - raft_receiver_cloned.clone(), - stream, - peer_addr, - raft_sender.clone(), - seen_msg_id.clone(), - settings.autojoin.clone(), - configured_chans.clone(), - )) - .detach(); - } - }); + let result = ircd.process(executor_cloned.clone(), stream, peer_addr).await; + + if let Err(e) = result { + error!("failed process the {} connections: {}", peer_addr, e); + continue + }; + + info!("IRC Accepted new client: {}", peer_addr); + } + }) + .detach(); // Run once receive exit signal let (signal, shutdown) = async_channel::bounded::<()>(1); @@ -248,14 +265,13 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { warn!(target: "ircd", "ircd start Exit Signal"); // cleaning up tasks running in the background signal.send(()).await.unwrap(); - rpc_task.cancel().await; - irc_task.cancel().await; - p2p_run_task.cancel().await; }) .unwrap(); - // blocking - raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?; + // Wait for SIGINT + shutdown.recv().await?; + print!("\r"); + info!("Caught termination signal, cleaning up and exiting..."); Ok(()) } diff --git a/bin/ircd/src/privmsg.rs b/bin/ircd/src/privmsg.rs index f7c31951c..49a2297a4 100644 --- a/bin/ircd/src/privmsg.rs +++ b/bin/ircd/src/privmsg.rs @@ -1,6 +1,14 @@ +use async_std::sync::{Arc, Mutex}; + +use ringbuffer::AllocRingBuffer; + use darkfi::util::serial::{SerialDecodable, SerialEncodable}; -pub type PrivmsgId = u32; +pub type PrivmsgId = u64; + +pub type SeenMsgIds = Arc>>; + +pub type PrivmsgsBuffer = Arc>>; #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct Privmsg { diff --git a/bin/ircd/src/protocol_privmsg.rs b/bin/ircd/src/protocol_privmsg.rs new file mode 100644 index 000000000..7903b1238 --- /dev/null +++ b/bin/ircd/src/protocol_privmsg.rs @@ -0,0 +1,87 @@ +use async_std::sync::{Arc, Mutex}; + +use async_executor::Executor; +use async_trait::async_trait; +use log::debug; +use url::Url; + +use darkfi::{net, Result}; + +use crate::Privmsg; + +pub struct ProtocolPrivmsg { + jobsman: net::ProtocolJobsManagerPtr, + notify_queue_sender: async_channel::Sender, + msg_sub: net::MessageSubscription, + p2p: net::P2pPtr, + msg_ids: Arc>>, + channel_address: Url, +} + +impl ProtocolPrivmsg { + pub async fn init( + channel: net::ChannelPtr, + notify_queue_sender: async_channel::Sender, + p2p: net::P2pPtr, + msg_ids: Arc>>, + ) -> net::ProtocolBasePtr { + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + + let msg_sub = + channel.subscribe_msg::().await.expect("Missing Privmsg dispatcher!"); + let channel_address = channel.address(); + + Arc::new(Self { + notify_queue_sender, + msg_sub, + jobsman: net::ProtocolJobsManager::new("ProtocolPrivmsg", channel), + p2p, + msg_ids, + channel_address, + }) + } + + async fn handle_receive_msg(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_msg() [START]"); + let exclude_list = vec![self.channel_address.clone()]; + loop { + let msg = self.msg_sub.receive().await?; + + if self.msg_ids.lock().await.contains(&msg.id) { + continue + } + + self.msg_ids.lock().await.push(msg.id); + let msg = (*msg).clone(); + + self.notify_queue_sender.send(msg.clone()).await?; + + self.p2p.broadcast_with_exclude(msg.clone(), &exclude_list).await?; + } + } +} + +#[async_trait] +impl net::ProtocolBase for ProtocolPrivmsg { + /// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the + /// protocol task manager, then queues the reply. Sends out a ping and + /// waits for pong reply. Waits for ping and replies with a pong. + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "ircd", "ProtocolPrivmsg::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_msg(), executor.clone()).await; + debug!(target: "ircd", "ProtocolPrivmsg::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolPrivmsg" + } +} + +impl net::Message for Privmsg { + fn name() -> &'static str { + "privmsg" + } +} diff --git a/bin/ircd/src/server.rs b/bin/ircd/src/server.rs index 54e9dcc2f..ae19911d4 100644 --- a/bin/ircd/src/server.rs +++ b/bin/ircd/src/server.rs @@ -5,44 +5,56 @@ use futures::{io::WriteHalf, AsyncWriteExt}; use fxhash::FxHashMap; use log::{debug, info, warn}; use rand::{rngs::OsRng, RngCore}; +use ringbuffer::RingBufferWrite; -use darkfi::{Error, Result}; +use darkfi::{net::P2pPtr, Error, Result}; -use crate::{crypto::encrypt_message, privmsg::Privmsg, ChannelInfo, SeenMsgIds}; +use crate::{ + crypto::encrypt_message, + privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds}, + ChannelInfo, +}; const RPL_NOTOPIC: u32 = 331; const RPL_TOPIC: u32 = 332; pub struct IrcServerConnection { + // server stream write_stream: WriteHalf, + // msg ids + seen_msg_ids: SeenMsgIds, + privmsgs_buffer: PrivmsgsBuffer, + // user & channels is_nick_init: bool, is_user_init: bool, is_registered: bool, nickname: String, - seen_msg_id: SeenMsgIds, - p2p_sender: async_channel::Sender, auto_channels: Vec, pub configured_chans: FxHashMap, + // p2p + p2p: P2pPtr, } impl IrcServerConnection { pub fn new( write_stream: WriteHalf, - seen_msg_id: SeenMsgIds, - p2p_sender: async_channel::Sender, + seen_msg_ids: SeenMsgIds, + privmsgs_buffer: PrivmsgsBuffer, auto_channels: Vec, configured_chans: FxHashMap, + p2p: P2pPtr, ) -> Self { Self { write_stream, + seen_msg_ids, + privmsgs_buffer, is_nick_init: false, is_user_init: false, is_registered: false, nickname: "anon".to_string(), - seen_msg_id, - p2p_sender, auto_channels, configured_chans, + p2p, } } @@ -153,7 +165,7 @@ impl IrcServerConnection { message.to_string() }; - let random_id = OsRng.next_u32(); + let random_id = OsRng.next_u64(); let protocol_msg = Privmsg { id: random_id, @@ -162,12 +174,13 @@ impl IrcServerConnection { message, }; - let mut smi = self.seen_msg_id.lock().await; - smi.push(random_id); - drop(smi); + { + (*self.seen_msg_ids.lock().await).push(random_id); + (*self.privmsgs_buffer.lock().await).push(protocol_msg.clone()) + } debug!(target: "ircd", "PRIVMSG to be sent: {:?}", protocol_msg); - self.p2p_sender.send(protocol_msg).await?; + self.p2p.broadcast(protocol_msg).await?; } } } diff --git a/bin/ircd/src/settings.rs b/bin/ircd/src/settings.rs index e0513d468..c712d889a 100644 --- a/bin/ircd/src/settings.rs +++ b/bin/ircd/src/settings.rs @@ -33,10 +33,6 @@ pub struct Args { #[structopt(long = "irc", default_value = "tcp://127.0.0.1:11066")] pub irc_listen: Url, - /// Sets Datastore Path - #[structopt(long, default_value = "~/.config/darkfi/ircd")] - pub datastore: String, - /// Generate a new NaCl secret and exit #[structopt(long)] pub gen_secret: bool, diff --git a/bin/tau/tau-cli/src/filter.rs b/bin/tau/tau-cli/src/filter.rs index 209380272..338f50558 100644 --- a/bin/tau/tau-cli/src/filter.rs +++ b/bin/tau/tau-cli/src/filter.rs @@ -1,4 +1,4 @@ -use chrono::{Datelike, NaiveDate, NaiveDateTime}; +use chrono::{Datelike, NaiveDateTime, Utc}; use serde_json::Value; use crate::{primitives::TaskInfo, TaskEvent}; @@ -18,12 +18,11 @@ pub fn apply_filter(tasks: &mut Vec, filter: &str) { let (month, year) = (filter[..2].parse::().unwrap(), filter[2..].parse::().unwrap()); - let year = year + 2000; + let year = year + (Utc::today().year() / 100) * 100; tasks.retain(|task| { let date = task.created_at; let task_date = NaiveDateTime::from_timestamp(date, 0).date(); - let filter_date = NaiveDate::from_ymd(year, month, 1); - task_date.month() == filter_date.month() && task_date.year() == filter_date.year() + task_date.month() == month && task_date.year() == year }) } diff --git a/bin/tau/tau-cli/src/util.rs b/bin/tau/tau-cli/src/util.rs index 5cac2956c..84918eec7 100644 --- a/bin/tau/tau-cli/src/util.rs +++ b/bin/tau/tau-cli/src/util.rs @@ -12,8 +12,8 @@ use darkfi::{util::Timestamp, Result}; /// Parse due date (e.g. "1503" for 15 March) as i64 timestamp. pub fn due_as_timestamp(due: &str) -> Option { - if due.len() != 4 { - error!("Due date must be of length 4 (e.g. \"1503\" for 15 March)"); + if due.len() != 4 || !due.parse::().is_ok() { + error!("Due date must be digits of length 4 (e.g. \"1503\" for 15 March)"); return None } let (day, month) = (due[..2].parse::().unwrap(), due[2..].parse::().unwrap());