bin/irc-raft: create and clone ircd with raft consensus

This commit is contained in:
ghassmo
2022-06-17 02:46:52 +03:00
parent d0319b6b73
commit 038dbb2870
17 changed files with 1023 additions and 0 deletions

29
Cargo.lock generated
View File

@@ -2232,6 +2232,35 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "irc-raft"
version = "0.3.0"
dependencies = [
"async-channel",
"async-executor",
"async-std",
"async-trait",
"bs58",
"clap 3.1.18",
"crypto_box",
"ctrlc-async",
"darkfi",
"easy-parallel",
"futures",
"futures-rustls",
"fxhash",
"log",
"rand",
"serde",
"serde_json",
"simplelog",
"smol",
"structopt",
"structopt-toml",
"toml",
"url",
]
[[package]]
name = "ircd"
version = "0.3.0"

View File

@@ -24,6 +24,7 @@ members = [
"bin/drk",
"bin/faucetd",
"bin/ircd",
"bin/irc-raft",
"bin/dnetview",
"bin/daod",
"bin/dao-cli",

41
bin/irc-raft/Cargo.toml Normal file
View File

@@ -0,0 +1,41 @@
[package]
name = "irc-raft"
version = "0.3.0"
homepage = "https://dark.fi"
description = "P2P IRC daemon With Raft Consensus"
authors = ["darkfi <dev@dark.fi>"]
repository = "https://github.com/darkrenaissance/darkfi"
license = "AGPL-3.0-only"
edition = "2021"
[dependencies]
darkfi = {path = "../../", features = ["net", "rpc", "raft"]}
# Async
smol = "1.2.5"
futures = "0.3.21"
futures-rustls = "0.22.1"
async-std = "1.11.0"
async-trait = "0.1.53"
async-channel = "1.6.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
# Crypto
crypto_box = "0.7.2"
rand = "0.8.5"
# Misc
clap = {version = "3.1.18", features = ["derive"]}
log = "0.4.17"
simplelog = "0.12.0"
fxhash = "0.2.1"
ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]}
url = "2.2.2"
# Encoding and parsing
serde_json = "1.0.81"
serde = {version = "1.0.137", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.0"
bs58 = "0.4.0"
toml = "0.5.9"

4
bin/irc-raft/README.md Normal file
View File

@@ -0,0 +1,4 @@
# ircd
see [Darkfi Book](https://darkrenaissance.github.io/darkfi/misc/ircd.html) for the installation guide.

View File

@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:1234"
## IRC listen URL
irc_listen="127.0.0.1:11067"
## Sets Datastore Path
datastore="~/.config/ircd-inbound"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11002"
## Connection slots
#outbound_connections=5
## P2P external address
external_addr="127.0.0.1:11004"
## Peers to connect to
#peers=["127.0.0.1:11003"]
## Seed nodes to connect to
seeds=["127.0.0.1:11001"]
## these are the default configuration for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10

View File

@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:7777"
## IRC listen URL
irc_listen="127.0.0.1:11066"
## Sets Datastore Path
datastore="~/.config/ircd-outbound"
## Raft net settings
[net]
## P2P accept address
# inbound="127.0.0.1:11002"
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="127.0.0.1:11002"
## Peers to connect to
#peers=["127.0.0.1:11003"]
## Seed nodes to connect to
seeds=["127.0.0.1:11001"]
## these are the default configuration for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10

View File

@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:8000"
## IRC listen URL
irc_listen="127.0.0.1:11065"
## Sets Datastore Path
datastore="~/.config/ircd-seed"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11001"
## Connection slots
# outbound_connections=5
## P2P external address
# external_addr="127.0.0.1:11001"
## Peers to connect to
# peers=["127.0.0.1:11001"]
## Seed nodes to connect to
# seeds=["127.0.0.1:11002"]
## these are the default configuration for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10

View File

@@ -0,0 +1,42 @@
## JSON-RPC listen URL
#rpc_listen="tcp://127.0.0.1:11055"
## IRC listen URL
#irc_listen="tcp://127.0.0.1:11066"
## Sets Datastore Path
#datastore="~/.config/darkfi/ircd"
## List of channels to autojoin for new client connections
autojoin = ["#dev"]
## Raft net settings
[net]
## P2P accept address
#inbound="tls://127.0.0.1:11002"
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="tls://127.0.0.1:11002"
## Peers to connect to
#peers=["tls://127.0.0.1:11003"]
## Seed nodes to connect to
seeds=["tls://irc0.dark.fi:11001", "tls://irc1.dark.fi:11001"]
## these are the default configuration for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10
## Per-channel settings
#[channel."#dev"]
## Create with `ircd --gen-secret`
#secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"
## Topic to set for the channel
#topic = "DarkFi Development HQ"

View File

@@ -0,0 +1,80 @@
import asyncio
async def start():
host = "127.0.0.1"
port = 11066
channel = "#dev"
nickname = "meeting"
print(f"Start a connection {host}:{port}")
reader, writer = await asyncio.open_connection(host, port)
print("Send NICK msg")
nick_msg = f"NICK {nickname} \r\n"
writer.write(nick_msg.encode('utf8'))
print(f"Send JOIN msg: {channel}")
join_msg = f"JOIN {channel} \r\n"
writer.write(join_msg.encode('utf8'))
topics = []
while True:
msg = await reader.read(350)
msg = msg.decode('utf8').strip()
if not msg:
print("Error: Receive empty msg")
break
command = msg.split(" ")[1]
if command == "PRIVMSG":
msg_title = msg.split(" ")[3][1:]
if not msg_title:
continue
reply = None
if msg_title == "#m_start":
reply = f"PRIVMSG {channel} :meeting started \r\n"
msg_title = "#m_list"
if msg_title == "#m_end":
reply = f"PRIVMSG {channel} :meeting end \r\n"
topics = []
if msg_title == "#m_topic":
topic = msg.split(" ", 4)
if len(topic) != 5:
continue
topic = topic[4]
topics.append(topic)
reply = f"PRIVMSG {channel} :add topic: {topic} \r\n"
if msg_title == "#m_list":
tp = " ".join(
[f"{i}-{topic}" for i, topic in enumerate(topics, 1)])
reply = f"PRIVMSG {channel} :topics: {tp} \r\n"
if msg_title == "#m_next":
if len(topics) == 0:
reply = f"PRIVMSG {channel} :no topics \r\n"
else:
tp = topics.pop(0)
reply = f"PRIVMSG {channel} :current topic: {tp} \r\n"
if reply != None:
writer.write(reply.encode('utf8'))
await writer.drain()
if command == "QUIT":
break
writer.close()
asyncio.run(start())

View File

@@ -0,0 +1,9 @@
#!/bin/bash
# Change this value to the hostname of the seed server
SEED_HOSTNAME=XXX
LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}')
SEED_IP=$(getent hosts $SEED_HOSTNAME.local | awk '{print $1}' | head -n 1)
cargo run -- --accept 0.0.0.0:11004 --slots 5 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667

View File

@@ -0,0 +1,4 @@
#!/bin/bash
LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}')
cargo run -- --accept 0.0.0.0:9999 --irc 127.0.0.1:6688

