Merge branch 'master' into dao_demo

This commit is contained in:
Dastan-glitch
2022-09-16 23:54:06 +03:00
63 changed files with 1099 additions and 488 deletions

10
Cargo.lock generated
View File

@@ -1296,7 +1296,7 @@ dependencies = [
[[package]]
name = "darkwiki"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-channel",
"async-std",
@@ -1315,7 +1315,7 @@ dependencies = [
[[package]]
name = "darkwikid"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-channel",
"async-executor",
@@ -2320,7 +2320,7 @@ dependencies = [
[[package]]
name = "ircd"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-channel",
"async-executor",
@@ -3986,7 +3986,7 @@ checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1"
[[package]]
name = "tau"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-std",
"chrono",
@@ -4006,7 +4006,7 @@ dependencies = [
[[package]]
name = "taud"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"async-channel",
"async-executor",

View File

@@ -20,8 +20,6 @@ name = "darkfi"
members = [
"bin/zkas",
#"bin/cashierd",
"bin/darkwiki",
"bin/darkwikid",
"bin/darkfid",
"bin/darkotc",
"bin/drk",
@@ -33,6 +31,8 @@ members = [
"bin/dao/dao-cli",
"bin/tau/taud",
"bin/tau/tau-cli",
"bin/darkwiki/darkwikid",
"bin/darkwiki/darkwiki-cli",
"bin/vanityaddr",
"bin/lilith",

View File

@@ -31,7 +31,7 @@ PROOFS_BIN = $(PROOFS:=.bin)
all: zkas $(PROOFS_BIN) $(BINS)
zkas:
zkas: $(BINDEPS)
RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) build --all-features --release --package $@
cp -f target/release/$@ $@

View File

@@ -15,7 +15,8 @@ contract "DaoExec" {
# DAO params
Base dao_proposer_limit,
Base dao_quorum,
Base dao_approval_ratio,
Base dao_approval_ratio_quot,
Base dao_approval_ratio_base,
Base gov_token_id,
Base dao_public_x,
Base dao_public_y,
@@ -45,13 +46,12 @@ circuit "DaoExec" {
dao_bulla = poseidon_hash(
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
gov_token_id,
dao_public_x,
dao_public_y,
dao_bulla_blind,
# @tmp-workaround
dao_bulla_blind,
);
# Proposal bulla is valid means DAO bulla is also valid
# because of dao-propose-main.zk, already checks that when
@@ -134,25 +134,23 @@ circuit "DaoExec" {
constrain_instance(user_spend_hook);
constrain_instance(user_data);
# total_votes >= dao_quorum
# TODO: waiting on this opcode in zkas
#
# greater_than_or_equal(total_votes, dao_quorum)
#
# Check that dao_quorum is less than or equal to all_votes_value
one = witness_base(1);
all_votes_value_1 = base_add(all_votes_value, one);
less_than(dao_quorum, all_votes_value_1);
# win_votes / total_votes >= approval_ratio_quot / approval_ratio_base
# approval_ratio_quot / approval_ratio_base <= yes_votes / all_votes
#
# The above is also equivalent to this:
#
# win_votes * approval_ratio_base >= total_votes * approval_ratio_quot
#
# TODO: waiting on this opcode in zkas
#
# lhs = base_mul(win_votes, approval_ratio_base);
# rhs = base_mul(total_votes, approval_ratio_quot);
# greater_than_or_equal(lhs, rhs);
#
# all_votes * approval_ratio_quot <= yes_votes * approval_ratio_base
rhs = base_mul(all_votes_value, dao_approval_ratio_quot);
lhs = base_mul(yes_votes_value, dao_approval_ratio_base);
lhs_1 = base_add(lhs, one);
less_than(rhs, lhs_1);
####
# Create coin 0

View File

@@ -4,7 +4,8 @@ constant "DaoMint" {
contract "DaoMint" {
Base dao_proposer_limit,
Base dao_quorum,
Base dao_approval_ratio,
Base dao_approval_ratio_quot,
Base dao_approval_ratio_base,
Base gdrk_token_id,
Base dao_public_x,
Base dao_public_y,
@@ -19,13 +20,12 @@ circuit "DaoMint" {
bulla = poseidon_hash(
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
gdrk_token_id,
dao_public_x,
dao_public_y,
dao_bulla_blind,
# @tmp-workaround
dao_bulla_blind,
);
constrain_instance(bulla);
}

View File

@@ -22,7 +22,8 @@ contract "DaoProposeMain" {
# DAO params
Base dao_proposer_limit,
Base dao_quorum,
Base dao_approval_ratio,
Base dao_approval_ratio_quot,
Base dao_approval_ratio_base,
Base gov_token_id,
Base dao_public_x,
Base dao_public_y,
@@ -39,13 +40,12 @@ circuit "DaoProposeMain" {
dao_bulla = poseidon_hash(
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
gov_token_id,
dao_public_x,
dao_public_y,
dao_bulla_blind,
# @tmp-workaround
dao_bulla_blind,
);
dao_root = merkle_root(dao_leaf_pos, dao_path, dao_bulla);
constrain_instance(dao_root);
@@ -65,18 +65,14 @@ circuit "DaoProposeMain" {
constrain_instance(proposal_bulla);
# Rangeproof check for proposal amount
# TODO: waiting on this opcode in zkas
#
# greater_than_zero(amount)
#
# Use this temporary workaround. ec_mul_short() does an internal rangeproof
rangeproof = ec_mul_short(proposal_amount, VALUE_COMMIT_VALUE);
zero = witness_base(0);
less_than(zero, proposal_amount);
# This is the main check
# TODO: check total_funds >= proposer_limit
#
# greater_than_or_equal(total_funds, proposer_limit)
#
# We check that dao_proposer_limit <= total_funds
one = witness_base(1);
total_funds_1 = base_add(total_funds, one);
less_than(dao_proposer_limit, total_funds_1);
# Pedersen commitment for coin's value
vcv = ec_mul_short(total_funds, VALUE_COMMIT_VALUE);

View File

@@ -15,7 +15,8 @@ contract "DaoVoteMain" {
# DAO params
Base dao_proposer_limit,
Base dao_quorum,
Base dao_approval_ratio,
Base dao_approval_ratio_quot,
Base dao_approval_ratio_base,
Base gov_token_id,
Base dao_public_x,
Base dao_public_y,
@@ -40,13 +41,12 @@ circuit "DaoVoteMain" {
dao_bulla = poseidon_hash(
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
gov_token_id,
dao_public_x,
dao_public_y,
dao_bulla_blind,
# @tmp-workaround
dao_bulla_blind,
);
# Proposal bulla is valid means DAO bulla is also valid
# because of dao-propose-main.zk, already checks that when
@@ -91,11 +91,8 @@ circuit "DaoVoteMain" {
constrain_instance(all_votes_commit_x);
constrain_instance(all_votes_commit_y);
# This is the main check
# TODO: vote option should be 0 or 1
#
# assert!(vote_option == 0 || vote_option == 1)
#
# Vote option should be 0 or 1
bool_check(vote_option);
}

View File

@@ -49,7 +49,8 @@ impl Builder {
let dao_proposer_limit = pallas::Base::from(self.dao.proposer_limit);
let dao_quorum = pallas::Base::from(self.dao.quorum);
let dao_approval_ratio = pallas::Base::from(self.dao.approval_ratio);
let dao_approval_ratio_quot = pallas::Base::from(self.dao.approval_ratio_quot);
let dao_approval_ratio_base = pallas::Base::from(self.dao.approval_ratio_base);
let dao_pubkey_coords = self.dao.public_key.0.to_affine().coordinates().unwrap();
@@ -61,13 +62,12 @@ impl Builder {
let dao_bulla = poseidon_hash::<8>([
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
self.dao.gov_token_id,
*dao_pubkey_coords.x(),
*dao_pubkey_coords.y(),
self.dao.bulla_blind,
// @tmp-workaround
self.dao.bulla_blind,
]);
let proposal_bulla = poseidon_hash::<8>([
@@ -133,7 +133,8 @@ impl Builder {
// DAO params
Witness::Base(Value::known(dao_proposer_limit)),
Witness::Base(Value::known(dao_quorum)),
Witness::Base(Value::known(dao_approval_ratio)),
Witness::Base(Value::known(dao_approval_ratio_quot)),
Witness::Base(Value::known(dao_approval_ratio_base)),
Witness::Base(Value::known(self.dao.gov_token_id)),
Witness::Base(Value::known(*dao_pubkey_coords.x())),
Witness::Base(Value::known(*dao_pubkey_coords.y())),

View File

@@ -19,7 +19,8 @@ use crate::{
pub struct DaoParams {
pub proposer_limit: u64,
pub quorum: u64,
pub approval_ratio: u64,
pub approval_ratio_quot: u64,
pub approval_ratio_base: u64,
pub gov_token_id: pallas::Base,
pub public_key: PublicKey,
pub bulla_blind: pallas::Base,
@@ -28,7 +29,8 @@ pub struct DaoParams {
pub struct Builder {
pub dao_proposer_limit: u64,
pub dao_quorum: u64,
pub dao_approval_ratio: u64,
pub dao_approval_ratio_quot: u64,
pub dao_approval_ratio_base: u64,
pub gov_token_id: pallas::Base,
pub dao_pubkey: PublicKey,
pub dao_bulla_blind: pallas::Base,
@@ -41,20 +43,20 @@ impl Builder {
// Dao bulla
let dao_proposer_limit = pallas::Base::from(self.dao_proposer_limit);
let dao_quorum = pallas::Base::from(self.dao_quorum);
let dao_approval_ratio = pallas::Base::from(self.dao_approval_ratio);
let dao_approval_ratio_quot = pallas::Base::from(self.dao_approval_ratio_quot);
let dao_approval_ratio_base = pallas::Base::from(self.dao_approval_ratio_base);
let dao_pubkey_coords = self.dao_pubkey.0.to_affine().coordinates().unwrap();
let dao_bulla = poseidon_hash::<8>([
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
self.gov_token_id,
*dao_pubkey_coords.x(),
*dao_pubkey_coords.y(),
self.dao_bulla_blind,
// @tmp-workaround
self.dao_bulla_blind,
]);
let dao_bulla = DaoBulla(dao_bulla);
@@ -69,7 +71,8 @@ impl Builder {
let prover_witnesses = vec![
Witness::Base(Value::known(dao_proposer_limit)),
Witness::Base(Value::known(dao_quorum)),
Witness::Base(Value::known(dao_approval_ratio)),
Witness::Base(Value::known(dao_approval_ratio_quot)),
Witness::Base(Value::known(dao_approval_ratio_base)),
Witness::Base(Value::known(self.gov_token_id)),
Witness::Base(Value::known(*dao_pubkey_coords.x())),
Witness::Base(Value::known(*dao_pubkey_coords.y())),

View File

@@ -175,20 +175,20 @@ impl Builder {
let dao_proposer_limit = pallas::Base::from(self.dao.proposer_limit);
let dao_quorum = pallas::Base::from(self.dao.quorum);
let dao_approval_ratio = pallas::Base::from(self.dao.approval_ratio);
let dao_approval_ratio_quot = pallas::Base::from(self.dao.approval_ratio_quot);
let dao_approval_ratio_base = pallas::Base::from(self.dao.approval_ratio_base);
let dao_pubkey_coords = self.dao.public_key.0.to_affine().coordinates().unwrap();
let dao_bulla = poseidon_hash::<8>([
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
self.dao.gov_token_id,
*dao_pubkey_coords.x(),
*dao_pubkey_coords.y(),
self.dao.bulla_blind,
// @tmp-workaround
self.dao.bulla_blind,
]);
let dao_leaf_position: u64 = self.dao_leaf_position.into();
@@ -228,7 +228,8 @@ impl Builder {
// DAO params
Witness::Base(Value::known(dao_proposer_limit)),
Witness::Base(Value::known(dao_quorum)),
Witness::Base(Value::known(dao_approval_ratio)),
Witness::Base(Value::known(dao_approval_ratio_quot)),
Witness::Base(Value::known(dao_approval_ratio_base)),
Witness::Base(Value::known(self.dao.gov_token_id)),
Witness::Base(Value::known(*dao_pubkey_coords.x())),
Witness::Base(Value::known(*dao_pubkey_coords.y())),

View File

@@ -184,20 +184,20 @@ impl Builder {
let dao_proposer_limit = pallas::Base::from(self.dao.proposer_limit);
let dao_quorum = pallas::Base::from(self.dao.quorum);
let dao_approval_ratio = pallas::Base::from(self.dao.approval_ratio);
let dao_approval_ratio_quot = pallas::Base::from(self.dao.approval_ratio_quot);
let dao_approval_ratio_base = pallas::Base::from(self.dao.approval_ratio_base);
let dao_pubkey_coords = self.dao.public_key.0.to_affine().coordinates().unwrap();
let dao_bulla = poseidon_hash::<8>([
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
self.dao.gov_token_id,
*dao_pubkey_coords.x(),
*dao_pubkey_coords.y(),
self.dao.bulla_blind,
// @tmp-workaround
self.dao.bulla_blind,
]);
let proposal_bulla = poseidon_hash::<8>([
@@ -241,7 +241,8 @@ impl Builder {
// DAO params
Witness::Base(Value::known(dao_proposer_limit)),
Witness::Base(Value::known(dao_quorum)),
Witness::Base(Value::known(dao_approval_ratio)),
Witness::Base(Value::known(dao_approval_ratio_quot)),
Witness::Base(Value::known(dao_approval_ratio_base)),
Witness::Base(Value::known(self.dao.gov_token_id)),
Witness::Base(Value::known(*dao_pubkey_coords.x())),
Witness::Base(Value::known(*dao_pubkey_coords.y())),

View File

@@ -315,7 +315,8 @@ pub async fn demo() -> Result<()> {
// DAO parameters
let dao_proposer_limit = 110;
let dao_quorum = 110;
let dao_approval_ratio = 2;
let dao_approval_ratio_quot = 1;
let dao_approval_ratio_base = 2;
// Lookup table for smart contract states
let mut states = StateRegistry::new();
@@ -399,7 +400,8 @@ pub async fn demo() -> Result<()> {
let builder = dao_contract::mint::wallet::Builder {
dao_proposer_limit,
dao_quorum,
dao_approval_ratio,
dao_approval_ratio_quot,
dao_approval_ratio_base,
gov_token_id: gdrk_token_id,
dao_pubkey: dao_keypair.public,
dao_bulla_blind,
@@ -754,7 +756,8 @@ pub async fn demo() -> Result<()> {
let dao_params = dao_contract::mint::wallet::DaoParams {
proposer_limit: dao_proposer_limit,
quorum: dao_quorum,
approval_ratio: dao_approval_ratio,
approval_ratio_base: dao_approval_ratio_base,
approval_ratio_quot: dao_approval_ratio_quot,
gov_token_id: gdrk_token_id,
public_key: dao_keypair.public,
bulla_blind: dao_bulla_blind,

View File

@@ -1,7 +1,7 @@
[package]
name = "darkwiki"
description = ""
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"
@@ -15,7 +15,7 @@ categories = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
darkfi = {path = "../../", features = ["rpc"]}
darkfi = {path = "../../../", features = ["rpc"]}
# Async

View File

@@ -16,7 +16,7 @@
#inbound = ["tls://127.0.0.1:24331"]
## Connection slots
outbound_connections=5
outbound_connections=8
## P2P external addresses
#external_addr = ["tls://127.0.0.1:24331"]
@@ -25,7 +25,7 @@ outbound_connections=5
#peers = ["tls://127.0.0.1:24331"]
## Seed nodes to connect to
seeds = ["tls://wiki0.dark.fi:24331", "tls://wiki1.dark.fi:24331"]
seeds = ["tls://lilith0.dark.fi:24331", "tls://lilith1.dark.fi:24331"]
# Preferred transports for outbound connections
#transports = ["tls", "tcp"]

View File

@@ -1,7 +1,7 @@
[package]
name = "darkwikid"
description = ""
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"
@@ -13,7 +13,7 @@ categories = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
darkfi = {path = "../../", features = ["raft", "net", "rpc"]}
darkfi = {path = "../../../", features = ["raft", "net", "rpc"]}
# Async

View File

@@ -48,7 +48,7 @@ use patch::{OpMethod, Patch};
type Patches = (Vec<Patch>, Vec<Patch>, Vec<Patch>, Vec<Patch>);
pub const CONFIG_FILE: &str = "darkwiki.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../darkwiki.toml");
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../../darkwiki.toml");
/// darkwikid cli
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
@@ -225,8 +225,9 @@ impl Darkwiki {
}
patch = self.raft.1.recv().fuse() => {
for (workspace, salsa_box) in self.workspaces.iter() {
if let Ok(patch) = decrypt_patch(&patch.clone()?, &salsa_box) {
if let Ok(mut patch) = decrypt_patch(&patch.clone()?, &salsa_box) {
info!("[{}] Receive a {:?}", workspace, patch);
patch.workspace = workspace.clone();
self.on_receive_patch(&patch)?;
}
}
@@ -563,7 +564,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
if confirm == "yes" || confirm == "y" {
remove_dir_all(docs_path).unwrap_or(());
println!("Local docs get removed");
println!("Local data removed successfully.");
} else {
error!("Unexpected Value: {}", confirm);
}
@@ -623,7 +624,6 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
//
// Raft
//
let net_settings = settings.net;
let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default()));
let datastore_raft = datastore_path.join("darkwiki.db");
@@ -634,6 +634,8 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
//
// P2p setup
//
let mut net_settings = settings.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<NetMsg>();
let p2p = net::P2p::new(net_settings.into()).await;

View File

@@ -1,3 +0,0 @@
# Darkwiki
See the [Darkfi Book](https://darkrenaissance.github.io/darkfi/misc/darkwiki.html) for the installation guide.

View File

@@ -72,6 +72,7 @@ pub struct SessionInfo {
pub is_empty: bool,
pub children: Vec<ConnectInfo>,
pub accept_addr: Option<String>,
pub hosts: Option<Vec<String>>,
}
impl SessionInfo {
@@ -82,8 +83,9 @@ impl SessionInfo {
parent: String,
children: Vec<ConnectInfo>,
accept_addr: Option<String>,
hosts: Option<Vec<String>>,
) -> Self {
Self { id, name, is_empty, parent, children, accept_addr }
Self { id, name, is_empty, parent, children, accept_addr, hosts }
}
}

View File

@@ -133,7 +133,7 @@ impl DataParser {
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent, connects, accept_addr);
SessionInfo::new(session_id, name, is_empty, parent, connects, accept_addr, None);
sessions.push(session_info);
let node = NodeInfo::new(node_id, node_name, state, sessions.clone(), None, true);
@@ -405,12 +405,12 @@ impl DataParser {
if accept_vec.is_empty() {
let accept_addr = None;
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr, None);
Ok(session_info)
} else {
let accept_addr = Some(accept_vec[0].clone());
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr, None);
Ok(session_info)
}
}
@@ -455,8 +455,15 @@ impl DataParser {
let parent = connect_id;
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent, connects.clone(), accept_addr);
let session_info = SessionInfo::new(
session_id,
name,
is_empty,
parent,
connects.clone(),
accept_addr,
None,
);
Ok(session_info)
}
@@ -474,6 +481,8 @@ impl DataParser {
let slots = &outbound["slots"];
let mut slot_count = 0;
let hosts = &outbound["hosts"];
match slots.as_array() {
Some(slots) => {
for slot in slots {
@@ -549,9 +558,24 @@ impl DataParser {
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
match hosts.as_array() {
Some(hosts) => {
let hosts: Vec<String> =
hosts.iter().map(|addr| addr.as_str().unwrap().to_string()).collect();
let session_info = SessionInfo::new(
id,
name,
is_empty,
parent,
connects,
accept_addr,
Some(hosts),
);
Ok(session_info)
}
None => Err(DnetViewError::ValueIsNotObject),
}
}
None => Err(DnetViewError::ValueIsNotObject),
}

View File

@@ -322,11 +322,19 @@ impl<'a> View {
Some(SelectableObject::Session(session)) => {
//debug!(target: "dnetview", "render_info()::SelectableObject::Session");
if session.accept_addr.is_some() {
let session_info = Span::styled(
let accept_addr = Span::styled(
format!("Accept addr: {}", session.accept_addr.as_ref().unwrap()),
style,
);
lines.push(Spans::from(session_info));
lines.push(Spans::from(accept_addr));
}
if session.hosts.is_some() {
let hosts = Span::styled(format!("Hosts:"), style);
lines.push(Spans::from(hosts));
for host in session.hosts.as_ref().unwrap() {
let host = Span::styled(format!(" {}", host), style);
lines.push(Spans::from(host));
}
}
}
Some(SelectableObject::Connect(connect)) => {

View File

@@ -1,7 +1,7 @@
[package]
name = "ircd"
description = "P2P IRC daemon"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"

View File

@@ -7,67 +7,23 @@ use std::{
use chrono::Utc;
use ripemd::{Digest, Ripemd160};
use crate::Privmsg;
use crate::{settings, Privmsg};
pub const SIZE_OF_MSGS_BUFFER: usize = 4095;
pub const SIZE_OF_MSG_IDSS_BUFFER: usize = 65536;
pub const LIFETIME_FOR_ORPHAN: i64 = 600;
pub type InvSeenIds = Arc<Mutex<RingBuffer<u64>>>;
pub type SeenIds = Mutex<RingBuffer<u64>>;
pub type MutexPrivmsgsBuffer = Mutex<PrivmsgsBuffer>;
pub type UnreadMsgs = Mutex<UMsgs>;
pub type Buffers = Arc<Msgs>;
pub struct Msgs {
pub privmsgs: MutexPrivmsgsBuffer,
pub unread_msgs: UnreadMsgs,
pub privmsgs: PrivmsgsBuffer,
pub unread_msgs: UMsgs,
pub seen_ids: SeenIds,
}
pub fn create_buffers() -> Buffers {
let seen_ids = Mutex::new(RingBuffer::new(SIZE_OF_MSG_IDSS_BUFFER));
let seen_ids = SeenIds::new();
let privmsgs = PrivmsgsBuffer::new();
let unread_msgs = Mutex::new(UMsgs::new());
let unread_msgs = UMsgs::new();
Arc::new(Msgs { privmsgs, unread_msgs, seen_ids })
}
#[derive(Clone)]
pub struct UMsgs {
pub msgs: BTreeMap<String, Privmsg>,
capacity: usize,
}
impl UMsgs {
pub fn new() -> Self {
Self { msgs: BTreeMap::new(), capacity: SIZE_OF_MSGS_BUFFER }
}
pub fn insert(&mut self, msg: &Privmsg) -> String {
let mut hasher = Ripemd160::new();
hasher.update(msg.to_string());
let key = hex::encode(hasher.finalize());
if self.msgs.len() == self.capacity {
self.pop_front();
}
self.msgs.insert(key.clone(), msg.clone());
key
}
fn pop_front(&mut self) {
let first_key = self.msgs.iter().next_back().unwrap().0.clone();
self.msgs.remove(&first_key);
}
}
impl Default for UMsgs {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub struct RingBuffer<T> {
pub items: VecDeque<T>,
@@ -115,30 +71,86 @@ impl<T: Eq + PartialEq + Clone> RingBuffer<T> {
}
}
#[derive(Clone)]
pub struct PrivmsgsBuffer {
msgs: Mutex<OrderingAlgo>,
}
impl PrivmsgsBuffer {
pub fn new() -> Self {
Self { msgs: Mutex::new(OrderingAlgo::new()) }
}
pub async fn push(&self, privmsg: &Privmsg) {
self.msgs.lock().await.push(privmsg);
}
pub async fn load(&self) -> Vec<Privmsg> {
self.msgs.lock().await.load()
}
pub async fn get_msg_by_term(&self, term: u64) -> Option<Privmsg> {
self.msgs.lock().await.get_msg_by_term(term)
}
pub async fn len(&self) -> usize {
self.msgs.lock().await.len()
}
pub async fn is_empty(&self) -> bool {
self.msgs.lock().await.is_empty()
}
pub async fn last_term(&self) -> u64 {
self.msgs.lock().await.last_term()
}
pub async fn fetch_msgs(&self, term: u64) -> Vec<Privmsg> {
self.msgs.lock().await.fetch_msgs(term)
}
}
pub struct OrderingAlgo {
buffer: RingBuffer<Privmsg>,
orphans: RingBuffer<Orphan>,
}
impl PrivmsgsBuffer {
pub fn new() -> MutexPrivmsgsBuffer {
Mutex::new(Self {
buffer: RingBuffer::new(SIZE_OF_MSGS_BUFFER),
orphans: RingBuffer::new(SIZE_OF_MSGS_BUFFER),
})
impl Default for OrderingAlgo {
fn default() -> Self {
Self::new()
}
}
impl OrderingAlgo {
pub fn new() -> Self {
Self {
buffer: RingBuffer::new(settings::SIZE_OF_MSGS_BUFFER),
orphans: RingBuffer::new(settings::SIZE_OF_MSGS_BUFFER),
}
}
pub fn push(&mut self, privmsg: &Privmsg) {
match privmsg.term.cmp(&(self.last_term() + 1)) {
Ordering::Equal | Ordering::Less => self.buffer.push(privmsg.clone()),
Ordering::Equal => self.buffer.push(privmsg.clone()),
Ordering::Less => {
if let Some(msg) = self.get_msg_by_term(privmsg.term) {
if (msg.timestamp - privmsg.timestamp) <= settings::TERM_MAX_TIME_DIFFERENCE {
self.buffer.push(privmsg.clone());
}
} else {
self.buffer.push(privmsg.clone());
}
}
Ordering::Greater => self.orphans.push(Orphan::new(privmsg)),
}
self.update();
}
pub fn iter(&self) -> impl Iterator<Item = &Privmsg> + DoubleEndedIterator {
self.buffer.iter()
pub fn load(&self) -> Vec<Privmsg> {
self.buffer.iter().cloned().collect::<Vec<Privmsg>>()
}
pub fn get_msg_by_term(&self, term: u64) -> Option<Privmsg> {
self.buffer.iter().find(|p| p.term == term).cloned()
}
pub fn len(&self) -> usize {
@@ -146,7 +158,7 @@ impl PrivmsgsBuffer {
}
pub fn is_empty(&self) -> bool {
self.len() == 0
self.buffer.is_empty()
}
pub fn last_term(&self) -> u64 {
@@ -156,6 +168,10 @@ impl PrivmsgsBuffer {
}
}
pub fn fetch_msgs(&self, term: u64) -> Vec<Privmsg> {
self.buffer.iter().take_while(|p| p.term >= term).cloned().collect()
}
fn update(&mut self) {
self.sort_orphans();
self.update_orphans();
@@ -176,24 +192,35 @@ impl PrivmsgsBuffer {
});
}
fn oprhan_is_valid(&mut self, orphan: &Orphan) -> bool {
(orphan.timestamp + LIFETIME_FOR_ORPHAN) > Utc::now().timestamp()
fn oprhan_is_valid(orphan: &Orphan) -> bool {
(orphan.timestamp + settings::LIFETIME_FOR_ORPHAN) > Utc::now().timestamp()
}
fn update_orphans(&mut self) {
for orphan in self.orphans.clone().iter() {
let privmsg = orphan.msg.clone();
if !self.oprhan_is_valid(orphan) {
if !Self::oprhan_is_valid(orphan) {
self.orphans.remove(orphan);
continue
}
match privmsg.term.cmp(&(self.last_term() + 1)) {
Ordering::Equal | Ordering::Less => {
Ordering::Equal => {
self.buffer.push(privmsg.clone());
self.orphans.remove(orphan);
}
Ordering::Less => {
if let Some(msg) = self.get_msg_by_term(privmsg.term) {
if (msg.timestamp - privmsg.timestamp) <= settings::TERM_MAX_TIME_DIFFERENCE
{
self.buffer.push(privmsg.clone());
}
} else {
self.buffer.push(privmsg.clone());
}
self.orphans.remove(orphan);
}
Ordering::Greater => {}
}
}
@@ -212,6 +239,90 @@ impl Orphan {
}
}
pub struct SeenIds {
ids: Mutex<RingBuffer<u64>>,
}
impl Default for SeenIds {
fn default() -> Self {
Self::new()
}
}
impl SeenIds {
pub fn new() -> Self {
Self { ids: Mutex::new(RingBuffer::new(settings::SIZE_OF_IDSS_BUFFER)) }
}
pub async fn push(&self, id: u64) -> bool {
let ids = &mut self.ids.lock().await;
if !ids.contains(&id) {
ids.push(id);
return true
}
false
}
}
pub struct UMsgs {
msgs: Mutex<BTreeMap<String, Privmsg>>,
}
impl Default for UMsgs {
fn default() -> Self {
Self::new()
}
}
impl UMsgs {
pub fn new() -> Self {
Self { msgs: Mutex::new(BTreeMap::new()) }
}
pub async fn len(&self) -> usize {
self.msgs.lock().await.len()
}
pub async fn contains(&self, key: &str) -> bool {
self.msgs.lock().await.contains_key(key)
}
pub async fn remove(&self, key: &str) -> Option<Privmsg> {
self.msgs.lock().await.remove(key)
}
pub async fn get(&self, key: &str) -> Option<Privmsg> {
self.msgs.lock().await.get(key).cloned()
}
pub async fn load(&self) -> BTreeMap<String, Privmsg> {
self.msgs.lock().await.clone()
}
pub async fn inc_read_confirms(&self, key: &str) -> bool {
if let Some(msg) = self.msgs.lock().await.get_mut(key) {
msg.read_confirms += 1;
return true
}
false
}
pub async fn insert(&self, msg: &Privmsg) -> String {
let mut hasher = Ripemd160::new();
hasher.update(msg.to_string() + &msg.term.to_string());
let key = hex::encode(hasher.finalize());
let msgs = &mut self.msgs.lock().await;
if msgs.len() == settings::SIZE_OF_MSGS_BUFFER {
let first_key = msgs.iter().next_back().unwrap().0.clone();
msgs.remove(&first_key);
}
msgs.insert(key.clone(), msg.clone());
key
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -238,12 +349,9 @@ mod tests {
assert_eq!(b.iter().last().unwrap(), &"h9");
}
#[test]
fn test_privmsgs_buffer() {
let mut pms = PrivmsgsBuffer {
buffer: RingBuffer::new(SIZE_OF_MSGS_BUFFER),
orphans: RingBuffer::new(SIZE_OF_MSGS_BUFFER),
};
#[async_std::test]
async fn test_privmsgs_buffer() {
let pms = PrivmsgsBuffer::new();
//
// Fill the buffer with random generated terms in range 0..3001
@@ -253,12 +361,11 @@ mod tests {
for term in terms {
let privmsg = Privmsg::new("nick", "#dev", &format!("message_{}", term), term);
pms.push(&privmsg);
pms.push(&privmsg).await;
}
assert_eq!(pms.buffer.len(), 3000);
assert_eq!(pms.last_term(), 3000);
assert_eq!(pms.orphans.len(), 0);
assert_eq!(pms.len().await, 3000);
assert_eq!(pms.last_term().await, 3000);
//
// Fill the buffer with random generated terms in range 2000..4001
@@ -270,12 +377,11 @@ mod tests {
for term in terms {
let privmsg = Privmsg::new("nick", "#dev", &format!("message_{}", term), term);
pms.push(&privmsg);
pms.push(&privmsg).await;
}
assert_eq!(pms.buffer.len(), SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term(), 4000);
assert_eq!(pms.orphans.len(), 0);
assert_eq!(pms.len().await, settings::SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term().await, 4000);
//
// Fill the buffer with random generated terms in range 4000..7001
@@ -286,11 +392,44 @@ mod tests {
for term in terms {
let privmsg = Privmsg::new("nick", "#dev", &format!("message_{}", term), term);
pms.push(&privmsg);
pms.push(&privmsg).await;
}
assert_eq!(pms.buffer.len(), SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term(), 7000);
assert_eq!(pms.orphans.len(), 0);
assert_eq!(pms.len().await, settings::SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term().await, 7000);
}
#[async_std::test]
async fn test_seen_ids() {
let seen_ids = SeenIds::default();
assert!(seen_ids.push(3000).await);
assert!(seen_ids.push(3001).await);
assert!(!seen_ids.push(3000).await);
}
#[async_std::test]
async fn test_unread_msgs() {
let unread_msgs = UMsgs::default();
let p = Privmsg::new("nick", "#dev", &format!("message_{}", 0), 0);
let p_k = unread_msgs.insert(&p).await;
let p2 = Privmsg::new("nick", "#dev", &format!("message_{}", 0), 1);
let p2_k = unread_msgs.insert(&p2).await;
let p3 = Privmsg::new("nick", "#dev", &format!("message_{}", 0), 2);
let p3_k = unread_msgs.insert(&p3).await;
assert_eq!(unread_msgs.len().await, 3);
assert_eq!(unread_msgs.get(&p_k).await, Some(p.clone()));
assert_eq!(unread_msgs.get(&p2_k).await, Some(p2));
assert_eq!(unread_msgs.get(&p3_k).await, Some(p3));
assert!(unread_msgs.inc_read_confirms(&p_k).await);
assert!(!unread_msgs.inc_read_confirms("NONE KEY").await);
assert_ne!(unread_msgs.get(&p_k).await, Some(p));
assert_eq!(unread_msgs.get(&p_k).await.unwrap().read_confirms, 1);
}
}

View File

@@ -16,17 +16,13 @@ use darkfi::{
use crate::{
buffers::Buffers,
crypto::{decrypt_privmsg, decrypt_target, encrypt_privmsg},
privmsg::{MAXIMUM_LENGTH_OF_MESSAGE, MAXIMUM_LENGTH_OF_NICKNAME},
settings,
settings::RPL,
ChannelInfo, Privmsg,
};
use super::IrcConfig;
const RPL_NOTOPIC: u32 = 331;
const RPL_TOPIC: u32 = 332;
const RPL_NAMEREPLY: u32 = 353;
const RPL_ENDOFNAMES: u32 = 366;
pub struct IrcClient<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
// network stream
write_stream: WriteHalf<C>,
@@ -159,7 +155,7 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
}
async fn update(&mut self, line: String) -> Result<()> {
if line.len() > MAXIMUM_LENGTH_OF_MESSAGE {
if line.len() > settings::MAXIMUM_LENGTH_OF_MESSAGE {
return Err(Error::MalformedPacket)
}
@@ -211,9 +207,8 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
}
// Send dm messages in buffer
let privmsgs = self.buffers.privmsgs.lock().await.clone();
for msg in privmsgs.iter() {
self.process_msg(msg).await?;
for msg in self.buffers.privmsgs.load().await {
self.process_msg(&msg).await?;
}
}
Ok(())
@@ -255,7 +250,7 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
}
async fn on_receive_nick(&mut self, nickname: &str) -> Result<()> {
if nickname.len() > MAXIMUM_LENGTH_OF_NICKNAME {
if nickname.len() > settings::MAXIMUM_LENGTH_OF_NICKNAME {
return Ok(())
}
@@ -300,10 +295,22 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
// Client is asking for the topic
let chan_info = self.irc_config.configured_chans.get(channel).unwrap();
let topic_reply = if let Some(topic) = &chan_info.topic {
format!("{} {} {} :{}\r\n", RPL_TOPIC, self.irc_config.nickname, channel, topic)
format!(
"{} {} {} :{}\r\n",
RPL::Topic as u32,
self.irc_config.nickname,
channel,
topic
)
} else {
const TOPIC: &str = "No topic is set";
format!("{} {} {} :{}\r\n", RPL_NOTOPIC, self.irc_config.nickname, channel, TOPIC)
format!(
"{} {} {} :{}\r\n",
RPL::NoTopic as u32,
self.irc_config.nickname,
channel,
TOPIC
)
};
self.reply(&topic_reply).await?;
}
@@ -408,7 +415,7 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
let names_reply = format!(
":{}!anon@dark.fi {} = {} : {}\r\n",
self.irc_config.nickname,
RPL_NAMEREPLY,
RPL::NameReply as u32,
chan,
chan_info.names.join(" ")
);
@@ -417,7 +424,9 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
let end_of_names = format!(
":DarkFi {:03} {} {} :End of NAMES list\r\n",
RPL_ENDOFNAMES, self.irc_config.nickname, chan
RPL::EndOfNames as u32,
self.irc_config.nickname,
chan
);
self.reply(&end_of_names).await?;
@@ -437,9 +446,7 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
info!("[CLIENT {}] (Plain) PRIVMSG {} :{}", self.address, target, message,);
let privmsgs_buffer = self.buffers.privmsgs.lock().await;
let last_term = privmsgs_buffer.last_term() + 1;
drop(privmsgs_buffer);
let last_term = self.buffers.privmsgs.last_term().await + 1;
let mut privmsg = Privmsg::new(&self.irc_config.nickname, target, &message, last_term);
@@ -470,10 +477,8 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcClient<C> {
}
}
{
(*self.buffers.seen_ids.lock().await).push(privmsg.id);
(*self.buffers.privmsgs.lock().await).push(&privmsg);
}
self.buffers.seen_ids.push(privmsg.id).await;
self.buffers.privmsgs.push(&privmsg).await;
self.notify_clients
.notify_with_exclude(privmsg.clone(), &[self.subscription.get_id()])

View File

@@ -1,4 +1,4 @@
use async_std::sync::{Arc, Mutex};
use async_std::sync::Arc;
use std::fmt;
use async_channel::Receiver;
@@ -11,6 +11,7 @@ use structopt_toml::StructOptToml;
use darkfi::{
async_daemonize, net,
net::P2pPtr,
rpc::server::listen_and_serve,
system::{Subscriber, SubscriberPtr},
util::{
@@ -18,6 +19,7 @@ use darkfi::{
expand_path,
file::save_json_file,
path::get_config_path,
sleep,
},
Result,
};
@@ -31,10 +33,10 @@ pub mod rpc;
pub mod settings;
use crate::{
buffers::{create_buffers, Buffers, RingBuffer, SIZE_OF_MSG_IDSS_BUFFER},
buffers::{create_buffers, Buffers},
irc::IrcServer,
privmsg::Privmsg,
protocol_privmsg::ProtocolPrivmsg,
protocol_privmsg::{LastTerm, ProtocolPrivmsg},
rpc::JsonRpcInterface,
settings::{Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS},
};
@@ -51,6 +53,25 @@ impl fmt::Display for KeyPair {
}
}
/// Background task: periodically rebroadcast every message still sitting in the
/// unread buffer, so peers that missed it get another chance to confirm it.
/// Runs forever; only returns on a broadcast error.
async fn resend_unread_msgs(p2p: P2pPtr, buffers: Buffers) -> Result<()> {
    loop {
        sleep(settings::TIMEOUT_FOR_RESEND_UNREAD_MSGS).await;
        let unread = buffers.unread_msgs.load().await;
        for unread_msg in unread.values() {
            p2p.broadcast(unread_msg.clone()).await?;
        }
    }
}
/// Background task: periodically broadcast our latest privmsg term so peers
/// can detect whether they are missing messages and request a resync.
/// Runs forever; only returns on a broadcast error.
async fn send_last_term(p2p: P2pPtr, buffers: Buffers) -> Result<()> {
    loop {
        sleep(settings::BROADCAST_LAST_TERM_MSG).await;
        let current_term = buffers.privmsgs.last_term().await;
        p2p.broadcast(LastTerm { term: current_term }).await?;
    }
}
struct Ircd {
notify_clients: SubscriberPtr<Privmsg>,
}
@@ -98,7 +119,6 @@ impl Ircd {
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let seen_inv_ids = Arc::new(Mutex::new(RingBuffer::new(SIZE_OF_MSG_IDSS_BUFFER)));
let buffers = create_buffers();
if settings.gen_secret {
@@ -130,7 +150,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
// P2p setup
//
let mut net_settings = settings.net.clone();
net_settings.app_version = option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<Privmsg>();
let p2p = net::P2p::new(net_settings.into()).await;
@@ -139,16 +159,11 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let registry = p2p.protocol_registry();
let buffers_cloned = buffers.clone();
let seen_inv_ids_cloned = seen_inv_ids.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let sender = p2p_send_channel.clone();
let seen_inv_ids_cloned = seen_inv_ids_cloned.clone();
let buffers_cloned = buffers_cloned.clone();
async move {
ProtocolPrivmsg::init(channel, sender, p2p, seen_inv_ids_cloned, buffers_cloned)
.await
}
async move { ProtocolPrivmsg::init(channel, sender, p2p, buffers_cloned).await }
})
.await;
@@ -157,6 +172,12 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let executor_cloned = executor.clone();
executor_cloned.spawn(p2p.clone().run(executor.clone())).detach();
//
// Sync tasks
//
executor.spawn(resend_unread_msgs(p2p.clone(), buffers.clone())).detach();
executor.spawn(send_last_term(p2p.clone(), buffers.clone())).detach();
//
// RPC interface
//

View File

@@ -5,9 +5,6 @@ use darkfi::util::serial::{SerialDecodable, SerialEncodable};
pub type PrivmsgId = u64;
pub const MAXIMUM_LENGTH_OF_MESSAGE: usize = 1024;
pub const MAXIMUM_LENGTH_OF_NICKNAME: usize = 32;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq, PartialEq)]
pub struct Privmsg {
pub id: PrivmsgId,

View File

@@ -1,4 +1,5 @@
use async_std::sync::Arc;
use std::cmp::Ordering;
use async_executor::Executor;
use async_trait::async_trait;
@@ -8,21 +9,11 @@ use rand::{rngs::OsRng, RngCore};
use darkfi::{
net,
util::{
serial::{SerialDecodable, SerialEncodable},
sleep,
},
util::serial::{SerialDecodable, SerialEncodable},
Result,
};
use crate::{
buffers::{Buffers, InvSeenIds},
Privmsg,
};
const MAX_CONFIRM: u8 = 4;
const SLEEP_TIME_FOR_RESEND: u64 = 1200;
const UNREAD_MSG_EXPIRE_TIME: i64 = 259200;
use crate::{buffers::Buffers, settings, Privmsg};
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct Inv {
@@ -30,6 +21,11 @@ struct Inv {
invs: Vec<InvObject>,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
/// P2P message carrying a node's latest privmsg term; broadcast periodically
/// so peers can compare against their own term and sync missing messages.
pub struct LastTerm {
    pub term: u64,
}
impl Inv {
fn new(invs: Vec<InvObject>) -> Self {
let id = OsRng.next_u64();
@@ -57,9 +53,9 @@ pub struct ProtocolPrivmsg {
msg_sub: net::MessageSubscription<Privmsg>,
inv_sub: net::MessageSubscription<Inv>,
getdata_sub: net::MessageSubscription<GetData>,
last_term_sub: net::MessageSubscription<LastTerm>,
p2p: net::P2pPtr,
channel: net::ChannelPtr,
inv_ids: InvSeenIds,
buffers: Buffers,
}
@@ -68,31 +64,37 @@ impl ProtocolPrivmsg {
channel: net::ChannelPtr,
notify: async_channel::Sender<Privmsg>,
p2p: net::P2pPtr,
inv_ids: InvSeenIds,
buffers: Buffers,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Privmsg>().await;
message_subsytem.add_dispatch::<Inv>().await;
message_subsytem.add_dispatch::<GetData>().await;
message_subsytem.add_dispatch::<LastTerm>().await;
let msg_sub =
channel.clone().subscribe_msg::<Privmsg>().await.expect("Missing Privmsg dispatcher!");
let inv_sub = channel.subscribe_msg::<Inv>().await.expect("Missing Inv dispatcher!");
let getdata_sub =
channel.clone().subscribe_msg::<GetData>().await.expect("Missing GetData dispatcher!");
let inv_sub = channel.subscribe_msg::<Inv>().await.expect("Missing Inv dispatcher!");
let last_term_sub = channel
.clone()
.subscribe_msg::<LastTerm>()
.await
.expect("Missing LastTerm dispatcher!");
Arc::new(Self {
notify,
msg_sub,
inv_sub,
getdata_sub,
last_term_sub,
jobsman: net::ProtocolJobsManager::new("ProtocolPrivmsg", channel.clone()),
p2p,
channel,
inv_ids,
buffers,
})
}
@@ -104,19 +106,13 @@ impl ProtocolPrivmsg {
let inv = self.inv_sub.receive().await?;
let inv = (*inv).to_owned();
let mut inv_ids = self.inv_ids.lock().await;
if inv_ids.contains(&inv.id) {
if !self.buffers.seen_ids.push(inv.id).await {
continue
}
inv_ids.push(inv.id);
drop(inv_ids);
let mut inv_requested = vec![];
for inv_object in inv.invs.iter() {
let msgs = &mut self.buffers.unread_msgs.lock().await.msgs;
if let Some(msg) = msgs.get_mut(&inv_object.0) {
msg.read_confirms += 1;
} else {
if !self.buffers.unread_msgs.inc_read_confirms(&inv_object.0).await {
inv_requested.push(inv_object.clone());
}
}
@@ -138,14 +134,11 @@ impl ProtocolPrivmsg {
let msg = self.msg_sub.receive().await?;
let mut msg = (*msg).to_owned();
let mut msg_ids = self.buffers.seen_ids.lock().await;
if msg_ids.contains(&msg.id) {
if !self.buffers.seen_ids.push(msg.id).await {
continue
}
msg_ids.push(msg.id);
drop(msg_ids);
if msg.read_confirms >= MAX_CONFIRM {
if msg.read_confirms >= settings::MAX_CONFIRM {
self.add_to_msgs(&msg).await?;
} else {
msg.read_confirms += 1;
@@ -158,15 +151,33 @@ impl ProtocolPrivmsg {
}
}
/// Handle incoming `LastTerm` messages: after pruning the unread buffer,
/// if the peer's term is ahead of ours, send it the messages it is missing.
async fn handle_receive_last_term(self: Arc<Self>) -> Result<()> {
    debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_last_term() [START]");
    loop {
        let peer_term = self.last_term_sub.receive().await?.term;

        self.update_unread_msgs().await?;

        // Only act when our local term is strictly behind the peer's;
        // otherwise just wait for the next LastTerm message.
        if let Ordering::Less = self.buffers.privmsgs.last_term().await.cmp(&peer_term) {
            for msg in self.buffers.privmsgs.fetch_msgs(peer_term).await {
                self.channel.send(msg).await?;
            }
        }
    }
}
async fn handle_receive_getdata(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_getdata() [START]");
loop {
let getdata = self.getdata_sub.receive().await?;
let getdata = (*getdata).to_owned();
let msgs = &self.buffers.unread_msgs.lock().await.msgs;
for inv in getdata.invs {
if let Some(msg) = msgs.get(&inv.0) {
if let Some(msg) = self.buffers.unread_msgs.get(&inv.0).await {
self.channel.send(msg.clone()).await?;
}
}
@@ -174,40 +185,28 @@ impl ProtocolPrivmsg {
}
async fn add_to_unread_msgs(&self, msg: &Privmsg) -> String {
self.buffers.unread_msgs.lock().await.insert(msg)
self.buffers.unread_msgs.insert(msg).await
}
async fn update_unread_msgs(&self) -> Result<()> {
let msgs = &mut self.buffers.unread_msgs.lock().await.msgs;
for (hash, msg) in msgs.clone() {
if msg.timestamp + UNREAD_MSG_EXPIRE_TIME < Utc::now().timestamp() {
msgs.remove(&hash);
for (hash, msg) in self.buffers.unread_msgs.load().await {
if msg.timestamp + settings::UNREAD_MSG_EXPIRE_TIME < Utc::now().timestamp() {
self.buffers.unread_msgs.remove(&hash).await;
continue
}
if msg.read_confirms >= MAX_CONFIRM {
if msg.read_confirms >= settings::MAX_CONFIRM {
self.add_to_msgs(&msg).await?;
msgs.remove(&hash);
self.buffers.unread_msgs.remove(&hash).await;
}
}
Ok(())
}
async fn add_to_msgs(&self, msg: &Privmsg) -> Result<()> {
self.buffers.privmsgs.lock().await.push(msg);
self.buffers.privmsgs.push(msg).await;
self.notify.send(msg.clone()).await?;
Ok(())
}
async fn resend_loop(self: Arc<Self>) -> Result<()> {
sleep(SLEEP_TIME_FOR_RESEND).await;
self.update_unread_msgs().await?;
for msg in self.buffers.unread_msgs.lock().await.msgs.values() {
self.channel.send(msg.clone()).await?;
}
Ok(())
}
}
#[async_trait]
@@ -217,18 +216,16 @@ impl net::ProtocolBase for ProtocolPrivmsg {
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
// once a channel get started
let msgs_buffer = self.buffers.privmsgs.lock().await;
for m in msgs_buffer.iter() {
self.channel.send(m.clone()).await?;
for m in self.buffers.privmsgs.load().await {
self.channel.send(m).await?;
}
drop(msgs_buffer);
debug!(target: "ircd", "ProtocolPrivmsg::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_msg(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_inv(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_getdata(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().resend_loop(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_last_term(), executor.clone()).await;
debug!(target: "ircd", "ProtocolPrivmsg::start() [END]");
Ok(())
}
@@ -255,3 +252,9 @@ impl net::Message for GetData {
"getdata"
}
}
/// Register `LastTerm` as a P2P network message under the wire name "last_term".
impl net::Message for LastTerm {
    fn name() -> &'static str {
        "last_term"
    }
}

View File

@@ -9,9 +9,34 @@ use url::Url;
use darkfi::{net::settings::SettingsOpt, Result};
// Location for config file
pub const CONFIG_FILE: &str = "ircd_config.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../ircd_config.toml");

// Buffers and ordering configuration
// Capacity of the privmsgs ring buffer (number of messages retained).
pub const SIZE_OF_MSGS_BUFFER: usize = 4095;
// Capacity of the seen-ids ring buffer (deduplication window).
pub const SIZE_OF_IDSS_BUFFER: usize = 16384;
// NOTE(review): the time values below appear to be seconds — confirm against sleep()'s contract.
pub const LIFETIME_FOR_ORPHAN: i64 = 600;
pub const TERM_MAX_TIME_DIFFERENCE: i64 = 180;
// Interval between LastTerm broadcasts (see send_last_term()).
pub const BROADCAST_LAST_TERM_MSG: u64 = 4;

// Msg config
// Hard limits on message and nickname lengths accepted from IRC clients.
pub const MAXIMUM_LENGTH_OF_MESSAGE: usize = 1024;
pub const MAXIMUM_LENGTH_OF_NICKNAME: usize = 32;

// Protocol config
// Read confirmations required before a message is promoted out of the unread buffer.
pub const MAX_CONFIRM: u8 = 4;
// Expiry for unread messages, in seconds (compared against Utc::now().timestamp()).
pub const UNREAD_MSG_EXPIRE_TIME: i64 = 18000;
// Interval between rebroadcasts of unread messages (see resend_unread_msgs()).
pub const TIMEOUT_FOR_RESEND_UNREAD_MSGS: u64 = 240;
// IRC Client
/// IRC numeric reply codes sent back to connected clients.
/// Values match the standard RPL_* numerics (RFC 2812 §5.1).
pub enum RPL {
    NoTopic = 331,
    Topic = 332,
    NameReply = 353,
    EndOfNames = 366,
}
/// ircd cli
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]

View File

@@ -12,6 +12,9 @@
# Daemon published urls, common for all enabled networks
#urls = ["tcp://127.0.0.1"]
# Hosts .tsv file to use
#hosts_file="~/.config/darkfi/lilith_hosts.tsv"
## Per-network settings
#[network."darkfid_sync"]
#port = 33032

View File

@@ -24,6 +24,10 @@ pub struct Args {
/// Daemon published urls, common for all enabled networks (repeatable flag)
pub urls: Vec<Url>,
#[structopt(long, default_value = "~/.config/darkfi/lilith_hosts.tsv")]
/// Hosts .tsv file to use
pub hosts_file: String,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
pub verbose: u8,

View File

@@ -1,8 +1,11 @@
use std::path::Path;
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use futures_lite::future;
use log::{error, info};
use fxhash::{FxHashMap, FxHashSet};
use log::{error, info, warn};
use serde_json::{json, Value};
use structopt_toml::StructOptToml;
use url::Url;
@@ -20,6 +23,7 @@ use darkfi::{
util::{
cli::{get_log_config, get_log_level, spawn_config},
expand_path,
file::{load_file, save_file},
path::get_config_path,
},
Result,
@@ -32,9 +36,9 @@ const CONFIG_FILE: &str = "lilith_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
/// Struct representing a spawned p2p network.
pub struct Spawn {
pub name: String,
pub p2p: P2pPtr,
struct Spawn {
name: String,
p2p: P2pPtr,
}
impl Spawn {
@@ -58,7 +62,7 @@ impl Spawn {
}
/// Struct representing the daemon.
pub struct Lilith {
struct Lilith {
/// Configured urls
urls: Vec<Url>,
/// Spawned networks
@@ -66,6 +70,16 @@ pub struct Lilith {
}
impl Lilith {
/// Collect the current node addresses of every spawned network,
/// keyed by network name.
async fn spawns_hosts(&self) -> FxHashMap<String, Vec<String>> {
    let mut hosts_map = FxHashMap::default();
    for network in self.spawns.iter() {
        hosts_map.insert(network.name.clone(), network.addresses().await);
    }
    hosts_map
}
// RPCAPI:
// Returns all spawned networks names with their node addresses.
// --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
@@ -121,6 +135,7 @@ async fn spawn_network(
name: &str,
info: NetInfo,
urls: Vec<Url>,
saved_hosts: Option<&FxHashSet<Url>>,
ex: Arc<Executor<'_>>,
) -> Result<Spawn> {
let mut full_urls = Vec::new();
@@ -135,11 +150,25 @@ async fn spawn_network(
peers: info.peers,
outbound_connections: 0,
localnet: info.localnet,
app_version: None,
..Default::default()
};
let p2p = net::P2p::new(network_settings).await;
// Setting saved hosts
match saved_hosts {
Some(hosts) => {
// Converting hashset to vec
let mut vec = vec![];
for url in hosts {
vec.push(url.clone());
}
p2p.hosts().store(vec).await;
}
None => info!("No saved hosts found for {}", name),
}
// Building ext_addr_vec string
let mut urls_vec = vec![];
for url in &full_urls {
@@ -161,6 +190,58 @@ async fn spawn_network(
Ok(spawn)
}
/// Retrieve saved hosts for provided networks
/// Retrieve saved hosts for the provided networks.
///
/// The hosts file is a TSV where each line is `<network>\t<url>`. Lines for
/// unconfigured networks are ignored; malformed lines (missing fields or
/// unparsable urls) are skipped with a warning instead of aborting the load.
fn load_hosts(path: &Path, networks: &[String]) -> FxHashMap<String, FxHashSet<Url>> {
    let mut saved_hosts: FxHashMap<String, FxHashSet<Url>> = FxHashMap::default();

    info!("Retrieving saved hosts from: {:?}", path);
    let contents = match load_file(path) {
        Ok(c) => c,
        Err(e) => {
            // A missing/unreadable file is non-fatal: start with no saved hosts.
            warn!("Failed retrieving saved hosts: {}", e);
            return saved_hosts
        }
    };

    for line in contents.lines() {
        let data: Vec<&str> = line.split('\t').collect();

        // Guard against lines without both fields: indexing data[1] would panic.
        if data.len() < 2 {
            warn!("Skipping malformed hosts line: {}", line);
            continue
        }

        // Only keep hosts belonging to a configured network.
        if !networks.contains(&data[0].to_string()) {
            continue
        }

        let url = match Url::parse(data[1]) {
            Ok(u) => u,
            Err(e) => {
                warn!("Skipping malformed url: {} ({})", data[1], e);
                continue
            }
        };

        // Single-lookup insert: create the network's set on first use instead
        // of cloning and reinserting the whole set per line.
        saved_hosts.entry(data[0].to_string()).or_default().insert(url);
    }

    saved_hosts
}
/// Save spawns current hosts
/// Save the current hosts of the spawned networks to `path` as TSV,
/// one `<network>\t<url>` record per line. Errors are logged, not returned.
fn save_hosts(path: &Path, spawns: FxHashMap<String, Vec<String>>) {
    let mut contents = String::new();
    for (name, urls) in spawns {
        for url in urls {
            contents.push_str(&name);
            contents.push('\t');
            contents.push_str(&url);
            contents.push('\n');
        }
    }

    // Don't touch the file when there is nothing to record.
    if !contents.is_empty() {
        // Fixed typo in the log message: "spawnned" -> "spawned".
        info!("Saving current hosts of spawned networks to: {:?}", path);
        if let Err(e) = save_file(path, &contents) {
            error!("Failed saving hosts: {}", e);
        }
    }
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// We use this handler to block this function after detaching all
@@ -191,10 +272,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
urls.push(url);
}
// Retrieve saved hosts for configured networks
let full_path = expand_path(&args.hosts_file)?;
let saved_hosts = load_hosts(&full_path, &configured_nets.keys().cloned().collect());
// Spawn configured networks
let mut spawns = vec![];
for (name, info) in &configured_nets {
match spawn_network(name, info.clone(), urls.clone(), ex.clone()).await {
match spawn_network(name, info.clone(), urls.clone(), saved_hosts.get(name), ex.clone())
.await
{
Ok(spawn) => spawns.push(spawn),
Err(e) => error!("Failed starting {} P2P network seed: {}", name, e),
}
@@ -205,12 +292,15 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen, lilith)).detach();
ex.spawn(listen_and_serve(args.rpc_listen, lilith.clone())).detach();
// Wait for SIGINT
shutdown.recv().await?;
print!("\r");
info!("Caught termination signal, cleaning up and exiting...");
// Save spawns current hosts
save_hosts(&full_path, lilith.spawns_hosts().await);
Ok(())
}

View File

@@ -1,7 +1,7 @@
[package]
name = "tau"
description = "Command-line client for taud"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"

View File

@@ -34,6 +34,9 @@ pub fn apply_filter(tasks: &mut Vec<TaskInfo>, filter: &str) {
}
}
// Filter by month
_ if filter.starts_with('+') => tasks.retain(|task| task.tags.contains(&filter.into())),
// Filter by month
_ if filter.contains("month:") => {
let kv: Vec<&str> = filter.split(':').collect();

View File

@@ -76,6 +76,7 @@ enum TauSubcommand {
/// Modify/Edit an existing task.
Modify {
#[clap(allow_hyphen_values = true)]
/// Values (e.g. project:blockchain).
values: Vec<String>,
},

View File

@@ -52,6 +52,7 @@ impl FromStr for State {
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct BaseTask {
pub title: String,
pub tags: Vec<String>,
pub desc: Option<String>,
pub assign: Vec<String>,
pub project: Vec<String>,
@@ -65,6 +66,7 @@ pub struct TaskInfo {
pub workspace: String,
pub id: u32,
pub title: String,
pub tags: Vec<String>,
pub desc: String,
pub owner: String,
pub assign: Vec<String>,
@@ -117,6 +119,7 @@ impl std::fmt::Display for Comment {
pub fn task_from_cli(values: Vec<String>) -> Result<BaseTask> {
let mut title = String::new();
let mut tags = vec![];
let mut desc = None;
let mut project = vec![];
let mut assign = vec![];
@@ -126,6 +129,10 @@ pub fn task_from_cli(values: Vec<String>) -> Result<BaseTask> {
for val in values {
let field: Vec<&str> = val.split(':').collect();
if field.len() == 1 {
if field[0].starts_with('+') || field[0].starts_with('-') {
tags.push(field[0].into());
continue
}
title.push_str(field[0]);
title.push(' ');
continue
@@ -155,6 +162,7 @@ pub fn task_from_cli(values: Vec<String>) -> Result<BaseTask> {
rank = Some(field[1].parse::<f32>()?);
}
}
let title = title.trim().into();
Ok(BaseTask { title, desc, project, assign, due, rank })
Ok(BaseTask { title, tags, desc, project, assign, due, rank })
}

View File

@@ -26,7 +26,7 @@ pub fn print_task_list(tasks: Vec<TaskInfo>, ws: String) -> Result<()> {
.separators(&[LinePosition::Title], LineSeparator::new('-', ' ', ' ', ' '))
.build(),
);
table.set_titles(row!["ID", "Title", "Project", "Assigned", "Due", "Rank"]);
table.set_titles(row!["ID", "Title", "Tags", "Project", "Assigned", "Due", "Rank"]);
// group tasks by state.
tasks.sort_by_key(|task| task.state.clone());
@@ -66,10 +66,16 @@ pub fn print_task_list(tasks: Vec<TaskInfo>, ws: String) -> Result<()> {
};
let rank = if let Some(r) = task.rank { r.to_string() } else { "".to_string() };
let mut print_tags = vec![];
for tag in &task.tags {
let t = tag.replace('+', "");
print_tags.push(t)
}
table.add_row(Row::new(vec![
Cell::new(&task.id.to_string()).style_spec(gen_style),
Cell::new(&task.title).style_spec(gen_style),
Cell::new(&print_tags.join(", ")).style_spec(gen_style),
Cell::new(&task.project.join(", ")).style_spec(gen_style),
Cell::new(&task.assign.join(", ")).style_spec(gen_style),
Cell::new(&timestamp_to_date(task.due.unwrap_or(0), DateFormat::Date))
@@ -109,13 +115,14 @@ pub fn print_task_info(taskinfo: TaskInfo) -> Result<()> {
[Bd =>"id", &taskinfo.id.to_string()],
["owner", &taskinfo.owner],
[Bd =>"title", &taskinfo.title],
["desc", &taskinfo.desc.to_string()],
[Bd =>"assign", taskinfo.assign.join(", ")],
["project", taskinfo.project.join(", ")],
[Bd =>"due", due],
["rank", rank],
[Bd =>"created_at", created_at],
["current_state", &taskinfo.state]);
["tags", &taskinfo.tags.join(", ")],
[Bd =>"desc", &taskinfo.desc.to_string()],
["assign", taskinfo.assign.join(", ")],
[Bd =>"project", taskinfo.project.join(", ")],
["due", due],
[Bd =>"rank", rank],
["created_at", created_at],
[Bd =>"current_state", &taskinfo.state]);
table.set_format(
FormatBuilder::new()
@@ -170,6 +177,10 @@ pub fn events_as_string(events: Vec<TaskEvent>) -> (String, String) {
writeln!(events_str, "- {} changed project to {}", event.author, event.content)
.unwrap();
}
"tags" => {
writeln!(events_str, "- {} changed tags to {}", event.author, event.content)
.unwrap();
}
"due" => {
writeln!(
events_str,

View File

@@ -1,7 +1,7 @@
[package]
name = "taud"
description = "Encrypted tasks management app using peer-to-peer network and raft consensus."
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["darkfi <dev@dark.fi>"]
license = "AGPL-3.0-only"

View File

@@ -1,7 +1,8 @@
use async_std::sync::{Arc, Mutex};
use async_std::sync::Mutex;
use std::{fs::create_dir_all, path::PathBuf};
use async_trait::async_trait;
use crypto_box::SalsaBox;
use fxhash::FxHashMap;
use log::{debug, warn};
use serde::{Deserialize, Serialize};
@@ -21,21 +22,21 @@ use crate::{
error::{to_json_result, TaudError, TaudResult},
month_tasks::MonthTasks,
task_info::{Comment, TaskInfo},
util::Workspace,
};
pub struct JsonRpcInterface {
dataset_path: PathBuf,
notify_queue_sender: async_channel::Sender<TaskInfo>,
nickname: String,
workspace: Arc<Mutex<String>>,
configured_ws: FxHashMap<String, Workspace>,
workspace: Mutex<String>,
workspaces: FxHashMap<String, SalsaBox>,
p2p: net::P2pPtr,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BaseTaskInfo {
title: String,
tags: Vec<String>,
desc: String,
assign: Vec<String>,
project: Vec<String>,
@@ -78,11 +79,11 @@ impl JsonRpcInterface {
dataset_path: PathBuf,
notify_queue_sender: async_channel::Sender<TaskInfo>,
nickname: String,
workspace: Arc<Mutex<String>>,
configured_ws: FxHashMap<String, Workspace>,
workspaces: FxHashMap<String, SalsaBox>,
p2p: net::P2pPtr,
) -> Self {
Self { dataset_path, nickname, workspace, configured_ws, notify_queue_sender, p2p }
let workspace = Mutex::new(workspaces.iter().last().unwrap().0.clone());
Self { dataset_path, nickname, workspace, workspaces, notify_queue_sender, p2p }
}
// RPCAPI:
@@ -121,9 +122,8 @@ impl JsonRpcInterface {
debug!(target: "tau", "JsonRpc::add() params {:?}", params);
let task: BaseTaskInfo = serde_json::from_value(params[0].clone())?;
let ws = self.workspace.lock().await.clone();
let mut new_task: TaskInfo = TaskInfo::new(
ws,
self.workspace.lock().await.clone(),
&task.title,
&task.desc,
&self.nickname,
@@ -133,6 +133,7 @@ impl JsonRpcInterface {
)?;
new_task.set_project(&task.project);
new_task.set_assign(&task.assign);
new_task.set_tags(&task.tags);
self.notify_queue_sender.send(new_task).await.map_err(Error::from)?;
Ok(json!(true))
@@ -144,9 +145,12 @@ impl JsonRpcInterface {
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
async fn get_ids(&self, params: &[Value]) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::get_ids() params {:?}", params);
let ws = self.workspace.lock().await.clone();
let tasks = MonthTasks::load_current_tasks(&self.dataset_path, ws, false)?;
let task_ids: Vec<u32> = tasks.iter().map(|task| task.get_id()).collect();
Ok(json!(task_ids))
}
@@ -160,10 +164,12 @@ impl JsonRpcInterface {
if params.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 2".into()))
}
let ws = self.workspace.lock().await.clone();
let ws = self.workspace.lock().await.clone();
let task = self.check_params_for_update(&params[0], &params[1], ws)?;
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
Ok(json!(true))
}
@@ -188,7 +194,6 @@ impl JsonRpcInterface {
if states.contains(&state.as_str()) {
task.set_state(&state);
task.set_event("state", &self.nickname, &state);
}
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
@@ -208,11 +213,11 @@ impl JsonRpcInterface {
}
let comment_content: String = serde_json::from_value(params[1].clone())?;
let ws = self.workspace.lock().await.clone();
let ws = self.workspace.lock().await.clone();
let mut task: TaskInfo = self.load_task_by_id(&params[0], ws)?;
task.set_comment(Comment::new(&comment_content, &self.nickname));
task.set_event("comment", &self.nickname, &comment_content);
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
@@ -229,8 +234,8 @@ impl JsonRpcInterface {
if params.len() != 1 {
return Err(TaudError::InvalidData("len of params should be 1".into()))
}
let ws = self.workspace.lock().await.clone();
let ws = self.workspace.lock().await.clone();
let task: TaskInfo = self.load_task_by_id(&params[0], ws)?;
Ok(json!(task))
@@ -272,7 +277,7 @@ impl JsonRpcInterface {
let ws = params[0].as_str().unwrap().to_string();
let mut s = self.workspace.lock().await;
if self.configured_ws.contains_key(&ws) {
if self.workspaces.contains_key(&ws) {
*s = ws
} else {
warn!("Workspace \"{}\" is not configured", ws);
@@ -288,7 +293,6 @@ impl JsonRpcInterface {
async fn get_ws(&self, params: &[Value]) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::switch_ws() params {:?}", params);
let ws = self.workspace.lock().await.clone();
Ok(json!(ws))
}
@@ -307,12 +311,12 @@ impl JsonRpcInterface {
return Err(TaudError::InvalidData("Invalid path".into()))
}
let ws = self.workspace.lock().await.clone();
let path = expand_path(params[0].as_str().unwrap())?.join("exported_tasks");
// mkdir datastore_path if not exists
let path = expand_path(params[0].as_str().unwrap())?.join("exported_tasks");
create_dir_all(path.join("month")).map_err(Error::from)?;
create_dir_all(path.join("task")).map_err(Error::from)?;
let ws = self.workspace.lock().await.clone();
let tasks = MonthTasks::load_current_tasks(&self.dataset_path, ws, true)?;
for task in tasks {
@@ -337,8 +341,8 @@ impl JsonRpcInterface {
return Err(TaudError::InvalidData("Invalid path".into()))
}
let ws = self.workspace.lock().await.clone();
let path = expand_path(params[0].as_str().unwrap())?.join("exported_tasks");
let ws = self.workspace.lock().await.clone();
let tasks = MonthTasks::load_current_tasks(&path, ws, true)?;
for task in tasks {
@@ -374,7 +378,6 @@ impl JsonRpcInterface {
let title: String = serde_json::from_value(title)?;
if !title.is_empty() {
task.set_title(&title);
task.set_event("title", &self.nickname, &title);
}
}
@@ -384,7 +387,6 @@ impl JsonRpcInterface {
let description: Option<String> = serde_json::from_value(description.clone())?;
if let Some(desc) = description {
task.set_desc(&desc);
task.set_event("desc", &self.nickname, &desc);
}
}
}
@@ -395,7 +397,6 @@ impl JsonRpcInterface {
let rank: Option<f32> = serde_json::from_value(rank.clone())?;
if let Some(rank) = rank {
task.set_rank(Some(rank));
task.set_event("rank", &self.nickname, &rank.to_string());
}
}
}
@@ -405,9 +406,6 @@ impl JsonRpcInterface {
let due: Option<Option<Timestamp>> = serde_json::from_value(due)?;
if let Some(d) = due {
task.set_due(d);
if let Some(d) = d {
task.set_event("due", &self.nickname, &d.0.to_string());
}
}
}
@@ -416,7 +414,6 @@ impl JsonRpcInterface {
let assign: Vec<String> = serde_json::from_value(assign)?;
if !assign.is_empty() {
task.set_assign(&assign);
task.set_event("assign", &self.nickname, &assign.join(", "));
}
}
@@ -425,7 +422,14 @@ impl JsonRpcInterface {
let project: Vec<String> = serde_json::from_value(project)?;
if !project.is_empty() {
task.set_project(&project);
task.set_event("project", &self.nickname, &project.join(", "));
}
}
if fields.contains_key("tags") {
let tags = fields.get("tags").unwrap().clone();
let tags: Vec<String> = serde_json::from_value(tags)?;
if !tags.is_empty() {
task.set_tags(&tags);
}
}

View File

@@ -3,6 +3,7 @@ use std::{
env,
fs::{create_dir_all, remove_dir_all},
io::stdin,
path::Path,
};
use async_executor::Executor;
@@ -41,19 +42,37 @@ use crate::{
jsonrpc::JsonRpcInterface,
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
task_info::TaskInfo,
util::{parse_workspaces, Workspace},
};
fn get_workspaces(settings: &Args) -> Result<FxHashMap<String, SalsaBox>> {
let mut workspaces = FxHashMap::default();
for workspace in settings.workspaces.iter() {
let workspace: Vec<&str> = workspace.split(':').collect();
let (workspace, secret) = (workspace[0], workspace[1]);
let bytes: [u8; 32] = bs58::decode(secret)
.into_vec()?
.try_into()
.map_err(|_| Error::ParseFailed("Parse secret key failed"))?;
let secret = crypto_box::SecretKey::from(bytes);
let public = secret.public_key();
let salsa_box = crypto_box::SalsaBox::new(&public, &secret);
workspaces.insert(workspace.to_string(), salsa_box);
}
Ok(workspaces)
}
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct EncryptedTask {
workspace: String,
nonce: Vec<u8>,
payload: Vec<u8>,
}
fn encrypt_task(
task: &TaskInfo,
workspace: &String,
salsa_box: &SalsaBox,
rng: &mut crypto_box::rand_core::OsRng,
) -> TaudResult<EncryptedTask> {
@@ -64,7 +83,7 @@ fn encrypt_task(
let payload = salsa_box.encrypt(&nonce, payload)?;
let nonce = nonce.to_vec();
Ok(EncryptedTask { workspace: workspace.to_string(), nonce, payload })
Ok(EncryptedTask { nonce, payload })
}
fn decrypt_task(encrypt_task: &EncryptedTask, salsa_box: &SalsaBox) -> TaudResult<TaskInfo> {
@@ -79,72 +98,59 @@ fn decrypt_task(encrypt_task: &EncryptedTask, salsa_box: &SalsaBox) -> TaudResul
}
async fn start_sync_loop(
commits_received: Arc<Mutex<Vec<String>>>,
broadcast_rcv: async_channel::Receiver<TaskInfo>,
raft_msgs_sender: async_channel::Sender<EncryptedTask>,
commits_recv: async_channel::Receiver<EncryptedTask>,
datastore_path: std::path::PathBuf,
configured_ws: FxHashMap<String, Workspace>,
workspaces: FxHashMap<String, SalsaBox>,
mut rng: crypto_box::rand_core::OsRng,
) -> TaudResult<()> {
loop {
select! {
task = broadcast_rcv.recv().fuse() => {
let tk = task.map_err(Error::from)?;
if configured_ws.contains_key(&tk.workspace) {
let ws_info = configured_ws.get(&tk.workspace).unwrap();
if let Some(salsa_box) = &ws_info.encryption {
let encrypted_task = encrypt_task(&tk, &tk.workspace, salsa_box, &mut rng)?;
info!(target: "tau", "Send the task: ref: {}", tk.ref_id);
raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?;
}
if workspaces.contains_key(&tk.workspace) {
let salsa_box = workspaces.get(&tk.workspace).unwrap();
let encrypted_task = encrypt_task(&tk, salsa_box, &mut rng)?;
info!(target: "tau", "Send the task: ref: {}", tk.ref_id);
raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?;
}
}
task = commits_recv.recv().fuse() => {
let recv = task.map_err(Error::from)?;
if configured_ws.contains_key(&recv.workspace) {
let ws_info = configured_ws.get(&recv.workspace).unwrap();
if let Some(salsa_box) = &ws_info.encryption {
let task = decrypt_task(&recv, salsa_box);
if let Err(e) = task {
info!("unable to decrypt the task: {}", e);
continue
}
let task = task.unwrap();
if !commits_received.lock().await.contains(&task.ref_id) {
commits_received.lock().await.push(task.ref_id.clone());
}
info!(target: "tau", "Save the task: ref: {}", task.ref_id);
task.save(&datastore_path)?;
}
}
let task = task.map_err(Error::from)?;
on_receive_task(&task,&datastore_path, &workspaces)
.await?;
}
}
}
}
/// Handle a task received from the network: try each configured workspace's
/// NaCl box until one decrypts the payload, then persist the task on disk.
/// Tasks that decrypt with none of our keys are logged and skipped — they
/// belong to workspaces this node is not a member of.
async fn on_receive_task(
    task: &EncryptedTask,
    datastore_path: &Path,
    workspaces: &FxHashMap<String, SalsaBox>,
) -> TaudResult<()> {
    for (workspace, salsa_box) in workspaces.iter() {
        // `task` and `salsa_box` are already references; no need to re-borrow.
        let mut task_info = match decrypt_task(task, salsa_box) {
            Ok(t) => t,
            Err(e) => {
                info!("unable to decrypt the task: {}", e);
                continue
            }
        };

        info!(target: "tau", "Save the task: ref: {}", task_info.ref_id);
        // Tag the task with the workspace whose key decrypted it before saving.
        task_info.workspace = workspace.clone();
        task_info.save(datastore_path)?;
    }
    Ok(())
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let datastore_path = expand_path(&settings.datastore)?;
let nickname =
if settings.nickname.is_some() { settings.nickname } else { env::var("USER").ok() };
if nickname.is_none() {
error!("Provide a nickname in config file");
return Ok(())
}
let mut rng = crypto_box::rand_core::OsRng;
if settings.key_gen {
info!(target: "tau", "Generating a new secret key");
let secret_key = SecretKey::generate(&mut rng);
let encoded = bs58::encode(secret_key.as_bytes());
println!("Secret key: {}", encoded.into_string());
return Ok(())
}
if settings.nickname.is_some() { settings.nickname.clone() } else { env::var("USER").ok() };
if settings.refresh {
println!("Removing local data in: {:?} (yes/no)? ", datastore_path);
@@ -156,7 +162,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
if confirm == "yes" || confirm == "y" {
remove_dir_all(datastore_path).unwrap_or(());
println!("Local data get removed");
println!("Local data removed successfully.");
} else {
error!("Unexpected Value: {}", confirm);
}
@@ -164,27 +170,54 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
return Ok(())
}
if nickname.is_none() {
error!("Provide a nickname in config file");
return Ok(())
}
// mkdir datastore_path if not exists
create_dir_all(datastore_path.clone())?;
create_dir_all(datastore_path.join("month"))?;
create_dir_all(datastore_path.join("task"))?;
// Pick up workspace settings from the TOML configuration
let cfg_path = get_config_path(settings.config, CONFIG_FILE)?;
let configured_ws = parse_workspaces(&cfg_path)?;
let rng = crypto_box::rand_core::OsRng;
if settings.generate {
println!("Generating a new workspace");
loop {
println!("Name for the new workspace: ");
let mut workspace = String::new();
stdin().read_line(&mut workspace).ok().expect("Failed to read line");
let workspace = workspace.to_lowercase();
let workspace = workspace.trim();
if workspace.is_empty() && workspace.len() < 3 {
error!("Wrong workspace try again");
continue
}
let mut rng = crypto_box::rand_core::OsRng;
let secret_key = SecretKey::generate(&mut rng);
let encoded = bs58::encode(secret_key.as_bytes());
println!("workspace: {}:{}", workspace, encoded.into_string());
println!("Please add it to the config file.");
break
}
// start at the first configured workspace
let workspace = if let Some(key) = configured_ws.keys().next() {
Arc::new(Mutex::new(key.to_owned()))
} else {
error!("Please provide at least one workspace in the config file: {:?}", cfg_path);
return Ok(())
};
}
let workspaces = get_workspaces(&settings)?;
if workspaces.is_empty() {
error!("Please add at least one workspace to the config file.");
println!("Run `$ taud --generate` to generate new workspace.");
return Ok(())
}
//
// Raft
//
let net_settings = settings.net;
let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default()));
let datastore_raft = datastore_path.join("tau.db");
@@ -193,13 +226,13 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let mut raft = Raft::<EncryptedTask>::new(raft_settings, seen_net_msgs.clone())?;
let raft_id = raft.id();
let commits_received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
let (broadcast_snd, broadcast_rcv) = async_channel::unbounded::<TaskInfo>();
//
// P2p setup
//
let mut net_settings = settings.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<NetMsg>();
let p2p = net::P2p::new(net_settings.into()).await;
@@ -228,8 +261,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
datastore_path.clone(),
broadcast_snd,
nickname.unwrap(),
workspace,
configured_ws.clone(),
workspaces.clone(),
p2p.clone(),
));
executor.spawn(listen_and_serve(settings.rpc_listen.clone(), rpc_interface)).detach();
@@ -249,12 +281,11 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
executor
.spawn(start_sync_loop(
commits_received.clone(),
broadcast_rcv,
raft.sender(),
raft.receiver(),
datastore_path,
configured_ws,
workspaces,
rng,
))
.detach();

View File

@@ -27,9 +27,12 @@ pub struct Args {
/// Increase verbosity
#[structopt(short, parse(from_occurrences))]
pub verbose: u8,
/// Generate a new secret key
/// Generate a new workspace
#[structopt(long)]
pub key_gen: bool,
pub generate: bool,
/// Secret Key To Encrypt/Decrypt tasks
#[structopt(long)]
pub workspaces: Vec<String>,
/// Clean all the local data in datastore path
/// (BE CAREFUL) Check the datastore path in the config file before running this
#[structopt(long)]

View File

@@ -58,6 +58,8 @@ pub struct TaskComments(Vec<Comment>);
pub struct TaskProjects(Vec<String>);
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskAssigns(Vec<String>);
/// Tag list for a task. Tags are stored with their leading '+' marker
/// (see `TaskInfo::set_tags`); a "-tag" argument removes the matching "+tag".
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskTags(Vec<String>);
#[derive(Clone, Debug, Serialize, Deserialize, SerialEncodable, SerialDecodable, PartialEq)]
pub struct TaskInfo {
@@ -65,6 +67,7 @@ pub struct TaskInfo {
pub(crate) workspace: String,
id: u32,
title: String,
tags: TaskTags,
desc: String,
owner: String,
assign: TaskAssigns,
@@ -113,6 +116,7 @@ impl TaskInfo {
title: title.into(),
desc: desc.into(),
owner: owner.into(),
tags: TaskTags(vec![]),
assign: TaskAssigns(vec![]),
project: TaskProjects(vec![]),
due,
@@ -176,42 +180,77 @@ impl TaskInfo {
/// Set the task title and record a "title" event in the task history.
pub fn set_title(&mut self, title: &str) {
    debug!(target: "tau", "TaskInfo::set_title()");
    self.title = title.into();
    // `title` is already a &str; the extra `&title` borrow was redundant.
    self.set_event("title", title);
}
/// Set the task description and record a "desc" event in the task history.
pub fn set_desc(&mut self, desc: &str) {
    debug!(target: "tau", "TaskInfo::set_desc()");
    self.desc = desc.into();
    // `desc` is already a &str; the extra `&desc` borrow was redundant.
    self.set_event("desc", desc);
}
pub fn set_assign(&mut self, assign: &[String]) {
/// Apply tag changes: "+tag" adds the tag (deduplicated), "-tag" removes a
/// previously added "+tag". The joined argument list is recorded as a
/// "tags" event in the task history.
pub fn set_tags(&mut self, tags: &[String]) {
    debug!(target: "tau", "TaskInfo::set_tags()");
    for tag in tags.iter() {
        if tag.starts_with('+') && !self.tags.0.contains(tag) {
            self.tags.0.push(tag.to_string());
        }
        // Strip only the LEADING '-'; `tag.replace('-', "+")` would also
        // mangle hyphens inside the tag name (e.g. "-my-tag" -> "+my+tag"),
        // so the removal would never match the stored "+my-tag".
        if let Some(name) = tag.strip_prefix('-') {
            let t = format!("+{}", name);
            self.tags.0.retain(|tag| tag != &t);
        }
    }
    self.set_event("tags", &tags.join(", "));
}
/// Replace the assignee list and record an "assign" event in the history.
pub fn set_assign(&mut self, assigns: &[String]) {
    debug!(target: "tau", "TaskInfo::set_assign()");
    // Removed a stale duplicate assignment referencing the old `assign`
    // parameter name, which no longer exists in this signature.
    self.assign = TaskAssigns(assigns.to_owned());
    self.set_event("assign", &assigns.join(", "));
}
pub fn set_project(&mut self, project: &[String]) {
pub fn set_project(&mut self, projects: &[String]) {
debug!(target: "tau", "TaskInfo::set_project()");
self.project = TaskProjects(project.to_owned());
self.project = TaskProjects(projects.to_owned());
self.set_event("project", &projects.join(", "));
}
/// Append a comment to the task and record a "comment" event carrying its
/// content.
pub fn set_comment(&mut self, c: Comment) {
    debug!(target: "tau", "TaskInfo::set_comment()");
    // Record the event first so `c` can be moved into the list afterwards,
    // avoiding the clone the previous version needed.
    self.set_event("comment", &c.content);
    self.comments.0.push(c);
}
/// Set (or clear) the task's rank, recording the new value — or the literal
/// string "None" when cleared — as a "rank" event in the task history.
pub fn set_rank(&mut self, r: Option<f32>) {
    debug!(target: "tau", "TaskInfo::set_rank()");
    self.rank = r;
    let content = match r {
        Some(v) => v.to_string(),
        None => String::from("None"),
    };
    self.set_event("rank", &content);
}
/// Set (or clear) the task's due date, recording the new value — or the
/// literal string "None" when cleared — as a "due" event in the history.
pub fn set_due(&mut self, d: Option<Timestamp>) {
    debug!(target: "tau", "TaskInfo::set_due()");
    self.due = d;
    let content = match d {
        Some(v) => v.to_string(),
        None => String::from("None"),
    };
    self.set_event("due", &content);
}
pub fn set_event(&mut self, action: &str, owner: &str, content: &str) {
pub fn set_event(&mut self, action: &str, content: &str) {
debug!(target: "tau", "TaskInfo::set_event()");
if !content.is_empty() {
self.events.0.push(TaskEvent::new(action.into(), owner.into(), content.into()));
self.events.0.push(TaskEvent::new(action.into(), self.owner.clone(), content.into()));
}
}
@@ -221,6 +260,7 @@ impl TaskInfo {
return
}
self.state = state.to_string();
self.set_event("state", &state);
}
}
@@ -270,6 +310,18 @@ impl Decodable for TaskAssigns {
}
}
impl Encodable for TaskTags {
    /// Serialize the tags as a length-prefixed vector of strings
    /// (delegates to the module's `encode_vec` helper).
    fn encode<S: io::Write>(&self, s: S) -> darkfi::Result<usize> {
        encode_vec(&self.0, s)
    }
}
impl Decodable for TaskTags {
    /// Deserialize a length-prefixed vector of strings into a tag list
    /// (delegates to the module's `decode_vec` helper).
    fn decode<D: io::Read>(d: D) -> darkfi::Result<Self> {
        Ok(Self(decode_vec(d)?))
    }
}
fn encode_vec<T: Encodable, S: io::Write>(vec: &[T], mut s: S) -> darkfi::Result<usize> {
let mut len = 0;
len += VarInt(vec.len() as u64).encode(&mut s)?;

View File

@@ -1,52 +1,3 @@
use std::path::PathBuf;
use fxhash::FxHashMap;
use log::info;
use darkfi::Result;
/// Per-workspace settings parsed from the TOML configuration file.
#[derive(Clone)]
pub struct Workspace {
    // NaCl box used to encrypt/decrypt this workspace's tasks;
    // `None` when no "secret" key was configured for the workspace.
    pub encryption: Option<crypto_box::SalsaBox>,
}

impl Workspace {
    /// Create a workspace with no encryption configured.
    /// (Returns `Result` for call-site uniformity; it cannot currently fail.)
    pub fn new() -> Result<Self> {
        Ok(Self { encryption: None })
    }
}
/// Parse the configuration file for any configured workspaces and return
/// a map containing said configurations.
///
/// Workspaces live under `[workspace."<name>"]` TOML tables; an optional
/// `secret` field (base58-encoded 32-byte key) enables encryption for
/// that workspace.
///
/// NOTE(review): a malformed `secret` (wrong decoded length) panics on the
/// `unwrap` below instead of returning an error — consider surfacing a
/// proper configuration error to the user.
pub fn parse_workspaces(config_file: &PathBuf) -> Result<FxHashMap<String, Workspace>> {
    let toml_contents = std::fs::read_to_string(config_file)?;
    let mut ret = FxHashMap::default();
    if let toml::Value::Table(map) = toml::from_str(&toml_contents)? {
        if map.contains_key("workspace") && map["workspace"].is_table() {
            for ws in map["workspace"].as_table().unwrap() {
                info!("Found configuration for workspace {}", ws.0);
                let mut workspace_info = Workspace::new()?;
                if ws.1.as_table().unwrap().contains_key("secret") {
                    // Build the NaCl box from the configured secret key.
                    // The box pairs the secret with its own public key, so
                    // tasks are effectively encrypted "to self".
                    let s = ws.1["secret"].as_str().unwrap();
                    let bytes: [u8; 32] = bs58::decode(s).into_vec()?.try_into().unwrap();
                    let secret = crypto_box::SecretKey::from(bytes);
                    let public = secret.public_key();
                    let msg_box = crypto_box::SalsaBox::new(&public, &secret);
                    workspace_info.encryption = Some(msg_box);
                    info!("Instantiated NaCl box for workspace {}", ws.0);
                }
                ret.insert(ws.0.to_string(), workspace_info);
            }
        }
    };
    Ok(ret)
}
pub fn find_free_id(task_ids: &[u32]) -> u32 {
for i in 1.. {
if !task_ids.contains(&i) {
@@ -60,6 +11,7 @@ pub fn find_free_id(task_ids: &[u32]) -> u32 {
mod tests {
use super::*;
use darkfi::Result;
#[test]
fn find_free_id_test() -> Result<()> {
let mut ids: Vec<u32> = vec![1, 3, 8, 9, 10, 3];

View File

@@ -4,16 +4,19 @@
## Sets Datastore Path
#datastore="~/.tau"
## Current display name
## Current display name
#nickname="NICKNAME"
## Workspaces
# workspaces = ["darkfi:86MGNN31r3VxT4ULMmhQnMtV8pDnod339KwHwHCfabG2"]
## Raft net settings
[net]
## P2P accept addresses
#inbound = ["tcp://127.0.0.1:23331"]
## Connection slots
outbound_connections=5
outbound_connections=8
## P2P external addresses
#external_addr = ["tls://127.0.0.1:23331"]
@@ -21,7 +24,7 @@ outbound_connections=5
## Peers to connect to
#peers = ["tls://127.0.0.1:23331"]
## Seed nodes to connect to
## Seed nodes to connect to
seeds=["tls://lilith0.dark.fi:23331", "tls://lilith1.dark.fi:23331"]
# Preferred transports for outbound connections
@@ -34,8 +37,4 @@ seeds=["tls://lilith0.dark.fi:23331", "tls://lilith1.dark.fi:23331"]
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10
## Per-workspace settings
#[workspace."darkfi"]
## Create with `taud --key-gen`
#secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"

View File

@@ -12,6 +12,9 @@
# Daemon published urls, common for all enabled networks
urls = ["tcp://127.0.0.1"]
# Hosts .tsv file to use
hosts_file="lilith_hosts.tsv"
## Per-network settings
[network."darkfid_sync"]
port = 33032

View File

@@ -43,7 +43,8 @@ local instruction = token('instruction', word_match{
'ec_get_x', 'ec_get_y',
'base_add', 'base_mul', 'base_sub', 'greater_than',
'poseidon_hash', 'merkle_root', 'constrain_instance',
'range_check', 'less_than', 'witness_base',
'range_check', 'less_than', 'bool_check',
'witness_base',
})
-- Identifiers.

View File

@@ -23,7 +23,7 @@ syn keyword zkasInstruction
\ ec_get_x ec_get_y
\ base_add base_mul base_sub
\ poseidon_hash merkle_root constrain_instance
\ range_check less_than witness_base
\ range_check less_than bool_check witness_base
syn region zkasString start='"' end='"' contained

View File

@@ -45,4 +45,6 @@ circuit "Opcodes" {
public = ec_mul_base(secret, NULLIFIER_K);
constrain_instance(ec_get_x(public));
constrain_instance(ec_get_y(public));
bool_check(one);
}

View File

@@ -12,6 +12,9 @@
# Daemon published url, common for all enabled networks
url = ["tls://127.0.0.1"]
# Hosts .tsv file to use
hosts_file="lilith_hosts.tsv"
## Per-network settings
#[network."darkfid_sync"]
#port = 33032

View File

@@ -8,31 +8,30 @@ pub type HostsPtr = Arc<Hosts>;
/// Manages a store of network addresses.
pub struct Hosts {
addrs: Mutex<Vec<Url>>,
addrs: Mutex<FxHashSet<Url>>,
}
impl Hosts {
/// Create a new host list.
pub fn new() -> Arc<Self> {
Arc::new(Self { addrs: Mutex::new(Vec::new()) })
}
/// Checks if a host address is in the host list.
async fn contains(&self, addrs: &[Url]) -> bool {
let a_set: FxHashSet<_> = addrs.iter().cloned().collect();
self.addrs.lock().await.iter().any(|item| a_set.contains(item))
Arc::new(Self { addrs: Mutex::new(FxHashSet::default()) })
}
/// Add a new host to the host list.
pub async fn store(&self, addrs: Vec<Url>) {
if !self.contains(&addrs).await {
self.addrs.lock().await.extend(addrs)
for addr in addrs {
self.addrs.lock().await.insert(addr);
}
}
/// Return the list of hosts.
pub async fn load_all(&self) -> Vec<Url> {
self.addrs.lock().await.clone()
self.addrs.lock().await.iter().cloned().collect()
}
/// Remove an Url from the list
pub async fn remove(&self, url: &Url) -> bool {
self.addrs.lock().await.remove(url)
}
/// Check if the host list is empty.

View File

@@ -6,7 +6,9 @@ use smol::Executor;
use crate::{Error, Result};
use super::super::{message, message_subscriber::MessageSubscription, ChannelPtr, SettingsPtr};
use super::super::{
message, message_subscriber::MessageSubscription, ChannelPtr, HostsPtr, SettingsPtr,
};
/// Implements the protocol version handshake sent out by nodes at the beginning
/// of a connection.
@@ -15,13 +17,14 @@ pub struct ProtocolVersion {
version_sub: MessageSubscription<message::VersionMessage>,
verack_sub: MessageSubscription<message::VerackMessage>,
settings: SettingsPtr,
hosts: HostsPtr,
}
impl ProtocolVersion {
/// Create a new version protocol. Makes a version and version
/// acknowledgement subscription, then adds them to a version protocol
/// instance.
pub async fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc<Self> {
pub async fn new(channel: ChannelPtr, settings: SettingsPtr, hosts: HostsPtr) -> Arc<Self> {
// Creates a version subscription.
let version_sub = channel
.clone()
@@ -36,8 +39,9 @@ impl ProtocolVersion {
.await
.expect("Missing verack dispatcher!");
Arc::new(Self { channel, version_sub, verack_sub, settings })
Arc::new(Self { channel, version_sub, verack_sub, settings, hosts })
}
/// Start version information exchange. Start the timer. Send version info
/// and wait for version acknowledgement. Wait for version info and send
/// version acknowledgement.
@@ -60,6 +64,7 @@ impl ProtocolVersion {
debug!(target: "net", "ProtocolVersion::run() [END]");
Ok(())
}
/// Send and receive version information.
async fn exchange_versions(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net", "ProtocolVersion::exchange_versions() [START]");
@@ -73,8 +78,9 @@ impl ProtocolVersion {
debug!(target: "net", "ProtocolVersion::exchange_versions() [END]");
Ok(())
}
/// Send version info and wait for version acknowledgement
/// and insures the app version is the same
/// and ensures the app version is the same, if configured.
async fn send_version(self: Arc<Self>) -> Result<()> {
debug!(target: "net", "ProtocolVersion::send_version() [START]");
@@ -85,20 +91,46 @@ impl ProtocolVersion {
// Wait for version acknowledgement
let verack_msg = self.verack_sub.receive().await?;
let app_version = self.settings.app_version.clone();
if app_version != verack_msg.app {
error!(
"Wrong app version from [{:?}]. Disconnecting from channel.",
self.channel.address()
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
// Validate peer received version against our version, if configured.
// Seeds version gets ignored.
if !self.settings.seeds.contains(&self.channel.address()) {
match &self.settings.app_version {
Some(app_version) => {
debug!(target: "net", "ProtocolVersion::send_version() [App version: {}]", app_version);
debug!(target: "net", "ProtocolVersion::send_version() [Recieved version: {}]", verack_msg.app);
// Version format: MAJOR.MINOR.PATCH
let app_versions: Vec<&str> = app_version.split('.').collect();
let verack_msg_versions: Vec<&str> = verack_msg.app.split('.').collect();
// Check for malformed versions
if app_versions.len() != 3 || verack_msg_versions.len() != 3 {
error!("ProtocolVersion::send_version() [Malformed version detected. Disconnecting from channel.]");
self.hosts.remove(&self.channel.address()).await;
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
// Ignore PATCH version
if app_versions[0] != verack_msg_versions[0] ||
app_versions[1] != verack_msg_versions[1]
{
error!(
"ProtocolVersion::send_version() [Wrong app version from ({}). Disconnecting from channel.]",
self.channel.address()
);
self.hosts.remove(&self.channel.address()).await;
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
}
None => {
debug!(target: "net", "ProtocolVersion::send_version() [App version not set, ignorring received]")
}
}
}
debug!(target: "net", "ProtocolVersion::send_version() [END]");
Ok(())
}
/// Receive version info, check the message is okay and send version
/// acknowledgement with app version attached.
async fn recv_version(self: Arc<Self>) -> Result<()> {
@@ -108,7 +140,9 @@ impl ProtocolVersion {
self.channel.set_remote_node_id(version.node_id.clone()).await;
// Send version acknowledgement
let verack = message::VerackMessage { app: self.settings.app_version.clone() };
let verack = message::VerackMessage {
app: self.settings.app_version.clone().unwrap_or("".to_string()),
};
self.channel.clone().send(verack).await?;
debug!(target: "net", "ProtocolVersion::recv_version() [END]");

View File

@@ -113,7 +113,9 @@ pub trait Session: Sync {
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let protocol_version =
ProtocolVersion::new(channel.clone(), p2p.settings().clone(), p2p.hosts().clone())
.await;
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());

View File

@@ -5,7 +5,7 @@ use async_executor::Executor;
use async_trait::async_trait;
use log::{debug, info, warn};
use rand::seq::SliceRandom;
use serde_json::json;
use serde_json::{json, Value};
use url::Url;
use crate::{
@@ -293,8 +293,13 @@ impl Session for OutboundSession {
slots.push(info.get_info().await);
}
let hosts = self.p2p().hosts().load_all().await;
let addrs: Vec<Value> =
hosts.iter().map(|addr| serde_json::Value::String(addr.to_string())).collect();
json!({
"slots": slots,
"hosts": serde_json::Value::Array(addrs),
})
}

View File

@@ -25,7 +25,7 @@ pub struct Settings {
pub peers: Vec<Url>,
pub seeds: Vec<Url>,
pub node_id: String,
pub app_version: String,
pub app_version: Option<String>,
pub outbound_transports: Vec<TransportName>,
pub localnet: bool,
}
@@ -45,7 +45,7 @@ impl Default for Settings {
peers: Vec::new(),
seeds: Vec::new(),
node_id: String::new(),
app_version: option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string(),
app_version: Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string()),
outbound_transports: get_outbound_transports(vec![]),
localnet: false,
}
@@ -99,7 +99,7 @@ pub struct SettingsOpt {
#[serde(default)]
#[structopt(skip)]
pub app_version: String,
pub app_version: Option<String>,
/// Prefered transports for outbound connections
#[serde(default)]

View File

@@ -24,7 +24,7 @@ impl Default for RaftSettings {
heartbeat_timeout: 500,
timeout: 6,
id_timeout: 12,
prun_duration: 240,
prun_duration: 30,
datastore_path: PathBuf::from(""),
}
}

View File

@@ -168,8 +168,13 @@ macro_rules! async_daemonize {
let log_file_path = match std::env::var("DARKFI_LOG") {
Ok(p) => p,
Err(_) => {
std::fs::create_dir_all(expand_path("~/.local")?)?;
"~/.local/darkfi.log".into()
let bin_name = if let Some(bin_name) = option_env!("CARGO_BIN_NAME") {
bin_name
} else {
"darkfi"
};
std::fs::create_dir_all(expand_path("~/.local/darkfi")?)?;
format!("~/.local/darkfi/{}.log", bin_name)
}
};

View File

@@ -1,10 +1,13 @@
/// Base field scalar arithmetic
/// Base field arithmetic gadget
pub mod arithmetic;
/// Field-native range check gadget;
/// Small range check, 0..8 bits
pub mod small_range_check;
/// Field-native range check gadget with a lookup table
pub mod native_range_check;
// Field-native less than comparison gadget
/// Field-native less than comparison gadget with a lookup table
pub mod less_than;
/// is_zero comparison gadget

View File

@@ -0,0 +1,143 @@
use halo2_proofs::{
arithmetic::FieldExt,
circuit::{AssignedCell, Chip, Layouter},
pasta::pallas,
plonk,
plonk::{Advice, Column, ConstraintSystem, Constraints, Expression, Selector},
poly::Rotation,
};
/// Checks that an expression is in the small range [0..range),
/// i.e. 0 ≤ word < range, by building the polynomial
/// `word * (1 - word) * (2 - word) * … * ((range-1) - word)`,
/// which evaluates to zero exactly when `word` takes one of those values.
pub fn range_check<F: FieldExt>(word: Expression<F>, range: u8) -> Expression<F> {
    let mut product = word.clone();
    for i in 1..(range as usize) {
        product = product * (Expression::Constant(F::from(i as u64)) - word.clone());
    }
    product
}
/// Columns and selector produced by `SmallRangeCheckChip::configure`.
#[derive(Clone, Debug)]
pub struct SmallRangeCheckConfig {
    /// Advice column the range-checked value is copied into.
    pub z: Column<Advice>,
    /// Selector that turns the range-check gate on for a row.
    pub selector: Selector,
}
/// Chip enforcing that a witnessed value lies in a small range [0..range),
/// e.g. `range = 2` for a boolean (one-bit) check.
#[derive(Clone, Debug)]
pub struct SmallRangeCheckChip {
    // Gate configuration created by `Self::configure`.
    config: SmallRangeCheckConfig,
}
impl Chip<pallas::Base> for SmallRangeCheckChip {
    type Config = SmallRangeCheckConfig;
    type Loaded = ();

    fn config(&self) -> &Self::Config {
        &self.config
    }

    // This chip needs no loaded (lookup/fixed) state.
    fn loaded(&self) -> &Self::Loaded {
        &()
    }
}
impl SmallRangeCheckChip {
    /// Wrap a previously built configuration in a chip instance.
    pub fn construct(config: SmallRangeCheckConfig) -> Self {
        Self { config }
    }

    /// Define the range-check gate on column `z`: whenever the selector is
    /// enabled, the row's advice value must satisfy
    /// `range_check(value, range) == 0`.
    pub fn configure(
        meta: &mut ConstraintSystem<pallas::Base>,
        z: Column<Advice>,
        range: u8,
    ) -> SmallRangeCheckConfig {
        // Enable permutation on z column so cells can be copied into it.
        meta.enable_equality(z);
        let selector = meta.selector();
        meta.create_gate("bool check", |meta| {
            let selector = meta.query_selector(selector);
            let advice = meta.query_advice(z, Rotation::cur());
            Constraints::with_selector(selector, Some(range_check(advice, range)))
        });
        SmallRangeCheckConfig { z, selector }
    }

    /// Constrain an already-assigned cell to the configured range by copying
    /// it into the gate's advice column in a row with the selector enabled.
    pub fn small_range_check(
        &self,
        mut layouter: impl Layouter<pallas::Base>,
        value: AssignedCell<pallas::Base, pallas::Base>,
    ) -> Result<(), plonk::Error> {
        layouter.assign_region(
            || "small range constrain",
            |mut region| {
                self.config.selector.enable(&mut region, 0)?;
                value.copy_advice(|| "z_0", &mut region, self.config.z, 0)?;
                Ok(())
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::zk::assign_free_advice;
    use halo2_proofs::{
        circuit::{floor_planner, Value},
        dev::MockProver,
        plonk,
        plonk::Circuit,
    };

    /// Minimal circuit that witnesses a single value and boolean-constrains it.
    #[derive(Default)]
    struct SmallRangeCircuit {
        value: Value<pallas::Base>,
    }

    impl Circuit<pallas::Base> for SmallRangeCircuit {
        type Config = (SmallRangeCheckConfig, Column<Advice>);
        type FloorPlanner = floor_planner::V1;

        fn without_witnesses(&self) -> Self {
            Self::default()
        }

        fn configure(meta: &mut ConstraintSystem<pallas::Base>) -> Self::Config {
            let w = meta.advice_column();
            let z = meta.advice_column();
            meta.enable_equality(w);

            // One bit: range of 2 restricts the value to 0 or 1.
            let config = SmallRangeCheckChip::configure(meta, z, 2);

            (config, w)
        }

        fn synthesize(
            &self,
            config: Self::Config,
            mut layouter: impl Layouter<pallas::Base>,
        ) -> Result<(), plonk::Error> {
            let chip = SmallRangeCheckChip::construct(config.0.clone());
            let value = assign_free_advice(layouter.namespace(|| "val"), config.1, self.value)?;
            chip.small_range_check(layouter.namespace(|| "boolean check"), value)?;
            Ok(())
        }
    }

    #[test]
    fn boolean_range_check() {
        let k = 3;
        // 0 and 1 must satisfy the boolean constraint...
        for i in 0..2 {
            let circuit = SmallRangeCircuit { value: Value::known(pallas::Base::from(i as u64)) };
            let prover = MockProver::run(k, &circuit, vec![]).unwrap();
            prover.assert_satisfied();
        }
        // ...while 2 must be rejected by the gate.
        let circuit = SmallRangeCircuit { value: Value::known(pallas::Base::from(2)) };
        let prover = MockProver::run(k, &circuit, vec![]).unwrap();
        assert!(prover.verify().is_err());
    }
}

View File

@@ -31,6 +31,7 @@ use super::{
arithmetic::{ArithChip, ArithConfig, ArithInstruction},
less_than::{LessThanChip, LessThanConfig},
native_range_check::{NativeRangeCheckChip, NativeRangeCheckConfig},
small_range_check::{SmallRangeCheckChip, SmallRangeCheckConfig},
},
};
use crate::{
@@ -59,6 +60,7 @@ pub struct VmConfig {
native_64_range_check_config: NativeRangeCheckConfig<3, 64, 22>,
native_253_range_check_config: NativeRangeCheckConfig<3, 253, 85>,
lessthan_config: LessThanConfig<3, 253, 85>,
boolcheck_config: SmallRangeCheckConfig,
}
impl VmConfig {
@@ -232,6 +234,10 @@ impl Circuit<pallas::Base> for ZkCircuit {
k_values_table_253,
);
// Configuration for boolean checks, it uses the small_range_check
// chip with a range of 2, which enforces one bit, i.e. 0 or 1.
let boolcheck_config = SmallRangeCheckChip::configure(meta, advices[9], 2);
VmConfig {
primary,
advices,
@@ -245,6 +251,7 @@ impl Circuit<pallas::Base> for ZkCircuit {
native_64_range_check_config,
native_253_range_check_config,
lessthan_config,
boolcheck_config,
}
}
@@ -300,6 +307,9 @@ impl Circuit<pallas::Base> for ZkCircuit {
// Construct the Arithmetic chip.
let arith_chip = config.arithmetic_chip();
// Construct the boolean check chip.
let boolcheck_chip = SmallRangeCheckChip::construct(config.boolcheck_config.clone());
// ==========================
// Constants setup
// ==========================
@@ -697,6 +707,16 @@ impl Circuit<pallas::Base> for ZkCircuit {
)?;
}
Opcode::BoolCheck => {
debug!("Executing `BoolCheck{:?}` opcode", opcode.1);
let args = &opcode.1;
let w = stack[args[0].1].clone().into();
boolcheck_chip
.small_range_check(layouter.namespace(|| "copy boolean check"), w)?;
}
Opcode::ConstrainInstance => {
debug!("Executing `ConstrainInstance{:?}` opcode", opcode.1);
let args = &opcode.1;

View File

@@ -49,6 +49,9 @@ pub enum Opcode {
/// Compare two Base field elements and see if a is less than b
LessThan = 0x51,
/// Check if a field element fits in a boolean (Either 0 or 1)
BoolCheck = 0x52,
/// Constrain a Base field element to a circuit's public input
ConstrainInstance = 0xf0,
@@ -73,6 +76,7 @@ impl Opcode {
"witness_base" => Some(Self::WitnessBase),
"range_check" => Some(Self::RangeCheck),
"less_than" => Some(Self::LessThan),
"bool_check" => Some(Self::BoolCheck),
"constrain_instance" => Some(Self::ConstrainInstance),
"debug" => Some(Self::DebugPrint),
_ => None,
@@ -95,6 +99,7 @@ impl Opcode {
0x40 => Some(Self::WitnessBase),
0x50 => Some(Self::RangeCheck),
0x51 => Some(Self::LessThan),
0x52 => Some(Self::BoolCheck),
0xf0 => Some(Self::ConstrainInstance),
0xff => Some(Self::DebugPrint),
_ => None,
@@ -141,6 +146,8 @@ impl Opcode {
Opcode::LessThan => (vec![], vec![VarType::Base, VarType::Base]),
Opcode::BoolCheck => (vec![], vec![VarType::Base]),
Opcode::ConstrainInstance => (vec![], vec![VarType::Base]),
Opcode::DebugPrint => (vec![], vec![]),