refactor subscribe function and remove fetch_slabs_loop function

This commit is contained in:
ghassmo
2021-05-29 07:08:24 +03:00
parent f7dcdd5ddd
commit f7552adf8a
3 changed files with 16 additions and 33 deletions

View File

@@ -1,35 +1,27 @@
use async_executor::Executor;
use async_std::sync::{Arc, Mutex};
use async_std::sync::Arc;
use easy_parallel::Parallel;
use drk::service::{fetch_slabs_loop, GatewayClient};
use drk::service::GatewayClient;
use drk::{slab::Slab, Result};
async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
let mut client = GatewayClient::new("127.0.0.1:3333".parse()?)?;
let mut client = GatewayClient::new("127.0.0.1:3333".parse()?, "slabstore_client.db")?;
client.start().await?;
println!("connected to a server");
let slabs = Arc::new(Mutex::new(vec![]));
let subscriber = client.subscribe("127.0.0.1:4444".parse()?).await?;
let slabstore = client.get_slabstore();
let subscriber_task = executor.spawn(GatewayClient::subscribe(slabstore,"127.0.0.1:4444".parse()?));
println!("subscriber ready");
// TODO sync new slab with slabstore
let fetch_loop_task = executor.spawn(fetch_slabs_loop(subscriber.clone(), slabs.clone()));
println!("send put slab");
let slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]);
client.put_slab(slab).await?;
// println!("send get slab");
// let x = client.get_slab(1).await?;
// println!("{:?}", x);
fetch_loop_task.cancel().await;
subscriber_task.cancel().await;
Ok(())
}

View File

@@ -152,12 +152,6 @@ impl GatewayClient {
Ok(())
}
pub async fn subscribe(&self, sub_addr: SocketAddr) -> Result<Arc<Mutex<Subscriber>>> {
let mut subscriber = Subscriber::new(sub_addr);
subscriber.start().await?;
Ok(Arc::new(Mutex::new(subscriber)))
}
pub async fn get_slab(&mut self, index: u64) -> Result<Vec<u8>> {
let slab = self
.protocol
@@ -190,20 +184,17 @@ impl GatewayClient {
self.slabstore.clone()
}
}
pub async fn fetch_slabs_loop(
subscriber: Arc<Mutex<Subscriber>>,
slabs: Arc<Mutex<Slabs>>,
) -> Result<()> {
loop {
let slab: Vec<u8>;
{
let mut subscriber = subscriber.lock().await;
pub async fn subscribe(slabstore: Arc<SlabStore>, sub_addr: SocketAddr) -> Result<()> {
let mut subscriber = Subscriber::new(sub_addr);
subscriber.start().await?;
loop {
let slab: Vec<u8>;
slab = subscriber.fetch().await?;
slabstore.put(slab)?;
}
info!("received new slab from subscriber");
slabs.lock().await.push(slab);
}
}

View File

@@ -2,5 +2,5 @@ pub mod gateway;
pub mod options;
pub mod reqrep;
pub use gateway::{fetch_slabs_loop, GatewayClient, GatewayService};
pub use gateway::{ GatewayClient, GatewayService};
pub use options::ProgramOptions;