View File

@@ -0,0 +1,43 @@
use crypto_box::aead::Aead;
use rand::rngs::OsRng;
/// Try decrypting a message given a NaCl box and a base58 string.
/// The format we're using is nonce+ciphertext, where nonce is 24 bytes.
pub fn try_decrypt_message(salt_box: &crypto_box::Box, ciphertext: &str) -> Option<String> {
let bytes = match bs58::decode(ciphertext).into_vec() {
Ok(v) => v,
Err(_) => return None,
};
if bytes.len() < 25 {
return None
}
// Try extracting the nonce
let nonce = match bytes[0..24].try_into() {
Ok(v) => v,
Err(_) => return None,
};
// Take the remaining ciphertext
let message = &bytes[24..];
// Try decrypting the message
match salt_box.decrypt(nonce, message) {
Ok(v) => Some(String::from_utf8_lossy(&v).to_string()),
Err(_) => None,
}
}
/// Encrypt a message given a NaCl box and a plaintext string.
/// The format we're using is nonce+ciphertext, where nonce is 24 bytes.
pub fn encrypt_message(salt_box: &crypto_box::Box, plaintext: &str) -> String {
let nonce = crypto_box::generate_nonce(&mut OsRng);
let mut ciphertext = salt_box.encrypt(&nonce, plaintext.as_bytes()).unwrap();
let mut concat = vec![];
concat.append(&mut nonce.as_slice().to_vec());
concat.append(&mut ciphertext);
bs58::encode(concat).into_string()
}

