diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 4dd26efd5..ed7917b2a 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -28,12 +28,13 @@ async fn start(executor: Arc>, options: ClientProgramOptions) -> Re let subscriber = GatewayClient::start_subscriber(sub_addr).await?; let slabstore = client.get_slabstore(); - let _ = executor.spawn(GatewayClient::subscribe(subscriber, slabstore)); + let subscribe_task = executor.spawn(GatewayClient::subscribe(subscriber, slabstore)); // TEST let _slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]); - client.put_slab(_slab).await?; + //client.put_slab(_slab).await?; + subscribe_task.cancel().await; Ok(()) } @@ -61,7 +62,7 @@ fn main() -> Result<()> { std::fs::File::create(options.log_path.as_path()).unwrap(), ), ]) - .unwrap(); + .unwrap(); let ex2 = ex.clone(); @@ -106,7 +107,7 @@ mod test { std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(), ), ]) - .unwrap(); + .unwrap(); let mut thread_pools: Vec> = vec![]; @@ -116,19 +117,23 @@ mod test { let mut rng = rand::thread_rng(); let rnd: u32 = rng.gen(); + + // create new client and use different slabstore let mut client = GatewayClient::new( "127.0.0.1:3333".parse().unwrap(), Path::new(&format!("slabstore_{}.db", rnd)), ) - .unwrap(); + .unwrap(); + // start client client.start().await.unwrap(); + // sending slab let _slab = Slab::new("testcoin".to_string(), rnd.to_le_bytes().to_vec()); client.put_slab(_slab).await.unwrap(); - std::thread::sleep(std::time::Duration::from_secs(3)); - let last_index = client.slabstore.get_last_index().unwrap(); + + let last_index = client.get_slabstore().get_last_index().unwrap(); info!("last index: {}", last_index); }) }); diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 6d3d2f742..d4870176f 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -3,7 +3,7 @@ use std::convert::From; use std::net::SocketAddr; use std::path::Path; -use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; +use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber, PeerId}; use crate::{ serial::deserialize, serial::serialize, slab::Slab, slabstore::SlabStore, Error, Result, }; @@ -73,8 +73,8 @@ impl GatewayService { async fn handle_request( self: Arc, - send_queue: async_channel::Sender<(Vec, Reply)>, - recv_queue: async_channel::Receiver<(Vec, Request)>, + send_queue: async_channel::Sender<(PeerId, Reply)>, + recv_queue: async_channel::Receiver<(PeerId, Request)>, publish_queue: async_channel::Sender>, ) -> Result<()> { loop { @@ -141,7 +141,7 @@ impl GatewayService { pub struct GatewayClient { protocol: ReqProtocol, - pub slabstore: Arc, + slabstore: Arc, } impl GatewayClient { diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index 5ea97870c..aa01a79ad 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -14,9 +14,11 @@ use rand::Rng; use signal_hook::{consts::SIGINT, iterator::Signals}; use zeromq::*; +pub type PeerId = Vec; + enum NetEvent { Receive(zeromq::ZmqMessage), - Send((Vec, Reply)), + Send((PeerId, Reply)), Stop, } @@ -27,11 +29,11 @@ pub fn addr_to_string(addr: SocketAddr) -> String { pub struct RepProtocol { addr: SocketAddr, socket: zeromq::RouterSocket, - recv_queue: async_channel::Receiver<(Vec, Reply)>, - send_queue: async_channel::Sender<(Vec, Request)>, + recv_queue: async_channel::Receiver<(PeerId, Reply)>, + send_queue: async_channel::Sender<(PeerId, Request)>, channels: ( - async_channel::Sender<(Vec, Reply)>, - async_channel::Receiver<(Vec, Request)>, + async_channel::Sender<(PeerId, Reply)>, + async_channel::Receiver<(PeerId, Request)>, ), service_name: String, } @@ -39,8 +41,8 @@ pub struct RepProtocol { impl RepProtocol { pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol { let socket = zeromq::RouterSocket::new(); - let (send_queue, recv_channel) = async_channel::unbounded::<(Vec, Request)>(); - let (send_channel, recv_queue) = async_channel::unbounded::<(Vec, Reply)>(); + let (send_queue, recv_channel) = async_channel::unbounded::<(PeerId, Request)>(); + let (send_channel, recv_queue) = async_channel::unbounded::<(PeerId, Reply)>(); let channels = (send_channel.clone(), recv_channel.clone()); @@ -57,8 +59,8 @@ impl RepProtocol { pub async fn start( &mut self, ) -> Result<( - async_channel::Sender<(Vec, Reply)>, - async_channel::Receiver<(Vec, Request)>, + async_channel::Sender<(PeerId, Reply)>, + async_channel::Receiver<(PeerId, Request)>, )> { let addr = addr_to_string(self.addr); self.socket.bind(addr.as_str()).await?; @@ -90,10 +92,10 @@ impl RepProtocol { match event { NetEvent::Receive(msg) => { - if let Some(request) = msg.get(1) { - let request: Vec = request.to_vec(); - let request: Request = deserialize(&request)?; - if let Some(peer) = msg.get(0) { + if let Some(peer) = msg.get(0) { + if let Some(request) = msg.get(1) { + let request: Vec = request.to_vec(); + let request: Request = deserialize(&request)?; self.send_queue.send((peer.to_vec(), request)).await?; } } @@ -175,7 +177,7 @@ impl ReqProtocol { Ok(reply.get_payload()) } else { Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )) } } @@ -253,7 +255,7 @@ impl Subscriber { Ok(data) } None => Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )), } }