From 7021cc40d1d1efde95116937eeb32da1045358c6 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 27 Apr 2022 20:01:35 +0300 Subject: [PATCH 1/5] raft: reduce cpu usage, fix bugs, and clean up --- src/raft/mod.rs | 25 ++++++- src/raft/p2p.rs | 9 ++- src/raft/raft.rs | 185 +++++++++++++++++++++++++++++++++++++---------- 3 files changed, 175 insertions(+), 44 deletions(-) diff --git a/src/raft/mod.rs b/src/raft/mod.rs index 445b8c065..bf8989438 100644 --- a/src/raft/mod.rs +++ b/src/raft/mod.rs @@ -20,6 +20,20 @@ pub enum Role { Leader, } +#[derive(SerialDecodable, SerialEncodable, Clone, Debug)] +pub struct SyncRequest { + logs_len: u64, + last_term: u64, +} + +#[derive(SerialDecodable, SerialEncodable, Clone, Debug)] +pub struct SyncResponse { + logs: Logs, + commit_length: u64, + leader_id: NodeId, + wipe: bool, +} + #[derive(SerialDecodable, SerialEncodable, Clone, Debug)] pub struct VoteRequest { node_id: NodeId, @@ -139,7 +153,7 @@ impl MapLength { #[derive(SerialDecodable, SerialEncodable, Clone, Debug)] pub struct NetMsg { - id: u32, + id: u64, recipient_id: Option, method: NetMsgMethod, payload: Vec, @@ -153,6 +167,9 @@ pub enum NetMsgMethod { VoteResponse = 2, VoteRequest = 3, BroadcastRequest = 4, + // this only used for listener node + SyncRequest = 5, + SyncResponse = 6, } impl Encodable for NetMsgMethod { @@ -163,6 +180,8 @@ impl Encodable for NetMsgMethod { Self::VoteResponse => 2, Self::VoteRequest => 3, Self::BroadcastRequest => 4, + Self::SyncRequest => 5, + Self::SyncResponse => 6, }; (len as u8).encode(s) } @@ -176,7 +195,9 @@ impl Decodable for NetMsgMethod { 1 => Self::LogRequest, 2 => Self::VoteResponse, 3 => Self::VoteRequest, - _ => Self::BroadcastRequest, + 4 => Self::BroadcastRequest, + 5 => Self::SyncRequest, + _ => Self::SyncResponse, }) } } diff --git a/src/raft/p2p.rs b/src/raft/p2p.rs index 3fb4f0071..f1ad96b19 100644 --- a/src/raft/p2p.rs +++ b/src/raft/p2p.rs @@ -14,7 +14,7 @@ pub struct ProtocolRaft { notify_queue_sender: async_channel::Sender, msg_sub: net::MessageSubscription, p2p: net::P2pPtr, - msgs: Arc>>, + msgs: Arc>>, } impl ProtocolRaft { @@ -23,6 +23,7 @@ impl ProtocolRaft { channel: net::ChannelPtr, notify_queue_sender: async_channel::Sender, p2p: net::P2pPtr, + msgs: Arc>>, ) -> net::ProtocolBasePtr { let message_subsytem = channel.get_message_subsystem(); message_subsytem.add_dispatch::().await; @@ -35,7 +36,7 @@ impl ProtocolRaft { msg_sub, jobsman: net::ProtocolJobsManager::new("ProtocolRaft", channel), p2p, - msgs: Arc::new(Mutex::new(vec![])), + msgs, }) } @@ -75,7 +76,9 @@ impl ProtocolRaft { // then the local node will only handle the msg if its method // is LogRequest (None, Some(_)) => { - if msg.method != NetMsgMethod::LogRequest { + if msg.method != NetMsgMethod::LogRequest && + msg.method != NetMsgMethod::SyncResponse + { continue } } diff --git a/src/raft/raft.rs b/src/raft/raft.rs index 16f301246..fdadc3ece 100644 --- a/src/raft/raft.rs +++ b/src/raft/raft.rs @@ -17,7 +17,7 @@ use crate::{ use super::{ BroadcastMsgRequest, DataStore, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg, - NetMsgMethod, NodeId, ProtocolRaft, Role, VoteRequest, VoteResponse, + NetMsgMethod, NodeId, ProtocolRaft, Role, SyncRequest, SyncResponse, VoteRequest, VoteResponse, }; const HEARTBEATTIMEOUT: u64 = 100; @@ -124,12 +124,16 @@ impl Raft { let registry = p2p.protocol_registry(); + let seen_net_msg = Arc::new(Mutex::new(vec![])); let self_id = self.id.clone(); registry .register(net::SESSION_ALL, move |channel, p2p| { let self_id = self_id.clone(); let sender = p2p_snd.clone(); - async move { ProtocolRaft::init(self_id, channel, sender, p2p).await } + let seen_net_msg_cloned = seen_net_msg.clone(); + async move { + ProtocolRaft::init(self_id, channel, sender, p2p, seen_net_msg_cloned).await + } }) .await; @@ -179,16 +183,57 @@ impl Raft { } }); + if self.id.is_none() { + let last_term = + if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 }; + + let sync_request = SyncRequest { logs_len: self.logs.len(), last_term }; + + info!("send sync request"); + self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?; + + loop { + select! { + msg = receive_queues.recv().fuse() => { + let msg = msg?; + if msg.method == NetMsgMethod::SyncResponse { + info!("receive sync response"); + let sr: SyncResponse = deserialize(&msg.payload)?; + if sr.wipe { + self.set_commit_length(&0)?; + self.push_logs(&sr.logs)?; + } else { + for log in sr.logs.0.iter() { + self.push_log(log)?; + } + } + + if !self.logs.is_empty() { + self.set_current_term(&self.logs.0.last().unwrap().term.clone())?; + } + + if self.commit_length > sr.commit_length { + self.set_commit_length(&0)?; + } + + for i in self.commit_length..sr.commit_length { + self.push_commit(&self.logs.get(i)?.msg).await?; + } + self.set_commit_length(&sr.commit_length)?; + + self.current_leader = Some(sr.leader_id); + + break + }}, + _ = stop_signal.recv().fuse() => break, + } + } + } + let mut rng = rand::thread_rng(); let broadcast_msg_rv = self.broadcast_msg.1.clone(); - // send data form datastore through broadcast_commits channel - let commits = self.datastore.commits.get_all()?; - for commit in commits { - self.broadcast_commits.0.send(commit).await?; - } - loop { let timeout: Duration = if self.role == Role::Leader { Duration::from_millis(HEARTBEATTIMEOUT) @@ -200,7 +245,7 @@ impl Raft { select! { m = receive_queues.recv().fuse() => result = self.handle_method(m?).await, - m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?).await, + m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await, _ = task::sleep(timeout).fuse() => { result = if self.role == Role::Leader { self.send_heartbeat().await @@ -233,29 +278,30 @@ impl Raft { self.broadcast_msg.0.clone() } - async fn broadcast_msg(&mut self, msg: &T) -> Result<()> { + async fn broadcast_msg(&mut self, msg: &T, msg_id: Option) -> Result<()> { if self.role == Role::Leader { let msg = serialize(msg); let log = Log { msg, term: self.current_term }; self.push_log(&log)?; self.acked_length.insert(&self.id.clone().unwrap(), self.logs.len()); - - let nodes = self.nodes.lock().await.clone(); - for node in nodes.iter() { - self.update_logs(node.0).await?; - } } else { let b_msg = BroadcastMsgRequest(serialize(msg)); self.send( self.current_leader.clone(), &serialize(&b_msg), NetMsgMethod::BroadcastRequest, + msg_id, ) .await?; } - info!(target: "raft", "has id: {} {:?} broadcast a msg", self.id.is_some(), self.role); + info!(target: "raft", + "Node has id: {}, Node status: {:?}, broadcast a msg id: {:?} ", + self.id.is_some(), + self.role, msg_id + ); + Ok(()) } @@ -280,8 +326,14 @@ impl Raft { NetMsgMethod::BroadcastRequest => { let vr: BroadcastMsgRequest = deserialize(&msg.payload)?; let d: T = deserialize(&vr.0)?; - self.broadcast_msg(&d).await?; + self.broadcast_msg(&d, Some(msg.id)).await?; } + NetMsgMethod::SyncRequest => { + info!("receive sync request"); + let sr: SyncRequest = deserialize(&msg.payload)?; + self.receive_sync_request(&sr, msg.id).await?; + } + NetMsgMethod::SyncResponse => {} } debug!( @@ -291,13 +343,64 @@ impl Raft { ); Ok(()) } + + async fn receive_sync_request(&self, sr: &SyncRequest, msg_id: u64) -> Result<()> { + if self.id.is_none() { + return Ok(()) + } + + if self.role == Role::Leader { + let mut wipe = false; + + let logs = if sr.logs_len == 0 { + self.logs.clone() + } else if self.logs.len() >= sr.logs_len && + self.logs.get(sr.logs_len - 1)?.term == sr.last_term + { + self.logs.slice_from(sr.logs_len).unwrap() + } else { + wipe = true; + self.logs.clone() + }; + + let sync_response = SyncResponse { + logs, + commit_length: self.commit_length, + leader_id: self.id.clone().unwrap(), + wipe, + }; + + info!("send sync response"); + for _ in 0..2 { + self.send( + self.current_leader.clone(), + &serialize(&sync_response), + NetMsgMethod::SyncResponse, + None, + ) + .await?; + } + } else { + self.send( + self.current_leader.clone(), + &serialize(sr), + NetMsgMethod::SyncRequest, + Some(msg_id), + ) + .await?; + } + + Ok(()) + } + async fn send( &self, recipient_id: Option, payload: &[u8], method: NetMsgMethod, + msg_id: Option, ) -> Result<()> { - let random_id = OsRng.next_u32(); + let random_id = if msg_id.is_some() { msg_id.unwrap() } else { OsRng.next_u64() }; debug!( target: "raft", @@ -313,8 +416,10 @@ impl Raft { async fn send_heartbeat(&self) -> Result<()> { if self.role == Role::Leader { - let nodes = self.nodes.lock().await.clone(); - for node in nodes.iter() { + let nodes = self.nodes.lock().await; + let nodes_cloned = nodes.clone(); + drop(nodes); + for node in nodes_cloned.iter() { self.update_logs(node.0).await?; } } @@ -344,7 +449,7 @@ impl Raft { }; let payload = serialize(&request); - self.send(None, &payload, NetMsgMethod::VoteRequest).await + self.send(None, &payload, NetMsgMethod::VoteRequest, None).await } async fn receive_vote_request(&mut self, vr: VoteRequest) -> Result<()> { @@ -383,7 +488,7 @@ impl Raft { } let payload = serialize(&response); - self.send(Some(vr.node_id), &payload, NetMsgMethod::VoteResponse).await + self.send(Some(vr.node_id), &payload, NetMsgMethod::VoteResponse, None).await } async fn receive_vote_response(&mut self, vr: VoteResponse) -> Result<()> { @@ -391,16 +496,17 @@ impl Raft { self.votes_received.push(vr.node_id); let nodes = self.nodes.lock().await; - if self.votes_received.len() >= ((nodes.len() + 1) / 2) { + let nodes_cloned = nodes.clone(); + drop(nodes); + + if self.votes_received.len() >= ((nodes_cloned.len() + 1) / 2) { self.role = Role::Leader; self.current_leader = Some(self.id.clone().unwrap()); - for node in nodes.iter() { + for node in nodes_cloned.iter() { self.sent_length.insert(node.0, self.logs.len()); self.acked_length.insert(node.0, 0); - self.update_logs(node.0).await?; } } - drop(nodes); } else if vr.current_term > self.current_term { self.set_current_term(&vr.current_term)?; self.role = Role::Follower; @@ -441,7 +547,7 @@ impl Raft { }; let payload = serialize(&request); - self.send(Some(node_id.clone()), &payload, NetMsgMethod::LogRequest).await + self.send(Some(node_id.clone()), &payload, NetMsgMethod::LogRequest, None).await } async fn receive_log_request(&mut self, lr: LogRequest) -> Result<()> { @@ -455,7 +561,7 @@ impl Raft { self.current_leader = Some(lr.leader_id.clone()); } - let ok = (self.logs.len() >= lr.prefix_len) && + let mut ok = (self.logs.len() >= lr.prefix_len) && (lr.prefix_len == 0 || self.logs.get(lr.prefix_len - 1)?.term == lr.prefix_term); let mut ack = 0; @@ -463,6 +569,8 @@ impl Raft { if lr.current_term == self.current_term && ok { self.append_log(lr.prefix_len, lr.commit_length, &lr.suffix).await?; ack = lr.prefix_len + lr.suffix.len(); + } else { + ok = false; } if self.id.is_none() { @@ -477,7 +585,7 @@ impl Raft { }; let payload = serialize(&response); - self.send(Some(lr.leader_id.clone()), &payload, NetMsgMethod::LogResponse).await + self.send(Some(lr.leader_id.clone()), &payload, NetMsgMethod::LogResponse, None).await } async fn receive_log_response(&mut self, lr: LogResponse) -> Result<()> { @@ -488,7 +596,6 @@ impl Raft { self.commit_log().await?; } else if self.sent_length.get(&lr.node_id)? > 0 { self.sent_length.insert(&lr.node_id, self.sent_length.get(&lr.node_id)? - 1); - self.update_logs(&lr.node_id).await?; } } else if lr.current_term > self.current_term { self.set_current_term(&lr.current_term)?; @@ -523,20 +630,20 @@ impl Raft { let nodes = nodes_ptr.clone(); drop(nodes_ptr); - let ready: Vec = self - .logs - .0 - .iter() - .enumerate() - .filter(|(i, _)| self.acks(nodes.clone(), *i as u64).len() >= min_acks) - .map(|(i, _)| i as u64) - .collect(); + let mut ready: Vec = vec![]; + + for len in 1..(self.logs.len() + 1) { + if self.acks(nodes.clone(), len).len() >= min_acks { + ready.push(len); + } + } if ready.is_empty() { return Ok(()) } let max_ready = *ready.iter().max().unwrap(); + if max_ready > self.commit_length && self.logs.get(max_ready - 1)?.term == self.current_term { for i in self.commit_length..max_ready { @@ -563,7 +670,7 @@ impl Raft { } if prefix_len + suffix.len() > self.logs.len() { - for i in (self.logs.len() - prefix_len)..(suffix.len() - 1) { + for i in (self.logs.len() - prefix_len)..suffix.len() { self.push_log(&suffix.get(i)?)?; } } From 60a4b6f211c3a405d3bf2fdd678223d6b6bb31f5 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 27 Apr 2022 20:02:56 +0300 Subject: [PATCH 2/5] ircd2: avoid duplicate messages --- bin/ircd2/src/main.rs | 34 +++++++++++++++++++++++++++++----- bin/ircd2/src/server.rs | 5 +++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/bin/ircd2/src/main.rs b/bin/ircd2/src/main.rs index 8a0bc60b6..83472280f 100644 --- a/bin/ircd2/src/main.rs +++ b/bin/ircd2/src/main.rs @@ -1,5 +1,8 @@ -use async_std::net::{TcpListener, TcpStream}; -use std::{net::SocketAddr, sync::Arc}; +use async_std::{ + net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, +}; +use std::net::SocketAddr; use async_channel::Receiver; use async_executor::Executor; @@ -33,11 +36,14 @@ use crate::{ settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, }; +pub type SeenMsgId = Arc>>; + async fn process_user_input( mut line: String, peer_addr: SocketAddr, conn: &mut IrcServerConnection, sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { if line.is_empty() { warn!("Received empty line from {}. Closing connection.", peer_addr); @@ -51,7 +57,7 @@ async fn process_user_input( debug!("Received '{}' from {}", line, peer_addr); - if let Err(e) = conn.update(line, sender).await { + if let Err(e) = conn.update(line, sender, seen_msg_id).await { warn!("Connection error: {} for {}", e, peer_addr); return Err(Error::ChannelStopped) } @@ -64,6 +70,7 @@ async fn process( stream: TcpStream, peer_addr: SocketAddr, sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { let (reader, writer) = stream.split(); @@ -75,6 +82,15 @@ async fn process( futures::select! { privmsg = receiver.recv().fuse() => { let msg = privmsg?; + + let mut smi = seen_msg_id.lock().await; + if smi.contains(&msg.id) { + continue + } + + smi.push(msg.id); + drop(smi); + debug!("ABOUT TO SEND: {:?}", msg); let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", msg.nickname, @@ -91,7 +107,7 @@ async fn process( return Ok(()) } - process_user_input(line, peer_addr, &mut conn, sender.clone()).await?; + process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?; } }; } @@ -105,6 +121,8 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let datastore_path = expand_path(&settings.datastore)?; + let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![])); + let net_settings = settings.net; // //Raft @@ -149,7 +167,13 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { info!("Accepted client: {}", peer_addr); executor_cloned - .spawn(process(commits.clone(), stream, peer_addr, raft_sender.clone())) + .spawn(process( + commits.clone(), + stream, + peer_addr, + raft_sender.clone(), + seen_msg_id.clone(), + )) .detach(); } }); diff --git a/bin/ircd2/src/server.rs b/bin/ircd2/src/server.rs index 2adbd76bf..736ce44b2 100644 --- a/bin/ircd2/src/server.rs +++ b/bin/ircd2/src/server.rs @@ -32,6 +32,7 @@ impl IrcServerConnection { &mut self, line: String, sender: async_channel::Sender, + seen_msg_id: crate::SeenMsgId, ) -> Result<()> { let mut tokens = line.split_ascii_whitespace(); // Commands can begin with :garbage but we will reject clients doing @@ -92,6 +93,10 @@ impl IrcServerConnection { message: message.to_string(), }; + let mut smi = seen_msg_id.lock().await; + smi.push(random_id); + drop(smi); + sender.send(protocol_msg).await?; } "QUIT" => { From c2d301a912eaf33c62e3545d1ed23fa35875cfd2 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 27 Apr 2022 20:09:21 +0300 Subject: [PATCH 3/5] update regex crate --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5bd4ebf34..16c7161b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ itertools = {version = "0.10.3", optional = true} darkfi-derive = {path = "src/util/derive", optional = true} darkfi-derive-internal = {path = "src/util/derive-internal", optional = true} chrono = {version = "0.4.19", optional = true} -regex = {version = "1.1.9", optional = true} +regex = {version = "1.5.5", optional = true} # Misc termion = {version = "1.5.6", optional = true} From 193e9d16c3023405143670a242901f04fc5456d0 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 27 Apr 2022 20:57:44 +0300 Subject: [PATCH 4/5] replace old ircd code with ircd2 --- Cargo.lock | 20 --- Cargo.toml | 1 - bin/ircd/Cargo.toml | 12 +- bin/{ircd2 => ircd}/ircd_config.toml | 0 bin/ircd/src/main.rs | 258 +++++++++++---------------- bin/{ircd2 => ircd}/src/privmsg.rs | 0 bin/ircd/src/proto/mod.rs | 1 - bin/ircd/src/proto/privmsg.rs | 121 ------------- bin/ircd/src/rpc.rs | 23 ++- bin/ircd/src/server.rs | 22 ++- bin/{ircd2 => ircd}/src/settings.rs | 0 bin/ircd2/Cargo.toml | 36 ---- bin/ircd2/README.md | 65 ------- bin/ircd2/script/run_node.sh | 9 - bin/ircd2/script/run_seed_node.sh | 4 - bin/ircd2/script/tmux_session.sh | 6 - bin/ircd2/src/main.rs | 195 -------------------- bin/ircd2/src/rpc.rs | 53 ------ bin/ircd2/src/server.rs | 140 --------------- 19 files changed, 138 insertions(+), 828 deletions(-) rename bin/{ircd2 => ircd}/ircd_config.toml (100%) rename bin/{ircd2 => ircd}/src/privmsg.rs (100%) delete mode 100644 bin/ircd/src/proto/mod.rs delete mode 100644 bin/ircd/src/proto/privmsg.rs rename bin/{ircd2 => ircd}/src/settings.rs (100%) delete mode 100644 bin/ircd2/Cargo.toml delete mode 100644 bin/ircd2/README.md delete mode 100755 bin/ircd2/script/run_node.sh delete mode 100755 bin/ircd2/script/run_seed_node.sh delete mode 100755 bin/ircd2/script/tmux_session.sh delete mode 100644 bin/ircd2/src/main.rs delete mode 100644 bin/ircd2/src/rpc.rs delete mode 100644 bin/ircd2/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 875f95e30..d81e2ccc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2154,26 +2154,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "ircd" -version = "0.3.0" -dependencies = [ - "async-channel", - "async-executor", - "async-std", - "async-trait", - "clap 3.1.12", - "darkfi", - "easy-parallel", - "futures", - "fxhash", - "log", - "rand", - "serde_json", - "simplelog", - "smol", -] - [[package]] name = "ircd2" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 16c7161b4..e07db6a30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ members = [ "bin/faucetd", #"bin/gatewayd", "bin/ircd", - "bin/ircd2", "bin/dnetview", "bin/daod", "bin/dao-cli", diff --git a/bin/ircd/Cargo.toml b/bin/ircd/Cargo.toml index 3eb25c06f..95a2842d1 100644 --- a/bin/ircd/Cargo.toml +++ b/bin/ircd/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "ircd" +name = "ircd2" version = "0.3.0" homepage = "https://dark.fi" description = "P2P IRC daemon" @@ -9,7 +9,7 @@ license = "AGPL-3.0-only" edition = "2021" [dependencies] -darkfi = {path = "../../", features = ["net", "rpc"]} +darkfi = {path = "../../", features = ["net", "rpc", "raft"]} # Async smol = "1.2.5" futures = "0.3.21" @@ -23,10 +23,14 @@ easy-parallel = "3.2.0" rand = "0.8.5" # Misc -clap = {version = "3.1.12", features = ["derive"]} +clap = {version = "3.1.8", features = ["derive"]} log = "0.4.16" -simplelog = "0.12.0" +simplelog = "0.12.0-alpha1" fxhash = "0.2.1" +ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} # Encoding and parsing serde_json = "1.0.79" +serde = {version = "1.0.136", features = ["derive"]} +structopt = "0.3.26" +structopt-toml = "0.5.0" diff --git a/bin/ircd2/ircd_config.toml b/bin/ircd/ircd_config.toml similarity index 100% rename from bin/ircd2/ircd_config.toml rename to bin/ircd/ircd_config.toml diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index bced22641..83472280f 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -1,72 +1,49 @@ -use std::{net::SocketAddr, sync::Arc}; +use async_std::{ + net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, +}; +use std::net::SocketAddr; use async_channel::Receiver; use async_executor::Executor; -use async_std::net::{TcpListener, TcpStream}; -use clap::Parser; use easy_parallel::Parallel; use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; use log::{debug, error, info, warn}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use smol::future; +use structopt_toml::StructOptToml; use darkfi::{ - cli_desc, net, + async_daemonize, + raft::Raft, rpc::rpcserver::{listen_and_serve, RpcServerConfig}, - util::cli::log_config, + util::{ + cli::{log_config, spawn_config}, + path::{expand_path, get_config_path}, + }, Error, Result, }; -pub(crate) mod proto; -pub(crate) mod rpc; -pub(crate) mod server; +pub mod privmsg; +pub mod rpc; +pub mod server; +pub mod settings; use crate::{ - proto::privmsg::{Privmsg, ProtocolPrivmsg, SeenPrivmsgIds, SeenPrivmsgIdsPtr}, + privmsg::Privmsg, rpc::JsonRpcInterface, server::IrcServerConnection, + settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, }; -#[derive(Parser)] -#[clap(name = "ircd", about = cli_desc!(), version)] -struct Args { - /// Accept address - #[clap(short, long)] - accept: Option, - - /// Seed node (repeatable) - #[clap(short, long)] - seed: Vec, - - /// Manual connection (repeatable) - #[clap(short, long)] - connect: Vec, - - /// Connection slots - #[clap(long, default_value_t = 0)] - slots: u32, - - /// External address - #[clap(short, long)] - external: Option, - - /// IRC listen address - #[clap(short = 'r', long, default_value = "127.0.0.1:6667")] - irc: SocketAddr, - - /// RPC listen address - #[clap(long, default_value = "127.0.0.1:8000")] - rpc: SocketAddr, - - /// Verbosity level - #[clap(short, parse(from_occurrences))] - verbose: u8, -} +pub type SeenMsgId = Arc>>; async fn process_user_input( mut line: String, peer_addr: SocketAddr, conn: &mut IrcServerConnection, - p2p: net::P2pPtr, + sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { if line.is_empty() { warn!("Received empty line from {}. Closing connection.", peer_addr); @@ -80,7 +57,7 @@ async fn process_user_input( debug!("Received '{}' from {}", line, peer_addr); - if let Err(e) = conn.update(line, p2p.clone()).await { + if let Err(e) = conn.update(line, sender, seen_msg_id).await { warn!("Connection error: {} for {}", e, peer_addr); return Err(Error::ChannelStopped) } @@ -89,28 +66,37 @@ async fn process_user_input( } async fn process( - receiver: Receiver>, + receiver: Receiver, stream: TcpStream, peer_addr: SocketAddr, - p2p: net::P2pPtr, - seen_privmsg_ids: SeenPrivmsgIdsPtr, + sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { let (reader, writer) = stream.split(); let mut reader = BufReader::new(reader); - let mut conn = IrcServerConnection::new(writer, seen_privmsg_ids); + let mut conn = IrcServerConnection::new(writer); loop { let mut line = String::new(); futures::select! { privmsg = receiver.recv().fuse() => { - let msg = privmsg.expect("internal message queue error"); + let msg = privmsg?; + + let mut smi = seen_msg_id.lock().await; + if smi.contains(&msg.id) { + continue + } + + smi.push(msg.id); + drop(smi); + debug!("ABOUT TO SEND: {:?}", msg); let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", - msg.nickname, - msg.channel, - msg.message, - ); + msg.nickname, + msg.channel, + msg.message, + ); conn.reply(&irc_msg).await?; } @@ -121,121 +107,89 @@ async fn process( return Ok(()) } - process_user_input(line, peer_addr, &mut conn, p2p.clone()).await?; + process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?; } }; } } -async fn start(executor: Arc>, args: Args, net_settings: net::Settings) -> Result<()> { - let listener = TcpListener::bind(args.irc).await?; +async_daemonize!(realmain); +async fn realmain(settings: Args, executor: Arc>) -> Result<()> { + let listener = TcpListener::bind(settings.irc_listen).await?; let local_addr = listener.local_addr()?; info!("Listening on {}", local_addr); + let datastore_path = expand_path(&settings.datastore)?; + + let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![])); + + let net_settings = settings.net; + // + //Raft + // + let datastore_raft = datastore_path.join("ircd.db"); + + let mut raft = Raft::::new(net_settings.inbound, datastore_raft)?; + + let raft_sender = raft.get_broadcast(); + let commits = raft.get_commits(); + + // + // RPC interface + // let rpc_config = RpcServerConfig { - socket_addr: args.rpc, + socket_addr: settings.rpc_listen, // TODO: Use net/transport: use_tls: false, identity_path: Default::default(), identity_pass: Default::default(), }; - - // - // Privmsg protocol - // - let seen_privmsg_ids = SeenPrivmsgIds::new(); - let seen_privmsg_ids_clone = seen_privmsg_ids.clone(); - - let (sender, receiver) = async_channel::unbounded(); - let sender_clone = sender.clone(); - - let p2p = net::P2p::new(net_settings).await; - let registry = p2p.protocol_registry(); - registry - .register(!net::SESSION_SEED, move |channel, p2p| { - let sender = sender_clone.clone(); - let seen_privmsg_ids = seen_privmsg_ids_clone.clone(); - async move { ProtocolPrivmsg::init(channel, sender, seen_privmsg_ids, p2p).await } - }) - .await; - - // - // P2P network main instance - // - p2p.clone().start(executor.clone()).await?; - let executor_clone = executor.clone(); - let p2p_clone = p2p.clone(); - executor - .spawn(async move { - if let Err(e) = p2p_clone.run(executor_clone).await { - error!("P2P run failed: {}", e); - } - }) - .detach(); - - // - // RPC interface - let executor_clone = executor.clone(); - let rpc_interface = Arc::new(JsonRpcInterface { p2p: p2p.clone(), addr: args.rpc }); - executor - .spawn(async move { listen_and_serve(rpc_config, rpc_interface, executor_clone.clone()).await }) - .detach(); + let executor_cloned = executor.clone(); + let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listen }); + let rpc_task = executor.spawn(async move { + listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await + }); // // IRC instance // - loop { - let (stream, peer_addr) = match listener.accept().await { - Ok((s, a)) => (s, a), - Err(e) => { - error!("Failed listening for connections: {}", e); - return Err(Error::ServiceStopped) - } - }; + let executor_cloned = executor.clone(); + let irc_task: smol::Task> = executor.spawn(async move { + loop { + let (stream, peer_addr) = match listener.accept().await { + Ok((s, a)) => (s, a), + Err(e) => { + error!("Failed listening for connections: {}", e); + return Err(Error::ServiceStopped) + } + }; - info!("Accepted client: {}", peer_addr); + info!("Accepted client: {}", peer_addr); - let p2p_clone = p2p.clone(); - executor - .spawn(process( - receiver.clone(), - stream, - peer_addr, - p2p_clone, - seen_privmsg_ids.clone(), - )) - .detach(); - } -} - -fn main() -> Result<()> { - let args = Args::parse(); - - let (lvl, conf) = log_config(args.verbose.into())?; - TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?; - - let net_settings = net::Settings { - inbound: args.accept, - outbound_connections: args.slots, - external_addr: args.external, - peers: args.connect.clone(), - seeds: args.seed.clone(), - ..Default::default() - }; - - let ex = Arc::new(Executor::new()); - let ex_clone = ex.clone(); - let (signal, shutdown) = async_channel::unbounded::<()>(); - let (_, result) = Parallel::new() - .each(0..4, |_| smol::future::block_on(ex.run(shutdown.recv()))) - // Run the main future on the current thread. - .finish(|| { - smol::future::block_on(async move { - start(ex_clone.clone(), args, net_settings).await?; - drop(signal); - Ok::<(), darkfi::Error>(()) - }) - }); - - result + executor_cloned + .spawn(process( + commits.clone(), + stream, + peer_addr, + raft_sender.clone(), + seen_msg_id.clone(), + )) + .detach(); + } + }); + + let (signal, shutdown) = async_channel::bounded::<()>(1); + ctrlc_async::set_async_handler(async move { + warn!(target: "ircd", "ircd start Exit Signal"); + // cleaning up tasks running in the background + signal.send(()).await.unwrap(); + rpc_task.cancel().await; + irc_task.cancel().await; + }) + .unwrap(); + + // blocking + raft.start(net_settings.into(), executor.clone(), shutdown.clone()).await?; + + Ok(()) } diff --git a/bin/ircd2/src/privmsg.rs b/bin/ircd/src/privmsg.rs similarity index 100% rename from bin/ircd2/src/privmsg.rs rename to bin/ircd/src/privmsg.rs diff --git a/bin/ircd/src/proto/mod.rs b/bin/ircd/src/proto/mod.rs deleted file mode 100644 index 4606abdd9..000000000 --- a/bin/ircd/src/proto/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) mod privmsg; diff --git a/bin/ircd/src/proto/privmsg.rs b/bin/ircd/src/proto/privmsg.rs deleted file mode 100644 index 6db74e273..000000000 --- a/bin/ircd/src/proto/privmsg.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::sync::Arc; - -use async_channel::Sender; -use async_executor::Executor; -use async_std::sync::Mutex; -use async_trait::async_trait; -use fxhash::FxHashSet; -use log::debug; - -use darkfi::{ - net, - util::serial::{SerialDecodable, SerialEncodable}, - Result, -}; - -pub type PrivmsgId = u32; - -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct Privmsg { - pub id: PrivmsgId, - pub nickname: String, - pub channel: String, - pub message: String, -} - -impl net::Message for Privmsg { - fn name() -> &'static str { - "privmsg" - } -} - -pub struct SeenPrivmsgIds { - ids: Mutex>, -} - -pub type SeenPrivmsgIdsPtr = Arc; - -impl SeenPrivmsgIds { - pub fn new() -> Arc { - Arc::new(Self { ids: Mutex::new(FxHashSet::default()) }) - } - - pub async fn add_seen(&self, id: u32) { - self.ids.lock().await.insert(id); - } - - pub async fn is_seen(&self, id: u32) -> bool { - self.ids.lock().await.contains(&id) - } -} - -pub struct ProtocolPrivmsg { - notify_queue_sender: Sender>, - privmsg_sub: net::MessageSubscription, - jobsman: net::ProtocolJobsManagerPtr, - seen_ids: SeenPrivmsgIdsPtr, - p2p: net::P2pPtr, -} - -#[async_trait] -impl net::ProtocolBase for ProtocolPrivmsg { - /// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the - /// protocol task manager, then queues the reply. Sends out a ping and - /// waits for pong reply. Waits for ping and replies with a pong. - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "ircd", "ProtocolPrivMsg::start() [START]"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_privmsg(), executor.clone()).await; - debug!(target: "ircd", "ProtocolPrivmsg::start() [END]"); - Ok(()) - } - - fn name(&self) -> &'static str { - "ProtocolPrivMsg" - } -} - -impl ProtocolPrivmsg { - pub async fn init( - channel: net::ChannelPtr, - notify_queue_sender: Sender>, - seen_ids: SeenPrivmsgIdsPtr, - p2p: net::P2pPtr, - ) -> net::ProtocolBasePtr { - let message_subsystem = channel.get_message_subsystem(); - message_subsystem.add_dispatch::().await; - - let sub = channel.subscribe_msg::().await.expect("Missing Privmsg dispatcher!"); - - Arc::new(Self { - notify_queue_sender, - privmsg_sub: sub, - jobsman: net::ProtocolJobsManager::new("PrivmsgProtocol", channel), - seen_ids, - p2p, - }) - } - - async fn handle_receive_privmsg(self: Arc) -> Result<()> { - debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_privmsg() [START]"); - - loop { - let privmsg = self.privmsg_sub.receive().await?; - - debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_privmsg() received {:?}", privmsg); - - // Do we already have this message? - if self.seen_ids.is_seen(privmsg.id).await { - continue - } - - self.seen_ids.add_seen(privmsg.id).await; - - // If not, then broadcast to network. - let privmsg_copy = (*privmsg).clone(); - self.p2p.broadcast(privmsg_copy).await?; - - self.notify_queue_sender.send(privmsg).await.expect("notify_queue_sender send failed!"); - } - } -} diff --git a/bin/ircd/src/rpc.rs b/bin/ircd/src/rpc.rs index d2f83d4db..a1fe2dd13 100644 --- a/bin/ircd/src/rpc.rs +++ b/bin/ircd/src/rpc.rs @@ -5,17 +5,13 @@ use async_trait::async_trait; use log::debug; use serde_json::{json, Value}; -use darkfi::{ - net, - rpc::{ - jsonrpc, - jsonrpc::{ErrorCode, JsonRequest, JsonResult}, - rpcserver::RequestHandler, - }, +use darkfi::rpc::{ + jsonrpc, + jsonrpc::{ErrorCode, JsonRequest, JsonResult}, + rpcserver::RequestHandler, }; pub struct JsonRpcInterface { - pub p2p: net::P2pPtr, pub addr: SocketAddr, } @@ -30,7 +26,7 @@ impl RequestHandler for JsonRpcInterface { match req.method.as_str() { Some("ping") => self.pong(req.id, req.params).await, - Some("get_info") => self.get_info(req.id, req.params).await, + //Some("get_info") => self.get_info(req.id, req.params).await, Some(_) | None => jsonrpc::error(ErrorCode::MethodNotFound, None, req.id).into(), } } @@ -45,12 +41,13 @@ impl JsonRpcInterface { jsonrpc::response(json!("pong"), id).into() } + // TODO // RPCAPI: // Retrieves P2P network information. // --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} - async fn get_info(&self, id: Value, _params: Value) -> JsonResult { - let resp = self.p2p.get_info().await; - jsonrpc::response(resp, id).into() - } + //async fn get_info(&self, id: Value, _params: Value) -> JsonResult { + // let resp = self.p2p.get_info().await; + // jsonrpc::response(resp, id).into() + //} } diff --git a/bin/ircd/src/server.rs b/bin/ircd/src/server.rs index ecfc4c273..736ce44b2 100644 --- a/bin/ircd/src/server.rs +++ b/bin/ircd/src/server.rs @@ -3,13 +3,12 @@ use futures::{io::WriteHalf, AsyncWriteExt}; use log::{debug, info, warn}; use rand::{rngs::OsRng, RngCore}; -use darkfi::{net, Error, Result}; +use darkfi::{Error, Result}; -use crate::proto::privmsg::{Privmsg, SeenPrivmsgIdsPtr}; +use crate::privmsg::Privmsg; pub struct IrcServerConnection { write_stream: WriteHalf, - seen_privmsg_ids: SeenPrivmsgIdsPtr, is_nick_init: bool, is_user_init: bool, is_registered: bool, @@ -18,10 +17,9 @@ pub struct IrcServerConnection { } impl IrcServerConnection { - pub fn new(write_stream: WriteHalf, seen_ids: SeenPrivmsgIdsPtr) -> Self { + pub fn new(write_stream: WriteHalf) -> Self { Self { write_stream, - seen_privmsg_ids: seen_ids, is_nick_init: false, is_user_init: false, is_registered: false, @@ -30,7 +28,12 @@ impl IrcServerConnection { } } - pub async fn update(&mut self, line: String, p2p: net::P2pPtr) -> Result<()> { + pub async fn update( + &mut self, + line: String, + sender: async_channel::Sender, + seen_msg_id: crate::SeenMsgId, + ) -> Result<()> { let mut tokens = line.split_ascii_whitespace(); // Commands can begin with :garbage but we will reject clients doing // that for now to keep the protocol simple and focused. @@ -82,7 +85,6 @@ impl IrcServerConnection { info!("Message {}: {}", channel, message); let random_id = OsRng.next_u32(); - self.seen_privmsg_ids.add_seen(random_id).await; let protocol_msg = Privmsg { id: random_id, @@ -91,7 +93,11 @@ impl IrcServerConnection { message: message.to_string(), }; - p2p.broadcast(protocol_msg).await?; + let mut smi = seen_msg_id.lock().await; + smi.push(random_id); + drop(smi); + + sender.send(protocol_msg).await?; } "QUIT" => { // Close the connection diff --git a/bin/ircd2/src/settings.rs b/bin/ircd/src/settings.rs similarity index 100% rename from bin/ircd2/src/settings.rs rename to bin/ircd/src/settings.rs diff --git a/bin/ircd2/Cargo.toml b/bin/ircd2/Cargo.toml deleted file mode 100644 index 95a2842d1..000000000 --- a/bin/ircd2/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -[package] -name = "ircd2" -version = "0.3.0" -homepage = "https://dark.fi" -description = "P2P IRC daemon" -authors = ["darkfi "] -repository = "https://github.com/darkrenaissance/darkfi" -license = "AGPL-3.0-only" -edition = "2021" - -[dependencies] -darkfi = {path = "../../", features = ["net", "rpc", "raft"]} -# Async -smol = "1.2.5" -futures = "0.3.21" -async-std = "1.11.0" -async-trait = "0.1.53" -async-channel = "1.6.1" -async-executor = "1.4.1" -easy-parallel = "3.2.0" - -# Crypto -rand = "0.8.5" - -# Misc -clap = {version = "3.1.8", features = ["derive"]} -log = "0.4.16" -simplelog = "0.12.0-alpha1" -fxhash = "0.2.1" -ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} - -# Encoding and parsing -serde_json = "1.0.79" -serde = {version = "1.0.136", features = ["derive"]} -structopt = "0.3.26" -structopt-toml = "0.5.0" diff --git a/bin/ircd2/README.md b/bin/ircd2/README.md deleted file mode 100644 index 508bcb550..000000000 --- a/bin/ircd2/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# p2p IRC - -This is a local daemon which can be attached to with any IRC frontend. -It uses the darkfi p2p engine to synchronize chats between hosts. - -## Local Deployment - -### Seed Node - -First you must run a seed node. The seed node is a static host which nodes can -connect to when they first connect to the network. The `seed_session` simply -connects to a seed node and runs `protocol_seed`, which requests a list of -addresses from the seed node and disconnects straight after receiving them. - - LOG_TARGETS=net cargo run -- -vv --accept 0.0.0.0:9999 --irc 127.0.0.1:6688 - -Note that the above command doesn't specify an external address since the -seed node shouldn't be advertised in the list of connectable nodes. The seed -node does not participate as a normal node in the p2p network. It simply allows -new nodes to discover other nodes in the network during the bootstrapping phase. - -### Inbound Node - -This is a node accepting inbound connections on the network but which is not -making any outbound connections. - -The external address is important and must be correct. - - LOG_TARGETS=net cargo run -- -vv --accept 0.0.0.0:11004 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667 - -### Outbound Node - -This is a node which has 8 outbound connection slots and no inbound connections. -This means the node has 8 slots which will actively search for unique nodes to -connect to in the p2p network. - - LOG_TARGETS=net cargo run -- -vv --slots 5 --seeds $SEED_IP:9999 --irc 127.0.0.1:6668 - -### Attaching the IRC Frontend - -Assuming you have run the above 3 commands to create a small model testnet, -and both inbound and outbound nodes above are connected, you can test them -out using weechat. - -To create separate weechat instances, use the `--dir` command: - - weechat --dir /tmp/a/ - weechat --dir /tmp/b/ - -Then in both clients, you must set the option to connect to temporary servers: - - /set irc.look.temporary_servers on - -Finally you can attach to the local IRCd instances: - - /connect localhost/6667 - /connect localhost/6668 - -And send messages to yourself. - -### Running a Fullnode - -See the script `script/run_node.sh` for an example of how to deploy a full node which -does seed session synchronization, and accepts both inbound and outbound -connections. diff --git a/bin/ircd2/script/run_node.sh b/bin/ircd2/script/run_node.sh deleted file mode 100755 index 2c994367d..000000000 --- a/bin/ircd2/script/run_node.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -# Change this value to the hostname of the seed server -SEED_HOSTNAME=XXX - -LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}') -SEED_IP=$(getent hosts $SEED_HOSTNAME.local | awk '{print $1}' | head -n 1) -cargo run -- --accept 0.0.0.0:11004 --slots 5 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667 - diff --git a/bin/ircd2/script/run_seed_node.sh b/bin/ircd2/script/run_seed_node.sh deleted file mode 100755 index c782a5bf1..000000000 --- a/bin/ircd2/script/run_seed_node.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}') -cargo run -- --accept 0.0.0.0:9999 --irc 127.0.0.1:6688 - diff --git a/bin/ircd2/script/tmux_session.sh b/bin/ircd2/script/tmux_session.sh deleted file mode 100755 index 73681ed5e..000000000 --- a/bin/ircd2/script/tmux_session.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -tmux new-session -d 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:9999 --irc 127.0.0.1:6688' -tmux split-window -v 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:11004 --external 127.0.0.1:11004 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6667' -tmux split-window -h 'LOG_TARGETS=net ../../../target/release/ircd -vv --slots 5 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6668' -tmux attach diff --git a/bin/ircd2/src/main.rs b/bin/ircd2/src/main.rs deleted file mode 100644 index 83472280f..000000000 --- a/bin/ircd2/src/main.rs +++ /dev/null @@ -1,195 +0,0 @@ -use async_std::{ - net::{TcpListener, TcpStream}, - sync::{Arc, Mutex}, -}; -use std::net::SocketAddr; - -use async_channel::Receiver; -use async_executor::Executor; -use easy_parallel::Parallel; -use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; -use log::{debug, error, info, warn}; -use simplelog::{ColorChoice, TermLogger, TerminalMode}; -use smol::future; -use structopt_toml::StructOptToml; - -use darkfi::{ - async_daemonize, - raft::Raft, - rpc::rpcserver::{listen_and_serve, RpcServerConfig}, - util::{ - cli::{log_config, spawn_config}, - path::{expand_path, get_config_path}, - }, - Error, Result, -}; - -pub mod privmsg; -pub mod rpc; -pub mod server; -pub mod settings; - -use crate::{ - privmsg::Privmsg, - rpc::JsonRpcInterface, - server::IrcServerConnection, - settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, -}; - -pub type SeenMsgId = Arc>>; - -async fn process_user_input( - mut line: String, - peer_addr: SocketAddr, - conn: &mut IrcServerConnection, - sender: async_channel::Sender, - seen_msg_id: SeenMsgId, -) -> Result<()> { - if line.is_empty() { - warn!("Received empty line from {}. Closing connection.", peer_addr); - return Err(Error::ChannelStopped) - } - - assert!(&line[(line.len() - 2)..] == "\r\n"); - // Remove CRLF - line.pop(); - line.pop(); - - debug!("Received '{}' from {}", line, peer_addr); - - if let Err(e) = conn.update(line, sender, seen_msg_id).await { - warn!("Connection error: {} for {}", e, peer_addr); - return Err(Error::ChannelStopped) - } - - Ok(()) -} - -async fn process( - receiver: Receiver, - stream: TcpStream, - peer_addr: SocketAddr, - sender: async_channel::Sender, - seen_msg_id: SeenMsgId, -) -> Result<()> { - let (reader, writer) = stream.split(); - - let mut reader = BufReader::new(reader); - let mut conn = IrcServerConnection::new(writer); - - loop { - let mut line = String::new(); - futures::select! { - privmsg = receiver.recv().fuse() => { - let msg = privmsg?; - - let mut smi = seen_msg_id.lock().await; - if smi.contains(&msg.id) { - continue - } - - smi.push(msg.id); - drop(smi); - - debug!("ABOUT TO SEND: {:?}", msg); - let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", - msg.nickname, - msg.channel, - msg.message, - ); - - conn.reply(&irc_msg).await?; - } - - err = reader.read_line(&mut line).fuse() => { - if let Err(e) = err { - warn!("Read line error. Closing stream for {}: {}", peer_addr, e); - return Ok(()) - } - - process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?; - } - }; - } -} - -async_daemonize!(realmain); -async fn realmain(settings: Args, executor: Arc>) -> Result<()> { - let listener = TcpListener::bind(settings.irc_listen).await?; - let local_addr = listener.local_addr()?; - info!("Listening on {}", local_addr); - - let datastore_path = expand_path(&settings.datastore)?; - - let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![])); - - let net_settings = settings.net; - // - //Raft - // - let datastore_raft = datastore_path.join("ircd.db"); - - let mut raft = Raft::::new(net_settings.inbound, datastore_raft)?; - - let raft_sender = raft.get_broadcast(); - let commits = raft.get_commits(); - - // - // RPC interface - // - let rpc_config = RpcServerConfig { - socket_addr: settings.rpc_listen, - // TODO: Use net/transport: - use_tls: false, - identity_path: Default::default(), - identity_pass: Default::default(), - }; - let executor_cloned = executor.clone(); - let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listen }); - let rpc_task = executor.spawn(async move { - listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await - }); - - // - // IRC instance - // - let executor_cloned = executor.clone(); - let irc_task: smol::Task> = executor.spawn(async move { - loop { - let (stream, peer_addr) = match listener.accept().await { - Ok((s, a)) => (s, a), - Err(e) => { - error!("Failed listening for connections: {}", e); - return Err(Error::ServiceStopped) - } - }; - - info!("Accepted client: {}", peer_addr); - - executor_cloned - .spawn(process( - commits.clone(), - stream, - peer_addr, - raft_sender.clone(), - seen_msg_id.clone(), - )) - .detach(); - } - }); - - let (signal, shutdown) = async_channel::bounded::<()>(1); - ctrlc_async::set_async_handler(async move { - warn!(target: "ircd", "ircd start Exit Signal"); - // cleaning up tasks running in the background - signal.send(()).await.unwrap(); - rpc_task.cancel().await; - irc_task.cancel().await; - }) - .unwrap(); - - // blocking - raft.start(net_settings.into(), executor.clone(), shutdown.clone()).await?; - - Ok(()) -} diff --git a/bin/ircd2/src/rpc.rs b/bin/ircd2/src/rpc.rs deleted file mode 100644 index a1fe2dd13..000000000 --- a/bin/ircd2/src/rpc.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::{net::SocketAddr, sync::Arc}; - -use async_executor::Executor; -use async_trait::async_trait; -use log::debug; -use serde_json::{json, Value}; - -use darkfi::rpc::{ - jsonrpc, - jsonrpc::{ErrorCode, JsonRequest, JsonResult}, - rpcserver::RequestHandler, -}; - -pub struct JsonRpcInterface { - pub addr: SocketAddr, -} - -#[async_trait] -impl RequestHandler for JsonRpcInterface { - async fn handle_request(&self, req: JsonRequest, _executor: Arc>) -> JsonResult { - if req.params.as_array().is_none() { - return jsonrpc::error(ErrorCode::InvalidRequest, None, req.id).into() - } - - debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap()); - - match req.method.as_str() { - Some("ping") => self.pong(req.id, req.params).await, - //Some("get_info") => self.get_info(req.id, req.params).await, - Some(_) | None => jsonrpc::error(ErrorCode::MethodNotFound, None, req.id).into(), - } - } -} - -impl JsonRpcInterface { - // RPCAPI: - // Replies to a ping method. - // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} - // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} - async fn pong(&self, id: Value, _params: Value) -> JsonResult { - jsonrpc::response(json!("pong"), id).into() - } - - // TODO - // RPCAPI: - // Retrieves P2P network information. - // --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42} - // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} - //async fn get_info(&self, id: Value, _params: Value) -> JsonResult { - // let resp = self.p2p.get_info().await; - // jsonrpc::response(resp, id).into() - //} -} diff --git a/bin/ircd2/src/server.rs b/bin/ircd2/src/server.rs deleted file mode 100644 index 736ce44b2..000000000 --- a/bin/ircd2/src/server.rs +++ /dev/null @@ -1,140 +0,0 @@ -use async_std::net::TcpStream; -use futures::{io::WriteHalf, AsyncWriteExt}; -use log::{debug, info, warn}; -use rand::{rngs::OsRng, RngCore}; - -use darkfi::{Error, Result}; - -use crate::privmsg::Privmsg; - -pub struct IrcServerConnection { - write_stream: WriteHalf, - is_nick_init: bool, - is_user_init: bool, - is_registered: bool, - nickname: String, - _channels: Vec, -} - -impl IrcServerConnection { - pub fn new(write_stream: WriteHalf) -> Self { - Self { - write_stream, - is_nick_init: false, - is_user_init: false, - is_registered: false, - nickname: "".to_string(), - _channels: vec![], - } - } - - pub async fn update( - &mut self, - line: String, - sender: async_channel::Sender, - seen_msg_id: crate::SeenMsgId, - ) -> Result<()> { - let mut tokens = line.split_ascii_whitespace(); - // Commands can begin with :garbage but we will reject clients doing - // that for now to keep the protocol simple and focused. - let command = tokens.next().ok_or(Error::MalformedPacket)?; - - debug!("Received command: {}", command); - - match command { - "USER" => { - // We can stuff any extra things like public keys in here. - // Ignore it for now. - self.is_user_init = true; - } - "NICK" => { - let nickname = tokens.next().ok_or(Error::MalformedPacket)?; - self.is_nick_init = true; - let old_nick = std::mem::replace(&mut self.nickname, nickname.to_string()); - - let nick_reply = format!(":{}!anon@dark.fi NICK {}\r\n", old_nick, self.nickname); - self.reply(&nick_reply).await?; - } - "JOIN" => { - // Ignore since channels are all autojoin - // let channel = tokens.next().ok_or(Error::MalformedPacket)?; - // self.channels.push(channel.to_string()); - - // let join_reply = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, channel); - // self.reply(&join_reply).await?; - - // self.write_stream.write_all(b":f00!f00@127.0.01 PRIVMSG #dev :y0\r\n").await?; - } - "PING" => { - let line_clone = line.clone(); - let split_line: Vec<&str> = line_clone.split_whitespace().collect(); - if split_line.len() > 1 && split_line[0] == "PING" { - let pong = format!("PONG {}\r\n", split_line[1]); - self.reply(&pong).await?; - } - } - "PRIVMSG" => { - let channel = tokens.next().ok_or(Error::MalformedPacket)?; - let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?; - - if substr_idx >= line.len() { - return Err(Error::MalformedPacket) - } - - let message = &line[substr_idx + 1..]; - info!("Message {}: {}", channel, message); - - let random_id = OsRng.next_u32(); - - let protocol_msg = Privmsg { - id: random_id, - nickname: self.nickname.clone(), - channel: channel.to_string(), - message: message.to_string(), - }; - - let mut smi = seen_msg_id.lock().await; - smi.push(random_id); - drop(smi); - - sender.send(protocol_msg).await?; - } - "QUIT" => { - // Close the connection - return Err(Error::ServiceStopped) - } - _ => { - warn!("Unimplemented `{}` command", command); - } - } - - if !self.is_registered && self.is_nick_init && self.is_user_init { - debug!("Initializing peer connection"); - let register_reply = format!(":darkfi 001 {} :Let there be dark\r\n", self.nickname); - self.reply(®ister_reply).await?; - self.is_registered = true; - - // Auto-joins - macro_rules! autojoin { - ($channel:expr,$topic:expr) => { - let j = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, $channel); - let t = format!(":DarkFi TOPIC {} :{}\r\n", $channel, $topic); - self.reply(&j).await?; - self.reply(&t).await?; - }; - } - - autojoin!("#dev", "Development of DarkFi"); - autojoin!("#markets", "Markets, trading, DeFi, algo, biz, finance, and economics"); - autojoin!("#memes", "Memetic engineering"); - } - - Ok(()) - } - - pub async fn reply(&mut self, message: &str) -> Result<()> { - self.write_stream.write_all(message.as_bytes()).await?; - debug!("Sent {}", message); - Ok(()) - } -} From 341635f12b88c452530769e31398e20ce1bda7a1 Mon Sep 17 00:00:00 2001 From: aggstam Date: Wed, 27 Apr 2022 21:08:06 +0300 Subject: [PATCH 5/5] consensus: minor fixes --- script/consensus_simulation.sh | 9 ++- script/research/nodes-tool/src/main.rs | 8 ++- src/consensus/state.rs | 67 +++++++++++-------- src/consensus/task/proposal.rs | 92 +++++++++++++------------- 4 files changed, 97 insertions(+), 79 deletions(-) diff --git a/script/consensus_simulation.sh b/script/consensus_simulation.sh index 06efd5579..84b1e01c8 100755 --- a/script/consensus_simulation.sh +++ b/script/consensus_simulation.sh @@ -14,7 +14,8 @@ make BINS=darkfid2 pids=() # Starting node 0 (seed) in background -./darkfid2 \ +LOG_TARGETS="!sled,!net" ./darkfid2 \ + -v \ --consensus \ --consensus-p2p-accept 127.0.0.1:6000 \ --consensus-p2p-external 127.0.0.1:6000 \ @@ -34,7 +35,8 @@ sleep 20 bound=$(($nodes-2)) for i in $(eval echo "{1..$bound}") do - ./darkfid2 \ + LOG_TARGETS="!sled,!net" ./darkfid2 \ + -v \ --consensus \ --consensus-seed 127.0.0.1:6000 \ --sync-seed 127.0.0.1:6020 \ @@ -64,7 +66,8 @@ function ctrl_c() { bound=$(($nodes-1)) # Starting last node -./darkfid2 \ +LOG_TARGETS="!sled,!net" ./darkfid2 \ + -v \ --consensus \ --consensus-seed 127.0.0.1:6000 \ --sync-seed 127.0.0.1:6020 \ diff --git a/script/research/nodes-tool/src/main.rs b/script/research/nodes-tool/src/main.rs index da0cc5abf..8634562e2 100644 --- a/script/research/nodes-tool/src/main.rs +++ b/script/research/nodes-tool/src/main.rs @@ -1,3 +1,4 @@ +use async_std::sync::Arc; use std::{fs::File, io::Write}; use darkfi::{ @@ -357,13 +358,13 @@ async fn main() -> Result<()> { let nodes = 4; let genesis_ts = Timestamp(1648383795); let genesis_data = *TESTNET_GENESIS_HASH_BYTES; + let pass = "changeme"; for i in 0..nodes { // Initialize or load wallet let path = format!("../../../tmp/node{:?}/wallet.db", i); - let pass = "changeme"; let wallet = init_wallet(&path, &pass).await?; - Client::new(wallet.clone()).await?; let address = wallet.get_default_address().await?; + let client = Arc::new(Client::new(wallet).await?); // Initialize or load sled database let path = format!("../../../tmp/node{:?}/blockchain/testnet", i); @@ -372,7 +373,8 @@ async fn main() -> Result<()> { // Data export println!("Exporting data for node{:?} - {:?}", i, address.to_string()); - let state = ValidatorState::new(&sled_db, address, genesis_ts, genesis_data)?; + let state = + ValidatorState::new(&sled_db, genesis_ts, genesis_data, client, vec![], vec![]).await?; let info = StateInfo::new(&*state.read().await); let info_string = format!("{:#?}", info); let path = format!("node{:?}_testnet_db", i); diff --git a/src/consensus/state.rs b/src/consensus/state.rs index a87a943eb..962f83e5e 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -118,8 +118,8 @@ pub struct ValidatorState { pub client: Arc, /// Pending transactions pub unconfirmed_txs: Vec, - /// Participation flag - pub participating: bool, + /// Participating start epoch + pub participating: Option, } impl ValidatorState { @@ -137,7 +137,7 @@ impl ValidatorState { let consensus = ConsensusState::new(genesis_ts, genesis_data)?; let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?; let unconfirmed_txs = vec![]; - let participating = false; + let participating = None; let address = client.wallet.get_default_address().await?; let state_machine = Arc::new(Mutex::new(State { @@ -219,6 +219,12 @@ impl ValidatorState { Duration::new(diff.num_seconds().try_into().unwrap(), 0) } + /// Set participating epoch to next. + pub fn set_participating(&mut self) -> Result<()> { + self.participating = Some(self.current_epoch() + 1); + Ok(()) + } + /// Find epoch leader, using a simple hash method. /// Leader calculation is based on how many nodes are participating /// in the network. @@ -326,8 +332,13 @@ impl ValidatorState { /// and proceed with voting on it. pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result> { // Node hasn't started participating - if !self.participating { - return Ok(None) + match self.participating { + Some(start) => { + if self.current_epoch() < start { + return Ok(None) + } + } + None => return Ok(None), } // Node refreshes participants records @@ -480,9 +491,15 @@ impl ValidatorState { /// Finally, we check if the notarization of the proposal can finalize /// parent proposals in its chain. pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option>)> { + let current_epoch = self.current_epoch(); // Node hasn't started participating - if !self.participating { - return Ok((false, None)) + match self.participating { + Some(start) => { + if current_epoch < start { + return Ok((false, None)) + } + } + None => return Ok((false, None)), } let mut encoded_proposal = vec![]; @@ -508,10 +525,23 @@ impl ValidatorState { // Checking that the voter can actually vote. match self.consensus.participants.get(&vote.address) { Some(participant) => { - if self.current_epoch() <= participant.joined { + let mut participant = participant.clone(); + if current_epoch <= participant.joined { warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string()); return Ok((false, None)) } + + // Updating participant vote + match participant.voted { + Some(voted) => { + if vote.sl > voted { + participant.voted = Some(vote.sl); + } + } + None => participant.voted = Some(vote.sl), + } + + self.consensus.participants.insert(participant.address, participant); } None => { warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string()); @@ -559,22 +589,6 @@ impl ValidatorState { } } - // Updating participant vote - let mut participant = match self.consensus.participants.get(&vote.address) { - Some(p) => p.clone(), - None => Participant::new(vote.address, vote.sl), - }; - - match participant.voted { - Some(voted) => { - if vote.sl > voted { - participant.voted = Some(vote.sl); - } - } - None => participant.voted = Some(vote.sl), - } - - self.consensus.participants.insert(participant.address, participant); Ok((true, Some(to_broadcast))) } @@ -762,8 +776,9 @@ impl ValidatorState { } } None => { - if participant.joined < previous_epoch && - participant.joined < previous_from_last_epoch + if (previous_epoch == last_epoch && participant.joined < previous_epoch) || + (previous_epoch != last_epoch && + participant.joined < previous_from_last_epoch) { warn!( "refresh_participants(): Inactive participant: {:?} (joined {:?}, voted {:?})", diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs index 98f5c5e08..963fa3969 100644 --- a/src/consensus/task/proposal.rs +++ b/src/consensus/task/proposal.rs @@ -46,15 +46,17 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { Err(e) => error!("Failed broadcasting consensus participation: {}", e), } - // After initialization node waits for next epoch to start participating - let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); - info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); - sleep(seconds_until_next_epoch).await; - - // Note modifies its participating flag to true. - state.write().await.participating = true; + // Node modifies its participating epoch to next. + match state.write().await.set_participating() { + Ok(()) => info!("Node will start participating at next epoch!"), + Err(e) => error!("Failed to set participation epoch: {}", e), + } loop { + let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); + info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); + sleep(seconds_until_next_epoch).await; + // Node refreshes participants records match state.write().await.refresh_participants() { Ok(()) => debug!("Participants refreshed successfully."), @@ -73,57 +75,53 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { Ok(proposal) => { if proposal.is_none() { info!(target: "consensus", "Node is not the epoch leader. Sleeping till next epoch..."); - } else { - // Leader creates a vote for the proposal and broadcasts them both - let proposal = proposal.unwrap(); - info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal); - let vote = state.write().await.receive_proposal(&proposal); - match vote { - Ok(v) => { - if v.is_none() { - debug!("proposal_task(): Node did not vote for the proposed block"); - } else { - let vote = v.unwrap(); - let result = state.write().await.receive_vote(&vote); - match result { - Ok(_) => info!(target: "consensus", "Vote saved successfully."), - Err(e) => { - error!(target: "consensus", "Vote save failed: {}", e) - } + continue + } + // Leader creates a vote for the proposal and broadcasts them both + let proposal = proposal.unwrap(); + info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal); + let vote = state.write().await.receive_proposal(&proposal); + match vote { + Ok(v) => { + if v.is_none() { + debug!("proposal_task(): Node did not vote for the proposed block"); + } else { + let vote = v.unwrap(); + let result = state.write().await.receive_vote(&vote); + match result { + Ok(_) => info!(target: "consensus", "Vote saved successfully."), + Err(e) => { + error!(target: "consensus", "Vote save failed: {}", e) } + } - // Broadcast block - let result = p2p.broadcast(proposal).await; - match result { - Ok(()) => { - info!(target: "consensus", "Proposal broadcasted successfully.") - } - Err(e) => { - error!(target: "consensus", "Failed broadcasting proposal: {}", e) - } + // Broadcast block + let result = p2p.broadcast(proposal).await; + match result { + Ok(()) => { + info!(target: "consensus", "Proposal broadcasted successfully.") } + Err(e) => { + error!(target: "consensus", "Failed broadcasting proposal: {}", e) + } + } - // Broadcast leader vote - let result = p2p.broadcast(vote).await; - match result { - Ok(()) => { - info!(target: "consensus", "Leader vote broadcasted successfully.") - } - Err(e) => { - error!(target: "consensus", "Failed broadcasting leader vote: {}", e) - } + // Broadcast leader vote + let result = p2p.broadcast(vote).await; + match result { + Ok(()) => { + info!(target: "consensus", "Leader vote broadcasted successfully.") + } + Err(e) => { + error!(target: "consensus", "Failed broadcasting leader vote: {}", e) } } } - Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e), } + Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e), } } Err(e) => error!("Block proposal failed: {}", e), } - - let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); - info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); - sleep(seconds_until_next_epoch).await; } }