mirror of
https://github.com/vacp2p/de-mls.git
synced 2026-01-09 13:38:06 -05:00
Add benchmarks for create group and add users action (#31)
* Update dependencies, add benchmarking, and enhance Waku integration - Updated `tokio` version from `1.38.0` to `1.43.0` in `Cargo.toml` files for improved performance and compatibility. - Introduced a new benchmarking file `benches/group_flow_benchmark.rs` to measure performance of group-related operations. - Added a new CI job for running benchmarks in the GitHub Actions workflow. - Enhanced Waku event handling in `ds_waku.rs` to utilize `BoundedVecDeque` for managing seen messages, improving memory efficiency. - Refactored message handling in `main.rs` to streamline Waku event processing and user actions. - Added `action_handlers.rs` to separate user action handling logic, improving code organization. - Introduced `user_app_instance.rs` for creating user instances with group management capabilities. - Made `invite_users` method public in `user.rs` to allow external access. - Updated WebSocket connection message structure to include serialization support. * Fix CI for running benchmark * Update CI workflow to run benchmarks on pull requests and enhance caching * Add required filed `branch name` * Update CI workflow to include Go setup and increase timeout for benchmark jobs * Update Cargo.toml to disable benchmarking for the de-mls binary * Update Cargo.toml to add library section and disable benchmarking for the de-mls library * Remove benchmarking configuration for the de-mls binary in Cargo.toml * Update Cargo.toml to change criterion dependency version and enable HTML reports * Disable benchmarking for the de-mls binary in Cargo.toml to streamline build configuration.
This commit is contained in:
committed by
GitHub
parent
c99eadb302
commit
d93ac900ae
16
.github/workflows/ci.yml
vendored
16
.github/workflows/ci.yml
vendored
@@ -38,6 +38,22 @@ jobs:
|
||||
cargo test --release
|
||||
working-directory: tests
|
||||
|
||||
benches:
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: '1.20.x'
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v3
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- uses: boa-dev/criterion-compare-action@v3
|
||||
with:
|
||||
branchName: ${{ github.base_ref }}
|
||||
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
|
||||
16
Cargo.toml
16
Cargo.toml
@@ -11,6 +11,10 @@ edition = "2021"
|
||||
[[bin]]
|
||||
name = "de-mls"
|
||||
path = "src/main.rs"
|
||||
bench = false
|
||||
|
||||
[lib]
|
||||
bench = false
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
@@ -24,11 +28,7 @@ openmls_traits = "=0.2.0"
|
||||
axum = { version = "0.6.10", features = ["ws"] }
|
||||
futures = "0.3.26"
|
||||
tower-http = { version = "0.4.0", features = ["cors"] }
|
||||
tokio = { version = "=1.38.0", features = [
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
"full",
|
||||
] }
|
||||
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "full"] }
|
||||
tokio-util = "0.7.13"
|
||||
alloy = { git = "https://github.com/alloy-rs/alloy", features = [
|
||||
"providers",
|
||||
@@ -40,6 +40,8 @@ alloy = { git = "https://github.com/alloy-rs/alloy", features = [
|
||||
] }
|
||||
kameo = "0.13.0"
|
||||
|
||||
criterion = { version = "=0.4.0", features = ["html_reports"] }
|
||||
|
||||
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
|
||||
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
|
||||
|
||||
@@ -68,3 +70,7 @@ log = "0.4.22"
|
||||
|
||||
ds = { path = "ds" }
|
||||
mls_crypto = { path = "mls_crypto" }
|
||||
|
||||
[[bench]]
|
||||
name = "group_flow_benchmark"
|
||||
harness = false
|
||||
|
||||
201
benches/group_flow_benchmark.rs
Normal file
201
benches/group_flow_benchmark.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
use alloy::primitives::{Address, U160};
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use de_mls::user::{ProcessCreateGroup, User, UserAction};
|
||||
use mls_crypto::openmls_provider::{MlsCryptoProvider, CIPHERSUITE};
|
||||
use openmls::prelude::{
|
||||
Credential, CredentialType, CredentialWithKey, CryptoConfig, KeyPackage, ProtocolVersion,
|
||||
};
|
||||
use openmls_basic_credential::SignatureKeyPair;
|
||||
use openmls_traits::OpenMlsCryptoProvider;
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
fn generate_random_key_package() -> KeyPackage {
|
||||
let rand_bytes = thread_rng().gen::<[u8; 20]>();
|
||||
let rand_wallet_address = Address::from(U160::from_be_bytes(rand_bytes));
|
||||
let credential = Credential::new(
|
||||
rand_wallet_address.to_string().as_bytes().to_vec(),
|
||||
CredentialType::Basic,
|
||||
)
|
||||
.unwrap();
|
||||
let signature_keys = SignatureKeyPair::new(CIPHERSUITE.signature_algorithm()).unwrap();
|
||||
let credential_with_key = CredentialWithKey {
|
||||
credential,
|
||||
signature_key: signature_keys.to_public_vec().into(),
|
||||
};
|
||||
let crypto = &MlsCryptoProvider::default();
|
||||
signature_keys.store(crypto.key_store()).unwrap();
|
||||
KeyPackage::builder()
|
||||
.build(
|
||||
CryptoConfig {
|
||||
ciphersuite: CIPHERSUITE,
|
||||
version: ProtocolVersion::default(),
|
||||
},
|
||||
crypto,
|
||||
&signature_keys,
|
||||
credential_with_key.clone(),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Benchmark for creating user with group - that means it creates mls group instance
|
||||
fn create_user_with_group_benchmark(c: &mut Criterion) {
|
||||
let rt = Runtime::new().unwrap();
|
||||
c.bench_function("create_user_with_group", |b| {
|
||||
b.iter(|| {
|
||||
rt.block_on(async {
|
||||
let user =
|
||||
User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
|
||||
.unwrap();
|
||||
let user_ref = kameo::spawn(user);
|
||||
user_ref
|
||||
.ask(ProcessCreateGroup {
|
||||
group_name: "group".to_string(),
|
||||
is_creation: true,
|
||||
})
|
||||
.await
|
||||
.expect("Failed to create group");
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
fn create_group_benchmark(c: &mut Criterion) {
|
||||
let rt = Runtime::new().unwrap();
|
||||
let user_ref = rt.block_on(async {
|
||||
let user = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
|
||||
.unwrap();
|
||||
kameo::spawn(user)
|
||||
});
|
||||
|
||||
c.bench_function("create_group", |b| {
|
||||
b.iter(|| {
|
||||
rt.block_on(async {
|
||||
let group_name = "group".to_string() + &rand::thread_rng().gen::<u64>().to_string();
|
||||
user_ref
|
||||
.ask(ProcessCreateGroup {
|
||||
group_name,
|
||||
is_creation: true,
|
||||
})
|
||||
.await
|
||||
.expect("Failed to create group");
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark for creating user
|
||||
fn create_user_benchmark(c: &mut Criterion) {
|
||||
let rt = Runtime::new().unwrap();
|
||||
c.bench_function("create_user", |b| {
|
||||
b.iter(|| {
|
||||
rt.block_on(async {
|
||||
User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
|
||||
.unwrap();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
fn add_user_to_group_benchmark(c: &mut Criterion) {
|
||||
for i in [10, 100, 500, 1000] {
|
||||
let rt = Runtime::new().unwrap();
|
||||
let user_kps = Arc::new(Mutex::new(
|
||||
(0..i)
|
||||
.map(|_| generate_random_key_package())
|
||||
.collect::<Vec<KeyPackage>>(),
|
||||
));
|
||||
c.bench_function(format!("add_users_to_group_{}", i).as_str(), |b| {
|
||||
b.iter(|| {
|
||||
rt.block_on(async {
|
||||
let mut alice = User::new(
|
||||
"0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80",
|
||||
)
|
||||
.unwrap();
|
||||
alice
|
||||
.create_group("group".to_string(), true)
|
||||
.await
|
||||
.expect("Failed to create group");
|
||||
alice
|
||||
.invite_users(user_kps.lock().unwrap().clone(), "group".to_string())
|
||||
.await
|
||||
.expect("Failed to invite users");
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn share_kp_benchmark(c: &mut Criterion) {
|
||||
let rt = Runtime::new().unwrap();
|
||||
let mut alice = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
|
||||
.expect("Failed to create user");
|
||||
let mut bob = User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
|
||||
.expect("Failed to create user");
|
||||
|
||||
c.bench_function("share_kp_benchmark", |b| {
|
||||
b.iter(|| {
|
||||
rt.block_on(async {
|
||||
let group_name = "group".to_string() + &rand::thread_rng().gen::<u64>().to_string();
|
||||
alice
|
||||
.create_group(group_name.clone(), true)
|
||||
.await
|
||||
.expect("Failed to create group");
|
||||
bob.create_group(group_name.clone(), false)
|
||||
.await
|
||||
.expect("Failed to create group");
|
||||
let alice_ga_msg = alice
|
||||
.prepare_admin_msg(group_name.clone())
|
||||
.await
|
||||
.expect("Failed to prepare admin message");
|
||||
|
||||
let waku_ga_message = alice_ga_msg
|
||||
.build_waku_message()
|
||||
.expect("Failed to build waku message");
|
||||
|
||||
// Bob receives the Group Announcement msg and send Key Package Share msg to Alice
|
||||
let bob_res = bob
|
||||
.process_waku_msg(waku_ga_message)
|
||||
.await
|
||||
.expect("Failed to process waku message");
|
||||
let bob_kp_message = match bob_res[0].clone() {
|
||||
UserAction::SendToWaku(msg) => msg,
|
||||
_ => panic!("User action is not SendToWaku"),
|
||||
};
|
||||
let waku_kp_message = bob_kp_message
|
||||
.build_waku_message()
|
||||
.expect("Failed to build waku message");
|
||||
|
||||
// Alice receives the Key Package Share msg and send Welcome msg to Bob
|
||||
let user_action_invite = alice
|
||||
.process_waku_msg(waku_kp_message)
|
||||
.await
|
||||
.expect("Failed to process waku message");
|
||||
|
||||
let alice_welcome_message = match user_action_invite[1].clone() {
|
||||
UserAction::SendToWaku(msg) => msg,
|
||||
_ => panic!("User action is not SendToWaku"),
|
||||
};
|
||||
let waku_welcome_message = alice_welcome_message
|
||||
.build_waku_message()
|
||||
.expect("Failed to build waku message");
|
||||
|
||||
// Bob receives the Welcome msg and join the group
|
||||
bob.process_waku_msg(waku_welcome_message)
|
||||
.await
|
||||
.expect("Failed to process waku message");
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
create_group_benchmark,
|
||||
create_user_with_group_benchmark,
|
||||
share_kp_benchmark,
|
||||
create_user_benchmark,
|
||||
add_user_to_group_benchmark,
|
||||
);
|
||||
criterion_main!(benches);
|
||||
@@ -9,8 +9,9 @@ edition = "2021"
|
||||
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
|
||||
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
|
||||
|
||||
tokio = { version = "=1.38.0", features = ["full"] }
|
||||
tokio = { version = "1.43.0", features = ["full"] }
|
||||
kameo = "=0.13.0"
|
||||
bounded-vec-deque = "0.1.1"
|
||||
|
||||
chrono = "=0.4.38"
|
||||
uuid = { version = "=1.11.0", features = [
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use bounded_vec_deque::BoundedVecDeque;
|
||||
use core::result::Result;
|
||||
use log::{error, info};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
str::FromStr,
|
||||
@@ -6,6 +8,7 @@ use std::{
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use waku_bindings::*;
|
||||
|
||||
use crate::DeliveryServiceError;
|
||||
@@ -121,3 +124,43 @@ pub fn match_content_topic(
|
||||
let locked_topics = content_topics.lock().unwrap();
|
||||
locked_topics.is_empty() || locked_topics.iter().any(|t| t == topic)
|
||||
}
|
||||
|
||||
pub async fn handle_waku_event(
|
||||
waku_sender: Sender<WakuMessage>,
|
||||
content_topics: Arc<SyncMutex<Vec<WakuContentTopic>>>,
|
||||
) {
|
||||
info!("Setting up waku event callback");
|
||||
let mut seen_messages = BoundedVecDeque::<String>::new(40);
|
||||
waku_set_event_callback(move |signal| {
|
||||
match signal.event() {
|
||||
waku_bindings::Event::WakuMessage(event) => {
|
||||
let msg_id = event.message_id();
|
||||
if seen_messages.contains(msg_id) {
|
||||
return;
|
||||
}
|
||||
seen_messages.push_back(msg_id.clone());
|
||||
let content_topic = event.waku_message().content_topic();
|
||||
// Check if message belongs to a relevant topic
|
||||
if !match_content_topic(&content_topics, content_topic) {
|
||||
error!("Content topic not match: {:?}", content_topic);
|
||||
return;
|
||||
};
|
||||
let msg = event.waku_message().clone();
|
||||
info!("Received message from waku: {:?}", event.message_id());
|
||||
waku_sender
|
||||
.blocking_send(msg)
|
||||
.expect("Failed to send message to waku");
|
||||
}
|
||||
|
||||
waku_bindings::Event::Unrecognized(data) => {
|
||||
error!("Unrecognized event!\n {data:?}");
|
||||
}
|
||||
_ => {
|
||||
error!(
|
||||
"Unrecognized signal!\n {:?}",
|
||||
serde_json::to_string(&signal)
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -50,9 +50,12 @@ impl Message<WakuMessage> for ActorA {
|
||||
async fn test_waku_client() {
|
||||
let group_name = "new_group".to_string();
|
||||
let mut pubsub = PubSub::<WakuMessage>::new();
|
||||
let (sender_alice, mut receiver_alice) = channel(100);
|
||||
let (sender_alice, mut receiver_alice) = channel::<WakuMessage>(100);
|
||||
// TODO: get node from env
|
||||
let res = setup_node_handle(vec![]);
|
||||
let res = setup_node_handle(vec![
|
||||
"/ip4/139.59.24.82/tcp/60000/p2p/16Uiu2HAm2CfeeaNiGwv88Loe417HrRbCwTFqhpDiR3wevbCcvYz2"
|
||||
.to_string(),
|
||||
]);
|
||||
assert!(res.is_ok());
|
||||
let node = res.unwrap();
|
||||
let uuid = uuid::Uuid::new_v4().as_bytes().to_vec();
|
||||
|
||||
109
src/action_handlers.rs
Normal file
109
src/action_handlers.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
use kameo::actor::ActorRef;
|
||||
use log::info;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use waku_bindings::WakuMessage;
|
||||
|
||||
use crate::{
|
||||
user::{ProcessLeaveGroup, ProcessRemoveUser, ProcessSendMessage, User, UserAction},
|
||||
ws_actor::{RawWsMessage, WsAction, WsActor},
|
||||
MessageToPrint,
|
||||
};
|
||||
use ds::waku_actor::{ProcessUnsubscribeFromGroup, WakuActor};
|
||||
|
||||
pub async fn handle_user_actions(
|
||||
msg: WakuMessage,
|
||||
waku_actor: ActorRef<WakuActor>,
|
||||
ws_actor: ActorRef<WsActor>,
|
||||
user_actor: ActorRef<User>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let actions = user_actor.ask(msg).await?;
|
||||
for action in actions {
|
||||
match action {
|
||||
UserAction::SendToWaku(msg) => {
|
||||
let id = waku_actor.ask(msg).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
}
|
||||
UserAction::SendToGroup(msg) => {
|
||||
info!("Send to group: {:?}", msg);
|
||||
ws_actor.ask(msg).await?;
|
||||
}
|
||||
UserAction::RemoveGroup(group_name) => {
|
||||
waku_actor
|
||||
.ask(ProcessUnsubscribeFromGroup {
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
user_actor
|
||||
.ask(ProcessLeaveGroup {
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
info!("Leave group: {:?}", &group_name);
|
||||
ws_actor
|
||||
.ask(MessageToPrint {
|
||||
sender: "system".to_string(),
|
||||
message: format!("Group {} removed you", group_name),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
cancel_token.cancel();
|
||||
}
|
||||
UserAction::DoNothing => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_ws_action(
|
||||
msg: RawWsMessage,
|
||||
ws_actor: ActorRef<WsActor>,
|
||||
user_actor: ActorRef<User>,
|
||||
waku_actor: ActorRef<WakuActor>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let action = ws_actor.ask(msg).await?;
|
||||
match action {
|
||||
WsAction::Connect(connect) => {
|
||||
info!("Got unexpected connect: {:?}", &connect);
|
||||
}
|
||||
WsAction::UserMessage(msg) => {
|
||||
info!("Got user message: {:?}", &msg);
|
||||
let mtp = MessageToPrint {
|
||||
message: msg.message.clone(),
|
||||
group_name: msg.group_id.clone(),
|
||||
sender: "me".to_string(),
|
||||
};
|
||||
ws_actor.ask(mtp).await?;
|
||||
|
||||
let pmt = user_actor
|
||||
.ask(ProcessSendMessage {
|
||||
msg: msg.message,
|
||||
group_name: msg.group_id,
|
||||
})
|
||||
.await?;
|
||||
let id = waku_actor.ask(pmt).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
}
|
||||
WsAction::RemoveUser(user_to_ban, group_name) => {
|
||||
info!("Got remove user: {:?}", &user_to_ban);
|
||||
let pmt = user_actor
|
||||
.ask(ProcessRemoveUser {
|
||||
user_to_ban: user_to_ban.clone(),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
let id = waku_actor.ask(pmt).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
ws_actor
|
||||
.ask(MessageToPrint {
|
||||
sender: "system".to_string(),
|
||||
message: format!("User {} was removed from group", user_to_ban),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
WsAction::DoNothing => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
10
src/lib.rs
10
src/lib.rs
@@ -21,10 +21,11 @@ use ds::{
|
||||
DeliveryServiceError,
|
||||
};
|
||||
|
||||
pub mod action_handlers;
|
||||
pub mod group_actor;
|
||||
pub mod identity;
|
||||
pub mod main_loop;
|
||||
pub mod user;
|
||||
pub mod user_app_instance;
|
||||
pub mod ws_actor;
|
||||
|
||||
pub struct AppState {
|
||||
@@ -34,6 +35,13 @@ pub struct AppState {
|
||||
pub pubsub: tokio::sync::broadcast::Sender<WakuMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Connection {
|
||||
pub eth_private_key: String,
|
||||
pub group_id: String,
|
||||
pub should_create_group: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum WelcomeMessageType {
|
||||
GroupAnnouncement,
|
||||
|
||||
186
src/main.rs
186
src/main.rs
@@ -6,9 +6,7 @@ use axum::{
|
||||
routing::get,
|
||||
Router,
|
||||
};
|
||||
use bounded_vec_deque::BoundedVecDeque;
|
||||
use futures::StreamExt;
|
||||
use kameo::actor::ActorRef;
|
||||
use log::{error, info};
|
||||
use serde_json::json;
|
||||
use std::{
|
||||
@@ -16,20 +14,20 @@ use std::{
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
use waku_bindings::{waku_set_event_callback, WakuMessage};
|
||||
use waku_bindings::WakuMessage;
|
||||
|
||||
use de_mls::{
|
||||
main_loop::{main_loop, Connection},
|
||||
user::{ProcessLeaveGroup, ProcessRemoveUser, ProcessSendMessage, User, UserAction},
|
||||
action_handlers::{handle_user_actions, handle_ws_action},
|
||||
user_app_instance::create_user_instance,
|
||||
ws_actor::{RawWsMessage, WsAction, WsActor},
|
||||
AppState, MessageToPrint,
|
||||
AppState, Connection,
|
||||
};
|
||||
use ds::{
|
||||
ds_waku::{match_content_topic, setup_node_handle},
|
||||
waku_actor::{ProcessUnsubscribeFromGroup, WakuActor},
|
||||
ds_waku::{handle_waku_event, setup_node_handle},
|
||||
waku_actor::WakuActor,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
@@ -52,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
});
|
||||
|
||||
let (waku_sender, mut waku_receiver) = channel::<WakuMessage>(100);
|
||||
handle_waku(waku_sender, app_state.clone()).await;
|
||||
handle_waku_event(waku_sender, app_state.content_topics.clone()).await;
|
||||
|
||||
let recv_messages = tokio::spawn(async move {
|
||||
info!("Running recv messages from waku");
|
||||
@@ -73,6 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.layer(cors);
|
||||
|
||||
println!("Hosted on {:?}", addr);
|
||||
|
||||
let res = axum::Server::bind(&addr).serve(app.into_make_service());
|
||||
tokio::select! {
|
||||
Err(x) = res => {
|
||||
@@ -89,43 +88,6 @@ async fn handler(ws: WebSocketUpgrade, State(state): State<Arc<AppState>>) -> im
|
||||
ws.on_upgrade(|socket| handle_socket(socket, state))
|
||||
}
|
||||
|
||||
async fn handle_waku(waku_sender: Sender<WakuMessage>, state: Arc<AppState>) {
|
||||
info!("Setting up waku event callback");
|
||||
let mut seen_messages = BoundedVecDeque::<String>::new(40);
|
||||
waku_set_event_callback(move |signal| {
|
||||
match signal.event() {
|
||||
waku_bindings::Event::WakuMessage(event) => {
|
||||
let msg_id = event.message_id();
|
||||
if seen_messages.contains(msg_id) {
|
||||
return;
|
||||
}
|
||||
seen_messages.push_back(msg_id.clone());
|
||||
let content_topic = event.waku_message().content_topic();
|
||||
// Check if message belongs to a relevant topic
|
||||
if !match_content_topic(&state.content_topics, content_topic) {
|
||||
error!("Content topic not match: {:?}", content_topic);
|
||||
return;
|
||||
};
|
||||
let msg = event.waku_message().clone();
|
||||
info!("Received message from waku: {:?}", event.message_id());
|
||||
waku_sender
|
||||
.blocking_send(msg)
|
||||
.expect("Failed to send message to waku");
|
||||
}
|
||||
|
||||
waku_bindings::Event::Unrecognized(data) => {
|
||||
error!("Unrecognized event!\n {data:?}");
|
||||
}
|
||||
_ => {
|
||||
error!(
|
||||
"Unrecognized signal!\n {:?}",
|
||||
serde_json::to_string(&signal)
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
|
||||
let (ws_sender, mut ws_receiver) = socket.split();
|
||||
let ws_actor = kameo::spawn(WsActor::new(ws_sender));
|
||||
@@ -156,18 +118,19 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
|
||||
}
|
||||
}
|
||||
|
||||
let user_actor = main_loop(main_loop_connection.unwrap().clone(), state.clone())
|
||||
let user_actor = create_user_instance(main_loop_connection.unwrap().clone(), state.clone())
|
||||
.await
|
||||
.expect("Failed to start main loop");
|
||||
|
||||
let user_actor_clone = user_actor.clone();
|
||||
let state_clone = state.clone();
|
||||
let ws_actor_clone = ws_actor.clone();
|
||||
let mut waku_receiver = state.pubsub.subscribe();
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
let mut recv_messages = tokio::spawn(async move {
|
||||
info!("Running recv messages from waku");
|
||||
while let Ok(msg) = waku_receiver.recv().await {
|
||||
|
||||
let mut user_waku_receiver = state.pubsub.subscribe();
|
||||
let mut recv_messages_waku = tokio::spawn(async move {
|
||||
info!("Running recv messages from waku for current user");
|
||||
while let Ok(msg) = user_waku_receiver.recv().await {
|
||||
let res = handle_user_actions(
|
||||
msg,
|
||||
state_clone.waku_actor.clone(),
|
||||
@@ -183,11 +146,11 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
|
||||
});
|
||||
|
||||
let user_ref_clone = user_actor.clone();
|
||||
let mut send_messages = {
|
||||
let mut recv_messages_ws = {
|
||||
tokio::spawn(async move {
|
||||
info!("Running recieve messages from websocket");
|
||||
while let Some(Ok(Message::Text(text))) = ws_receiver.next().await {
|
||||
let res = handle_ws_message(
|
||||
let res = handle_ws_action(
|
||||
RawWsMessage { message: text },
|
||||
ws_actor.clone(),
|
||||
user_ref_clone.clone(),
|
||||
@@ -201,124 +164,25 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
|
||||
})
|
||||
};
|
||||
|
||||
info!("Waiting for main loop to finish");
|
||||
tokio::select! {
|
||||
_ = (&mut recv_messages) => {
|
||||
info!("recv_messages finished");
|
||||
send_messages.abort();
|
||||
_ = (&mut recv_messages_waku) => {
|
||||
info!("recv messages from waku finished");
|
||||
recv_messages_ws.abort();
|
||||
}
|
||||
_ = (&mut send_messages) => {
|
||||
info!("send_messages finished");
|
||||
send_messages.abort();
|
||||
_ = (&mut recv_messages_ws) => {
|
||||
info!("recieve messages from websocket finished");
|
||||
recv_messages_ws.abort();
|
||||
}
|
||||
_ = cancel_token.cancelled() => {
|
||||
info!("Cancel token cancelled");
|
||||
send_messages.abort();
|
||||
recv_messages.abort();
|
||||
recv_messages_ws.abort();
|
||||
recv_messages_waku.abort();
|
||||
}
|
||||
};
|
||||
|
||||
info!("Main loop finished");
|
||||
}
|
||||
|
||||
async fn handle_user_actions(
|
||||
msg: WakuMessage,
|
||||
waku_actor: ActorRef<WakuActor>,
|
||||
ws_actor: ActorRef<WsActor>,
|
||||
user_actor: ActorRef<User>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let actions = user_actor.ask(msg).await?;
|
||||
for action in actions {
|
||||
match action {
|
||||
UserAction::SendToWaku(msg) => {
|
||||
let id = waku_actor.ask(msg).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
}
|
||||
UserAction::SendToGroup(msg) => {
|
||||
info!("Send to group: {:?}", msg);
|
||||
ws_actor.ask(msg).await?;
|
||||
}
|
||||
UserAction::RemoveGroup(group_name) => {
|
||||
waku_actor
|
||||
.ask(ProcessUnsubscribeFromGroup {
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
user_actor
|
||||
.ask(ProcessLeaveGroup {
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
info!("Leave group: {:?}", &group_name);
|
||||
ws_actor
|
||||
.ask(MessageToPrint {
|
||||
sender: "system".to_string(),
|
||||
message: format!("Group {} removed you", group_name),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
cancel_token.cancel();
|
||||
}
|
||||
UserAction::DoNothing => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_ws_message(
|
||||
msg: RawWsMessage,
|
||||
ws_actor: ActorRef<WsActor>,
|
||||
user_actor: ActorRef<User>,
|
||||
waku_actor: ActorRef<WakuActor>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let action = ws_actor.ask(msg).await?;
|
||||
match action {
|
||||
WsAction::Connect(connect) => {
|
||||
info!("Got unexpected connect: {:?}", &connect);
|
||||
}
|
||||
WsAction::UserMessage(msg) => {
|
||||
info!("Got user message: {:?}", &msg);
|
||||
let mtp = MessageToPrint {
|
||||
message: msg.message.clone(),
|
||||
group_name: msg.group_id.clone(),
|
||||
sender: "me".to_string(),
|
||||
};
|
||||
ws_actor.ask(mtp).await?;
|
||||
|
||||
let pmt = user_actor
|
||||
.ask(ProcessSendMessage {
|
||||
msg: msg.message,
|
||||
group_name: msg.group_id,
|
||||
})
|
||||
.await?;
|
||||
let id = waku_actor.ask(pmt).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
}
|
||||
WsAction::RemoveUser(user_to_ban, group_name) => {
|
||||
info!("Got remove user: {:?}", &user_to_ban);
|
||||
let pmt = user_actor
|
||||
.ask(ProcessRemoveUser {
|
||||
user_to_ban: user_to_ban.clone(),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
let id = waku_actor.ask(pmt).await?;
|
||||
info!("Successfully publish message with id: {:?}", id);
|
||||
ws_actor
|
||||
.ask(MessageToPrint {
|
||||
sender: "system".to_string(),
|
||||
message: format!("User {} was removed from group", user_to_ban),
|
||||
group_name: group_name.clone(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
WsAction::DoNothing => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_rooms(State(state): State<Arc<AppState>>) -> String {
|
||||
let rooms = state.rooms.lock().unwrap();
|
||||
let vec = rooms.iter().collect::<Vec<&String>>();
|
||||
|
||||
@@ -362,7 +362,7 @@ impl User {
|
||||
}
|
||||
}
|
||||
|
||||
async fn invite_users(
|
||||
pub async fn invite_users(
|
||||
&mut self,
|
||||
users_kp: Vec<KeyPackage>,
|
||||
group_name: String,
|
||||
|
||||
@@ -4,17 +4,10 @@ use log::{error, info};
|
||||
use std::{str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use crate::user::{ProcessAdminMessage, ProcessCreateGroup, User};
|
||||
use crate::{AppState, UserError};
|
||||
use crate::{AppState, Connection, UserError};
|
||||
use ds::waku_actor::ProcessSubscribeToGroup;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Connection {
|
||||
pub eth_private_key: String,
|
||||
pub group_id: String,
|
||||
pub should_create_group: bool,
|
||||
}
|
||||
|
||||
pub async fn main_loop(
|
||||
pub async fn create_user_instance(
|
||||
connection: Connection,
|
||||
app_state: Arc<AppState>,
|
||||
) -> Result<ActorRef<User>, UserError> {
|
||||
@@ -47,7 +47,7 @@ pub struct UserMessage {
|
||||
}
|
||||
|
||||
/// This struct is used to represent the connection data that web socket sends to the user
|
||||
#[derive(Deserialize, Debug, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||
pub struct ConnectMessage {
|
||||
/// This is the private key of the user that we will use to authenticate the user
|
||||
pub eth_private_key: String,
|
||||
|
||||
Reference in New Issue
Block a user