sync slabs in gateway client

This commit is contained in:
ghassmo
2021-05-28 18:21:17 +03:00
parent 16a11a6c52
commit 14254abef6
2 changed files with 35 additions and 18 deletions

View File

@@ -3,7 +3,7 @@ use async_std::sync::{Arc, Mutex};
use easy_parallel::Parallel;
use drk::service::{fetch_slabs_loop, GatewayClient};
use drk::Result;
use drk::{slab::Slab, Result};
async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
let mut client = GatewayClient::new("127.0.0.1:3333".parse()?)?;
@@ -17,18 +17,18 @@ async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
println!("subscriber ready");
// TODO sync new slab with slabstore
let fetch_loop_task = executor.spawn(fetch_slabs_loop(subscriber.clone(), slabs.clone()));
println!("send put slab");
client.put_slab(vec![0, 0, 0, 0]).await?;
let slab = Slab::new("testcoin".to_string(), vec![0,0,0,0]);
client.put_slab(slab).await?;
println!("send get last index");
let index = client.get_last_index().await?;
println!("index: {}", index);
println!("send get slab");
let x = client.get_slab(index).await?;
println!("{:?}", x);
// println!("send get slab");
// let x = client.get_slab(1).await?;
// println!("{:?}", x);
fetch_loop_task.cancel().await;

View File

@@ -4,7 +4,7 @@ use std::net::SocketAddr;
use std::path::Path;
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
use crate::{serial::deserialize, serial::serialize, slabstore::SlabStore, Error, Result};
use crate::{serial::deserialize, serial::serialize, slabstore::SlabStore, Error, Result, slab::Slab};
use async_executor::Executor;
use log::*;
@@ -28,7 +28,7 @@ impl GatewayService {
pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result<Arc<GatewayService>> {
let publisher = Mutex::new(Publisher::new(pub_addr));
let slabstore = SlabStore::new(Path::new("../slabstore.db"))?;
let slabstore = SlabStore::new(Path::new("slabstore.db"))?;
Ok(Arc::new(GatewayService {
slabstore,
@@ -74,8 +74,7 @@ impl GatewayService {
let reply = Reply::from(&request, 0, vec![]);
send_queue.send(reply).await?;
// publish to all subscribes
self.publisher.lock().await.publish(slab).await?;
// publish to all subscribes self.publisher.lock().await.publish(slab).await?;
info!("received putslab msg");
}
@@ -126,7 +125,7 @@ impl GatewayClient {
pub fn new(addr: SocketAddr) -> Result<GatewayClient> {
let protocol = ReqProtocol::new(addr);
let slabstore = SlabStore::new(Path::new("slabstore.db"))?;
let slabstore = SlabStore::new(Path::new("slabstore_client.db"))?;
Ok(GatewayClient {
protocol,
@@ -137,6 +136,16 @@ impl GatewayClient {
pub async fn start(&mut self) -> Result<()> {
self.protocol.start().await?;
// start syncing
let local_last_index = self.slabstore.get_last_index()?;
let last_index = self.get_last_index().await?;
if last_index > 0 {
for index in (local_last_index + 1)..(last_index + 1){
self.get_slab(index).await?;
}
}
Ok(())
}
@@ -147,17 +156,25 @@ impl GatewayClient {
}
pub async fn get_slab(&mut self, index: u64) -> Result<Vec<u8>> {
self.protocol
let slab = self.protocol
.request(GatewayCommand::GetSlab as u8, serialize(&index))
.await
.await?;
self.slabstore.put(slab.clone())?;
Ok(slab)
}
pub async fn put_slab(&mut self, data: Vec<u8>) -> Result<()> {
pub async fn put_slab(&mut self, mut slab: Slab) -> Result<()> {
let last_index = self.get_last_index().await?;
slab.set_index(last_index + 1);
let slab = serialize(&slab);
self.protocol
.request(GatewayCommand::PutSlab as u8, data.clone())
.request(GatewayCommand::PutSlab as u8, slab.clone())
.await?;
Ok(())
}
pub async fn get_last_index(&mut self) -> Result<u64> {
let rep = self
.protocol