From e03904f435d9dbd5c437571c42fe91b73253ed4e Mon Sep 17 00:00:00 2001 From: aggstam Date: Thu, 28 Apr 2022 15:21:59 +0300 Subject: [PATCH 01/10] consensus/task/proposal.rs: leader broadcasting finalized blocks added --- bin/darkfid2/src/main.rs | 2 +- src/blockchain/blockstore.rs | 4 ++-- src/consensus/proto/protocol_sync.rs | 2 +- src/consensus/task/proposal.rs | 26 ++++++++++++++++++++++++-- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index 91cc19b14..2b871f24c 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -410,7 +410,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { .detach(); info!("Starting consensus protocol task"); - ex.spawn(proposal_task(consensus_p2p.unwrap(), state)).detach(); + ex.spawn(proposal_task(consensus_p2p.unwrap(), sync_p2p.unwrap(), state)).detach(); } else { info!("Not starting consensus P2P network"); } diff --git a/src/blockchain/blockstore.rs b/src/blockchain/blockstore.rs index e3bba4609..2f473568b 100644 --- a/src/blockchain/blockstore.rs +++ b/src/blockchain/blockstore.rs @@ -1,4 +1,4 @@ -use log::warn; +use log::debug; use sled::Batch; use crate::{ @@ -130,7 +130,7 @@ impl BlockOrderStore { ret.push(Some(hash)); } else { if strict { - warn!("BlockOrderStore::get() Slot {} not found", i); + debug!("BlockOrderStore::get() Slot {} not found", i); return Err(Error::SlotNotFound(*i)) } ret.push(None); diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 681410760..e45b09334 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -78,7 +78,7 @@ impl ProtocolSync { debug!("ProtocolSync::handle_receive_block() received block"); - // Node stores finalized flock, if it doesn't exist (checking by slot), + // Node stores finalized block, if it doesn't exist (checking by slot), // and removes its transactions from the unconfirmed_txs vector. // Consensus-mode enabled nodes have already performed these steps, // during proposal finalization. diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs index 963fa3969..0da1e7a79 100644 --- a/src/consensus/task/proposal.rs +++ b/src/consensus/task/proposal.rs @@ -10,7 +10,7 @@ use crate::{ }; /// async task used for participating in the consensus protocol -pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { +pub async fn proposal_task(p2p: net::P2pPtr, sync_p2p: net::P2pPtr, state: ValidatorStatePtr) { // Node waits just before the current or next epoch end, // so it can start syncing latest state. let mut seconds_until_next_epoch = state.read().await.next_epoch_start(); @@ -89,7 +89,29 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { let vote = v.unwrap(); let result = state.write().await.receive_vote(&vote); match result { - Ok(_) => info!(target: "consensus", "Vote saved successfully."), + Ok((_, to_broadcast)) => { + info!(target: "consensus", "Vote saved successfully."); + // Broadcast finalized blocks info, if any + match to_broadcast { + Some(blocks) => { + debug!("handle_receive_vote(): Broadcasting finalized blocks"); + for info in blocks { + let result = sync_p2p.broadcast(info).await; + match result { + Ok(()) => { + info!(target: "consensus", "Finalized block broadcasted successfully.") + } + Err(e) => { + error!(target: "consensus", "Failed broadcasting finalized block: {}", e) + } + } + } + } + None => { + debug!("handle_receive_vote(): No finalized blocks to broadcast"); + } + } + } Err(e) => { error!(target: "consensus", "Vote save failed: {}", e) } From 87acffb63ebe41ad275f59b32f52a848f8c66ea1 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Thu, 28 Apr 2022 18:42:35 +0300 Subject: [PATCH 02/10] ircd: create entry point socket for bots and plugins --- Cargo.lock | 4 +- bin/ircd/Cargo.toml | 4 +- bin/ircd/ircd_config.toml | 3 + bin/ircd/src/main.rs | 175 +++++++++++++++++++++++++++----------- bin/ircd/src/rpc.rs | 10 --- bin/ircd/src/server.rs | 25 +++--- bin/ircd/src/settings.rs | 3 + 7 files changed, 150 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d81e2ccc8..c1822d930 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2155,8 +2155,8 @@ dependencies = [ ] [[package]] -name = "ircd2" -version = "0.3.0" +name = "ircd" +version = "0.1.0" dependencies = [ "async-channel", "async-executor", diff --git a/bin/ircd/Cargo.toml b/bin/ircd/Cargo.toml index 95a2842d1..e8ada3f56 100644 --- a/bin/ircd/Cargo.toml +++ b/bin/ircd/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "ircd2" -version = "0.3.0" +name = "ircd" +version = "0.1.0" homepage = "https://dark.fi" description = "P2P IRC daemon" authors = ["darkfi "] diff --git a/bin/ircd/ircd_config.toml b/bin/ircd/ircd_config.toml index ce76ac486..cb198b897 100644 --- a/bin/ircd/ircd_config.toml +++ b/bin/ircd/ircd_config.toml @@ -4,6 +4,9 @@ ## IRC listen URL #irc_listen="127.0.0.1:11066" +## Plugin listen URL +#plugin_listen="127.0.0.1:11077" + ## Sets Datastore Path #datastore="~/.config/ircd" diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 83472280f..e8590aa01 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -4,15 +4,8 @@ use async_std::{ }; use std::net::SocketAddr; -use async_channel::Receiver; +use async_channel::{Receiver, Sender}; use async_executor::Executor; -use easy_parallel::Parallel; -use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; -use log::{debug, error, info, warn}; -use simplelog::{ColorChoice, TermLogger, TerminalMode}; -use smol::future; -use structopt_toml::StructOptToml; - use darkfi::{ async_daemonize, raft::Raft, @@ -23,6 +16,12 @@ use darkfi::{ }, Error, Result, }; +use easy_parallel::Parallel; +use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, FutureExt}; +use log::{debug, error, info, warn}; +use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use smol::future; +use structopt_toml::StructOptToml; pub mod privmsg; pub mod rpc; @@ -36,17 +35,19 @@ use crate::{ settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, }; -pub type SeenMsgId = Arc>>; +pub type SeenMsgIds = Arc>>; -async fn process_user_input( - mut line: String, - peer_addr: SocketAddr, - conn: &mut IrcServerConnection, - sender: async_channel::Sender, - seen_msg_id: SeenMsgId, -) -> Result<()> { +fn build_irc_msg(msg: &Privmsg) -> String { + debug!("ABOUT TO SEND: {:?}", msg); + let irc_msg = + format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", msg.nickname, msg.channel, msg.message,); + irc_msg +} + +fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { if line.is_empty() { - warn!("Received empty line from {}. Closing connection.", peer_addr); + warn!("Received empty line from {}. ", peer_addr); + warn!("Closing connection."); return Err(Error::ChannelStopped) } @@ -55,9 +56,17 @@ async fn process_user_input( line.pop(); line.pop(); - debug!("Received '{}' from {}", line, peer_addr); + Ok(line) +} - if let Err(e) = conn.update(line, sender, seen_msg_id).await { +async fn broadcast_msg( + irc_msg: String, + peer_addr: SocketAddr, + conn: &mut IrcServerConnection, +) -> Result<()> { + info!("Send msg to IRC server '{}' from {}", irc_msg, peer_addr); + + if let Err(e) = conn.update(irc_msg).await { warn!("Connection error: {} for {}", e, peer_addr); return Err(Error::ChannelStopped) } @@ -65,49 +74,82 @@ async fn process_user_input( Ok(()) } -async fn process( - receiver: Receiver, +async fn plugin_process( + recv_for_plugin: Receiver, + send_from_plugin: Sender, stream: TcpStream, - peer_addr: SocketAddr, - sender: async_channel::Sender, - seen_msg_id: SeenMsgId, ) -> Result<()> { - let (reader, writer) = stream.split(); - + let peer_addr = stream.peer_addr()?.clone(); + let (reader, mut writer) = stream.split(); let mut reader = BufReader::new(reader); - let mut conn = IrcServerConnection::new(writer); loop { let mut line = String::new(); futures::select! { - privmsg = receiver.recv().fuse() => { + irc_msg = recv_for_plugin.recv().fuse() => { + let irc_msg = irc_msg?; + info!("Plugin recv {}", irc_msg); + writer.write_all(irc_msg.as_bytes()).await?; + }, + err = reader.read_line(&mut line).fuse() => { + if let Err(e) = err { + warn!("Read line error: {}", e); + return Ok(()) + } + let irc_msg = clean_input(line, &peer_addr)?; + info!("Plugin send {}", irc_msg); + send_from_plugin.send(irc_msg).await?; + } + }; + } +} + +async fn process( + raft_receiver: Receiver, + stream: TcpStream, + peer_addr: SocketAddr, + raft_sender: Sender, + send_for_plugin: Sender, + recv_from_plugin: Receiver, + seen_msg_id: SeenMsgIds, +) -> Result<()> { + let (reader, writer) = stream.split(); + + let mut reader = BufReader::new(reader); + let mut conn = IrcServerConnection::new(writer, seen_msg_id.clone(), raft_sender); + + loop { + let mut line = String::new(); + futures::select! { + privmsg = raft_receiver.recv().fuse() => { + info!("Receive msg from raft"); let msg = privmsg?; let mut smi = seen_msg_id.lock().await; if smi.contains(&msg.id) { continue } - smi.push(msg.id); drop(smi); - debug!("ABOUT TO SEND: {:?}", msg); - let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", - msg.nickname, - msg.channel, - msg.message, - ); - + let irc_msg = build_irc_msg(&msg); conn.reply(&irc_msg).await?; + send_for_plugin.send(irc_msg).await?; } + irc_msg = recv_from_plugin.recv().fuse() => { + let irc_msg = irc_msg?; + info!("Receive msg from plugin"); + broadcast_msg(irc_msg, peer_addr, &mut conn).await?; + } err = reader.read_line(&mut line).fuse() => { if let Err(e) = err { warn!("Read line error. Closing stream for {}: {}", peer_addr, e); return Ok(()) } - - process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?; + info!("Receive msg from IRC server"); + let irc_msg = clean_input(line, &peer_addr)?; + broadcast_msg(irc_msg, peer_addr,&mut conn).await?; } }; } @@ -117,29 +159,25 @@ async_daemonize!(realmain); async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let listener = TcpListener::bind(settings.irc_listen).await?; let local_addr = listener.local_addr()?; - info!("Listening on {}", local_addr); + info!("IRC listening on {}", local_addr); - let datastore_path = expand_path(&settings.datastore)?; + let seen_msg_id: SeenMsgIds = Arc::new(Mutex::new(vec![])); - let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![])); - - let net_settings = settings.net; // //Raft // + let datastore_path = expand_path(&settings.datastore)?; + let net_settings = settings.net; let datastore_raft = datastore_path.join("ircd.db"); - let mut raft = Raft::::new(net_settings.inbound, datastore_raft)?; - let raft_sender = raft.get_broadcast(); - let commits = raft.get_commits(); + let raft_receiver = raft.get_commits(); // // RPC interface // let rpc_config = RpcServerConfig { socket_addr: settings.rpc_listen, - // TODO: Use net/transport: use_tls: false, identity_path: Default::default(), identity_pass: Default::default(), @@ -150,6 +188,9 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await }); + let (send_from_plugin, recv_from_plugin) = async_channel::unbounded::(); + let (send_for_plugin, recv_for_plugin) = async_channel::unbounded::(); + // // IRC instance // @@ -164,20 +205,55 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { } }; - info!("Accepted client: {}", peer_addr); + info!("IRC Accepted client: {}", peer_addr); executor_cloned .spawn(process( - commits.clone(), + raft_receiver.clone(), stream, peer_addr, raft_sender.clone(), + send_for_plugin.clone(), + recv_from_plugin.clone(), seen_msg_id.clone(), )) .detach(); } }); + // + // Plugin instance + // + let executor_cloned = executor.clone(); + let send_from_plugin_cloned = send_from_plugin.clone(); + let recv_for_plugin_cloned = recv_for_plugin.clone(); + let plugin_task: smol::Task> = executor.spawn(async move { + if settings.plugin_listen.is_none() { + return Ok(()) + } + let plugin_listener = TcpListener::bind(settings.plugin_listen.unwrap()).await?; + let plugin_local_addr = plugin_listener.local_addr()?; + info!("Plugin listening on {}", plugin_local_addr); + loop { + let (stream, peer_addr) = match plugin_listener.accept().await { + Ok((s, a)) => (s, a), + Err(e) => { + error!("Failed listening for connections: {}", e); + return Err(Error::ServiceStopped) + } + }; + + info!("Plugin Accepted client: {}", peer_addr); + executor_cloned + .spawn(plugin_process( + recv_for_plugin_cloned.clone(), + send_from_plugin_cloned.clone(), + stream, + )) + .detach(); + } + }); + let (signal, shutdown) = async_channel::bounded::<()>(1); ctrlc_async::set_async_handler(async move { warn!(target: "ircd", "ircd start Exit Signal"); @@ -185,6 +261,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { signal.send(()).await.unwrap(); rpc_task.cancel().await; irc_task.cancel().await; + plugin_task.cancel().await; }) .unwrap(); diff --git a/bin/ircd/src/rpc.rs b/bin/ircd/src/rpc.rs index a1fe2dd13..674033233 100644 --- a/bin/ircd/src/rpc.rs +++ b/bin/ircd/src/rpc.rs @@ -40,14 +40,4 @@ impl JsonRpcInterface { async fn pong(&self, id: Value, _params: Value) -> JsonResult { jsonrpc::response(json!("pong"), id).into() } - - // TODO - // RPCAPI: - // Retrieves P2P network information. - // --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42} - // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} - //async fn get_info(&self, id: Value, _params: Value) -> JsonResult { - // let resp = self.p2p.get_info().await; - // jsonrpc::response(resp, id).into() - //} } diff --git a/bin/ircd/src/server.rs b/bin/ircd/src/server.rs index 736ce44b2..5b0c5cc29 100644 --- a/bin/ircd/src/server.rs +++ b/bin/ircd/src/server.rs @@ -5,7 +5,7 @@ use rand::{rngs::OsRng, RngCore}; use darkfi::{Error, Result}; -use crate::privmsg::Privmsg; +use crate::{privmsg::Privmsg, SeenMsgIds}; pub struct IrcServerConnection { write_stream: WriteHalf, @@ -14,10 +14,16 @@ pub struct IrcServerConnection { is_registered: bool, nickname: String, _channels: Vec, + seen_msg_id: SeenMsgIds, + p2p_sender: async_channel::Sender, } impl IrcServerConnection { - pub fn new(write_stream: WriteHalf) -> Self { + pub fn new( + write_stream: WriteHalf, + seen_msg_id: SeenMsgIds, + p2p_sender: async_channel::Sender, + ) -> Self { Self { write_stream, is_nick_init: false, @@ -25,21 +31,18 @@ impl IrcServerConnection { is_registered: false, nickname: "".to_string(), _channels: vec![], + seen_msg_id, + p2p_sender, } } - pub async fn update( - &mut self, - line: String, - sender: async_channel::Sender, - seen_msg_id: crate::SeenMsgId, - ) -> Result<()> { + pub async fn update(&mut self, line: String) -> Result<()> { let mut tokens = line.split_ascii_whitespace(); // Commands can begin with :garbage but we will reject clients doing // that for now to keep the protocol simple and focused. let command = tokens.next().ok_or(Error::MalformedPacket)?; - debug!("Received command: {}", command); + info!("Received command: {}", command); match command { "USER" => { @@ -93,11 +96,11 @@ impl IrcServerConnection { message: message.to_string(), }; - let mut smi = seen_msg_id.lock().await; + let mut smi = self.seen_msg_id.lock().await; smi.push(random_id); drop(smi); - sender.send(protocol_msg).await?; + self.p2p_sender.send(protocol_msg).await?; } "QUIT" => { // Close the connection diff --git a/bin/ircd/src/settings.rs b/bin/ircd/src/settings.rs index a86d94474..9deaa2f8f 100644 --- a/bin/ircd/src/settings.rs +++ b/bin/ircd/src/settings.rs @@ -23,6 +23,9 @@ pub struct Args { /// IRC listen URL #[structopt(long = "irc", default_value = "127.0.0.1:11066")] pub irc_listen: SocketAddr, + /// Plugin listen URL + #[structopt(long)] + pub plugin_listen: Option, /// Sets Datastore Path #[structopt(long, default_value = "~/.config/ircd")] pub datastore: String, From 35c77562edaa3afbba27505c7f792903bce0c5f2 Mon Sep 17 00:00:00 2001 From: parazyd Date: Thu, 28 Apr 2022 20:18:22 +0200 Subject: [PATCH 03/10] contrib: Add script to sync all toml package versions to the root lib. --- contrib/update_pkg_versions.py | 42 ++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100755 contrib/update_pkg_versions.py diff --git a/contrib/update_pkg_versions.py b/contrib/update_pkg_versions.py new file mode 100755 index 000000000..a8529f45b --- /dev/null +++ b/contrib/update_pkg_versions.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +import subprocess + +from os import chdir +from subprocess import PIPE + +import tomlkit + + +def update_package_version(filename, version): + with open(filename) as f: + content = f.read() + + p = tomlkit.parse(content) + p["package"]["version"] = version + + with open(filename, "w") as f: + f.write(tomlkit.dumps(p)) + + +def main(): + toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], + capture_output=True) + toplevel = toplevel.stdout.decode().strip() + chdir(toplevel) + + with open("Cargo.toml") as f: + content = f.read() + + p = tomlkit.parse(content) + version = p["package"]["version"] + + find_output = subprocess.run( + ["find", ".", "-type", "f", "-name", "Cargo.toml"], stdout=PIPE) + files = [i.strip() for i in find_output.stdout.decode().split("\n")][:-1] + + for filename in files: + update_package_version(filename, version) + + +if __name__ == "__main__": + main() From 01724a544d8b5742ccdd479901eeae56565afaf8 Mon Sep 17 00:00:00 2001 From: parazyd Date: Thu, 28 Apr 2022 20:19:04 +0200 Subject: [PATCH 04/10] Sync package versions in the repo. --- bin/ircd/Cargo.toml | 2 +- example/smart-contract/Cargo.toml | 2 +- script/research/consensusd/Cargo.toml | 2 +- script/research/crypsinous/zk/plonk-by-hand/Cargo.toml | 2 +- script/research/nodes-tool/Cargo.toml | 2 +- script/research/pasta/Cargo.toml | 2 +- script/research/streamlet_rust/Cargo.toml | 2 +- script/research/validatord/Cargo.toml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bin/ircd/Cargo.toml b/bin/ircd/Cargo.toml index e8ada3f56..6d4b5caf3 100644 --- a/bin/ircd/Cargo.toml +++ b/bin/ircd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ircd" -version = "0.1.0" +version = "0.3.0" homepage = "https://dark.fi" description = "P2P IRC daemon" authors = ["darkfi "] diff --git a/example/smart-contract/Cargo.toml b/example/smart-contract/Cargo.toml index 47509673f..bb1d040a5 100644 --- a/example/smart-contract/Cargo.toml +++ b/example/smart-contract/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "smart-contract" -version = "0.1.0" +version = "0.3.0" edition = "2021" [workspace] diff --git a/script/research/consensusd/Cargo.toml b/script/research/consensusd/Cargo.toml index 3af43a59a..000a4f803 100644 --- a/script/research/consensusd/Cargo.toml +++ b/script/research/consensusd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "consensusd" -version = "0.1.0" +version = "0.3.0" edition = "2021" [dependencies.darkfi] diff --git a/script/research/crypsinous/zk/plonk-by-hand/Cargo.toml b/script/research/crypsinous/zk/plonk-by-hand/Cargo.toml index 4e568c3db..3b783fd9c 100644 --- a/script/research/crypsinous/zk/plonk-by-hand/Cargo.toml +++ b/script/research/crypsinous/zk/plonk-by-hand/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "plonkbyhand" -version = "0.1.0" +version = "0.3.0" edition = "2021" diff --git a/script/research/nodes-tool/Cargo.toml b/script/research/nodes-tool/Cargo.toml index b39a9646d..b8263d5b5 100644 --- a/script/research/nodes-tool/Cargo.toml +++ b/script/research/nodes-tool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nodes-tool" -version = "0.1.0" +version = "0.3.0" edition = "2021" [dependencies.darkfi] diff --git a/script/research/pasta/Cargo.toml b/script/research/pasta/Cargo.toml index c7746d349..7b71b00f4 100644 --- a/script/research/pasta/Cargo.toml +++ b/script/research/pasta/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pasta" -version = "0.1.0" +version = "0.3.0" authors = ["narodnik "] edition = "2018" diff --git a/script/research/streamlet_rust/Cargo.toml b/script/research/streamlet_rust/Cargo.toml index d1f70fcd2..f99ab7a9d 100644 --- a/script/research/streamlet_rust/Cargo.toml +++ b/script/research/streamlet_rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "streamlet_rust" -version = "0.1.0" +version = "0.3.0" edition = "2021" [dependencies.darkfi] diff --git a/script/research/validatord/Cargo.toml b/script/research/validatord/Cargo.toml index ab2ea4e3b..c732ffbc9 100644 --- a/script/research/validatord/Cargo.toml +++ b/script/research/validatord/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "validatord" -version = "0.1.0" +version = "0.3.0" edition = "2021" [dependencies.darkfi] From f15f5906b80ba0f35c7d3cdfff9d1cf591fd50fd Mon Sep 17 00:00:00 2001 From: ghassmo Date: Thu, 28 Apr 2022 22:16:59 +0300 Subject: [PATCH 05/10] retrieve "ircd: create entry point socket for bots and plugins" --- bin/ircd/ircd_config.toml | 3 -- bin/ircd/src/main.rs | 80 +-------------------------------------- bin/ircd/src/settings.rs | 3 -- 3 files changed, 1 insertion(+), 85 deletions(-) diff --git a/bin/ircd/ircd_config.toml b/bin/ircd/ircd_config.toml index cb198b897..ce76ac486 100644 --- a/bin/ircd/ircd_config.toml +++ b/bin/ircd/ircd_config.toml @@ -4,9 +4,6 @@ ## IRC listen URL #irc_listen="127.0.0.1:11066" -## Plugin listen URL -#plugin_listen="127.0.0.1:11077" - ## Sets Datastore Path #datastore="~/.config/ircd" diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index e8590aa01..32eefbeaa 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -17,7 +17,7 @@ use darkfi::{ Error, Result, }; use easy_parallel::Parallel; -use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, FutureExt}; +use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; use log::{debug, error, info, warn}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; use smol::future; @@ -74,43 +74,11 @@ async fn broadcast_msg( Ok(()) } -async fn plugin_process( - recv_for_plugin: Receiver, - send_from_plugin: Sender, - stream: TcpStream, -) -> Result<()> { - let peer_addr = stream.peer_addr()?.clone(); - let (reader, mut writer) = stream.split(); - let mut reader = BufReader::new(reader); - - loop { - let mut line = String::new(); - futures::select! { - irc_msg = recv_for_plugin.recv().fuse() => { - let irc_msg = irc_msg?; - info!("Plugin recv {}", irc_msg); - writer.write_all(irc_msg.as_bytes()).await?; - }, - err = reader.read_line(&mut line).fuse() => { - if let Err(e) = err { - warn!("Read line error: {}", e); - return Ok(()) - } - let irc_msg = clean_input(line, &peer_addr)?; - info!("Plugin send {}", irc_msg); - send_from_plugin.send(irc_msg).await?; - } - }; - } -} - async fn process( raft_receiver: Receiver, stream: TcpStream, peer_addr: SocketAddr, raft_sender: Sender, - send_for_plugin: Sender, - recv_from_plugin: Receiver, seen_msg_id: SeenMsgIds, ) -> Result<()> { let (reader, writer) = stream.split(); @@ -134,13 +102,6 @@ async fn process( let irc_msg = build_irc_msg(&msg); conn.reply(&irc_msg).await?; - send_for_plugin.send(irc_msg).await?; - } - irc_msg = recv_from_plugin.recv().fuse() => { - let irc_msg = irc_msg?; - info!("Receive msg from plugin"); - broadcast_msg(irc_msg, peer_addr, &mut conn).await?; - } err = reader.read_line(&mut line).fuse() => { if let Err(e) = err { @@ -188,9 +149,6 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await }); - let (send_from_plugin, recv_from_plugin) = async_channel::unbounded::(); - let (send_for_plugin, recv_for_plugin) = async_channel::unbounded::(); - // // IRC instance // @@ -213,47 +171,12 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { stream, peer_addr, raft_sender.clone(), - send_for_plugin.clone(), - recv_from_plugin.clone(), seen_msg_id.clone(), )) .detach(); } }); - // - // Plugin instance - // - let executor_cloned = executor.clone(); - let send_from_plugin_cloned = send_from_plugin.clone(); - let recv_for_plugin_cloned = recv_for_plugin.clone(); - let plugin_task: smol::Task> = executor.spawn(async move { - if settings.plugin_listen.is_none() { - return Ok(()) - } - let plugin_listener = TcpListener::bind(settings.plugin_listen.unwrap()).await?; - let plugin_local_addr = plugin_listener.local_addr()?; - info!("Plugin listening on {}", plugin_local_addr); - loop { - let (stream, peer_addr) = match plugin_listener.accept().await { - Ok((s, a)) => (s, a), - Err(e) => { - error!("Failed listening for connections: {}", e); - return Err(Error::ServiceStopped) - } - }; - - info!("Plugin Accepted client: {}", peer_addr); - executor_cloned - .spawn(plugin_process( - recv_for_plugin_cloned.clone(), - send_from_plugin_cloned.clone(), - stream, - )) - .detach(); - } - }); - let (signal, shutdown) = async_channel::bounded::<()>(1); ctrlc_async::set_async_handler(async move { warn!(target: "ircd", "ircd start Exit Signal"); @@ -261,7 +184,6 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { signal.send(()).await.unwrap(); rpc_task.cancel().await; irc_task.cancel().await; - plugin_task.cancel().await; }) .unwrap(); diff --git a/bin/ircd/src/settings.rs b/bin/ircd/src/settings.rs index 9deaa2f8f..a86d94474 100644 --- a/bin/ircd/src/settings.rs +++ b/bin/ircd/src/settings.rs @@ -23,9 +23,6 @@ pub struct Args { /// IRC listen URL #[structopt(long = "irc", default_value = "127.0.0.1:11066")] pub irc_listen: SocketAddr, - /// Plugin listen URL - #[structopt(long)] - pub plugin_listen: Option, /// Sets Datastore Path #[structopt(long, default_value = "~/.config/ircd")] pub datastore: String, From 40b7ad62e0f4cac2c8b1b6b3184c1686f1a4c28b Mon Sep 17 00:00:00 2001 From: ghassmo Date: Thu, 28 Apr 2022 22:17:12 +0300 Subject: [PATCH 06/10] ircd: add simple bot example --- bin/ircd/script/meeting_bot.py | 24 ++++++++++++++++++++++++ bin/ircd/script/tmux_session.sh | 6 ------ 2 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 bin/ircd/script/meeting_bot.py delete mode 100755 bin/ircd/script/tmux_session.sh diff --git a/bin/ircd/script/meeting_bot.py b/bin/ircd/script/meeting_bot.py new file mode 100644 index 000000000..09e9d7cd1 --- /dev/null +++ b/bin/ircd/script/meeting_bot.py @@ -0,0 +1,24 @@ +import socket +import time + +def main(): + stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + host = "127.0.0.1" + port = 11066 + stream.connect((host, port)) + + nick_msg = b"NICK MEETING \r\n" + stream.send(nick_msg) + + join_msg = b"JOIN #dev \r\n" + stream.send(join_msg) + + while True: + time.sleep(6) + msg = b"PRIVMSG #dev :hello\r\n" + stream.send(msg) + + + +if __name__ == "__main__": + main() diff --git a/bin/ircd/script/tmux_session.sh b/bin/ircd/script/tmux_session.sh deleted file mode 100755 index 73681ed5e..000000000 --- a/bin/ircd/script/tmux_session.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -tmux new-session -d 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:9999 --irc 127.0.0.1:6688' -tmux split-window -v 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:11004 --external 127.0.0.1:11004 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6667' -tmux split-window -h 'LOG_TARGETS=net ../../../target/release/ircd -vv --slots 5 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6668' -tmux attach From d0b638226cbaaa98bd3db378d2228e383243ee12 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 29 Apr 2022 00:24:11 +0300 Subject: [PATCH 07/10] ircd: error handling for cleaning received msg --- bin/ircd/src/main.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 32eefbeaa..db2e4614d 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -51,7 +51,10 @@ fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { return Err(Error::ChannelStopped) } - assert!(&line[(line.len() - 2)..] == "\r\n"); + if &line[(line.len() - 2)..] != "\r\n" { + warn!("Closing connection."); + return Err(Error::ChannelStopped) + } // Remove CRLF line.pop(); line.pop(); @@ -109,7 +112,10 @@ async fn process( return Ok(()) } info!("Receive msg from IRC server"); - let irc_msg = clean_input(line, &peer_addr)?; + let irc_msg = match clean_input(line, &peer_addr) { + Ok(m) => m, + Err(e) => return Err(e) + }; broadcast_msg(irc_msg, peer_addr,&mut conn).await?; } }; From 13394e0fef7de2a50768455cc591932a17f298d7 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 29 Apr 2022 00:26:11 +0300 Subject: [PATCH 08/10] ircd: add meeting bot --- Cargo.lock | 2 +- bin/ircd/script/meeting_bot.py | 88 +++++++++++++++++++++++++++------- 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1822d930..31eeb8b0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,7 +2156,7 @@ dependencies = [ [[package]] name = "ircd" -version = "0.1.0" +version = "0.3.0" dependencies = [ "async-channel", "async-executor", diff --git a/bin/ircd/script/meeting_bot.py b/bin/ircd/script/meeting_bot.py index 09e9d7cd1..17b8da06c 100644 --- a/bin/ircd/script/meeting_bot.py +++ b/bin/ircd/script/meeting_bot.py @@ -1,24 +1,80 @@ -import socket -import time +import asyncio -def main(): - stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - host = "127.0.0.1" - port = 11066 - stream.connect((host, port)) - nick_msg = b"NICK MEETING \r\n" - stream.send(nick_msg) +async def start(): + host = "127.0.0.1" + port = 11066 + channel = "#dev" + nickname = "meeting" - join_msg = b"JOIN #dev \r\n" - stream.send(join_msg) + print(f"Start a connection {host}:{port}") + reader, writer = await asyncio.open_connection(host, port) + + print("Send NICK msg") + nick_msg = f"NICK {nickname} \r\n" + writer.write(nick_msg.encode('utf8')) + + print(f"Send JOIN msg: {channel}") + join_msg = f"JOIN {channel} \r\n" + writer.write(join_msg.encode('utf8')) + + topics = [] while True: - time.sleep(6) - msg = b"PRIVMSG #dev :hello\r\n" - stream.send(msg) + msg = await reader.read(350) + msg = msg.decode('utf8').strip() + if not msg: + print("Error: Receive empty msg") + break + command = msg.split(" ")[1] -if __name__ == "__main__": - main() + if command == "PRIVMSG": + + msg_title = msg.split(" ")[3][1:] + + if not msg_title: + continue + + reply = None + + if msg_title == "#m_start": + reply = f"PRIVMSG {channel} :meeting started \r\n" + msg_title = "#m_list" + + if msg_title == "#m_end": + reply = f"PRIVMSG {channel} :meeting end \r\n" + topics = [] + + if msg_title == "#m_topic": + topic = msg.split(" ", 4) + if len(topic) != 5: + continue + topic = topic[4] + topics.append(topic) + reply = f"PRIVMSG {channel} :add topic: {topic} \r\n" + + if msg_title == "#m_list": + tp = " ".join( + [f"{i}-{topic}" for i, topic in enumerate(topics, 1)]) + + reply = f"PRIVMSG {channel} :topics: {tp} \r\n" + + if msg_title == "#m_next": + if len(topics) == 0: + reply = f"PRIVMSG {channel} :no topics \r\n" + else: + tp = topics.pop(0) + reply = f"PRIVMSG {channel} :current topic: {tp} \r\n" + + if reply != None: + writer.write(reply.encode('utf8')) + await writer.drain() + + if command == "QUIT": + break + + writer.close() + +asyncio.run(start()) From cc18f509e828fdd8e63deaab23a4667fecbf4960 Mon Sep 17 00:00:00 2001 From: Dastan-glitch Date: Fri, 29 Apr 2022 02:23:18 +0300 Subject: [PATCH 09/10] bin/tau: adding encryption --- Cargo.lock | 221 +++++++++++++++++++++++++++++++++++++-- bin/tau/taud/Cargo.toml | 3 + bin/tau/taud/src/main.rs | 67 +++++++++++- 3 files changed, 278 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31eeb8b0f..d7a7bc87f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", + "rand_core 0.6.3", +] + [[package]] name = "ahash" version = "0.7.6" @@ -426,7 +436,7 @@ dependencies = [ "cc", "cfg-if 1.0.0", "constant_time_eq", - "digest", + "digest 0.10.3", ] [[package]] @@ -627,6 +637,31 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b72a433d0cf2aef113ba70f62634c56fddb0f244e6377185c56a7cadbd8f91" +dependencies = [ + "cfg-if 1.0.0", + "cipher", + "cpufeatures", + "zeroize", +] + +[[package]] +name = "chacha20poly1305" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b84ed6d1d5f7aa9bdde921a5090e0ca4d934d250ea3b402a5fab3a994e28a2a" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.19" @@ -640,6 +675,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "clap" version = "2.34.0" @@ -996,6 +1040,21 @@ dependencies = [ "crypto_api", ] +[[package]] +name = "crypto_box" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2bcbd5e4fc3ad3de2d0e75509f870a6fa9f488e0e2c9a8ce49721a52efc4e" +dependencies = [ + "chacha20", + "chacha20poly1305", + "rand_core 0.6.3", + "salsa20", + "x25519-dalek", + "xsalsa20poly1305", + "zeroize", +] + [[package]] name = "csv" version = "1.1.6" @@ -1055,6 +1114,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "curve25519-dalek" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f9d052967f590a76e62eb387bd0bbb1b000182c3cefe5364db6b7211651bc0" +dependencies = [ + "byteorder", + "digest 0.9.0", + "rand_core 0.5.1", + "subtle", + "zeroize", +] + [[package]] name = "dao-cli" version = "0.3.0" @@ -1313,6 +1385,15 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.3" @@ -1494,6 +1575,24 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "encoding_rs_io" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cc3c5651fb62ab8aa3103998dade57efdd028544bd300516baa31840c252a83" +dependencies = [ + "encoding_rs", +] + [[package]] name = "enum-iterator" version = "0.7.0" @@ -1623,7 +1722,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2958d04124b9f27f175eaeb9a9f383d026098aa837eadd8ba22c11f13a05b9e" dependencies = [ "bitvec 0.22.3", - "rand_core", + "rand_core 0.6.3", "subtle", ] @@ -1964,7 +2063,7 @@ checksum = "bc5ac374b108929de78460075f3dc439fa66df9d8fc77e8f12caa5165fcf0c89" dependencies = [ "byteorder", "ff", - "rand_core", + "rand_core 0.6.3", "subtle", ] @@ -1999,7 +2098,7 @@ dependencies = [ "group", "pasta_curves", "plotters", - "rand_core", + "rand_core 0.6.3", "rayon", "tabbycat", ] @@ -2558,6 +2657,12 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "openssl" version = "0.10.38" @@ -2801,6 +2906,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "poly1305" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -2965,7 +3081,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.3", ] [[package]] @@ -2975,9 +3091,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.3", ] +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" + [[package]] name = "rand_core" version = "0.6.3" @@ -2993,7 +3115,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" dependencies = [ - "rand_core", + "rand_core 0.6.3", ] [[package]] @@ -3260,6 +3382,16 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +[[package]] +name = "salsa20" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0fbb5f676da676c260ba276a8f43a8dc67cf02d1438423aeb1c677a7212686" +dependencies = [ + "cipher", + "zeroize", +] + [[package]] name = "same-file" version = "1.0.6" @@ -3420,7 +3552,7 @@ checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ "cfg-if 1.0.0", "cpufeatures", - "digest", + "digest 0.10.3", ] [[package]] @@ -3431,7 +3563,7 @@ checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" dependencies = [ "cfg-if 1.0.0", "cpufeatures", - "digest", + "digest 0.10.3", ] [[package]] @@ -3761,6 +3893,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synstructure" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "tabbycat" version = "0.1.2" @@ -3816,9 +3960,12 @@ dependencies = [ "async-trait", "chrono", "clap 3.1.12", + "crypto_box", "ctrlc-async", "darkfi", "easy-parallel", + "encoding_rs", + "encoding_rs_io", "futures", "log", "num_cpus", @@ -4120,6 +4267,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -4657,6 +4814,31 @@ dependencies = [ "tap", ] +[[package]] +name = "x25519-dalek" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077" +dependencies = [ + "curve25519-dalek", + "rand_core 0.5.1", + "zeroize", +] + +[[package]] +name = "xsalsa20poly1305" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e68bcb965d6c650091450b95cea12f07dcd299a01c15e2f9433b0813ea3c0886" +dependencies = [ + "aead", + "poly1305", + "rand_core 0.6.3", + "salsa20", + "subtle", + "zeroize", +] + [[package]] name = "yasna" version = "0.5.0" @@ -4666,6 +4848,27 @@ dependencies = [ "time 0.3.9", ] +[[package]] +name = "zeroize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zkas" version = "0.3.0" diff --git a/bin/tau/taud/Cargo.toml b/bin/tau/taud/Cargo.toml index c322cb6b9..295de5c5e 100644 --- a/bin/tau/taud/Cargo.toml +++ b/bin/tau/taud/Cargo.toml @@ -26,9 +26,12 @@ rand = "0.8.5" chrono = "0.4.19" thiserror = "1.0.30" ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} +encoding_rs = "0.8.31" +encoding_rs_io = "0.1.7" # Encoding and parsing serde = {version = "1.0.136", features = ["derive"]} serde_json = "1.0.79" structopt = "0.3.26" structopt-toml = "0.5.0" +crypto_box = {version = "0.7.2", features = ["std"]} diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 1c3d9ba81..b80f1bf73 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -1,10 +1,12 @@ use async_std::sync::Arc; +use serde::{Deserialize, Serialize}; use std::fs::create_dir_all; use async_executor::Executor; +use crypto_box::{aead::Aead, Box, SecretKey, KEY_SIZE}; use easy_parallel::Parallel; use futures::{select, FutureExt}; -use log::{info, warn}; +use log::{error, info, warn}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; use smol::future; use structopt_toml::StructOptToml; @@ -17,6 +19,7 @@ use darkfi::{ cli::{log_config, spawn_config}, expand_path, path::get_config_path, + serial::{deserialize, serialize, SerialDecodable, SerialEncodable}, }, Error, Result, }; @@ -34,8 +37,15 @@ use crate::{ month_tasks::MonthTasks, settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, task_info::TaskInfo, + util::{load, save}, }; +#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Serialize, Deserialize)] +pub struct MsgPayload { + nonce: Vec, + payload: Vec, +} + async_daemonize!(realmain); async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let datastore_path = expand_path(&settings.datastore)?; @@ -44,6 +54,23 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { create_dir_all(datastore_path.join("month"))?; create_dir_all(datastore_path.join("task"))?; + let mut rng = crypto_box::rand_core::OsRng; + + let secret_key = match load::<[u8; KEY_SIZE]>(&datastore_path.join("secret_key")) { + Ok(t) => SecretKey::try_from(t)?, + Err(_) => { + info!(target: "tau", "generating a new secret key"); + let secret = SecretKey::generate(&mut rng); + let sk_string = secret.as_bytes(); + save::<[u8; KEY_SIZE]>(&datastore_path.join("secret_key"), sk_string) + .map_err(Error::from)?; + secret + } + }; + + let public_key = secret_key.public_key(); + let msg_box = Box::new(&public_key, &secret_key); + // // RPC // @@ -69,7 +96,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { //Raft // let datastore_raft = datastore_path.join("tau.db"); - let mut raft = Raft::::new(net_settings.inbound, datastore_raft)?; + let mut raft = Raft::>::new(net_settings.inbound, datastore_raft)?; let raft_sender = raft.get_broadcast(); let commits = raft.get_commits(); @@ -83,7 +110,15 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { for task in tasks { info!(target: "tau", "send local task {:?}", task); - initial_sync_raft_sender.send(task).await.map_err(Error::from)?; + + let nonce = crypto_box::generate_nonce(&mut rng); + let payload = &serialize(&task)[..]; + let encrypted_payload = msg_box.encrypt(&nonce, payload).unwrap(); + + let msg = MsgPayload { nonce: nonce.to_vec(), payload: encrypted_payload }; + let ser_msg = serialize(&msg); + + initial_sync_raft_sender.send(ser_msg).await.map_err(Error::from)?; } loop { @@ -93,12 +128,36 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { if let Some(tk) = task { info!(target: "tau", "save the received task {:?}", tk); tk.save(&datastore_path_cloned)?; - raft_sender.send(tk).await.map_err(Error::from)?; + + let nonce = crypto_box::generate_nonce(&mut rng); + let payload = &serialize(&tk)[..]; + let encrypted_payload = msg_box.encrypt(&nonce, payload).unwrap(); + + let msg = MsgPayload { + nonce: nonce.to_vec(), + payload: encrypted_payload, + }; + let ser_msg = serialize(&msg); + + raft_sender.send(ser_msg).await.map_err(Error::from)?; } } task = commits.recv().fuse() => { let task = task.map_err(Error::from)?; + + let recv: MsgPayload = deserialize(&task)?; + let nonce = recv.nonce.as_slice(); + let message = match msg_box.decrypt(nonce.try_into().unwrap(), &recv.payload[..]){ + Ok(m) => m, + Err(_) => { + error!("Invalid secret or public key"); + vec![] + }, + }; + + let task: TaskInfo = deserialize(&message)?; info!(target: "tau", "receive update from the commits {:?}", task); + task.save(&datastore_path_cloned)?; } From 713a937c6b4beb3991069fe767b8a7bdb8abce4e Mon Sep 17 00:00:00 2001 From: Dastan-glitch Date: Fri, 29 Apr 2022 02:23:47 +0300 Subject: [PATCH 10/10] bin/tau: add default value for rpc_listen --- bin/tau/tau-cli/src/main.rs | 5 ++++- bin/tau/tau-cli/src/util.rs | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/bin/tau/tau-cli/src/main.rs b/bin/tau/tau-cli/src/main.rs index dc4ab71ed..1a753b2e6 100644 --- a/bin/tau/tau-cli/src/main.rs +++ b/bin/tau/tau-cli/src/main.rs @@ -168,7 +168,10 @@ async fn main() -> Result<()> { spawn_config(&config_path, CONFIG_FILE_CONTENTS)?; - let config: TauConfig = Config::::load(config_path)?; + let config: TauConfig = match Config::::load(config_path) { + Ok(c) => c, + Err(_) => TauConfig::default(), + }; start(args, config).await } diff --git a/bin/tau/tau-cli/src/util.rs b/bin/tau/tau-cli/src/util.rs index 9b0aa7c2e..29cbb4c52 100644 --- a/bin/tau/tau-cli/src/util.rs +++ b/bin/tau/tau-cli/src/util.rs @@ -2,7 +2,7 @@ use std::{ env::{temp_dir, var}, fs::{self, File}, io::{self, Read, Write}, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, ops::Index, process::Command, }; @@ -24,6 +24,12 @@ pub struct TauConfig { pub rpc_listen: SocketAddr, } +impl Default for TauConfig { + fn default() -> Self { + Self { rpc_listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 11055) } + } +} + #[derive(Subcommand)] pub enum CliTauSubCommands { /// Add a new task