create GatewaySlabsSubscriber type which receive new slabs from both syncing function and the subscriber

This commit is contained in:
ghassmo
2021-06-09 04:25:24 +03:00
parent 067e942f88
commit 2c93fc403c
2 changed files with 37 additions and 7 deletions

View File

@@ -8,7 +8,7 @@ use crate::{serial::deserialize, serial::serialize, Error, Result};
use async_executor::Executor;
use log::*;
pub type Slabs = Vec<Vec<u8>>;
pub type GatewaySlabsSubscriber = async_channel::Receiver<Slab>;
#[repr(u8)]
enum GatewayError {
@@ -179,6 +179,8 @@ impl GatewayService {
pub struct GatewayClient {
protocol: ReqProtocol,
slabstore: Arc<SlabStore>,
gateway_slabs_sub_s: async_channel::Sender<Slab>,
gateway_slabs_sub_rv: GatewaySlabsSubscriber,
}
impl GatewayClient {
@@ -187,9 +189,13 @@ impl GatewayClient {
let slabstore = SlabStore::new(rocks)?;
let (gateway_slabs_sub_s, gateway_slabs_sub_rv) = async_channel::unbounded::<Slab>();
Ok(GatewayClient {
protocol,
slabstore,
gateway_slabs_sub_s,
gateway_slabs_sub_rv,
})
}
@@ -221,14 +227,16 @@ impl GatewayClient {
Ok(last_index)
}
pub async fn get_slab(&mut self, index: u64) -> Result<Option<Vec<u8>>> {
pub async fn get_slab(&mut self, index: u64) -> Result<Option<Slab>> {
let rep = self
.protocol
.request(GatewayCommand::GetSlab as u8, serialize(&index))
.await?;
if let Some(slab) = rep {
self.slabstore.put(deserialize(&slab)?)?;
let slab: Slab = deserialize(&slab)?;
self.gateway_slabs_sub_s.send(slab.clone()).await?;
self.slabstore.put(slab.clone())?;
return Ok(Some(slab));
}
Ok(None)
@@ -267,9 +275,32 @@ impl GatewayClient {
self.slabstore.clone()
}
pub async fn start_subscriber(sub_addr: SocketAddr) -> Result<Subscriber> {
pub async fn start_subscriber(
&self,
sub_addr: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<GatewaySlabsSubscriber> {
let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT"));
subscriber.start().await?;
Ok(subscriber)
executor
.spawn(Self::subscribe_loop(
subscriber,
self.slabstore.clone(),
self.gateway_slabs_sub_s.clone(),
))
.detach();
Ok(self.gateway_slabs_sub_rv.clone())
}
async fn subscribe_loop(
mut subscriber: Subscriber,
slabstore: Arc<SlabStore>,
gateway_slabs_sub_s: async_channel::Sender<Slab>,
) -> Result<()> {
loop {
let slab = subscriber.fetch::<Slab>().await?;
gateway_slabs_sub_s.send(slab.clone()).await?;
slabstore.put(slab)?;
}
}
}

View File

@@ -2,6 +2,5 @@ pub mod gateway;
pub mod options;
pub mod reqrep;
pub use gateway::{GatewayClient, GatewayService};
pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber};
pub use options::{ClientProgramOptions, ProgramOptions};
pub use reqrep::Subscriber;