Bridge: implement listen() function

This commit is contained in:
ghassmo
2021-09-28 17:59:40 +03:00
parent c6e4e3f3b4
commit 796c41983f

View File

@@ -1,6 +1,9 @@
use crate::Result;
use crate::{Error, Result};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use log::*;
use async_std::sync::{Arc, Mutex};
use std::collections::HashMap;
@@ -43,21 +46,24 @@ pub struct TokenSubscribtion {
pub public_key: String,
}
#[derive(Debug)]
pub struct TokenNotification {
pub network: String,
pub asset_id: jubjub::Fr,
pub secret_key: Vec<u8>,
pub received_balance: u64,
}
pub struct Bridge {
clients: Mutex<HashMap<String, Arc<dyn NetworkClient + Send + Sync>>>,
//notifiers: Mutex<HashMap<Vec<u8>, async_channel::Receiver<TokenNotification>>>,
notifiers: FuturesUnordered<async_channel::Receiver<TokenNotification>>,
}
impl Bridge {
pub fn new() -> Arc<Self> {
Arc::new(Self {
clients: Mutex::new(HashMap::new()),
//notifiers: Mutex::new(HashMap::new()),
notifiers: FuturesUnordered::new(),
})
}
@@ -66,36 +72,45 @@ impl Bridge {
network: String,
client: Arc<dyn NetworkClient + Send + Sync>,
) -> Result<()> {
//let notifier = client.get_notifier().await?;
let client2 = client.clone();
let notifier = client2.get_notifier().await?;
self.clients
.lock()
.await
.insert(network.clone(), client.clone());
// self.notifiers
// .lock()
// .await
// .insert(asset_id, notifier.clone());
self.notifiers.push(notifier.clone());
Ok(())
}
pub async fn listen(self: Arc<Self>) {}
pub async fn listen(self: Arc<Self>) -> Option<Result<TokenNotification>> {
debug!(target: "BRIDGE", "Start listening to new notification");
self.notifiers
.iter()
.map(|n| n.recv())
.collect::<FuturesUnordered<async_channel::Recv<TokenNotification>>>()
.next()
.await
.map(|o| o.map_err(|e| Error::from(e)))
}
pub async fn subscribe(self: Arc<Self>) -> BridgeSubscribtion {
debug!(target: "BRIDGE", "Start new subscription");
let (sender, req) = async_channel::unbounded();
let (rep, receiver) = async_channel::unbounded();
smol::spawn(self.listen_for_new_subscribtion(req.clone(), rep.clone())).detach();
smol::spawn(self.listen_for_new_subscription(req.clone(), rep.clone())).detach();
BridgeSubscribtion { sender, receiver }
}
async fn listen_for_new_subscribtion(
async fn listen_for_new_subscription(
self: Arc<Self>,
req: async_channel::Receiver<BridgeRequests>,
rep: async_channel::Sender<BridgeResponse>,
) -> Result<()> {
debug!(target: "BRIDGE", "Listen for new subscription");
let req = req.recv().await?;
let network = req.network;
@@ -109,7 +124,6 @@ impl Bridge {
return Ok(());
}
let client: Arc<dyn NetworkClient + Send + Sync>;
// avoid deadlock
{