Merge branch 'master' of github.com:darkrenaissance/darkfi

This commit is contained in:
lunar-mining
2022-04-28 09:19:03 +02:00
26 changed files with 411 additions and 923 deletions

20
Cargo.lock generated
View File

@@ -2154,26 +2154,6 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "ircd"
version = "0.3.0"
dependencies = [
"async-channel",
"async-executor",
"async-std",
"async-trait",
"clap 3.1.12",
"darkfi",
"easy-parallel",
"futures",
"fxhash",
"log",
"rand",
"serde_json",
"simplelog",
"smol",
]
[[package]]
name = "ircd2"
version = "0.3.0"

View File

@@ -26,7 +26,6 @@ members = [
"bin/faucetd",
#"bin/gatewayd",
"bin/ircd",
"bin/ircd2",
"bin/dnetview",
"bin/daod",
"bin/dao-cli",
@@ -90,7 +89,7 @@ itertools = {version = "0.10.3", optional = true}
darkfi-derive = {path = "src/util/derive", optional = true}
darkfi-derive-internal = {path = "src/util/derive-internal", optional = true}
chrono = {version = "0.4.19", optional = true}
regex = {version = "1.1.9", optional = true}
regex = {version = "1.5.5", optional = true}
# Misc
termion = {version = "1.5.6", optional = true}

View File

@@ -1,5 +1,5 @@
[package]
name = "ircd"
name = "ircd2"
version = "0.3.0"
homepage = "https://dark.fi"
description = "P2P IRC daemon"
@@ -9,7 +9,7 @@ license = "AGPL-3.0-only"
edition = "2021"
[dependencies]
darkfi = {path = "../../", features = ["net", "rpc"]}
darkfi = {path = "../../", features = ["net", "rpc", "raft"]}
# Async
smol = "1.2.5"
futures = "0.3.21"
@@ -23,10 +23,14 @@ easy-parallel = "3.2.0"
rand = "0.8.5"
# Misc
clap = {version = "3.1.12", features = ["derive"]}
clap = {version = "3.1.8", features = ["derive"]}
log = "0.4.16"
simplelog = "0.12.0"
simplelog = "0.12.0-alpha1"
fxhash = "0.2.1"
ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]}
# Encoding and parsing
serde_json = "1.0.79"
serde = {version = "1.0.136", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.0"

View File

@@ -1,72 +1,49 @@
use std::{net::SocketAddr, sync::Arc};
use async_std::{
net::{TcpListener, TcpStream},
sync::{Arc, Mutex},
};
use std::net::SocketAddr;
use async_channel::Receiver;
use async_executor::Executor;
use async_std::net::{TcpListener, TcpStream};
use clap::Parser;
use easy_parallel::Parallel;
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use log::{debug, error, info, warn};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
cli_desc, net,
async_daemonize,
raft::Raft,
rpc::rpcserver::{listen_and_serve, RpcServerConfig},
util::cli::log_config,
util::{
cli::{log_config, spawn_config},
path::{expand_path, get_config_path},
},
Error, Result,
};
pub(crate) mod proto;
pub(crate) mod rpc;
pub(crate) mod server;
pub mod privmsg;
pub mod rpc;
pub mod server;
pub mod settings;
use crate::{
proto::privmsg::{Privmsg, ProtocolPrivmsg, SeenPrivmsgIds, SeenPrivmsgIdsPtr},
privmsg::Privmsg,
rpc::JsonRpcInterface,
server::IrcServerConnection,
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
};
#[derive(Parser)]
#[clap(name = "ircd", about = cli_desc!(), version)]
struct Args {
/// Accept address
#[clap(short, long)]
accept: Option<SocketAddr>,
/// Seed node (repeatable)
#[clap(short, long)]
seed: Vec<SocketAddr>,
/// Manual connection (repeatable)
#[clap(short, long)]
connect: Vec<SocketAddr>,
/// Connection slots
#[clap(long, default_value_t = 0)]
slots: u32,
/// External address
#[clap(short, long)]
external: Option<SocketAddr>,
/// IRC listen address
#[clap(short = 'r', long, default_value = "127.0.0.1:6667")]
irc: SocketAddr,
/// RPC listen address
#[clap(long, default_value = "127.0.0.1:8000")]
rpc: SocketAddr,
/// Verbosity level
#[clap(short, parse(from_occurrences))]
verbose: u8,
}
pub type SeenMsgId = Arc<Mutex<Vec<u32>>>;
async fn process_user_input(
mut line: String,
peer_addr: SocketAddr,
conn: &mut IrcServerConnection,
p2p: net::P2pPtr,
sender: async_channel::Sender<Privmsg>,
seen_msg_id: SeenMsgId,
) -> Result<()> {
if line.is_empty() {
warn!("Received empty line from {}. Closing connection.", peer_addr);
@@ -80,7 +57,7 @@ async fn process_user_input(
debug!("Received '{}' from {}", line, peer_addr);
if let Err(e) = conn.update(line, p2p.clone()).await {
if let Err(e) = conn.update(line, sender, seen_msg_id).await {
warn!("Connection error: {} for {}", e, peer_addr);
return Err(Error::ChannelStopped)
}
@@ -89,28 +66,37 @@ async fn process_user_input(
}
async fn process(
receiver: Receiver<Arc<Privmsg>>,
receiver: Receiver<Privmsg>,
stream: TcpStream,
peer_addr: SocketAddr,
p2p: net::P2pPtr,
seen_privmsg_ids: SeenPrivmsgIdsPtr,
sender: async_channel::Sender<Privmsg>,
seen_msg_id: SeenMsgId,
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(writer, seen_privmsg_ids);
let mut conn = IrcServerConnection::new(writer);
loop {
let mut line = String::new();
futures::select! {
privmsg = receiver.recv().fuse() => {
let msg = privmsg.expect("internal message queue error");
let msg = privmsg?;
let mut smi = seen_msg_id.lock().await;
if smi.contains(&msg.id) {
continue
}
smi.push(msg.id);
drop(smi);
debug!("ABOUT TO SEND: {:?}", msg);
let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n",
msg.nickname,
msg.channel,
msg.message,
);
msg.nickname,
msg.channel,
msg.message,
);
conn.reply(&irc_msg).await?;
}
@@ -121,121 +107,89 @@ async fn process(
return Ok(())
}
process_user_input(line, peer_addr, &mut conn, p2p.clone()).await?;
process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?;
}
};
}
}
async fn start(executor: Arc<Executor<'_>>, args: Args, net_settings: net::Settings) -> Result<()> {
let listener = TcpListener::bind(args.irc).await?;
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let listener = TcpListener::bind(settings.irc_listen).await?;
let local_addr = listener.local_addr()?;
info!("Listening on {}", local_addr);
let datastore_path = expand_path(&settings.datastore)?;
let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![]));
let net_settings = settings.net;
//
//Raft
//
let datastore_raft = datastore_path.join("ircd.db");
let mut raft = Raft::<Privmsg>::new(net_settings.inbound, datastore_raft)?;
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
//
// RPC interface
//
let rpc_config = RpcServerConfig {
socket_addr: args.rpc,
socket_addr: settings.rpc_listen,
// TODO: Use net/transport:
use_tls: false,
identity_path: Default::default(),
identity_pass: Default::default(),
};
//
// Privmsg protocol
//
let seen_privmsg_ids = SeenPrivmsgIds::new();
let seen_privmsg_ids_clone = seen_privmsg_ids.clone();
let (sender, receiver) = async_channel::unbounded();
let sender_clone = sender.clone();
let p2p = net::P2p::new(net_settings).await;
let registry = p2p.protocol_registry();
registry
.register(!net::SESSION_SEED, move |channel, p2p| {
let sender = sender_clone.clone();
let seen_privmsg_ids = seen_privmsg_ids_clone.clone();
async move { ProtocolPrivmsg::init(channel, sender, seen_privmsg_ids, p2p).await }
})
.await;
//
// P2P network main instance
//
p2p.clone().start(executor.clone()).await?;
let executor_clone = executor.clone();
let p2p_clone = p2p.clone();
executor
.spawn(async move {
if let Err(e) = p2p_clone.run(executor_clone).await {
error!("P2P run failed: {}", e);
}
})
.detach();
//
// RPC interface
let executor_clone = executor.clone();
let rpc_interface = Arc::new(JsonRpcInterface { p2p: p2p.clone(), addr: args.rpc });
executor
.spawn(async move { listen_and_serve(rpc_config, rpc_interface, executor_clone.clone()).await })
.detach();
let executor_cloned = executor.clone();
let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listen });
let rpc_task = executor.spawn(async move {
listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await
});
//
// IRC instance
//
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed listening for connections: {}", e);
return Err(Error::ServiceStopped)
}
};
let executor_cloned = executor.clone();
let irc_task: smol::Task<Result<()>> = executor.spawn(async move {
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed listening for connections: {}", e);
return Err(Error::ServiceStopped)
}
};
info!("Accepted client: {}", peer_addr);
info!("Accepted client: {}", peer_addr);
let p2p_clone = p2p.clone();
executor
.spawn(process(
receiver.clone(),
stream,
peer_addr,
p2p_clone,
seen_privmsg_ids.clone(),
))
.detach();
}
}
fn main() -> Result<()> {
let args = Args::parse();
let (lvl, conf) = log_config(args.verbose.into())?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;
let net_settings = net::Settings {
inbound: args.accept,
outbound_connections: args.slots,
external_addr: args.external,
peers: args.connect.clone(),
seeds: args.seed.clone(),
..Default::default()
};
let ex = Arc::new(Executor::new());
let ex_clone = ex.clone();
let (signal, shutdown) = async_channel::unbounded::<()>();
let (_, result) = Parallel::new()
.each(0..4, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
start(ex_clone.clone(), args, net_settings).await?;
drop(signal);
Ok::<(), darkfi::Error>(())
})
});
result
executor_cloned
.spawn(process(
commits.clone(),
stream,
peer_addr,
raft_sender.clone(),
seen_msg_id.clone(),
))
.detach();
}
});
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
rpc_task.cancel().await;
irc_task.cancel().await;
})
.unwrap();
// blocking
raft.start(net_settings.into(), executor.clone(), shutdown.clone()).await?;
Ok(())
}

