3 Commits

Author SHA1 Message Date
seemenkina
b386452779 Refactor benchmarks to improve performance measurement and simplify code
- Simplified benchmark functions by removing parallel processing and complex setup
- Updated benchmark functions to use custom iteration with precise timing
- Removed unused benchmark variants for big groups
- Added a new benchmark for generating invite messages
- Simplified signer and user creation in setup functions
- Improved error handling and result checking in benchmark iterations
2025-01-28 17:28:40 +07:00
seemenkina
e9544da577 Update benchmarks and dependencies in Cargo.toml
- Added `rayon` and `itertools` dependencies to `Cargo.toml` for improved parallel processing and data manipulation.
- Introduced a new benchmark `add_users_benchmark` in `benches/group_flow_benchmark.rs` to measure performance of adding users to a group.
- Removed the previous `add_user_to_group_benchmark` function to streamline benchmarking code.
- Enhanced the `User` struct in `src/user.rs` with a new method `new_key_package` for generating key packages, improving user functionality.
2025-01-23 17:17:57 +07:00
Ekaterina Broslavskaya
d93ac900ae Add benchmarks for create group and add users action (#31)
* Update dependencies, add benchmarking, and enhance Waku integration

- Updated `tokio` version from `1.38.0` to `1.43.0` in `Cargo.toml` files for improved performance and compatibility.
- Introduced a new benchmarking file `benches/group_flow_benchmark.rs` to measure performance of group-related operations.
- Added a new CI job for running benchmarks in the GitHub Actions workflow.
- Enhanced Waku event handling in `ds_waku.rs` to utilize `BoundedVecDeque` for managing seen messages, improving memory efficiency.
- Refactored message handling in `main.rs` to streamline Waku event processing and user actions.
- Added `action_handlers.rs` to separate user action handling logic, improving code organization.
- Introduced `user_app_instance.rs` for creating user instances with group management capabilities.
- Made `invite_users` method public in `user.rs` to allow external access.
- Updated WebSocket connection message structure to include serialization support.

* Fix CI for running benchmark

* Update CI workflow to run benchmarks on pull requests and enhance caching

* Add required filed `branch name`

* Update CI workflow to include Go setup and increase timeout for benchmark jobs

* Update Cargo.toml to disable benchmarking for the de-mls binary

* Update Cargo.toml to add library section and disable benchmarking for the de-mls library

* Remove benchmarking configuration for the de-mls binary in Cargo.toml

* Update Cargo.toml to change criterion dependency version and enable HTML reports

* Disable benchmarking for the de-mls binary in Cargo.toml to streamline build configuration.
2025-01-20 10:40:49 +07:00
13 changed files with 571 additions and 181 deletions

View File

@@ -38,6 +38,22 @@ jobs:
cargo test --release
working-directory: tests
benches:
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20.x'
- name: Checkout sources
uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v2
- uses: boa-dev/criterion-compare-action@v3
with:
branchName: ${{ github.base_ref }}
lint:
runs-on: ubuntu-latest
timeout-minutes: 60

View File

@@ -11,6 +11,10 @@ edition = "2021"
[[bin]]
name = "de-mls"
path = "src/main.rs"
bench = false
[lib]
bench = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -24,11 +28,7 @@ openmls_traits = "=0.2.0"
axum = { version = "0.6.10", features = ["ws"] }
futures = "0.3.26"
tower-http = { version = "0.4.0", features = ["cors"] }
tokio = { version = "=1.38.0", features = [
"macros",
"rt-multi-thread",
"full",
] }
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "full"] }
tokio-util = "0.7.13"
alloy = { git = "https://github.com/alloy-rs/alloy", features = [
"providers",
@@ -40,6 +40,10 @@ alloy = { git = "https://github.com/alloy-rs/alloy", features = [
] }
kameo = "0.13.0"
criterion = { version = "=0.4.0", features = ["html_reports"] }
rayon = "1.10.0"
itertools = "0.14.0"
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
@@ -68,3 +72,11 @@ log = "0.4.22"
ds = { path = "ds" }
mls_crypto = { path = "mls_crypto" }
[[bench]]
name = "group_flow_benchmark"
harness = false
[[bench]]
name = "add_users_benchmark"
harness = false

View File

@@ -0,0 +1,200 @@
use alloy::signers::local::PrivateKeySigner;
use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use openmls::prelude::KeyPackage;
use rand::Rng;
use tokio;
use de_mls::user::User;
async fn setup_group(n: usize) -> (User, Vec<User>, String) {
let group_name = "group".to_string() + &rand::thread_rng().gen::<u64>().to_string();
let signer = PrivateKeySigner::random();
let mut alice = User::new(&signer.to_bytes().to_string()).unwrap();
alice
.create_group(group_name.clone(), true)
.await
.expect("Failed to create group");
let mut users = Vec::with_capacity(n);
let mut key_packages = Vec::with_capacity(n);
for _ in 0..n {
let mut user = User::new(&signer.to_bytes().to_string()).unwrap();
let key_package = user.new_key_package().unwrap();
user.create_group(group_name.clone(), false)
.await
.expect("Failed to create group");
users.push(user);
key_packages.push(key_package);
}
// Get commit and welcome messages
let msgs = alice
.invite_users(key_packages.clone(), group_name.clone())
.await
.expect("Failed to invite users");
let res = msgs[1].build_waku_message();
assert!(res.is_ok(), "Failed to build waku message");
let waku_welcome_message = res.unwrap();
futures::future::join_all(
users
.iter_mut()
.map(|user| user.process_waku_msg(waku_welcome_message.clone())),
)
.await
.into_iter()
.all(|result| result.is_ok());
(alice, users, group_name)
}
async fn generate_users_chunk(n: usize, group_name: String) -> (Vec<User>, Vec<KeyPackage>) {
let mut users = Vec::with_capacity(n);
let mut key_packages = Vec::with_capacity(n);
for _ in 0..n {
let signer = PrivateKeySigner::random();
let mut user = User::new(&signer.to_bytes().to_string()).unwrap();
user.create_group(group_name.clone(), false)
.await
.expect("Failed to create group");
key_packages.push(user.new_key_package().unwrap());
users.push(user);
}
(users, key_packages)
}
fn multi_user_apply_commit_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
for (i, j) in [1, 10, 100]
.into_iter()
.cartesian_product([1, 10, 100, 1000, 5000, 10000])
{
let (mut alice, mut users, group_name) = rt.block_on(async { setup_group(i).await });
c.bench_function(
format!("multi_user_apply_commit_benchmark_{}_{}", i, j).as_str(),
|b| {
b.iter_custom(|iters| {
rt.block_on(async {
let mut total_duration = std::time::Duration::ZERO;
for _ in 0..iters {
// Setup phase: generate data
let (_, kp_to_join) = generate_users_chunk(j, group_name.clone()).await;
let start = std::time::Instant::now();
let msgs = alice
.invite_users(kp_to_join, group_name.clone())
.await
.expect("Failed to invite users");
let commit_msg = msgs[0].build_waku_message().unwrap();
futures::future::join_all(
users
.iter_mut()
.map(|user| user.process_waku_msg(commit_msg.clone())),
)
.await
.into_iter()
.all(|result| result.is_ok());
total_duration += start.elapsed();
}
total_duration
})
})
},
);
}
}
fn multi_user_welcome_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
for (i, j) in [1, 10, 100]
.into_iter()
.cartesian_product([1, 10, 100, 1000, 5000, 10000])
{
let (mut alice, _, group_name) = rt.block_on(async { setup_group(i).await });
c.bench_function(
format!("multi_user_welcome_benchmark_{}_{}", i, j).as_str(),
|b| {
b.iter_custom(|iters| {
rt.block_on(async {
let mut total_duration = std::time::Duration::ZERO;
for _ in 0..iters {
// Setup phase: generate data
let (mut users_to_join, kp_to_join) =
generate_users_chunk(j, group_name.clone()).await;
let start = std::time::Instant::now();
let msgs = alice
.invite_users(kp_to_join, group_name.clone())
.await
.expect("Failed to invite users");
let welcome_msg = msgs[1].build_waku_message().unwrap();
futures::future::join_all(
users_to_join
.iter_mut()
.map(|user| user.process_waku_msg(welcome_msg.clone())),
)
.await
.into_iter()
.all(|result| result.is_ok());
total_duration += start.elapsed();
}
total_duration
})
})
},
);
}
}
fn multi_user_generate_invite_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
for (i, j) in [1, 10, 100, 1000]
.into_iter()
.cartesian_product([1, 10, 100, 1000, 5000, 10000])
{
let (mut alice, _, group_name) = rt.block_on(async { setup_group(i).await });
c.bench_function(
format!("multi_user_generate_invite_benchmark_{}_{}", i, j).as_str(),
|b| {
b.iter_custom(|iters| {
rt.block_on(async {
let mut total_duration = std::time::Duration::ZERO;
for _ in 0..iters {
// Setup phase: generate data
let (_, kp_to_join) = generate_users_chunk(j, group_name.clone()).await;
let start = std::time::Instant::now();
let _ = alice
.invite_users(kp_to_join, group_name.clone())
.await
.expect("Failed to invite users");
total_duration += start.elapsed();
}
total_duration
})
})
},
);
}
}
criterion_group!(
name = benches;
config = Criterion::default().sample_size(10);
targets =
multi_user_apply_commit_benchmark,
multi_user_welcome_benchmark,
multi_user_generate_invite_benchmark
);
criterion_main!(benches);

