diff --git a/bin/darkirc/proof/rlnv2-diff-signal.zk b/bin/darkirc/proof/rlnv2-diff-signal.zk index beaf3fb4d..e84f1d823 100644 --- a/bin/darkirc/proof/rlnv2-diff-signal.zk +++ b/bin/darkirc/proof/rlnv2-diff-signal.zk @@ -1,44 +1,48 @@ -k = 13; +k = 14; field = "pallas"; -constant "RlnV2_Diff_Signal" {} +constant "Rlnv2Diff_Signal" {} -witness "RlnV2_Diff_Signal" { - Base identity_nullifier, - Base identity_trapdoor, +witness "Rlnv2Diff_Signal" { + Base identity_nullifier, + Base identity_trapdoor, + Base user_message_limit, - MerklePath identity_path, - Uint32 identity_leaf_pos, + # Inclusion proof, the leaf is the identity_commitment + SparseMerklePath path, - Base x, # The message hash - Base external_nullifier, # Hash(Epoch, RLN identifier) + # The message hash + Base x, + Base message_id, - Base message_id, - Base user_message_limit, - - Base epoch, + Base epoch, } -circuit "RlnV2_Diff_Signal" { - constrain_instance(epoch); - constrain_instance(external_nullifier); +circuit "Rlnv2Diff_Signal" { + # Identity inclusion proof + identity_secret = poseidon_hash(identity_nullifier, identity_trapdoor); + identity_secret_hash = poseidon_hash(identity_secret, user_message_limit); + identity_commitment = poseidon_hash(identity_secret_hash); + root = sparse_merkle_root(identity_commitment, path, identity_commitment); + constrain_instance(root); - less_than_strict(message_id, user_message_limit); + # External nullifier is created from epoch and app identifier + app_id = witness_base(1000); + external_nullifier = poseidon_hash(epoch, app_id); + constrain_instance(external_nullifier); - # Identity secret hash - a_0 = poseidon_hash(identity_nullifier, identity_trapdoor); - a_1 = poseidon_hash(a_0, external_nullifier, message_id); + # Calculating internal nullifier + # a_0 = identity_secret_hash + a_0 = poseidon_hash(identity_nullifier, identity_trapdoor); + a_1 = poseidon_hash(a_0, external_nullifier, message_id); + x_a_1 = base_mul(x, a_1); + y = base_add(a_0, x_a_1); + constrain_instance(x); + constrain_instance(y); - # y = a_0 + x * a_1 - x_a_1 = base_mul(x, a_1); - y = base_add(a_0, x_a_1); - constrain_instance(x); - constrain_instance(y); + # Constrain message_id to be lower than actual message limit. + less_than_strict(message_id, user_message_limit); - internal_nullifier = poseidon_hash(a_1); - constrain_instance(internal_nullifier); - - identity_commitment = poseidon_hash(a_0, user_message_limit); - root = merkle_root(identity_leaf_pos, identity_path, identity_commitment); - constrain_instance(root); + internal_nullifier = poseidon_hash(a_1); + constrain_instance(internal_nullifier); } diff --git a/bin/darkirc/src/crypto/rln.rs b/bin/darkirc/src/crypto/rln.rs index f78252fb5..fef37ea59 100644 --- a/bin/darkirc/src/crypto/rln.rs +++ b/bin/darkirc/src/crypto/rln.rs @@ -28,10 +28,10 @@ use darkfi::{ Result, }; use darkfi_sdk::{ - bridgetree::Position, - crypto::{pasta_prelude::FromUniformBytes, poseidon_hash, MerkleTree}, + crypto::{pasta_prelude::FromUniformBytes, poseidon_hash, smt::SmtMemoryFp}, pasta::pallas, }; +use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable}; use log::info; use rand::{rngs::OsRng, CryptoRng, RngCore}; @@ -47,6 +47,9 @@ pub const RLN_EPOCH_LEN: u64 = 600; // 10 min pub const RLN2_SIGNAL_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-signal.zk.bin"); pub const RLN2_SLASH_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-slash.zk.bin"); +/// TODO: this should be configurable +pub const USER_MSG_LIMIT: u64 = 6; + /// Find closest epoch to given timestamp pub fn closest_epoch(timestamp: u64) -> u64 { let time_diff = timestamp - RLN_GENESIS; @@ -62,7 +65,7 @@ pub fn hash_event(event: &Event) -> pallas::Base { pallas::Base::from_uniform_bytes(&buf) } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, SerialEncodable, SerialDecodable)] pub struct RlnIdentity { pub nullifier: pallas::Base, pub trapdoor: pallas::Base, @@ -81,25 +84,52 @@ impl RlnIdentity { pallas::Base::random(&mut rng), ]), trapdoor: poseidon_hash([RLN_TRAPDOOR_DERIVATION_PATH, pallas::Base::random(&mut rng)]), - user_message_limit: 100, + user_message_limit: USER_MSG_LIMIT, message_id: 1, last_epoch: closest_epoch(UNIX_EPOCH.elapsed().unwrap().as_secs()), } } pub fn commitment(&self) -> pallas::Base { - poseidon_hash([ - poseidon_hash([self.nullifier, self.trapdoor]), - pallas::Base::from(self.user_message_limit), - ]) + let identity_secret = poseidon_hash([self.nullifier, self.trapdoor]); + let identity_secret_hash = poseidon_hash([identity_secret, self.user_message_limit.into()]); + + poseidon_hash([identity_secret_hash]) } + // pub fn _create_register_proof( + // &self, + // event: &Event, + // identity_tree: &mut SmtMemoryFp, + // register_pk: &ProvingKey, + // ) -> Result { + // let witnesses = vec![ + // Witness::Base(Value::known(self.nullifier)), + // Witness::Base(Value::known(self.trapdoor)), + // Witness::Base(Value::known(pallas::Base::from(self.user_message_limit))), + // ]; + + // let public_inputs = vec![self.commitment(), pallas::Base::from(self.user_message_limit)]; + + // info!(target: "crypto::rln::create_register_proof", "[RLN] Creating register proof for event {}", event.header.id()); + // let register_zkbin = ZkBinary::decode(RLN2_REGISTER_ZKBIN)?; + // let register_circuit = ZkCircuit::new(witnesses, ®ister_zkbin); + + // let proof = + // Proof::create(®ister_pk, &[register_circuit], &public_inputs, &mut OsRng).unwrap(); + + // let leaf = vec![self.commitment()]; + // let leaf: Vec<_> = leaf.into_iter().map(|l| (l, l)).collect(); + // // TODO: Recipients should verify that identity doesn't exist already before insert. + // identity_tree.insert_batch(leaf.clone()).unwrap(); // leaf == pos + // Ok(proof) + // } + pub fn create_signal_proof( &self, event: &Event, - identity_tree: &MerkleTree, - identity_pos: Position, - proving_key: &ProvingKey, + identity_tree: &SmtMemoryFp, + signal_pk: &ProvingKey, ) -> Result<(Proof, Vec)> { // 1. Construct share let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); @@ -112,32 +142,32 @@ impl RlnIdentity { let internal_nullifier = poseidon_hash([a_1]); - // 2. Create Merkle proof - let identity_root = identity_tree.root(0).unwrap(); - let identity_path = identity_tree.witness(identity_pos, 0).unwrap(); + // 2. Inclusion proof + let identity_root = identity_tree.root(); + let identity_path = identity_tree.prove_membership(&self.commitment()); + // TODO: Delete me later + assert!(identity_path.verify(&identity_root, &self.commitment(), &self.commitment())); // 3. Create ZK proof let witnesses = vec![ Witness::Base(Value::known(self.nullifier)), Witness::Base(Value::known(self.trapdoor)), - Witness::MerklePath(Value::known(identity_path.clone().try_into().unwrap())), - Witness::Uint32(Value::known(u64::from(identity_pos).try_into().unwrap())), - Witness::Base(Value::known(x)), - Witness::Base(Value::known(external_nullifier)), - Witness::Base(Value::known(message_id)), Witness::Base(Value::known(pallas::Base::from(self.user_message_limit))), + Witness::SparseMerklePath(Value::known(identity_path.path)), + Witness::Base(Value::known(x)), + Witness::Base(Value::known(message_id)), Witness::Base(Value::known(epoch)), ]; - let public_inputs = - vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()]; + let public_inputs = vec![identity_root, external_nullifier, x, y, internal_nullifier]; - info!(target: "crypto::rln::create_proof", "[RLN] Creating proof for event {}", event.header.id()); + info!(target: "crypto::rln::create_signal_proof", "[RLN] Creating signal proof for event {}", event.header.id()); let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; let signal_circuit = ZkCircuit::new(witnesses, &signal_zkbin); - let proof = Proof::create(proving_key, &[signal_circuit], &public_inputs, &mut OsRng)?; - Ok((proof, vec![y, internal_nullifier])) + let proof = Proof::create(signal_pk, &[signal_circuit], &public_inputs, &mut OsRng)?; + // Ok((proof, vec![y, internal_nullifier])) + Ok((proof, public_inputs)) } } diff --git a/bin/darkirc/src/irc/client.rs b/bin/darkirc/src/irc/client.rs index 1990ce24a..b5cac8943 100644 --- a/bin/darkirc/src/irc/client.rs +++ b/bin/darkirc/src/irc/client.rs @@ -28,16 +28,15 @@ use std::{ use darkfi::{ event_graph::{proto::EventPut, Event, NULL_ID}, system::Subscription, - zk::{empty_witnesses, Proof, ProvingKey, ZkCircuit}, + zk::{empty_witnesses, Proof, ProvingKey, VerifyingKey, ZkCircuit}, zkas::ZkBinary, Error, Result, }; use darkfi_sdk::{ - bridgetree::Position, - crypto::{pasta_prelude::PrimeField, poseidon_hash, MerkleTree}, - pasta::pallas, + crypto::util::FieldElemAsStr, + pasta::{pallas, Fp}, }; -use darkfi_serial::{deserialize_async, serialize_async}; +use darkfi_serial::{deserialize_async_partial, serialize_async}; use futures::FutureExt; use log::{debug, error, info, warn}; use sled_overlay::sled; @@ -52,9 +51,7 @@ use super::{ server::{IrcServer, MAX_MSG_LEN}, Msg, NickServ, OldPrivmsg, SERVER_NAME, }; -use crate::crypto::rln::{ - closest_epoch, hash_event, RlnIdentity, RLN2_SIGNAL_ZKBIN, RLN_APP_IDENTIFIER, -}; +use crate::crypto::rln::{closest_epoch, RlnIdentity, RLN2_SIGNAL_ZKBIN}; const PENALTY_LIMIT: usize = 5; @@ -78,6 +75,8 @@ pub struct Client { pub server: Arc, /// Subscription for incoming events pub incoming: Subscription, + /// Subscription for incoming static events + pub incoming_st: Subscription, /// Client socket addr pub addr: SocketAddr, /// ID of the last sent event @@ -114,6 +113,7 @@ impl Client { pub async fn new( server: Arc, incoming: Subscription, + incoming_st: Subscription, addr: SocketAddr, ) -> Result { let caps = @@ -125,6 +125,7 @@ impl Client { Ok(Self { server: server.clone(), incoming, + incoming_st, addr, last_sent: RwLock::new(NULL_ID), channels: RwLock::new(HashSet::new()), @@ -168,18 +169,21 @@ impl Client { if let Ok(0) = r { error!("[IRC CLIENT] Read failed for {}: Client disconnected", self.addr); self.incoming.unsubscribe().await; + self.incoming_st.unsubscribe().await; return Err(Error::ChannelStopped) } // If something failed during reading, we disconnect. if let Err(e) = r { error!("[IRC CLIENT] Read failed for {}: {}", self.addr, e); self.incoming.unsubscribe().await; + self.incoming_st.unsubscribe().await; return Err(Error::ChannelStopped) } // If the penalty limit is reached, disconnect the client. if self.penalty.load(SeqCst) == PENALTY_LIMIT { self.incoming.unsubscribe().await; + self.incoming_st.unsubscribe().await; return Err(Error::ChannelStopped) } @@ -221,7 +225,7 @@ impl Client { rln_identity.message_id += 1; - let (_proof, _public_inputs) = match self.create_rln_signal_proof(&rln_identity, &event).await { + let (proof, public_inputs) = match self.create_rln_signal_proof(&rln_identity, &event).await { Ok(v) => v, Err(e) => { // TODO: Send a message to the IRC client telling that sending went wrong @@ -231,10 +235,44 @@ impl Client { } }; - self.server.darkirc.p2p.broadcast(&EventPut(event)).await; + ///////// Temporary ///////// + let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; + let signal_circuit = ZkCircuit::new(empty_witnesses(&signal_zkbin)?, &signal_zkbin); + let Some(signal_vk) = self.server.server_store.get("rlnv2-diff-signal-vk")? else { + return Err(Error::DatabaseError( + "RLN signal verifying key not found in server store".to_string(), + )) + }; + let mut reader = Cursor::new(signal_vk); + let signal_vk = VerifyingKey::read(&mut reader, signal_circuit)?; + + // let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); + // let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]); + // let x = hash_event(&event); + // let y = public_inputs[0]; + // let internal_nullifier = public_inputs[1]; + + // // Fetch SMT root + // let identity_tree = self.server.rln_identity_tree.read().await; + // let identity_root = identity_tree.root(); + + // let public_inputs = + // vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root]; + + // Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)? + + match proof.verify(&signal_vk, &public_inputs) { + Ok(_) => info!("it works"), + Err(e) => panic!("it doesn't work because {e}") + } + ///////////////////////////// + + let blob = serialize_async(&(proof, public_inputs)).await; + + self.server.darkirc.p2p.broadcast(&EventPut(event, blob)).await; } else { // Broadcast it - self.server.darkirc.p2p.broadcast(&EventPut(event)).await; + self.server.darkirc.p2p.broadcast(&EventPut(event, vec![])).await; } } } @@ -246,6 +284,7 @@ impl Client { // If we got an error, we disconnect the client. Err(e) => { self.incoming.unsubscribe().await; + self.incoming_st.unsubscribe().await; return Err(e) } } @@ -261,6 +300,7 @@ impl Client { // ) -> Result> {> // for which the logic for delivery should be kept in sync r = self.incoming.receive().fuse() => { + info!("incoming: {:?}", r); // We will skip this if it's our own message. let event_id = r.header.id(); if *self.last_sent.read().await == event_id { @@ -277,49 +317,6 @@ impl Client { } } - // If the Event contains an appended blob of data, try to check if it's - // a RLN Signal proof and verify it. - //if false { - let mut verification_failed = false; - #[allow(clippy::never_loop)] - loop { - let (event, blob) = (r.clone(), vec![0,1,2]); - let (proof, public_inputs): (Proof, Vec) = match deserialize_async(&blob).await { - Ok(v) => v, - Err(_) => { - // TODO: FIXME: This logic should be better written. - // Right now we don't enforce RLN so we can just fall-through. - //error!("[IRC CLIENT] Failed deserializing event ephemeral data: {}", e); - break - } - }; - - if public_inputs.len() != 2 { - error!("[IRC CLIENT] Received event has the wrong number of public inputs"); - verification_failed = true; - break - } - - info!("[IRC CLIENT] Verifying incoming Event RLN proof"); - if self.verify_rln_signal_proof( - &event, - proof, - [public_inputs[0], public_inputs[1]], - ).await.is_err() { - verification_failed = true; - break - } - - // TODO: Store for secret shares recovery - info!("[IRC CLIENT] RLN verification successful"); - break - } - - if verification_failed { - error!("[IRC CLIENT] Incoming Event proof verification failed"); - continue - } - // Try to deserialize the `Event`'s content into a `Privmsg` let mut privmsg = match Msg::deserialize(r.content()).await { Ok(Msg::V1(old_msg)) => old_msg.into_new(), @@ -381,6 +378,50 @@ impl Client { return Err(e) } } + + // Process message from the network. These should only be RLN identities. + r = self.incoming_st.receive().fuse() => { + + // We will skip this if it's our own message. + let event_id = r.header.id(); + if *self.last_sent.read().await == event_id { + continue + } + + // If this event was seen, skip it + match self.is_seen(&event_id).await { + Ok(true) => continue, + Ok(false) => {}, + Err(e) => { + error!("[IRC CLIENT] (multiplex_connection) self.is_seen({}) failed: {}", event_id, e); + return Err(e) + } + } + + // Update SMT + let mut identities_tree = self.server.rln_identity_tree.write().await; + let fetched_rln_commitment: Fp = match deserialize_async_partial(r.content()).await + { + Ok((v, _)) => v, + Err(e) => { + error!(target: "irc::server", "[RLN] Failed deserializing incoming RLN Identity events: {}", e); + continue + } + }; + + info!("multiplex commitment: {}",fetched_rln_commitment.to_string()); + + let commitment = vec![fetched_rln_commitment]; + let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect(); + identities_tree.insert_batch(commitment)?; + info!("multiplex root: {}",identities_tree.root().to_string()); + + // Mark the message as seen for this USER + if let Err(e) = self.mark_seen(&event_id).await { + error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e); + return Err(e) + } + } } } } @@ -582,31 +623,46 @@ impl Client { Ok(db.contains_key(event_id.as_bytes())?) } + /// Abstraction for RLN register proof creation + // async fn _create_rln_register_proof( + // &self, + // rln_identity: &RlnIdentity, + // event: &Event, + // ) -> Result { + // let mut identity_tree = self.server.rln_identity_tree.write().await; + + // // Retrieve the ZK proving key from the db + // let register_zkbin = ZkBinary::decode(RLN2_REGISTER_ZKBIN)?; + // let register_circuit = ZkCircuit::new(empty_witnesses(®ister_zkbin)?, ®ister_zkbin); + // let Some(register_pk) = self.server.server_store.get("rlnv2-diff-register-pk")? else { + // return Err(Error::DatabaseError( + // "RLN register proving key not found in server store".to_string(), + // )) + // }; + // let mut reader = Cursor::new(register_pk); + // let register_pk = ProvingKey::read(&mut reader, register_circuit)?; + + // rln_identity._create_register_proof(event, &mut identity_tree, ®ister_pk) + // } + + // /// Abstraction for RLN register proof verification + // async fn _verify_rln_register_proof( + // &self, + // _event: &Event, + // proof: Proof, + // public_inputs: [pallas::Base; 2], + // register_vk: VerifyingKey, + // ) -> Result<()> { + // Ok(proof.verify(®ister_vk, &public_inputs)?) + // } + /// Abstraction for RLN signal proof creation async fn create_rln_signal_proof( &self, rln_identity: &RlnIdentity, event: &Event, ) -> Result<(Proof, Vec)> { - let identity_commitment = rln_identity.commitment(); - - // Fetch the commitment's leaf position in the Merkle tree - let Some(identity_pos) = - self.server.rln_identity_store.get(identity_commitment.to_repr())? - else { - return Err(Error::DatabaseError( - "Identity not found in commitment tree store".to_string(), - )) - }; - let identity_pos: Position = deserialize_async(&identity_pos).await?; - - // Fetch the latest commitment Merkle tree - let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else { - return Err(Error::DatabaseError( - "RLN Identity tree not found in server store".to_string(), - )) - }; - let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?; + let identity_tree = self.server.rln_identity_tree.read().await; // Retrieve the ZK proving key from the db let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; @@ -619,34 +675,34 @@ impl Client { let mut reader = Cursor::new(proving_key); let proving_key = ProvingKey::read(&mut reader, signal_circuit)?; - rln_identity.create_signal_proof(event, &identity_tree, identity_pos, &proving_key) + rln_identity.create_signal_proof(event, &identity_tree, &proving_key) } - /// Abstraction for RLN signal proof verification - async fn verify_rln_signal_proof( - &self, - event: &Event, - proof: Proof, - public_inputs: [pallas::Base; 2], - ) -> Result<()> { - let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); - let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]); - let x = hash_event(event); - let y = public_inputs[0]; - let internal_nullifier = public_inputs[1]; + // /// Abstraction for RLN signal proof verification + // async fn _verify_rln_signal_proof( + // &self, + // event: &Event, + // proof: Proof, + // public_inputs: [pallas::Base; 2], + // ) -> Result<()> { + // let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); + // let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]); + // let x = hash_event(event); + // let y = public_inputs[0]; + // let internal_nullifier = public_inputs[1]; - // Fetch the latest commitment Merkle tree - let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else { - return Err(Error::DatabaseError( - "RLN Identity tree not found in server store".to_string(), - )) - }; - let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?; - let identity_root = identity_tree.root(0).unwrap(); + // // Fetch the latest commitment Merkle tree + // let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else { + // return Err(Error::DatabaseError( + // "RLN Identity tree not found in server store".to_string(), + // )) + // }; + // let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?; + // let identity_root = identity_tree.root(0).unwrap(); - let public_inputs = - vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()]; + // let public_inputs = + // vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()]; - Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)?) - } + // Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)?) + // } } diff --git a/bin/darkirc/src/irc/command.rs b/bin/darkirc/src/irc/command.rs index 6fdfcebde..8baa7d719 100644 --- a/bin/darkirc/src/irc/command.rs +++ b/bin/darkirc/src/irc/command.rs @@ -706,7 +706,7 @@ impl Client { // Handle queries to NickServ if target.to_lowercase().as_str() == "nickserv" { - return self.nickserv.handle_query(message.strip_prefix(':').unwrap()).await + return self.nickserv.handle_query(args).await } // If it's a DM and we don't have an encryption key, we will diff --git a/bin/darkirc/src/irc/server.rs b/bin/darkirc/src/irc/server.rs index 3a9c93796..9cd39e406 100644 --- a/bin/darkirc/src/irc/server.rs +++ b/bin/darkirc/src/irc/server.rs @@ -16,13 +16,7 @@ * along with this program. If not, see . */ -use std::{ - collections::HashMap, - fs::File, - io::{BufReader, Cursor}, - path::PathBuf, - sync::Arc, -}; +use std::{collections::HashMap, fs::File, io::BufReader, path::PathBuf, sync::Arc}; use darkfi::{ event_graph::Event, @@ -32,8 +26,11 @@ use darkfi::{ zkas::ZkBinary, Error, Result, }; -use darkfi_sdk::crypto::MerkleTree; -use darkfi_serial::serialize_async; +use darkfi_sdk::{ + crypto::smt::{MemoryStorageFp, PoseidonFp, SmtMemoryFp, EMPTY_NODES_FP}, + pasta::Fp, +}; +use darkfi_serial::{deserialize_async, deserialize_async_partial}; use futures_rustls::{ rustls::{self, pki_types::PrivateKeyDer}, TlsAcceptor, @@ -49,7 +46,11 @@ use smol::{ }; use url::Url; -use super::{client::Client, IrcChannel, IrcContact, Priv, Privmsg}; +use super::{ + client::Client, + services::nickserv::{ACCOUNTS_DB_PREFIX, ACCOUNTS_KEY_RLN_IDENTITY}, + IrcChannel, IrcContact, Priv, Privmsg, +}; use crate::{ crypto::{ rln::{RlnIdentity, RLN2_SIGNAL_ZKBIN, RLN2_SLASH_ZKBIN}, @@ -93,9 +94,7 @@ pub struct IrcServer { /// Persistent server storage pub server_store: sled::Tree, /// RLN identity storage - pub rln_identity_store: sled::Tree, - /// RLN Signal VerifyingKey - pub rln_signal_vk: VerifyingKey, + pub rln_identity_tree: RwLock, } impl IrcServer { @@ -152,7 +151,9 @@ impl IrcServer { // Open persistent dbs let server_store = darkirc.sled.open_tree("server_store")?; - let rln_identity_store = darkirc.sled.open_tree("rln_identity_store")?; + let hasher = PoseidonFp::new(); + let store = MemoryStorageFp::new(); + let mut identity_tree = SmtMemoryFp::new(store, hasher.clone(), &EMPTY_NODES_FP); // Generate RLN proving and verifying keys, if needed let rln_signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; @@ -167,20 +168,13 @@ impl IrcServer { server_store.insert("rlnv2-diff-signal-pk", buf)?; } - let rln_signal_vk = match server_store.get("rlnv2-diff-signal-vk")? { - Some(vk) => { - let mut reader = Cursor::new(vk); - VerifyingKey::read(&mut reader, rln_signal_circuit)? - } - None => { - info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey"); - let verifyingkey = VerifyingKey::build(rln_signal_zkbin.k, &rln_signal_circuit); - let mut buf = vec![]; - verifyingkey.write(&mut buf)?; - server_store.insert("rlnv2-diff-signal-vk", buf)?; - verifyingkey - } - }; + if server_store.get("rlnv2-diff-signal-vk")?.is_none() { + info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey"); + let verifyingkey = VerifyingKey::build(rln_signal_zkbin.k, &rln_signal_circuit); + let mut buf = vec![]; + verifyingkey.write(&mut buf)?; + server_store.insert("rlnv2-diff-signal-vk", buf)?; + } if server_store.get("rlnv2-diff-slash-pk")?.is_none() { info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash ProvingKey"); @@ -194,7 +188,7 @@ impl IrcServer { if server_store.get("rlnv2-diff-slash-vk")?.is_none() { info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash VerifyingKey"); - let zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; + let zkbin = ZkBinary::decode(RLN2_SLASH_ZKBIN)?; let circuit = ZkCircuit::new(empty_witnesses(&zkbin).unwrap(), &zkbin); let verifyingkey = VerifyingKey::build(zkbin.k, &circuit); let mut buf = vec![]; @@ -202,12 +196,37 @@ impl IrcServer { server_store.insert("rlnv2-diff-slash-vk", buf)?; } - // Initialize RLN Incremental Merkle tree if necessary - if server_store.get("rln_identity_tree")?.is_none() { - let tree = MerkleTree::new(1); - server_store.insert("rln_identity_tree", serialize_async(&tree).await)?; + // Construct SMT from static DAG + let mut events = darkirc.event_graph.static_fetch_all().await?; + events.sort_by(|a, b| a.header.timestamp.cmp(&b.header.timestamp)); + + for event in events.iter() { + // info!("event: {}", event.id()); + let fetched_rln_commitment: Fp = match deserialize_async_partial(event.content()).await + { + Ok((v, _)) => v, + Err(e) => { + error!(target: "irc::server", "[RLN] Failed deserializing incoming RLN Identity events: {}", e); + continue + } + }; + + let commitment = vec![fetched_rln_commitment]; + let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect(); + identity_tree.insert_batch(commitment)?; } + // Set the default RLN account if any + let default_db = darkirc.sled.open_tree(format!("{}default", ACCOUNTS_DB_PREFIX))?; + let rln_identity = if !default_db.is_empty() { + let default_accnt = default_db.get(ACCOUNTS_KEY_RLN_IDENTITY)?.unwrap(); + let default_accnt = deserialize_async(&default_accnt).await.unwrap(); + info!("Default RLN account set"); + Some(default_accnt) + } else { + None + }; + let self_ = Arc::new(Self { darkirc, config_path, @@ -216,12 +235,11 @@ impl IrcServer { autojoin: RwLock::new(Vec::new()), channels: RwLock::new(HashMap::new()), contacts: RwLock::new(HashMap::new()), - rln_identity: RwLock::new(None), + rln_identity: RwLock::new(rln_identity), clients: Mutex::new(HashMap::new()), password, server_store, - rln_identity_store, - rln_signal_vk, + rln_identity_tree: RwLock::new(identity_tree), }); // Load any channel/contact configuration. @@ -251,7 +269,7 @@ impl IrcServer { let contacts = parse_configured_contacts(&contents)?; // Parse RLN identity - let rln_identity = parse_rln_identity(&contents)?; + let _rln_identity = parse_rln_identity(&contents)?; // Persist unconfigured channels (joined from client, or autojoined without config) let channels = { @@ -267,7 +285,7 @@ impl IrcServer { *self.autojoin.write().await = autojoin; *self.channels.write().await = channels; *self.contacts.write().await = contacts; - *self.rln_identity.write().await = rln_identity; + // *self.rln_identity.write().await = rln_identity; Ok(()) } @@ -306,9 +324,10 @@ impl IrcServer { // Subscribe to incoming events and set up the connection. let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; + let incoming_st = self.darkirc.event_graph.static_pub.clone().subscribe().await; if let Err(e) = self .clone() - .process_connection(stream, peer_addr, incoming, ex.clone()) + .process_connection(stream, peer_addr, incoming, incoming_st, ex.clone()) .await { error!("[IRC SERVER] Failed processing new connection: {}", e); @@ -320,9 +339,10 @@ impl IrcServer { None => { // Subscribe to incoming events and set up the connection. let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; + let incoming_st = self.darkirc.event_graph.static_pub.clone().subscribe().await; if let Err(e) = self .clone() - .process_connection(stream, peer_addr, incoming, ex.clone()) + .process_connection(stream, peer_addr, incoming, incoming_st, ex.clone()) .await { error!("[IRC SERVER] Failed processing new connection: {}", e); @@ -343,10 +363,11 @@ impl IrcServer { stream: C, peer_addr: SocketAddr, incoming: Subscription, + incoming_st: Subscription, ex: Arc>, ) -> Result<()> { let port = peer_addr.port(); - let client = Client::new(self.clone(), incoming, peer_addr).await?; + let client = Client::new(self.clone(), incoming, incoming_st, peer_addr).await?; let conn_task = StoppableTask::new(); self.clients.lock().await.insert(port, conn_task.clone()); diff --git a/bin/darkirc/src/irc/services/mod.rs b/bin/darkirc/src/irc/services/mod.rs index d07420e17..ce98ddc3e 100644 --- a/bin/darkirc/src/irc/services/mod.rs +++ b/bin/darkirc/src/irc/services/mod.rs @@ -16,8 +16,5 @@ * along with this program. If not, see . */ -/// Rate-Limit Nullifiers -pub mod rln; - /// NickServ implementation, used for account management pub mod nickserv; diff --git a/bin/darkirc/src/irc/services/nickserv.rs b/bin/darkirc/src/irc/services/nickserv.rs index 62e265f28..dfc3c86e3 100644 --- a/bin/darkirc/src/irc/services/nickserv.rs +++ b/bin/darkirc/src/irc/services/nickserv.rs @@ -16,24 +16,21 @@ * along with this program. If not, see . */ -use std::{ - str::{FromStr, SplitAsciiWhitespace}, - sync::Arc, -}; +use std::{str::SplitAsciiWhitespace, sync::Arc, time::UNIX_EPOCH}; -use darkfi::Result; -use darkfi_sdk::crypto::SecretKey; +use darkfi::{event_graph::Event, Result}; +use darkfi_sdk::{crypto::pasta_prelude::PrimeField, pasta::pallas}; use darkfi_serial::serialize_async; use smol::lock::RwLock; -use super::{ - super::{client::ReplyType, rpl::*}, - rln::RlnIdentity, +use super::super::{client::ReplyType, rpl::*}; +use crate::{ + crypto::rln::{closest_epoch, RlnIdentity, USER_MSG_LIMIT}, + IrcServer, }; -use crate::IrcServer; -const ACCOUNTS_DB_PREFIX: &str = "darkirc_account_"; -const ACCOUNTS_KEY_RLN_IDENTITY: &[u8] = b"rln_identity"; +pub const ACCOUNTS_DB_PREFIX: &str = "darkirc_account_"; +pub const ACCOUNTS_KEY_RLN_IDENTITY: &[u8] = b"rln_identity"; const NICKSERV_USAGE: &str = r#"***** NickServ Help ***** @@ -79,12 +76,14 @@ impl NickServ { let nick = self.nickname.read().await.to_string(); let mut tokens = query.split_ascii_whitespace(); + tokens.next(); let Some(command) = tokens.next() else { return Ok(vec![ReplyType::Server(( ERR_NOTEXTTOSEND, format!("{} :No text to send", nick), ))]) }; + let command = command.strip_prefix(':').unwrap(); match command.to_uppercase().as_str() { "INFO" => self.handle_info(&nick, &mut tokens).await, @@ -115,12 +114,12 @@ impl NickServ { let account_name = tokens.next(); let identity_nullifier = tokens.next(); let identity_trapdoor = tokens.next(); - let leaf_pos = tokens.next(); + let user_msg_limit = tokens.next(); if account_name.is_none() || identity_nullifier.is_none() || identity_trapdoor.is_none() || - leaf_pos.is_none() + user_msg_limit.is_none() { return Ok(vec![ ReplyType::Notice(( @@ -131,7 +130,7 @@ impl NickServ { ReplyType::Notice(( "NickServ".to_string(), nick.to_string(), - "Use `REGISTER `." + "Use `REGISTER `." .to_string(), )), ]) @@ -140,7 +139,7 @@ impl NickServ { let account_name = account_name.unwrap(); let identity_nullifier = identity_nullifier.unwrap(); let identity_trapdoor = identity_trapdoor.unwrap(); - let leaf_pos = leaf_pos.unwrap(); + let _user_msg_limit = user_msg_limit.unwrap(); // Open the sled tree let db = self @@ -157,45 +156,70 @@ impl NickServ { ))]) } - // TODO: WIF + // Open the sled tree + let db_default = + self.server.darkirc.sled.open_tree(format!("{}default", ACCOUNTS_DB_PREFIX))?; + // Parse the secrets - let identity_nullifier = match SecretKey::from_str(identity_nullifier) { - Ok(v) => v, - Err(e) => { - return Ok(vec![ReplyType::Notice(( - "NickServ".to_string(), - nick.to_string(), - format!("Invalid identity_nullifier: {}", e), - ))]) - } - }; + let nullifier_bytes = bs58::decode(identity_nullifier).into_vec()?; + let identity_nullifier = + match pallas::Base::from_repr(nullifier_bytes.try_into().unwrap()).into_option() { + Some(v) => v, + None => { + return Ok(vec![ReplyType::Notice(( + "NickServ".to_string(), + nick.to_string(), + format!("Invalid identity_nullifier"), + ))]) + } + }; - let identity_trapdoor = match SecretKey::from_str(identity_trapdoor) { - Ok(v) => v, - Err(e) => { - return Ok(vec![ReplyType::Notice(( - "NickServ".to_string(), - nick.to_string(), - format!("Invalid identity_trapdoor: {}", e), - ))]) - } - }; - - let leaf_pos = match u64::from_str(leaf_pos) { - Ok(v) => v, - Err(e) => { - return Ok(vec![ReplyType::Notice(( - "NickServ".to_string(), - nick.to_string(), - format!("Invalid leaf_pos: {}", e), - ))]) - } - }; + let trapdoor_bytes = bs58::decode(identity_trapdoor).into_vec()?; + let identity_trapdoor = + match pallas::Base::from_repr(trapdoor_bytes.try_into().unwrap()).into_option() { + Some(v) => v, + None => { + return Ok(vec![ReplyType::Notice(( + "NickServ".to_string(), + nick.to_string(), + format!("Invalid identity_trapdoor"), + ))]) + } + }; // Create a new RLN identity and insert it into the db tree - let rln_identity = - RlnIdentity { identity_nullifier, identity_trapdoor, leaf_pos: leaf_pos.into() }; - db.insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&rln_identity).await)?; + let new_rln_identity = RlnIdentity { + nullifier: identity_nullifier, + trapdoor: identity_trapdoor, + user_message_limit: USER_MSG_LIMIT, + message_id: 1, // TODO + last_epoch: closest_epoch(UNIX_EPOCH.elapsed().unwrap().as_secs()), + }; + + // Store account + db.insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&new_rln_identity).await)?; + // Set default account if not already + if db_default.is_empty() { + db_default + .insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&new_rln_identity).await)?; + } + + *self.server.rln_identity.write().await = Some(new_rln_identity); + + // Update SMT, DAG and broadcast + let mut identities_tree = self.server.rln_identity_tree.write().await; + let rln_commitment = new_rln_identity.commitment(); + // info!("register commitment: {}", rln_commitment.to_string()); + + let commitment = vec![rln_commitment]; + let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect(); + identities_tree.insert_batch(commitment)?; + // info!("root: {}", identities_tree.root().to_string()); + + let evgr = &self.server.darkirc.event_graph; + let event = Event::new_static(serialize_async(&rln_commitment).await, evgr).await; + evgr.static_insert(&event).await?; + evgr.static_broadcast(event).await?; Ok(vec![ReplyType::Notice(( "NickServ".to_string(), diff --git a/bin/darkirc/src/irc/services/rln.rs b/bin/darkirc/src/irc/services/rln.rs deleted file mode 100644 index 5bdb7583f..000000000 --- a/bin/darkirc/src/irc/services/rln.rs +++ /dev/null @@ -1,31 +0,0 @@ -/* This file is part of DarkFi (https://dark.fi) - * - * Copyright (C) 2020-2025 Dyne.org foundation - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use darkfi_sdk::{bridgetree, crypto::SecretKey}; -use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable}; - -/// Rate-Limit Nullifier account data -#[derive(Debug, Copy, Clone, SerialEncodable, SerialDecodable)] -pub struct RlnIdentity { - /// Identity nullifier secret - pub identity_nullifier: SecretKey, - /// Identity trapdoor secret - pub identity_trapdoor: SecretKey, - /// Leaf position of the identity commitment in the accounts' Merkle tree - pub leaf_pos: bridgetree::Position, -} diff --git a/bin/darkirc/src/main.rs b/bin/darkirc/src/main.rs index d86f67af4..2ea8521c7 100644 --- a/bin/darkirc/src/main.rs +++ b/bin/darkirc/src/main.rs @@ -577,6 +577,17 @@ async fn sync_task( info!("Got peer connection"); // We'll attempt to sync for ever if !skip_dag_sync { + info!("Syncing static DAG"); + match event_graph.static_sync().await { + Ok(()) => { + info!("Static synced successfully") + } + Err(e) => { + error!("Failed syncing static graph: {e}"); + p2p.stop().await; + return Err(Error::StaticDagSyncFailed) + } + } info!("Syncing event DAG"); match event_graph.sync_selected(dags_count, fast_mode).await { Ok(()) => break, diff --git a/bin/darkirc/src/settings.rs b/bin/darkirc/src/settings.rs index c11a9e6ba..6b0bc0d49 100644 --- a/bin/darkirc/src/settings.rs +++ b/bin/darkirc/src/settings.rs @@ -226,7 +226,7 @@ pub fn parse_rln_identity(data: &toml::Value) -> Result> { return Err(ParseFailed("RLN trapdoor not a string")) }; - let user_message_limit = if let Some(msglimit) = msglimit.as_float() { + let user_message_limit = if let Some(msglimit) = msglimit.as_integer() { msglimit as u64 } else { return Err(ParseFailed("RLN user message limit not a number")) diff --git a/bin/genev/genevd/src/main.rs b/bin/genev/genevd/src/main.rs index 492a59e15..5a88ed344 100644 --- a/bin/genev/genevd/src/main.rs +++ b/bin/genev/genevd/src/main.rs @@ -153,6 +153,17 @@ async fn realmain(settings: Args, executor: Arc>) -> Res info!(target: "genevd", "Waiting for some P2P connections..."); sleep(5).await; + match event_graph.static_sync().await { + Ok(()) => { + info!("static synced successfully") + } + Err(e) => { + error!("failed syncing static graph: {e}"); + p2p.stop().await; + return Err(Error::DagSyncFailed) + } + } + // We'll attempt to sync 5 times if !settings.skip_dag_sync { for i in 1..=6 { diff --git a/bin/genev/genevd/src/rpc.rs b/bin/genev/genevd/src/rpc.rs index 8056e92e7..d7ca6f0cc 100644 --- a/bin/genev/genevd/src/rpc.rs +++ b/bin/genev/genevd/src/rpc.rs @@ -219,7 +219,7 @@ impl JsonRpcInterface { error!("Failed inserting new event to DAG: {}", e); } else { // Otherwise, broadcast it - self.p2p.broadcast(&EventPut(event)).await; + self.p2p.broadcast(&EventPut(event, vec![])).await; } let json = JsonValue::Boolean(true); diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 0392c1972..cf0bd3a05 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -330,7 +330,7 @@ async fn start_sync_loop( error!(target: "taud", "Failed inserting new event to DAG: {}", e); } else { // Otherwise, broadcast it - p2p.broadcast(&EventPut(event)).await; + p2p.broadcast(&EventPut(event, vec![])).await; } } } diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml index fa3d83e7d..aa034ef64 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml @@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8890" ## Disabled RPC methods #rpc_disabled_methods = ["p2p.get_info"] +# [rln] +# nullifier = "28UWJrkAj54CjFhr7Dv2veysqtsiSH2HegjucfPPCFVH" +# trapdoor = "E8eEAnkvrdAygSG6aGKJFMNrYV7tf12WHZEETb4RnKxV" +# user_message_limit = 10 + ## P2P net settings [net] ## Connection slots diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml index 5c2cf858d..9a2ed9f66 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml @@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8891" ## Disabled RPC methods #rpc_disabled_methods = ["p2p.get_info"] +# [rln] +# nullifier = "GJJyzntzzVCuu4BwuS5SQxzaBf4JwJAsRFEZ9Poax3uu" +# trapdoor = "GJcELibk8nj8eiAxiczAH8bMWWNSUHDsCcFdqAVyzy61" +# user_message_limit = 10 + ## P2P net settings [net] ## Connection slots diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml index 5fe9093c1..4a3b9856b 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml @@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8892" ## Disabled RPC methods #rpc_disabled_methods = ["p2p.get_info"] +# [rln] +# nullifier = "H3Nuq4bCstK7629x75jJ6imw6faZUhkHx9pRL7aqTnzQ" +# trapdoor = "ArxesUcNmZ8ygz9bSJP2u8MywRkL2dwG9pPegraguddC" +# user_message_limit = 10 + ## P2P net settings [net] ## Connection slots diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml index 0a7aa4937..0616a084d 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml @@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8893" ## Disabled RPC methods #rpc_disabled_methods = ["p2p.get_info"] +# [rln] +# nullifier = "8iktbkj7FXGEKqWR6v6EAXjB2XwdQPFLZLx9YnUJYbCy" +# trapdoor = "4PP6JaUSabTuuCd9F41QA7yFFWwVwxMHjdjowfepHKxV" +# user_message_limit = 10 + ## P2P net settings [net] ## Connection slots diff --git a/src/error.rs b/src/error.rs index dc668b52b..93bf9f9d7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -552,6 +552,9 @@ pub enum Error { // ================== // Event Graph errors // ================== + #[error("Static DAG sync failed")] + StaticDagSyncFailed, + #[error("DAG sync failed")] DagSyncFailed, diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index e91937fc3..10a027324 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -48,6 +48,11 @@ impl Header { Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer } } + pub async fn new_static(event_graph: &EventGraph) -> Self { + let (layer, parents) = event_graph.get_next_layer_with_parents_static().await; + Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer } + } + pub async fn with_timestamp(timestamp: u64, event_graph: &EventGraph) -> Self { let current_dag_name = event_graph.current_genesis.read().await.header.timestamp; let (layer, parents) = event_graph.get_next_layer_with_parents(¤t_dag_name).await; @@ -150,6 +155,11 @@ impl Event { Self { header, content: data } } + pub async fn new_static(data: Vec, event_graph: &EventGraph) -> Self { + let header = Header::new_static(event_graph).await; + Self { header, content: data } + } + pub fn id(&self) -> blake3::Hash { self.header.id() } diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 7749a5c67..d3a175285 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -19,6 +19,7 @@ // use async_std::stream::from_iter; use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + io::Cursor, path::PathBuf, str::FromStr, sync::Arc, @@ -44,13 +45,18 @@ use tinyjson::JsonValue::{self}; use url::Url; use crate::{ - event_graph::util::{next_hour_timestamp, next_rotation_timestamp, replayer_log}, + event_graph::{ + proto::StaticPut, + util::{next_hour_timestamp, next_rotation_timestamp, replayer_log}, + }, net::{channel::Channel, P2pPtr}, rpc::{ jsonrpc::{JsonResponse, JsonResult}, util::json_map, }, system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, + zk::{empty_witnesses, VerifyingKey, ZkCircuit}, + zkas::ZkBinary, Error, Result, }; @@ -326,6 +332,8 @@ pub struct EventGraph { p2p: P2pPtr, /// Sled tree containing the headers dag_store: RwLock, + /// Static DAG + static_dag: sled::Tree, /// Replay logs path. datastore: PathBuf, /// Run in replay_mode where if set we log Sled DB instructions @@ -342,6 +350,9 @@ pub struct EventGraph { /// Event publisher, this notifies whenever an event is /// inserted into the DAG pub event_pub: PublisherPtr, + /// Static Event publisher, this notifies whenever a static event is + /// inserted into the static DAG + pub static_pub: PublisherPtr, /// Current genesis event pub current_genesis: RwLock, /// Currently configured DAG rotation, in hours @@ -352,9 +363,10 @@ pub struct EventGraph { pub deg_enabled: RwLock, /// The publisher for which we can give deg info over deg_publisher: PublisherPtr, - /// Run in replay_mode where if set we log Sled DB instructions - /// into `datastore`, useful to reacreate a faulty DAG to debug. + /// Run in fast mode where if set we sync only headers. fast_mode: bool, + /// Signaling verify key + signal_vk: VerifyingKey, } impl EventGraph { @@ -379,8 +391,30 @@ impl EventGraph { hours_rotation: u64, ex: Arc>, ) -> Result { + // Read or build signal verifying key + let signal_zkbin = include_bytes!("proofs/rlnv2-diff-signal.zk.bin"); + let signal_zkbin = ZkBinary::decode(signal_zkbin).unwrap(); + let signal_empty_circuit = + ZkCircuit::new(empty_witnesses(&signal_zkbin).unwrap(), &signal_zkbin); + + let signal_vk = match sled_db.get("rlnv2-diff-signal-vk")? { + Some(vk) => { + let mut reader = Cursor::new(vk); + VerifyingKey::read(&mut reader, signal_empty_circuit)? + } + None => { + info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey"); + let verifyingkey = VerifyingKey::build(signal_zkbin.k, &signal_empty_circuit); + let mut buf = vec![]; + verifyingkey.write(&mut buf)?; + sled_db.insert("rlnv2-diff-signal-vk", buf)?; + verifyingkey + } + }; + let broadcasted_ids = RwLock::new(HashSet::new()); let event_pub = Publisher::new(); + let static_pub = Publisher::new(); // Create the current genesis event based on the `hours_rotation` let current_genesis = generate_genesis(hours_rotation); @@ -390,23 +424,28 @@ impl EventGraph { header_dags: BTreeMap::default(), main_dags: BTreeMap::default(), } - .new(sled_db, hours_rotation) + .new(sled_db.clone(), hours_rotation) .await; + let static_dag = Self::static_new(sled_db).await?; + let self_ = Arc::new(Self { p2p, dag_store: RwLock::new(dag_store.clone()), + static_dag, datastore, replay_mode, fast_mode, broadcasted_ids, prune_task: OnceCell::new(), event_pub, + static_pub, current_genesis: RwLock::new(current_genesis.clone()), hours_rotation, synced: RwLock::new(false), deg_enabled: RwLock::new(false), deg_publisher: Publisher::new(), + signal_vk, }); // Check if we have it in our DAG. @@ -992,6 +1031,60 @@ impl EventGraph { (next_layer, parents) } + /// Find the unreferenced tips in the current DAG state, mapped by their layers. + async fn find_unreferenced_tips_static(&self, dag: &sled::Tree) -> LayerUTips { + // First get all the event IDs + let mut tips = HashSet::new(); + for iter_elem in dag.iter() { + let (id, _) = iter_elem.unwrap(); + let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + tips.insert(id); + } + // Iterate again to find unreferenced IDs + for iter_elem in dag.iter() { + let (_, event) = iter_elem.unwrap(); + let event: Event = deserialize_async(&event).await.unwrap(); + for parent in event.header.parents.iter() { + tips.remove(parent); + } + } + // Build the layers map + let mut map: LayerUTips = BTreeMap::new(); + for tip in tips { + let event = self.static_fetch(&tip).await.unwrap().unwrap(); + if let Some(layer_tips) = map.get_mut(&event.header.layer) { + layer_tips.insert(tip); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(tip); + map.insert(event.header.layer, layer_tips); + } + } + + map + } + + async fn get_next_layer_with_parents_static(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) { + let unreferenced_tips = self.find_unreferenced_tips_static(&self.static_dag).await; + + let mut parents = [NULL_ID; N_EVENT_PARENTS]; + let mut index = 0; + 'outer: for (_, tips) in unreferenced_tips.iter().rev() { + for tip in tips.iter() { + parents[index] = *tip; + index += 1; + if index >= N_EVENT_PARENTS { + break 'outer; + } + } + } + + let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1; + + assert!(parents.iter().any(|x| x != &NULL_ID)); + (next_layer, parents) + } + /// Internal function used for DAG sorting. async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> { let mut vec_tips = vec![]; @@ -1156,6 +1249,123 @@ impl EventGraph { Ok(result) } + + pub async fn static_new(sled_db: sled::Db) -> Result { + let static_dag = sled_db.open_tree("static-dag")?; + + let genesis = generate_genesis(0); + let mut overlay = SledTreeOverlay::new(&static_dag); + + let event_se = serialize_async(&genesis).await; + + // Add the event to the overlay + overlay.insert(genesis.id().as_bytes(), &event_se).unwrap(); + + // Aggregate changes into a single batch + let batch = match overlay.aggregate() { + Some(b) => b, + None => return Ok(static_dag), + }; + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = static_dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + + Ok(static_dag) + } + + pub async fn static_sync(&self) -> Result<()> { + self.dag_sync(self.static_dag.clone(), false).await?; + Ok(()) + } + + pub async fn static_broadcast(&self, event: Event) -> Result<()> { + self.p2p.broadcast(&StaticPut(event)).await; + Ok(()) + } + + pub async fn static_insert(&self, event: &Event) -> Result<()> { + let mut overlay = SledTreeOverlay::new(&self.static_dag); + + let event_se = serialize_async(event).await; + + // Add the event to the overlay + overlay.insert(event.id().as_bytes(), &event_se).unwrap(); + + // Aggregate changes into a single batch + let batch = match overlay.aggregate() { + Some(b) => b, + None => return Ok(()), + }; + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = self.static_dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + + // Send out notifications about the new event + self.static_pub.notify(event.clone()).await; + + Ok(()) + } + + pub async fn static_fetch(&self, event_id: &Hash) -> Result> { + let Some(bytes) = self.static_dag.get(event_id.as_bytes())? else { + return Ok(None); + }; + let event: Event = deserialize_async(&bytes).await?; + + return Ok(Some(event)) + } + + pub async fn static_fetch_all(&self) -> Result> { + let mut events = vec![]; + for iter_elem in self.static_dag.iter() { + let (id, _) = iter_elem.unwrap(); + let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + let event = self.static_fetch(&id).await?.unwrap(); + if event.header.parents == NULL_PARENTS { + continue + } + events.push(event); + } + Ok(events) + } + + pub async fn static_unreferenced_tips(&self) -> LayerUTips { + // First get all the event IDs + let mut tips = HashSet::new(); + for iter_elem in self.static_dag.iter() { + let (id, _) = iter_elem.unwrap(); + let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + tips.insert(id); + } + // Iterate again to find unreferenced IDs + for iter_elem in self.static_dag.iter() { + let (_, event) = iter_elem.unwrap(); + let event: Event = deserialize_async(&event).await.unwrap(); + for parent in event.header.parents.iter() { + tips.remove(parent); + } + } + // Build the layers map + let mut map: LayerUTips = BTreeMap::new(); + for tip in tips { + let event = self.static_fetch(&tip).await.unwrap().unwrap(); + if let Some(layer_tips) = map.get_mut(&event.header.layer) { + layer_tips.insert(tip); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(tip); + map.insert(event.header.layer, layer_tips); + } + } + + map + } } async fn request_header( diff --git a/src/event_graph/proofs/rlnv2-diff-signal.zk b/src/event_graph/proofs/rlnv2-diff-signal.zk new file mode 100644 index 000000000..e84f1d823 --- /dev/null +++ b/src/event_graph/proofs/rlnv2-diff-signal.zk @@ -0,0 +1,48 @@ +k = 14; +field = "pallas"; + +constant "Rlnv2Diff_Signal" {} + +witness "Rlnv2Diff_Signal" { + Base identity_nullifier, + Base identity_trapdoor, + Base user_message_limit, + + # Inclusion proof, the leaf is the identity_commitment + SparseMerklePath path, + + # The message hash + Base x, + Base message_id, + + Base epoch, +} + +circuit "Rlnv2Diff_Signal" { + # Identity inclusion proof + identity_secret = poseidon_hash(identity_nullifier, identity_trapdoor); + identity_secret_hash = poseidon_hash(identity_secret, user_message_limit); + identity_commitment = poseidon_hash(identity_secret_hash); + root = sparse_merkle_root(identity_commitment, path, identity_commitment); + constrain_instance(root); + + # External nullifier is created from epoch and app identifier + app_id = witness_base(1000); + external_nullifier = poseidon_hash(epoch, app_id); + constrain_instance(external_nullifier); + + # Calculating internal nullifier + # a_0 = identity_secret_hash + a_0 = poseidon_hash(identity_nullifier, identity_trapdoor); + a_1 = poseidon_hash(a_0, external_nullifier, message_id); + x_a_1 = base_mul(x, a_1); + y = base_add(a_0, x_a_1); + constrain_instance(x); + constrain_instance(y); + + # Constrain message_id to be lower than actual message limit. + less_than_strict(message_id, user_message_limit); + + internal_nullifier = poseidon_hash(a_1); + constrain_instance(internal_nullifier); +} diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index 0fcf0ced3..7257ad396 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -25,8 +25,11 @@ use std::{ }, }; -use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable}; -use log::{debug, error, trace, warn}; +use darkfi_sdk::pasta::pallas; +use darkfi_serial::{ + async_trait, deserialize_async, deserialize_async_partial, SerialDecodable, SerialEncodable, +}; +use log::{debug, error, info, trace, warn}; use smol::Executor; use super::{event::Header, Event, EventGraphPtr, LayerUTips, NULL_ID, NULL_PARENTS}; @@ -39,6 +42,7 @@ use crate::{ }, system::msleep, util::time::NanoTimestamp, + zk::Proof, Error, Result, }; @@ -105,6 +109,8 @@ pub struct ProtocolEventGraph { event_graph: EventGraphPtr, /// `MessageSubscriber` for `EventPut` ev_put_sub: MessageSubscription, + /// `MessageSubscriber` for `StaticPut` + st_put_sub: MessageSubscription, /// `MessageSubscriber` for `EventReq` ev_req_sub: MessageSubscription, /// `MessageSubscriber` for `EventRep` @@ -132,9 +138,15 @@ pub struct ProtocolEventGraph { /// A P2P message representing publishing an event on the network #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct EventPut(pub Event); +pub struct EventPut(pub Event, pub Vec); impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION); +/// A P2P message representing publishing an event of a static graph +/// (most likely RLN_identities) on the network +#[derive(Clone, SerialEncodable, SerialDecodable)] +pub struct StaticPut(pub Event); +impl_p2p_message!(StaticPut, "EventGraph::StaticPut", 0, 0, DEFAULT_METERING_CONFIGURATION); + /// A P2P message representing an event request #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct EventReq(pub Vec); @@ -175,6 +187,7 @@ impl ProtocolBase for ProtocolEventGraph { async fn start(self: Arc, ex: Arc>) -> Result<()> { self.jobsman.clone().start(ex.clone()); self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_static_put(), ex.clone()).await; self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_put(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await; @@ -193,6 +206,7 @@ impl ProtocolEventGraph { pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result { let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; @@ -202,6 +216,7 @@ impl ProtocolEventGraph { msg_subsystem.add_dispatch::().await; let ev_put_sub = channel.subscribe_msg::().await?; + let st_put_sub = channel.subscribe_msg::().await?; let ev_req_sub = channel.subscribe_msg::().await?; let ev_rep_sub = channel.subscribe_msg::().await?; let _hdr_put_sub = channel.subscribe_msg::().await?; @@ -216,6 +231,7 @@ impl ProtocolEventGraph { channel: channel.clone(), event_graph, ev_put_sub, + st_put_sub, ev_req_sub, ev_rep_sub, _hdr_put_sub, @@ -258,8 +274,8 @@ impl ProtocolEventGraph { let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME); loop { - let event = match self.ev_put_sub.receive().await { - Ok(v) => v.0.clone(), + let (event, blob) = match self.ev_put_sub.receive().await { + Ok(v) => (v.0.clone(), v.1.clone()), Err(_) => continue, }; trace!( @@ -276,9 +292,57 @@ impl ProtocolEventGraph { continue } + //////////////////// + let mut verification_failed = false; + #[allow(clippy::never_loop)] + loop { + let (proof, public_inputs): (Proof, Vec) = + match deserialize_async_partial(&blob).await { + Ok((v, _)) => v, + Err(e) => { + error!(target: "event_graph::protocol::handle_event_put()","[EVENTGRAPH] Failed deserializing event ephemeral data: {}", e); + break + } + }; + + if public_inputs.len() != 2 { + error!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Received event has the wrong number of public inputs"); + verification_failed = true; + break + } + + // info!("public_inputs: {:?}", public_inputs); + + info!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Verifying incoming Event RLN proof"); + // let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); + // let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]); + // let x = hash_event(event); + // let y = public_inputs[0]; + // let internal_nullifier = public_inputs[1]; + + // verification_failed = + // proof.verify(&self.event_graph.signal_vk, &public_inputs).is_err(); + verification_failed = + match proof.verify(&self.event_graph.signal_vk, &public_inputs) { + Ok(()) => false, + Err(e) => { + error!("error: {e}"); + true + } + }; + break + } + + if verification_failed { + error!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Incoming Event proof verification failed"); + continue + } + //////////////////// + // If we have already seen the event, we'll stay quiet. let current_genesis = self.event_graph.current_genesis.read().await; - let dag_name = current_genesis.header.timestamp.to_string(); + let genesis_timestamp = current_genesis.header.timestamp; + let dag_name = genesis_timestamp.to_string(); let hdr_tree_name = format!("headers_{dag_name}"); let event_id = event.id(); if self @@ -318,7 +382,6 @@ impl ProtocolEventGraph { // The genesis event marks the last time the Dag has been pruned of old // events. The pruning interval is defined by the days_rotation field // of [`EventGraph`]. - let genesis_timestamp = self.event_graph.current_genesis.read().await.header.timestamp; if event.header.timestamp < genesis_timestamp { debug!( target: "event_graph::protocol::handle_event_put()", @@ -512,7 +575,87 @@ impl ProtocolEventGraph { continue } - self.broadcaster_push.send(EventPut(event)).await.expect("push broadcaster closed"); + self.broadcaster_push + .send(EventPut(event, blob)) + .await + .expect("push broadcaster closed"); + } + } + + async fn handle_static_put(self: Arc) -> Result<()> { + // Rolling window of event timestamps on this channel + let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME); + + loop { + let event = match self.st_put_sub.receive().await { + Ok(v) => v.0.clone(), + Err(_) => continue, + }; + trace!( + target: "event_graph::protocol::handle_event_put()", + "Got EventPut: {} [{}]", event.id(), self.channel.address(), + ); + + info!("Received a static event: {:?}", event); + + // Check if node has finished syncing its DAG + if !*self.event_graph.synced.read().await { + debug!( + target: "event_graph::protocol::handle_event_put", + "DAG is still syncing, skipping..." + ); + continue + } + + let event_id = event.id(); + if self.event_graph.static_dag.contains_key(event_id.as_bytes())? { + debug!( + target: "event_graph::protocol::handle_static_put()", + "Event {} is already known", event_id, + ); + continue + } + + // Check if event's parents are in the static DAG + for parent in event.header.parents.iter() { + if *parent == NULL_ID { + continue + } + if !self.event_graph.static_dag.contains_key(parent.as_bytes())? { + debug!( + target: "event_graph::protocol::handle_static_put()", + "Event {} is orphan", event_id, + ); + return Err(Error::EventNotFound("Event is orphan".to_owned())) + } + } + + // There's a new unique event. + // Apply ban logic to stop network floods. + bantimes.ticktock(); + if bantimes.count() > WINDOW_MAXSIZE { + self.channel.ban().await; + // This error is actually unused. We could return Ok here too. + return Err(Error::MaliciousFlood) + } + + // Validate the new event first. If we do not consider it valid, we + // will just drop it and stay quiet. If the malicious threshold + // is reached, we will stop the connection. + if !event.validate_new() { + self.clone().increase_malicious_count().await?; + continue + } + + // At this point, this is a new event to us. Let's see if we + // have all of its parents. + debug!( + target: "event_graph::protocol::handle_event_put()", + "Event {} is new", event_id, + ); + + self.event_graph.static_insert(&event).await?; + self.event_graph.static_broadcast(event).await? } } @@ -700,11 +843,20 @@ impl ProtocolEventGraph { // We received a tip request. Let's find them, add them to // our bcast ids list, and reply with them. - let dag_timestamp = u64::from_str(&dag_name)?; - let store = self.event_graph.dag_store.read().await; - let (_, layers) = match store.header_dags.get(&dag_timestamp) { - Some(v) => v, - None => continue, + let layers = match dag_name.as_str() { + "static-dag" => { + let tips = self.event_graph.static_unreferenced_tips().await; + &tips.clone() + } + _ => { + let dag_timestamp = u64::from_str(&dag_name)?; + let store = self.event_graph.dag_store.read().await; + let (_, layers) = match store.header_dags.get(&dag_timestamp) { + Some(v) => v, + None => continue, + }; + &layers.clone() + } }; // let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await; let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; diff --git a/src/event_graph/tests.rs b/src/event_graph/tests.rs index a69f7fb44..7c320f68c 100644 --- a/src/event_graph/tests.rs +++ b/src/event_graph/tests.rs @@ -248,7 +248,7 @@ async fn eventgraph_propagation_real(ex: Arc>) { drop(store); drop(current_genesis); info!("Broadcasting event {}", event_id); - random_node.p2p.broadcast(&EventPut(event)).await; + random_node.p2p.broadcast(&EventPut(event, vec![])).await; info!("Waiting 5s for event propagation"); sleep(5).await; @@ -292,7 +292,7 @@ async fn eventgraph_propagation_real(ex: Arc>) { info!("Broadcasting event {}", event2_id); info!("Event chain: {:#?}", event_chain); - random_node.p2p.broadcast(&EventPut(event2)).await; + random_node.p2p.broadcast(&EventPut(event2, vec![])).await; info!("Waiting 5s for event propagation"); sleep(5).await; @@ -310,19 +310,19 @@ async fn eventgraph_propagation_real(ex: Arc>) { let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await; node1.header_dag_insert(vec![event0_1.header.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event0_1.clone()], &dag_name).await.unwrap(); - node1.p2p.broadcast(&EventPut(event0_1)).await; + node1.p2p.broadcast(&EventPut(event0_1, vec![])).await; msleep(300).await; let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await; node1.header_dag_insert(vec![event1_1.header.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event1_1.clone()], &dag_name).await.unwrap(); - node1.p2p.broadcast(&EventPut(event1_1)).await; + node1.p2p.broadcast(&EventPut(event1_1, vec![])).await; msleep(300).await; let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await; node1.header_dag_insert(vec![event2_1.header.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event2_1.clone()], &dag_name).await.unwrap(); - node1.p2p.broadcast(&EventPut(event2_1)).await; + node1.p2p.broadcast(&EventPut(event2_1, vec![])).await; msleep(300).await; // ======= @@ -332,19 +332,19 @@ async fn eventgraph_propagation_real(ex: Arc>) { let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await; node2.header_dag_insert(vec![event0_2.header.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event0_2.clone()], &dag_name).await.unwrap(); - node2.p2p.broadcast(&EventPut(event0_2)).await; + node2.p2p.broadcast(&EventPut(event0_2, vec![])).await; msleep(300).await; let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await; node2.header_dag_insert(vec![event1_2.header.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event1_2.clone()], &dag_name).await.unwrap(); - node2.p2p.broadcast(&EventPut(event1_2)).await; + node2.p2p.broadcast(&EventPut(event1_2, vec![])).await; msleep(300).await; let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await; node2.header_dag_insert(vec![event2_2.header.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event2_2.clone()], &dag_name).await.unwrap(); - node2.p2p.broadcast(&EventPut(event2_2)).await; + node2.p2p.broadcast(&EventPut(event2_2, vec![])).await; msleep(300).await; // ======= @@ -354,19 +354,19 @@ async fn eventgraph_propagation_real(ex: Arc>) { let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await; node3.header_dag_insert(vec![event0_3.header.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event0_3.clone()], &dag_name).await.unwrap(); - node3.p2p.broadcast(&EventPut(event0_3)).await; + node3.p2p.broadcast(&EventPut(event0_3, vec![])).await; msleep(300).await; let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await; node3.header_dag_insert(vec![event1_3.header.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event1_3.clone()], &dag_name).await.unwrap(); - node3.p2p.broadcast(&EventPut(event1_3)).await; + node3.p2p.broadcast(&EventPut(event1_3, vec![])).await; msleep(300).await; let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await; node3.header_dag_insert(vec![event2_3.header.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event2_3.clone()], &dag_name).await.unwrap(); - node3.p2p.broadcast(&EventPut(event2_3)).await; + node3.p2p.broadcast(&EventPut(event2_3, vec![])).await; msleep(300).await; // ///// @@ -464,7 +464,7 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc>) { let dag_name = current_genesis.header.timestamp.to_string(); random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap(); random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap(); - random_node.p2p.broadcast(&EventPut(event)).await; + random_node.p2p.broadcast(&EventPut(event, vec![])).await; } info!("Waiting 5s for events propagation"); sleep(5).await;