View File

@@ -1 +0,0 @@
pub(crate) mod privmsg;

View File

@@ -1,121 +0,0 @@
use std::sync::Arc;
use async_channel::Sender;
use async_executor::Executor;
use async_std::sync::Mutex;
use async_trait::async_trait;
use fxhash::FxHashSet;
use log::debug;
use darkfi::{
net,
util::serial::{SerialDecodable, SerialEncodable},
Result,
};
pub type PrivmsgId = u32;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Privmsg {
pub id: PrivmsgId,
pub nickname: String,
pub channel: String,
pub message: String,
}
impl net::Message for Privmsg {
fn name() -> &'static str {
"privmsg"
}
}
pub struct SeenPrivmsgIds {
ids: Mutex<FxHashSet<PrivmsgId>>,
}
pub type SeenPrivmsgIdsPtr = Arc<SeenPrivmsgIds>;
impl SeenPrivmsgIds {
pub fn new() -> Arc<Self> {
Arc::new(Self { ids: Mutex::new(FxHashSet::default()) })
}
pub async fn add_seen(&self, id: u32) {
self.ids.lock().await.insert(id);
}
pub async fn is_seen(&self, id: u32) -> bool {
self.ids.lock().await.contains(&id)
}
}
pub struct ProtocolPrivmsg {
notify_queue_sender: Sender<Arc<Privmsg>>,
privmsg_sub: net::MessageSubscription<Privmsg>,
jobsman: net::ProtocolJobsManagerPtr,
seen_ids: SeenPrivmsgIdsPtr,
p2p: net::P2pPtr,
}
#[async_trait]
impl net::ProtocolBase for ProtocolPrivmsg {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivMsg::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_privmsg(), executor.clone()).await;
debug!(target: "ircd", "ProtocolPrivmsg::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolPrivMsg"
}
}
impl ProtocolPrivmsg {
pub async fn init(
channel: net::ChannelPtr,
notify_queue_sender: Sender<Arc<Privmsg>>,
seen_ids: SeenPrivmsgIdsPtr,
p2p: net::P2pPtr,
) -> net::ProtocolBasePtr {
let message_subsystem = channel.get_message_subsystem();
message_subsystem.add_dispatch::<Privmsg>().await;
let sub = channel.subscribe_msg::<Privmsg>().await.expect("Missing Privmsg dispatcher!");
Arc::new(Self {
notify_queue_sender,
privmsg_sub: sub,
jobsman: net::ProtocolJobsManager::new("PrivmsgProtocol", channel),
seen_ids,
p2p,
})
}
async fn handle_receive_privmsg(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_privmsg() [START]");
loop {
let privmsg = self.privmsg_sub.receive().await?;
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_privmsg() received {:?}", privmsg);
// Do we already have this message?
if self.seen_ids.is_seen(privmsg.id).await {
continue
}
self.seen_ids.add_seen(privmsg.id).await;
// If not, then broadcast to network.
let privmsg_copy = (*privmsg).clone();
self.p2p.broadcast(privmsg_copy).await?;
self.notify_queue_sender.send(privmsg).await.expect("notify_queue_sender send failed!");
}
}
}

