[WIP] add static DAG and implement RLN

This commit is contained in:
dasman
2025-12-09 04:05:00 +03:00
parent b5b37a0201
commit 9dcbb2671d
23 changed files with 878 additions and 312 deletions

View File

@@ -1,44 +1,48 @@
k = 13; k = 14;
field = "pallas"; field = "pallas";
constant "RlnV2_Diff_Signal" {} constant "Rlnv2Diff_Signal" {}
witness "RlnV2_Diff_Signal" { witness "Rlnv2Diff_Signal" {
Base identity_nullifier, Base identity_nullifier,
Base identity_trapdoor, Base identity_trapdoor,
Base user_message_limit,
MerklePath identity_path, # Inclusion proof, the leaf is the identity_commitment
Uint32 identity_leaf_pos, SparseMerklePath path,
Base x, # The message hash # The message hash
Base external_nullifier, # Hash(Epoch, RLN identifier) Base x,
Base message_id,
Base message_id, Base epoch,
Base user_message_limit,
Base epoch,
} }
circuit "RlnV2_Diff_Signal" { circuit "Rlnv2Diff_Signal" {
constrain_instance(epoch); # Identity inclusion proof
constrain_instance(external_nullifier); identity_secret = poseidon_hash(identity_nullifier, identity_trapdoor);
identity_secret_hash = poseidon_hash(identity_secret, user_message_limit);
identity_commitment = poseidon_hash(identity_secret_hash);
root = sparse_merkle_root(identity_commitment, path, identity_commitment);
constrain_instance(root);
less_than_strict(message_id, user_message_limit); # External nullifier is created from epoch and app identifier
app_id = witness_base(1000);
external_nullifier = poseidon_hash(epoch, app_id);
constrain_instance(external_nullifier);
# Identity secret hash # Calculating internal nullifier
a_0 = poseidon_hash(identity_nullifier, identity_trapdoor); # a_0 = identity_secret_hash
a_1 = poseidon_hash(a_0, external_nullifier, message_id); a_0 = poseidon_hash(identity_nullifier, identity_trapdoor);
a_1 = poseidon_hash(a_0, external_nullifier, message_id);
x_a_1 = base_mul(x, a_1);
y = base_add(a_0, x_a_1);
constrain_instance(x);
constrain_instance(y);
# y = a_0 + x * a_1 # Constrain message_id to be lower than actual message limit.
x_a_1 = base_mul(x, a_1); less_than_strict(message_id, user_message_limit);
y = base_add(a_0, x_a_1);
constrain_instance(x);
constrain_instance(y);
internal_nullifier = poseidon_hash(a_1); internal_nullifier = poseidon_hash(a_1);
constrain_instance(internal_nullifier); constrain_instance(internal_nullifier);
identity_commitment = poseidon_hash(a_0, user_message_limit);
root = merkle_root(identity_leaf_pos, identity_path, identity_commitment);
constrain_instance(root);
} }

View File

@@ -28,10 +28,10 @@ use darkfi::{
Result, Result,
}; };
use darkfi_sdk::{ use darkfi_sdk::{
bridgetree::Position, crypto::{pasta_prelude::FromUniformBytes, poseidon_hash, smt::SmtMemoryFp},
crypto::{pasta_prelude::FromUniformBytes, poseidon_hash, MerkleTree},
pasta::pallas, pasta::pallas,
}; };
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
use log::info; use log::info;
use rand::{rngs::OsRng, CryptoRng, RngCore}; use rand::{rngs::OsRng, CryptoRng, RngCore};
@@ -47,6 +47,9 @@ pub const RLN_EPOCH_LEN: u64 = 600; // 10 min
pub const RLN2_SIGNAL_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-signal.zk.bin"); pub const RLN2_SIGNAL_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-signal.zk.bin");
pub const RLN2_SLASH_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-slash.zk.bin"); pub const RLN2_SLASH_ZKBIN: &[u8] = include_bytes!("../../proof/rlnv2-diff-slash.zk.bin");
/// TODO: this should be configurable
pub const USER_MSG_LIMIT: u64 = 6;
/// Find closest epoch to given timestamp /// Find closest epoch to given timestamp
pub fn closest_epoch(timestamp: u64) -> u64 { pub fn closest_epoch(timestamp: u64) -> u64 {
let time_diff = timestamp - RLN_GENESIS; let time_diff = timestamp - RLN_GENESIS;
@@ -62,7 +65,7 @@ pub fn hash_event(event: &Event) -> pallas::Base {
pallas::Base::from_uniform_bytes(&buf) pallas::Base::from_uniform_bytes(&buf)
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone, SerialEncodable, SerialDecodable)]
pub struct RlnIdentity { pub struct RlnIdentity {
pub nullifier: pallas::Base, pub nullifier: pallas::Base,
pub trapdoor: pallas::Base, pub trapdoor: pallas::Base,
@@ -81,25 +84,52 @@ impl RlnIdentity {
pallas::Base::random(&mut rng), pallas::Base::random(&mut rng),
]), ]),
trapdoor: poseidon_hash([RLN_TRAPDOOR_DERIVATION_PATH, pallas::Base::random(&mut rng)]), trapdoor: poseidon_hash([RLN_TRAPDOOR_DERIVATION_PATH, pallas::Base::random(&mut rng)]),
user_message_limit: 100, user_message_limit: USER_MSG_LIMIT,
message_id: 1, message_id: 1,
last_epoch: closest_epoch(UNIX_EPOCH.elapsed().unwrap().as_secs()), last_epoch: closest_epoch(UNIX_EPOCH.elapsed().unwrap().as_secs()),
} }
} }
pub fn commitment(&self) -> pallas::Base { pub fn commitment(&self) -> pallas::Base {
poseidon_hash([ let identity_secret = poseidon_hash([self.nullifier, self.trapdoor]);
poseidon_hash([self.nullifier, self.trapdoor]), let identity_secret_hash = poseidon_hash([identity_secret, self.user_message_limit.into()]);
pallas::Base::from(self.user_message_limit),
]) poseidon_hash([identity_secret_hash])
} }
// pub fn _create_register_proof(
// &self,
// event: &Event,
// identity_tree: &mut SmtMemoryFp,
// register_pk: &ProvingKey,
// ) -> Result<Proof> {
// let witnesses = vec![
// Witness::Base(Value::known(self.nullifier)),
// Witness::Base(Value::known(self.trapdoor)),
// Witness::Base(Value::known(pallas::Base::from(self.user_message_limit))),
// ];
// let public_inputs = vec![self.commitment(), pallas::Base::from(self.user_message_limit)];
// info!(target: "crypto::rln::create_register_proof", "[RLN] Creating register proof for event {}", event.header.id());
// let register_zkbin = ZkBinary::decode(RLN2_REGISTER_ZKBIN)?;
// let register_circuit = ZkCircuit::new(witnesses, &register_zkbin);
// let proof =
// Proof::create(&register_pk, &[register_circuit], &public_inputs, &mut OsRng).unwrap();
// let leaf = vec![self.commitment()];
// let leaf: Vec<_> = leaf.into_iter().map(|l| (l, l)).collect();
// // TODO: Recipients should verify that identity doesn't exist already before insert.
// identity_tree.insert_batch(leaf.clone()).unwrap(); // leaf == pos
// Ok(proof)
// }
pub fn create_signal_proof( pub fn create_signal_proof(
&self, &self,
event: &Event, event: &Event,
identity_tree: &MerkleTree, identity_tree: &SmtMemoryFp,
identity_pos: Position, signal_pk: &ProvingKey,
proving_key: &ProvingKey,
) -> Result<(Proof, Vec<pallas::Base>)> { ) -> Result<(Proof, Vec<pallas::Base>)> {
// 1. Construct share // 1. Construct share
let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); let epoch = pallas::Base::from(closest_epoch(event.header.timestamp));
@@ -112,32 +142,32 @@ impl RlnIdentity {
let internal_nullifier = poseidon_hash([a_1]); let internal_nullifier = poseidon_hash([a_1]);
// 2. Create Merkle proof // 2. Inclusion proof
let identity_root = identity_tree.root(0).unwrap(); let identity_root = identity_tree.root();
let identity_path = identity_tree.witness(identity_pos, 0).unwrap(); let identity_path = identity_tree.prove_membership(&self.commitment());
// TODO: Delete me later
assert!(identity_path.verify(&identity_root, &self.commitment(), &self.commitment()));
// 3. Create ZK proof // 3. Create ZK proof
let witnesses = vec![ let witnesses = vec![
Witness::Base(Value::known(self.nullifier)), Witness::Base(Value::known(self.nullifier)),
Witness::Base(Value::known(self.trapdoor)), Witness::Base(Value::known(self.trapdoor)),
Witness::MerklePath(Value::known(identity_path.clone().try_into().unwrap())),
Witness::Uint32(Value::known(u64::from(identity_pos).try_into().unwrap())),
Witness::Base(Value::known(x)),
Witness::Base(Value::known(external_nullifier)),
Witness::Base(Value::known(message_id)),
Witness::Base(Value::known(pallas::Base::from(self.user_message_limit))), Witness::Base(Value::known(pallas::Base::from(self.user_message_limit))),
Witness::SparseMerklePath(Value::known(identity_path.path)),
Witness::Base(Value::known(x)),
Witness::Base(Value::known(message_id)),
Witness::Base(Value::known(epoch)), Witness::Base(Value::known(epoch)),
]; ];
let public_inputs = let public_inputs = vec![identity_root, external_nullifier, x, y, internal_nullifier];
vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()];
info!(target: "crypto::rln::create_proof", "[RLN] Creating proof for event {}", event.header.id()); info!(target: "crypto::rln::create_signal_proof", "[RLN] Creating signal proof for event {}", event.header.id());
let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?;
let signal_circuit = ZkCircuit::new(witnesses, &signal_zkbin); let signal_circuit = ZkCircuit::new(witnesses, &signal_zkbin);
let proof = Proof::create(proving_key, &[signal_circuit], &public_inputs, &mut OsRng)?; let proof = Proof::create(signal_pk, &[signal_circuit], &public_inputs, &mut OsRng)?;
Ok((proof, vec![y, internal_nullifier])) // Ok((proof, vec![y, internal_nullifier]))
Ok((proof, public_inputs))
} }
} }