View File

@@ -0,0 +1,135 @@
use criterion::{criterion_group, criterion_main, Criterion};
use de_mls::user::{ProcessCreateGroup, User, UserAction};
use rand::Rng;
use tokio::runtime::Runtime;
/// Benchmark for creating user with group - that means it creates mls group instance
fn create_user_with_group_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
c.bench_function("create_user_with_group", |b| {
b.iter(|| {
rt.block_on(async {
let user =
User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
.unwrap();
let user_ref = kameo::spawn(user);
user_ref
.ask(ProcessCreateGroup {
group_name: "group".to_string(),
is_creation: true,
})
.await
.expect("Failed to create group");
});
});
});
}
fn create_group_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let user_ref = rt.block_on(async {
let user = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
.unwrap();
kameo::spawn(user)
});
c.bench_function("create_group", |b| {
b.iter(|| {
rt.block_on(async {
let group_name = "group".to_string() + &rand::thread_rng().gen::<u64>().to_string();
user_ref
.ask(ProcessCreateGroup {
group_name,
is_creation: true,
})
.await
.expect("Failed to create group");
});
});
});
}
/// Benchmark for creating user
fn create_user_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
c.bench_function("create_user", |b| {
b.iter(|| {
rt.block_on(async {
User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
.unwrap();
});
});
});
}
fn share_kp_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut alice = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
.expect("Failed to create user");
let mut bob = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
.expect("Failed to create user");
c.bench_function("share_kp_benchmark", |b| {
b.iter(|| {
rt.block_on(async {
let group_name = "group".to_string() + &rand::thread_rng().gen::<u64>().to_string();
alice
.create_group(group_name.clone(), true)
.await
.expect("Failed to create group");
bob.create_group(group_name.clone(), false)
.await
.expect("Failed to create group");
let alice_ga_msg = alice
.prepare_admin_msg(group_name.clone())
.await
.expect("Failed to prepare admin message");
let waku_ga_message = alice_ga_msg
.build_waku_message()
.expect("Failed to build waku message");
// Bob receives the Group Announcement msg and send Key Package Share msg to Alice
let bob_res = bob
.process_waku_msg(waku_ga_message)
.await
.expect("Failed to process waku message");
let bob_kp_message = match bob_res[0].clone() {
UserAction::SendToWaku(msg) => msg,
_ => panic!("User action is not SendToWaku"),
};
let waku_kp_message = bob_kp_message
.build_waku_message()
.expect("Failed to build waku message");
// Alice receives the Key Package Share msg and send Welcome msg to Bob
let user_action_invite = alice
.process_waku_msg(waku_kp_message)
.await
.expect("Failed to process waku message");
let alice_welcome_message = match user_action_invite[1].clone() {
UserAction::SendToWaku(msg) => msg,
_ => panic!("User action is not SendToWaku"),
};
let waku_welcome_message = alice_welcome_message
.build_waku_message()
.expect("Failed to build waku message");
// Bob receives the Welcome msg and join the group
bob.process_waku_msg(waku_welcome_message)
.await
.expect("Failed to process waku message");
});
});
});
}
criterion_group!(
benches,
create_group_benchmark,
create_user_with_group_benchmark,
share_kp_benchmark,
create_user_benchmark,
);
criterion_main!(benches);

