From 796c41983fcee395e6b71049e14f36ca67138858 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Tue, 28 Sep 2021 17:59:40 +0300 Subject: [PATCH] Bridge: implement listen() function --- src/service/bridge.rs | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/service/bridge.rs b/src/service/bridge.rs index 05819a773..34eb8fa1e 100644 --- a/src/service/bridge.rs +++ b/src/service/bridge.rs @@ -1,6 +1,9 @@ -use crate::Result; +use crate::{Error, Result}; use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; +use log::*; use async_std::sync::{Arc, Mutex}; use std::collections::HashMap; @@ -43,21 +46,24 @@ pub struct TokenSubscribtion { pub public_key: String, } +#[derive(Debug)] pub struct TokenNotification { + pub network: String, + pub asset_id: jubjub::Fr, pub secret_key: Vec, pub received_balance: u64, } pub struct Bridge { clients: Mutex>>, - //notifiers: Mutex, async_channel::Receiver>>, + notifiers: FuturesUnordered>, } impl Bridge { pub fn new() -> Arc { Arc::new(Self { clients: Mutex::new(HashMap::new()), - //notifiers: Mutex::new(HashMap::new()), + notifiers: FuturesUnordered::new(), }) } @@ -66,36 +72,45 @@ impl Bridge { network: String, client: Arc, ) -> Result<()> { - //let notifier = client.get_notifier().await?; + let client2 = client.clone(); + let notifier = client2.get_notifier().await?; self.clients .lock() .await .insert(network.clone(), client.clone()); - // self.notifiers - // .lock() - // .await - // .insert(asset_id, notifier.clone()); + self.notifiers.push(notifier.clone()); Ok(()) } - pub async fn listen(self: Arc) {} + pub async fn listen(self: Arc) -> Option> { + debug!(target: "BRIDGE", "Start listening to new notification"); + self.notifiers + .iter() + .map(|n| n.recv()) + .collect::>>() + .next() + .await + .map(|o| o.map_err(|e| Error::from(e))) + } pub async fn subscribe(self: Arc) -> BridgeSubscribtion { + debug!(target: "BRIDGE", "Start new subscription"); let (sender, req) = async_channel::unbounded(); let (rep, receiver) = async_channel::unbounded(); - smol::spawn(self.listen_for_new_subscribtion(req.clone(), rep.clone())).detach(); + smol::spawn(self.listen_for_new_subscription(req.clone(), rep.clone())).detach(); BridgeSubscribtion { sender, receiver } } - async fn listen_for_new_subscribtion( + async fn listen_for_new_subscription( self: Arc, req: async_channel::Receiver, rep: async_channel::Sender, ) -> Result<()> { + debug!(target: "BRIDGE", "Listen for new subscription"); let req = req.recv().await?; let network = req.network; @@ -109,7 +124,6 @@ impl Bridge { return Ok(()); } - let client: Arc; // avoid deadlock {