cargo fmt

This commit is contained in:
ghassmo
2021-05-15 22:47:13 +03:00
parent f7f7bf29c9
commit 6c932412e7
4 changed files with 60 additions and 81 deletions

View File

@@ -7,12 +7,10 @@ use sapvi::Result;
use sapvi::service::gateway;
async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
let gateway =
gateway::GatewayService::new(
String::from("tcp://127.0.0.1:3333"),
String::from("tcp://127.0.0.1:4444")
);
let gateway = gateway::GatewayService::new(
String::from("tcp://127.0.0.1:3333"),
String::from("tcp://127.0.0.1:4444"),
);
gateway.start(executor.clone()).await?;
Ok(())

View File

@@ -1,42 +1,38 @@
use async_executor::Executor;
use easy_parallel::Parallel;
use async_std::sync::{Arc, Mutex};
use easy_parallel::Parallel;
use sapvi::Result;
use sapvi::service::gateway::GatewayClient;
use sapvi::Result;
async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
let mut client =
GatewayClient::new(
String::from("tcp://127.0.0.1:3333"),
);
let mut client = GatewayClient::new(String::from("tcp://127.0.0.1:3333"));
client.start().await?;
println!("connected to a server");
let slabs = Arc::new(Mutex::new(vec![]));
let subscriber = client.subscribe(
String::from("tcp://127.0.0.1:4444")
).await?;
let subscriber = client
.subscribe(String::from("tcp://127.0.0.1:4444"))
.await?;
println!("subscription ready");
let fetch_loop_task = executor.spawn(GatewayClient::fetch_slabs_loop(
subscriber.clone(),
slabs.clone(),
));
let fetch_loop_task = executor.spawn(GatewayClient::fetch_slabs_loop(subscriber.clone(), slabs.clone()));
client.put_slab(vec![0,0,0,0]).await?;
client.put_slab(vec![0,0,0,0]).await?;
client.put_slab(vec![0,0,0,0]).await?;
client.put_slab(vec![0, 0, 0, 0]).await?;
client.put_slab(vec![0, 0, 0, 0]).await?;
client.put_slab(vec![0, 0, 0, 0]).await?;
fetch_loop_task.cancel().await;
Ok(())
}
fn main() -> Result<()> {
let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_channel::unbounded::<()>();

View File

@@ -1,7 +1,7 @@
use std::convert::TryInto;
use async_std::sync::{Arc, Mutex};
use std::convert::TryInto;
use super::reqrep::{Reply, Request, RepProtocol, ReqProtocol, Publisher, Subscriber};
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
use crate::{Error, Result};
use async_executor::Executor;
@@ -11,28 +11,29 @@ pub type Slabs = Vec<Vec<u8>>;
pub struct GatewayService {
slabs: Mutex<Slabs>,
addr: String,
publisher: Mutex<Publisher>
publisher: Mutex<Publisher>,
}
impl GatewayService {
pub fn new(addr: String, pub_addr: String) -> Arc<GatewayService>{
pub fn new(addr: String, pub_addr: String) -> Arc<GatewayService> {
let slabs = Mutex::new(vec![]);
let publisher = Mutex::new(Publisher::new(pub_addr));
Arc::new(GatewayService {
slabs,
addr,
publisher
publisher,
})
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
let (send_queue_s, send_queue_r) = async_channel::unbounded::<Reply>();
let (recv_queue_s, recv_queue_r) = async_channel::unbounded::<Request>();
let mut reqrep = RepProtocol::new(self.addr.clone(), send_queue_r.clone(), recv_queue_s.clone(),);
let mut reqrep = RepProtocol::new(
self.addr.clone(),
send_queue_r.clone(),
recv_queue_s.clone(),
);
reqrep.start().await?;
println!("server started");
@@ -41,7 +42,8 @@ impl GatewayService {
println!("publisher started");
let handle_request_task = executor.spawn(self.handle_request(send_queue_s.clone(), recv_queue_r.clone()));
let handle_request_task =
executor.spawn(self.handle_request(send_queue_s.clone(), recv_queue_r.clone()));
reqrep.run().await?;
@@ -54,11 +56,10 @@ impl GatewayService {
send_queue: async_channel::Sender<Reply>,
recv_queue: async_channel::Receiver<Request>,
) -> Result<()> {
let data = vec![];
loop {
match recv_queue.recv().await{
match recv_queue.recv().await {
Ok(request) => {
match request.get_command() {
0 => {
@@ -85,7 +86,6 @@ impl GatewayService {
}
let rep = Reply::from(&request, 0, data.clone());
send_queue.send(rep.into()).await?;
}
Err(_) => {}
}
@@ -100,9 +100,7 @@ pub struct GatewayClient {
impl GatewayClient {
pub fn new(addr: String) -> GatewayClient {
let protocol = ReqProtocol::new(addr);
GatewayClient {
protocol,
}
GatewayClient { protocol }
}
pub async fn start(&mut self) -> Result<()> {
self.protocol.start().await?;
@@ -116,26 +114,35 @@ impl GatewayClient {
}
pub async fn get_slab(&mut self, index: u32) -> Result<Vec<u8>> {
self.protocol.request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec())
self.protocol
.request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec())
.await
}
pub async fn put_slab(&mut self, data: Vec<u8>) -> Result<()> {
self.protocol.request(GatewayCommand::PutSlab as u8, data.clone()).await?;
self.protocol
.request(GatewayCommand::PutSlab as u8, data.clone())
.await?;
Ok(())
}
pub async fn get_last_index(&mut self) -> Result<u32> {
let rep = self.protocol.request(GatewayCommand::GetLastIndex as u8, vec![]).await?;
let rep = self
.protocol
.request(GatewayCommand::GetLastIndex as u8, vec![])
.await?;
let rep: [u8; 4] = rep.try_into().unwrap();
Ok(u32::from_be_bytes(rep))
}
pub async fn fetch_slabs_loop(subscriber: Arc<Mutex<Subscriber>>, slabs: Arc<Mutex<Slabs>>) -> Result<()>{
pub async fn fetch_slabs_loop(
subscriber: Arc<Mutex<Subscriber>>,
slabs: Arc<Mutex<Slabs>>,
) -> Result<()> {
loop {
let mut subscriber = subscriber.lock().await;
let slab = subscriber.fetch().await?;
println!("received new slab from subscriber");
println!("received new slab from subscriber");
slabs.lock().await.push(slab);
}
}

View File

@@ -1,7 +1,7 @@
use std::io;
use crate::{Decodable, Encodable, Result};
use crate::serial::{deserialize, serialize};
use crate::{Decodable, Encodable, Result};
use bytes::Bytes;
use futures::FutureExt;
@@ -13,8 +13,7 @@ enum NetEvent {
Send(Reply),
}
pub struct RepProtocol{
pub struct RepProtocol {
addr: String,
socket: zeromq::RepSocket,
recv_queue: async_channel::Receiver<Reply>,
@@ -22,18 +21,18 @@ pub struct RepProtocol{
}
impl RepProtocol {
pub fn new(addr: String,
pub fn new(
addr: String,
recv_queue: async_channel::Receiver<Reply>,
send_queue: async_channel::Sender<Request>
send_queue: async_channel::Sender<Request>,
) -> RepProtocol {
let socket = zeromq::RepSocket::new();
RepProtocol{
RepProtocol {
addr,
socket,
recv_queue,
send_queue,
}
}
pub async fn start(&mut self) -> Result<()> {
self.socket.bind(self.addr.as_str()).await?;
@@ -61,23 +60,18 @@ impl RepProtocol {
}
}
}
}
}
pub struct ReqProtocol {
addr: String,
socket: zeromq::ReqSocket,
}
impl ReqProtocol {
pub fn new(addr: String) -> ReqProtocol{
pub fn new(addr: String) -> ReqProtocol {
let socket = zeromq::ReqSocket::new();
ReqProtocol {
addr,
socket
}
ReqProtocol { addr, socket }
}
pub async fn start(&mut self) -> Result<()> {
@@ -106,8 +100,6 @@ impl ReqProtocol {
Ok(reply.get_payload())
}
}
pub struct Publisher {
@@ -115,53 +107,43 @@ pub struct Publisher {
socket: zeromq::PubSocket,
}
impl Publisher {
pub fn new(addr: String) -> Publisher {
let socket = zeromq::PubSocket::new();
Publisher{
addr,
socket
}
Publisher { addr, socket }
}
pub async fn start(&mut self) -> Result<()> {
self.socket.bind(self.addr.as_str()).await?;
Ok(())
}
pub async fn publish(&mut self, data: Vec<u8>) -> Result<()>{
pub async fn publish(&mut self, data: Vec<u8>) -> Result<()> {
let data = Bytes::from(data);
self.socket.send(data.into()).await?;
Ok(())
}
}
pub struct Subscriber{
pub struct Subscriber {
addr: String,
socket: zeromq::SubSocket
socket: zeromq::SubSocket,
}
impl Subscriber {
pub fn new(addr: String) -> Subscriber {
let socket = zeromq::SubSocket::new();
Subscriber{
addr,
socket
}
Subscriber { addr, socket }
}
pub async fn start(&mut self) -> Result<()> {
self.socket
.connect(self.addr.as_str())
.await?;
self.socket.connect(self.addr.as_str()).await?;
self.socket.subscribe("").await?;
Ok(())
}
pub async fn fetch(&mut self) -> Result<Vec<u8>>{
pub async fn fetch(&mut self) -> Result<Vec<u8>> {
let data = self.socket.recv().await?;
let data: &Bytes = data.get(0).unwrap();
let data = data.to_vec();
@@ -169,7 +151,6 @@ impl Subscriber {
}
}
#[derive(Debug, PartialEq)]
pub struct Request {
command: u8,
@@ -301,6 +282,3 @@ mod tests {
assert_eq!(deserialized_reply, Some(reply));
}
}