View File

@@ -28,16 +28,15 @@ use std::{
use darkfi::{ use darkfi::{
event_graph::{proto::EventPut, Event, NULL_ID}, event_graph::{proto::EventPut, Event, NULL_ID},
system::Subscription, system::Subscription,
zk::{empty_witnesses, Proof, ProvingKey, ZkCircuit}, zk::{empty_witnesses, Proof, ProvingKey, VerifyingKey, ZkCircuit},
zkas::ZkBinary, zkas::ZkBinary,
Error, Result, Error, Result,
}; };
use darkfi_sdk::{ use darkfi_sdk::{
bridgetree::Position, crypto::util::FieldElemAsStr,
crypto::{pasta_prelude::PrimeField, poseidon_hash, MerkleTree}, pasta::{pallas, Fp},
pasta::pallas,
}; };
use darkfi_serial::{deserialize_async, serialize_async}; use darkfi_serial::{deserialize_async_partial, serialize_async};
use futures::FutureExt; use futures::FutureExt;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use sled_overlay::sled; use sled_overlay::sled;
@@ -52,9 +51,7 @@ use super::{
server::{IrcServer, MAX_MSG_LEN}, server::{IrcServer, MAX_MSG_LEN},
Msg, NickServ, OldPrivmsg, SERVER_NAME, Msg, NickServ, OldPrivmsg, SERVER_NAME,
}; };
use crate::crypto::rln::{ use crate::crypto::rln::{closest_epoch, RlnIdentity, RLN2_SIGNAL_ZKBIN};
closest_epoch, hash_event, RlnIdentity, RLN2_SIGNAL_ZKBIN, RLN_APP_IDENTIFIER,
};
const PENALTY_LIMIT: usize = 5; const PENALTY_LIMIT: usize = 5;
@@ -78,6 +75,8 @@ pub struct Client {
pub server: Arc<IrcServer>, pub server: Arc<IrcServer>,
/// Subscription for incoming events /// Subscription for incoming events
pub incoming: Subscription<Event>, pub incoming: Subscription<Event>,
/// Subscription for incoming static events
pub incoming_st: Subscription<Event>,
/// Client socket addr /// Client socket addr
pub addr: SocketAddr, pub addr: SocketAddr,
/// ID of the last sent event /// ID of the last sent event
@@ -114,6 +113,7 @@ impl Client {
pub async fn new( pub async fn new(
server: Arc<IrcServer>, server: Arc<IrcServer>,
incoming: Subscription<Event>, incoming: Subscription<Event>,
incoming_st: Subscription<Event>,
addr: SocketAddr, addr: SocketAddr,
) -> Result<Self> { ) -> Result<Self> {
let caps = let caps =
@@ -125,6 +125,7 @@ impl Client {
Ok(Self { Ok(Self {
server: server.clone(), server: server.clone(),
incoming, incoming,
incoming_st,
addr, addr,
last_sent: RwLock::new(NULL_ID), last_sent: RwLock::new(NULL_ID),
channels: RwLock::new(HashSet::new()), channels: RwLock::new(HashSet::new()),
@@ -168,18 +169,21 @@ impl Client {
if let Ok(0) = r { if let Ok(0) = r {
error!("[IRC CLIENT] Read failed for {}: Client disconnected", self.addr); error!("[IRC CLIENT] Read failed for {}: Client disconnected", self.addr);
self.incoming.unsubscribe().await; self.incoming.unsubscribe().await;
self.incoming_st.unsubscribe().await;
return Err(Error::ChannelStopped) return Err(Error::ChannelStopped)
} }
// If something failed during reading, we disconnect. // If something failed during reading, we disconnect.
if let Err(e) = r { if let Err(e) = r {
error!("[IRC CLIENT] Read failed for {}: {}", self.addr, e); error!("[IRC CLIENT] Read failed for {}: {}", self.addr, e);
self.incoming.unsubscribe().await; self.incoming.unsubscribe().await;
self.incoming_st.unsubscribe().await;
return Err(Error::ChannelStopped) return Err(Error::ChannelStopped)
} }
// If the penalty limit is reached, disconnect the client. // If the penalty limit is reached, disconnect the client.
if self.penalty.load(SeqCst) == PENALTY_LIMIT { if self.penalty.load(SeqCst) == PENALTY_LIMIT {
self.incoming.unsubscribe().await; self.incoming.unsubscribe().await;
self.incoming_st.unsubscribe().await;
return Err(Error::ChannelStopped) return Err(Error::ChannelStopped)
} }
@@ -221,7 +225,7 @@ impl Client {
rln_identity.message_id += 1; rln_identity.message_id += 1;
let (_proof, _public_inputs) = match self.create_rln_signal_proof(&rln_identity, &event).await { let (proof, public_inputs) = match self.create_rln_signal_proof(&rln_identity, &event).await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
// TODO: Send a message to the IRC client telling that sending went wrong // TODO: Send a message to the IRC client telling that sending went wrong
@@ -231,10 +235,44 @@ impl Client {
} }
}; };
self.server.darkirc.p2p.broadcast(&EventPut(event)).await; ///////// Temporary /////////
let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?;
let signal_circuit = ZkCircuit::new(empty_witnesses(&signal_zkbin)?, &signal_zkbin);
let Some(signal_vk) = self.server.server_store.get("rlnv2-diff-signal-vk")? else {
return Err(Error::DatabaseError(
"RLN signal verifying key not found in server store".to_string(),
))
};
let mut reader = Cursor::new(signal_vk);
let signal_vk = VerifyingKey::read(&mut reader, signal_circuit)?;
// let epoch = pallas::Base::from(closest_epoch(event.header.timestamp));
// let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]);
// let x = hash_event(&event);
// let y = public_inputs[0];
// let internal_nullifier = public_inputs[1];
// // Fetch SMT root
// let identity_tree = self.server.rln_identity_tree.read().await;
// let identity_root = identity_tree.root();
// let public_inputs =
// vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root];
// Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)?
match proof.verify(&signal_vk, &public_inputs) {
Ok(_) => info!("it works"),
Err(e) => panic!("it doesn't work because {e}")
}
/////////////////////////////
let blob = serialize_async(&(proof, public_inputs)).await;
self.server.darkirc.p2p.broadcast(&EventPut(event, blob)).await;
} else { } else {
// Broadcast it // Broadcast it
self.server.darkirc.p2p.broadcast(&EventPut(event)).await; self.server.darkirc.p2p.broadcast(&EventPut(event, vec![])).await;
} }
} }
} }
@@ -246,6 +284,7 @@ impl Client {
// If we got an error, we disconnect the client. // If we got an error, we disconnect the client.
Err(e) => { Err(e) => {
self.incoming.unsubscribe().await; self.incoming.unsubscribe().await;
self.incoming_st.unsubscribe().await;
return Err(e) return Err(e)
} }
} }
@@ -261,6 +300,7 @@ impl Client {
// <file:./command.rs::async fn get_history(&self, channels: &HashSet<String>) -> Result<Vec<ReplyType>> {> // <file:./command.rs::async fn get_history(&self, channels: &HashSet<String>) -> Result<Vec<ReplyType>> {>
// for which the logic for delivery should be kept in sync // for which the logic for delivery should be kept in sync
r = self.incoming.receive().fuse() => { r = self.incoming.receive().fuse() => {
info!("incoming: {:?}", r);
// We will skip this if it's our own message. // We will skip this if it's our own message.
let event_id = r.header.id(); let event_id = r.header.id();
if *self.last_sent.read().await == event_id { if *self.last_sent.read().await == event_id {
@@ -277,49 +317,6 @@ impl Client {
} }
} }
// If the Event contains an appended blob of data, try to check if it's
// a RLN Signal proof and verify it.
//if false {
let mut verification_failed = false;
#[allow(clippy::never_loop)]
loop {
let (event, blob) = (r.clone(), vec![0,1,2]);
let (proof, public_inputs): (Proof, Vec<pallas::Base>) = match deserialize_async(&blob).await {
Ok(v) => v,
Err(_) => {
// TODO: FIXME: This logic should be better written.
// Right now we don't enforce RLN so we can just fall-through.
//error!("[IRC CLIENT] Failed deserializing event ephemeral data: {}", e);
break
}
};
if public_inputs.len() != 2 {
error!("[IRC CLIENT] Received event has the wrong number of public inputs");
verification_failed = true;
break
}
info!("[IRC CLIENT] Verifying incoming Event RLN proof");
if self.verify_rln_signal_proof(
&event,
proof,
[public_inputs[0], public_inputs[1]],
).await.is_err() {
verification_failed = true;
break
}
// TODO: Store for secret shares recovery
info!("[IRC CLIENT] RLN verification successful");
break
}
if verification_failed {
error!("[IRC CLIENT] Incoming Event proof verification failed");
continue
}
// Try to deserialize the `Event`'s content into a `Privmsg` // Try to deserialize the `Event`'s content into a `Privmsg`
let mut privmsg = match Msg::deserialize(r.content()).await { let mut privmsg = match Msg::deserialize(r.content()).await {
Ok(Msg::V1(old_msg)) => old_msg.into_new(), Ok(Msg::V1(old_msg)) => old_msg.into_new(),
@@ -381,6 +378,50 @@ impl Client {
return Err(e) return Err(e)
} }
} }
// Process message from the network. These should only be RLN identities.
r = self.incoming_st.receive().fuse() => {
// We will skip this if it's our own message.
let event_id = r.header.id();
if *self.last_sent.read().await == event_id {
continue
}
// If this event was seen, skip it
match self.is_seen(&event_id).await {
Ok(true) => continue,
Ok(false) => {},
Err(e) => {
error!("[IRC CLIENT] (multiplex_connection) self.is_seen({}) failed: {}", event_id, e);
return Err(e)
}
}
// Update SMT
let mut identities_tree = self.server.rln_identity_tree.write().await;
let fetched_rln_commitment: Fp = match deserialize_async_partial(r.content()).await
{
Ok((v, _)) => v,
Err(e) => {
error!(target: "irc::server", "[RLN] Failed deserializing incoming RLN Identity events: {}", e);
continue
}
};
info!("multiplex commitment: {}",fetched_rln_commitment.to_string());
let commitment = vec![fetched_rln_commitment];
let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect();
identities_tree.insert_batch(commitment)?;
info!("multiplex root: {}",identities_tree.root().to_string());
// Mark the message as seen for this USER
if let Err(e) = self.mark_seen(&event_id).await {
error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e);
return Err(e)
}
}
} }
} }
} }
@@ -582,31 +623,46 @@ impl Client {
Ok(db.contains_key(event_id.as_bytes())?) Ok(db.contains_key(event_id.as_bytes())?)
} }
/// Abstraction for RLN register proof creation
// async fn _create_rln_register_proof(
// &self,
// rln_identity: &RlnIdentity,
// event: &Event,
// ) -> Result<Proof> {
// let mut identity_tree = self.server.rln_identity_tree.write().await;
// // Retrieve the ZK proving key from the db
// let register_zkbin = ZkBinary::decode(RLN2_REGISTER_ZKBIN)?;
// let register_circuit = ZkCircuit::new(empty_witnesses(&register_zkbin)?, &register_zkbin);
// let Some(register_pk) = self.server.server_store.get("rlnv2-diff-register-pk")? else {
// return Err(Error::DatabaseError(
// "RLN register proving key not found in server store".to_string(),
// ))
// };
// let mut reader = Cursor::new(register_pk);
// let register_pk = ProvingKey::read(&mut reader, register_circuit)?;
// rln_identity._create_register_proof(event, &mut identity_tree, &register_pk)
// }
// /// Abstraction for RLN register proof verification
// async fn _verify_rln_register_proof(
// &self,
// _event: &Event,
// proof: Proof,
// public_inputs: [pallas::Base; 2],
// register_vk: VerifyingKey,
// ) -> Result<()> {
// Ok(proof.verify(&register_vk, &public_inputs)?)
// }
/// Abstraction for RLN signal proof creation /// Abstraction for RLN signal proof creation
async fn create_rln_signal_proof( async fn create_rln_signal_proof(
&self, &self,
rln_identity: &RlnIdentity, rln_identity: &RlnIdentity,
event: &Event, event: &Event,
) -> Result<(Proof, Vec<pallas::Base>)> { ) -> Result<(Proof, Vec<pallas::Base>)> {
let identity_commitment = rln_identity.commitment(); let identity_tree = self.server.rln_identity_tree.read().await;
// Fetch the commitment's leaf position in the Merkle tree
let Some(identity_pos) =
self.server.rln_identity_store.get(identity_commitment.to_repr())?
else {
return Err(Error::DatabaseError(
"Identity not found in commitment tree store".to_string(),
))
};
let identity_pos: Position = deserialize_async(&identity_pos).await?;
// Fetch the latest commitment Merkle tree
let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else {
return Err(Error::DatabaseError(
"RLN Identity tree not found in server store".to_string(),
))
};
let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?;
// Retrieve the ZK proving key from the db // Retrieve the ZK proving key from the db
let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; let signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?;
@@ -619,34 +675,34 @@ impl Client {
let mut reader = Cursor::new(proving_key); let mut reader = Cursor::new(proving_key);
let proving_key = ProvingKey::read(&mut reader, signal_circuit)?; let proving_key = ProvingKey::read(&mut reader, signal_circuit)?;
rln_identity.create_signal_proof(event, &identity_tree, identity_pos, &proving_key) rln_identity.create_signal_proof(event, &identity_tree, &proving_key)
} }
/// Abstraction for RLN signal proof verification // /// Abstraction for RLN signal proof verification
async fn verify_rln_signal_proof( // async fn _verify_rln_signal_proof(
&self, // &self,
event: &Event, // event: &Event,
proof: Proof, // proof: Proof,
public_inputs: [pallas::Base; 2], // public_inputs: [pallas::Base; 2],
) -> Result<()> { // ) -> Result<()> {
let epoch = pallas::Base::from(closest_epoch(event.header.timestamp)); // let epoch = pallas::Base::from(closest_epoch(event.header.timestamp));
let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]); // let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]);
let x = hash_event(event); // let x = hash_event(event);
let y = public_inputs[0]; // let y = public_inputs[0];
let internal_nullifier = public_inputs[1]; // let internal_nullifier = public_inputs[1];
// Fetch the latest commitment Merkle tree // // Fetch the latest commitment Merkle tree
let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else { // let Some(identity_tree) = self.server.server_store.get("rln_identity_tree")? else {
return Err(Error::DatabaseError( // return Err(Error::DatabaseError(
"RLN Identity tree not found in server store".to_string(), // "RLN Identity tree not found in server store".to_string(),
)) // ))
}; // };
let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?; // let identity_tree: MerkleTree = deserialize_async(&identity_tree).await?;
let identity_root = identity_tree.root(0).unwrap(); // let identity_root = identity_tree.root(0).unwrap();
let public_inputs = // let public_inputs =
vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()]; // vec![epoch, external_nullifier, x, y, internal_nullifier, identity_root.inner()];
Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)?) // Ok(proof.verify(&self.server.rln_signal_vk, &public_inputs)?)
} // }
} }

