FIx waku double message issue (#32)

* Refactor Waku client test to improve node setup and logging

* Refactor Waku integration to use new node setup and improve message handling

* Refactor Waku node setup and message handling with improved configuration

* Refactor main application structure and improve Waku node initialization

- Restructured main application to separate server and Waku node initialization
- Added more detailed logging for Waku node setup and connection process
- Updated Cargo.toml to use underscore naming convention and update Alloy dependency
- Modified Dockerfile to simplify build process
- Improved error handling and task management in main application flow

* Refactor project structure and improve dependency management

- Updated package name in Cargo.toml to use hyphen instead of underscore for consistency.
- Simplified dependency paths for Waku bindings in Cargo.toml files across the project.
- Modified Dockerfile to streamline the build process by removing unnecessary release flags.
- Enhanced README with updated environment variable instructions for better clarity.
- Improved frontend button structure for better user interaction.
- Cleaned up unused code in Waku actor and related tests, enhancing maintainability.

* Update Waku node initialization with auto-subscription notes

- Added comments to clarify that the Waku node is auto-subscribing to the pubsub topic and that explicit subscription is unnecessary due to this behavior.
- Documented the limitation regarding subscription checks in Waku, referencing the related issue for better context.

* Remove unused Go setup from CI workflow and clean up Cargo.toml by commenting out the library section. This streamlines the configuration and focuses on Rust dependencies.

* Add peer address configuration and benchmark setup

- Introduced a new benchmark configuration in Cargo.toml for `group_flow_benchmark`.
- Updated docker-compose.yml to include `PEER_ADDRESSES` environment variable.
- Enhanced README with instructions for setting `PEER_ADDRESSES` for node connections.
- Modified main.rs to parse and utilize peer addresses for Waku node connections, improving network configuration flexibility.

* Update Cargo.toml and Dockerfile for library configuration and build process

- Added a library section in Cargo.toml to specify bench settings.
- Modified Dockerfile to create necessary source files and streamline the build process by removing redundant commands.
- Updated main.rs to improve error handling for peer address configuration.

* Comment out library and benchmark sections in Cargo.toml, update Dockerfile to streamline build process, and disable CI benchmark jobs in workflow configuration.
This commit is contained in:
Ekaterina Broslavskaya
2025-03-21 16:25:48 +07:00
committed by GitHub
parent d93ac900ae
commit 8ebeb4d898
17 changed files with 408 additions and 449 deletions

View File

@@ -27,10 +27,6 @@ jobs:
user_test:
runs-on: ubuntu-latest
steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20.x'
- name: Checkout code
uses: actions/checkout@v3
- name: cargo test
@@ -38,30 +34,22 @@ jobs:
cargo test --release
working-directory: tests
benches:
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20.x'
- name: Checkout sources
uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v2
- uses: boa-dev/criterion-compare-action@v3
with:
branchName: ${{ github.base_ref }}
# benches:
# if: github.event_name == 'pull_request'
# runs-on: ubuntu-latest
# timeout-minutes: 60
# steps:
# - name: Checkout sources
# uses: actions/checkout@v3
# - uses: Swatinem/rust-cache@v2
# - uses: boa-dev/criterion-compare-action@v3
# with:
# branchName: ${{ github.base_ref }}
lint:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.20.x'
- name: Checkout sources
uses: actions/checkout@v3
- name: Install stable toolchain

View File

@@ -13,24 +13,24 @@ name = "de-mls"
path = "src/main.rs"
bench = false
[lib]
bench = false
# [lib]
# bench = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# foundry-contracts.workspace = true
openmls = { version = "=0.5.0", features = ["test-utils"] }
openmls_basic_credential = "=0.2.0"
openmls_rust_crypto = "=0.2.0"
openmls_traits = "=0.2.0"
openmls = { version = "0.5.0", features = ["test-utils"] }
openmls_basic_credential = "0.2.0"
openmls_rust_crypto = "0.2.0"
openmls_traits = "0.2.0"
axum = { version = "0.6.10", features = ["ws"] }
futures = "0.3.26"
tower-http = { version = "0.4.0", features = ["cors"] }
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "full"] }
tokio-util = "0.7.13"
alloy = { git = "https://github.com/alloy-rs/alloy", features = [
alloy = { version = "0.11.0", features = [
"providers",
"node-bindings",
"network",
@@ -40,16 +40,16 @@ alloy = { git = "https://github.com/alloy-rs/alloy", features = [
] }
kameo = "0.13.0"
criterion = { version = "=0.4.0", features = ["html_reports"] }
criterion = { version = "0.4.0", features = ["html_reports"] }
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "rln-fix-deps"}
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "rln-fix-deps"}
rand = "=0.8.5"
serde_json = "=1.0"
serde = { version = "=1.0.204", features = ["derive"] }
tls_codec = "=0.3.0"
chrono = "=0.4.38"
rand = "0.8.5"
serde_json = "1.0"
serde = { version = "1.0.163", features = ["derive"] }
tls_codec = "0.3.0"
chrono = "0.4"
secp256k1 = { version = "0.30.0", features = [
"rand",
@@ -60,8 +60,8 @@ secp256k1 = { version = "0.30.0", features = [
ecies = "0.2.7"
libsecp256k1 = "0.7.1"
anyhow = "=1.0.81"
thiserror = "=1.0.61"
anyhow = "1.0.81"
thiserror = "1.0.39"
uuid = "1.11.0"
bounded-vec-deque = "0.1.1"
@@ -71,6 +71,6 @@ log = "0.4.22"
ds = { path = "ds" }
mls_crypto = { path = "mls_crypto" }
[[bench]]
name = "group_flow_benchmark"
harness = false
# [[bench]]
# name = "group_flow_benchmark"
# harness = false

View File

@@ -1,25 +1,21 @@
####################################################################################################
## Build image
####################################################################################################
FROM rust:latest as builder
FROM rust:latest
WORKDIR /app
RUN apt-get update && apt-get install -y libssl-dev pkg-config gcc clang
ENV PATH="/usr/local/go/bin:${PATH}"
COPY --from=golang:1.20 /usr/local/go/ /usr/local/go/
# Cache build dependencies
RUN echo "fn main() {}" > dummy.rs
COPY ["Cargo.toml", "./Cargo.toml"]
COPY ["ds/", "./ds/"]
COPY ["mls_crypto/", "./mls_crypto/"]
RUN sed -i 's#src/main.rs#dummy.rs#' Cargo.toml
RUN cargo build --release
RUN cargo build
RUN sed -i 's#dummy.rs#src/main.rs#' Cargo.toml
# Build the actual app
COPY ["src/", "./src/"]
RUN cargo build --release
RUN cargo build
CMD ["/app/target/release/de-mls"]
CMD ["/app/target/debug/de-mls"]

View File

@@ -6,8 +6,10 @@ Decentralized MLS PoC using a smart contract for group coordination
## Run Test Waku Node
This node is used to easially connect different instances of the app between each other.
```bash
docker run -p 8645:8645 -p 60000:60000 wakuorg/nwaku:v0.33.1 --cluster-id=15 --rest --relay --rln-relay=false --pubsub-topic=/waku/2/rs/15/0
docker run -p 8645:8645 -p 60000:60000 wakuorg/nwaku:v0.33.1 --cluster-id=15 --rest --relay --rln-relay=false --pubsub-topic=/waku/2/rs/15/1
```
## Run User Instance
@@ -18,7 +20,8 @@ Create a `.env` file in the `.env` folder for each client containing the followi
NAME=client1
BACKEND_PORT=3000
FRONTEND_PORT=4000
NODE_NAME=<waku-node-ip>
NODE_PORT=60000
PEER_ADDRESSES=[/ip4/x.x.x.x/tcp/60000/p2p/xxxx...xxxx]
```
Run docker compose up for the user instance
@@ -28,3 +31,23 @@ docker-compose --env-file ./.env/client1.env up --build
```
For each client, run the following command to start the frontend on the local host with the port specified in the `.env` file
Run from the frontend directory
```bash
PUBLIC_API_URL=http://0.0.0.0:3000 PUBLIC_WEBSOCKET_URL=ws://localhost:3000 npm run dev
```
Run from the root directory
```bash
RUST_BACKTRACE=full RUST_LOG=info NODE_PORT=60001 PEER_ADDRESSES=/ip4/x.x.x.x/tcp/60000/p2p/xxxx...xxxx,/ip4/y.y.y.y/tcp/60000/p2p/yyyy...yyyy cargo run -- --nocapture
```
### Example of ban user
In chat message block run ban command, note that user wallet address should be in the format without `0x`
```bash
/ban f39555ce6ab55579cfffb922665825e726880af6
```

View File

@@ -20,4 +20,5 @@ services:
- ${BACKEND_PORT}:3000
environment:
- RUST_LOG=info
- NODE=${NODE}
- NODE_PORT=${NODE_PORT}
- PEER_ADDRESSES=${PEER_ADDRESSES}

View File

@@ -6,25 +6,25 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-bindings" }
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "force-cluster-15", subdir = "waku-sys" }
waku-bindings = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "rln-fix-deps" }
waku-sys = { git = "https://github.com/waku-org/waku-rust-bindings.git", branch = "rln-fix-deps" }
tokio = { version = "1.43.0", features = ["full"] }
kameo = "=0.13.0"
kameo = "0.13.0"
bounded-vec-deque = "0.1.1"
chrono = "=0.4.38"
uuid = { version = "=1.11.0", features = [
chrono = "0.4"
uuid = { version = "1.11.0", features = [
"v4",
"fast-rng",
"macro-diagnostics",
] }
anyhow = "=1.0.81"
thiserror = "=1.0.61"
anyhow = "1.0.81"
thiserror = "1.0.39"
serde_json = "=1.0"
serde = "=1.0.204"
serde_json = "1.0"
serde = "1.0.163"
env_logger = "=0.11.5"
log = "=0.4.22"
env_logger = "0.11.5"
log = "0.4.22"

View File

@@ -1,17 +1,5 @@
use bounded_vec_deque::BoundedVecDeque;
use core::result::Result;
use log::{error, info};
use std::{
borrow::Cow,
str::FromStr,
sync::{Arc, Mutex as SyncMutex},
thread,
time::Duration,
};
use tokio::sync::mpsc::Sender;
use waku_bindings::*;
use crate::DeliveryServiceError;
use std::borrow::Cow;
use waku_bindings::{node::PubsubTopic, Encoding, WakuContentTopic};
pub const GROUP_VERSION: &str = "1";
pub const APP_MSG_SUBTOPIC: &str = "app_msg";
@@ -21,22 +9,21 @@ pub const SUBTOPICS: [&str; 3] = [APP_MSG_SUBTOPIC, COMMIT_MSG_SUBTOPIC, WELCOME
/// The pubsub topic for the Waku Node
/// Fixed for now because nodes on the network would need to be subscribed to existing pubsub topics
pub fn pubsub_topic() -> WakuPubSubTopic {
"/waku/2/rs/15/0".to_string()
pub fn pubsub_topic() -> PubsubTopic {
PubsubTopic::new("/waku/2/rs/15/1")
}
/// Build the content topics for a group. Subtopics are fixed for de-mls group communication.
///
/// Input:
/// - group_name: The name of the group
/// - group_version: The version of the group
///
/// Returns:
/// - content_topics: The content topics of the group
pub fn build_content_topics(group_name: &str, group_version: &str) -> Vec<WakuContentTopic> {
pub fn build_content_topics(group_name: &str) -> Vec<WakuContentTopic> {
SUBTOPICS
.iter()
.map(|subtopic| build_content_topic(group_name, group_version, subtopic))
.map(|subtopic| build_content_topic(group_name, GROUP_VERSION, subtopic))
.collect::<Vec<WakuContentTopic>>()
}
@@ -60,107 +47,3 @@ pub fn build_content_topic(
encoding: Encoding::Proto,
}
}
/// Build the content filter for the given pubsub topic and content topics
/// Input:
/// - pubsub_topic: The pubsub topic of the Waku Node
/// - content_topics: The content topics of the group
///
/// Returns:
/// - content_filter: The content filter of the group
pub fn content_filter(
pubsub_topic: &WakuPubSubTopic,
content_topics: &[WakuContentTopic],
) -> ContentFilter {
ContentFilter::new(Some(pubsub_topic.to_string()), content_topics.to_vec())
}
/// Setup the Waku Node Handle
/// Input:
/// - nodes_addresses: The addresses of the nodes to connect to
///
/// Returns:
/// - node_handle: The Waku Node Handle
#[allow(clippy::field_reassign_with_default)]
pub fn setup_node_handle(
nodes_addresses: Vec<String>,
) -> Result<WakuNodeHandle<Running>, DeliveryServiceError> {
let mut config = WakuNodeConfig::default();
// Set the port to 0 to let the system choose a random port
config.port = Some(0);
config.log_level = Some(WakuLogLevel::Panic);
let node_handle = waku_new(Some(config))
.map_err(|e| DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string()))?;
let node_handle = node_handle
.start()
.map_err(|e| DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string()))?;
let content_filter = ContentFilter::new(Some(pubsub_topic()), vec![]);
node_handle
.relay_subscribe(&content_filter)
.map_err(|e| DeliveryServiceError::WakuSubscribeToContentFilterError(e.to_string()))?;
for address in nodes_addresses
.iter()
.map(|a| Multiaddr::from_str(a.as_str()))
{
let address =
address.map_err(|e| DeliveryServiceError::FailedToParseMultiaddr(e.to_string()))?;
let peerid = node_handle
.add_peer(&address, ProtocolId::Relay)
.map_err(|e| DeliveryServiceError::WakuAddPeerError(e.to_string()))?;
node_handle
.connect_peer_with_id(&peerid, None)
.map_err(|e| DeliveryServiceError::WakuConnectPeerError(e.to_string()))?;
thread::sleep(Duration::from_secs(2));
}
Ok(node_handle)
}
/// Check if a content topic exists in a list of topics or if the list is empty
pub fn match_content_topic(
content_topics: &Arc<SyncMutex<Vec<WakuContentTopic>>>,
topic: &WakuContentTopic,
) -> bool {
let locked_topics = content_topics.lock().unwrap();
locked_topics.is_empty() || locked_topics.iter().any(|t| t == topic)
}
pub async fn handle_waku_event(
waku_sender: Sender<WakuMessage>,
content_topics: Arc<SyncMutex<Vec<WakuContentTopic>>>,
) {
info!("Setting up waku event callback");
let mut seen_messages = BoundedVecDeque::<String>::new(40);
waku_set_event_callback(move |signal| {
match signal.event() {
waku_bindings::Event::WakuMessage(event) => {
let msg_id = event.message_id();
if seen_messages.contains(msg_id) {
return;
}
seen_messages.push_back(msg_id.clone());
let content_topic = event.waku_message().content_topic();
// Check if message belongs to a relevant topic
if !match_content_topic(&content_topics, content_topic) {
error!("Content topic not match: {:?}", content_topic);
return;
};
let msg = event.waku_message().clone();
info!("Received message from waku: {:?}", event.message_id());
waku_sender
.blocking_send(msg)
.expect("Failed to send message to waku");
}
waku_bindings::Event::Unrecognized(data) => {
error!("Unrecognized event!\n {data:?}");
}
_ => {
error!(
"Unrecognized signal!\n {:?}",
serde_json::to_string(&signal)
);
}
}
});
}

View File

@@ -19,6 +19,8 @@ pub enum DeliveryServiceError {
WakuAddPeerError(String),
#[error("Waku connect peer error: {0}")]
WakuConnectPeerError(String),
#[error("Waku get listen addresses error: {0}")]
WakuGetListenAddressesError(String),
#[error("Failed to parse multiaddr: {0}")]
FailedToParseMultiaddr(String),

View File

@@ -1,32 +1,130 @@
use chrono::Utc;
use core::result::Result;
use kameo::{
message::{Context, Message},
Actor,
};
use log::debug;
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use waku_bindings::{Running, WakuContentTopic, WakuMessage, WakuNodeHandle};
use std::{thread::sleep, time::Duration};
use tokio::sync::mpsc::Sender;
use waku_bindings::{
node::{WakuNodeConfig, WakuNodeHandle},
waku_new, Initialized, LibwakuResponse, Multiaddr, Running, WakuEvent, WakuMessage,
};
use crate::ds_waku::{pubsub_topic, GROUP_VERSION};
use crate::{
ds_waku::{build_content_topic, build_content_topics, content_filter},
DeliveryServiceError,
};
use crate::{ds_waku::build_content_topic, DeliveryServiceError};
/// WakuActor is the actor that handles the Waku Node
#[derive(Actor)]
pub struct WakuActor {
node: Arc<WakuNodeHandle<Running>>,
pub struct WakuNode<State> {
node: WakuNodeHandle<State>,
}
impl WakuActor {
/// Create a new WakuActor
impl WakuNode<Initialized> {
/// Create a new WakuNode
/// Input:
/// - node: The Waku Node to handle. Waku Node is already running
pub fn new(node: Arc<WakuNodeHandle<Running>>) -> Self {
Self { node }
pub async fn new(port: usize) -> Result<WakuNode<Initialized>, DeliveryServiceError> {
info!("Initializing waku node inside ");
// Note: here we are auto-subscribing to the pubsub topic /waku/2/rs/15/1
let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(port),
cluster_id: Some(15),
shards: vec![1],
log_level: Some("INFO"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
..Default::default()
}))
.await
.map_err(|e| DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string()))?;
info!("Waku node initialized");
Ok(WakuNode { node: waku })
}
pub async fn start(
self,
waku_sender: Sender<WakuMessage>,
) -> Result<WakuNode<Running>, DeliveryServiceError> {
let closure = move |response| {
if let LibwakuResponse::Success(v) = response {
let event: WakuEvent =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
WakuEvent::WakuMessage(evt) => {
info!("WakuMessage event received: {:?}", evt.message_hash);
waku_sender
.blocking_send(evt.waku_message.clone())
.expect("Failed to send message to waku");
}
WakuEvent::RelayTopicHealthChange(evt) => {
info!("Relay topic change evt: {:?}", evt);
}
WakuEvent::ConnectionChange(evt) => {
info!("Conn change evt: {:?}", evt);
}
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
};
self.node
.set_event_callback(closure)
.expect("set event call back working");
let waku = self.node.start().await.map_err(|e| {
debug!("Failed to start the Waku Node: {:?}", e);
DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string())
})?;
sleep(Duration::from_secs(2));
// Note: we are not subscribing to the pubsub topic here because we are already subscribed to it
// and from waku side we can't check if we are subscribed to it or not
// issue - https://github.com/waku-org/nwaku/issues/3246
// waku.relay_subscribe(&pubsub_topic()).await.map_err(|e| {
// debug!("Failed to subscribe to the Waku Node: {:?}", e);
// DeliveryServiceError::WakuSubscribeToGroupError(e)
// })?;
Ok(WakuNode { node: waku })
}
}
impl WakuNode<Running> {
pub async fn send_message(
&self,
msg: ProcessMessageToSend,
) -> Result<String, DeliveryServiceError> {
let waku_message = msg.build_waku_message()?;
let msg_id = self
.node
.relay_publish_message(&waku_message, &pubsub_topic(), None)
.await
.map_err(|e| {
debug!("Failed to relay publish the message: {:?}", e);
DeliveryServiceError::WakuPublishMessageError(e)
})?;
Ok(msg_id.to_string())
}
pub async fn connect_to_peers(
&self,
peer_addresses: Vec<Multiaddr>,
) -> Result<(), DeliveryServiceError> {
for peer_address in peer_addresses {
info!("Connecting to peer: {:?}", peer_address);
self.node
.connect(&peer_address, None)
.await
.map_err(|e| DeliveryServiceError::WakuConnectPeerError(e.to_string()))?;
info!("Connected to peer: {:?}", peer_address);
}
info!("Connected to all peers");
Ok(())
}
pub async fn listen_addresses(&self) -> Result<Vec<Multiaddr>, DeliveryServiceError> {
let addresses = self.node.listen_addresses().await.map_err(|e| {
debug!("Failed to get the listen addresses: {:?}", e);
DeliveryServiceError::WakuGetListenAddressesError(e)
})?;
Ok(addresses)
}
}
@@ -58,94 +156,8 @@ impl ProcessMessageToSend {
self.msg.clone(),
content_topic,
2,
Utc::now().timestamp() as usize,
self.app_id.clone(),
true,
))
}
}
/// Handle the message to send to the Waku Node
/// Input:
/// - msg: The message to send
///
/// Returns:
/// - msg_id: The message id of the message sent to the Waku Node
impl Message<ProcessMessageToSend> for WakuActor {
type Reply = Result<String, DeliveryServiceError>;
async fn handle(
&mut self,
msg: ProcessMessageToSend,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
let waku_message = msg.build_waku_message()?;
let msg_id = self
.node
.relay_publish_message(&waku_message, Some(pubsub_topic()), None)
.map_err(|e| {
debug!("Failed to relay publish the message: {:?}", e);
DeliveryServiceError::WakuPublishMessageError(e)
})?;
Ok(msg_id)
}
}
/// Message for actor to subscribe to a group
/// It contains the group name to subscribe to
pub struct ProcessSubscribeToGroup {
pub group_name: String,
}
/// Handle the message for actor to subscribe to a group
/// Input:
/// - group_name: The group to subscribe to
///
/// Returns:
/// - content_topics: The content topics of the group
impl Message<ProcessSubscribeToGroup> for WakuActor {
type Reply = Result<Vec<WakuContentTopic>, DeliveryServiceError>;
async fn handle(
&mut self,
msg: ProcessSubscribeToGroup,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
let content_topics = build_content_topics(&msg.group_name, GROUP_VERSION);
let content_filter = content_filter(&pubsub_topic(), &content_topics);
self.node.relay_subscribe(&content_filter).map_err(|e| {
debug!("Failed to relay subscribe to the group: {:?}", e);
DeliveryServiceError::WakuSubscribeToGroupError(e)
})?;
Ok(content_topics)
}
}
/// Message for actor to unsubscribe from a group
/// It contains the group name to unsubscribe from
pub struct ProcessUnsubscribeFromGroup {
pub group_name: String,
}
/// Handle the message for actor to unsubscribe from a group
/// Input:
/// - group_name: The group to unsubscribe from
///
/// Returns:
/// - ()
impl Message<ProcessUnsubscribeFromGroup> for WakuActor {
type Reply = Result<(), DeliveryServiceError>;
async fn handle(
&mut self,
msg: ProcessUnsubscribeFromGroup,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
let content_topics = build_content_topics(&msg.group_name, GROUP_VERSION);
let content_filter = content_filter(&pubsub_topic(), &content_topics);
self.node
.relay_unsubscribe(&content_filter)
.map_err(|e| DeliveryServiceError::WakuRelayTopicsError(e.to_string()))?;
Ok(())
}
}

View File

@@ -1,39 +1,30 @@
use ds::{
ds_waku::APP_MSG_SUBTOPIC,
waku_actor::{ProcessMessageToSend, WakuNode},
DeliveryServiceError,
};
use kameo::{
actor::pubsub::PubSub,
message::{Context, Message},
Actor,
};
use log::{error, info};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use log::info;
use tokio::sync::mpsc::channel;
use waku_bindings::WakuMessage;
use ds::{
ds_waku::{
build_content_topics, match_content_topic, setup_node_handle, APP_MSG_SUBTOPIC,
GROUP_VERSION,
},
waku_actor::{ProcessMessageToSend, ProcessSubscribeToGroup, WakuActor},
DeliveryServiceError,
};
use waku_bindings::waku_set_event_callback;
#[derive(Debug, Clone, Actor)]
pub struct ActorA {
pub struct Application {
pub app_id: String,
}
impl ActorA {
impl Application {
pub fn new() -> Self {
let app_id = uuid::Uuid::new_v4().to_string();
Self { app_id }
}
}
impl Message<WakuMessage> for ActorA {
impl Message<WakuMessage> for Application {
type Reply = Result<WakuMessage, DeliveryServiceError>;
async fn handle(
@@ -41,95 +32,73 @@ impl Message<WakuMessage> for ActorA {
msg: WakuMessage,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorA received message: {:?}", msg.timestamp());
info!("Application received message: {:?}", msg.timestamp);
Ok(msg)
}
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_waku_client() {
env_logger::init();
let group_name = "new_group".to_string();
let mut pubsub = PubSub::<WakuMessage>::new();
let (sender, _) = channel::<WakuMessage>(100);
let waku_node_default = WakuNode::new(60002)
.await
.expect("Failed to create WakuNode");
let (sender_alice, mut receiver_alice) = channel::<WakuMessage>(100);
// TODO: get node from env
let res = setup_node_handle(vec![
"/ip4/139.59.24.82/tcp/60000/p2p/16Uiu2HAm2CfeeaNiGwv88Loe417HrRbCwTFqhpDiR3wevbCcvYz2"
.to_string(),
]);
assert!(res.is_ok());
let node = res.unwrap();
let waku_node_init = WakuNode::new(60001)
.await
.expect("Failed to create WakuNode");
let uuid = uuid::Uuid::new_v4().as_bytes().to_vec();
let waku_actor = WakuActor::new(Arc::new(node));
let actor_ref = kameo::spawn(waku_actor);
let actor_a = ActorA::new();
let actor_a = Application::new();
let actor_a_ref = kameo::spawn(actor_a);
pubsub.subscribe(actor_a_ref);
let content_topics = Arc::new(Mutex::new(build_content_topics(&group_name, GROUP_VERSION)));
// let content_topics = Arc::new(Mutex::new(build_content_topics(&group_name)));
assert!(actor_ref
.ask(ProcessSubscribeToGroup {
group_name: group_name.clone(),
})
let waku_node_default = waku_node_default
.start(sender)
.await
.is_ok());
.expect("Failed to start WakuNode");
waku_set_event_callback(move |signal| {
match signal.event() {
waku_bindings::Event::WakuMessage(event) => {
let content_topic = event.waku_message().content_topic();
// Check if message belongs to a relevant topic
assert!(match_content_topic(&content_topics, content_topic));
let msg = event.waku_message().clone();
info!("msg: {:?}", msg.timestamp());
assert!(sender_alice.blocking_send(msg).is_ok());
}
let node_name = waku_node_default
.listen_addresses()
.await
.expect("Failed to get listen addresses");
let waku_node = waku_node_init
.start(sender_alice)
.await
.expect("Failed to start WakuNode");
waku_bindings::Event::Unrecognized(data) => {
error!("Unrecognized event!\n {data:?}");
}
_ => {
error!(
"Unrecognized signal!\n {:?}",
serde_json::to_string(&signal)
);
}
}
});
waku_node
.connect_to_peers(node_name)
.await
.expect("Failed to connect to peers");
let sender = tokio::spawn(async move {
for _ in 0..10 {
assert!(actor_ref
.ask(ProcessMessageToSend {
msg: format!("test_message").as_bytes().to_vec(),
subtopic: APP_MSG_SUBTOPIC.to_string(),
group_id: group_name.clone(),
app_id: uuid.clone(),
})
.await
.is_ok());
tokio::time::sleep(Duration::from_secs(1)).await;
}
info!("sender handle is finished");
});
let receiver = tokio::spawn(async move {
tokio::spawn(async move {
while let Some(msg) = receiver_alice.recv().await {
info!("msg received: {:?}", msg.timestamp());
info!("msg received from receiver_alice: {:?}", msg.timestamp);
pubsub.publish(msg).await;
}
info!("receiver handle is finished");
});
tokio::select! {
x = sender => {
info!("get from sender: {:?}", x);
}
w = receiver => {
info!("get from receiver: {:?}", w);
}
}
tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async move {
let res = waku_node
.send_message(ProcessMessageToSend {
msg: format!("test_message_1").as_bytes().to_vec(),
subtopic: APP_MSG_SUBTOPIC.to_string(),
group_id: group_name.clone(),
app_id: uuid.clone(),
})
.await;
info!("res: {:?}", res);
info!("sender handle is finished");
});
});
}

View File

@@ -56,14 +56,14 @@
{/if}
{#if rooms}
{#each rooms as room}
<div class="card bg-base-300 w-96 shadow-xl my-3" on:click={select_room(room)}>
<button class="card bg-base-300 w-96 shadow-xl my-3 w-full" on:click={() => select_room(room)}>
<div class="card-body">
<div class="flex justify-between">
<h2 class="card-title">{room}</h2>
<button class="btn btn-primary btn-md">Select Room</button>
</div>
</div>
</div>
</div>
</button>
{/each}
{/if}
</div>

View File

@@ -10,8 +10,8 @@
"sourceMap": true,
"strict": true,
"paths": {
"$lib": ["src/lib"],
"$lib/*": ["src/lib/*"]
"$lib": ["./src/lib"],
"$lib/*": ["./src/lib/*"]
}
}
// Path aliases are handled by https://kit.svelte.dev/docs/configuration#alias

View File

@@ -1,5 +1,6 @@
use kameo::actor::ActorRef;
use log::info;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
use waku_bindings::WakuMessage;
@@ -8,11 +9,11 @@ use crate::{
ws_actor::{RawWsMessage, WsAction, WsActor},
MessageToPrint,
};
use ds::waku_actor::{ProcessUnsubscribeFromGroup, WakuActor};
use ds::waku_actor::ProcessMessageToSend;
pub async fn handle_user_actions(
msg: WakuMessage,
waku_actor: ActorRef<WakuActor>,
waku_node: Sender<ProcessMessageToSend>,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
cancel_token: CancellationToken,
@@ -21,19 +22,14 @@ pub async fn handle_user_actions(
for action in actions {
match action {
UserAction::SendToWaku(msg) => {
let id = waku_actor.ask(msg).await?;
info!("Successfully publish message with id: {:?}", id);
waku_node.send(msg).await?;
}
UserAction::SendToGroup(msg) => {
info!("Send to group: {:?}", msg);
ws_actor.ask(msg).await?;
}
UserAction::RemoveGroup(group_name) => {
waku_actor
.ask(ProcessUnsubscribeFromGroup {
group_name: group_name.clone(),
})
.await?;
// TODO: remove from content topics
user_actor
.ask(ProcessLeaveGroup {
group_name: group_name.clone(),
@@ -59,7 +55,7 @@ pub async fn handle_ws_action(
msg: RawWsMessage,
ws_actor: ActorRef<WsActor>,
user_actor: ActorRef<User>,
waku_actor: ActorRef<WakuActor>,
waku_node: Sender<ProcessMessageToSend>,
) -> Result<(), Box<dyn std::error::Error>> {
let action = ws_actor.ask(msg).await?;
match action {
@@ -81,8 +77,7 @@ pub async fn handle_ws_action(
group_name: msg.group_id,
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
waku_node.send(pmt).await?;
}
WsAction::RemoveUser(user_to_ban, group_name) => {
info!("Got remove user: {:?}", &user_to_ban);
@@ -92,8 +87,7 @@ pub async fn handle_ws_action(
group_name: group_name.clone(),
})
.await?;
let id = waku_actor.ask(pmt).await?;
info!("Successfully publish message with id: {:?}", id);
waku_node.send(pmt).await?;
ws_actor
.ask(MessageToPrint {
sender: "system".to_string(),

View File

@@ -1,6 +1,6 @@
use alloy::signers::local::LocalSignerError;
use ecies::{decrypt, encrypt};
use kameo::{actor::ActorRef, error::SendError};
use kameo::error::SendError;
use libsecp256k1::{sign, verify, Message, PublicKey, SecretKey, Signature as libSignature};
use openmls::{error::LibraryError, prelude::*};
use openmls_rust_crypto::MemoryKeyStoreError;
@@ -14,12 +14,10 @@ use std::{
string::FromUtf8Error,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc::Sender;
use waku_bindings::{WakuContentTopic, WakuMessage};
use ds::{
waku_actor::{ProcessMessageToSend, ProcessSubscribeToGroup, WakuActor},
DeliveryServiceError,
};
use ds::{waku_actor::ProcessMessageToSend, DeliveryServiceError};
pub mod action_handlers;
pub mod group_actor;
@@ -29,7 +27,7 @@ pub mod user_app_instance;
pub mod ws_actor;
pub struct AppState {
pub waku_actor: ActorRef<WakuActor>,
pub waku_node: Sender<ProcessMessageToSend>,
pub rooms: Mutex<HashSet<String>>,
pub content_topics: Arc<Mutex<Vec<WakuContentTopic>>>,
pub pubsub: tokio::sync::broadcast::Sender<WakuMessage>,
@@ -251,14 +249,24 @@ pub enum UserError {
#[error("Failed to parse signer: {0}")]
SignerParsingError(#[from] LocalSignerError),
#[error("Failed to subscribe to group: {0}")]
KameoSubscribeToGroupError(#[from] SendError<ProcessSubscribeToGroup, DeliveryServiceError>),
#[error("Failed to publish message: {0}")]
KameoPublishMessageError(#[from] SendError<ProcessMessageToSend, DeliveryServiceError>),
#[error("Failed to create group: {0}")]
KameoCreateGroupError(String),
#[error("Failed to send message to user: {0}")]
KameoSendMessageError(String),
#[error("Failed to send message to waku: {0}")]
WakuSendMessageError(#[from] tokio::sync::mpsc::error::SendError<ProcessMessageToSend>),
}
/// Check if a content topic exists in a list of topics or if the list is empty
pub fn match_content_topic(
content_topics: &Arc<Mutex<Vec<WakuContentTopic>>>,
topic: &WakuContentTopic,
) -> bool {
let locked_topics = content_topics.lock().unwrap();
locked_topics.is_empty() || locked_topics.iter().any(|t| t == topic)
}
#[cfg(test)]

View File

@@ -14,21 +14,19 @@ use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use tower_http::cors::{Any, CorsLayer};
use waku_bindings::WakuMessage;
use waku_bindings::{Multiaddr, WakuMessage};
use de_mls::{
action_handlers::{handle_user_actions, handle_ws_action},
match_content_topic,
user_app_instance::create_user_instance,
ws_actor::{RawWsMessage, WsAction, WsActor},
AppState, Connection,
};
use ds::{
ds_waku::{handle_waku_event, setup_node_handle},
waku_actor::WakuActor,
};
use ds::waku_actor::{ProcessMessageToSend, WakuNode};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -37,20 +35,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.map(|val| val.parse::<u16>())
.unwrap_or(Ok(3000))?;
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let node_port = std::env::var("NODE_PORT").expect("NODE_PORT is not set");
let peer_addresses = std::env::var("PEER_ADDRESSES")
.map(|val| {
val.split(",")
.map(|addr| addr.parse::<Multiaddr>().unwrap())
.collect()
})
.expect("PEER_ADDRESSES is not set");
let node_name = std::env::var("NODE")?;
let node = setup_node_handle(vec![node_name])?;
let waku_actor = kameo::actor::spawn(WakuActor::new(Arc::new(node)));
let (tx, _) = tokio::sync::broadcast::channel(100);
let app_state = Arc::new(AppState {
waku_actor,
rooms: Mutex::new(HashSet::new()),
content_topics: Arc::new(Mutex::new(Vec::new())),
pubsub: tx.clone(),
});
let content_topics = Arc::new(Mutex::new(Vec::new()));
let (waku_sender, mut waku_receiver) = channel::<WakuMessage>(100);
handle_waku_event(waku_sender, app_state.content_topics.clone()).await;
let (sender, mut reciever) = channel::<ProcessMessageToSend>(100);
let (tx, _) = tokio::sync::broadcast::channel(100);
let app_state = Arc::new(AppState {
waku_node: sender,
rooms: Mutex::new(HashSet::new()),
content_topics,
pubsub: tx.clone(),
});
info!("App state initialized");
let recv_messages = tokio::spawn(async move {
info!("Running recv messages from waku");
@@ -58,7 +64,78 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = tx.send(msg);
}
});
info!("Waku receiver initialized");
let server_task = tokio::spawn(async move {
info!("Running server");
run_server(app_state, addr)
.await
.expect("Failed to run server")
});
info!("Starting waku node");
tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async move {
run_waku_node(node_port, peer_addresses, waku_sender, &mut reciever).await
})
})?;
tokio::select! {
result = recv_messages => {
if let Err(w) = result {
error!("Error receiving messages from waku: {}", w);
}
}
result = server_task => {
if let Err(e) = result {
error!("Error hosting server: {}", e);
}
}
}
Ok(())
}
async fn run_waku_node(
node_port: String,
peer_addresses: Vec<Multiaddr>,
waku_sender: Sender<WakuMessage>,
reciever: &mut Receiver<ProcessMessageToSend>,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Initializing waku node");
let waku_node_init = WakuNode::new(node_port.parse::<usize>().unwrap())
.await
.expect("Failed to initialize waku node");
let waku_node = waku_node_init
.start(waku_sender)
.await
.expect("Failed to start waku node");
info!("Waku node started");
info!("Connecting to peers");
waku_node
.connect_to_peers(peer_addresses.to_vec())
.await
.expect("Failed to connect to peers");
info!("Waku node connected to peers");
info!("Waiting for message to send to waku");
while let Some(msg) = reciever.recv().await {
info!("Received message to send to waku");
let id = waku_node
.send_message(msg)
.await
.expect("Failed to send message to waku");
info!("Successfully publish message with id: {:?}", id);
}
Ok(())
}
async fn run_server(
app_state: Arc<AppState>,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(vec![Method::GET]);
@@ -70,17 +147,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_state(app_state)
.layer(cors);
println!("Hosted on {:?}", addr);
info!("Hosted on {:?}", addr);
let res = axum::Server::bind(&addr).serve(app.into_make_service());
tokio::select! {
Err(x) = res => {
error!("Error hosting server: {}", x);
}
Err(w) = recv_messages => {
error!("Error receiving messages from waku: {}", w);
}
}
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
Ok(())
}
@@ -89,6 +160,7 @@ async fn handler(ws: WebSocketUpgrade, State(state): State<Arc<AppState>>) -> im
}
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
info!("Handling socket");
let (ws_sender, mut ws_receiver) = socket.split();
let ws_actor = kameo::spawn(WsActor::new(ws_sender));
let mut main_loop_connection = None::<Connection>;
@@ -131,9 +203,24 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
let mut recv_messages_waku = tokio::spawn(async move {
info!("Running recv messages from waku for current user");
while let Ok(msg) = user_waku_receiver.recv().await {
let content_topic = msg.content_topic.clone();
// Check if message belongs to a relevant topic
info!("Content topic: {:?}", content_topic);
info!(
"Content topics: {:?}",
state_clone.content_topics.lock().unwrap()
);
if !match_content_topic(&state_clone.content_topics, &content_topic) {
error!("Content topic not match: {:?}", content_topic);
return;
};
info!(
"Received message from waku that matches content topic: {:?}",
msg.timestamp
);
let res = handle_user_actions(
msg,
state_clone.waku_actor.clone(),
state_clone.waku_node.clone(),
ws_actor_clone.clone(),
user_actor_clone.clone(),
cancel_token_clone.clone(),
@@ -154,7 +241,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
RawWsMessage { message: text },
ws_actor.clone(),
user_ref_clone.clone(),
state.waku_actor.clone(),
state.waku_node.clone(),
)
.await;
if let Err(e) = res {

View File

@@ -208,6 +208,7 @@ impl User {
WelcomeMessageType::GroupAnnouncement => {
let app_id = group.app_id();
if group.is_admin() || group.is_kp_shared() {
info!("Its admin or key package already shared");
Ok(vec![UserAction::DoNothing])
} else {
info!(
@@ -301,14 +302,15 @@ impl User {
&mut self,
msg: WakuMessage,
) -> Result<Vec<UserAction>, UserError> {
let ct = msg.content_topic();
let ct = msg.content_topic.clone();
let group_name = ct.application_name.to_string();
let group = match self.groups.get(&group_name) {
Some(g) => g,
None => return Err(UserError::GroupNotFoundError(group_name)),
};
let app_id = group.app_id();
if msg.meta() == app_id {
if msg.meta == app_id {
info!("Message is from the same app, skipping");
return Ok(vec![UserAction::DoNothing]);
}
let ct = ct.content_topic_name.to_string();

View File

@@ -1,11 +1,11 @@
use alloy::signers::local::PrivateKeySigner;
use ds::ds_waku::build_content_topics;
use kameo::actor::ActorRef;
use log::{error, info};
use std::{str::FromStr, sync::Arc, time::Duration};
use crate::user::{ProcessAdminMessage, ProcessCreateGroup, User};
use crate::{AppState, Connection, UserError};
use ds::waku_actor::ProcessSubscribeToGroup;
pub async fn create_user_instance(
connection: Connection,
@@ -25,12 +25,8 @@ pub async fn create_user_instance(
.await
.map_err(|e| UserError::KameoCreateGroupError(e.to_string()))?;
let mut content_topics = app_state
.waku_actor
.ask(ProcessSubscribeToGroup {
group_name: group_name.clone(),
})
.await?;
let mut content_topics = build_content_topics(&group_name);
info!("Building content topics: {:?}", content_topics);
app_state
.content_topics
.lock()
@@ -44,7 +40,6 @@ pub async fn create_user_instance(
);
let user_clone = user_ref.clone();
let group_name_clone = group_name.clone();
let node_clone = app_state.waku_actor.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
@@ -56,8 +51,7 @@ pub async fn create_user_instance(
})
.await
.map_err(|e| UserError::KameoSendMessageError(e.to_string()))?;
let id = node_clone.ask(msg).await?;
info!("Successfully publish admin message with id: {:?}", id);
app_state.waku_node.send(msg).await?;
Ok::<(), UserError>(())
}
.await;