diff --git a/src/bin/demowallet.rs b/src/bin/demowallet.rs index f2a567b97..8ca1f0adf 100644 --- a/src/bin/demowallet.rs +++ b/src/bin/demowallet.rs @@ -1,35 +1,27 @@ use async_executor::Executor; -use async_std::sync::{Arc, Mutex}; +use async_std::sync::Arc; use easy_parallel::Parallel; -use drk::service::{fetch_slabs_loop, GatewayClient}; +use drk::service::GatewayClient; use drk::{slab::Slab, Result}; async fn start(executor: Arc>) -> Result<()> { - let mut client = GatewayClient::new("127.0.0.1:3333".parse()?)?; + let mut client = GatewayClient::new("127.0.0.1:3333".parse()?, "slabstore_client.db")?; client.start().await?; println!("connected to a server"); - let slabs = Arc::new(Mutex::new(vec![])); - let subscriber = client.subscribe("127.0.0.1:4444".parse()?).await?; + let slabstore = client.get_slabstore(); + let subscriber_task = executor.spawn(GatewayClient::subscribe(slabstore,"127.0.0.1:4444".parse()?)); println!("subscriber ready"); - // TODO sync new slab with slabstore - let fetch_loop_task = executor.spawn(fetch_slabs_loop(subscriber.clone(), slabs.clone())); - println!("send put slab"); let slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]); client.put_slab(slab).await?; - // println!("send get slab"); - // let x = client.get_slab(1).await?; - // println!("{:?}", x); - - fetch_loop_task.cancel().await; - + subscriber_task.cancel().await; Ok(()) } diff --git a/src/service/gateway.rs b/src/service/gateway.rs index afc8016da..09a1f6173 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -152,12 +152,6 @@ impl GatewayClient { Ok(()) } - pub async fn subscribe(&self, sub_addr: SocketAddr) -> Result>> { - let mut subscriber = Subscriber::new(sub_addr); - subscriber.start().await?; - Ok(Arc::new(Mutex::new(subscriber))) - } - pub async fn get_slab(&mut self, index: u64) -> Result> { let slab = self .protocol @@ -190,20 +184,17 @@ impl GatewayClient { self.slabstore.clone() } -} - -pub async fn fetch_slabs_loop( - subscriber: Arc>, - slabs: Arc>, -) -> Result<()> { - loop { - let slab: Vec; - { - let mut subscriber = subscriber.lock().await; + pub async fn subscribe(slabstore: Arc, sub_addr: SocketAddr) -> Result<()> { + let mut subscriber = Subscriber::new(sub_addr); + subscriber.start().await?; + loop { + let slab: Vec; slab = subscriber.fetch().await?; + slabstore.put(slab)?; } - info!("received new slab from subscriber"); - slabs.lock().await.push(slab); } + } + + diff --git a/src/service/mod.rs b/src/service/mod.rs index 3b5e5cfa5..32db02a4e 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -2,5 +2,5 @@ pub mod gateway; pub mod options; pub mod reqrep; -pub use gateway::{fetch_slabs_loop, GatewayClient, GatewayService}; +pub use gateway::{ GatewayClient, GatewayService}; pub use options::ProgramOptions;