261
bin/irc-raft/src/main.rs Normal file
View File

@@ -0,0 +1,261 @@
use std::{net::SocketAddr, sync::atomic::Ordering};
use async_channel::{Receiver, Sender};
use async_executor::Executor;
use async_std::{
net::{TcpListener, TcpStream},
sync::{Arc, Mutex},
};
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use fxhash::FxHashMap;
use log::{debug, error, info, warn};
use rand::rngs::OsRng;
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
async_daemonize, net,
raft::{NetMsg, ProtocolRaft, Raft},
rpc::server::listen_and_serve,
util::{
cli::{get_log_config, get_log_level, spawn_config},
path::{expand_path, get_config_path},
},
Error, Result,
};
pub mod crypto;
pub mod privmsg;
pub mod rpc;
pub mod server;
pub mod settings;
use crate::{
crypto::try_decrypt_message,
privmsg::Privmsg,
rpc::JsonRpcInterface,
server::IrcServerConnection,
settings::{parse_configured_channels, Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS},
};
pub type SeenMsgIds = Arc<Mutex<Vec<u32>>>;
fn build_irc_msg(msg: &Privmsg) -> String {
debug!("ABOUT TO SEND: {:?}", msg);
let irc_msg =
format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", msg.nickname, msg.channel, msg.message);
irc_msg
}
fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result<String> {
if line.is_empty() {
warn!("Received empty line from {}. ", peer_addr);
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
if &line[(line.len() - 2)..] != "\r\n" {
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
// Remove CRLF
line.pop();
line.pop();
Ok(line)
}
async fn broadcast_msg(
irc_msg: String,
peer_addr: SocketAddr,
conn: &mut IrcServerConnection,
) -> Result<()> {
info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr);
if let Err(e) = conn.update(irc_msg).await {
warn!("Connection error: {} for {}", e, peer_addr);
return Err(Error::ChannelStopped)
}
Ok(())
}
async fn process(
raft_receiver: Receiver<Privmsg>,
stream: TcpStream,
peer_addr: SocketAddr,
raft_sender: Sender<Privmsg>,
seen_msg_id: SeenMsgIds,
autojoin_chans: Vec<String>,
configured_chans: FxHashMap<String, ChannelInfo>,
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(
writer,
seen_msg_id.clone(),
raft_sender,
autojoin_chans,
configured_chans,
);
loop {
let mut line = String::new();
futures::select! {
privmsg = raft_receiver.recv().fuse() => {
let mut msg = privmsg?;
info!("Received msg from Raft: {:?}", msg);
let mut smi = seen_msg_id.lock().await;
if smi.contains(&msg.id) {
continue
}
smi.push(msg.id);
drop(smi);
// Try to potentially decrypt the incoming message.
if conn.configured_chans.contains_key(&msg.channel) {
let chan_info = conn.configured_chans.get(&msg.channel).unwrap();
if !chan_info.joined.load(Ordering::Relaxed) {
continue
}
if let Some(salt_box) = &chan_info.salt_box {
if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) {
msg.message = decrypted_msg;
info!("Decrypted received message: {:?}", msg);
}
}
}
let irc_msg = build_irc_msg(&msg);
conn.reply(&irc_msg).await?;
}
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
info!("Received msg from IRC client: {:?}", line);
let irc_msg = match clean_input(line, &peer_addr) {
Ok(m) => m,
Err(e) => return Err(e)
};
broadcast_msg(irc_msg, peer_addr,&mut conn).await?;
}
};
}
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
if settings.gen_secret {
let secret_key = crypto_box::SecretKey::generate(&mut OsRng);
let encoded = bs58::encode(secret_key.as_bytes());
println!("{}", encoded.into_string());
return Ok(())
}
let seen_msg_id: SeenMsgIds = Arc::new(Mutex::new(vec![]));
// Pick up channel settings from the TOML configuration
let cfg_path = get_config_path(settings.config, CONFIG_FILE)?;
let configured_chans = parse_configured_channels(&cfg_path)?;
//
//Raft
//
let datastore_path = expand_path(&settings.datastore)?;
let net_settings = settings.net;
let datastore_raft = datastore_path.join("ircd.db");
let mut raft = Raft::<Privmsg>::new(net_settings.inbound.clone(), datastore_raft)?;
let raft_sender = raft.get_broadcast();
let raft_receiver = raft.get_commits();
// P2p setup
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<NetMsg>();
let p2p = net::P2p::new(net_settings.into()).await;
let p2p = p2p.clone();
let registry = p2p.protocol_registry();
let seen_net_msg = Arc::new(Mutex::new(vec![]));
let raft_node_id = raft.id.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let raft_node_id = raft_node_id.clone();
let sender = p2p_send_channel.clone();
let seen_net_msg_cloned = seen_net_msg.clone();
async move {
ProtocolRaft::init(raft_node_id, channel, sender, p2p, seen_net_msg_cloned).await
}
})
.await;
p2p.clone().start(executor.clone()).await?;
let executor_cloned = executor.clone();
let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone()));
//
// RPC interface
//
let rpc_listen_addr = settings.rpc_listen.clone();
let rpc_interface =
Arc::new(JsonRpcInterface { addr: rpc_listen_addr.clone(), p2p: p2p.clone() });
let rpc_task =
executor.spawn(async move { listen_and_serve(rpc_listen_addr, rpc_interface).await });
//
// IRC instance
//
let irc_listen_addr = settings.irc_listen.socket_addrs(|| None)?[0];
let listener = TcpListener::bind(irc_listen_addr).await?;
let local_addr = listener.local_addr()?;
info!("IRC listening on {}", local_addr);
let executor_cloned = executor.clone();
let raft_receiver_cloned = raft_receiver.clone();
let irc_task: smol::Task<Result<()>> = executor.spawn(async move {
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed listening for connections: {}", e);
return Err(Error::ServiceStopped)
}
};
info!("IRC Accepted client: {}", peer_addr);
executor_cloned
.spawn(process(
raft_receiver_cloned.clone(),
stream,
peer_addr,
raft_sender.clone(),
seen_msg_id.clone(),
settings.autojoin.clone(),
configured_chans.clone(),
))
.detach();
}
});
// Run once receive exit signal
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
rpc_task.cancel().await;
irc_task.cancel().await;
p2p_run_task.cancel().await;
})
.unwrap();
// blocking
raft.start(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?;
Ok(())
}

