de-MLS draft architecture

* first commit

* update

* mend

* update code

* refactor code

* prepare to smart contract integration

* mend

* fix after review
This commit is contained in:
Ekaterina Broslavskaya
2024-06-25 18:18:57 +07:00
committed by GitHub
parent b9e98f8fbd
commit 61abb1af1f
14 changed files with 1086 additions and 0 deletions

3
.gitignore vendored
View File

@@ -12,3 +12,6 @@ Cargo.lock
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
.DS_Store
src/.DS_Store

24
Cargo.toml Normal file
View File

@@ -0,0 +1,24 @@
workspace = { members = [ "sc_key_store","ds"] }
[package]
name = "de-mls"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
openmls = { version = "=0.5.0", features = ["test-utils"] }
openmls_basic_credential = "=0.2.0"
openmls_rust_crypto = "=0.2.0"
openmls_traits = "=0.2.0"
# waku-bindings = "0.6.0"
bus = "=2.4.1"
rand = "=0.8.5"
anyhow = "=1.0.71"
thiserror = "=1.0.61"
ds = { path = "ds" }
sc_key_store = {path = "sc_key_store" }

19
ds/Cargo.toml Normal file
View File

@@ -0,0 +1,19 @@
[package]
name = "ds"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# chrono = "=0.4.38"
# waku-bindings = "=0.6.0"
bus = "=2.4.1"
openmls = "=0.5.0"
rand = { version = "^0.8" }
anyhow = "=1.0.71"
thiserror = "=1.0.61"
sc_key_store = { path = "../sc_key_store" }

89
ds/src/ds.rs Normal file
View File

