diff --git a/src/bin/demowallet.rs b/src/bin/demowallet.rs index 99d1f016b..fdf254a5f 100644 --- a/src/bin/demowallet.rs +++ b/src/bin/demowallet.rs @@ -3,7 +3,7 @@ use async_std::sync::{Arc, Mutex}; use easy_parallel::Parallel; use drk::service::{fetch_slabs_loop, GatewayClient}; -use drk::Result; +use drk::{slab::Slab, Result}; async fn start(executor: Arc>) -> Result<()> { let mut client = GatewayClient::new("127.0.0.1:3333".parse()?)?; @@ -17,18 +17,18 @@ async fn start(executor: Arc>) -> Result<()> { println!("subscriber ready"); + // TODO sync new slab with slabstore let fetch_loop_task = executor.spawn(fetch_slabs_loop(subscriber.clone(), slabs.clone())); + + println!("send put slab"); - client.put_slab(vec![0, 0, 0, 0]).await?; + let slab = Slab::new("testcoin".to_string(), vec![0,0,0,0]); + client.put_slab(slab).await?; - println!("send get last index"); - let index = client.get_last_index().await?; - println!("index: {}", index); - - println!("send get slab"); - let x = client.get_slab(index).await?; - println!("{:?}", x); + // println!("send get slab"); + // let x = client.get_slab(1).await?; + // println!("{:?}", x); fetch_loop_task.cancel().await; diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 0861750a0..2a279d126 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use std::path::Path; use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; -use crate::{serial::deserialize, serial::serialize, slabstore::SlabStore, Error, Result}; +use crate::{serial::deserialize, serial::serialize, slabstore::SlabStore, Error, Result, slab::Slab}; use async_executor::Executor; use log::*; @@ -28,7 +28,7 @@ impl GatewayService { pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result> { let publisher = Mutex::new(Publisher::new(pub_addr)); - let slabstore = SlabStore::new(Path::new("../slabstore.db"))?; + let slabstore = SlabStore::new(Path::new("slabstore.db"))?; Ok(Arc::new(GatewayService { slabstore, @@ -74,8 +74,7 @@ impl GatewayService { let reply = Reply::from(&request, 0, vec![]); send_queue.send(reply).await?; - // publish to all subscribes - self.publisher.lock().await.publish(slab).await?; + // publish to all subscribes self.publisher.lock().await.publish(slab).await?; info!("received putslab msg"); } @@ -126,7 +125,7 @@ impl GatewayClient { pub fn new(addr: SocketAddr) -> Result { let protocol = ReqProtocol::new(addr); - let slabstore = SlabStore::new(Path::new("slabstore.db"))?; + let slabstore = SlabStore::new(Path::new("slabstore_client.db"))?; Ok(GatewayClient { protocol, @@ -137,6 +136,16 @@ impl GatewayClient { pub async fn start(&mut self) -> Result<()> { self.protocol.start().await?; + // start syncing + let local_last_index = self.slabstore.get_last_index()?; + let last_index = self.get_last_index().await?; + + if last_index > 0 { + for index in (local_last_index + 1)..(last_index + 1){ + self.get_slab(index).await?; + } + } + Ok(()) } @@ -147,17 +156,25 @@ impl GatewayClient { } pub async fn get_slab(&mut self, index: u64) -> Result> { - self.protocol + let slab = self.protocol .request(GatewayCommand::GetSlab as u8, serialize(&index)) - .await + .await?; + self.slabstore.put(slab.clone())?; + Ok(slab) } - pub async fn put_slab(&mut self, data: Vec) -> Result<()> { + pub async fn put_slab(&mut self, mut slab: Slab) -> Result<()> { + let last_index = self.get_last_index().await?; + slab.set_index(last_index + 1); + let slab = serialize(&slab); + self.protocol - .request(GatewayCommand::PutSlab as u8, data.clone()) + .request(GatewayCommand::PutSlab as u8, slab.clone()) .await?; + Ok(()) } + pub async fn get_last_index(&mut self) -> Result { let rep = self .protocol