diff --git a/Cargo.toml b/Cargo.toml index c140ee005..5b5998f16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -210,6 +210,7 @@ node = [ "crypto", "wallet", "util", + "net", ] diff --git a/src/node/service/gateway_p2p.rs b/src/node/service/gateway_p2p.rs index 96c28f10c..d2b5138e7 100644 --- a/src/node/service/gateway_p2p.rs +++ b/src/node/service/gateway_p2p.rs @@ -1,37 +1,37 @@ -use std::sync::Arc; +use async_std::sync::{Arc, Mutex}; use std::io; -use rand::Rng; use async_executor::Executor; use log::*; use crate::{ blockchain::{rocks::columns, RocksColumn, Slab, SlabStore}, - util::serial::{Decodable, Encodable}, + net, + net::{P2p, P2pPtr, Settings}, + util::{ + serial::{deserialize, serialize, Decodable, Encodable}, + sleep, + }, Error, Result, - net::{P2pPtr, P2p, Settings} }; -#[allow(dead_code)] pub struct Gateway { p2p: P2pPtr, slabstore: Arc, + last_indexes: Arc>>, } impl Gateway { - pub fn new( - _settings: Settings, - rocks: RocksColumn, - ) -> Result { + pub fn new(_settings: Settings, rocks: RocksColumn) -> Result { let slabstore = SlabStore::new(rocks)?; let settings = Settings::default(); let p2p = P2p::new(settings); - Ok(Self { p2p, slabstore}) + let last_indexes = Arc::new(Mutex::new(vec![0; 10])); + Ok(Self { p2p, slabstore, last_indexes }) } pub async fn start(&self, executor: Arc>) -> Result<()> { - self.p2p.clone().start(executor.clone()).await?; self.p2p.clone().run(executor.clone()).await?; @@ -39,57 +39,114 @@ impl Gateway { Ok(()) } - - async fn publish(&self, _msg: GatewayMessage) -> Result<()> { - Ok(()) + async fn publish(&self, msg: GatewayMessage) -> Result<()> { + self.p2p.broadcast(msg).await } - async fn subscribe_loop(&self, _executor: Arc>) -> Result<()> { + async fn subscribe_loop(&self, executor: Arc>) -> Result<()> { let new_channel_sub = self.p2p.subscribe_channel().await; loop { let channel = new_channel_sub.receive().await?; - // do something - } + let message_subsytem = channel.get_message_subsystem(); + + message_subsytem.add_dispatch::().await; + + let msg_sub = channel.subscribe_msg::().await?; + + let jobsman = net::ProtocolJobsManager::new("GatewayMessage", channel); + + jobsman.clone().start(executor.clone()); + + jobsman + .spawn(Self::handle_msg(self.slabstore.clone(), msg_sub), executor.clone()) + .await; + } } - async fn handle_msg( - msg: GatewayMessage, - _slabstore: Arc, + pub async fn handle_msg( + slabstore: Arc, + msg_sub: net::MessageSubscription, ) -> Result<()> { - match msg.get_command() { - GatewayCommand::PutSlab => { - debug!(target: "GATEWAY DAEMON", "Received putslab msg"); - } - GatewayCommand::GetSlab => { - debug!(target: "GATEWAY DAEMON", "Received getslab msg"); - } - GatewayCommand::GetLastIndex => { - debug!(target: "GATEWAY DAEMON","Received getlastindex msg"); + loop { + let msg = msg_sub.receive().await?; + + match msg.get_command() { + GatewayCommand::PutSlab => { + debug!(target: "GATEWAY", "Received putslab msg"); + + let slab = msg.get_payload(); + + slabstore.put(deserialize(&slab)?)?; + + // TODO publish the new received slab + } + GatewayCommand::GetSlab => { + debug!(target: "GATEWAY", "Received getslab msg"); + + let index = msg.get_payload(); + let _slab = slabstore.get(index)?; + + // TODO publish the slab + } + GatewayCommand::GetLastIndex => { + debug!(target: "GATEWAY","Received getlastindex msg"); + + let _index = slabstore.get_last_index_as_bytes()?; + + // TODO publish the inex + } } } - Ok(()) } - pub async fn sync(&mut self) -> Result { - debug!(target: "GATEWAY CLIENT", "Start Syncing"); - Ok(0) + pub async fn sync(&self) -> Result<()> { + debug!(target: "GATEWAY", "Start Syncing"); + + loop { + let local_last_index = self.slabstore.get_last_index()?; + + // start syncing every 4 seconds + sleep(4).await; + + self.get_last_index().await?; + let last_index = 0; + + if last_index < local_last_index { + return Err(Error::SlabsStore( + "Local slabstore has higher index than gateway's slabstore. + Run \" darkfid -r \" to refresh the database." + .into(), + )) + } + + if last_index > 0 { + for index in (local_last_index + 1)..(last_index + 1) { + self.get_slab(index).await? + } + } + + debug!(target: "GATEWAY","End Syncing"); + } } - pub async fn get_slab(&mut self, _index: u64) -> Result> { - debug!(target: "GATEWAY CLIENT","Get slab"); - Ok(None) + pub async fn get_slab(&self, index: u64) -> Result<()> { + debug!(target: "GATEWAY","Send get slab msg"); + let msg = GatewayMessage::new(GatewayCommand::GetSlab, serialize(&index)); + self.publish(msg).await } - pub async fn put_slab(&mut self, _slab: Slab) -> Result<()> { - debug!(target: "GATEWAY CLIENT","Put slab"); - Ok(()) + pub async fn put_slab(&self, slab: Slab) -> Result<()> { + debug!(target: "GATEWAY","Send put slab msg"); + let msg = GatewayMessage::new(GatewayCommand::PutSlab, serialize(&slab)); + self.publish(msg).await } - pub async fn get_last_index(&self) -> Result { - debug!(target: "GATEWAY CLIENT","Get last index"); - Ok(0) + pub async fn get_last_index(&self) -> Result<()> { + debug!(target: "GATEWAY","Send get last index msg"); + let msg = GatewayMessage::new(GatewayCommand::PutSlab, vec![]); + self.publish(msg).await } pub fn get_slabstore(&self) -> Arc { @@ -107,25 +164,14 @@ pub enum GatewayCommand { #[derive(Debug, PartialEq, Clone)] pub struct GatewayMessage { command: GatewayCommand, - id: u32, payload: Vec, } impl GatewayMessage { pub fn new(command: GatewayCommand, payload: Vec) -> Self { - let id = Self::gen_id(); - Self { command, id, payload } + Self { command, payload } } - fn gen_id() -> u32 { - let mut rng = rand::thread_rng(); - rng.gen() - } - - pub fn get_id(&self) -> u32 { - self.id - } - - pub fn get_command(&self) -> GatewayCommand{ + pub fn get_command(&self) -> GatewayCommand { self.command.clone() } @@ -138,7 +184,6 @@ impl Encodable for GatewayMessage { fn encode(&self, mut s: S) -> Result { let mut len = 0; len += (self.command.clone() as u8).encode(&mut s)?; - len += self.id.encode(&mut s)?; len += self.payload.encode(&mut s)?; Ok(len) } @@ -146,25 +191,19 @@ impl Encodable for GatewayMessage { impl Decodable for GatewayMessage { fn decode(mut d: D) -> Result { - let command_code: u8 = Decodable::decode(&mut d)?; - let command = match command_code { - 0 => GatewayCommand::GetSlab, - 1 => GatewayCommand::PutSlab, - 2 => GatewayCommand::GetLastIndex, + let command_code: u8 = Decodable::decode(&mut d)?; + let command = match command_code { + 0 => GatewayCommand::PutSlab, + 1 => GatewayCommand::GetSlab, _ => GatewayCommand::GetLastIndex, }; - Ok(Self { - command, - id: Decodable::decode(&mut d)?, - payload: Decodable::decode(&mut d)?, - }) + Ok(Self { command, payload: Decodable::decode(&mut d)? }) } } -impl crate::net::Message for GatewayMessage { +impl net::Message for GatewayMessage { fn name() -> &'static str { "reply" } } - diff --git a/src/node/service/mod.rs b/src/node/service/mod.rs index 51094d1cf..9505c99e0 100644 --- a/src/node/service/mod.rs +++ b/src/node/service/mod.rs @@ -1,5 +1,5 @@ pub mod gateway; -pub mod reqrep; pub mod gateway_p2p; +pub mod reqrep; pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber};