create ircd2 bin: ircd based on hashchain

ghassmo
2022-09-30 16:04:25 +04:00
parent 68f67c665a
commit d4f0aa8285
22 changed files with 3042 additions and 0 deletions

Cargo.lock generated

@@ -2418,6 +2418,39 @@ dependencies = [
"url",
]
[[package]]
name = "ircd2"
version = "0.4.0"
dependencies = [
"async-channel",
"async-executor",
"async-std",
"async-trait",
"bs58",
"chrono",
"clap 3.2.22",
"crypto_box",
"ctrlc",
"darkfi",
"easy-parallel",
"futures",
"futures-rustls",
"fxhash",
"hex",
"log",
"rand",
"ripemd",
"rustls-pemfile",
"serde",
"serde_json",
"simplelog",
"smol",
"structopt",
"structopt-toml",
"toml",
"url",
]
[[package]]
name = "itertools"
version = "0.10.5"


@@ -27,6 +27,7 @@ members = [
"bin/fud/fu",
"bin/fud/fud",
"bin/ircd",
"bin/ircd2",
"bin/dnetview",
"bin/daod",
"bin/dao-cli",

bin/ircd2/Cargo.toml Normal file

@@ -0,0 +1,49 @@
[package]
name = "ircd2"
description = "P2P IRC daemon"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"
homepage = "https://dark.fi"
repository = "https://github.com/darkrenaissance/darkfi"
keywords = []
categories = []
[dependencies]
darkfi = {path = "../../", features = ["net", "rpc", "bs58"]}
# Async
smol = "1.2.5"
futures = "0.3.24"
futures-rustls = "0.22.2"
rustls-pemfile = "1.0.1"
async-std = "1.12.0"
async-trait = "0.1.57"
async-channel = "1.7.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
# Crypto
crypto_box = "0.8.1"
rand = "0.8.5"
# Misc
clap = {version = "3.2.20", features = ["derive"]}
log = "0.4.17"
simplelog = "0.12.0"
fxhash = "0.2.1"
ctrlc = { version = "3.2.3", features = ["termination"] }
url = "2.3.1"
chrono = "0.4.22"
ripemd = "0.1.3"
# Encoding and parsing
serde_json = "1.0.85"
serde = {version = "1.0.145", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.1"
bs58 = "0.4.0"
toml = "0.5.9"
hex = "0.4.3"

bin/ircd2/README.md Normal file

@@ -0,0 +1,4 @@
# ircd
See the [DarkFi Book](https://darkrenaissance.github.io/darkfi/misc/ircd.html) for the installation guide.


@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:1234"
## IRC listen URL
irc_listen="127.0.0.1:11067"
## Sets Datastore Path
datastore="~/.config/ircd-inbound"
## P2P net settings
[net]
## P2P accept addresses
inbound=["127.0.0.1:11002"]
## Connection slots
#outbound_connections=5
## P2P external addresses
external_addr=["127.0.0.1:11004"]
## Peers to connect to
#peers=["127.0.0.1:11003"]
## Seed nodes to connect to
seeds=["127.0.0.1:11001"]
## These are the default settings for the P2P network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10


@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:7777"
## IRC listen URL
irc_listen="127.0.0.1:11066"
## Sets Datastore Path
datastore="~/.config/ircd-outbound"
## P2P net settings
[net]
## P2P accept addresses
# inbound=["127.0.0.1:11002"]
## Connection slots
outbound_connections=5
## P2P external addresses
#external_addr=["127.0.0.1:11002"]
## Peers to connect to
#peers=["127.0.0.1:11003"]
## Seed nodes to connect to
seeds=["127.0.0.1:11001"]
## These are the default settings for the P2P network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10


@@ -0,0 +1,33 @@
## JSON-RPC listen URL
rpc_listen="127.0.0.1:8000"
## IRC listen URL
irc_listen="127.0.0.1:11065"
## Sets Datastore Path
datastore="~/.config/ircd-seed"
## P2P net settings
[net]
## P2P accept addresses
inbound=["127.0.0.1:11001"]
## Connection slots
# outbound_connections=5
## P2P external addresses
# external_addr=["127.0.0.1:11001"]
## Peers to connect to
# peers=["127.0.0.1:11001"]
## Seed nodes to connect to
# seeds=["127.0.0.1:11002"]
## These are the default settings for the P2P network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10
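## Note: this seed config and the inbound/outbound configs above form a minimal
## local test network. Start the seed node first so the other two nodes can
## bootstrap from it at 127.0.0.1:11001.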


@@ -0,0 +1,84 @@
## JSON-RPC listen URL
#rpc_listen="tcp://127.0.0.1:25550"
## IRC listen URL
#irc_listen="tcp://127.0.0.1:6667"
#irc_listen="tls://0.0.0.0:6697"
## TLS certificate path if IRC acceptor uses TLS (optional)
#irc_tls_cert = "/etc/letsencrypt/ircd/fullchain.pem"
## TLS secret key path if IRC acceptor uses TLS (optional)
#irc_tls_secret = "/etc/letsencrypt/ircd/privkey.pem"
## List of channels to autojoin for new client connections
autojoin = ["#dev", "#memes", "#philosophy", "#markets", "#math", "#random"]
## Daemon specific password (optional, but once you configure it,
## it is required from the client side)
#password="CHANGE_ME"
## P2P net settings
[net]
## Connection slots
outbound_connections=5
## P2P accept addresses
#inbound = ["tls://0.0.0.0:25551"]
# ipv6 version:
#inbound = ["tls://[::]:25551"]
## P2P external addresses
## Put your IPs or hostnames here
## This is how people can reach you on the inbound port configured above
## You can also put an ipv6 address :)
#external_addr = ["tls://XXX.XXX.XXX.XXX:25551"]
# ipv6 version:
#external_addr = ["tls://[ipv6 address here]:25551"]
## Manually configured peers to connect to
#peers = ["tls://127.0.0.1:25551"]
## Seed nodes to connect to
seeds = ["tls://lilith0.dark.fi:25551", "tls://lilith1.dark.fi:25551"]
# Preferred transports for outbound connections
#outbound_transports = ["tls", "tcp"]
## Only used for debugging. Compromises privacy when set.
#node_id = "foo"
## These are the default settings for the P2P network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10
## Per-channel settings
#[channel."#foo"]
## Create with `ircd --gen-secret`
#secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"
## Topic to set for the channel
#topic = "DarkFi Foo Stuff"
[channel."#dev"]
topic = "DarkFi Development HQ"
### Contacts list
## Private key used along with contact_pubkey to decrypt direct messages
## sent to you (configured only once).
## Generate a keypair with 'ircd --gen-keypair',
## or save it to a file with 'ircd --gen-keypair -o ~/some_dir/filename'.
## !!NEVER SHARE THIS KEY!!
#[private_key."955Dfa83pU7RCevT2rMrGfhza4kcy6FShSNE6AdR4Q7A"]
## Shared public key that encrypts direct communication between two nicknames
## on the network.
## These are in the form of [Contact_nick, Contact_pubkey]
## Per-contact settings
## Contact nickname (this is not an IRC nickname and can be anything you want);
## IRC messages should be addressed to this nickname.
#[contact."narodnik"]
## contact public key
#contact_pubkey = "C9vC6HNDfGQofWCapZfQK5MkV1JR8Cct839RDUCqbDGK"
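## Example flow for encrypted DMs: each side generates a keypair with
## 'ircd --gen-keypair', keeps its own private key in a [private_key."..."]
## section, and puts the other side's public key into contact_pubkey under a
## [contact."<nick>"] section like the one above.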


@@ -0,0 +1,18 @@
IRC Bots
========
## `meetbot`
`meetbot.py` is a bot that keeps track of topics for IRC channels so they can
be discussed in meetings. Multiple channels can be configured through
`meetbot_cfg.py`.
**Notes:**
* Never add secrets to the public repo config!
### Setup
* Download `meetbot.py` and `meetbot_cfg.py`
* Edit `meetbot_cfg.py` for your needs.
* Navigate your terminal to the folder containing `meetbot.py`.
* Run the bot: `$ python meetbot.py`
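* Optionally pass `-v` for debug logging: `$ python meetbot.py -v`

In its configured channels the bot responds to `!start`, `!topic <text>`,
`!list`, `!next` and `!end`.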

bin/ircd2/script/meetbot.py Executable file

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
import asyncio
import logging
import pickle
from base58 import b58decode
from nacl.public import PrivateKey, Box
from meetbot_cfg import config
# Initialized channels from the configuration
CHANS = {}
# Pickle DB
PICKLE_DB = "meetbot.pickle"
# TODO: while this is nice to support, it would perhaps be better to do it
# all over the same connection rather than opening a socket for each channel.
async def channel_listen(host, port, nick, chan):
try:
logging.info("%s: Connecting to %s:%s", chan, host, port)
reader, writer = await asyncio.open_connection(host, port)
logging.debug("%s: Send CAP msg", chan)
msg = "CAP REQ : no-history\r\n"
writer.write(msg.encode("utf-8"))
await writer.drain()
logging.debug("%s: Send NICK msg", chan)
msg = f"NICK {nick}\r\n"
writer.write(msg.encode("utf-8"))
await writer.drain()
logging.debug("%s: Send CAP END msg", chan)
msg = "CAP END\r\n"
writer.write(msg.encode("utf-8"))
await writer.drain()
logging.debug("%s: Send JOIN msg", chan)
msg = f"JOIN {chan}\r\n"
writer.write(msg.encode("utf-8"))
await writer.drain()
logging.info("%s: Listening to channel", chan)
while True:
msg = await reader.readline()
msg = msg.decode("utf8")
if not msg:
continue
split_msg = msg.split(" ")
command = split_msg[1]
nick_c = split_msg[0][1:].rsplit("!", 1)[0]
logging.debug("%s: Recv: %s", chan, msg.rstrip())
if command == "PRIVMSG":
msg_title = msg.split(" ")[3][1:].rstrip()
if not msg_title:
logging.info("%s: Recv empty PRIVMSG, ignoring", chan)
continue
if msg_title == "!start":
logging.info("%s: Got !start", chan)
topics = CHANS[chan]["topics"]
reply = f"PRIVMSG {chan} :Meeting started"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
if len(topics) == 0:
reply = f"PRIVMSG {chan} :No topics"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
continue
reply = f"PRIVMSG {chan} :Topics:"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
for i, topic in enumerate(topics):
reply = f"PRIVMSG {chan} :{i+1}. {topic}"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
cur_topic = topics.pop(0)
reply = f"PRIVMSG {chan} :Current topic: {cur_topic}\r\n"
CHANS[chan]["topics"] = topics
writer.write(reply.encode("utf-8"))
await writer.drain()
continue
if msg_title == "!end":
logging.info("%s: Got !end", chan)
reply = f"PRIVMSG {chan} :Meeting ended"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
continue
if msg_title == "!topic":
logging.info("%s: Got !topic", chan)
topic = msg.split(" ", 4)
if len(topic) != 5:
logging.debug("%s: Topic msg len not 5, skipping",
chan)
continue
topic = topic[4].rstrip() + f" (by {nick_c})"
if topic == "":
logging.debug("%s: Topic message empty, skipping",
chan)
continue
topics = CHANS[chan]["topics"]
if topic not in topics:
topics.append(topic)
CHANS[chan]["topics"] = topics
logging.debug("%s: Appended topic to channel topics",
chan)
reply = f"PRIVMSG {chan} :Added topic: {topic}"
logging.info("%s: Send: %s", chan, reply)
else:
logging.debug("%s: Topic already in list of topics",
chan)
reply = f"PRIVMSG {chan} :Topic already in list"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
continue
if msg_title == "!list":
logging.info("%s: Got !list", chan)
topics = CHANS[chan]["topics"]
if len(topics) == 0:
reply = f"PRIVMSG {chan} :No topics"
else:
reply = f"PRIVMSG {chan} :Topics:"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
for i, topic in enumerate(topics):
reply = f"PRIVMSG {chan} :{i+1}. {topic}"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
continue
if msg_title == "!next":
logging.info("%s: Got !next", chan)
topics = CHANS[chan]["topics"]
if len(topics) == 0:
reply = f"PRIVMSG {chan} :No further topics"
else:
cur_topic = topics.pop(0)
CHANS[chan]["topics"] = topics
reply = f"PRIVMSG {chan} :Current topic: {cur_topic}"
logging.info("%s: Send: %s", chan, reply)
writer.write((reply + "\r\n").encode("utf-8"))
await writer.drain()
continue
except KeyboardInterrupt:
return
except ConnectionRefusedError:
logging.warning("%s: Connection refused, trying again in 3s...", chan)
await asyncio.sleep(3)
await channel_listen(host, port, nick, chan)
except Exception as e:
logging.error("EXCEPTION: %s", e)
        logging.warning("%s: Connection interrupted. Reconnecting in 3s...", chan)
await asyncio.sleep(3)
await channel_listen(host, port, nick, chan)
async def main(debug=False):
global CHANS
loglevel = logging.DEBUG if debug else logging.INFO
logfmt = "%(asctime)s [%(levelname)s]\t%(message)s"
logging.basicConfig(format=logfmt,
level=loglevel,
datefmt="%Y-%m-%d %H:%M:%S")
try:
with open(PICKLE_DB, "rb") as pickle_fd:
CHANS = pickle.load(pickle_fd)
logging.info("Loaded pickle database")
except:
logging.info("Did not find pickle database")
for i in config["channels"]:
name = i["name"]
logging.info("Found config for channel %s", name)
# TODO: This will be useful when ircd has a CAP that tells it to
# give **all** messages to the connected client, no matter if ircd
# itself has a configured secret or not.
# This way the ircd itself doesn't have to keep channel secrets, but
# they can rather only be held by this bot. In turn this means the bot
# can be deployed with any ircd.
if i["secret"]:
logging.info("Instantiating NaCl box for %s", name)
secret = b58decode(i["secret"].encode("utf-8"))
secret = PrivateKey(secret)
public = secret.public_key
box = Box(secret, public)
else:
box = None
if not CHANS.get(name):
CHANS[name] = {}
if not CHANS[name].get("topics"):
CHANS[name]["topics"] = []
CHANS[name]["box"] = box
coroutines = []
for i in CHANS.keys():
logging.debug("Creating async task for %s", i)
task = asyncio.create_task(
channel_listen(config["host"], config["port"], config["nick"], i))
coroutines.append(task)
await asyncio.gather(*coroutines)
if __name__ == "__main__":
from sys import argv
DBG = bool(len(argv) == 2 and argv[1] == "-v")
try:
asyncio.run(main(debug=DBG))
except KeyboardInterrupt:
print("\rCaught ^C, saving pickle and exiting")
with open(PICKLE_DB, "wb") as fdesc:
pickle.dump(CHANS, fdesc, protocol=pickle.HIGHEST_PROTOCOL)


@@ -0,0 +1,23 @@
config = {
# IRC server host
"host": "127.0.0.1",
# IRC server port
"port": 6667,
# IRC nickname
"nick": "meetbot",
"channels": [
{
"name": "#foo",
"secret": None,
},
{
"name": "#secret_channel",
            # TODO: This is useless right now, but it would be nice to add a CAP
            # in ircd that passes along all incoming PRIVMSGs so the bot is able
            # to check them.
"secret": "HNEKcUmwsspdaL9b8sFn45b8Rf3bzv1LdYS1JVNvkPGL",
},
],
}

bin/ircd2/src/crypto.rs Normal file

@@ -0,0 +1,130 @@
use std::fmt;
use crypto_box::{
aead::{Aead, AeadCore},
SalsaBox,
};
use fxhash::FxHashMap;
use rand::rngs::OsRng;
use crate::{
privmsg::PrivMsgEvent,
settings::{ChannelInfo, ContactInfo},
};
#[derive(serde::Serialize)]
pub struct KeyPair {
pub private_key: String,
pub public_key: String,
}
impl fmt::Display for KeyPair {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Public key: {}\nPrivate key: {}", self.public_key, self.private_key)
}
}
/// The format we're using is nonce+ciphertext, where nonce is 24 bytes.
fn try_decrypt(salt_box: &SalsaBox, ciphertext: &str) -> Option<String> {
let bytes = match bs58::decode(ciphertext).into_vec() {
Ok(v) => v,
Err(_) => return None,
};
if bytes.len() < 25 {
return None
}
// Try extracting the nonce
let nonce = match bytes[0..24].try_into() {
Ok(v) => v,
Err(_) => return None,
};
// Take the remaining ciphertext
let message = &bytes[24..];
// Try decrypting the message
match salt_box.decrypt(nonce, message) {
Ok(v) => Some(String::from_utf8_lossy(&v).to_string()),
Err(_) => None,
}
}
/// The format we're using is nonce+ciphertext, where nonce is 24 bytes.
pub fn encrypt(salt_box: &SalsaBox, plaintext: &str) -> String {
let nonce = SalsaBox::generate_nonce(&mut OsRng);
let mut ciphertext = salt_box.encrypt(&nonce, plaintext.as_bytes()).unwrap();
let mut concat = vec![];
concat.append(&mut nonce.as_slice().to_vec());
concat.append(&mut ciphertext);
bs58::encode(concat).into_string()
}
/// Decrypt PrivMsg target
pub fn decrypt_target(
contact: &mut String,
privmsg: &mut PrivMsgEvent,
configured_chans: FxHashMap<String, ChannelInfo>,
configured_contacts: FxHashMap<String, ContactInfo>,
) {
for chan_name in configured_chans.keys() {
let chan_info = configured_chans.get(chan_name).unwrap();
if !chan_info.joined {
continue
}
let salt_box = chan_info.salt_box.clone();
if let Some(salt_box) = salt_box {
let decrypted_target = try_decrypt(&salt_box, &privmsg.target);
if decrypted_target.is_none() {
continue
}
let target = decrypted_target.unwrap();
if *chan_name == target {
privmsg.target = target;
return
}
}
}
for cnt_name in configured_contacts.keys() {
let cnt_info = configured_contacts.get(cnt_name).unwrap();
let salt_box = cnt_info.salt_box.clone();
if let Some(salt_box) = salt_box {
let decrypted_target = try_decrypt(&salt_box, &privmsg.target);
if decrypted_target.is_none() {
continue
}
let target = decrypted_target.unwrap();
privmsg.target = target;
*contact = cnt_name.into();
return
}
}
}
/// Decrypt PrivMsg nickname and message
pub fn decrypt_privmsg(salt_box: &SalsaBox, privmsg: &mut PrivMsgEvent) {
let decrypted_nick = try_decrypt(&salt_box.clone(), &privmsg.nick);
let decrypted_msg = try_decrypt(&salt_box.clone(), &privmsg.msg);
    if decrypted_nick.is_none() || decrypted_msg.is_none() {
return
}
privmsg.nick = decrypted_nick.unwrap();
privmsg.msg = decrypted_msg.unwrap();
}
/// Encrypt PrivMsg
pub fn encrypt_privmsg(salt_box: &SalsaBox, privmsg: &mut PrivMsgEvent) {
privmsg.nick = encrypt(salt_box, &privmsg.nick);
privmsg.target = encrypt(salt_box, &privmsg.target);
privmsg.msg = encrypt(salt_box, &privmsg.msg);
}
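A minimal round-trip sketch for the helpers above, assuming only the `crypto_box`
API already used in this file; something like the following could live in a
`#[cfg(test)]` module at the bottom of crypto.rs:

#[cfg(test)]
mod tests {
    use super::*;
    use crypto_box::SecretKey;

    #[test]
    fn encrypt_decrypt_roundtrip() {
        // Self-addressed box: encrypt to our own public key.
        let secret = SecretKey::generate(&mut OsRng);
        let salt_box = SalsaBox::new(&secret.public_key(), &secret);

        let ciphertext = encrypt(&salt_box, "hello");
        assert_eq!(try_decrypt(&salt_box, &ciphertext), Some("hello".to_string()));

        // Anything that is not a valid bs58 nonce+ciphertext must fail gracefully.
        assert_eq!(try_decrypt(&salt_box, "not-a-ciphertext"), None);
    }
}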


@@ -0,0 +1,24 @@
use std::sync::Arc;
use darkfi::{Error, Result};
use crate::model::Event;
pub type EventsQueuePtr = Arc<EventsQueue>;
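/// Unbounded async channel wrapper: `dispatch()` pushes new events in (from the
/// Model) and `fetch()` pops them out (for the View).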
pub struct EventsQueue(async_channel::Sender<Event>, async_channel::Receiver<Event>);
impl EventsQueue {
pub fn new() -> EventsQueuePtr {
let (sn, rv) = async_channel::unbounded();
Arc::new(Self(sn, rv))
}
pub async fn fetch(&self) -> Result<Event> {
self.1.recv().await.map_err(Error::from)
}
pub async fn dispatch(&self, event: &Event) -> Result<()> {
self.0.send(event.clone()).await.map_err(Error::from)
}
}

bin/ircd2/src/irc/client.rs Normal file

@@ -0,0 +1,548 @@
use std::net::SocketAddr;
use futures::{
io::{BufReader, ReadHalf, WriteHalf},
AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt,
};
use log::{debug, error, info, warn};
use darkfi::{system::Subscription, Error, Result};
use crate::{
crypto::{decrypt_privmsg, decrypt_target, encrypt_privmsg},
privmsg::PrivMsgEvent,
settings,
settings::RPL,
ChannelInfo,
};
use super::IrcConfig;
pub struct IrcClient<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
// network stream
write_stream: WriteHalf<C>,
read_stream: BufReader<ReadHalf<C>>,
pub address: SocketAddr,
// irc config
irc_config: IrcConfig,
server_notifier: async_channel::Sender<(PrivMsgEvent, u64)>,
subscription: Subscription<PrivMsgEvent>,
}
impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
pub fn new(
write_stream: WriteHalf<C>,
read_stream: BufReader<ReadHalf<C>>,
address: SocketAddr,
irc_config: IrcConfig,
server_notifier: async_channel::Sender<(PrivMsgEvent, u64)>,
subscription: Subscription<PrivMsgEvent>,
) -> Self {
Self { write_stream, read_stream, address, irc_config, subscription, server_notifier }
}
    /// Start listening for messages coming from the View or the IRC client
pub async fn listen(&mut self) {
loop {
let mut line = String::new();
futures::select! {
// Process msg from View
msg = self.subscription.receive().fuse() => {
if let Err(e) = self.process_msg(&msg).await {
error!("[CLIENT {}] Process msg: {}", self.address, e);
break
}
}
// Process msg from IRC client
err = self.read_stream.read_line(&mut line).fuse() => {
if let Err(e) = err {
error!("[CLIENT {}] Read line error: {}", self.address, e);
break
}
if let Err(e) = self.process_line(line).await {
error!("[CLIENT {}] Process line failed: {}", self.address, e);
break
}
}
}
}
warn!("[CLIENT {}] Close connection", self.address);
self.subscription.unsubscribe().await;
}
pub async fn process_msg(&mut self, msg: &PrivMsgEvent) -> Result<()> {
info!("[CLIENT {}] msg from View: {:?}", self.address, msg.to_string());
let mut msg = msg.clone();
let mut contact = String::new();
decrypt_target(
&mut contact,
&mut msg,
self.irc_config.configured_chans.clone(),
self.irc_config.configured_contacts.clone(),
);
if msg.target.starts_with('#') {
// Try to potentially decrypt the incoming message.
if !self.irc_config.configured_chans.contains_key(&msg.target) {
return Ok(())
}
let chan_info = self.irc_config.configured_chans.get_mut(&msg.target).unwrap();
if !chan_info.joined {
return Ok(())
}
if let Some(salt_box) = &chan_info.salt_box {
decrypt_privmsg(salt_box, &mut msg);
info!(
"[CLIENT {}] Decrypted received message: {:?}",
self.address,
msg.to_string()
);
}
// add the nickname to the channel's names
if !chan_info.names.contains(&msg.nick) {
chan_info.names.push(msg.nick.clone());
}
self.reply(&msg.to_string()).await?;
} else if self.irc_config.is_cap_end && self.irc_config.is_nick_init {
if !self.irc_config.configured_contacts.contains_key(&contact) {
return Ok(())
}
let contact_info = self.irc_config.configured_contacts.get(&contact).unwrap();
if let Some(salt_box) = &contact_info.salt_box {
decrypt_privmsg(salt_box, &mut msg);
// This is for /query
msg.nick = contact;
info!(
"[CLIENT {}] Decrypted received message: {:?}",
self.address,
msg.to_string()
);
}
self.reply(&msg.to_string()).await?;
}
Ok(())
}
pub async fn process_line(&mut self, line: String) -> Result<()> {
let irc_msg = match clean_input_line(line) {
Ok(msg) => msg,
Err(e) => {
warn!("[CLIENT {}] Connection error: {}", self.address, e);
return Err(Error::ChannelStopped)
}
};
info!("[CLIENT {}] Process msg: {}", self.address, irc_msg);
if let Err(e) = self.update(irc_msg).await {
warn!("[CLIENT {}] Connection error: {}", self.address, e);
return Err(Error::ChannelStopped)
}
Ok(())
}
async fn update(&mut self, line: String) -> Result<()> {
if line.len() > settings::MAXIMUM_LENGTH_OF_MESSAGE {
return Err(Error::MalformedPacket)
}
if self.irc_config.password.is_empty() {
self.irc_config.is_pass_init = true
}
let (command, value) = parse_line(&line)?;
let (command, value) = (command.as_str(), value.as_str());
match command {
"PASS" => self.on_receive_pass(value).await?,
"USER" => self.on_receive_user().await?,
"NAMES" => self.on_receive_names(value.split(',').map(String::from).collect()).await?,
"NICK" => self.on_receive_nick(value).await?,
"JOIN" => self.on_receive_join(value.split(',').map(String::from).collect()).await?,
"PART" => self.on_receive_part(value.split(',').map(String::from).collect()).await?,
"TOPIC" => self.on_receive_topic(&line, value).await?,
"PING" => self.on_ping(value).await?,
"PRIVMSG" => self.on_receive_privmsg(&line, value).await?,
"CAP" => self.on_receive_cap(&line, &value.to_uppercase()).await?,
"QUIT" => self.on_quit()?,
_ => warn!("[CLIENT {}] Unimplemented `{}` command", self.address, command),
}
self.registre().await?;
Ok(())
}
async fn registre(&mut self) -> Result<()> {
if !self.irc_config.is_registered &&
self.irc_config.is_cap_end &&
self.irc_config.is_nick_init &&
self.irc_config.is_user_init
{
debug!("Initializing peer connection");
let register_reply =
format!(":darkfi 001 {} :Let there be dark\r\n", self.irc_config.nickname);
self.reply(&register_reply).await?;
self.irc_config.is_registered = true;
// join all channels
self.on_receive_join(self.irc_config.auto_channels.clone()).await?;
self.on_receive_join(self.irc_config.configured_chans.keys().cloned().collect())
.await?;
if *self.irc_config.capabilities.get("no-history").unwrap() {
return Ok(())
}
}
Ok(())
}
async fn reply(&mut self, message: &str) -> Result<()> {
self.write_stream.write_all(message.as_bytes()).await?;
debug!("Sent {}", message.trim_end());
Ok(())
}
fn on_quit(&self) -> Result<()> {
// Close the connection
Err(Error::NetworkServiceStopped)
}
async fn on_receive_user(&mut self) -> Result<()> {
// We can stuff any extra things like public keys in here.
// Ignore it for now.
if self.irc_config.is_pass_init {
self.irc_config.is_user_init = true;
} else {
// Close the connection
warn!("[CLIENT {}] Password is required", self.address);
return self.on_quit()
}
Ok(())
}
async fn on_receive_pass(&mut self, password: &str) -> Result<()> {
if self.irc_config.password == password {
self.irc_config.is_pass_init = true
} else {
// Close the connection
warn!("[CLIENT {}] Password is not correct!", self.address);
return self.on_quit()
}
Ok(())
}
async fn on_receive_nick(&mut self, nickname: &str) -> Result<()> {
if nickname.len() > settings::MAXIMUM_LENGTH_OF_NICKNAME {
return Ok(())
}
self.irc_config.is_nick_init = true;
let old_nick = std::mem::replace(&mut self.irc_config.nickname, nickname.to_string());
let nick_reply =
format!(":{}!anon@dark.fi NICK {}\r\n", old_nick, self.irc_config.nickname);
self.reply(&nick_reply).await
}
async fn on_receive_part(&mut self, channels: Vec<String>) -> Result<()> {
for chan in channels.iter() {
let part_reply =
format!(":{}!anon@dark.fi PART {}\r\n", self.irc_config.nickname, chan);
self.reply(&part_reply).await?;
if self.irc_config.configured_chans.contains_key(chan) {
let chan_info = self.irc_config.configured_chans.get_mut(chan).unwrap();
chan_info.joined = false;
}
}
Ok(())
}
async fn on_receive_topic(&mut self, line: &str, channel: &str) -> Result<()> {
if let Some(substr_idx) = line.find(':') {
// Client is setting the topic
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let topic = &line[substr_idx + 1..];
let chan_info = self.irc_config.configured_chans.get_mut(channel).unwrap();
chan_info.topic = Some(topic.to_string());
let topic_reply = format!(
":{}!anon@dark.fi TOPIC {} :{}\r\n",
self.irc_config.nickname, channel, topic
);
self.reply(&topic_reply).await?;
} else {
            // Client is asking for the topic
let chan_info = self.irc_config.configured_chans.get(channel).unwrap();
let topic_reply = if let Some(topic) = &chan_info.topic {
format!(
"{} {} {} :{}\r\n",
RPL::Topic as u32,
self.irc_config.nickname,
channel,
topic
)
} else {
const TOPIC: &str = "No topic is set";
format!(
"{} {} {} :{}\r\n",
RPL::NoTopic as u32,
self.irc_config.nickname,
channel,
TOPIC
)
};
self.reply(&topic_reply).await?;
}
Ok(())
}
async fn on_ping(&mut self, value: &str) -> Result<()> {
let pong = format!("PONG {}\r\n", value);
self.reply(&pong).await
}
async fn on_receive_cap(&mut self, line: &str, subcommand: &str) -> Result<()> {
self.irc_config.is_cap_end = false;
let capabilities_keys: Vec<String> = self.irc_config.capabilities.keys().cloned().collect();
match subcommand {
"LS" => {
let cap_ls_reply = format!(
":{}!anon@dark.fi CAP * LS :{}\r\n",
self.irc_config.nickname,
capabilities_keys.join(" ")
);
self.reply(&cap_ls_reply).await?;
}
"REQ" => {
let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?;
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let cap: Vec<&str> = line[substr_idx + 1..].split(' ').collect();
let mut ack_list = vec![];
let mut nak_list = vec![];
for c in cap {
if self.irc_config.capabilities.contains_key(c) {
self.irc_config.capabilities.insert(c.to_string(), true);
ack_list.push(c);
} else {
nak_list.push(c);
}
}
let cap_ack_reply = format!(
":{}!anon@dark.fi CAP * ACK :{}\r\n",
self.irc_config.nickname,
ack_list.join(" ")
);
let cap_nak_reply = format!(
":{}!anon@dark.fi CAP * NAK :{}\r\n",
self.irc_config.nickname,
nak_list.join(" ")
);
self.reply(&cap_ack_reply).await?;
self.reply(&cap_nak_reply).await?;
}
"LIST" => {
let enabled_capabilities: Vec<String> = self
.irc_config
.capabilities
.clone()
.into_iter()
.filter(|(_, v)| *v)
.map(|(k, _)| k)
.collect();
let cap_list_reply = format!(
":{}!anon@dark.fi CAP * LIST :{}\r\n",
self.irc_config.nickname,
enabled_capabilities.join(" ")
);
self.reply(&cap_list_reply).await?;
}
"END" => {
self.irc_config.is_cap_end = true;
}
_ => {}
}
Ok(())
}
async fn on_receive_names(&mut self, channels: Vec<String>) -> Result<()> {
for chan in channels.iter() {
if !chan.starts_with('#') {
continue
}
if self.irc_config.configured_chans.contains_key(chan) {
let chan_info = self.irc_config.configured_chans.get(chan).unwrap();
if chan_info.names.is_empty() {
return Ok(())
}
let names_reply = format!(
":{}!anon@dark.fi {} = {} : {}\r\n",
self.irc_config.nickname,
RPL::NameReply as u32,
chan,
chan_info.names.join(" ")
);
self.reply(&names_reply).await?;
let end_of_names = format!(
":DarkFi {:03} {} {} :End of NAMES list\r\n",
RPL::EndOfNames as u32,
self.irc_config.nickname,
chan
);
self.reply(&end_of_names).await?;
}
}
Ok(())
}
async fn on_receive_privmsg(&mut self, line: &str, target: &str) -> Result<()> {
let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?;
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let message = line[substr_idx + 1..].to_string();
info!("[CLIENT {}] (Plain) PRIVMSG {} :{}", self.address, target, message,);
let mut privmsg = PrivMsgEvent {
nick: self.irc_config.nickname.clone(),
target: target.to_string(),
msg: message,
};
if target.starts_with('#') {
if !self.irc_config.configured_chans.contains_key(target) {
return Ok(())
}
let channel_info = self.irc_config.configured_chans.get(target).unwrap();
if !channel_info.joined {
return Ok(())
}
if let Some(salt_box) = &channel_info.salt_box {
encrypt_privmsg(salt_box, &mut privmsg);
info!("[CLIENT {}] (Encrypted) PRIVMSG: {:?}", self.address, privmsg.to_string());
}
} else {
if !self.irc_config.configured_contacts.contains_key(target) {
return Ok(())
}
let contact_info = self.irc_config.configured_contacts.get(target).unwrap();
if let Some(salt_box) = &contact_info.salt_box {
encrypt_privmsg(salt_box, &mut privmsg);
info!("[CLIENT {}] (Encrypted) PRIVMSG: {:?}", self.address, privmsg.to_string());
}
}
// Notify the server
self.server_notifier.send((privmsg, self.subscription.get_id())).await?;
Ok(())
}
async fn on_receive_join(&mut self, channels: Vec<String>) -> Result<()> {
for chan in channels.iter() {
if !chan.starts_with('#') {
continue
}
if !self.irc_config.configured_chans.contains_key(chan) {
let mut chan_info = ChannelInfo::new()?;
chan_info.topic = Some("n/a".to_string());
self.irc_config.configured_chans.insert(chan.to_string(), chan_info);
}
let chan_info = self.irc_config.configured_chans.get_mut(chan).unwrap();
if chan_info.joined {
return Ok(())
}
chan_info.joined = true;
let topic =
if let Some(topic) = chan_info.topic.clone() { topic } else { "n/a".to_string() };
chan_info.topic = Some(topic.to_string());
{
let j = format!(":{}!anon@dark.fi JOIN {}\r\n", self.irc_config.nickname, chan);
let t = format!(":DarkFi TOPIC {} :{}\r\n", chan, topic);
self.reply(&j).await?;
self.reply(&t).await?;
}
}
Ok(())
}
}
//
// Helper functions
//
fn clean_input_line(mut line: String) -> Result<String> {
if line.is_empty() {
return Err(Error::ChannelStopped)
}
if line == "\n" || line == "\r\n" {
return Err(Error::ChannelStopped)
}
if &line[(line.len() - 2)..] == "\r\n" {
// Remove CRLF
line.pop();
line.pop();
} else if &line[(line.len() - 1)..] == "\n" {
line.pop();
} else {
return Err(Error::ChannelStopped)
}
Ok(line.clone())
}
fn parse_line(line: &str) -> Result<(String, String)> {
let mut tokens = line.split_ascii_whitespace();
// Commands can begin with :garbage but we will reject clients doing
// that for now to keep the protocol simple and focused.
let command = tokens.next().ok_or(Error::MalformedPacket)?.to_uppercase();
let value = tokens.next().ok_or(Error::MalformedPacket)?;
Ok((command, value.to_owned()))
}
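A minimal sketch of what `parse_line` yields, assuming nothing beyond the helper
functions above; something like the following could live in a `#[cfg(test)]`
module at the end of client.rs:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_line_extracts_command_and_first_param() {
        // The command token is uppercased and only the first parameter is
        // returned; the trailing ":message" part is parsed by the handlers.
        let (cmd, val) = parse_line("privmsg #dev :gm").unwrap();
        assert_eq!(cmd, "PRIVMSG");
        assert_eq!(val, "#dev");

        // An empty line has no command token and is rejected.
        assert!(parse_line("").is_err());
    }
}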

bin/ircd2/src/irc/mod.rs Normal file

@@ -0,0 +1,239 @@
use async_std::{net::TcpListener, sync::Arc};
use std::{fs::File, net::SocketAddr};
use async_executor::Executor;
use futures::{io::BufReader, AsyncRead, AsyncReadExt, AsyncWrite};
use futures_rustls::{rustls, TlsAcceptor};
use fxhash::FxHashMap;
use log::{error, info};
use darkfi::{
system::SubscriberPtr,
util::path::{expand_path, get_config_path},
Error, Result,
};
use crate::{
privmsg::PrivMsgEvent,
settings::{
parse_configured_channels, parse_configured_contacts, Args, ChannelInfo, ContactInfo,
CONFIG_FILE,
},
};
mod client;
pub use client::IrcClient;
#[derive(Clone)]
pub struct IrcConfig {
// init bool
pub is_nick_init: bool,
pub is_user_init: bool,
pub is_registered: bool,
pub is_cap_end: bool,
pub is_pass_init: bool,
// user config
pub nickname: String,
pub password: String,
pub capabilities: FxHashMap<String, bool>,
// channels and contacts
pub auto_channels: Vec<String>,
pub configured_chans: FxHashMap<String, ChannelInfo>,
pub configured_contacts: FxHashMap<String, ContactInfo>,
}
impl IrcConfig {
pub fn new(settings: &Args) -> Result<Self> {
let password = settings.password.as_ref().unwrap_or(&String::new()).clone();
let auto_channels = settings.autojoin.clone();
// Pick up channel settings from the TOML configuration
let cfg_path = get_config_path(settings.config.clone(), CONFIG_FILE)?;
let toml_contents = std::fs::read_to_string(cfg_path)?;
let configured_chans = parse_configured_channels(&toml_contents)?;
let configured_contacts = parse_configured_contacts(&toml_contents)?;
let mut capabilities = FxHashMap::default();
capabilities.insert("no-history".to_string(), false);
Ok(Self {
is_nick_init: false,
is_user_init: false,
is_registered: false,
is_cap_end: true,
is_pass_init: false,
nickname: "anon".to_string(),
password,
auto_channels,
configured_chans,
configured_contacts,
capabilities,
})
}
}
pub struct IrcServer {
settings: Args,
irc_config: IrcConfig,
clients_subscriptions: SubscriberPtr<PrivMsgEvent>,
}
impl IrcServer {
pub async fn new(
settings: Args,
clients_subscriptions: SubscriberPtr<PrivMsgEvent>,
) -> Result<Self> {
let irc_config = IrcConfig::new(&settings)?;
Ok(Self { settings, irc_config, clients_subscriptions })
}
pub async fn start(&self, executor: Arc<Executor<'_>>) -> Result<()> {
let (notifier, recv) = async_channel::unbounded();
// Listen to msgs from clients
executor.spawn(Self::listen_to_msgs(recv, self.clients_subscriptions.clone())).detach();
// Start listening for new connections
self.listen(notifier, executor.clone()).await?;
Ok(())
}
/// Start listening to msgs from irc clients
pub async fn listen_to_msgs(
recv: async_channel::Receiver<(PrivMsgEvent, u64)>,
clients_subscriptions: SubscriberPtr<PrivMsgEvent>,
) -> Result<()> {
loop {
let (msg, subscription_id) = recv.recv().await?;
            // TODO: Add to View to prevent duplicate msgs, since a client may have
            // already added the msg to its buffer.
            // Since this will be added to the View directly, other clients connected
            // to the IRC server must be informed about this new msg.
clients_subscriptions.notify_with_exclude(msg, &[subscription_id]).await;
// TODO broadcast to the p2p network
}
}
/// Start listening to new connections from irc clients
pub async fn listen(
&self,
notifier: async_channel::Sender<(PrivMsgEvent, u64)>,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let (listener, acceptor) = self.setup_listener().await?;
info!("[IRC SERVER] listening on {}", self.settings.irc_listen);
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("[IRC SERVER] Failed accepting new connections: {}", e);
continue
}
};
let result = if let Some(acceptor) = acceptor.clone() {
// TLS connection
let stream = match acceptor.accept(stream).await {
Ok(s) => s,
Err(e) => {
error!("[IRC SERVER] Failed accepting TLS connection: {}", e);
continue
}
};
self.process_connection(stream, peer_addr, notifier.clone(), executor.clone()).await
} else {
// TCP connection
self.process_connection(stream, peer_addr, notifier.clone(), executor.clone()).await
};
if let Err(e) = result {
error!("[IRC SERVER] Failed processing connection {}: {}", peer_addr, e);
continue
};
info!("[IRC SERVER] Accept new connection: {}", peer_addr);
}
}
/// On every new connection create new IrcClient
async fn process_connection<C: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
&self,
stream: C,
peer_addr: SocketAddr,
notifier: async_channel::Sender<(PrivMsgEvent, u64)>,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let (reader, writer) = stream.split();
let reader = BufReader::new(reader);
// Subscription for the new client
let client_subscription = self.clients_subscriptions.clone().subscribe().await;
// New irc client
let mut client = IrcClient::new(
writer,
reader,
peer_addr,
self.irc_config.clone(),
notifier,
client_subscription,
);
// Start listening and detach
executor
.spawn(async move {
client.listen().await;
})
.detach();
Ok(())
}
/// Setup a listener for irc server
async fn setup_listener(&self) -> Result<(TcpListener, Option<TlsAcceptor>)> {
let listenaddr = self.settings.irc_listen.socket_addrs(|| None)?[0];
let listener = TcpListener::bind(listenaddr).await?;
let acceptor = match self.settings.irc_listen.scheme() {
"tls" => {
// openssl genpkey -algorithm ED25519 > example.com.key
// openssl req -new -out example.com.csr -key example.com.key
// openssl x509 -req -days 700 -in example.com.csr -signkey example.com.key -out example.com.crt
if self.settings.irc_tls_secret.is_none() || self.settings.irc_tls_cert.is_none() {
error!("[IRC SERVER] To listen using TLS, please set irc_tls_secret and irc_tls_cert in your config file.");
return Err(Error::KeypairPathNotFound)
}
let file =
File::open(expand_path(self.settings.irc_tls_secret.as_ref().unwrap())?)?;
let mut reader = std::io::BufReader::new(file);
let secret = &rustls_pemfile::pkcs8_private_keys(&mut reader)?[0];
let secret = rustls::PrivateKey(secret.clone());
let file = File::open(expand_path(self.settings.irc_tls_cert.as_ref().unwrap())?)?;
let mut reader = std::io::BufReader::new(file);
let certificate = &rustls_pemfile::certs(&mut reader)?[0];
let certificate = rustls::Certificate(certificate.clone());
let config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![certificate], secret)?;
let acceptor = TlsAcceptor::from(Arc::new(config));
Some(acceptor)
}
_ => None,
};
Ok((listener, acceptor))
}
}

bin/ircd2/src/main.rs Normal file

@@ -0,0 +1,147 @@
use async_executor::Executor;
use async_std::sync::{Arc, Mutex};
use log::{info, warn};
use rand::rngs::OsRng;
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
async_daemonize, net,
rpc::server::listen_and_serve,
system::Subscriber,
util::{
cli::{get_log_config, get_log_level, spawn_config},
file::save_json_file,
path::{expand_path, get_config_path},
},
Result,
};
pub mod crypto;
pub mod events_queue;
pub mod irc;
pub mod model;
pub mod privmsg;
pub mod protocol_event;
pub mod rpc;
pub mod settings;
pub mod view;
use crate::{
crypto::KeyPair,
events_queue::EventsQueue,
irc::IrcServer,
model::Model,
protocol_event::{ProtocolEvent, Seen, UnreadEvents},
rpc::JsonRpcInterface,
settings::{Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS},
view::View,
};
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
////////////////////
// Generate new keypair and exit
////////////////////
if settings.gen_keypair {
let secret_key = crypto_box::SecretKey::generate(&mut OsRng);
let pub_key = secret_key.public_key();
let prv_encoded = bs58::encode(secret_key.as_bytes()).into_string();
let pub_encoded = bs58::encode(pub_key.as_bytes()).into_string();
let kp = KeyPair { private_key: prv_encoded, public_key: pub_encoded };
if settings.output.is_some() {
let datastore = expand_path(&settings.output.unwrap())?;
save_json_file(&datastore, &kp)?;
} else {
println!("Generated KeyPair:\n{}", kp);
}
return Ok(())
}
////////////////////
// Initialize the base structures
////////////////////
let events_queue = EventsQueue::new();
let model = Arc::new(Mutex::new(Model::new(events_queue.clone())));
let view = Arc::new(Mutex::new(View::new(events_queue)));
////////////////////
// P2p setup
////////////////////
// Buffers
let seen_event = Seen::new();
let seen_inv = Seen::new();
let unread_events = UnreadEvents::new();
// Check the version
let mut net_settings = settings.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
// New p2p
let p2p = net::P2p::new(net_settings.into()).await;
let p2p2 = p2p.clone();
// Register the protocol_event
let registry = p2p.protocol_registry();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let seen_event = seen_event.clone();
let seen_inv = seen_inv.clone();
let model = model.clone();
let unread_events = unread_events.clone();
async move {
ProtocolEvent::init(channel, p2p, model, seen_event, seen_inv, unread_events).await
}
})
.await;
// Start
p2p.clone().start(executor.clone()).await?;
// Run
let executor_cloned = executor.clone();
executor_cloned.spawn(p2p.clone().run(executor.clone())).detach();
////////////////////
// RPC interface setup
////////////////////
let rpc_listen_addr = settings.rpc_listen.clone();
let rpc_interface =
Arc::new(JsonRpcInterface { addr: rpc_listen_addr.clone(), p2p: p2p.clone() });
executor.spawn(async move { listen_and_serve(rpc_listen_addr, rpc_interface).await }).detach();
////////////////////
// IRC server
////////////////////
let clients_subscriptions = Subscriber::new();
// New irc server
let irc_server = IrcServer::new(settings.clone(), clients_subscriptions).await?;
// Start the irc server and detach it
let executor_cloned = executor.clone();
executor_cloned.spawn(async move { irc_server.start(executor.clone()).await }).detach();
////////////////////
// Wait for SIGINT
////////////////////
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc::set_handler(move || {
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
async_std::task::block_on(signal.send(())).unwrap();
})
.unwrap();
shutdown.recv().await?;
print!("\r");
info!("Caught termination signal, cleaning up and exiting...");
// stop p2p
p2p2.stop().await;
Ok(())
}

bin/ircd2/src/model.rs Normal file

@@ -0,0 +1,584 @@
use async_std::sync::{Arc, Mutex};
use std::fmt;
use fxhash::FxHashMap;
use ripemd::{Digest, Ripemd256};
use darkfi::serial::{Encodable, SerialDecodable, SerialEncodable};
use crate::{
events_queue::EventsQueuePtr,
privmsg::{EventAction, PrivMsgEvent},
settings::get_current_time,
};
pub type EventId = [u8; 32];
const MAX_DEPTH: u32 = 300;
const MAX_HEIGHT: u32 = 300;
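// Chain maintenance parameters: leaf branches that fall more than MAX_DEPTH
// behind the head are pruned (see prune_chains), and once the common ancestor
// of the remaining branches is at least MAX_HEIGHT above the current root, the
// root is advanced to it and the events behind it are discarded (see update_root).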
#[derive(SerialEncodable, SerialDecodable, Clone)]
pub struct Event {
previous_event_hash: EventId,
action: EventAction,
pub timestamp: u64,
#[skip_serialize]
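    // Not serialized (see the attribute above), so it does not affect the event
    // hash; test_event_hash below relies on this.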
pub read_confirms: u8,
}
impl Event {
pub fn hash(&self) -> EventId {
let mut bytes = Vec::new();
self.encode(&mut bytes).expect("serialize failed!");
let mut hasher = Ripemd256::new();
hasher.update(bytes);
let bytes = hasher.finalize().to_vec();
let mut result = [0u8; 32];
result.copy_from_slice(bytes.as_slice());
result
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.action {
EventAction::PrivMsg(event) => {
write!(f, "PRIVMSG {}: {} ({})", event.nick, event.msg, self.timestamp)
}
}
}
}
#[derive(Debug, Clone)]
struct EventNode {
// Only current root has this set to None
parent: Option<EventId>,
event: Event,
children: Vec<EventId>,
}
pub type ModelPtr = Arc<Mutex<Model>>;
pub struct Model {
// This is periodically updated so we discard old nodes
current_root: EventId,
orphans: FxHashMap<EventId, Event>,
event_map: FxHashMap<EventId, EventNode>,
events_queue: EventsQueuePtr,
}
impl Model {
pub fn new(events_queue: EventsQueuePtr) -> Self {
let root_node = EventNode {
parent: None,
event: Event {
previous_event_hash: [0u8; 32],
action: EventAction::PrivMsg(PrivMsgEvent {
nick: "root".to_string(),
msg: "Let there be dark".to_string(),
target: "root".to_string(),
}),
timestamp: get_current_time(),
read_confirms: 0,
},
children: Vec::new(),
};
let root_node_id = root_node.event.hash();
let mut event_map = FxHashMap::default();
event_map.insert(root_node_id.clone(), root_node);
Self { current_root: root_node_id, orphans: FxHashMap::default(), event_map, events_queue }
}
pub fn add(&mut self, event: Event) {
self.orphans.insert(event.hash(), event);
self.reorganize();
}
pub fn is_orphan(&self, event: &Event) -> bool {
!self.event_map.contains_key(&event.previous_event_hash)
}
pub fn find_leaves(&self) -> Vec<EventId> {
// collect the leaves in the tree
let mut leaves = vec![];
for (event_hash, node) in self.event_map.iter() {
// check if the node is a leaf
if node.children.is_empty() {
leaves.push(event_hash.clone());
}
}
leaves
}
pub fn get_event(&self, event: &EventId) -> Option<Event> {
self.event_map.get(event).map(|en| en.event.clone())
}
pub fn get_event_children(&self, event: &EventId) -> Vec<Event> {
let mut children = vec![];
if let Some(ev) = self.event_map.get(event) {
for child in ev.children.iter() {
let child = self.event_map.get(child).unwrap();
children.push(child.event.clone());
}
}
children
}
fn reorganize(&mut self) {
for (_, orphan) in std::mem::take(&mut self.orphans) {
if self.is_orphan(&orphan) {
// TODO should we remove orphan if it's too old
continue
}
let prev_event = orphan.previous_event_hash.clone();
let node =
EventNode { parent: Some(prev_event), event: orphan.clone(), children: Vec::new() };
let node_hash = node.event.hash();
let parent = self.event_map.get_mut(&prev_event).unwrap();
parent.children.push(node_hash);
self.event_map.insert(node_hash, node);
// TODO dispatch to events_queue
// to use events_queue here the add() and reorganize() functions should change to async
// clean up the tree from old eventnodes
self.prune_chains();
self.update_root();
}
}
fn prune_chains(&mut self) {
let head = self.find_head();
let leaves = self.find_leaves();
// Reject events which attach to chains too low in the chain
// At some point we ignore all events from old branches
for leaf in leaves {
// skip the head event
if leaf == head {
continue
}
let depth = self.diff_depth(leaf.clone(), head);
if depth > MAX_DEPTH {
self.remove_node(leaf);
}
}
}
fn update_root(&mut self) {
let head = self.find_head();
let leaves = self.find_leaves();
// find the common ancestor for each leaf and the head event
let mut ancestors = vec![];
for leaf in leaves {
if leaf == head {
continue
}
let ancestor = self.find_ancestor(leaf.clone(), head);
ancestors.push(ancestor);
}
// find the highest ancestor
let highest_ancestor = ancestors.iter().max_by(|&a, &b| {
self.find_depth(a.clone(), &head).cmp(&self.find_depth(b.clone(), &head))
});
// set the new root
if let Some(ancestor) = highest_ancestor {
// the ancestor must have at least height > MAX_HEIGHT
let ancestor_height = self.find_height(&self.current_root, ancestor).unwrap();
if ancestor_height < MAX_HEIGHT {
return
}
// removing the parents of the new root node
let mut root = self.event_map.get(&self.current_root).unwrap();
loop {
let root_hash = root.event.hash();
if &root_hash == ancestor {
break
}
let root_childs = &root.children;
assert_eq!(root_childs.len(), 1);
let child = root_childs.get(0).unwrap().clone();
self.event_map.remove(&root_hash);
root = self.event_map.get(&child).unwrap();
}
self.current_root = ancestor.clone();
}
}
fn remove_node(&mut self, mut event_id: EventId) {
loop {
if !self.event_map.contains_key(&event_id) {
break
}
let node = self.event_map.get(&event_id).unwrap().clone();
self.event_map.remove(&event_id);
let parent = self.event_map.get_mut(&node.parent.unwrap()).unwrap();
let index = parent.children.iter().position(|&n| n == event_id).unwrap();
parent.children.remove(index);
if !parent.children.is_empty() {
break
}
event_id = parent.event.hash();
}
}
// find_head
// -> recursively call itself
// -> + 1 for every recursion, return self if no children
// -> select max from returned values
    // Gets the leaf node with the maximal number of events counting from the root
fn find_head(&self) -> EventId {
self.find_longest_chain(&self.current_root, 0).0
}
fn find_longest_chain(&self, parent_node: &EventId, i: u32) -> (EventId, u32) {
let children = &self.event_map.get(parent_node).unwrap().children;
if children.is_empty() {
return (parent_node.clone(), i)
}
let mut current_max = 0;
let mut current_node = None;
for node in children.iter() {
let (grandchild_node, grandchild_i) = self.find_longest_chain(node, i + 1);
if grandchild_i > current_max {
current_max = grandchild_i;
current_node = Some(grandchild_node);
} else if grandchild_i == current_max {
// Break ties using the timestamp
let grandchild_node_timestamp =
self.event_map.get(&grandchild_node).unwrap().event.timestamp;
let current_node_timestamp =
self.event_map.get(&current_node.unwrap()).unwrap().event.timestamp;
if grandchild_node_timestamp > current_node_timestamp {
current_max = grandchild_i;
current_node = Some(grandchild_node);
}
}
}
assert_ne!(current_max, 0);
(current_node.expect("internal logic error"), current_max)
}
fn find_depth(&self, mut node: EventId, ancestor_id: &EventId) -> u32 {
let mut depth = 0;
while &node != ancestor_id {
depth += 1;
if let Some(parent) = self.event_map.get(&node).unwrap().parent.clone() {
node = parent
} else {
break
}
}
depth
}
fn find_height(&self, node: &EventId, child_id: &EventId) -> Option<u32> {
let mut height = 0;
if node == child_id {
return Some(height)
}
height += 1;
let children = &self.event_map.get(node).unwrap().children;
if children.is_empty() {
return None
}
for child in children.iter() {
if let Some(h) = self.find_height(child, child_id) {
return Some(height + h)
}
}
None
}
fn find_ancestor(&self, mut node_a: EventId, mut node_b: EventId) -> EventId {
// node_a is a child of node_b
let is_child = node_b == self.event_map.get(&node_a).unwrap().parent.unwrap();
if is_child {
return node_b.clone()
}
while node_a != node_b {
let node_a_parent = self.event_map.get(&node_a).unwrap().parent.unwrap();
let node_b_parent = self.event_map.get(&node_b).unwrap().parent.unwrap();
if node_a_parent == self.current_root || node_b_parent == self.current_root {
return self.current_root
}
node_a = node_a_parent;
node_b = node_b_parent;
}
node_a.clone()
}
fn diff_depth(&self, node_a: EventId, node_b: EventId) -> u32 {
let ancestor = self.find_ancestor(node_a, node_b);
let node_a_depth = self.find_depth(node_a, &ancestor);
let node_b_depth = self.find_depth(node_b, &ancestor);
(node_b_depth + 1) - node_a_depth
}
fn _debug(&self) {
for (event_id, event_node) in &self.event_map {
let depth = self.find_depth(event_id.clone(), &self.current_root);
println!("{}: {:?} [depth={}]", hex::encode(&event_id), event_node.event, depth);
}
println!("root: {}", hex::encode(&self.current_root));
println!("head: {}", hex::encode(&self.find_head()));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events_queue::EventsQueue;
fn create_message(
previous_event_hash: EventId,
nick: &str,
msg: &str,
timestamp: u64,
) -> Event {
Event {
previous_event_hash,
action: EventAction::PrivMsg(PrivMsgEvent {
nick: nick.to_string(),
msg: msg.to_string(),
target: "".to_string(),
}),
timestamp,
read_confirms: 4,
}
}
#[test]
fn test_update_root() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with MAX_HEIGHT events
let mut id1 = root_id;
for x in 0..MAX_HEIGHT {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Fill this node with MAX_HEIGHT + 10 events
let mut id2 = root_id;
for x in 0..(MAX_HEIGHT + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
        // Fill id2 node with MAX_HEIGHT - 10 events
let mut id3 = id2;
for x in (MAX_HEIGHT + 10)..(MAX_HEIGHT * 2) {
let timestamp = get_current_time() + 1;
let node =
create_message(id3, &format!("chain 2 branch 1 msg {}", x), "message", timestamp);
id3 = node.hash();
model.add(node);
}
        // Fill id2 node with MAX_HEIGHT + 20 events
let mut id4 = id2;
for x in (MAX_HEIGHT + 10)..(MAX_HEIGHT * 2 + 30) {
let timestamp = get_current_time() + 1;
let node =
create_message(id4, &format!("chain 2 branch 2 msg {}", x), "message", timestamp);
id4 = node.hash();
model.add(node);
}
assert_eq!(model.find_height(&model.current_root, &id2).unwrap(), 0);
assert_eq!(model.find_height(&model.current_root, &id3).unwrap(), (MAX_HEIGHT - 10));
assert_eq!(model.find_height(&model.current_root, &id4).unwrap(), (MAX_HEIGHT + 20));
assert_eq!(model.current_root, id2);
}
#[test]
fn test_find_height() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with 8 events
let mut id1 = root_id;
for x in 0..8 {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Fill this node with 14 events
let mut id2 = root_id;
for x in 0..14 {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_height(&model.current_root, &id1).unwrap(), 8);
assert_eq!(model.find_height(&model.current_root, &id2).unwrap(), 14);
}
#[test]
fn test_prune_chains() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with 3 events
let mut event_node_1_ids = vec![];
let mut id1 = root_id;
for x in 0..3 {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
event_node_1_ids.push(id1);
}
// event_node 2
        // Start from the root_id and fill the node with MAX_DEPTH + 10 events
// All the events from event_node_1 should get removed from the tree
let mut id2 = root_id;
for x in 0..(MAX_DEPTH + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id2);
for id in event_node_1_ids {
assert!(!model.event_map.contains_key(&id));
}
assert_eq!(model.event_map.len(), (MAX_DEPTH + 11) as usize);
}
#[test]
fn test_diff_depth() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with (MAX_DEPTH / 2) events
let mut id1 = root_id;
for x in 0..(MAX_DEPTH / 2) {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Start from the root_id and fill the node with (MAX_DEPTH + 10) events
// all the events must be added since the depth between id1
// and the last head is less than MAX_DEPTH
let mut id2 = root_id;
for x in 0..(MAX_DEPTH + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id2);
// event_node 3
// This will start as new chain, but no events will be added
// since the last event's depth is MAX_DEPTH + 10
let mut id3 = root_id;
for x in 0..30 {
let timestamp = get_current_time() + 1;
let node = create_message(id3, &format!("chain 3 msg {}", x), "message", timestamp);
id3 = node.hash();
model.add(node);
// ensure events are not added
assert!(!model.event_map.contains_key(&id3));
}
assert_eq!(model.find_head(), id2);
// Add more events to the event_node 1
// At the end this chain must overtake the event_node 2
for x in (MAX_DEPTH / 2)..(MAX_DEPTH + 15) {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id1);
}
#[test]
fn test_event_hash() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
let timestamp = get_current_time() + 1;
let event = create_message(root_id, "msg", "message", timestamp);
let mut event2 = event.clone();
let event_hash = event.hash();
event2.read_confirms += 3;
let event2_hash = event2.hash();
assert_eq!(event2_hash, event_hash);
assert_ne!(event2.read_confirms, event.read_confirms);
}
}

bin/ircd2/src/privmsg.rs Normal file

@@ -0,0 +1,44 @@
use std::io;
use darkfi::serial::{Decodable, Encodable, ReadExt, SerialDecodable, SerialEncodable};
#[derive(SerialEncodable, SerialDecodable, Clone)]
pub struct PrivMsgEvent {
pub nick: String,
pub msg: String,
pub target: String,
}
#[derive(Clone)]
pub enum EventAction {
PrivMsg(PrivMsgEvent),
}
impl std::string::ToString for PrivMsgEvent {
fn to_string(&self) -> String {
format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", self.nick, self.target, self.msg)
}
}
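// EventAction is serialized as a 1-byte variant tag followed by the variant's
// payload; PrivMsg is tag 0. Decodable below reads the same tag back.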
impl Encodable for EventAction {
fn encode<S: io::Write>(&self, mut s: S) -> core::result::Result<usize, io::Error> {
match self {
Self::PrivMsg(event) => {
let mut len = 0;
len += 0u8.encode(&mut s)?;
len += event.encode(s)?;
Ok(len)
}
}
}
}
impl Decodable for EventAction {
fn decode<D: io::Read>(mut d: D) -> core::result::Result<Self, io::Error> {
let type_id = d.read_u8()?;
match type_id {
0 => Ok(Self::PrivMsg(PrivMsgEvent::decode(d)?)),
_ => Err(io::Error::new(io::ErrorKind::Other, "Bad type ID byte for Event")),
}
}
}

View File

@@ -0,0 +1,376 @@
use std::collections::VecDeque;
use async_executor::Executor;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fxhash::FxHashMap;
use log::debug;
use rand::{rngs::OsRng, RngCore};
use darkfi::{
net,
serial::{SerialDecodable, SerialEncodable},
util::async_util::sleep,
Result,
};
use crate::{
model::{Event, EventId, ModelPtr},
settings::get_current_time,
};
const UNREAD_EVENT_EXPIRE_TIME: u64 = 3600; // in seconds
const SIZE_OF_SEEN_BUFFER: usize = 65536; // capacity of the seen-items ring buffer
const MAX_CONFIRM: u8 = 4; // read confirmations required before an event is added to the model
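/// A fixed-capacity FIFO buffer: once full, pushing a new item evicts the oldest one.
/// For example, with capacity 2, pushing 1, 2, 3 leaves the buffer holding [2, 3].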
#[derive(Clone)]
struct RingBuffer<T> {
pub items: VecDeque<T>,
}
impl<T: Eq + PartialEq + Clone> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let items = VecDeque::with_capacity(capacity);
Self { items }
}
pub fn push(&mut self, val: T) {
if self.items.len() == self.items.capacity() {
self.items.pop_front();
}
self.items.push_back(val);
}
pub fn contains(&self, val: &T) -> bool {
self.items.contains(val)
}
}
type InvId = u64;
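/// Announcement of a single event by hash, tagged with a random id so the
/// same announcement is only processed once per node.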
#[derive(SerialEncodable, SerialDecodable, Clone, Debug, PartialEq, Eq, Hash)]
struct InvItem {
id: InvId,
hash: EventId,
}
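/// Inventory message announcing newly received events to peers.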
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct Inv {
invs: Vec<InvItem>,
}
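/// Periodic sync message carrying the sender's current chain leaves.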
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct SyncEvent {
leaves: Vec<EventId>,
}
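/// Request for the full events behind the given hashes.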
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct GetData {
events: Vec<EventId>,
}
pub type SeenPtr<T> = Arc<Seen<T>>;
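/// Shared ring buffer of recently seen items (event hashes or inv ids),
/// used to drop duplicates coming from the network.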
pub struct Seen<T> {
seen: Mutex<RingBuffer<T>>,
}
impl<T: Eq + PartialEq + Clone> Seen<T> {
pub fn new() -> SeenPtr<T> {
Arc::new(Self { seen: Mutex::new(RingBuffer::new(SIZE_OF_SEEN_BUFFER)) })
}
pub async fn push(&self, item: &T) -> bool {
let seen = &mut self.seen.lock().await;
if !seen.contains(item) {
seen.push(item.clone());
return true
}
false
}
}
pub type UnreadEventsPtr = Arc<Mutex<UnreadEvents>>;
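/// Events that have not yet collected MAX_CONFIRM read confirmations,
/// held here until they are confirmed or expire.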
pub struct UnreadEvents {
events: FxHashMap<EventId, Event>,
}
impl UnreadEvents {
pub fn new() -> UnreadEventsPtr {
Arc::new(Mutex::new(Self { events: FxHashMap::default() }))
}
fn contains(&self, key: &EventId) -> bool {
self.events.contains_key(key)
}
fn get(&self, key: &EventId) -> Option<Event> {
self.events.get(key).cloned()
}
// Increase the read_confirms for an event; if it has reached MAX_CONFIRM,
// remove it from the hash_map and return Some(event), otherwise return None
fn inc_read_confirms(&mut self, key: &EventId) -> Option<Event> {
let mut result = None;
if let Some(event) = self.events.get_mut(key) {
event.read_confirms += 1;
if event.read_confirms >= MAX_CONFIRM {
result = Some(event.clone())
}
}
if result.is_some() {
self.events.remove(key);
}
result
}
fn insert(&mut self, event: &Event) {
// prune expired events
let mut prune_ids = vec![];
for (id, e) in self.events.iter() {
if e.timestamp + (UNREAD_EVENT_EXPIRE_TIME * 1000) < get_current_time() {
prune_ids.push(id.clone());
}
}
for id in prune_ids {
self.events.remove(&id);
}
self.events.insert(event.hash().clone(), event.clone());
}
}
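/// P2P protocol for the event hash-chain: gossips events and inv announcements,
/// answers getdata requests and periodically exchanges chain leaves for syncing.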
pub struct ProtocolEvent {
jobsman: net::ProtocolJobsManagerPtr,
event_sub: net::MessageSubscription<Event>,
inv_sub: net::MessageSubscription<Inv>,
getdata_sub: net::MessageSubscription<GetData>,
syncevent_sub: net::MessageSubscription<SyncEvent>,
p2p: net::P2pPtr,
channel: net::ChannelPtr,
model: ModelPtr,
seen_event: SeenPtr<EventId>,
seen_inv: SeenPtr<InvId>,
unread_events: UnreadEventsPtr,
}
impl ProtocolEvent {
pub async fn init(
channel: net::ChannelPtr,
p2p: net::P2pPtr,
model: ModelPtr,
seen_event: SeenPtr<EventId>,
seen_inv: SeenPtr<InvId>,
unread_events: UnreadEventsPtr,
) -> net::ProtocolBasePtr {
let message_subsystem = channel.get_message_subsystem();
message_subsystem.add_dispatch::<Event>().await;
message_subsystem.add_dispatch::<Inv>().await;
message_subsystem.add_dispatch::<GetData>().await;
message_subsystem.add_dispatch::<SyncEvent>().await;
let event_sub =
channel.clone().subscribe_msg::<Event>().await.expect("Missing Event dispatcher!");
let inv_sub = channel.subscribe_msg::<Inv>().await.expect("Missing Inv dispatcher!");
let getdata_sub =
channel.clone().subscribe_msg::<GetData>().await.expect("Missing GetData dispatcher!");
let syncevent_sub = channel
.clone()
.subscribe_msg::<SyncEvent>()
.await
.expect("Missing SyncEvent dispatcher!");
Arc::new(Self {
jobsman: net::ProtocolJobsManager::new("ProtocolEvent", channel.clone()),
event_sub,
inv_sub,
getdata_sub,
syncevent_sub,
p2p,
channel,
model,
seen_event,
seen_inv,
unread_events,
})
}
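// Handle a gossiped Event: drop duplicates, bump its read_confirms, then either
// add it to the model (once confirmed) or keep it in unread_events and announce
// it with an Inv; in both cases rebroadcast it to the other channels.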
async fn handle_receive_event(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_event() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let event = self.event_sub.receive().await?;
let mut event = (*event).to_owned();
if !self.seen_event.push(&event.hash()).await {
continue
}
event.read_confirms += 1;
if event.read_confirms >= MAX_CONFIRM {
self.new_event(&event).await?;
} else {
self.unread_events.lock().await.insert(&event);
self.send_inv(&event).await?;
}
// Broadcast the msg
self.p2p.broadcast_with_exclude(event, &exclude_list).await?;
}
}
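// Handle an Inv: request unknown events with getdata, count a read confirmation
// for events we already hold unread, then rebroadcast the inv to other channels.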
async fn handle_receive_inv(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_inv() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let inv = self.inv_sub.receive().await?;
let inv = (*inv).to_owned();
for inv in inv.invs.iter() {
if !self.seen_inv.push(&inv.id).await {
continue
}
{
let mut unread_events = self.unread_events.lock().await;
if !unread_events.contains(&inv.hash) {
self.send_getdata(vec![inv.hash]).await?;
} else if let Some(event) = unread_events.inc_read_confirms(&inv.hash) {
self.new_event(&event).await?;
}
}
}
// Broadcast the inv msg
self.p2p.broadcast_with_exclude(inv, &exclude_list).await?;
}
}
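// Handle a GetData request by sending back every requested event we hold,
// whether still unread or already in the model.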
async fn handle_receive_getdata(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_getdata() [START]");
loop {
let getdata = self.getdata_sub.receive().await?;
let events = (*getdata).to_owned().events;
for event_id in events {
let unread_event = self.unread_events.lock().await.get(&event_id);
if let Some(event) = unread_event {
self.channel.send(event).await?;
continue
}
let model_event = self.model.lock().await.get_event(&event_id);
if let Some(event) = model_event {
self.channel.send(event).await?;
}
}
}
}
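// Handle a SyncEvent: for every leaf the peer reports that is not one of
// our leaves, send back its children so the peer's chain can catch up.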
async fn handle_receive_syncevent(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_syncevent() [START]");
loop {
let syncevent = self.syncevent_sub.receive().await?;
let model = self.model.lock().await;
let leaves = model.find_leaves();
if leaves == syncevent.leaves {
continue
}
for leaf in syncevent.leaves.iter() {
if leaves.contains(leaf) {
continue
}
let children = model.get_event_children(leaf);
for child in children {
self.channel.send(child).await?;
}
}
}
}
// every 2 seconds send a SyncEvent msg
async fn send_sync_hash_loop(self: Arc<Self>) -> Result<()> {
loop {
sleep(2).await;
let leaves = self.model.lock().await.find_leaves();
self.channel.send(SyncEvent { leaves }).await?;
}
}
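// Add a confirmed event to the model, or request its missing parent
// with getdata if it would be an orphan.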
async fn new_event(&self, event: &Event) -> Result<()> {
let mut model = self.model.lock().await;
if model.is_orphan(event) {
self.send_getdata(vec![event.hash()]).await?;
} else {
model.add(event.clone());
}
Ok(())
}
async fn send_inv(&self, event: &Event) -> Result<()> {
let id = OsRng.next_u64();
self.p2p.broadcast(Inv { invs: vec![InvItem { id, hash: event.hash() }] }).await?;
Ok(())
}
async fn send_getdata(&self, events: Vec<EventId>) -> Result<()> {
self.channel.send(GetData { events }).await?;
Ok(())
}
}
#[async_trait]
impl net::ProtocolBase for ProtocolEvent {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_event(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_inv(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_getdata(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_syncevent(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().send_sync_hash_loop(), executor.clone()).await;
debug!(target: "ircd", "ProtocolEvent::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolEvent"
}
}
impl net::Message for Event {
fn name() -> &'static str {
"event"
}
}
impl net::Message for Inv {
fn name() -> &'static str {
"inv"
}
}
impl net::Message for SyncEvent {
fn name() -> &'static str {
"syncevent"
}
}
impl net::Message for GetData {
fn name() -> &'static str {
"getdata"
}
}

53
bin/ircd2/src/rpc.rs Normal file
View File

@@ -0,0 +1,53 @@
use async_trait::async_trait;
use log::debug;
use serde_json::{json, Value};
use url::Url;
use darkfi::{
net,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
server::RequestHandler,
},
};
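/// JSON-RPC interface of the daemon, exposing ping and P2P get_info methods.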
pub struct JsonRpcInterface {
pub addr: Url,
pub p2p: net::P2pPtr,
}
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if req.params.as_array().is_none() {
return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into()
}
debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap());
match req.method.as_str() {
Some("ping") => self.pong(req.id, req.params).await,
Some("get_info") => self.get_info(req.id, req.params).await,
Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
}
impl JsonRpcInterface {
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
async fn pong(&self, id: Value, _params: Value) -> JsonResult {
JsonResponse::new(json!("pong"), id).into()
}
// RPCAPI:
// Retrieves P2P network information.
// --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
let resp = self.p2p.get_info().await;
JsonResponse::new(resp, id).into()
}
}

310
bin/ircd2/src/settings.rs Normal file
View File

@@ -0,0 +1,310 @@
use crypto_box::SalsaBox;
use fxhash::FxHashMap;
use log::{info, warn};
use serde::Deserialize;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use toml::Value;
use url::Url;
use darkfi::{net::settings::SettingsOpt, Result};
// Location for config file
pub const CONFIG_FILE: &str = "ircd_config.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../ircd_config.toml");
// Msg config
pub const MAXIMUM_LENGTH_OF_MESSAGE: usize = 1024;
pub const MAXIMUM_LENGTH_OF_NICKNAME: usize = 32;
// IRC Client
pub enum RPL {
NoTopic = 331,
Topic = 332,
NameReply = 353,
EndOfNames = 366,
}
/// ircd cli
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "ircd")]
pub struct Args {
/// Sets a custom config file
#[structopt(long)]
pub config: Option<String>,
/// JSON-RPC listen URL
#[structopt(long = "rpc", default_value = "tcp://127.0.0.1:25550")]
pub rpc_listen: Url,
/// IRC listen URL
#[structopt(long = "irc", default_value = "tcp://127.0.0.1:6667")]
pub irc_listen: Url,
/// Optional TLS certificate file path if `irc_listen` uses TLS
#[structopt(long)]
pub irc_tls_cert: Option<String>,
/// Optional TLS certificate key file path if `irc_listen` uses TLS
#[structopt(long)]
pub irc_tls_secret: Option<String>,
/// Generate a new NaCl keypair and exit
#[structopt(long)]
pub gen_keypair: bool,
/// Path to save keypair in
#[structopt(short)]
pub output: Option<String>,
/// Autojoin channels
#[structopt(long)]
pub autojoin: Vec<String>,
/// Password
#[structopt(long)]
pub password: Option<String>,
#[structopt(flatten)]
pub net: SettingsOpt,
/// Increase verbosity
#[structopt(short, parse(from_occurrences))]
pub verbose: u8,
}
#[derive(Clone)]
pub struct ContactInfo {
/// Optional NaCl box for the channel, used for {en,de}cryption.
pub salt_box: Option<SalsaBox>,
}
impl ContactInfo {
pub fn new() -> Result<Self> {
Ok(Self { salt_box: None })
}
}
/// This struct holds information about preconfigured channels.
/// In the TOML configuration file, we can configure channels as such:
/// ```toml
/// [channel."#dev"]
/// secret = "GvH4kno3kUu6dqPrZ8zjMhqxTUDZ2ev16EdprZiZJgj1"
/// topic = "DarkFi Development Channel"
/// ```
/// Having a secret will enable a NaCl box that is able to encrypt and
/// decrypt messages in this channel using this set shared secret.
/// The secret should be shared OOB, via a secure channel.
/// Having a topic set is useful if one wants to have a topic in the
/// configured channel. It is not shared with others, but it is useful
/// for personal reference.
#[derive(Clone)]
pub struct ChannelInfo {
/// Optional topic for the channel
pub topic: Option<String>,
/// Optional NaCl box for the channel, used for {en,de}cryption.
pub salt_box: Option<SalsaBox>,
/// Flag indicates whether the user has joined the channel or not
pub joined: bool,
/// All nicknames which are visible on the channel
pub names: Vec<String>,
}
impl ChannelInfo {
pub fn new() -> Result<Self> {
Ok(Self { topic: None, salt_box: None, joined: false, names: vec![] })
}
}
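/// Build a SalsaBox from a base58-encoded shared secret; the box is constructed
/// from the secret key and its own public key.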
fn salt_box_from_shared_secret(s: &str) -> Result<SalsaBox> {
let bytes: [u8; 32] = bs58::decode(s).into_vec()?.try_into().unwrap();
let secret = crypto_box::SecretKey::from(bytes);
let public = secret.public_key();
Ok(SalsaBox::new(&public, &secret))
}
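/// Scan the TOML config for a `private_key` table and return the base58 secret
/// key configured there (an empty string if none is found).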
fn parse_priv_key(data: &str) -> Result<String> {
let mut pk = String::new();
let map = match toml::from_str(data)? {
Value::Table(m) => m,
_ => return Ok(pk),
};
if !map.contains_key("private_key") {
return Ok(pk)
}
if !map["private_key"].is_table() {
return Ok(pk)
}
let private_keys = map["private_key"].as_table().unwrap();
for prv_key in private_keys {
pk = prv_key.0.into();
}
info!("Found secret key in config, noted it down.");
Ok(pk)
}
/// Parse a TOML string for any configured contact list and return
/// a map containing said configurations.
///
/// ```toml
/// [contact."nick"]
/// contact_pubkey = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"
/// ```
pub fn parse_configured_contacts(data: &str) -> Result<FxHashMap<String, ContactInfo>> {
let mut ret = FxHashMap::default();
let map = match toml::from_str(data) {
Ok(Value::Table(m)) => m,
_ => {
warn!("Invalid TOML string passed as argument to parse_configured_contacts()");
return Ok(ret)
}
};
if !map.contains_key("contact") {
return Ok(ret)
}
if !map["contact"].is_table() {
warn!("TOML configuration contains a \"contact\" field, but it is not a table.");
return Ok(ret)
}
let contacts = map["contact"].as_table().unwrap();
// Our secret key for NaCl boxes.
let found_priv = match parse_priv_key(data) {
Ok(v) => v,
Err(_) => {
info!("Did not found private key in config, skipping contact configuration.");
return Ok(ret)
}
};
let bytes: [u8; 32] = match bs58::decode(found_priv).into_vec() {
Ok(v) => {
if v.len() != 32 {
warn!("Decoded base58 secret key string is not 32 bytes");
warn!("Skipping private contact configuration");
return Ok(ret)
}
v.try_into().unwrap()
}
Err(e) => {
warn!("Failed to decode base58 secret key from string: {}", e);
warn!("Skipping private contact configuration");
return Ok(ret)
}
};
let secret = crypto_box::SecretKey::from(bytes);
for cnt in contacts {
info!("Found configuration for contact {}", cnt.0);
let mut contact_info = ContactInfo::new()?;
if !cnt.1.is_table() {
warn!("Config for contact {} isn't a TOML table", cnt.0);
continue
}
let table = cnt.1.as_table().unwrap();
if table.is_empty() {
warn!("Configuration for contact {} is empty.", cnt.0);
continue
}
// Build the NaCl box
if !table.contains_key("contact_pubkey") || !table["contact_pubkey"].is_str() {
warn!("Contact {} doesn't have `contact_pubkey` set or is not a string.", cnt.0);
continue
}
let pub_str = table["contact_pubkey"].as_str().unwrap();
let bytes: [u8; 32] = match bs58::decode(pub_str).into_vec() {
Ok(v) => {
if v.len() != 32 {
warn!("Decoded base58 string is not 32 bytes");
continue
}
v.try_into().unwrap()
}
Err(e) => {
warn!("Failed to decode base58 pubkey from string: {}", e);
continue
}
};
let public = crypto_box::PublicKey::from(bytes);
contact_info.salt_box = Some(SalsaBox::new(&public, &secret));
ret.insert(cnt.0.to_string(), contact_info);
info!("Instantiated NaCl box for contact {}", cnt.0);
}
Ok(ret)
}
/// Parse a TOML string for any configured channels and return
/// a map containing said configurations.
///
/// ```toml
/// [channel."#memes"]
/// secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"
/// topic = "Dank Memes"
/// ```
pub fn parse_configured_channels(data: &str) -> Result<FxHashMap<String, ChannelInfo>> {
let mut ret = FxHashMap::default();
let map = match toml::from_str(data)? {
Value::Table(m) => m,
_ => return Ok(ret),
};
if !map.contains_key("channel") {
return Ok(ret)
}
if !map["channel"].is_table() {
return Ok(ret)
}
for chan in map["channel"].as_table().unwrap() {
info!("Found configuration for channel {}", chan.0);
let mut channel_info = ChannelInfo::new()?;
if chan.1.as_table().unwrap().contains_key("topic") {
let topic = chan.1["topic"].as_str().unwrap().to_string();
info!("Found topic for channel {}: {}", chan.0, topic);
channel_info.topic = Some(topic);
}
if chan.1.as_table().unwrap().contains_key("secret") {
// Build the NaCl box
if let Some(s) = chan.1["secret"].as_str() {
let salt_box = salt_box_from_shared_secret(s)?;
channel_info.salt_box = Some(salt_box);
info!("Instantiated NaCl box for channel {}", chan.0);
}
}
ret.insert(chan.0.to_string(), channel_info);
}
Ok(ret)
}
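/// Current UNIX time in milliseconds.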
pub fn get_current_time() -> u64 {
let start = std::time::SystemTime::now();
start
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis()
.try_into()
.unwrap()
}

27
bin/ircd2/src/view.rs Normal file
View File

@@ -0,0 +1,27 @@
use fxhash::FxHashMap;
use darkfi::Result;
use crate::{
events_queue::EventsQueuePtr,
model::{Event, EventId},
};
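/// Drains new events from the events queue and keeps the ones already
/// processed in a `seen` map.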
pub struct View {
seen: FxHashMap<EventId, Event>,
events_queue: EventsQueuePtr,
}
impl View {
pub fn new(events_queue: EventsQueuePtr) -> Self {
Self { seen: FxHashMap::default(), events_queue }
}
pub async fn process(&mut self) -> Result<()> {
loop {
let new_event = self.events_queue.fetch().await?;
// TODO sort the events
self.seen.insert(new_event.hash(), new_event);
}
}
}