@@ -0,0 +1,89 @@
use bus::{Bus, BusReader};
// use chrono::Utc;
use std::collections::HashMap;
use openmls::framing::MlsMessageIn;
// use waku_bindings::*;
use sc_key_store::{pks::PublicKeyStorage, KeyStoreError};
#[derive(Debug)]
pub struct DSClient {
// node: WakuNodeHandle<Running>,
pub pub_node: Bus<MlsMessageIn>,
pub sub_node: HashMap<Vec<u8>, BusReader<MlsMessageIn>>,
}
impl DSClient {
pub fn new_with_subscriber(id: Vec<u8>) -> Result<DSClient, DeliveryServiceError> {
let mut ds = DSClient {
pub_node: Bus::new(10),
sub_node: HashMap::new(),
};
ds.add_subscriber(id)?;
Ok(ds)
}
pub fn add_subscriber(&mut self, id: Vec<u8>) -> Result<(), DeliveryServiceError> {
let rx = self.pub_node.add_rx();
self.sub_node.insert(id, rx);
Ok(())
}
pub fn msg_send(
&mut self,
msg: MlsMessageIn,
// pubsub_topic: WakuPubSubTopic,
// content_topic: WakuContentTopic,
) -> Result<(), DeliveryServiceError> {
// let buff = self.msg.tls_serialize_detached().unwrap();
// Message::encode(&self.msg, &mut buff).expect("Could not encode :(");
self.pub_node.broadcast(msg);
// let waku_message = WakuMessage::new(
// buff,
// content_topic,
// 2,
// Utc::now().timestamp() as usize,
// vec![],
// true,
// );
// node_handle
// .node
// .relay_publish_message(&waku_message, Some(pubsub_topic.clone()), None)
// .map_err(|e| {
// debug!(
// error = tracing::field::debug(&e),
// "Failed to relay publish the message"
// );
// WakuHandlingError::PublishMessage(e)
// });
Ok(())
}
pub fn msg_recv(
&mut self,
id: &[u8],
pks: &PublicKeyStorage,
) -> Result<MlsMessageIn, DeliveryServiceError> {
let node: &mut BusReader<MlsMessageIn> = match self.sub_node.get_mut(id) {
Some(node) => node,
None => return Err(DeliveryServiceError::EmptySubNodeError),
};
node.recv().map_err(DeliveryServiceError::RecieveError)
}
}
#[derive(Debug, thiserror::Error)]
pub enum DeliveryServiceError {
#[error("Could not get data from Key Store: {0}")]
KeyStoreError(KeyStoreError),
#[error("Unauthorized User")]
UnauthorizedUserError,
#[error("Subscriber doesn't exist")]
EmptySubNodeError,
#[error("Reciever error: {0}")]
RecieveError(#[from] std::sync::mpsc::RecvError),
}

1
ds/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod ds;

11
sc_key_store/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "sc_key_store"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
openmls = "=0.5.0"
thiserror = "=1.0.61"
anyhow = "=1.0.71"

55
sc_key_store/src/lib.rs Normal file
View File

@@ -0,0 +1,55 @@
pub mod local_ks;
pub mod pks;
use openmls::prelude::*;
/// The DS returns a list of key packages for a user as `UserKeyPackages`.
/// This is a tuple struct holding a vector of `(Vec<u8>, KeyPackage)` tuples,
/// where the first value is the key package hash (output of `KeyPackage::hash`)
/// and the second value is the corresponding key package.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct UserKeyPackages(pub Vec<(Vec<u8>, KeyPackage)>);
/// Information about a user.
#[derive(Debug, Default, Clone)]
pub struct UserInfo {
pub id: Vec<u8>,
pub key_packages: UserKeyPackages,
pub sign_pk: Vec<u8>,
}
pub trait SCKeyStoreService {
fn connect() -> Self;
fn does_user_exist(&self, id: &[u8]) -> bool;
fn add_user(&mut self, ukp: UserKeyPackages, sign_pk: &[u8]) -> Result<(), KeyStoreError>;
fn get_user(&self, id: &[u8]) -> Result<UserInfo, KeyStoreError>;
fn add_user_kp(&mut self, id: &[u8], ukp: UserKeyPackages) -> Result<(), KeyStoreError>;
// we need get key package of other user for inviting them to group
fn get_avaliable_user_kp(&mut self, id: &[u8]) -> Result<KeyPackage, KeyStoreError>;
}
pub trait LocalKeyStoreService {
fn empty_key_store(id: &[u8]) -> Self;
fn load_to_smart_contract<T: SCKeyStoreService>(&self, sc: &mut T)
-> Result<(), KeyStoreError>;
fn get_update_from_smart_contract<T: SCKeyStoreService>(
&mut self,
sc: T,
) -> Result<(), KeyStoreError>;
fn get_avaliable_kp(&mut self) -> Result<KeyPackage, KeyStoreError>;
}
#[derive(Debug, thiserror::Error)]
pub enum KeyStoreError {
#[error("User doesn't exist")]
UnknownUserError,
#[error("Invalid data for User: {0}")]
InvalidUserDataError(String),
#[error("Unauthorized User")]
UnauthorizedUserError,
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}

View File

@@ -0,0 +1,49 @@
use std::collections::HashSet;
use openmls::prelude::KeyPackage;
use crate::{KeyStoreError, LocalKeyStoreService, SCKeyStoreService, UserInfo, UserKeyPackages};
pub struct LocalCache {
user_info: UserInfo,
// map of reserved key_packages [group_id, key_package_hash]
pub reserved_key_pkg_hash: HashSet<Vec<u8>>,
}
impl LocalKeyStoreService for LocalCache {
fn empty_key_store(id: &[u8]) -> Self {
LocalCache {
user_info: UserInfo {
id: id.to_vec(),
key_packages: UserKeyPackages::default(),
sign_pk: Vec::with_capacity(32),
},
reserved_key_pkg_hash: HashSet::new(),
}
}
fn load_to_smart_contract<T: SCKeyStoreService>(
&self,
sc: &mut T,
) -> Result<(), KeyStoreError> {
sc.add_user_kp(&self.user_info.id, self.user_info.key_packages.clone())
}
fn get_update_from_smart_contract<T: SCKeyStoreService>(
&mut self,
sc: T,
) -> Result<(), KeyStoreError> {
let info = sc.get_user(&self.user_info.id)?;
self.user_info.key_packages.clone_from(&info.key_packages);
Ok(())
}
fn get_avaliable_kp(&mut self) -> Result<KeyPackage, KeyStoreError> {
match self.user_info.key_packages.0.pop() {
Some(c) => Ok(c.1),
None => Err(KeyStoreError::InvalidUserDataError(
"No more keypackage available".to_string(),
)),
}
}
}

90
sc_key_store/src/pks.rs Normal file
View File

@@ -0,0 +1,90 @@
use std::collections::HashMap;
use openmls::prelude::KeyPackage;
use crate::{KeyStoreError, SCKeyStoreService, UserInfo, UserKeyPackages};
/// Public Key Storage
/// This is a tuple struct holding a vector of `(Vec<u8>, UserInfo)` tuples,
/// where the first value is the Ethereum wallet address of a user
/// and the second value is the corresponding user information.
#[derive(Debug, Default)]
pub struct PublicKeyStorage {
storage: HashMap<Vec<u8>, UserInfo>,
}
impl From<UserKeyPackages> for UserInfo {
fn from(mut key_packages: UserKeyPackages) -> Self {
let key_package: KeyPackage = key_packages.0[0].1.clone();
let id = key_package.leaf_node().credential().identity();
let drain = key_packages.0.drain(..);
Self {
id: id.into(),
key_packages: UserKeyPackages(drain.collect::<Vec<(Vec<u8>, KeyPackage)>>()),
sign_pk: vec![0; 32],
}
}
}
impl SCKeyStoreService for &mut PublicKeyStorage {
fn connect() -> Self {
todo!()
}
fn does_user_exist(&self, id: &[u8]) -> bool {
self.storage.contains_key(id)
}
fn add_user(&mut self, ukp: UserKeyPackages, sign_pk: &[u8]) -> Result<(), KeyStoreError> {
if ukp.0.is_empty() {
return Err(KeyStoreError::InvalidUserDataError(
"no key packages".to_string(),
));
}
let mut new_user_info: UserInfo = ukp.into();
new_user_info.sign_pk.clone_from_slice(sign_pk);
if self.storage.contains_key(&new_user_info.id) {
return Err(KeyStoreError::InvalidUserDataError(
"already register".to_string(),
));
}
let res = self.storage.insert(new_user_info.id.clone(), new_user_info);
assert!(res.is_none());
Ok(())
}
fn add_user_kp(&mut self, id: &[u8], ukp: UserKeyPackages) -> Result<(), KeyStoreError> {
let user = match self.storage.get_mut(id) {
Some(u) => u,
None => return Err(KeyStoreError::UnknownUserError),
};
ukp.0
.into_iter()
.for_each(|value| user.key_packages.0.push(value));
Ok(())
}
fn get_user(&self, id: &[u8]) -> Result<UserInfo, KeyStoreError> {
match self.storage.get(id) {
Some(u) => Ok(u.to_owned()),
None => Err(KeyStoreError::UnknownUserError),
}
}
fn get_avaliable_user_kp(&mut self, id: &[u8]) -> Result<KeyPackage, KeyStoreError> {
let user = match self.storage.get_mut(id) {
Some(u) => u,
None => return Err(KeyStoreError::UnknownUserError),
};
match user.key_packages.0.pop() {
Some(c) => Ok(c.1),
None => Err(KeyStoreError::InvalidUserDataError(
"No more keypackage available".to_string(),
)),
}
}
}

35
src/conversation.rs Normal file
View File

@@ -0,0 +1,35 @@
#[derive(Default, Debug)]
pub struct Conversation {
messages: Vec<ConversationMessage>,
}
#[derive(Clone, Debug)]
pub struct ConversationMessage {
pub author: String,
pub message: String,
}
impl Conversation {
/// Add a message string to the conversation list.
pub fn add(&mut self, conversation_message: ConversationMessage) {
self.messages.push(conversation_message)
}
/// Get a list of messages in the conversation.
/// The function returns the `last_n` messages.
pub fn get(&self, last_n: usize) -> Option<&[ConversationMessage]> {
let num_messages = self.messages.len();
let start = if last_n > num_messages {
0
} else {
num_messages - last_n
};
self.messages.get(start..num_messages)
}
}
impl ConversationMessage {
pub fn new(message: String, author: String) -> Self {
Self { author, message }
}
}

96
src/identity.rs Normal file
View File

@@ -0,0 +1,96 @@
use std::collections::HashMap;
use openmls::credentials::CredentialWithKey;
use openmls::key_packages::*;
use openmls::prelude::*;
use openmls_basic_credential::SignatureKeyPair;
use openmls_rust_crypto::MemoryKeyStoreError;
use openmls_traits::types::Ciphersuite;
use crate::openmls_provider::CryptoProvider;
pub struct Identity {
pub(crate) kp: HashMap<Vec<u8>, KeyPackage>,
pub(crate) credential_with_key: CredentialWithKey,
pub(crate) signer: SignatureKeyPair,
}
impl Identity {
pub(crate) fn new(
ciphersuite: Ciphersuite,
crypto: &CryptoProvider,
username: &[u8],
) -> Result<Identity, IdentityError> {
let credential = Credential::new(username.to_vec(), CredentialType::Basic)?;
let signature_keys = SignatureKeyPair::new(ciphersuite.signature_algorithm())?;
let credential_with_key = CredentialWithKey {
credential,
signature_key: signature_keys.to_public_vec().into(),
};
signature_keys.store(crypto.key_store())?;
let key_package = KeyPackage::builder().build(
CryptoConfig {
ciphersuite,
version: ProtocolVersion::default(),
},
crypto,
&signature_keys,
credential_with_key.clone(),
)?;
let kp = key_package.hash_ref(crypto.crypto())?;
Ok(Identity {
kp: HashMap::from([(kp.as_slice().to_vec(), key_package)]),
credential_with_key,
signer: signature_keys,
})
}
/// Create an additional key package using the credential_with_key/signer bound to this identity
pub fn add_key_package(
&mut self,
ciphersuite: Ciphersuite,
crypto: &CryptoProvider,
) -> Result<KeyPackage, IdentityError> {
let key_package = KeyPackage::builder().build(
CryptoConfig::with_default_version(ciphersuite),
crypto,
&self.signer,
self.credential_with_key.clone(),
)?;
let kp = key_package.hash_ref(crypto.crypto())?;
self.kp.insert(kp.as_slice().to_vec(), key_package.clone());
Ok(key_package)
}
/// Get the plain identity as byte vector.
pub fn identity(&self) -> Vec<u8> {
self.credential_with_key.credential.identity().to_vec()
}
}
impl ToString for Identity {
fn to_string(&self) -> String {
std::str::from_utf8(self.credential_with_key.credential.identity())
.unwrap()
.to_string()
}
}
#[derive(Debug, thiserror::Error)]
pub enum IdentityError {
#[error("Something wrong while creating new key package: {0}")]
MlsKeyPackageNewError(#[from] KeyPackageNewError<MemoryKeyStoreError>),
#[error(transparent)]
MlsLibraryError(#[from] LibraryError),
#[error("Something wrong with signature: {0}")]
MlsCryptoError(#[from] CryptoError),
#[error("Can't save signature key")]
MlsKeyStoreError(#[from] MemoryKeyStoreError),
#[error("Something wrong with credential: {0}")]
MlsCredentialError(#[from] CredentialError),
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}

173
src/main.rs Normal file
View File

@@ -0,0 +1,173 @@
mod conversation;
mod identity;
mod openmls_provider;
mod user;
use std::{rc::Rc, str::FromStr};
use bus::Bus;
use openmls::framing::{MlsMessageIn, MlsMessageInBody};
use sc_key_store::pks::PublicKeyStorage;
use user::User;
fn main() {
let mut pks = PublicKeyStorage::default();
// This channel for message before adding to group.
// Message are still encrypted, but this channel not attached to any group
let mut m: Bus<MlsMessageIn> = Bus::new(10);
let mut a_r = m.add_rx();
let mut b_r = m.add_rx();
//// Create user Alice
println!("Start Register Alice");
let res = User::new("Alice".as_bytes());
assert!(res.is_ok());
let mut a_user = res.unwrap();
let res = a_user.register(&mut pks);
assert!(res.is_ok());
println!("Register Alice successfully");
//////
//// Create user Bob
println!("Start Register Bob");
let res = User::new("Bob".as_bytes());
assert!(res.is_ok());
let mut b_user = res.unwrap();
let res = b_user.register(&mut pks);
assert!(res.is_ok());
println!("Register Bob successfully");
//////
//// Alice create group: Alice_Group
println!("Start create group");
let group_name = String::from_str("Alice_Group").unwrap();
let res = a_user.create_group(group_name.clone());
assert!(res.is_ok());
assert!(a_user.groups.borrow().contains_key("Alice_Group"));
println!("Create group successfully");
//////
//// Alice invite Bob
println!("Alice inviting Bob");
let welcome = a_user.invite(b_user.username(), group_name.clone(), &mut pks);
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_err());
//// Send welcome message to system broadcast. Only Bob can use it
m.broadcast(welcome.unwrap());
let _ = a_r.recv();
let welc = b_r.recv();
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = b_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
assert!(res.is_ok());
assert!(b_user.groups.borrow().contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
};
println!("Bob successfully join to the group");
/////
//// Bob send message and Alice recieve it
let res = b_user.send_msg("Hi!", group_name.clone());
assert!(res.is_ok());
// Bob also get the message but he cant decrypt it (regarding the mls rfc)
let res = b_user.recieve_msg(group_name.clone(), &pks);
// Expected error with invalid decryption
assert!(res.is_err());
let res = a_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_ok());
/////
//// Alice send message and Bob recieve it
let res = a_user.send_msg("Hi Bob!", group_name.clone());
assert!(res.is_ok());
let res = a_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_ok());
/////
let msg = a_user.read_msgs(group_name.clone());
println!("Alice recieve_msgs: {:#?}", msg);
let msg = b_user.read_msgs(group_name.clone());
println!("Bob recieve_msgs: {:#?}", msg);
let mut c_r = m.add_rx();
//// Create user Alice
println!("Start Register Carla");
let res = User::new("Carla".as_bytes());
assert!(res.is_ok());
let mut c_user = res.unwrap();
let res = c_user.register(&mut pks);
assert!(res.is_ok());
println!("Register Carla successfully");
//////
//// Alice invite Carla
println!("Alice inviting Carla");
let welcome = a_user.invite(c_user.username(), group_name.clone(), &mut pks);
assert!(welcome.is_ok());
// Alice should skip message with invite update because she already update her instance
// It is failed because of wrong epoch
let res = a_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_err());
let res = b_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_ok());
//// Send welcome message to system broadcast. Only Bob can use it
m.broadcast(welcome.unwrap());
let _ = a_r.recv();
let _ = b_r.recv();
let welc = c_r.recv();
assert!(welc.is_ok());
let _ = match welc.unwrap().extract() {
MlsMessageInBody::Welcome(welcome) => {
let res = c_user.join_group(
welcome,
// same ds_node, need to think how to process this
Rc::clone(&a_user.groups.borrow().get("Alice_Group").unwrap().ds_node),
);
assert!(res.is_ok());
assert!(c_user.groups.borrow().contains_key("Alice_Group"));
Ok(())
}
_ => Err("do nothing".to_string()),
};
println!("Carla successfully join to the group");
/////
//// Carla send message and Alice and Bob recieve it
let res = c_user.send_msg("Hi all!", group_name.clone());
assert!(res.is_ok());
let res = c_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_err());
let res = a_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_ok());
let res = b_user.recieve_msg(group_name.clone(), &pks);
assert!(res.is_ok());
////
let msg = a_user.read_msgs(group_name.clone());
println!("Alice recieve_msgs: {:#?}", msg);
let msg = b_user.read_msgs(group_name.clone());
println!("Bob recieve_msgs: {:#?}", msg);
let msg = c_user.read_msgs(group_name.clone());
println!("Carla recieve_msgs: {:#?}", msg);
}

