From 2c93fc403c12d5ad6aef2389a7f2b614378a7b79 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 9 Jun 2021 04:25:24 +0300 Subject: [PATCH] create GatewaySlabsSubscriber type which receive new slabs from both syncing function and the subscriber --- src/service/gateway.rs | 41 ++++++++++++++++++++++++++++++++++++----- src/service/mod.rs | 3 +-- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 35d5f5d72..e3e98df0e 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -8,7 +8,7 @@ use crate::{serial::deserialize, serial::serialize, Error, Result}; use async_executor::Executor; use log::*; -pub type Slabs = Vec>; +pub type GatewaySlabsSubscriber = async_channel::Receiver; #[repr(u8)] enum GatewayError { @@ -179,6 +179,8 @@ impl GatewayService { pub struct GatewayClient { protocol: ReqProtocol, slabstore: Arc, + gateway_slabs_sub_s: async_channel::Sender, + gateway_slabs_sub_rv: GatewaySlabsSubscriber, } impl GatewayClient { @@ -187,9 +189,13 @@ impl GatewayClient { let slabstore = SlabStore::new(rocks)?; + let (gateway_slabs_sub_s, gateway_slabs_sub_rv) = async_channel::unbounded::(); + Ok(GatewayClient { protocol, slabstore, + gateway_slabs_sub_s, + gateway_slabs_sub_rv, }) } @@ -221,14 +227,16 @@ impl GatewayClient { Ok(last_index) } - pub async fn get_slab(&mut self, index: u64) -> Result>> { + pub async fn get_slab(&mut self, index: u64) -> Result> { let rep = self .protocol .request(GatewayCommand::GetSlab as u8, serialize(&index)) .await?; if let Some(slab) = rep { - self.slabstore.put(deserialize(&slab)?)?; + let slab: Slab = deserialize(&slab)?; + self.gateway_slabs_sub_s.send(slab.clone()).await?; + self.slabstore.put(slab.clone())?; return Ok(Some(slab)); } Ok(None) @@ -267,9 +275,32 @@ impl GatewayClient { self.slabstore.clone() } - pub async fn start_subscriber(sub_addr: SocketAddr) -> Result { + pub async fn start_subscriber( + &self, + sub_addr: SocketAddr, + executor: Arc>, + ) -> Result { let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT")); subscriber.start().await?; - Ok(subscriber) + executor + .spawn(Self::subscribe_loop( + subscriber, + self.slabstore.clone(), + self.gateway_slabs_sub_s.clone(), + )) + .detach(); + Ok(self.gateway_slabs_sub_rv.clone()) + } + + async fn subscribe_loop( + mut subscriber: Subscriber, + slabstore: Arc, + gateway_slabs_sub_s: async_channel::Sender, + ) -> Result<()> { + loop { + let slab = subscriber.fetch::().await?; + gateway_slabs_sub_s.send(slab.clone()).await?; + slabstore.put(slab)?; + } } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 88558a7d2..dc737941a 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -2,6 +2,5 @@ pub mod gateway; pub mod options; pub mod reqrep; -pub use gateway::{GatewayClient, GatewayService}; +pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber}; pub use options::{ClientProgramOptions, ProgramOptions}; -pub use reqrep::Subscriber;