Implement delivery service using redis (#11)

* add redis connection

* add docker

* add docker

* fix maping error
This commit is contained in:
Ekaterina Broslavskaya
2024-07-01 18:40:11 +07:00
committed by GitHub
parent 30b8ebb4a7
commit 35d2104e3f
7 changed files with 177 additions and 211 deletions

View File

@@ -1,4 +1,4 @@
workspace = { members = [ "sc_key_store","ds"] }
workspace = { members = ["sc_key_store", "ds"] }
[package]
name = "de-mls"
version = "0.1.0"
@@ -14,6 +14,7 @@ openmls_traits = "=0.2.0"
# waku-bindings = "0.6.0"
bus = "=2.4.1"
tokio = "=1.38.0"
rand = "=0.8.5"
@@ -21,4 +22,4 @@ anyhow = "=1.0.71"
thiserror = "=1.0.61"
ds = { path = "ds" }
sc_key_store = {path = "sc_key_store" }
sc_key_store = { path = "sc_key_store" }

View File

@@ -1,6 +1,10 @@
# de-mls
Decentralized MLS PoC using a smart contract for group coordination
## Run Redis Server
`docker-compose up`
## Install deps
1. `Foundry`

6
docker-compose.yml Normal file
View File

@@ -0,0 +1,6 @@
version: "3.8"
services:
redis:
image: redis:7-alpine
ports:
- 6379:6379

View File

@@ -9,11 +9,15 @@ edition = "2021"
# chrono = "=0.4.38"
# waku-bindings = "=0.6.0"
bus = "=2.4.1"
fred = { version = "=9.0.3", features = ["subscriber-client"] }
tokio = "=1.38.0"
openmls = "=0.5.0"
openmls = { version = "=0.5.0", features = ["test-utils"] }
rand = { version = "^0.8" }
anyhow = "=1.0.71"
thiserror = "=1.0.61"
tls_codec = "=0.3.0"
sc_key_store = { path = "../sc_key_store" }

View File

@@ -1,89 +1,74 @@
use bus::{Bus, BusReader};
// use chrono::Utc;
use std::collections::HashMap;
use fred::{
clients::{RedisClient, SubscriberClient},
error::RedisError,
prelude::*,
types::Message,
};
use openmls::framing::MlsMessageIn;
use tokio::sync::broadcast::{error::RecvError, Receiver};
use openmls::{
framing::{MlsMessageIn, MlsMessageOut},
prelude::{TlsDeserializeTrait, TlsSerializeTrait},
};
// use waku_bindings::*;
use sc_key_store::{pks::PublicKeyStorage, KeyStoreError};
#[derive(Debug)]
pub struct DSClient {
// node: WakuNodeHandle<Running>,
pub pub_node: Bus<MlsMessageIn>,
pub sub_node: HashMap<Vec<u8>, BusReader<MlsMessageIn>>,
pub struct RClient {
group_id: String,
client: RedisClient,
sub_client: SubscriberClient,
broadcaster: Receiver<Message>,
}
impl DSClient {
pub fn new_with_subscriber(id: Vec<u8>) -> Result<DSClient, DeliveryServiceError> {
let mut ds = DSClient {
pub_node: Bus::new(10),
sub_node: HashMap::new(),
};
ds.add_subscriber(id)?;
Ok(ds)
impl RClient {
pub async fn new_for_group(group_id: String) -> Result<Self, DeliveryServiceError> {
let redis_client = RedisClient::default();
let subscriber: SubscriberClient =
Builder::default_centralized().build_subscriber_client()?;
redis_client.init().await?;
subscriber.init().await?;
subscriber.subscribe(group_id.clone()).await?;
Ok(RClient {
group_id,
client: redis_client,
sub_client: subscriber.clone(),
broadcaster: subscriber.message_rx(),
})
}
pub fn add_subscriber(&mut self, id: Vec<u8>) -> Result<(), DeliveryServiceError> {
let rx = self.pub_node.add_rx();
self.sub_node.insert(id, rx);
pub async fn remove_from_group(&mut self) -> Result<(), DeliveryServiceError> {
self.sub_client.unsubscribe(self.group_id.clone()).await?;
self.sub_client.quit().await?;
self.client.quit().await?;
Ok(())
}
pub fn msg_send(
&mut self,
msg: MlsMessageIn,
// pubsub_topic: WakuPubSubTopic,
// content_topic: WakuContentTopic,
) -> Result<(), DeliveryServiceError> {
// let buff = self.msg.tls_serialize_detached().unwrap();
// Message::encode(&self.msg, &mut buff).expect("Could not encode :(");
self.pub_node.broadcast(msg);
// let waku_message = WakuMessage::new(
// buff,
// content_topic,
// 2,
// Utc::now().timestamp() as usize,
// vec![],
// true,
// );
// node_handle
// .node
// .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None)
// .map_err(|e| {
// debug!(
// error = tracing::field::debug(&e),
// "Failed to relay publish the message"
// );
// WakuHandlingError::PublishMessage(e)
// });
pub async fn msg_send(&mut self, msg: MlsMessageOut) -> Result<(), DeliveryServiceError> {
let buf = msg.tls_serialize_detached()?;
self.client
.publish(self.group_id.clone(), buf.as_slice())
.await?;
Ok(())
}
pub fn msg_recv(
&mut self,
id: &[u8],
pks: &PublicKeyStorage,
) -> Result<MlsMessageIn, DeliveryServiceError> {
let node: &mut BusReader<MlsMessageIn> = match self.sub_node.get_mut(id) {
Some(node) => node,
None => return Err(DeliveryServiceError::EmptySubNodeError),
};
node.recv().map_err(DeliveryServiceError::RecieveError)
pub async fn msg_recv(&mut self) -> Result<MlsMessageIn, DeliveryServiceError> {
// check only one message
let msg = self.broadcaster.recv().await?;
let bytes: Vec<u8> = msg.value.convert()?;
let res = MlsMessageIn::tls_deserialize_bytes(bytes)?;
Ok(res)
}
}
#[derive(Debug, thiserror::Error)]
pub enum DeliveryServiceError {
#[error("Could not get data from Key Store: {0}")]
KeyStoreError(KeyStoreError),
#[error("Unauthorized User")]
UnauthorizedUserError,
#[error("Subscriber doesn't exist")]
EmptySubNodeError,
#[error("Reciever error: {0}")]
RecieveError(#[from] std::sync::mpsc::RecvError),
#[error("Redis error: {0}")]
RedisError(#[from] RedisError),
#[error("Tokio error: {0}")]
TokioRecieveError(#[from] RecvError),
#[error("Serialization problem: {0}")]
TlsError(#[from] tls_codec::Error),
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}

View File

@@ -3,14 +3,15 @@ mod identity;
mod openmls_provider;
mod user;
use std::{rc::Rc, str::FromStr};
use std::str::FromStr;
use bus::Bus;
use openmls::framing::{MlsMessageIn, MlsMessageInBody};
use sc_key_store::pks::PublicKeyStorage;
use user::User;
fn main() {
#[tokio::main]
async fn main() {
let mut pks = PublicKeyStorage::default();
// This channel for message before adding to group.
// Message are still encrypted, but this channel not attached to any group
@@ -41,19 +42,21 @@ fn main() {
//// Alice create group: Alice_Group
println!("Start create group");
let group_name = String::from_str("Alice_Group").unwrap();
let res = a_user.create_group(group_name.clone());
let res = a_user.create_group(group_name.clone()).await;
assert!(res.is_ok());
assert!(a_user.groups.borrow().contains_key("Alice_Group"));
assert!(a_user.groups.contains_key("Alice_Group"));
println!("Create group successfully");
//////
//// Alice invite Bob
println!("Alice inviting Bob");
let welcome = a_user.invite(b_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(b_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
//// Send welcome message to system broadcast. Only Bob can use it
@@ -63,13 +66,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = b_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = b_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(b_user.groups.borrow().contains_key("Alice_Group"));
assert!(b_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
@@ -78,26 +77,26 @@ fn main() {
/////
//// Bob send message and Alice recieve it
let res = b_user.send_msg("Hi!", group_name.clone());
let res = b_user.send_msg("Hi!", group_name.clone()).await;
assert!(res.is_ok());
// Bob also get the message but he cant decrypt it (regarding the mls rfc)
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
// Expected error with invalid decryption
assert!(res.is_err());
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////
//// Alice send message and Bob recieve it
let res = a_user.send_msg("Hi Bob!", group_name.clone());
let res = a_user.send_msg("Hi Bob!", group_name.clone()).await;
assert!(res.is_ok());
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
/////
@@ -119,13 +118,15 @@ fn main() {
//// Alice invite Carla
println!("Alice inviting Carla");
let welcome = a_user.invite(c_user.username(), group_name.clone(), &mut pks);
let welcome = a_user
.invite(c_user.username(), group_name.clone(), &mut pks)
.await;
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
//// Send welcome message to system broadcast. Only Bob can use it
@@ -136,13 +137,9 @@ fn main() {
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = c_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
let res = c_user.join_group(welcome).await;
assert!(res.is_ok());
assert!(c_user.groups.borrow().contains_key("Alice_Group"));
assert!(c_user.groups.contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
@@ -151,16 +148,16 @@ fn main() {
/////
//// Carla send message and Alice and Bob recieve it
let res = c_user.send_msg("Hi all!", group_name.clone());
let res = c_user.send_msg("Hi all!", group_name.clone()).await;
assert!(res.is_ok());
let res = c_user.recieve_msg(group_name.clone(), &pks);
let res = c_user.recieve_msg(group_name.clone()).await;
assert!(res.is_err());
let res = a_user.recieve_msg(group_name.clone(), &pks);
let res = a_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
let res = b_user.recieve_msg(group_name.clone(), &pks);
let res = b_user.recieve_msg(group_name.clone()).await;
assert!(res.is_ok());
////

View File

@@ -1,6 +1,6 @@
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::{cell::RefCell, collections::HashMap, rc::Rc, str};
use std::{cell::RefCell, collections::HashMap, str};
use ds::ds::*;
use openmls::{group::*, prelude::*};
@@ -13,19 +13,18 @@ use crate::conversation::*;
use crate::identity::{Identity, IdentityError};
use crate::openmls_provider::{CryptoProvider, CIPHERSUITE};
#[derive(Debug)]
pub struct Group {
group_name: String,
conversation: Conversation,
mls_group: RefCell<MlsGroup>,
pub ds_node: Rc<RefCell<DSClient>>,
rc_client: RClient,
// pubsub_topic: WakuPubSubTopic,
// content_topics: Vec<WakuContentTopic>,
}
pub struct User {
pub(crate) identity: RefCell<Identity>,
pub(crate) groups: RefCell<HashMap<String, Group>>,
pub(crate) identity: Identity,
pub(crate) groups: HashMap<String, Group>,
provider: CryptoProvider,
local_ks: LocalCache,
// pub(crate) contacts: HashMap<Vec<u8>, WakuPeers>,
@@ -37,8 +36,8 @@ impl User {
let crypto = CryptoProvider::default();
let id = Identity::new(CIPHERSUITE, &crypto, username)?;
Ok(User {
groups: RefCell::new(HashMap::new()),
identity: RefCell::new(id),
groups: HashMap::new(),
identity: id,
provider: crypto,
local_ks: LocalCache::empty_key_store(username),
// contacts: HashMap::new(),
@@ -46,26 +45,17 @@ impl User {
}
pub(crate) fn username(&self) -> String {
self.identity.borrow().to_string()
self.identity.to_string()
}
pub(crate) fn id(&self) -> Vec<u8> {
self.identity.borrow().identity()
self.identity.identity()
}
fn is_signature_eq(&self, sign: &Vec<u8>) -> bool {
self.identity
.borrow()
.credential_with_key
.signature_key
.as_slice()
== sign
}
pub fn create_group(&mut self, group_name: String) -> Result<(), UserError> {
pub async fn create_group(&mut self, group_name: String) -> Result<(), UserError> {
let group_id = group_name.as_bytes();
if self.groups.borrow().contains_key(&group_name) {
if self.groups.contains_key(&group_name) {
return Err(UserError::UnknownGroupError(group_name));
}
@@ -75,39 +65,39 @@ impl User {
let mls_group = MlsGroup::new_with_group_id(
&self.provider,
&self.identity.borrow().signer,
&self.identity.signer,
&group_config,
GroupId::from_slice(group_id),
self.identity.borrow().credential_with_key.clone(),
self.identity.credential_with_key.clone(),
)?;
let ds = DSClient::new_with_subscriber(self.id())?;
let rc = RClient::new_for_group(group_name.clone()).await?;
let group = Group {
group_name: group_name.clone(),
conversation: Conversation::default(),
mls_group: RefCell::new(mls_group),
ds_node: Rc::new(RefCell::new(ds)),
rc_client: rc,
// pubsub_topic: WakuPubSubTopic::new(),
// content_topics: Vec::new(),
};
self.groups.borrow_mut().insert(group_name, group);
self.groups.insert(group_name, group);
Ok(())
}
pub fn register(&mut self, mut pks: &mut PublicKeyStorage) -> Result<(), UserError> {
pks.add_user(self.key_packages(), self.identity.borrow().signer.public())?;
pks.add_user(self.key_packages(), self.identity.signer.public())?;
self.local_ks.get_update_from_smart_contract(pks)?;
Ok(())
}
/// Get the key packages fo this user.
pub fn key_packages(&self) -> UserKeyPackages {
let mut kpgs = self.identity.borrow().kp.clone();
let mut kpgs = self.identity.kp.clone();
UserKeyPackages(kpgs.drain().collect::<Vec<(Vec<u8>, KeyPackage)>>())
}
pub fn invite(
pub async fn invite(
&mut self,
username: String,
group_name: String,
@@ -122,20 +112,18 @@ impl User {
let joiner_key_package = pks.get_avaliable_user_kp(username.as_bytes())?;
// Build a proposal with this key package and do the MLS bits.
let mut groups = self.groups.borrow_mut();
let group = match groups.get_mut(&group_name) {
let group = match self.groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let (out_messages, welcome, _group_info) = group.mls_group.borrow_mut().add_members(
&self.provider,
&self.identity.borrow().signer,
&self.identity.signer,
&[joiner_key_package],
)?;
let msg = out_messages.into();
group.ds_node.as_ref().borrow_mut().msg_send(msg)?;
group.rc_client.msg_send(out_messages).await?;
// Second, process the invitation on our end.
group
.mls_group
@@ -143,28 +131,18 @@ impl User {
.merge_pending_commit(&self.provider)?;
// Put sending welcome by p2p here
// group.ds_node.as_ref().borrow_mut().msg_send(welcome.into())?;
drop(groups);
Ok(welcome.into())
}
pub fn recieve_msg(&self, group_name: String, pks: &PublicKeyStorage) -> Result<(), UserError> {
let msg = {
let groups = self.groups.borrow();
let group = match groups.get(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let msg = group
.ds_node
.as_ref()
.borrow_mut()
.msg_recv(self.id().as_ref(), pks)?;
msg
pub async fn recieve_msg(&mut self, group_name: String) -> Result<(), UserError> {
let group = match self.groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let msg = group.rc_client.msg_recv().await?;
match msg.extract() {
MlsMessageInBody::Welcome(_welcome) => {
// Now irrelevant because message are attached to group
@@ -181,10 +159,9 @@ impl User {
Ok(())
}
fn process_protocol_msg(&self, message: ProtocolMessage) -> Result<(), UserError> {
let mut groups = self.groups.borrow_mut();
fn process_protocol_msg(&mut self, message: ProtocolMessage) -> Result<(), UserError> {
let group_name = str::from_utf8(message.group_id().as_slice())?;
let group = match groups.get_mut(group_name) {
let group = match self.groups.get_mut(group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name.to_string())),
};
@@ -202,7 +179,6 @@ impl User {
== processed_message_credential.identity()
&& (self
.identity
.borrow()
.credential_with_key
.signature_key
.as_slice()
@@ -236,7 +212,7 @@ impl User {
if remove_proposal {
println!(
"update::Processing StagedCommitMessage removing {} from group {} ",
self.username(),
self.identity.to_string(),
group.group_name
);
return Ok(());
@@ -246,29 +222,23 @@ impl User {
Ok(())
}
pub fn send_msg(&mut self, msg: &str, group_name: String) -> Result<(), UserError> {
let groups = self.groups.borrow();
let group = match groups.get(&group_name) {
pub async fn send_msg(&mut self, msg: &str, group_name: String) -> Result<(), UserError> {
let group = match self.groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let message_out = group.mls_group.borrow_mut().create_message(
&self.provider,
&self.identity.borrow().signer,
&self.identity.signer,
msg.as_bytes(),
)?;
group
.ds_node
.as_ref()
.borrow_mut()
.msg_send(message_out.into())?;
group.rc_client.msg_send(message_out).await?;
Ok(())
}
pub fn join_group(&self, welcome: Welcome, ds: Rc<RefCell<DSClient>>) -> Result<(), UserError> {
pub async fn join_group(&mut self, welcome: Welcome) -> Result<(), UserError> {
let group_config = MlsGroupConfig::builder()
.use_ratchet_tree_extension(true)
.build();
@@ -278,67 +248,38 @@ impl User {
let group_id = mls_group.group_id().to_vec();
let group_name = String::from_utf8(group_id)?;
ds.borrow_mut()
.add_subscriber(self.identity.borrow().identity())?;
let rc = RClient::new_for_group(group_name.clone()).await?;
let group = Group {
group_name: group_name.clone(),
conversation: Conversation::default(),
mls_group: RefCell::new(mls_group),
ds_node: ds,
rc_client: rc,
};
match self.groups.borrow_mut().insert(group_name, group) {
match self.groups.insert(group_name, group) {
Some(old) => Err(UserError::AlreadyExistedGroupError(old.group_name)),
None => Ok(()),
}
}
/// Get a member
fn find_member_index(&self, name: String, group: &Group) -> Result<LeafNodeIndex, UserError> {
let mls_group = group.mls_group.borrow();
let member = mls_group
.members()
.find(|m| m.credential.identity().eq(name.as_bytes()));
match member {
Some(m) => Ok(m.index),
None => Err(UserError::UnknownGroupMemberError(name)),
}
}
fn group_members(&self, group: &Group) -> Result<Vec<Vec<u8>>, UserError> {
let mls_group = group.mls_group.borrow();
let members: Vec<Vec<u8>> = mls_group
.members()
.filter(|m| self.is_signature_eq(&m.signature_key))
.map(|m| m.credential.identity().to_vec())
.collect();
Ok(members)
}
pub fn remove(&mut self, name: String, group_name: String) -> Result<(), UserError> {
pub async fn remove(&mut self, name: String, group_name: String) -> Result<(), UserError> {
// Get the group ID
let mut groups = self.groups.borrow_mut();
let group = match groups.get_mut(&group_name) {
let group = match self.groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
// Get the user leaf index
let leaf_index = self.find_member_index(name, group)?;
let leaf_index = group.find_member_index(name)?;
// Remove operation on the mls group
let (remove_message, _welcome, _group_info) = group.mls_group.borrow_mut().remove_members(
&self.provider,
&self.identity.borrow().signer,
&self.identity.signer,
&[leaf_index],
)?;
let msg = remove_message.into();
group.ds_node.as_ref().borrow_mut().msg_send(msg)?;
group.rc_client.msg_send(remove_message).await?;
// Second, process the removal on our end.
group
@@ -346,8 +287,6 @@ impl User {
.borrow_mut()
.merge_pending_commit(&self.provider)?;
drop(groups);
Ok(())
}
@@ -356,8 +295,7 @@ impl User {
&self,
group_name: String,
) -> Result<Option<Vec<ConversationMessage>>, UserError> {
let groups = self.groups.borrow();
groups.get(&group_name).map_or_else(
self.groups.get(&group_name).map_or_else(
|| Err(UserError::UnknownGroupError(group_name)),
|g| {
Ok(g.conversation
@@ -368,20 +306,51 @@ impl User {
}
}
impl Group {
/// Get a member
fn find_member_index(&self, name: String) -> Result<LeafNodeIndex, GroupError> {
let member = self
.mls_group
.borrow()
.members()
.find(|m| m.credential.identity().eq(name.as_bytes()));
match member {
Some(m) => Ok(m.index),
None => Err(GroupError::UnknownGroupMemberError(name)),
}
}
fn group_members(&self, user_signature: &[u8]) -> Vec<Vec<u8>> {
self.mls_group
.borrow()
.members()
.filter(|m| m.signature_key == user_signature)
.map(|m| m.credential.identity().to_vec())
.collect::<Vec<Vec<u8>>>()
}
}
#[derive(Debug, thiserror::Error)]
pub enum GroupError {
#[error("Unknown group member : {0}")]
UnknownGroupMemberError(String),
}
#[derive(Debug, thiserror::Error)]
pub enum UserError {
#[error("Unknown group: {0}")]
UnknownGroupError(String),
#[error("Group already exist: {0}")]
AlreadyExistedGroupError(String),
#[error("Unknown group member : {0}")]
UnknownGroupMemberError(String),
#[error("Unsupported message type")]
MessageTypeError,
#[error("Unknown user")]
UnknownUserError,
#[error("Delivery Service error: {0}")]
DeliveryServiceError(#[from] DeliveryServiceError),
#[error(transparent)]
GroupError(#[from] GroupError),
#[error("Key Store error: {0}")]
KeyStoreError(#[from] KeyStoreError),
#[error("Identity error: {0}")]