View File

@@ -9,8 +9,9 @@ edition = "2021"
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
tokio = { version = "=1.38.0", features = ["full"] }
tokio = { version = "1.43.0", features = ["full"] }
kameo = "=0.13.0"
bounded-vec-deque = "0.1.1"
chrono = "=0.4.38"
uuid = { version = "=1.11.0", features = [

View File

@@ -1,4 +1,6 @@
use bounded_vec_deque::BoundedVecDeque;
use core::result::Result;
use log::{error, info};
use std::{
borrow::Cow,
str::FromStr,
@@ -6,6 +8,7 @@ use std::{
thread,
time::Duration,
};
use tokio::sync::mpsc::Sender;
use waku_bindings::*;
use crate::DeliveryServiceError;
@@ -121,3 +124,43 @@ pub fn match_content_topic(
let locked_topics = content_topics.lock().unwrap();
locked_topics.is_empty() || locked_topics.iter().any(|t| t == topic)
}
pub async fn handle_waku_event(
waku_sender: Sender<WakuMessage>,
content_topics: Arc<SyncMutex<Vec<WakuContentTopic>>>,
) {
info!("Setting up waku event callback");
let mut seen_messages = BoundedVecDeque::<String>::new(40);
waku_set_event_callback(move |signal| {
match signal.event() {
waku_bindings::Event::WakuMessage(event) => {
let msg_id = event.message_id();
if seen_messages.contains(msg_id) {
return;
}
seen_messages.push_back(msg_id.clone());
let content_topic = event.waku_message().content_topic();
// Check if message belongs to a relevant topic
if !match_content_topic(&content_topics, content_topic) {
error!("Content topic not match: {:?}", content_topic);
return;
};
let msg = event.waku_message().clone();
info!("Received message from waku: {:?}", event.message_id());
waku_sender
.blocking_send(msg)
.expect("Failed to send message to waku");
}
waku_bindings::Event::Unrecognized(data) => {
error!("Unrecognized event!\n {data:?}");
}
_ => {
error!(
"Unrecognized signal!\n {:?}",
serde_json::to_string(&signal)
);
}
}
});
}

View File

@@ -50,9 +50,12 @@ impl Message<WakuMessage> for ActorA {
async fn test_waku_client() {
let group_name = "new_group".to_string();
let mut pubsub = PubSub::<WakuMessage>::new();
let (sender_alice, mut receiver_alice) = channel(100);
let (sender_alice, mut receiver_alice) = channel::<WakuMessage>(100);
// TODO: get node from env
let res = setup_node_handle(vec![]);
let res = setup_node_handle(vec![
"/ip4/139.59.24.82/tcp/60000/p2p/16Uiu2HAm2CfeeaNiGwv88Loe417HrRbCwTFqhpDiR3wevbCcvYz2"
.to_string(),
]);
assert!(res.is_ok());
let node = res.unwrap();
let uuid = uuid::Uuid::new_v4().as_bytes().to_vec();

109
src/action_handlers.rs Normal file
View File

@@ -0,0 +1,109 @@
use kameo::actor::ActorRef;
use log::info;
use tokio_util::sync::CancellationToken;
use waku_bindings::WakuMessage;
use crate::{
user::{ProcessLeaveGroup, ProcessRemoveUser, ProcessSendMessage, User, UserAction},
ws_actor::{RawWsMessage, WsAction, WsActor},
MessageToPrint,
};
use ds::waku_actor::{ProcessUnsubscribeFromGroup, WakuActor};
pub async fn handle_user_actions(
msg: WakuMessage,
waku_actor: ActorRef<WakuActor>,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
cancel_token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error>> {
let actions = user_actor.ask(msg).await?;
for action in actions {
match action {
UserAction::SendToWaku(msg) => {
let id = waku_actor.ask(msg).await?;
info!("Successfully publish message with id: {:?}", id);
}
UserAction::SendToGroup(msg) => {
info!("Send to group: {:?}", msg);
ws_actor.ask(msg).await?;
}
UserAction::RemoveGroup(group_name) => {
waku_actor
.ask(ProcessUnsubscribeFromGroup {
group_name: group_name.clone(),
})
.await?;
user_actor
.ask(ProcessLeaveGroup {
group_name: group_name.clone(),
})
.await?;
info!("Leave group: {:?}", &group_name);
ws_actor
.ask(MessageToPrint {
sender: "system".to_string(),
message: format!("Group {} removed you", group_name),
group_name: group_name.clone(),
})
.await?;
cancel_token.cancel();
}
UserAction::DoNothing => {}
}
}
Ok(())
}
pub async fn handle_ws_action(
msg: RawWsMessage,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
waku_actor: ActorRef<WakuActor>,
) -> Result<(), Box<dyn std::error::Error>> {
let action = ws_actor.ask(msg).await?;
match action {
WsAction::Connect(connect) => {
info!("Got unexpected connect: {:?}", &connect);
}
WsAction::UserMessage(msg) => {
info!("Got user message: {:?}", &msg);
let mtp = MessageToPrint {
message: msg.message.clone(),
group_name: msg.group_id.clone(),
sender: "me".to_string(),
};
ws_actor.ask(mtp).await?;
let pmt = user_actor
.ask(ProcessSendMessage {
msg: msg.message,
group_name: msg.group_id,
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
}
WsAction::RemoveUser(user_to_ban, group_name) => {
info!("Got remove user: {:?}", &user_to_ban);
let pmt = user_actor
.ask(ProcessRemoveUser {
user_to_ban: user_to_ban.clone(),
group_name: group_name.clone(),
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
ws_actor
.ask(MessageToPrint {
sender: "system".to_string(),
message: format!("User {} was removed from group", user_to_ban),
group_name: group_name.clone(),
})
.await?;
}
WsAction::DoNothing => {}
}
Ok(())
}

View File

@@ -21,10 +21,11 @@ use ds::{
DeliveryServiceError,
};
pub mod action_handlers;
pub mod group_actor;
pub mod identity;
pub mod main_loop;
pub mod user;
pub mod user_app_instance;
pub mod ws_actor;
pub struct AppState {
@@ -34,6 +35,13 @@ pub struct AppState {
pub pubsub: tokio::sync::broadcast::Sender<WakuMessage>,
}
#[derive(Debug, Clone)]
pub struct Connection {
pub eth_private_key: String,
pub group_id: String,
pub should_create_group: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WelcomeMessageType {
GroupAnnouncement,

View File

@@ -6,9 +6,7 @@ use axum::{
routing::get,
Router,
};
use bounded_vec_deque::BoundedVecDeque;
use futures::StreamExt;
use kameo::actor::ActorRef;
use log::{error, info};
use serde_json::json;
use std::{
@@ -16,20 +14,20 @@ use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::channel;
use tokio_util::sync::CancellationToken;
use tower_http::cors::{Any, CorsLayer};
use waku_bindings::{waku_set_event_callback, WakuMessage};
use waku_bindings::WakuMessage;
use de_mls::{
main_loop::{main_loop, Connection},
user::{ProcessLeaveGroup, ProcessRemoveUser, ProcessSendMessage, User, UserAction},
action_handlers::{handle_user_actions, handle_ws_action},
user_app_instance::create_user_instance,
ws_actor::{RawWsMessage, WsAction, WsActor},
AppState, MessageToPrint,
AppState, Connection,
};
use ds::{
ds_waku::{match_content_topic, setup_node_handle},
waku_actor::{ProcessUnsubscribeFromGroup, WakuActor},
ds_waku::{handle_waku_event, setup_node_handle},
waku_actor::WakuActor,
};
#[tokio::main]
@@ -52,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
let (waku_sender, mut waku_receiver) = channel::<WakuMessage>(100);
handle_waku(waku_sender, app_state.clone()).await;
handle_waku_event(waku_sender, app_state.content_topics.clone()).await;
let recv_messages = tokio::spawn(async move {
info!("Running recv messages from waku");
@@ -73,6 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.layer(cors);
println!("Hosted on {:?}", addr);
let res = axum::Server::bind(&addr).serve(app.into_make_service());
tokio::select! {
Err(x) = res => {
@@ -89,43 +88,6 @@ async fn handler(ws: WebSocketUpgrade, State(state): State<Arc<AppState>>) -> im
ws.on_upgrade(|socket| handle_socket(socket, state))
}
async fn handle_waku(waku_sender: Sender<WakuMessage>, state: Arc<AppState>) {
info!("Setting up waku event callback");
let mut seen_messages = BoundedVecDeque::<String>::new(40);
waku_set_event_callback(move |signal| {
match signal.event() {
waku_bindings::Event::WakuMessage(event) => {
let msg_id = event.message_id();
if seen_messages.contains(msg_id) {
return;
}
seen_messages.push_back(msg_id.clone());
let content_topic = event.waku_message().content_topic();
// Check if message belongs to a relevant topic
if !match_content_topic(&state.content_topics, content_topic) {
error!("Content topic not match: {:?}", content_topic);
return;
};
let msg = event.waku_message().clone();
info!("Received message from waku: {:?}", event.message_id());
waku_sender
.blocking_send(msg)
.expect("Failed to send message to waku");
}
waku_bindings::Event::Unrecognized(data) => {
error!("Unrecognized event!\n {data:?}");
}
_ => {
error!(
"Unrecognized signal!\n {:?}",
serde_json::to_string(&signal)
);
}
}
});
}
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
let (ws_sender, mut ws_receiver) = socket.split();
let ws_actor = kameo::spawn(WsActor::new(ws_sender));
@@ -156,18 +118,19 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
}
}
let user_actor = main_loop(main_loop_connection.unwrap().clone(), state.clone())
let user_actor = create_user_instance(main_loop_connection.unwrap().clone(), state.clone())
.await
.expect("Failed to start main loop");
let user_actor_clone = user_actor.clone();
let state_clone = state.clone();
let ws_actor_clone = ws_actor.clone();
let mut waku_receiver = state.pubsub.subscribe();
let cancel_token_clone = cancel_token.clone();
let mut recv_messages = tokio::spawn(async move {
info!("Running recv messages from waku");
while let Ok(msg) = waku_receiver.recv().await {
let mut user_waku_receiver = state.pubsub.subscribe();
let mut recv_messages_waku = tokio::spawn(async move {
info!("Running recv messages from waku for current user");
while let Ok(msg) = user_waku_receiver.recv().await {
let res = handle_user_actions(
msg,
state_clone.waku_actor.clone(),
@@ -183,11 +146,11 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
});
let user_ref_clone = user_actor.clone();
let mut send_messages = {
let mut recv_messages_ws = {
tokio::spawn(async move {
info!("Running recieve messages from websocket");
while let Some(Ok(Message::Text(text))) = ws_receiver.next().await {
let res = handle_ws_message(
let res = handle_ws_action(
RawWsMessage { message: text },
ws_actor.clone(),
user_ref_clone.clone(),
@@ -201,124 +164,25 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
})
};
info!("Waiting for main loop to finish");
tokio::select! {
_ = (&mut recv_messages) => {
info!("recv_messages finished");
send_messages.abort();
_ = (&mut recv_messages_waku) => {
info!("recv messages from waku finished");
recv_messages_ws.abort();
}
_ = (&mut send_messages) => {
info!("send_messages finished");
send_messages.abort();
_ = (&mut recv_messages_ws) => {
info!("recieve messages from websocket finished");
recv_messages_ws.abort();
}
_ = cancel_token.cancelled() => {
info!("Cancel token cancelled");
send_messages.abort();
recv_messages.abort();
recv_messages_ws.abort();
recv_messages_waku.abort();
}
};
info!("Main loop finished");
}
async fn handle_user_actions(
msg: WakuMessage,
waku_actor: ActorRef<WakuActor>,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
cancel_token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error>> {
let actions = user_actor.ask(msg).await?;
for action in actions {
match action {
UserAction::SendToWaku(msg) => {
let id = waku_actor.ask(msg).await?;
info!("Successfully publish message with id: {:?}", id);
}
UserAction::SendToGroup(msg) => {
info!("Send to group: {:?}", msg);
ws_actor.ask(msg).await?;
}
UserAction::RemoveGroup(group_name) => {
waku_actor
.ask(ProcessUnsubscribeFromGroup {
group_name: group_name.clone(),
})
.await?;
user_actor
.ask(ProcessLeaveGroup {
group_name: group_name.clone(),
})
.await?;
info!("Leave group: {:?}", &group_name);
ws_actor
.ask(MessageToPrint {
sender: "system".to_string(),
message: format!("Group {} removed you", group_name),
group_name: group_name.clone(),
})
.await?;
cancel_token.cancel();
}
UserAction::DoNothing => {}
}
}
Ok(())
}
async fn handle_ws_message(
msg: RawWsMessage,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
waku_actor: ActorRef<WakuActor>,
) -> Result<(), Box<dyn std::error::Error>> {
let action = ws_actor.ask(msg).await?;
match action {
WsAction::Connect(connect) => {
info!("Got unexpected connect: {:?}", &connect);
}
WsAction::UserMessage(msg) => {
info!("Got user message: {:?}", &msg);
let mtp = MessageToPrint {
message: msg.message.clone(),
group_name: msg.group_id.clone(),
sender: "me".to_string(),
};
ws_actor.ask(mtp).await?;
let pmt = user_actor
.ask(ProcessSendMessage {
msg: msg.message,
group_name: msg.group_id,
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
}
WsAction::RemoveUser(user_to_ban, group_name) => {
info!("Got remove user: {:?}", &user_to_ban);
let pmt = user_actor
.ask(ProcessRemoveUser {
user_to_ban: user_to_ban.clone(),
group_name: group_name.clone(),
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
ws_actor
.ask(MessageToPrint {
sender: "system".to_string(),
message: format!("User {} was removed from group", user_to_ban),
group_name: group_name.clone(),
})
.await?;
}
WsAction::DoNothing => {}
}
Ok(())
}
async fn get_rooms(State(state): State<Arc<AppState>>) -> String {
let rooms = state.rooms.lock().unwrap();
let vec = rooms.iter().collect::<Vec<&String>>();

View File

@@ -362,7 +362,7 @@ impl User {
}
}
async fn invite_users(
pub async fn invite_users(
&mut self,
users_kp: Vec<KeyPackage>,
group_name: String,
@@ -508,4 +508,10 @@ impl User {
pub fn wallet(&self) -> EthereumWallet {
EthereumWallet::from(self.eth_signer.clone())
}
pub fn new_key_package(&mut self) -> Result<KeyPackage, UserError> {
Ok(self
.identity
.generate_key_package(CIPHERSUITE, &self.provider)?)
}
}

View File

@@ -4,17 +4,10 @@ use log::{error, info};
use std::{str::FromStr, sync::Arc, time::Duration};
use crate::user::{ProcessAdminMessage, ProcessCreateGroup, User};
use crate::{AppState, UserError};
use crate::{AppState, Connection, UserError};
use ds::waku_actor::ProcessSubscribeToGroup;
#[derive(Debug, Clone)]
pub struct Connection {
pub eth_private_key: String,
pub group_id: String,
pub should_create_group: bool,
}
pub async fn main_loop(
pub async fn create_user_instance(
connection: Connection,
app_state: Arc<AppState>,
) -> Result<ActorRef<User>, UserError> {

View File

@@ -47,7 +47,7 @@ pub struct UserMessage {
}
/// This struct is used to represent the connection data that web socket sends to the user
#[derive(Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct ConnectMessage {
/// This is the private key of the user that we will use to authenticate the user
pub eth_private_key: String,