View File

@@ -0,0 +1,11 @@
use darkfi::util::serial::{SerialDecodable, SerialEncodable};
pub type PrivmsgId = u32;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Privmsg {
pub id: PrivmsgId,
pub nickname: String,
pub channel: String,
pub message: String,
}

53
bin/irc-raft/src/rpc.rs Normal file
View File

@@ -0,0 +1,53 @@
use async_trait::async_trait;
use log::debug;
use serde_json::{json, Value};
use url::Url;
use darkfi::{
net,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
server::RequestHandler,
},
};
pub struct JsonRpcInterface {
pub addr: Url,
pub p2p: net::P2pPtr,
}
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if req.params.as_array().is_none() {
return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into()
}
debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap());
match req.method.as_str() {
Some("ping") => self.pong(req.id, req.params).await,
Some("get_info") => self.get_info(req.id, req.params).await,
Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
}
impl JsonRpcInterface {
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
async fn pong(&self, id: Value, _params: Value) -> JsonResult {
JsonResponse::new(json!("pong"), id).into()
}
// RPCAPI:
// Retrieves P2P network information.
// --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
let resp = self.p2p.get_info().await;
JsonResponse::new(resp, id).into()
}
}

226
bin/irc-raft/src/server.rs Normal file
View File

@@ -0,0 +1,226 @@
use std::sync::atomic::Ordering;
use async_std::net::TcpStream;
use futures::{io::WriteHalf, AsyncWriteExt};
use fxhash::FxHashMap;
use log::{debug, info, warn};
use rand::{rngs::OsRng, RngCore};
use darkfi::{Error, Result};
use crate::{crypto::encrypt_message, privmsg::Privmsg, ChannelInfo, SeenMsgIds};
const RPL_NOTOPIC: u32 = 331;
const RPL_TOPIC: u32 = 332;
pub struct IrcServerConnection {
write_stream: WriteHalf<TcpStream>,
is_nick_init: bool,
is_user_init: bool,
is_registered: bool,
nickname: String,
seen_msg_id: SeenMsgIds,
p2p_sender: async_channel::Sender<Privmsg>,
auto_channels: Vec<String>,
pub configured_chans: FxHashMap<String, ChannelInfo>,
}
impl IrcServerConnection {
pub fn new(
write_stream: WriteHalf<TcpStream>,
seen_msg_id: SeenMsgIds,
p2p_sender: async_channel::Sender<Privmsg>,
auto_channels: Vec<String>,
configured_chans: FxHashMap<String, ChannelInfo>,
) -> Self {
Self {
write_stream,
is_nick_init: false,
is_user_init: false,
is_registered: false,
nickname: "anon".to_string(),
seen_msg_id,
p2p_sender,
auto_channels,
configured_chans,
}
}
pub async fn update(&mut self, line: String) -> Result<()> {
let mut tokens = line.split_ascii_whitespace();
// Commands can begin with :garbage but we will reject clients doing
// that for now to keep the protocol simple and focused.
let command = tokens.next().ok_or(Error::MalformedPacket)?;
info!("IRC server received command: {}", command.to_uppercase());
match command.to_uppercase().as_str() {
"USER" => {
// We can stuff any extra things like public keys in here.
// Ignore it for now.
self.is_user_init = true;
}
"NICK" => {
let nickname = tokens.next().ok_or(Error::MalformedPacket)?;
self.is_nick_init = true;
let old_nick = std::mem::replace(&mut self.nickname, nickname.to_string());
let nick_reply = format!(":{}!anon@dark.fi NICK {}\r\n", old_nick, self.nickname);
self.reply(&nick_reply).await?;
}
"JOIN" => {
let channels = tokens.next().ok_or(Error::MalformedPacket)?;
for chan in channels.split(',') {
if !chan.starts_with('#') {
warn!("{} is not a valid name for channel", chan);
continue
}
let join_reply = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, chan);
self.reply(&join_reply).await?;
if !self.configured_chans.contains_key(chan) {
self.configured_chans.insert(chan.to_string(), ChannelInfo::new()?);
} else {
let chan_info = self.configured_chans.get_mut(chan).unwrap();
chan_info.joined.store(true, Ordering::Relaxed);
}
}
}
"PART" => {
let channels = tokens.next().ok_or(Error::MalformedPacket)?;
for chan in channels.split(',') {
let part_reply = format!(":{}!anon@dark.fi PART {}\r\n", self.nickname, chan);
self.reply(&part_reply).await?;
if self.configured_chans.contains_key(chan) {
let chan_info = self.configured_chans.get_mut(chan).unwrap();
chan_info.joined.store(false, Ordering::Relaxed);
}
}
}
"TOPIC" => {
let channel = tokens.next().ok_or(Error::MalformedPacket)?;
if let Some(substr_idx) = line.find(':') {
// Client is setting the topic
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let topic = &line[substr_idx + 1..];
let chan_info = self.configured_chans.get_mut(channel).unwrap();
chan_info.topic = Some(topic.to_string());
let topic_reply =
format!(":{}!anon@dark.fi TOPIC {} :{}\r\n", self.nickname, channel, topic);
self.reply(&topic_reply).await?;
} else {
// Client is asking or the topic
let chan_info = self.configured_chans.get(channel).unwrap();
let topic_reply = if let Some(topic) = &chan_info.topic {
format!("{} {} {} :{}\r\n", RPL_TOPIC, self.nickname, channel, topic)
} else {
const TOPIC: &str = "No topic is set";
format!("{} {} {} :{}\r\n", RPL_NOTOPIC, self.nickname, channel, TOPIC)
};
self.reply(&topic_reply).await?;
}
}
"PING" => {
let line_clone = line.clone();
let split_line: Vec<&str> = line_clone.split_whitespace().collect();
if split_line.len() > 1 {
let pong = format!("PONG {}\r\n", split_line[1]);
self.reply(&pong).await?;
}
}
"PRIVMSG" => {
let channel = tokens.next().ok_or(Error::MalformedPacket)?;
let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?;
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let message = &line[substr_idx + 1..];
info!("(Plain) PRIVMSG {} :{}", channel, message);
if self.configured_chans.contains_key(channel) {
let channel_info = self.configured_chans.get(channel).unwrap();
if channel_info.joined.load(Ordering::Relaxed) {
let message = if let Some(salt_box) = &channel_info.salt_box {
let encrypted = encrypt_message(salt_box, message);
info!("(Encrypted) PRIVMSG {} :{}", channel, encrypted);
encrypted
} else {
message.to_string()
};
let random_id = OsRng.next_u32();
let protocol_msg = Privmsg {
id: random_id,
nickname: self.nickname.clone(),
channel: channel.to_string(),
message,
};
let mut smi = self.seen_msg_id.lock().await;
smi.push(random_id);
drop(smi);
debug!(target: "ircd", "PRIVMSG to be sent: {:?}", protocol_msg);
self.p2p_sender.send(protocol_msg).await?;
}
}
}
"QUIT" => {
// Close the connection
return Err(Error::ServiceStopped)
}
_ => {
warn!("Unimplemented `{}` command", command);
}
}
if !self.is_registered && self.is_nick_init && self.is_user_init {
debug!("Initializing peer connection");
let register_reply = format!(":darkfi 001 {} :Let there be dark\r\n", self.nickname);
self.reply(&register_reply).await?;
self.is_registered = true;
// Auto-joins
macro_rules! autojoin {
($channel:expr,$topic:expr) => {
let j = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, $channel);
let t = format!(":DarkFi TOPIC {} :{}\r\n", $channel, $topic);
self.reply(&j).await?;
self.reply(&t).await?;
};
}
for chan in self.auto_channels.clone() {
if self.configured_chans.contains_key(&chan) {
let chan_info = self.configured_chans.get_mut(&chan).unwrap();
let topic = if let Some(topic) = chan_info.topic.clone() {
topic
} else {
"n/a".to_string()
};
chan_info.topic = Some(topic.to_string());
autojoin!(chan, topic);
} else {
let mut chan_info = ChannelInfo::new()?;
chan_info.topic = Some("n/a".to_string());
self.configured_chans.insert(chan.clone(), chan_info);
autojoin!(chan, "n/a");
}
}
}
Ok(())
}
pub async fn reply(&mut self, message: &str) -> Result<()> {
self.write_stream.write_all(message.as_bytes()).await?;
debug!("Sent {}", message);
Ok(())
}
}

