diff --git a/src/net/message_subscriber.rs b/src/net/message_subscriber.rs index eb5151137..d66c213f7 100644 --- a/src/net/message_subscriber.rs +++ b/src/net/message_subscriber.rs @@ -1,15 +1,15 @@ -use std::io; -use log::*; -use std::io::Cursor; -use async_trait::async_trait; use async_std::sync::Mutex; +use async_trait::async_trait; +use log::*; use rand::Rng; use std::any::Any; use std::collections::HashMap; +use std::io; +use std::io::Cursor; use std::sync::Arc; use crate::error::Result; -use crate::net::error::{NetResult, NetError}; +use crate::net::error::{NetError, NetResult}; use crate::net::messages::{Message, PacketType}; use crate::serial::Decodable; use crate::serial::Encodable; @@ -151,15 +151,13 @@ pub trait Message2: 'static + Decodable + Send + Sync { pub struct MessageSubscription2 { id: MessageSubscriptionID, recv_queue: async_channel::Receiver>, - parent: Arc> + parent: Arc>, } impl MessageSubscription2 { pub async fn receive(&self) -> Arc { match self.recv_queue.recv().await { - Ok(message) => { - message - } + Ok(message) => message, Err(err) => { panic!("MessageSubscription::receive() recv_queue failed! {}", err); } @@ -203,7 +201,7 @@ impl MessageDispatcher { MessageSubscription2 { id: sub_id, recv_queue: recvr, - parent: self + parent: self, } } @@ -244,9 +242,7 @@ impl MessageDispatcherInterface for MessageDispatcher { // send down the pipes let cursor = Cursor::new(payload); match M::decode(cursor) { - Ok(message) => { - self.notify_all(message).await - } + Ok(message) => self.notify_all(message).await, Err(err) => { error!("Unable to decode data. Dropping...: {}", err); } @@ -259,7 +255,7 @@ impl MessageDispatcherInterface for MessageDispatcher { } struct MyVersionMessage { - x: u32 + x: u32, } impl Message2 for MyVersionMessage { @@ -288,23 +284,26 @@ impl Decodable for MyVersionMessage { } struct MessageSubsystem { - dispatchers: Mutex>> + dispatchers: Mutex>>, } impl MessageSubsystem { pub fn new() -> Self { - MessageSubsystem { dispatchers: Mutex::new(HashMap::new()) } + MessageSubsystem { + dispatchers: Mutex::new(HashMap::new()), + } } pub async fn add_dispatch(&self) { - self.dispatchers.lock().await.insert( - M::name(), - Arc::new(MessageDispatcher::::new()), - ); + self.dispatchers + .lock() + .await + .insert(M::name(), Arc::new(MessageDispatcher::::new())); } pub async fn subscribe(&self) -> NetResult> { - let dispatcher = self.dispatchers + let dispatcher = self + .dispatchers .lock() .await .get(MyVersionMessage::name()) @@ -314,10 +313,8 @@ impl MessageSubsystem { Some(dispatcher) => { let dispatcher: Arc> = dispatcher .as_any() - .downcast::>().expect( - "Multiple messages registered with different names"); + .downcast::>() + .expect("Multiple messages registered with different names"); dispatcher.subscribe().await } @@ -332,11 +329,7 @@ impl MessageSubsystem { } pub async fn trigger(&self, name: &str, data: Vec) { - let dispatcher = self.dispatchers - .lock() - .await - .get(name) - .cloned(); + let dispatcher = self.dispatchers.lock().await.get(name).cloned(); match dispatcher { Some(dispatcher) => { @@ -349,7 +342,7 @@ impl MessageSubsystem { pub async fn doteste() { println!("hello"); - + let subsystem = MessageSubsystem::new(); subsystem.add_dispatch::().await; @@ -375,4 +368,3 @@ pub async fn doteste() { sub.unsubscribe().await; } -