This commit is contained in:
ghassmo
2021-05-29 15:50:08 +03:00
parent 6305c5151f
commit eb05266e96
3 changed files with 33 additions and 26 deletions

View File

@@ -28,12 +28,13 @@ async fn start(executor: Arc<Executor<'_>>, options: ClientProgramOptions) -> Re
let subscriber = GatewayClient::start_subscriber(sub_addr).await?;
let slabstore = client.get_slabstore();
let _ = executor.spawn(GatewayClient::subscribe(subscriber, slabstore));
let subscribe_task = executor.spawn(GatewayClient::subscribe(subscriber, slabstore));
// TEST
let _slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]);
client.put_slab(_slab).await?;
//client.put_slab(_slab).await?;
subscribe_task.cancel().await;
Ok(())
}
@@ -61,7 +62,7 @@ fn main() -> Result<()> {
std::fs::File::create(options.log_path.as_path()).unwrap(),
),
])
.unwrap();
.unwrap();
let ex2 = ex.clone();
@@ -106,7 +107,7 @@ mod test {
std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(),
),
])
.unwrap();
.unwrap();
let mut thread_pools: Vec<std::thread::JoinHandle<()>> = vec![];
@@ -116,19 +117,23 @@ mod test {
let mut rng = rand::thread_rng();
let rnd: u32 = rng.gen();
// create new client and use different slabstore
let mut client = GatewayClient::new(
"127.0.0.1:3333".parse().unwrap(),
Path::new(&format!("slabstore_{}.db", rnd)),
)
.unwrap();
.unwrap();
// start client
client.start().await.unwrap();
// sending slab
let _slab = Slab::new("testcoin".to_string(), rnd.to_le_bytes().to_vec());
client.put_slab(_slab).await.unwrap();
std::thread::sleep(std::time::Duration::from_secs(3));
let last_index = client.slabstore.get_last_index().unwrap();
let last_index = client.get_slabstore().get_last_index().unwrap();
info!("last index: {}", last_index);
})
});

View File

@@ -3,7 +3,7 @@ use std::convert::From;
use std::net::SocketAddr;
use std::path::Path;
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber, PeerId};
use crate::{
serial::deserialize, serial::serialize, slab::Slab, slabstore::SlabStore, Error, Result,
};
@@ -73,8 +73,8 @@ impl GatewayService {
async fn handle_request(
self: Arc<Self>,
send_queue: async_channel::Sender<(Vec<u8>, Reply)>,
recv_queue: async_channel::Receiver<(Vec<u8>, Request)>,
send_queue: async_channel::Sender<(PeerId, Reply)>,
recv_queue: async_channel::Receiver<(PeerId, Request)>,
publish_queue: async_channel::Sender<Vec<u8>>,
) -> Result<()> {
loop {
@@ -141,7 +141,7 @@ impl GatewayService {
pub struct GatewayClient {
protocol: ReqProtocol,
pub slabstore: Arc<SlabStore>,
slabstore: Arc<SlabStore>,
}
impl GatewayClient {

View File

@@ -14,9 +14,11 @@ use rand::Rng;
use signal_hook::{consts::SIGINT, iterator::Signals};
use zeromq::*;
pub type PeerId = Vec<u8>;
enum NetEvent {
Receive(zeromq::ZmqMessage),
Send((Vec<u8>, Reply)),
Send((PeerId, Reply)),
Stop,
}
@@ -27,11 +29,11 @@ pub fn addr_to_string(addr: SocketAddr) -> String {
pub struct RepProtocol {
addr: SocketAddr,
socket: zeromq::RouterSocket,
recv_queue: async_channel::Receiver<(Vec<u8>, Reply)>,
send_queue: async_channel::Sender<(Vec<u8>, Request)>,
recv_queue: async_channel::Receiver<(PeerId, Reply)>,
send_queue: async_channel::Sender<(PeerId, Request)>,
channels: (
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>, Request)>,
async_channel::Sender<(PeerId, Reply)>,
async_channel::Receiver<(PeerId, Request)>,
),
service_name: String,
}
@@ -39,8 +41,8 @@ pub struct RepProtocol {
impl RepProtocol {
pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol {
let socket = zeromq::RouterSocket::new();
let (send_queue, recv_channel) = async_channel::unbounded::<(Vec<u8>, Request)>();
let (send_channel, recv_queue) = async_channel::unbounded::<(Vec<u8>, Reply)>();
let (send_queue, recv_channel) = async_channel::unbounded::<(PeerId, Request)>();
let (send_channel, recv_queue) = async_channel::unbounded::<(PeerId, Reply)>();
let channels = (send_channel.clone(), recv_channel.clone());
@@ -57,8 +59,8 @@ impl RepProtocol {
pub async fn start(
&mut self,
) -> Result<(
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>, Request)>,
async_channel::Sender<(PeerId, Reply)>,
async_channel::Receiver<(PeerId, Request)>,
)> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
@@ -90,10 +92,10 @@ impl RepProtocol {
match event {
NetEvent::Receive(msg) => {
if let Some(request) = msg.get(1) {
let request: Vec<u8> = request.to_vec();
let request: Request = deserialize(&request)?;
if let Some(peer) = msg.get(0) {
if let Some(peer) = msg.get(0) {
if let Some(request) = msg.get(1) {
let request: Vec<u8> = request.to_vec();
let request: Request = deserialize(&request)?;
self.send_queue.send((peer.to_vec(), request)).await?;
}
}
@@ -175,7 +177,7 @@ impl ReqProtocol {
Ok(reply.get_payload())
} else {
Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
))
}
}
@@ -253,7 +255,7 @@ impl Subscriber {
Ok(data)
}
None => Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
)),
}
}