View File

@@ -0,0 +1,120 @@
use std::{
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
};
use fxhash::FxHashMap;
use log::info;
use serde::Deserialize;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use toml::Value;
use url::Url;
use darkfi::{net::settings::SettingsOpt, Result};
pub const CONFIG_FILE: &str = "ircd_config.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../ircd_config.toml");
/// ircd cli
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "ircd")]
pub struct Args {
/// Sets a custom config file
#[structopt(long)]
pub config: Option<String>,
/// JSON-RPC listen URL
#[structopt(long = "rpc", default_value = "tcp://127.0.0.1:11055")]
pub rpc_listen: Url,
/// IRC listen URL
#[structopt(long = "irc", default_value = "tcp://127.0.0.1:11066")]
pub irc_listen: Url,
/// Sets Datastore Path
#[structopt(long, default_value = "~/.config/darkfi/ircd")]
pub datastore: String,
/// Generate a new NaCl secret and exit
#[structopt(long)]
pub gen_secret: bool,
/// Autojoin channels
#[structopt(long)]
pub autojoin: Vec<String>,
#[structopt(flatten)]
pub net: SettingsOpt,
/// Increase verbosity
#[structopt(short, parse(from_occurrences))]
pub verbose: u8,
}
/// This struct holds information about preconfigured channels.
/// In the TOML configuration file, we can configure channels as such:
/// ```toml
/// [channel."#dev"]
/// secret = "GvH4kno3kUu6dqPrZ8zjMhqxTUDZ2ev16EdprZiZJgj1"
/// topic = "DarkFi Development Channel"
/// ```
/// Having a secret will enable a NaCl box that is able to encrypt and
/// decrypt messages in this channel using this set shared secret.
/// The secret should be shared OOB, via a secure channel.
/// Having a topic set is useful if one wants to have a topic in the
/// configured channel. It is not shared with others, but it is useful
/// for personal reference.
#[derive(Clone)]
pub struct ChannelInfo {
/// Optional topic for the channel
pub topic: Option<String>,
/// Optional NaCl box for the channel, used for {en,de}cryption.
pub salt_box: Option<crypto_box::Box>,
/// Flag indicates whether the user has joined the channel or not
pub joined: Arc<AtomicBool>,
}
impl ChannelInfo {
pub fn new() -> Result<Self> {
Ok(Self { topic: None, salt_box: None, joined: Arc::new(AtomicBool::new(true)) })
}
}
/// Parse the configuration file for any configured channels and return
/// a map containing said configurations.
pub fn parse_configured_channels(config_file: &PathBuf) -> Result<FxHashMap<String, ChannelInfo>> {
let toml_contents = std::fs::read_to_string(config_file)?;
let mut ret = FxHashMap::default();
if let Value::Table(map) = toml::from_str(&toml_contents)? {
if map.contains_key("channel") && map["channel"].is_table() {
for chan in map["channel"].as_table().unwrap() {
info!("Found configuration for channel {}", chan.0);
let mut channel_info = ChannelInfo::new()?;
if chan.1.as_table().unwrap().contains_key("topic") {
let topic = chan.1["topic"].as_str().unwrap().to_string();
info!("Found topic for channel {}: {}", chan.0, topic);
channel_info.topic = Some(topic);
}
if chan.1.as_table().unwrap().contains_key("secret") {
// Build the NaCl box
let s = chan.1["secret"].as_str().unwrap();
let bytes: [u8; 32] = bs58::decode(s).into_vec()?.try_into().unwrap();
let secret = crypto_box::SecretKey::from(bytes);
let public = secret.public_key();
let msg_box = crypto_box::Box::new(&public, &secret);
channel_info.salt_box = Some(msg_box);
info!("Instantiated NaCl box for channel {}", chan.0);
}
ret.insert(chan.0.to_string(), channel_info);
}
}
};
Ok(ret)
}