integrate rocksdb to gateway service

This commit is contained in:
ghassmo
2021-05-28 16:27:35 +03:00
parent 0fbdd15454
commit 6503f23c22

View File

@@ -1,9 +1,10 @@
use async_std::sync::{Arc, Mutex};
use std::convert::TryInto;
use std::net::SocketAddr;
use std::path::Path;
use std::convert::From;
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
use crate::{Error, Result};
use crate::{Error, Result, slabstore::SlabStore, serial::serialize, serial::deserialize};
use async_executor::Executor;
use log::*;
@@ -18,20 +19,23 @@ enum GatewayCommand {
}
pub struct GatewayService {
slabs: Mutex<Slabs>,
slabstore: SlabStore,
addr: SocketAddr,
publisher: Mutex<Publisher>,
}
impl GatewayService {
pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Arc<GatewayService> {
let slabs = Mutex::new(vec![]);
pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result<Arc<GatewayService>> {
let publisher = Mutex::new(Publisher::new(pub_addr));
Arc::new(GatewayService {
slabs,
let slabstore = SlabStore::new(Path::new("../slabstore.db"))?;
Ok(Arc::new(GatewayService {
slabstore,
addr,
publisher,
})
}))
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
@@ -56,15 +60,22 @@ impl GatewayService {
send_queue: async_channel::Sender<Reply>,
recv_queue: async_channel::Receiver<Request>,
) -> Result<()> {
let data = vec![];
loop {
match recv_queue.recv().await {
Ok(request) => {
// TODO spawn new task when receive new msg
match request.get_command() {
0 => {
// PUTSLAB
let slab = request.get_payload();
self.slabs.lock().await.push(slab.clone());
// add to slabstore
self.slabstore.put(slab.clone())?;
// send reply
let reply = Reply::from(&request, 0, vec![]);
send_queue.send(reply).await?;
// publish to all subscribes
self.publisher.lock().await.publish(slab).await?;
@@ -72,10 +83,27 @@ impl GatewayService {
info!("received putslab msg");
}
1 => {
let index = request.get_payload();
let slab = self.slabstore.get(index)?;
let mut payload = vec![];
if let Some(sb) = slab {
payload = sb;
}
let reply = Reply::from(&request, 0, payload);
send_queue.send(reply).await?;
// GETSLAB
info!("received getslab msg");
}
2 => {
let index = self.slabstore.get_last_index_as_bytes()?;
let reply = Reply::from(&request, 0, index);
send_queue.send(reply).await?;
// GETLASTINDEX
info!("received getlastindex msg");
}
@@ -83,8 +111,6 @@ impl GatewayService {
return Err(Error::ServicesError("received wrong command"));
}
}
let rep = Reply::from(&request, 0, data.clone());
send_queue.send(rep.into()).await?;
}
Err(_) => {
break;
@@ -115,9 +141,9 @@ impl GatewayClient {
Ok(Arc::new(Mutex::new(subscriber)))
}
pub async fn get_slab(&mut self, index: u32) -> Result<Vec<u8>> {
pub async fn get_slab(&mut self, index: u64) -> Result<Vec<u8>> {
self.protocol
.request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec())
.request(GatewayCommand::GetSlab as u8, serialize(&index))
.await
}
@@ -127,16 +153,16 @@ impl GatewayClient {
.await?;
Ok(())
}
pub async fn get_last_index(&mut self) -> Result<u32> {
pub async fn get_last_index(&mut self) -> Result<u64> {
let rep = self
.protocol
.request(GatewayCommand::GetLastIndex as u8, vec![])
.await?;
let rep: [u8; 4] = rep.try_into().map_err(|_| crate::Error::TryIntoError)?;
Ok(u32::from_be_bytes(rep))
Ok(deserialize(&rep)?)
}
}
pub async fn fetch_slabs_loop(
subscriber: Arc<Mutex<Subscriber>>,
slabs: Arc<Mutex<Slabs>>,
@@ -151,3 +177,4 @@ pub async fn fetch_slabs_loop(
slabs.lock().await.push(slab);
}
}