View File

@@ -5,17 +5,13 @@ use async_trait::async_trait;
use log::debug;
use serde_json::{json, Value};
use darkfi::{
net,
rpc::{
jsonrpc,
jsonrpc::{ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
},
use darkfi::rpc::{
jsonrpc,
jsonrpc::{ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
};
pub struct JsonRpcInterface {
pub p2p: net::P2pPtr,
pub addr: SocketAddr,
}
@@ -30,7 +26,7 @@ impl RequestHandler for JsonRpcInterface {
match req.method.as_str() {
Some("ping") => self.pong(req.id, req.params).await,
Some("get_info") => self.get_info(req.id, req.params).await,
//Some("get_info") => self.get_info(req.id, req.params).await,
Some(_) | None => jsonrpc::error(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
@@ -45,12 +41,13 @@ impl JsonRpcInterface {
jsonrpc::response(json!("pong"), id).into()
}
// TODO
// RPCAPI:
// Retrieves P2P network information.
// --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
let resp = self.p2p.get_info().await;
jsonrpc::response(resp, id).into()
}
//async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
// let resp = self.p2p.get_info().await;
// jsonrpc::response(resp, id).into()
//}
}

View File

@@ -3,13 +3,12 @@ use futures::{io::WriteHalf, AsyncWriteExt};
use log::{debug, info, warn};
use rand::{rngs::OsRng, RngCore};
use darkfi::{net, Error, Result};
use darkfi::{Error, Result};
use crate::proto::privmsg::{Privmsg, SeenPrivmsgIdsPtr};
use crate::privmsg::Privmsg;
pub struct IrcServerConnection {
write_stream: WriteHalf<TcpStream>,
seen_privmsg_ids: SeenPrivmsgIdsPtr,
is_nick_init: bool,
is_user_init: bool,
is_registered: bool,
@@ -18,10 +17,9 @@ pub struct IrcServerConnection {
}
impl IrcServerConnection {
pub fn new(write_stream: WriteHalf<TcpStream>, seen_ids: SeenPrivmsgIdsPtr) -> Self {
pub fn new(write_stream: WriteHalf<TcpStream>) -> Self {
Self {
write_stream,
seen_privmsg_ids: seen_ids,
is_nick_init: false,
is_user_init: false,
is_registered: false,
@@ -30,7 +28,12 @@ impl IrcServerConnection {
}
}
pub async fn update(&mut self, line: String, p2p: net::P2pPtr) -> Result<()> {
pub async fn update(
&mut self,
line: String,
sender: async_channel::Sender<Privmsg>,
seen_msg_id: crate::SeenMsgId,
) -> Result<()> {
let mut tokens = line.split_ascii_whitespace();
// Commands can begin with :garbage but we will reject clients doing
// that for now to keep the protocol simple and focused.
@@ -82,7 +85,6 @@ impl IrcServerConnection {
info!("Message {}: {}", channel, message);
let random_id = OsRng.next_u32();
self.seen_privmsg_ids.add_seen(random_id).await;
let protocol_msg = Privmsg {
id: random_id,
@@ -91,7 +93,11 @@ impl IrcServerConnection {
message: message.to_string(),
};
p2p.broadcast(protocol_msg).await?;
let mut smi = seen_msg_id.lock().await;
smi.push(random_id);
drop(smi);
sender.send(protocol_msg).await?;
}
"QUIT" => {
// Close the connection

View File

@@ -1,36 +0,0 @@
[package]
name = "ircd2"
version = "0.3.0"
homepage = "https://dark.fi"
description = "P2P IRC daemon"
authors = ["darkfi <dev@dark.fi>"]
repository = "https://github.com/darkrenaissance/darkfi"
license = "AGPL-3.0-only"
edition = "2021"
[dependencies]
darkfi = {path = "../../", features = ["net", "rpc", "raft"]}
# Async
smol = "1.2.5"
futures = "0.3.21"
async-std = "1.11.0"
async-trait = "0.1.53"
async-channel = "1.6.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
# Crypto
rand = "0.8.5"
# Misc
clap = {version = "3.1.8", features = ["derive"]}
log = "0.4.16"
simplelog = "0.12.0-alpha1"
fxhash = "0.2.1"
ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]}
# Encoding and parsing
serde_json = "1.0.79"
serde = {version = "1.0.136", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.0"

View File

@@ -1,65 +0,0 @@
# p2p IRC
This is a local daemon which can be attached to with any IRC frontend.
It uses the darkfi p2p engine to synchronize chats between hosts.
## Local Deployment
### Seed Node
First you must run a seed node. The seed node is a static host which nodes can
connect to when they first connect to the network. The `seed_session` simply
connects to a seed node and runs `protocol_seed`, which requests a list of
addresses from the seed node and disconnects straight after receiving them.
LOG_TARGETS=net cargo run -- -vv --accept 0.0.0.0:9999 --irc 127.0.0.1:6688
Note that the above command doesn't specify an external address since the
seed node shouldn't be advertised in the list of connectable nodes. The seed
node does not participate as a normal node in the p2p network. It simply allows
new nodes to discover other nodes in the network during the bootstrapping phase.
### Inbound Node
This is a node accepting inbound connections on the network but which is not
making any outbound connections.
The external address is important and must be correct.
LOG_TARGETS=net cargo run -- -vv --accept 0.0.0.0:11004 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667
### Outbound Node
This is a node which has 8 outbound connection slots and no inbound connections.
This means the node has 8 slots which will actively search for unique nodes to
connect to in the p2p network.
LOG_TARGETS=net cargo run -- -vv --slots 5 --seeds $SEED_IP:9999 --irc 127.0.0.1:6668
### Attaching the IRC Frontend
Assuming you have run the above 3 commands to create a small model testnet,
and both inbound and outbound nodes above are connected, you can test them
out using weechat.
To create separate weechat instances, use the `--dir` command:
weechat --dir /tmp/a/
weechat --dir /tmp/b/
Then in both clients, you must set the option to connect to temporary servers:
/set irc.look.temporary_servers on
Finally you can attach to the local IRCd instances:
/connect localhost/6667
/connect localhost/6668
And send messages to yourself.
### Running a Fullnode
See the script `script/run_node.sh` for an example of how to deploy a full node which
does seed session synchronization, and accepts both inbound and outbound
connections.

View File

@@ -1,9 +0,0 @@
#!/bin/bash
# Change this value to the hostname of the seed server
SEED_HOSTNAME=XXX
LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}')
SEED_IP=$(getent hosts $SEED_HOSTNAME.local | awk '{print $1}' | head -n 1)
cargo run -- --accept 0.0.0.0:11004 --slots 5 --external $LOCAL_IP:11004 --seeds $SEED_IP:9999 --irc 127.0.0.1:6667

View File

@@ -1,4 +0,0 @@
#!/bin/bash
LOCAL_IP=$(ip route get 8.8.8.8 | head -1 | awk '{print $7}')
cargo run -- --accept 0.0.0.0:9999 --irc 127.0.0.1:6688

View File

@@ -1,6 +0,0 @@
#!/bin/sh
tmux new-session -d 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:9999 --irc 127.0.0.1:6688'
tmux split-window -v 'LOG_TARGETS=net ../../../target/release/ircd -vv --accept 127.0.0.1:11004 --external 127.0.0.1:11004 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6667'
tmux split-window -h 'LOG_TARGETS=net ../../../target/release/ircd -vv --slots 5 --seeds 127.0.0.1:9999 --irc 127.0.0.1:6668'
tmux attach

View File

@@ -1,171 +0,0 @@
use async_std::net::{TcpListener, TcpStream};
use std::{net::SocketAddr, sync::Arc};
use async_channel::Receiver;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use log::{debug, error, info, warn};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use darkfi::{
async_daemonize,
raft::Raft,
rpc::rpcserver::{listen_and_serve, RpcServerConfig},
util::{
cli::{log_config, spawn_config},
path::{expand_path, get_config_path},
},
Error, Result,
};
pub mod privmsg;
pub mod rpc;
pub mod server;
pub mod settings;
use crate::{
privmsg::Privmsg,
rpc::JsonRpcInterface,
server::IrcServerConnection,
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
};
async fn process_user_input(
mut line: String,
peer_addr: SocketAddr,
conn: &mut IrcServerConnection,
sender: async_channel::Sender<Privmsg>,
) -> Result<()> {
if line.is_empty() {
warn!("Received empty line from {}. Closing connection.", peer_addr);
return Err(Error::ChannelStopped)
}
assert!(&line[(line.len() - 2)..] == "\r\n");
// Remove CRLF
line.pop();
line.pop();
debug!("Received '{}' from {}", line, peer_addr);
if let Err(e) = conn.update(line, sender).await {
warn!("Connection error: {} for {}", e, peer_addr);
return Err(Error::ChannelStopped)
}
Ok(())
}
async fn process(
receiver: Receiver<Privmsg>,
stream: TcpStream,
peer_addr: SocketAddr,
sender: async_channel::Sender<Privmsg>,
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(writer);
loop {
let mut line = String::new();
futures::select! {
privmsg = receiver.recv().fuse() => {
let msg = privmsg?;
debug!("ABOUT TO SEND: {:?}", msg);
let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n",
msg.nickname,
msg.channel,
msg.message,
);
conn.reply(&irc_msg).await?;
}
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
process_user_input(line, peer_addr, &mut conn, sender.clone()).await?;
}
};
}
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let listener = TcpListener::bind(settings.irc_listen).await?;
let local_addr = listener.local_addr()?;
info!("Listening on {}", local_addr);
let datastore_path = expand_path(&settings.datastore)?;
let net_settings = settings.net;
//
//Raft
//
let datastore_raft = datastore_path.join("ircd.db");
let mut raft = Raft::<Privmsg>::new(net_settings.inbound, datastore_raft)?;
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
//
// RPC interface
//
let rpc_config = RpcServerConfig {
socket_addr: settings.rpc_listen,
// TODO: Use net/transport:
use_tls: false,
identity_path: Default::default(),
identity_pass: Default::default(),
};
let executor_cloned = executor.clone();
let rpc_interface = Arc::new(JsonRpcInterface { addr: settings.rpc_listen });
let rpc_task = executor.spawn(async move {
listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await
});
//
// IRC instance
//
let executor_cloned = executor.clone();
let irc_task: smol::Task<Result<()>> = executor.spawn(async move {
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed listening for connections: {}", e);
return Err(Error::ServiceStopped)
}
};
info!("Accepted client: {}", peer_addr);
executor_cloned
.spawn(process(commits.clone(), stream, peer_addr, raft_sender.clone()))
.detach();
}
});
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
signal.send(()).await.unwrap();
rpc_task.cancel().await;
irc_task.cancel().await;
})
.unwrap();
// blocking
raft.start(net_settings.into(), executor.clone(), shutdown.clone()).await?;
Ok(())
}

View File

@@ -1,53 +0,0 @@
use std::{net::SocketAddr, sync::Arc};
use async_executor::Executor;
use async_trait::async_trait;
use log::debug;
use serde_json::{json, Value};
use darkfi::rpc::{
jsonrpc,
jsonrpc::{ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
};
pub struct JsonRpcInterface {
pub addr: SocketAddr,
}
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest, _executor: Arc<Executor<'_>>) -> JsonResult {
if req.params.as_array().is_none() {
return jsonrpc::error(ErrorCode::InvalidRequest, None, req.id).into()
}
debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap());
match req.method.as_str() {
Some("ping") => self.pong(req.id, req.params).await,
//Some("get_info") => self.get_info(req.id, req.params).await,
Some(_) | None => jsonrpc::error(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
}
impl JsonRpcInterface {
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
async fn pong(&self, id: Value, _params: Value) -> JsonResult {
jsonrpc::response(json!("pong"), id).into()
}
// TODO
// RPCAPI:
// Retrieves P2P network information.
// --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
//async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
// let resp = self.p2p.get_info().await;
// jsonrpc::response(resp, id).into()
//}
}

View File

@@ -1,135 +0,0 @@
use async_std::net::TcpStream;
use futures::{io::WriteHalf, AsyncWriteExt};
use log::{debug, info, warn};
use rand::{rngs::OsRng, RngCore};
use darkfi::{Error, Result};
use crate::privmsg::Privmsg;
pub struct IrcServerConnection {
write_stream: WriteHalf<TcpStream>,
is_nick_init: bool,
is_user_init: bool,
is_registered: bool,
nickname: String,
_channels: Vec<String>,
}
impl IrcServerConnection {
pub fn new(write_stream: WriteHalf<TcpStream>) -> Self {
Self {
write_stream,
is_nick_init: false,
is_user_init: false,
is_registered: false,
nickname: "".to_string(),
_channels: vec![],
}
}
pub async fn update(
&mut self,
line: String,
sender: async_channel::Sender<Privmsg>,
) -> Result<()> {
let mut tokens = line.split_ascii_whitespace();
// Commands can begin with :garbage but we will reject clients doing
// that for now to keep the protocol simple and focused.
let command = tokens.next().ok_or(Error::MalformedPacket)?;
debug!("Received command: {}", command);
match command {
"USER" => {
// We can stuff any extra things like public keys in here.
// Ignore it for now.
self.is_user_init = true;
}
"NICK" => {
let nickname = tokens.next().ok_or(Error::MalformedPacket)?;
self.is_nick_init = true;
let old_nick = std::mem::replace(&mut self.nickname, nickname.to_string());
let nick_reply = format!(":{}!anon@dark.fi NICK {}\r\n", old_nick, self.nickname);
self.reply(&nick_reply).await?;
}
"JOIN" => {
// Ignore since channels are all autojoin
// let channel = tokens.next().ok_or(Error::MalformedPacket)?;
// self.channels.push(channel.to_string());
// let join_reply = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, channel);
// self.reply(&join_reply).await?;
// self.write_stream.write_all(b":f00!f00@127.0.01 PRIVMSG #dev :y0\r\n").await?;
}
"PING" => {
let line_clone = line.clone();
let split_line: Vec<&str> = line_clone.split_whitespace().collect();
if split_line.len() > 1 && split_line[0] == "PING" {
let pong = format!("PONG {}\r\n", split_line[1]);
self.reply(&pong).await?;
}
}
"PRIVMSG" => {
let channel = tokens.next().ok_or(Error::MalformedPacket)?;
let substr_idx = line.find(':').ok_or(Error::MalformedPacket)?;
if substr_idx >= line.len() {
return Err(Error::MalformedPacket)
}
let message = &line[substr_idx + 1..];
info!("Message {}: {}", channel, message);
let random_id = OsRng.next_u32();
let protocol_msg = Privmsg {
id: random_id,
nickname: self.nickname.clone(),
channel: channel.to_string(),
message: message.to_string(),
};
sender.send(protocol_msg).await?;
}
"QUIT" => {
// Close the connection
return Err(Error::ServiceStopped)
}
_ => {
warn!("Unimplemented `{}` command", command);
}
}
if !self.is_registered && self.is_nick_init && self.is_user_init {
debug!("Initializing peer connection");
let register_reply = format!(":darkfi 001 {} :Let there be dark\r\n", self.nickname);
self.reply(&register_reply).await?;
self.is_registered = true;
// Auto-joins
macro_rules! autojoin {
($channel:expr,$topic:expr) => {
let j = format!(":{}!anon@dark.fi JOIN {}\r\n", self.nickname, $channel);
let t = format!(":DarkFi TOPIC {} :{}\r\n", $channel, $topic);
self.reply(&j).await?;
self.reply(&t).await?;
};
}
autojoin!("#dev", "Development of DarkFi");
autojoin!("#markets", "Markets, trading, DeFi, algo, biz, finance, and economics");
autojoin!("#memes", "Memetic engineering");
}
Ok(())
}
pub async fn reply(&mut self, message: &str) -> Result<()> {
self.write_stream.write_all(message.as_bytes()).await?;
debug!("Sent {}", message);
Ok(())
}
}

View File

@@ -14,7 +14,8 @@ make BINS=darkfid2
pids=()
# Starting node 0 (seed) in background
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-p2p-accept 127.0.0.1:6000 \
--consensus-p2p-external 127.0.0.1:6000 \
@@ -34,7 +35,8 @@ sleep 20
bound=$(($nodes-2))
for i in $(eval echo "{1..$bound}")
do
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \
@@ -64,7 +66,8 @@ function ctrl_c() {
bound=$(($nodes-1))
# Starting last node
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \

View File

@@ -1,3 +1,4 @@
use async_std::sync::Arc;
use std::{fs::File, io::Write};
use darkfi::{
@@ -357,13 +358,13 @@ async fn main() -> Result<()> {
let nodes = 4;
let genesis_ts = Timestamp(1648383795);
let genesis_data = *TESTNET_GENESIS_HASH_BYTES;
let pass = "changeme";
for i in 0..nodes {
// Initialize or load wallet
let path = format!("../../../tmp/node{:?}/wallet.db", i);
let pass = "changeme";
let wallet = init_wallet(&path, &pass).await?;
Client::new(wallet.clone()).await?;
let address = wallet.get_default_address().await?;
let client = Arc::new(Client::new(wallet).await?);
// Initialize or load sled database
let path = format!("../../../tmp/node{:?}/blockchain/testnet", i);
@@ -372,7 +373,8 @@ async fn main() -> Result<()> {
// Data export
println!("Exporting data for node{:?} - {:?}", i, address.to_string());
let state = ValidatorState::new(&sled_db, address, genesis_ts, genesis_data)?;
let state =
ValidatorState::new(&sled_db, genesis_ts, genesis_data, client, vec![], vec![]).await?;
let info = StateInfo::new(&*state.read().await);
let info_string = format!("{:#?}", info);
let path = format!("node{:?}_testnet_db", i);

View File

@@ -118,8 +118,8 @@ pub struct ValidatorState {
pub client: Arc<Client>,
/// Pending transactions
pub unconfirmed_txs: Vec<Tx>,
/// Participation flag
pub participating: bool,
/// Participating start epoch
pub participating: Option<u64>,
}
impl ValidatorState {
@@ -137,7 +137,7 @@ impl ValidatorState {
let consensus = ConsensusState::new(genesis_ts, genesis_data)?;
let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?;
let unconfirmed_txs = vec![];
let participating = false;
let participating = None;
let address = client.wallet.get_default_address().await?;
let state_machine = Arc::new(Mutex::new(State {
@@ -219,6 +219,12 @@ impl ValidatorState {
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Set participating epoch to next.
pub fn set_participating(&mut self) -> Result<()> {
self.participating = Some(self.current_epoch() + 1);
Ok(())
}
/// Find epoch leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating
/// in the network.
@@ -326,8 +332,13 @@ impl ValidatorState {
/// and proceed with voting on it.
pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
// Node hasn't started participating
if !self.participating {
return Ok(None)
match self.participating {
Some(start) => {
if self.current_epoch() < start {
return Ok(None)
}
}
None => return Ok(None),
}
// Node refreshes participants records
@@ -480,9 +491,15 @@ impl ValidatorState {
/// Finally, we check if the notarization of the proposal can finalize
/// parent proposals in its chain.
pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option<Vec<BlockInfo>>)> {
let current_epoch = self.current_epoch();
// Node hasn't started participating
if !self.participating {
return Ok((false, None))
match self.participating {
Some(start) => {
if current_epoch < start {
return Ok((false, None))
}
}
None => return Ok((false, None)),
}
let mut encoded_proposal = vec![];
@@ -508,10 +525,23 @@ impl ValidatorState {
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.address) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
let mut participant = participant.clone();
if current_epoch <= participant.joined {
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string());
return Ok((false, None))
}
// Updating participant vote
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.address, participant);
}
None => {
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string());
@@ -559,22 +589,6 @@ impl ValidatorState {
}
}
// Updating participant vote
let mut participant = match self.consensus.participants.get(&vote.address) {
Some(p) => p.clone(),
None => Participant::new(vote.address, vote.sl),
};
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.address, participant);
Ok((true, Some(to_broadcast)))
}
@@ -762,8 +776,9 @@ impl ValidatorState {
}
}
None => {
if participant.joined < previous_epoch &&
participant.joined < previous_from_last_epoch
if (previous_epoch == last_epoch && participant.joined < previous_epoch) ||
(previous_epoch != last_epoch &&
participant.joined < previous_from_last_epoch)
{
warn!(
"refresh_participants(): Inactive participant: {:?} (joined {:?}, voted {:?})",

View File

@@ -46,15 +46,17 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
Err(e) => error!("Failed broadcasting consensus participation: {}", e),
}
// After initialization node waits for next epoch to start participating
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
// Note modifies its participating flag to true.
state.write().await.participating = true;
// Node modifies its participating epoch to next.
match state.write().await.set_participating() {
Ok(()) => info!("Node will start participating at next epoch!"),
Err(e) => error!("Failed to set participation epoch: {}", e),
}
loop {
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
// Node refreshes participants records
match state.write().await.refresh_participants() {
Ok(()) => debug!("Participants refreshed successfully."),
@@ -73,57 +75,53 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
Ok(proposal) => {
if proposal.is_none() {
info!(target: "consensus", "Node is not the epoch leader. Sleeping till next epoch...");
} else {
// Leader creates a vote for the proposal and broadcasts them both
let proposal = proposal.unwrap();
info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal);
let vote = state.write().await.receive_proposal(&proposal);
match vote {
Ok(v) => {
if v.is_none() {
debug!("proposal_task(): Node did not vote for the proposed block");
} else {
let vote = v.unwrap();
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!(target: "consensus", "Vote saved successfully."),
Err(e) => {
error!(target: "consensus", "Vote save failed: {}", e)
}
continue
}
// Leader creates a vote for the proposal and broadcasts them both
let proposal = proposal.unwrap();
info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal);
let vote = state.write().await.receive_proposal(&proposal);
match vote {
Ok(v) => {
if v.is_none() {
debug!("proposal_task(): Node did not vote for the proposed block");
} else {
let vote = v.unwrap();
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!(target: "consensus", "Vote saved successfully."),
Err(e) => {
error!(target: "consensus", "Vote save failed: {}", e)
}
}
// Broadcast block
let result = p2p.broadcast(proposal).await;
match result {
Ok(()) => {
info!(target: "consensus", "Proposal broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting proposal: {}", e)
}
// Broadcast block
let result = p2p.broadcast(proposal).await;
match result {
Ok(()) => {
info!(target: "consensus", "Proposal broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting proposal: {}", e)
}
}
// Broadcast leader vote
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => {
info!(target: "consensus", "Leader vote broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting leader vote: {}", e)
}
// Broadcast leader vote
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => {
info!(target: "consensus", "Leader vote broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting leader vote: {}", e)
}
}
}
Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e),
}
Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e),
}
}
Err(e) => error!("Block proposal failed: {}", e),
}
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
}
}

View File

@@ -20,6 +20,20 @@ pub enum Role {
Leader,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct SyncRequest {
logs_len: u64,
last_term: u64,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct SyncResponse {
logs: Logs,
commit_length: u64,
leader_id: NodeId,
wipe: bool,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct VoteRequest {
node_id: NodeId,
@@ -139,7 +153,7 @@ impl MapLength {
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct NetMsg {
id: u32,
id: u64,
recipient_id: Option<NodeId>,
method: NetMsgMethod,
payload: Vec<u8>,
@@ -153,6 +167,9 @@ pub enum NetMsgMethod {
VoteResponse = 2,
VoteRequest = 3,
BroadcastRequest = 4,
// this only used for listener node
SyncRequest = 5,
SyncResponse = 6,
}
impl Encodable for NetMsgMethod {
@@ -163,6 +180,8 @@ impl Encodable for NetMsgMethod {
Self::VoteResponse => 2,
Self::VoteRequest => 3,
Self::BroadcastRequest => 4,
Self::SyncRequest => 5,
Self::SyncResponse => 6,
};
(len as u8).encode(s)
}
@@ -176,7 +195,9 @@ impl Decodable for NetMsgMethod {
1 => Self::LogRequest,
2 => Self::VoteResponse,
3 => Self::VoteRequest,
_ => Self::BroadcastRequest,
4 => Self::BroadcastRequest,
5 => Self::SyncRequest,
_ => Self::SyncResponse,
})
}
}

View File

@@ -14,7 +14,7 @@ pub struct ProtocolRaft {
notify_queue_sender: async_channel::Sender<NetMsg>,
msg_sub: net::MessageSubscription<NetMsg>,
p2p: net::P2pPtr,
msgs: Arc<Mutex<Vec<u32>>>,
msgs: Arc<Mutex<Vec<u64>>>,
}
impl ProtocolRaft {
@@ -23,6 +23,7 @@ impl ProtocolRaft {
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<NetMsg>,
p2p: net::P2pPtr,
msgs: Arc<Mutex<Vec<u64>>>,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<NetMsg>().await;
@@ -35,7 +36,7 @@ impl ProtocolRaft {
msg_sub,
jobsman: net::ProtocolJobsManager::new("ProtocolRaft", channel),
p2p,
msgs: Arc::new(Mutex::new(vec![])),
msgs,
})
}
@@ -75,7 +76,9 @@ impl ProtocolRaft {
// then the local node will only handle the msg if its method
// is LogRequest
(None, Some(_)) => {
if msg.method != NetMsgMethod::LogRequest {
if msg.method != NetMsgMethod::LogRequest &&
msg.method != NetMsgMethod::SyncResponse
{
continue
}
}

View File

@@ -17,7 +17,7 @@ use crate::{
use super::{
BroadcastMsgRequest, DataStore, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg,
NetMsgMethod, NodeId, ProtocolRaft, Role, VoteRequest, VoteResponse,
NetMsgMethod, NodeId, ProtocolRaft, Role, SyncRequest, SyncResponse, VoteRequest, VoteResponse,
};
const HEARTBEATTIMEOUT: u64 = 100;
@@ -124,12 +124,16 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let registry = p2p.protocol_registry();
let seen_net_msg = Arc::new(Mutex::new(vec![]));
let self_id = self.id.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let self_id = self_id.clone();
let sender = p2p_snd.clone();
async move { ProtocolRaft::init(self_id, channel, sender, p2p).await }
let seen_net_msg_cloned = seen_net_msg.clone();
async move {
ProtocolRaft::init(self_id, channel, sender, p2p, seen_net_msg_cloned).await
}
})
.await;
@@ -179,16 +183,57 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
});
if self.id.is_none() {
let last_term =
if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 };
let sync_request = SyncRequest { logs_len: self.logs.len(), last_term };
info!("send sync request");
self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?;
loop {
select! {
msg = receive_queues.recv().fuse() => {
let msg = msg?;
if msg.method == NetMsgMethod::SyncResponse {
info!("receive sync response");
let sr: SyncResponse = deserialize(&msg.payload)?;
if sr.wipe {
self.set_commit_length(&0)?;
self.push_logs(&sr.logs)?;
} else {
for log in sr.logs.0.iter() {
self.push_log(log)?;
}
}
if !self.logs.is_empty() {
self.set_current_term(&self.logs.0.last().unwrap().term.clone())?;
}
if self.commit_length > sr.commit_length {
self.set_commit_length(&0)?;
}
for i in self.commit_length..sr.commit_length {
self.push_commit(&self.logs.get(i)?.msg).await?;
}
self.set_commit_length(&sr.commit_length)?;
self.current_leader = Some(sr.leader_id);
break
}},
_ = stop_signal.recv().fuse() => break,
}
}
}
let mut rng = rand::thread_rng();
let broadcast_msg_rv = self.broadcast_msg.1.clone();
// send data form datastore through broadcast_commits channel
let commits = self.datastore.commits.get_all()?;
for commit in commits {
self.broadcast_commits.0.send(commit).await?;
}
loop {
let timeout: Duration = if self.role == Role::Leader {
Duration::from_millis(HEARTBEATTIMEOUT)
@@ -200,7 +245,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
select! {
m = receive_queues.recv().fuse() => result = self.handle_method(m?).await,
m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?).await,
m = broadcast_msg_rv.recv().fuse() => result = self.broadcast_msg(&m?,None).await,
_ = task::sleep(timeout).fuse() => {
result = if self.role == Role::Leader {
self.send_heartbeat().await
@@ -233,29 +278,30 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.broadcast_msg.0.clone()
}
async fn broadcast_msg(&mut self, msg: &T) -> Result<()> {
async fn broadcast_msg(&mut self, msg: &T, msg_id: Option<u64>) -> Result<()> {
if self.role == Role::Leader {
let msg = serialize(msg);
let log = Log { msg, term: self.current_term };
self.push_log(&log)?;
self.acked_length.insert(&self.id.clone().unwrap(), self.logs.len());
let nodes = self.nodes.lock().await.clone();
for node in nodes.iter() {
self.update_logs(node.0).await?;
}
} else {
let b_msg = BroadcastMsgRequest(serialize(msg));
self.send(
self.current_leader.clone(),
&serialize(&b_msg),
NetMsgMethod::BroadcastRequest,
msg_id,
)
.await?;
}
info!(target: "raft", "has id: {} {:?} broadcast a msg", self.id.is_some(), self.role);
info!(target: "raft",
"Node has id: {}, Node status: {:?}, broadcast a msg id: {:?} ",
self.id.is_some(),
self.role, msg_id
);
Ok(())
}
@@ -280,8 +326,14 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
NetMsgMethod::BroadcastRequest => {
let vr: BroadcastMsgRequest = deserialize(&msg.payload)?;
let d: T = deserialize(&vr.0)?;
self.broadcast_msg(&d).await?;
self.broadcast_msg(&d, Some(msg.id)).await?;
}
NetMsgMethod::SyncRequest => {
info!("receive sync request");
let sr: SyncRequest = deserialize(&msg.payload)?;
self.receive_sync_request(&sr, msg.id).await?;
}
NetMsgMethod::SyncResponse => {}
}
debug!(
@@ -291,13 +343,64 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
);
Ok(())
}
async fn receive_sync_request(&self, sr: &SyncRequest, msg_id: u64) -> Result<()> {
if self.id.is_none() {
return Ok(())
}
if self.role == Role::Leader {
let mut wipe = false;
let logs = if sr.logs_len == 0 {
self.logs.clone()
} else if self.logs.len() >= sr.logs_len &&
self.logs.get(sr.logs_len - 1)?.term == sr.last_term
{
self.logs.slice_from(sr.logs_len).unwrap()
} else {
wipe = true;
self.logs.clone()
};
let sync_response = SyncResponse {
logs,
commit_length: self.commit_length,
leader_id: self.id.clone().unwrap(),
wipe,
};
info!("send sync response");
for _ in 0..2 {
self.send(
self.current_leader.clone(),
&serialize(&sync_response),
NetMsgMethod::SyncResponse,
None,
)
.await?;
}
} else {
self.send(
self.current_leader.clone(),
&serialize(sr),
NetMsgMethod::SyncRequest,
Some(msg_id),
)
.await?;
}
Ok(())
}
async fn send(
&self,
recipient_id: Option<NodeId>,
payload: &[u8],
method: NetMsgMethod,
msg_id: Option<u64>,
) -> Result<()> {
let random_id = OsRng.next_u32();
let random_id = if msg_id.is_some() { msg_id.unwrap() } else { OsRng.next_u64() };
debug!(
target: "raft",
@@ -313,8 +416,10 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn send_heartbeat(&self) -> Result<()> {
if self.role == Role::Leader {
let nodes = self.nodes.lock().await.clone();
for node in nodes.iter() {
let nodes = self.nodes.lock().await;
let nodes_cloned = nodes.clone();
drop(nodes);
for node in nodes_cloned.iter() {
self.update_logs(node.0).await?;
}
}
@@ -344,7 +449,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
};
let payload = serialize(&request);
self.send(None, &payload, NetMsgMethod::VoteRequest).await
self.send(None, &payload, NetMsgMethod::VoteRequest, None).await
}
async fn receive_vote_request(&mut self, vr: VoteRequest) -> Result<()> {
@@ -383,7 +488,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
let payload = serialize(&response);
self.send(Some(vr.node_id), &payload, NetMsgMethod::VoteResponse).await
self.send(Some(vr.node_id), &payload, NetMsgMethod::VoteResponse, None).await
}
async fn receive_vote_response(&mut self, vr: VoteResponse) -> Result<()> {
@@ -391,16 +496,17 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.votes_received.push(vr.node_id);
let nodes = self.nodes.lock().await;
if self.votes_received.len() >= ((nodes.len() + 1) / 2) {
let nodes_cloned = nodes.clone();
drop(nodes);
if self.votes_received.len() >= ((nodes_cloned.len() + 1) / 2) {
self.role = Role::Leader;
self.current_leader = Some(self.id.clone().unwrap());
for node in nodes.iter() {
for node in nodes_cloned.iter() {
self.sent_length.insert(node.0, self.logs.len());
self.acked_length.insert(node.0, 0);
self.update_logs(node.0).await?;
}
}
drop(nodes);
} else if vr.current_term > self.current_term {
self.set_current_term(&vr.current_term)?;
self.role = Role::Follower;
@@ -441,7 +547,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
};
let payload = serialize(&request);
self.send(Some(node_id.clone()), &payload, NetMsgMethod::LogRequest).await
self.send(Some(node_id.clone()), &payload, NetMsgMethod::LogRequest, None).await
}
async fn receive_log_request(&mut self, lr: LogRequest) -> Result<()> {
@@ -455,7 +561,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.current_leader = Some(lr.leader_id.clone());
}
let ok = (self.logs.len() >= lr.prefix_len) &&
let mut ok = (self.logs.len() >= lr.prefix_len) &&
(lr.prefix_len == 0 || self.logs.get(lr.prefix_len - 1)?.term == lr.prefix_term);
let mut ack = 0;
@@ -463,6 +569,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
if lr.current_term == self.current_term && ok {
self.append_log(lr.prefix_len, lr.commit_length, &lr.suffix).await?;
ack = lr.prefix_len + lr.suffix.len();
} else {
ok = false;
}
if self.id.is_none() {
@@ -477,7 +585,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
};
let payload = serialize(&response);
self.send(Some(lr.leader_id.clone()), &payload, NetMsgMethod::LogResponse).await
self.send(Some(lr.leader_id.clone()), &payload, NetMsgMethod::LogResponse, None).await
}
async fn receive_log_response(&mut self, lr: LogResponse) -> Result<()> {
@@ -488,7 +596,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.commit_log().await?;
} else if self.sent_length.get(&lr.node_id)? > 0 {
self.sent_length.insert(&lr.node_id, self.sent_length.get(&lr.node_id)? - 1);
self.update_logs(&lr.node_id).await?;
}
} else if lr.current_term > self.current_term {
self.set_current_term(&lr.current_term)?;
@@ -523,20 +630,20 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let nodes = nodes_ptr.clone();
drop(nodes_ptr);
let ready: Vec<u64> = self
.logs
.0
.iter()
.enumerate()
.filter(|(i, _)| self.acks(nodes.clone(), *i as u64).len() >= min_acks)
.map(|(i, _)| i as u64)
.collect();
let mut ready: Vec<u64> = vec![];
for len in 1..(self.logs.len() + 1) {
if self.acks(nodes.clone(), len).len() >= min_acks {
ready.push(len);
}
}
if ready.is_empty() {
return Ok(())
}
let max_ready = *ready.iter().max().unwrap();
if max_ready > self.commit_length && self.logs.get(max_ready - 1)?.term == self.current_term
{
for i in self.commit_length..max_ready {
@@ -563,7 +670,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
if prefix_len + suffix.len() > self.logs.len() {
for i in (self.logs.len() - prefix_len)..(suffix.len() - 1) {
for i in (self.logs.len() - prefix_len)..suffix.len() {
self.push_log(&suffix.get(i)?)?;
}
}