From a690df7d4665c69b3e7c8efcb00455a60bcbaccd Mon Sep 17 00:00:00 2001 From: narodnik Date: Sat, 6 Mar 2021 07:38:44 +0100 Subject: [PATCH] use downcast Arc trick to simplify code --- src/net/message_subscriber.rs | 40 +++++++++++++++++------------------ 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/net/message_subscriber.rs b/src/net/message_subscriber.rs index 91bc2b323..eb5151137 100644 --- a/src/net/message_subscriber.rs +++ b/src/net/message_subscriber.rs @@ -151,7 +151,7 @@ pub trait Message2: 'static + Decodable + Send + Sync { pub struct MessageSubscription2 { id: MessageSubscriptionID, recv_queue: async_channel::Receiver>, - parent: Arc> + parent: Arc> } impl MessageSubscription2 { @@ -176,9 +176,7 @@ impl MessageSubscription2 { trait MessageDispatcherInterface: Sync { async fn notify(&self, payload: Vec); - async fn unsubscribe(&self, sub_id: MessageSubscriptionID); - - fn as_any(&self) -> &dyn Any; + fn as_any(self: Arc) -> Arc; } struct MessageDispatcher { @@ -197,7 +195,7 @@ impl MessageDispatcher { rng.gen() } - pub async fn subscribe(&self, self_arc: Arc>) -> MessageSubscription2 { + pub async fn subscribe(self: Arc) -> MessageSubscription2 { let (sender, recvr) = async_channel::unbounded(); let sub_id = Self::random_id(); self.subs.lock().await.insert(sub_id, sender); @@ -205,10 +203,14 @@ impl MessageDispatcher { MessageSubscription2 { id: sub_id, recv_queue: recvr, - parent: self_arc + parent: self } } + async fn unsubscribe(&self, sub_id: MessageSubscriptionID) { + self.subs.lock().await.remove(&sub_id); + } + async fn notify_all(&self, message: M) { let message = Arc::new(message); let mut garbage_ids = Vec::new(); @@ -251,11 +253,7 @@ impl MessageDispatcherInterface for MessageDispatcher { } } - async fn unsubscribe(&self, sub_id: MessageSubscriptionID) { - self.subs.lock().await.remove(&sub_id); - } - - fn as_any(&self) -> &dyn Any { + fn as_any(self: Arc) -> Arc { self } } @@ -290,7 +288,7 @@ impl Decodable for MyVersionMessage { } struct MessageSubsystem { - dispatchers: Mutex>>> + dispatchers: Mutex>> } impl MessageSubsystem { @@ -301,7 +299,7 @@ impl MessageSubsystem { pub async fn add_dispatch(&self) { self.dispatchers.lock().await.insert( M::name(), - Arc::new(Box::new(MessageDispatcher::::new())), + Arc::new(MessageDispatcher::::new()), ); } @@ -313,17 +311,15 @@ impl MessageSubsystem { .cloned(); let sub = match dispatcher { - Some(dispatcher_arc) => { - let dispatcher: &MessageDispatcher = match dispatcher_arc + Some(dispatcher) => { + let dispatcher: Arc> = dispatcher .as_any() - .downcast_ref::>() { - Some(dispatcher) => dispatcher, - None => panic!("Multiple messages registered with different names"), - }; + >>().expect( + "Multiple messages registered with different names"); - dispatcher.subscribe(dispatcher_arc.clone()).await + dispatcher.subscribe().await } None => { // normall return failure here @@ -376,5 +372,7 @@ pub async fn doteste() { // 1. do a get easy let msg2 = sub.receive().await; println!("{}", msg2.x); + + sub.unsubscribe().await; }