30
src/openmls_provider.rs Normal file
View File

@@ -0,0 +1,30 @@
use openmls::prelude::*;
use openmls_rust_crypto::MemoryKeyStore;
use openmls_rust_crypto::RustCrypto;
use openmls_traits::OpenMlsCryptoProvider;
pub const CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
#[derive(Default)]
pub struct CryptoProvider {
crypto: RustCrypto,
key_storage: MemoryKeyStore,
}
impl OpenMlsCryptoProvider for CryptoProvider {
type CryptoProvider = RustCrypto;
type RandProvider = RustCrypto;
type KeyStoreProvider = MemoryKeyStore;
fn crypto(&self) -> &Self::CryptoProvider {
&self.crypto
}
fn rand(&self) -> &Self::RandProvider {
&self.crypto
}
fn key_store(&self) -> &Self::KeyStoreProvider {
&self.key_storage
}
}

411
src/user.rs Normal file
View File

@@ -0,0 +1,411 @@
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::{cell::RefCell, collections::HashMap, rc::Rc, str};
use ds::ds::*;
use openmls::{group::*, prelude::*};
use openmls_rust_crypto::MemoryKeyStoreError;
use sc_key_store::*;
use sc_key_store::{local_ks::LocalCache, pks::PublicKeyStorage};
// use waku_bindings::*;
use crate::conversation::*;
use crate::identity::{Identity, IdentityError};
use crate::openmls_provider::{CryptoProvider, CIPHERSUITE};
#[derive(Debug)]
pub struct Group {
group_name: String,
conversation: Conversation,
mls_group: RefCell<MlsGroup>,
pub ds_node: Rc<RefCell<DSClient>>,
// pubsub_topic: WakuPubSubTopic,
// content_topics: Vec<WakuContentTopic>,
}
pub struct User {
pub(crate) identity: RefCell<Identity>,
pub(crate) groups: RefCell<HashMap<String, Group>>,
provider: CryptoProvider,
local_ks: LocalCache,
// pub(crate) contacts: HashMap<Vec<u8>, WakuPeers>,
}
impl User {
/// Create a new user with the given name and a fresh set of credentials.
pub fn new(username: &[u8]) -> Result<User, UserError> {
let crypto = CryptoProvider::default();
let id = Identity::new(CIPHERSUITE, &crypto, username)?;
Ok(User {
groups: RefCell::new(HashMap::new()),
identity: RefCell::new(id),
provider: crypto,
local_ks: LocalCache::empty_key_store(username),
// contacts: HashMap::new(),
})
}
pub(crate) fn username(&self) -> String {
self.identity.borrow().to_string()
}
pub(crate) fn id(&self) -> Vec<u8> {
self.identity.borrow().identity()
}
fn is_signature_eq(&self, sign: &Vec<u8>) -> bool {
self.identity
.borrow()
.credential_with_key
.signature_key
.as_slice()
== sign
}
pub fn create_group(&mut self, group_name: String) -> Result<(), UserError> {
let group_id = group_name.as_bytes();
if self.groups.borrow().contains_key(&group_name) {
return Err(UserError::UnknownGroupError(group_name));
}
let group_config = MlsGroupConfig::builder()
.use_ratchet_tree_extension(true)
.build();
let mls_group = MlsGroup::new_with_group_id(
&self.provider,
&self.identity.borrow().signer,
&group_config,
GroupId::from_slice(group_id),
self.identity.borrow().credential_with_key.clone(),
)?;
let ds = DSClient::new_with_subscriber(self.id())?;
let group = Group {
group_name: group_name.clone(),
conversation: Conversation::default(),
mls_group: RefCell::new(mls_group),
ds_node: Rc::new(RefCell::new(ds)),
// pubsub_topic: WakuPubSubTopic::new(),
// content_topics: Vec::new(),
};
self.groups.borrow_mut().insert(group_name, group);
Ok(())
}
pub fn register(&mut self, mut pks: &mut PublicKeyStorage) -> Result<(), UserError> {
pks.add_user(self.key_packages(), self.identity.borrow().signer.public())?;
self.local_ks.get_update_from_smart_contract(pks)?;
Ok(())
}
/// Get the key packages fo this user.
pub fn key_packages(&self) -> UserKeyPackages {
let mut kpgs = self.identity.borrow().kp.clone();
UserKeyPackages(kpgs.drain().collect::<Vec<(Vec<u8>, KeyPackage)>>())
}
pub fn invite(
&mut self,
username: String,
group_name: String,
mut pks: &mut PublicKeyStorage,
) -> Result<MlsMessageIn, UserError> {
// First we need to get the key package for {id} from the DS.
if !pks.does_user_exist(username.as_bytes()) {
return Err(UserError::UnknownUserError);
}
// Reclaim a key package from the server
let joiner_key_package = pks.get_avaliable_user_kp(username.as_bytes())?;
// Build a proposal with this key package and do the MLS bits.
let mut groups = self.groups.borrow_mut();
let group = match groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let (out_messages, welcome, _group_info) = group.mls_group.borrow_mut().add_members(
&self.provider,
&self.identity.borrow().signer,
&[joiner_key_package],
)?;
let msg = out_messages.into();
group.ds_node.as_ref().borrow_mut().msg_send(msg)?;
// Second, process the invitation on our end.
group
.mls_group
.borrow_mut()
.merge_pending_commit(&self.provider)?;
// Put sending welcome by p2p here
// group.ds_node.as_ref().borrow_mut().msg_send(welcome.into())?;
drop(groups);
Ok(welcome.into())
}
pub fn recieve_msg(&self, group_name: String, pks: &PublicKeyStorage) -> Result<(), UserError> {
let msg = {
let groups = self.groups.borrow();
let group = match groups.get(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let msg = group
.ds_node
.as_ref()
.borrow_mut()
.msg_recv(self.id().as_ref(), pks)?;
msg
};
match msg.extract() {
MlsMessageInBody::Welcome(_welcome) => {
// Now irrelevant because message are attached to group
// self.join_group(welcome, Rc::clone(&group.ds_node))?;
}
MlsMessageInBody::PrivateMessage(message) => {
self.process_protocol_msg(message.into())?;
}
MlsMessageInBody::PublicMessage(message) => {
self.process_protocol_msg(message.into())?;
}
_ => return Err(UserError::MessageTypeError),
}
Ok(())
}
fn process_protocol_msg(&self, message: ProtocolMessage) -> Result<(), UserError> {
let mut groups = self.groups.borrow_mut();
let group_name = str::from_utf8(message.group_id().as_slice())?;
let group = match groups.get_mut(group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name.to_string())),
};
let mut mls_group = group.mls_group.borrow_mut();
let processed_message = mls_group.process_message(&self.provider, message)?;
let processed_message_credential: Credential = processed_message.credential().clone();
match processed_message.into_content() {
ProcessedMessageContent::ApplicationMessage(application_message) => {
let sender_name = {
let user_id = mls_group.members().find_map(|m| {
if m.credential.identity()
== processed_message_credential.identity()
&& (self
.identity
.borrow()
.credential_with_key
.signature_key
.as_slice()
!= m.signature_key.as_slice())
{
println!("process ApplicationMessage: read sender name from credential identity for group {} ", group.group_name);
Some(
str::from_utf8(m.credential.identity()).unwrap().to_owned(),
)
} else {
None
}
});
user_id.unwrap_or("".to_owned()).as_bytes().to_vec()
};
let conversation_message = ConversationMessage::new(
String::from_utf8(application_message.into_bytes())?,
String::from_utf8(sender_name)?,
);
group.conversation.add(conversation_message);
}
ProcessedMessageContent::ProposalMessage(_proposal_ptr) => (),
ProcessedMessageContent::ExternalJoinProposalMessage(_external_proposal_ptr) => (),
ProcessedMessageContent::StagedCommitMessage(commit_ptr) => {
let mut remove_proposal: bool = false;
if commit_ptr.self_removed() {
remove_proposal = true;
}
mls_group.merge_staged_commit(&self.provider, *commit_ptr)?;
if remove_proposal {
println!(
"update::Processing StagedCommitMessage removing {} from group {} ",
self.username(),
group.group_name
);
return Ok(());
}
}
};
Ok(())
}
pub fn send_msg(&mut self, msg: &str, group_name: String) -> Result<(), UserError> {
let groups = self.groups.borrow();
let group = match groups.get(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
let message_out = group.mls_group.borrow_mut().create_message(
&self.provider,
&self.identity.borrow().signer,
msg.as_bytes(),
)?;
group
.ds_node
.as_ref()
.borrow_mut()
.msg_send(message_out.into())?;
Ok(())
}
pub fn join_group(&self, welcome: Welcome, ds: Rc<RefCell<DSClient>>) -> Result<(), UserError> {
let group_config = MlsGroupConfig::builder()
.use_ratchet_tree_extension(true)
.build();
let mls_group = MlsGroup::new_from_welcome(&self.provider, &group_config, welcome, None)?;
let group_id = mls_group.group_id().to_vec();
let group_name = String::from_utf8(group_id)?;
ds.borrow_mut()
.add_subscriber(self.identity.borrow().identity())?;
let group = Group {
group_name: group_name.clone(),
conversation: Conversation::default(),
mls_group: RefCell::new(mls_group),
ds_node: ds,
};
match self.groups.borrow_mut().insert(group_name, group) {
Some(old) => Err(UserError::AlreadyExistedGroupError(old.group_name)),
None => Ok(()),
}
}
/// Get a member
fn find_member_index(&self, name: String, group: &Group) -> Result<LeafNodeIndex, UserError> {
let mls_group = group.mls_group.borrow();
let member = mls_group
.members()
.find(|m| m.credential.identity().eq(name.as_bytes()));
match member {
Some(m) => Ok(m.index),
None => Err(UserError::UnknownGroupMemberError(name)),
}
}
fn group_members(&self, group: &Group) -> Result<Vec<Vec<u8>>, UserError> {
let mls_group = group.mls_group.borrow();
let members: Vec<Vec<u8>> = mls_group
.members()
.filter(|m| self.is_signature_eq(&m.signature_key))
.map(|m| m.credential.identity().to_vec())
.collect();
Ok(members)
}
pub fn remove(&mut self, name: String, group_name: String) -> Result<(), UserError> {
// Get the group ID
let mut groups = self.groups.borrow_mut();
let group = match groups.get_mut(&group_name) {
Some(g) => g,
None => return Err(UserError::UnknownGroupError(group_name)),
};
// Get the user leaf index
let leaf_index = self.find_member_index(name, group)?;
// Remove operation on the mls group
let (remove_message, _welcome, _group_info) = group.mls_group.borrow_mut().remove_members(
&self.provider,
&self.identity.borrow().signer,
&[leaf_index],
)?;
let msg = remove_message.into();
group.ds_node.as_ref().borrow_mut().msg_send(msg)?;
// Second, process the removal on our end.
group
.mls_group
.borrow_mut()
.merge_pending_commit(&self.provider)?;
drop(groups);
Ok(())
}
/// Return the last 100 messages sent to the group.
pub fn read_msgs(
&self,
group_name: String,
) -> Result<Option<Vec<ConversationMessage>>, UserError> {
let groups = self.groups.borrow();
groups.get(&group_name).map_or_else(
|| Err(UserError::UnknownGroupError(group_name)),
|g| {
Ok(g.conversation
.get(100)
.map(|messages: &[crate::conversation::ConversationMessage]| messages.to_vec()))
},
)
}
}
#[derive(Debug, thiserror::Error)]
pub enum UserError {
#[error("Unknown group: {0}")]
UnknownGroupError(String),
#[error("Group already exist: {0}")]
AlreadyExistedGroupError(String),
#[error("Unknown group member : {0}")]
UnknownGroupMemberError(String),
#[error("Unsupported message type")]
MessageTypeError,
#[error("Unknown user")]
UnknownUserError,
#[error("Delivery Service error: {0}")]
DeliveryServiceError(#[from] DeliveryServiceError),
#[error("Key Store error: {0}")]
KeyStoreError(#[from] KeyStoreError),
#[error("Identity error: {0}")]
IdentityError(#[from] IdentityError),
#[error("Something wrong while creating Mls group: {0}")]
MlsGroupCreationError(#[from] NewGroupError<MemoryKeyStoreError>),
#[error("Something wrong while adding member to Mls group: {0}")]
MlsAddMemberError(#[from] AddMembersError<MemoryKeyStoreError>),
#[error("Something wrong while merging pending commit: {0}")]
MlsMergePendingCommitError(#[from] MergePendingCommitError<MemoryKeyStoreError>),
#[error("Something wrong while merging commit: {0}")]
MlsMergeCommitError(#[from] MergeCommitError<MemoryKeyStoreError>),
#[error("Error processing unverified message: {0}")]
MlsProcessMessageError(#[from] ProcessMessageError),
#[error("Something wrong while creating message: {0}")]
MlsCreateMessageError(#[from] CreateMessageError),
#[error("Failed to create staged join: {0}")]
MlsWelcomeError(#[from] WelcomeError<MemoryKeyStoreError>),
#[error("Failed to remove member from group: {0}")]
MlsRemoveMembersError(#[from] RemoveMembersError<MemoryKeyStoreError>),
#[error("Parse String UTF8 error: {0}")]
ParseUTF8Error(#[from] FromUtf8Error),
#[error("Parse str UTF8 error: {0}")]
ParseStrUTF8Error(#[from] Utf8Error),
#[error("Unknown error: {0}")]
Other(anyhow::Error),
}