View File

@@ -706,7 +706,7 @@ impl Client {
// Handle queries to NickServ // Handle queries to NickServ
if target.to_lowercase().as_str() == "nickserv" { if target.to_lowercase().as_str() == "nickserv" {
return self.nickserv.handle_query(message.strip_prefix(':').unwrap()).await return self.nickserv.handle_query(args).await
} }
// If it's a DM and we don't have an encryption key, we will // If it's a DM and we don't have an encryption key, we will

View File

@@ -16,13 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
use std::{ use std::{collections::HashMap, fs::File, io::BufReader, path::PathBuf, sync::Arc};
collections::HashMap,
fs::File,
io::{BufReader, Cursor},
path::PathBuf,
sync::Arc,
};
use darkfi::{ use darkfi::{
event_graph::Event, event_graph::Event,
@@ -32,8 +26,11 @@ use darkfi::{
zkas::ZkBinary, zkas::ZkBinary,
Error, Result, Error, Result,
}; };
use darkfi_sdk::crypto::MerkleTree; use darkfi_sdk::{
use darkfi_serial::serialize_async; crypto::smt::{MemoryStorageFp, PoseidonFp, SmtMemoryFp, EMPTY_NODES_FP},
pasta::Fp,
};
use darkfi_serial::{deserialize_async, deserialize_async_partial};
use futures_rustls::{ use futures_rustls::{
rustls::{self, pki_types::PrivateKeyDer}, rustls::{self, pki_types::PrivateKeyDer},
TlsAcceptor, TlsAcceptor,
@@ -49,7 +46,11 @@ use smol::{
}; };
use url::Url; use url::Url;
use super::{client::Client, IrcChannel, IrcContact, Priv, Privmsg}; use super::{
client::Client,
services::nickserv::{ACCOUNTS_DB_PREFIX, ACCOUNTS_KEY_RLN_IDENTITY},
IrcChannel, IrcContact, Priv, Privmsg,
};
use crate::{ use crate::{
crypto::{ crypto::{
rln::{RlnIdentity, RLN2_SIGNAL_ZKBIN, RLN2_SLASH_ZKBIN}, rln::{RlnIdentity, RLN2_SIGNAL_ZKBIN, RLN2_SLASH_ZKBIN},
@@ -93,9 +94,7 @@ pub struct IrcServer {
/// Persistent server storage /// Persistent server storage
pub server_store: sled::Tree, pub server_store: sled::Tree,
/// RLN identity storage /// RLN identity storage
pub rln_identity_store: sled::Tree, pub rln_identity_tree: RwLock<SmtMemoryFp>,
/// RLN Signal VerifyingKey
pub rln_signal_vk: VerifyingKey,
} }
impl IrcServer { impl IrcServer {
@@ -152,7 +151,9 @@ impl IrcServer {
// Open persistent dbs // Open persistent dbs
let server_store = darkirc.sled.open_tree("server_store")?; let server_store = darkirc.sled.open_tree("server_store")?;
let rln_identity_store = darkirc.sled.open_tree("rln_identity_store")?; let hasher = PoseidonFp::new();
let store = MemoryStorageFp::new();
let mut identity_tree = SmtMemoryFp::new(store, hasher.clone(), &EMPTY_NODES_FP);
// Generate RLN proving and verifying keys, if needed // Generate RLN proving and verifying keys, if needed
let rln_signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; let rln_signal_zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?;
@@ -167,20 +168,13 @@ impl IrcServer {
server_store.insert("rlnv2-diff-signal-pk", buf)?; server_store.insert("rlnv2-diff-signal-pk", buf)?;
} }
let rln_signal_vk = match server_store.get("rlnv2-diff-signal-vk")? { if server_store.get("rlnv2-diff-signal-vk")?.is_none() {
Some(vk) => { info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey");
let mut reader = Cursor::new(vk); let verifyingkey = VerifyingKey::build(rln_signal_zkbin.k, &rln_signal_circuit);
VerifyingKey::read(&mut reader, rln_signal_circuit)? let mut buf = vec![];
} verifyingkey.write(&mut buf)?;
None => { server_store.insert("rlnv2-diff-signal-vk", buf)?;
info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey"); }
let verifyingkey = VerifyingKey::build(rln_signal_zkbin.k, &rln_signal_circuit);
let mut buf = vec![];
verifyingkey.write(&mut buf)?;
server_store.insert("rlnv2-diff-signal-vk", buf)?;
verifyingkey
}
};
if server_store.get("rlnv2-diff-slash-pk")?.is_none() { if server_store.get("rlnv2-diff-slash-pk")?.is_none() {
info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash ProvingKey"); info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash ProvingKey");
@@ -194,7 +188,7 @@ impl IrcServer {
if server_store.get("rlnv2-diff-slash-vk")?.is_none() { if server_store.get("rlnv2-diff-slash-vk")?.is_none() {
info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash VerifyingKey"); info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Slash VerifyingKey");
let zkbin = ZkBinary::decode(RLN2_SIGNAL_ZKBIN)?; let zkbin = ZkBinary::decode(RLN2_SLASH_ZKBIN)?;
let circuit = ZkCircuit::new(empty_witnesses(&zkbin).unwrap(), &zkbin); let circuit = ZkCircuit::new(empty_witnesses(&zkbin).unwrap(), &zkbin);
let verifyingkey = VerifyingKey::build(zkbin.k, &circuit); let verifyingkey = VerifyingKey::build(zkbin.k, &circuit);
let mut buf = vec![]; let mut buf = vec![];
@@ -202,12 +196,37 @@ impl IrcServer {
server_store.insert("rlnv2-diff-slash-vk", buf)?; server_store.insert("rlnv2-diff-slash-vk", buf)?;
} }
// Initialize RLN Incremental Merkle tree if necessary // Construct SMT from static DAG
if server_store.get("rln_identity_tree")?.is_none() { let mut events = darkirc.event_graph.static_fetch_all().await?;
let tree = MerkleTree::new(1); events.sort_by(|a, b| a.header.timestamp.cmp(&b.header.timestamp));
server_store.insert("rln_identity_tree", serialize_async(&tree).await)?;
for event in events.iter() {
// info!("event: {}", event.id());
let fetched_rln_commitment: Fp = match deserialize_async_partial(event.content()).await
{
Ok((v, _)) => v,
Err(e) => {
error!(target: "irc::server", "[RLN] Failed deserializing incoming RLN Identity events: {}", e);
continue
}
};
let commitment = vec![fetched_rln_commitment];
let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect();
identity_tree.insert_batch(commitment)?;
} }
// Set the default RLN account if any
let default_db = darkirc.sled.open_tree(format!("{}default", ACCOUNTS_DB_PREFIX))?;
let rln_identity = if !default_db.is_empty() {
let default_accnt = default_db.get(ACCOUNTS_KEY_RLN_IDENTITY)?.unwrap();
let default_accnt = deserialize_async(&default_accnt).await.unwrap();
info!("Default RLN account set");
Some(default_accnt)
} else {
None
};
let self_ = Arc::new(Self { let self_ = Arc::new(Self {
darkirc, darkirc,
config_path, config_path,
@@ -216,12 +235,11 @@ impl IrcServer {
autojoin: RwLock::new(Vec::new()), autojoin: RwLock::new(Vec::new()),
channels: RwLock::new(HashMap::new()), channels: RwLock::new(HashMap::new()),
contacts: RwLock::new(HashMap::new()), contacts: RwLock::new(HashMap::new()),
rln_identity: RwLock::new(None), rln_identity: RwLock::new(rln_identity),
clients: Mutex::new(HashMap::new()), clients: Mutex::new(HashMap::new()),
password, password,
server_store, server_store,
rln_identity_store, rln_identity_tree: RwLock::new(identity_tree),
rln_signal_vk,
}); });
// Load any channel/contact configuration. // Load any channel/contact configuration.
@@ -251,7 +269,7 @@ impl IrcServer {
let contacts = parse_configured_contacts(&contents)?; let contacts = parse_configured_contacts(&contents)?;
// Parse RLN identity // Parse RLN identity
let rln_identity = parse_rln_identity(&contents)?; let _rln_identity = parse_rln_identity(&contents)?;
// Persist unconfigured channels (joined from client, or autojoined without config) // Persist unconfigured channels (joined from client, or autojoined without config)
let channels = { let channels = {
@@ -267,7 +285,7 @@ impl IrcServer {
*self.autojoin.write().await = autojoin; *self.autojoin.write().await = autojoin;
*self.channels.write().await = channels; *self.channels.write().await = channels;
*self.contacts.write().await = contacts; *self.contacts.write().await = contacts;
*self.rln_identity.write().await = rln_identity; // *self.rln_identity.write().await = rln_identity;
Ok(()) Ok(())
} }
@@ -306,9 +324,10 @@ impl IrcServer {
// Subscribe to incoming events and set up the connection. // Subscribe to incoming events and set up the connection.
let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await;
let incoming_st = self.darkirc.event_graph.static_pub.clone().subscribe().await;
if let Err(e) = self if let Err(e) = self
.clone() .clone()
.process_connection(stream, peer_addr, incoming, ex.clone()) .process_connection(stream, peer_addr, incoming, incoming_st, ex.clone())
.await .await
{ {
error!("[IRC SERVER] Failed processing new connection: {}", e); error!("[IRC SERVER] Failed processing new connection: {}", e);
@@ -320,9 +339,10 @@ impl IrcServer {
None => { None => {
// Subscribe to incoming events and set up the connection. // Subscribe to incoming events and set up the connection.
let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await;
let incoming_st = self.darkirc.event_graph.static_pub.clone().subscribe().await;
if let Err(e) = self if let Err(e) = self
.clone() .clone()
.process_connection(stream, peer_addr, incoming, ex.clone()) .process_connection(stream, peer_addr, incoming, incoming_st, ex.clone())
.await .await
{ {
error!("[IRC SERVER] Failed processing new connection: {}", e); error!("[IRC SERVER] Failed processing new connection: {}", e);
@@ -343,10 +363,11 @@ impl IrcServer {
stream: C, stream: C,
peer_addr: SocketAddr, peer_addr: SocketAddr,
incoming: Subscription<Event>, incoming: Subscription<Event>,
incoming_st: Subscription<Event>,
ex: Arc<Executor<'_>>, ex: Arc<Executor<'_>>,
) -> Result<()> { ) -> Result<()> {
let port = peer_addr.port(); let port = peer_addr.port();
let client = Client::new(self.clone(), incoming, peer_addr).await?; let client = Client::new(self.clone(), incoming, incoming_st, peer_addr).await?;
let conn_task = StoppableTask::new(); let conn_task = StoppableTask::new();
self.clients.lock().await.insert(port, conn_task.clone()); self.clients.lock().await.insert(port, conn_task.clone());

View File

@@ -16,8 +16,5 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
/// Rate-Limit Nullifiers
pub mod rln;
/// NickServ implementation, used for account management /// NickServ implementation, used for account management
pub mod nickserv; pub mod nickserv;

View File

@@ -16,24 +16,21 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
use std::{ use std::{str::SplitAsciiWhitespace, sync::Arc, time::UNIX_EPOCH};
str::{FromStr, SplitAsciiWhitespace},
sync::Arc,
};
use darkfi::Result; use darkfi::{event_graph::Event, Result};
use darkfi_sdk::crypto::SecretKey; use darkfi_sdk::{crypto::pasta_prelude::PrimeField, pasta::pallas};
use darkfi_serial::serialize_async; use darkfi_serial::serialize_async;
use smol::lock::RwLock; use smol::lock::RwLock;
use super::{ use super::super::{client::ReplyType, rpl::*};
super::{client::ReplyType, rpl::*}, use crate::{
rln::RlnIdentity, crypto::rln::{closest_epoch, RlnIdentity, USER_MSG_LIMIT},
IrcServer,
}; };
use crate::IrcServer;
const ACCOUNTS_DB_PREFIX: &str = "darkirc_account_"; pub const ACCOUNTS_DB_PREFIX: &str = "darkirc_account_";
const ACCOUNTS_KEY_RLN_IDENTITY: &[u8] = b"rln_identity"; pub const ACCOUNTS_KEY_RLN_IDENTITY: &[u8] = b"rln_identity";
const NICKSERV_USAGE: &str = r#"***** NickServ Help ***** const NICKSERV_USAGE: &str = r#"***** NickServ Help *****
@@ -79,12 +76,14 @@ impl NickServ {
let nick = self.nickname.read().await.to_string(); let nick = self.nickname.read().await.to_string();
let mut tokens = query.split_ascii_whitespace(); let mut tokens = query.split_ascii_whitespace();
tokens.next();
let Some(command) = tokens.next() else { let Some(command) = tokens.next() else {
return Ok(vec![ReplyType::Server(( return Ok(vec![ReplyType::Server((
ERR_NOTEXTTOSEND, ERR_NOTEXTTOSEND,
format!("{} :No text to send", nick), format!("{} :No text to send", nick),
))]) ))])
}; };
let command = command.strip_prefix(':').unwrap();
match command.to_uppercase().as_str() { match command.to_uppercase().as_str() {
"INFO" => self.handle_info(&nick, &mut tokens).await, "INFO" => self.handle_info(&nick, &mut tokens).await,
@@ -115,12 +114,12 @@ impl NickServ {
let account_name = tokens.next(); let account_name = tokens.next();
let identity_nullifier = tokens.next(); let identity_nullifier = tokens.next();
let identity_trapdoor = tokens.next(); let identity_trapdoor = tokens.next();
let leaf_pos = tokens.next(); let user_msg_limit = tokens.next();
if account_name.is_none() || if account_name.is_none() ||
identity_nullifier.is_none() || identity_nullifier.is_none() ||
identity_trapdoor.is_none() || identity_trapdoor.is_none() ||
leaf_pos.is_none() user_msg_limit.is_none()
{ {
return Ok(vec![ return Ok(vec![
ReplyType::Notice(( ReplyType::Notice((
@@ -131,7 +130,7 @@ impl NickServ {
ReplyType::Notice(( ReplyType::Notice((
"NickServ".to_string(), "NickServ".to_string(),
nick.to_string(), nick.to_string(),
"Use `REGISTER <account_name> <identity_nullifier> <identity_trapdoor> <leaf_pos>`." "Use `REGISTER <account_name> <identity_nullifier> <identity_trapdoor> <user_msg_limit>`."
.to_string(), .to_string(),
)), )),
]) ])
@@ -140,7 +139,7 @@ impl NickServ {
let account_name = account_name.unwrap(); let account_name = account_name.unwrap();
let identity_nullifier = identity_nullifier.unwrap(); let identity_nullifier = identity_nullifier.unwrap();
let identity_trapdoor = identity_trapdoor.unwrap(); let identity_trapdoor = identity_trapdoor.unwrap();
let leaf_pos = leaf_pos.unwrap(); let _user_msg_limit = user_msg_limit.unwrap();
// Open the sled tree // Open the sled tree
let db = self let db = self
@@ -157,45 +156,70 @@ impl NickServ {
))]) ))])
} }
// TODO: WIF // Open the sled tree
let db_default =
self.server.darkirc.sled.open_tree(format!("{}default", ACCOUNTS_DB_PREFIX))?;
// Parse the secrets // Parse the secrets
let identity_nullifier = match SecretKey::from_str(identity_nullifier) { let nullifier_bytes = bs58::decode(identity_nullifier).into_vec()?;
Ok(v) => v, let identity_nullifier =
Err(e) => { match pallas::Base::from_repr(nullifier_bytes.try_into().unwrap()).into_option() {
return Ok(vec![ReplyType::Notice(( Some(v) => v,
"NickServ".to_string(), None => {
nick.to_string(), return Ok(vec![ReplyType::Notice((
format!("Invalid identity_nullifier: {}", e), "NickServ".to_string(),
))]) nick.to_string(),
} format!("Invalid identity_nullifier"),
}; ))])
}
};
let identity_trapdoor = match SecretKey::from_str(identity_trapdoor) { let trapdoor_bytes = bs58::decode(identity_trapdoor).into_vec()?;
Ok(v) => v, let identity_trapdoor =
Err(e) => { match pallas::Base::from_repr(trapdoor_bytes.try_into().unwrap()).into_option() {
return Ok(vec![ReplyType::Notice(( Some(v) => v,
"NickServ".to_string(), None => {
nick.to_string(), return Ok(vec![ReplyType::Notice((
format!("Invalid identity_trapdoor: {}", e), "NickServ".to_string(),
))]) nick.to_string(),
} format!("Invalid identity_trapdoor"),
}; ))])
}
let leaf_pos = match u64::from_str(leaf_pos) { };
Ok(v) => v,
Err(e) => {
return Ok(vec![ReplyType::Notice((
"NickServ".to_string(),
nick.to_string(),
format!("Invalid leaf_pos: {}", e),
))])
}
};
// Create a new RLN identity and insert it into the db tree // Create a new RLN identity and insert it into the db tree
let rln_identity = let new_rln_identity = RlnIdentity {
RlnIdentity { identity_nullifier, identity_trapdoor, leaf_pos: leaf_pos.into() }; nullifier: identity_nullifier,
db.insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&rln_identity).await)?; trapdoor: identity_trapdoor,
user_message_limit: USER_MSG_LIMIT,
message_id: 1, // TODO
last_epoch: closest_epoch(UNIX_EPOCH.elapsed().unwrap().as_secs()),
};
// Store account
db.insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&new_rln_identity).await)?;
// Set default account if not already
if db_default.is_empty() {
db_default
.insert(ACCOUNTS_KEY_RLN_IDENTITY, serialize_async(&new_rln_identity).await)?;
}
*self.server.rln_identity.write().await = Some(new_rln_identity);
// Update SMT, DAG and broadcast
let mut identities_tree = self.server.rln_identity_tree.write().await;
let rln_commitment = new_rln_identity.commitment();
// info!("register commitment: {}", rln_commitment.to_string());
let commitment = vec![rln_commitment];
let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect();
identities_tree.insert_batch(commitment)?;
// info!("root: {}", identities_tree.root().to_string());
let evgr = &self.server.darkirc.event_graph;
let event = Event::new_static(serialize_async(&rln_commitment).await, evgr).await;
evgr.static_insert(&event).await?;
evgr.static_broadcast(event).await?;
Ok(vec![ReplyType::Notice(( Ok(vec![ReplyType::Notice((
"NickServ".to_string(), "NickServ".to_string(),

View File

@@ -1,31 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi_sdk::{bridgetree, crypto::SecretKey};
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
/// Rate-Limit Nullifier account data
#[derive(Debug, Copy, Clone, SerialEncodable, SerialDecodable)]
pub struct RlnIdentity {
/// Identity nullifier secret
pub identity_nullifier: SecretKey,
/// Identity trapdoor secret
pub identity_trapdoor: SecretKey,
/// Leaf position of the identity commitment in the accounts' Merkle tree
pub leaf_pos: bridgetree::Position,
}

View File

@@ -577,6 +577,17 @@ async fn sync_task(
info!("Got peer connection"); info!("Got peer connection");
// We'll attempt to sync for ever // We'll attempt to sync for ever
if !skip_dag_sync { if !skip_dag_sync {
info!("Syncing static DAG");
match event_graph.static_sync().await {
Ok(()) => {
info!("Static synced successfully")
}
Err(e) => {
error!("Failed syncing static graph: {e}");
p2p.stop().await;
return Err(Error::StaticDagSyncFailed)
}
}
info!("Syncing event DAG"); info!("Syncing event DAG");
match event_graph.sync_selected(dags_count, fast_mode).await { match event_graph.sync_selected(dags_count, fast_mode).await {
Ok(()) => break, Ok(()) => break,

View File

@@ -226,7 +226,7 @@ pub fn parse_rln_identity(data: &toml::Value) -> Result<Option<RlnIdentity>> {
return Err(ParseFailed("RLN trapdoor not a string")) return Err(ParseFailed("RLN trapdoor not a string"))
}; };
let user_message_limit = if let Some(msglimit) = msglimit.as_float() { let user_message_limit = if let Some(msglimit) = msglimit.as_integer() {
msglimit as u64 msglimit as u64
} else { } else {
return Err(ParseFailed("RLN user message limit not a number")) return Err(ParseFailed("RLN user message limit not a number"))

View File

@@ -153,6 +153,17 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
info!(target: "genevd", "Waiting for some P2P connections..."); info!(target: "genevd", "Waiting for some P2P connections...");
sleep(5).await; sleep(5).await;
match event_graph.static_sync().await {
Ok(()) => {
info!("static synced successfully")
}
Err(e) => {
error!("failed syncing static graph: {e}");
p2p.stop().await;
return Err(Error::DagSyncFailed)
}
}
// We'll attempt to sync 5 times // We'll attempt to sync 5 times
if !settings.skip_dag_sync { if !settings.skip_dag_sync {
for i in 1..=6 { for i in 1..=6 {

View File

@@ -219,7 +219,7 @@ impl JsonRpcInterface {
error!("Failed inserting new event to DAG: {}", e); error!("Failed inserting new event to DAG: {}", e);
} else { } else {
// Otherwise, broadcast it // Otherwise, broadcast it
self.p2p.broadcast(&EventPut(event)).await; self.p2p.broadcast(&EventPut(event, vec![])).await;
} }
let json = JsonValue::Boolean(true); let json = JsonValue::Boolean(true);

View File

@@ -330,7 +330,7 @@ async fn start_sync_loop(
error!(target: "taud", "Failed inserting new event to DAG: {}", e); error!(target: "taud", "Failed inserting new event to DAG: {}", e);
} else { } else {
// Otherwise, broadcast it // Otherwise, broadcast it
p2p.broadcast(&EventPut(event)).await; p2p.broadcast(&EventPut(event, vec![])).await;
} }
} }
} }

View File

@@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8890"
## Disabled RPC methods ## Disabled RPC methods
#rpc_disabled_methods = ["p2p.get_info"] #rpc_disabled_methods = ["p2p.get_info"]
# [rln]
# nullifier = "28UWJrkAj54CjFhr7Dv2veysqtsiSH2HegjucfPPCFVH"
# trapdoor = "E8eEAnkvrdAygSG6aGKJFMNrYV7tf12WHZEETb4RnKxV"
# user_message_limit = 10
## P2P net settings ## P2P net settings
[net] [net]
## Connection slots ## Connection slots

View File

@@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8891"
## Disabled RPC methods ## Disabled RPC methods
#rpc_disabled_methods = ["p2p.get_info"] #rpc_disabled_methods = ["p2p.get_info"]
# [rln]
# nullifier = "GJJyzntzzVCuu4BwuS5SQxzaBf4JwJAsRFEZ9Poax3uu"
# trapdoor = "GJcELibk8nj8eiAxiczAH8bMWWNSUHDsCcFdqAVyzy61"
# user_message_limit = 10
## P2P net settings ## P2P net settings
[net] [net]
## Connection slots ## Connection slots

View File

@@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8892"
## Disabled RPC methods ## Disabled RPC methods
#rpc_disabled_methods = ["p2p.get_info"] #rpc_disabled_methods = ["p2p.get_info"]
# [rln]
# nullifier = "H3Nuq4bCstK7629x75jJ6imw6faZUhkHx9pRL7aqTnzQ"
# trapdoor = "ArxesUcNmZ8ygz9bSJP2u8MywRkL2dwG9pPegraguddC"
# user_message_limit = 10
## P2P net settings ## P2P net settings
[net] [net]
## Connection slots ## Connection slots

View File

@@ -29,6 +29,11 @@ rpc_listen = "tcp://127.0.0.1:8893"
## Disabled RPC methods ## Disabled RPC methods
#rpc_disabled_methods = ["p2p.get_info"] #rpc_disabled_methods = ["p2p.get_info"]
# [rln]
# nullifier = "8iktbkj7FXGEKqWR6v6EAXjB2XwdQPFLZLx9YnUJYbCy"
# trapdoor = "4PP6JaUSabTuuCd9F41QA7yFFWwVwxMHjdjowfepHKxV"
# user_message_limit = 10
## P2P net settings ## P2P net settings
[net] [net]
## Connection slots ## Connection slots

View File

@@ -552,6 +552,9 @@ pub enum Error {
// ================== // ==================
// Event Graph errors // Event Graph errors
// ================== // ==================
#[error("Static DAG sync failed")]
StaticDagSyncFailed,
#[error("DAG sync failed")] #[error("DAG sync failed")]
DagSyncFailed, DagSyncFailed,

View File

@@ -48,6 +48,11 @@ impl Header {
Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer } Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer }
} }
pub async fn new_static(event_graph: &EventGraph) -> Self {
let (layer, parents) = event_graph.get_next_layer_with_parents_static().await;
Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer }
}
pub async fn with_timestamp(timestamp: u64, event_graph: &EventGraph) -> Self { pub async fn with_timestamp(timestamp: u64, event_graph: &EventGraph) -> Self {
let current_dag_name = event_graph.current_genesis.read().await.header.timestamp; let current_dag_name = event_graph.current_genesis.read().await.header.timestamp;
let (layer, parents) = event_graph.get_next_layer_with_parents(&current_dag_name).await; let (layer, parents) = event_graph.get_next_layer_with_parents(&current_dag_name).await;
@@ -150,6 +155,11 @@ impl Event {
Self { header, content: data } Self { header, content: data }
} }
pub async fn new_static(data: Vec<u8>, event_graph: &EventGraph) -> Self {
let header = Header::new_static(event_graph).await;
Self { header, content: data }
}
pub fn id(&self) -> blake3::Hash { pub fn id(&self) -> blake3::Hash {
self.header.id() self.header.id()
} }

View File

@@ -19,6 +19,7 @@
// use async_std::stream::from_iter; // use async_std::stream::from_iter;
use std::{ use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
io::Cursor,
path::PathBuf, path::PathBuf,
str::FromStr, str::FromStr,
sync::Arc, sync::Arc,
@@ -44,13 +45,18 @@ use tinyjson::JsonValue::{self};
use url::Url; use url::Url;
use crate::{ use crate::{
event_graph::util::{next_hour_timestamp, next_rotation_timestamp, replayer_log}, event_graph::{
proto::StaticPut,
util::{next_hour_timestamp, next_rotation_timestamp, replayer_log},
},
net::{channel::Channel, P2pPtr}, net::{channel::Channel, P2pPtr},
rpc::{ rpc::{
jsonrpc::{JsonResponse, JsonResult}, jsonrpc::{JsonResponse, JsonResult},
util::json_map, util::json_map,
}, },
system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
zk::{empty_witnesses, VerifyingKey, ZkCircuit},
zkas::ZkBinary,
Error, Result, Error, Result,
}; };
@@ -326,6 +332,8 @@ pub struct EventGraph {
p2p: P2pPtr, p2p: P2pPtr,
/// Sled tree containing the headers /// Sled tree containing the headers
dag_store: RwLock<DAGStore>, dag_store: RwLock<DAGStore>,
/// Static DAG
static_dag: sled::Tree,
/// Replay logs path. /// Replay logs path.
datastore: PathBuf, datastore: PathBuf,
/// Run in replay_mode where if set we log Sled DB instructions /// Run in replay_mode where if set we log Sled DB instructions
@@ -342,6 +350,9 @@ pub struct EventGraph {
/// Event publisher, this notifies whenever an event is /// Event publisher, this notifies whenever an event is
/// inserted into the DAG /// inserted into the DAG
pub event_pub: PublisherPtr<Event>, pub event_pub: PublisherPtr<Event>,
/// Static Event publisher, this notifies whenever a static event is
/// inserted into the static DAG
pub static_pub: PublisherPtr<Event>,
/// Current genesis event /// Current genesis event
pub current_genesis: RwLock<Event>, pub current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in hours /// Currently configured DAG rotation, in hours
@@ -352,9 +363,10 @@ pub struct EventGraph {
pub deg_enabled: RwLock<bool>, pub deg_enabled: RwLock<bool>,
/// The publisher for which we can give deg info over /// The publisher for which we can give deg info over
deg_publisher: PublisherPtr<DegEvent>, deg_publisher: PublisherPtr<DegEvent>,
/// Run in replay_mode where if set we log Sled DB instructions /// Run in fast mode where if set we sync only headers.
/// into `datastore`, useful to reacreate a faulty DAG to debug.
fast_mode: bool, fast_mode: bool,
/// Signaling verify key
signal_vk: VerifyingKey,
} }
impl EventGraph { impl EventGraph {
@@ -379,8 +391,30 @@ impl EventGraph {
hours_rotation: u64, hours_rotation: u64,
ex: Arc<Executor<'_>>, ex: Arc<Executor<'_>>,
) -> Result<EventGraphPtr> { ) -> Result<EventGraphPtr> {
// Read or build signal verifying key
let signal_zkbin = include_bytes!("proofs/rlnv2-diff-signal.zk.bin");
let signal_zkbin = ZkBinary::decode(signal_zkbin).unwrap();
let signal_empty_circuit =
ZkCircuit::new(empty_witnesses(&signal_zkbin).unwrap(), &signal_zkbin);
let signal_vk = match sled_db.get("rlnv2-diff-signal-vk")? {
Some(vk) => {
let mut reader = Cursor::new(vk);
VerifyingKey::read(&mut reader, signal_empty_circuit)?
}
None => {
info!(target: "irc::server", "[RLN] Creating RlnV2_Diff_Signal VerifyingKey");
let verifyingkey = VerifyingKey::build(signal_zkbin.k, &signal_empty_circuit);
let mut buf = vec![];
verifyingkey.write(&mut buf)?;
sled_db.insert("rlnv2-diff-signal-vk", buf)?;
verifyingkey
}
};
let broadcasted_ids = RwLock::new(HashSet::new()); let broadcasted_ids = RwLock::new(HashSet::new());
let event_pub = Publisher::new(); let event_pub = Publisher::new();
let static_pub = Publisher::new();
// Create the current genesis event based on the `hours_rotation` // Create the current genesis event based on the `hours_rotation`
let current_genesis = generate_genesis(hours_rotation); let current_genesis = generate_genesis(hours_rotation);
@@ -390,23 +424,28 @@ impl EventGraph {
header_dags: BTreeMap::default(), header_dags: BTreeMap::default(),
main_dags: BTreeMap::default(), main_dags: BTreeMap::default(),
} }
.new(sled_db, hours_rotation) .new(sled_db.clone(), hours_rotation)
.await; .await;
let static_dag = Self::static_new(sled_db).await?;
let self_ = Arc::new(Self { let self_ = Arc::new(Self {
p2p, p2p,
dag_store: RwLock::new(dag_store.clone()), dag_store: RwLock::new(dag_store.clone()),
static_dag,
datastore, datastore,
replay_mode, replay_mode,
fast_mode, fast_mode,
broadcasted_ids, broadcasted_ids,
prune_task: OnceCell::new(), prune_task: OnceCell::new(),
event_pub, event_pub,
static_pub,
current_genesis: RwLock::new(current_genesis.clone()), current_genesis: RwLock::new(current_genesis.clone()),
hours_rotation, hours_rotation,
synced: RwLock::new(false), synced: RwLock::new(false),
deg_enabled: RwLock::new(false), deg_enabled: RwLock::new(false),
deg_publisher: Publisher::new(), deg_publisher: Publisher::new(),
signal_vk,
}); });
// Check if we have it in our DAG. // Check if we have it in our DAG.
@@ -992,6 +1031,60 @@ impl EventGraph {
(next_layer, parents) (next_layer, parents)
} }
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips_static(&self, dag: &sled::Tree) -> LayerUTips {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
for parent in event.header.parents.iter() {
tips.remove(parent);
}
}
// Build the layers map
let mut map: LayerUTips = BTreeMap::new();
for tip in tips {
let event = self.static_fetch(&tip).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.header.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.header.layer, layer_tips);
}
}
map
}
async fn get_next_layer_with_parents_static(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
let unreferenced_tips = self.find_unreferenced_tips_static(&self.static_dag).await;
let mut parents = [NULL_ID; N_EVENT_PARENTS];
let mut index = 0;
'outer: for (_, tips) in unreferenced_tips.iter().rev() {
for tip in tips.iter() {
parents[index] = *tip;
index += 1;
if index >= N_EVENT_PARENTS {
break 'outer;
}
}
}
let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
assert!(parents.iter().any(|x| x != &NULL_ID));
(next_layer, parents)
}
/// Internal function used for DAG sorting. /// Internal function used for DAG sorting.
async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> { async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> {
let mut vec_tips = vec![]; let mut vec_tips = vec![];
@@ -1156,6 +1249,123 @@ impl EventGraph {
Ok(result) Ok(result)
} }
pub async fn static_new(sled_db: sled::Db) -> Result<sled::Tree> {
let static_dag = sled_db.open_tree("static-dag")?;
let genesis = generate_genesis(0);
let mut overlay = SledTreeOverlay::new(&static_dag);
let event_se = serialize_async(&genesis).await;
// Add the event to the overlay
overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
// Aggregate changes into a single batch
let batch = match overlay.aggregate() {
Some(b) => b,
None => return Ok(static_dag),
};
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = static_dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
Ok(static_dag)
}
pub async fn static_sync(&self) -> Result<()> {
self.dag_sync(self.static_dag.clone(), false).await?;
Ok(())
}
pub async fn static_broadcast(&self, event: Event) -> Result<()> {
self.p2p.broadcast(&StaticPut(event)).await;
Ok(())
}
pub async fn static_insert(&self, event: &Event) -> Result<()> {
let mut overlay = SledTreeOverlay::new(&self.static_dag);
let event_se = serialize_async(event).await;
// Add the event to the overlay
overlay.insert(event.id().as_bytes(), &event_se).unwrap();
// Aggregate changes into a single batch
let batch = match overlay.aggregate() {
Some(b) => b,
None => return Ok(()),
};
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.static_dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Send out notifications about the new event
self.static_pub.notify(event.clone()).await;
Ok(())
}
pub async fn static_fetch(&self, event_id: &Hash) -> Result<Option<Event>> {
let Some(bytes) = self.static_dag.get(event_id.as_bytes())? else {
return Ok(None);
};
let event: Event = deserialize_async(&bytes).await?;
return Ok(Some(event))
}
pub async fn static_fetch_all(&self) -> Result<Vec<Event>> {
let mut events = vec![];
for iter_elem in self.static_dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
let event = self.static_fetch(&id).await?.unwrap();
if event.header.parents == NULL_PARENTS {
continue
}
events.push(event);
}
Ok(events)
}
pub async fn static_unreferenced_tips(&self) -> LayerUTips {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in self.static_dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in self.static_dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
for parent in event.header.parents.iter() {
tips.remove(parent);
}
}
// Build the layers map
let mut map: LayerUTips = BTreeMap::new();
for tip in tips {
let event = self.static_fetch(&tip).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.header.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.header.layer, layer_tips);
}
}
map
}
} }
async fn request_header( async fn request_header(

View File

@@ -0,0 +1,48 @@
k = 14;
field = "pallas";
constant "Rlnv2Diff_Signal" {}
witness "Rlnv2Diff_Signal" {
Base identity_nullifier,
Base identity_trapdoor,
Base user_message_limit,
# Inclusion proof, the leaf is the identity_commitment
SparseMerklePath path,
# The message hash
Base x,
Base message_id,
Base epoch,
}
circuit "Rlnv2Diff_Signal" {
# Identity inclusion proof
identity_secret = poseidon_hash(identity_nullifier, identity_trapdoor);
identity_secret_hash = poseidon_hash(identity_secret, user_message_limit);
identity_commitment = poseidon_hash(identity_secret_hash);
root = sparse_merkle_root(identity_commitment, path, identity_commitment);
constrain_instance(root);
# External nullifier is created from epoch and app identifier
app_id = witness_base(1000);
external_nullifier = poseidon_hash(epoch, app_id);
constrain_instance(external_nullifier);
# Calculating internal nullifier
# a_0 = identity_secret_hash
a_0 = poseidon_hash(identity_nullifier, identity_trapdoor);
a_1 = poseidon_hash(a_0, external_nullifier, message_id);
x_a_1 = base_mul(x, a_1);
y = base_add(a_0, x_a_1);
constrain_instance(x);
constrain_instance(y);
# Constrain message_id to be lower than actual message limit.
less_than_strict(message_id, user_message_limit);
internal_nullifier = poseidon_hash(a_1);
constrain_instance(internal_nullifier);
}

View File

@@ -25,8 +25,11 @@ use std::{
}, },
}; };
use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable}; use darkfi_sdk::pasta::pallas;
use log::{debug, error, trace, warn}; use darkfi_serial::{
async_trait, deserialize_async, deserialize_async_partial, SerialDecodable, SerialEncodable,
};
use log::{debug, error, info, trace, warn};
use smol::Executor; use smol::Executor;
use super::{event::Header, Event, EventGraphPtr, LayerUTips, NULL_ID, NULL_PARENTS}; use super::{event::Header, Event, EventGraphPtr, LayerUTips, NULL_ID, NULL_PARENTS};
@@ -39,6 +42,7 @@ use crate::{
}, },
system::msleep, system::msleep,
util::time::NanoTimestamp, util::time::NanoTimestamp,
zk::Proof,
Error, Result, Error, Result,
}; };
@@ -105,6 +109,8 @@ pub struct ProtocolEventGraph {
event_graph: EventGraphPtr, event_graph: EventGraphPtr,
/// `MessageSubscriber` for `EventPut` /// `MessageSubscriber` for `EventPut`
ev_put_sub: MessageSubscription<EventPut>, ev_put_sub: MessageSubscription<EventPut>,
/// `MessageSubscriber` for `StaticPut`
st_put_sub: MessageSubscription<StaticPut>,
/// `MessageSubscriber` for `EventReq` /// `MessageSubscriber` for `EventReq`
ev_req_sub: MessageSubscription<EventReq>, ev_req_sub: MessageSubscription<EventReq>,
/// `MessageSubscriber` for `EventRep` /// `MessageSubscriber` for `EventRep`
@@ -132,9 +138,15 @@ pub struct ProtocolEventGraph {
/// A P2P message representing publishing an event on the network /// A P2P message representing publishing an event on the network
#[derive(Clone, SerialEncodable, SerialDecodable)] #[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event); pub struct EventPut(pub Event, pub Vec<u8>);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION); impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing publishing an event of a static graph
/// (most likely RLN_identities) on the network
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct StaticPut(pub Event);
impl_p2p_message!(StaticPut, "EventGraph::StaticPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing an event request /// A P2P message representing an event request
#[derive(Clone, SerialEncodable, SerialDecodable)] #[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>); pub struct EventReq(pub Vec<blake3::Hash>);
@@ -175,6 +187,7 @@ impl ProtocolBase for ProtocolEventGraph {
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> { async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
self.jobsman.clone().start(ex.clone()); self.jobsman.clone().start(ex.clone());
self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await; self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_static_put(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await; self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
// self.jobsman.clone().spawn(self.clone().handle_header_put(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_put(), ex.clone()).await;
// self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await;
@@ -193,6 +206,7 @@ impl ProtocolEventGraph {
pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> { pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.message_subsystem(); let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<EventPut>().await; msg_subsystem.add_dispatch::<EventPut>().await;
msg_subsystem.add_dispatch::<StaticPut>().await;
msg_subsystem.add_dispatch::<EventReq>().await; msg_subsystem.add_dispatch::<EventReq>().await;
msg_subsystem.add_dispatch::<EventRep>().await; msg_subsystem.add_dispatch::<EventRep>().await;
msg_subsystem.add_dispatch::<HeaderPut>().await; msg_subsystem.add_dispatch::<HeaderPut>().await;
@@ -202,6 +216,7 @@ impl ProtocolEventGraph {
msg_subsystem.add_dispatch::<TipRep>().await; msg_subsystem.add_dispatch::<TipRep>().await;
let ev_put_sub = channel.subscribe_msg::<EventPut>().await?; let ev_put_sub = channel.subscribe_msg::<EventPut>().await?;
let st_put_sub = channel.subscribe_msg::<StaticPut>().await?;
let ev_req_sub = channel.subscribe_msg::<EventReq>().await?; let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?; let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
let _hdr_put_sub = channel.subscribe_msg::<HeaderPut>().await?; let _hdr_put_sub = channel.subscribe_msg::<HeaderPut>().await?;
@@ -216,6 +231,7 @@ impl ProtocolEventGraph {
channel: channel.clone(), channel: channel.clone(),
event_graph, event_graph,
ev_put_sub, ev_put_sub,
st_put_sub,
ev_req_sub, ev_req_sub,
ev_rep_sub, ev_rep_sub,
_hdr_put_sub, _hdr_put_sub,
@@ -258,8 +274,8 @@ impl ProtocolEventGraph {
let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME); let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
loop { loop {
let event = match self.ev_put_sub.receive().await { let (event, blob) = match self.ev_put_sub.receive().await {
Ok(v) => v.0.clone(), Ok(v) => (v.0.clone(), v.1.clone()),
Err(_) => continue, Err(_) => continue,
}; };
trace!( trace!(
@@ -276,9 +292,57 @@ impl ProtocolEventGraph {
continue continue
} }
////////////////////
let mut verification_failed = false;
#[allow(clippy::never_loop)]
loop {
let (proof, public_inputs): (Proof, Vec<pallas::Base>) =
match deserialize_async_partial(&blob).await {
Ok((v, _)) => v,
Err(e) => {
error!(target: "event_graph::protocol::handle_event_put()","[EVENTGRAPH] Failed deserializing event ephemeral data: {}", e);
break
}
};
if public_inputs.len() != 2 {
error!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Received event has the wrong number of public inputs");
verification_failed = true;
break
}
// info!("public_inputs: {:?}", public_inputs);
info!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Verifying incoming Event RLN proof");
// let epoch = pallas::Base::from(closest_epoch(event.header.timestamp));
// let external_nullifier = poseidon_hash([epoch, RLN_APP_IDENTIFIER]);
// let x = hash_event(event);
// let y = public_inputs[0];
// let internal_nullifier = public_inputs[1];
// verification_failed =
// proof.verify(&self.event_graph.signal_vk, &public_inputs).is_err();
verification_failed =
match proof.verify(&self.event_graph.signal_vk, &public_inputs) {
Ok(()) => false,
Err(e) => {
error!("error: {e}");
true
}
};
break
}
if verification_failed {
error!(target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Incoming Event proof verification failed");
continue
}
////////////////////
// If we have already seen the event, we'll stay quiet. // If we have already seen the event, we'll stay quiet.
let current_genesis = self.event_graph.current_genesis.read().await; let current_genesis = self.event_graph.current_genesis.read().await;
let dag_name = current_genesis.header.timestamp.to_string(); let genesis_timestamp = current_genesis.header.timestamp;
let dag_name = genesis_timestamp.to_string();
let hdr_tree_name = format!("headers_{dag_name}"); let hdr_tree_name = format!("headers_{dag_name}");
let event_id = event.id(); let event_id = event.id();
if self if self
@@ -318,7 +382,6 @@ impl ProtocolEventGraph {
// The genesis event marks the last time the Dag has been pruned of old // The genesis event marks the last time the Dag has been pruned of old
// events. The pruning interval is defined by the days_rotation field // events. The pruning interval is defined by the days_rotation field
// of [`EventGraph`]. // of [`EventGraph`].
let genesis_timestamp = self.event_graph.current_genesis.read().await.header.timestamp;
if event.header.timestamp < genesis_timestamp { if event.header.timestamp < genesis_timestamp {
debug!( debug!(
target: "event_graph::protocol::handle_event_put()", target: "event_graph::protocol::handle_event_put()",
@@ -512,7 +575,87 @@ impl ProtocolEventGraph {
continue continue
} }
self.broadcaster_push.send(EventPut(event)).await.expect("push broadcaster closed"); self.broadcaster_push
.send(EventPut(event, blob))
.await
.expect("push broadcaster closed");
}
}
async fn handle_static_put(self: Arc<Self>) -> Result<()> {
// Rolling window of event timestamps on this channel
let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
loop {
let event = match self.st_put_sub.receive().await {
Ok(v) => v.0.clone(),
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_event_put()",
"Got EventPut: {} [{}]", event.id(), self.channel.address(),
);
info!("Received a static event: {:?}", event);
// Check if node has finished syncing its DAG
if !*self.event_graph.synced.read().await {
debug!(
target: "event_graph::protocol::handle_event_put",
"DAG is still syncing, skipping..."
);
continue
}
let event_id = event.id();
if self.event_graph.static_dag.contains_key(event_id.as_bytes())? {
debug!(
target: "event_graph::protocol::handle_static_put()",
"Event {} is already known", event_id,
);
continue
}
// Check if event's parents are in the static DAG
for parent in event.header.parents.iter() {
if *parent == NULL_ID {
continue
}
if !self.event_graph.static_dag.contains_key(parent.as_bytes())? {
debug!(
target: "event_graph::protocol::handle_static_put()",
"Event {} is orphan", event_id,
);
return Err(Error::EventNotFound("Event is orphan".to_owned()))
}
}
// There's a new unique event.
// Apply ban logic to stop network floods.
bantimes.ticktock();
if bantimes.count() > WINDOW_MAXSIZE {
self.channel.ban().await;
// This error is actually unused. We could return Ok here too.
return Err(Error::MaliciousFlood)
}
// Validate the new event first. If we do not consider it valid, we
// will just drop it and stay quiet. If the malicious threshold
// is reached, we will stop the connection.
if !event.validate_new() {
self.clone().increase_malicious_count().await?;
continue
}
// At this point, this is a new event to us. Let's see if we
// have all of its parents.
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is new", event_id,
);
self.event_graph.static_insert(&event).await?;
self.event_graph.static_broadcast(event).await?
} }
} }
@@ -700,11 +843,20 @@ impl ProtocolEventGraph {
// We received a tip request. Let's find them, add them to // We received a tip request. Let's find them, add them to
// our bcast ids list, and reply with them. // our bcast ids list, and reply with them.
let dag_timestamp = u64::from_str(&dag_name)?; let layers = match dag_name.as_str() {
let store = self.event_graph.dag_store.read().await; "static-dag" => {
let (_, layers) = match store.header_dags.get(&dag_timestamp) { let tips = self.event_graph.static_unreferenced_tips().await;
Some(v) => v, &tips.clone()
None => continue, }
_ => {
let dag_timestamp = u64::from_str(&dag_name)?;
let store = self.event_graph.dag_store.read().await;
let (_, layers) = match store.header_dags.get(&dag_timestamp) {
Some(v) => v,
None => continue,
};
&layers.clone()
}
}; };
// let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await; // let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await;
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;

View File

@@ -248,7 +248,7 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
drop(store); drop(store);
drop(current_genesis); drop(current_genesis);
info!("Broadcasting event {}", event_id); info!("Broadcasting event {}", event_id);
random_node.p2p.broadcast(&EventPut(event)).await; random_node.p2p.broadcast(&EventPut(event, vec![])).await;
info!("Waiting 5s for event propagation"); info!("Waiting 5s for event propagation");
sleep(5).await; sleep(5).await;
@@ -292,7 +292,7 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
info!("Broadcasting event {}", event2_id); info!("Broadcasting event {}", event2_id);
info!("Event chain: {:#?}", event_chain); info!("Event chain: {:#?}", event_chain);
random_node.p2p.broadcast(&EventPut(event2)).await; random_node.p2p.broadcast(&EventPut(event2, vec![])).await;
info!("Waiting 5s for event propagation"); info!("Waiting 5s for event propagation");
sleep(5).await; sleep(5).await;
@@ -310,19 +310,19 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await; let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await;
node1.header_dag_insert(vec![event0_1.header.clone()], &dag_name).await.unwrap(); node1.header_dag_insert(vec![event0_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event0_1.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event0_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event0_1)).await; node1.p2p.broadcast(&EventPut(event0_1, vec![])).await;
msleep(300).await; msleep(300).await;
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await; let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await;
node1.header_dag_insert(vec![event1_1.header.clone()], &dag_name).await.unwrap(); node1.header_dag_insert(vec![event1_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event1_1.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event1_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event1_1)).await; node1.p2p.broadcast(&EventPut(event1_1, vec![])).await;
msleep(300).await; msleep(300).await;
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await; let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await;
node1.header_dag_insert(vec![event2_1.header.clone()], &dag_name).await.unwrap(); node1.header_dag_insert(vec![event2_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event2_1.clone()], &dag_name).await.unwrap(); node1.dag_insert(&[event2_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event2_1)).await; node1.p2p.broadcast(&EventPut(event2_1, vec![])).await;
msleep(300).await; msleep(300).await;
// ======= // =======
@@ -332,19 +332,19 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await; let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await;
node2.header_dag_insert(vec![event0_2.header.clone()], &dag_name).await.unwrap(); node2.header_dag_insert(vec![event0_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event0_2.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event0_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_2)).await; node2.p2p.broadcast(&EventPut(event0_2, vec![])).await;
msleep(300).await; msleep(300).await;
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await; let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await;
node2.header_dag_insert(vec![event1_2.header.clone()], &dag_name).await.unwrap(); node2.header_dag_insert(vec![event1_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event1_2.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event1_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_2)).await; node2.p2p.broadcast(&EventPut(event1_2, vec![])).await;
msleep(300).await; msleep(300).await;
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await; let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await;
node2.header_dag_insert(vec![event2_2.header.clone()], &dag_name).await.unwrap(); node2.header_dag_insert(vec![event2_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event2_2.clone()], &dag_name).await.unwrap(); node2.dag_insert(&[event2_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event2_2)).await; node2.p2p.broadcast(&EventPut(event2_2, vec![])).await;
msleep(300).await; msleep(300).await;
// ======= // =======
@@ -354,19 +354,19 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await; let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await;
node3.header_dag_insert(vec![event0_3.header.clone()], &dag_name).await.unwrap(); node3.header_dag_insert(vec![event0_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event0_3.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event0_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event0_3)).await; node3.p2p.broadcast(&EventPut(event0_3, vec![])).await;
msleep(300).await; msleep(300).await;
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await; let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await;
node3.header_dag_insert(vec![event1_3.header.clone()], &dag_name).await.unwrap(); node3.header_dag_insert(vec![event1_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event1_3.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event1_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event1_3)).await; node3.p2p.broadcast(&EventPut(event1_3, vec![])).await;
msleep(300).await; msleep(300).await;
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await; let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await;
node3.header_dag_insert(vec![event2_3.header.clone()], &dag_name).await.unwrap(); node3.header_dag_insert(vec![event2_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event2_3.clone()], &dag_name).await.unwrap(); node3.dag_insert(&[event2_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event2_3)).await; node3.p2p.broadcast(&EventPut(event2_3, vec![])).await;
msleep(300).await; msleep(300).await;
// ///// // /////
@@ -464,7 +464,7 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
let dag_name = current_genesis.header.timestamp.to_string(); let dag_name = current_genesis.header.timestamp.to_string();
random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap(); random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap();
random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap(); random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap();
random_node.p2p.broadcast(&EventPut(event)).await; random_node.p2p.broadcast(&EventPut(event, vec![])).await;
} }
info!("Waiting 5s for events propagation"); info!("Waiting 5s for events propagation");
sleep(5).await; sleep(5).await;