node/service: implement functions in p2p gateway

This commit is contained in:
ghassmo
2022-01-14 22:52:33 +04:00
parent 59abc99789
commit 5ee7330eb3
3 changed files with 109 additions and 69 deletions

View File

@@ -210,6 +210,7 @@ node = [
"crypto",
"wallet",
"util",
"net",
]

View File

@@ -1,37 +1,37 @@
use std::sync::Arc;
use async_std::sync::{Arc, Mutex};
use std::io;
use rand::Rng;
use async_executor::Executor;
use log::*;
use crate::{
blockchain::{rocks::columns, RocksColumn, Slab, SlabStore},
util::serial::{Decodable, Encodable},
net,
net::{P2p, P2pPtr, Settings},
util::{
serial::{deserialize, serialize, Decodable, Encodable},
sleep,
},
Error, Result,
net::{P2pPtr, P2p, Settings}
};
#[allow(dead_code)]
pub struct Gateway {
p2p: P2pPtr,
slabstore: Arc<SlabStore>,
last_indexes: Arc<Mutex<Vec<u64>>>,
}
impl Gateway {
pub fn new(
_settings: Settings,
rocks: RocksColumn<columns::Slabs>,
) -> Result<Self> {
pub fn new(_settings: Settings, rocks: RocksColumn<columns::Slabs>) -> Result<Self> {
let slabstore = SlabStore::new(rocks)?;
let settings = Settings::default();
let p2p = P2p::new(settings);
Ok(Self { p2p, slabstore})
let last_indexes = Arc::new(Mutex::new(vec![0; 10]));
Ok(Self { p2p, slabstore, last_indexes })
}
pub async fn start(&self, executor: Arc<Executor<'_>>) -> Result<()> {
self.p2p.clone().start(executor.clone()).await?;
self.p2p.clone().run(executor.clone()).await?;
@@ -39,57 +39,114 @@ impl Gateway {
Ok(())
}
async fn publish(&self, _msg: GatewayMessage) -> Result<()> {
Ok(())
async fn publish(&self, msg: GatewayMessage) -> Result<()> {
self.p2p.broadcast(msg).await
}
async fn subscribe_loop(&self, _executor: Arc<Executor<'_>>) -> Result<()> {
async fn subscribe_loop(&self, executor: Arc<Executor<'_>>) -> Result<()> {
let new_channel_sub = self.p2p.subscribe_channel().await;
loop {
let channel = new_channel_sub.receive().await?;
// do something
}
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<GatewayMessage>().await;
let msg_sub = channel.subscribe_msg::<GatewayMessage>().await?;
let jobsman = net::ProtocolJobsManager::new("GatewayMessage", channel);
jobsman.clone().start(executor.clone());
jobsman
.spawn(Self::handle_msg(self.slabstore.clone(), msg_sub), executor.clone())
.await;
}
}
async fn handle_msg(
msg: GatewayMessage,
_slabstore: Arc<SlabStore>,
pub async fn handle_msg(
slabstore: Arc<SlabStore>,
msg_sub: net::MessageSubscription<GatewayMessage>,
) -> Result<()> {
match msg.get_command() {
GatewayCommand::PutSlab => {
debug!(target: "GATEWAY DAEMON", "Received putslab msg");
}
GatewayCommand::GetSlab => {
debug!(target: "GATEWAY DAEMON", "Received getslab msg");
}
GatewayCommand::GetLastIndex => {
debug!(target: "GATEWAY DAEMON","Received getlastindex msg");
loop {
let msg = msg_sub.receive().await?;
match msg.get_command() {
GatewayCommand::PutSlab => {
debug!(target: "GATEWAY", "Received putslab msg");
let slab = msg.get_payload();
slabstore.put(deserialize(&slab)?)?;
// TODO publish the new received slab
}
GatewayCommand::GetSlab => {
debug!(target: "GATEWAY", "Received getslab msg");
let index = msg.get_payload();
let _slab = slabstore.get(index)?;
// TODO publish the slab
}
GatewayCommand::GetLastIndex => {
debug!(target: "GATEWAY","Received getlastindex msg");
let _index = slabstore.get_last_index_as_bytes()?;
// TODO publish the inex
}
}
}
Ok(())
}
pub async fn sync(&mut self) -> Result<u64> {
debug!(target: "GATEWAY CLIENT", "Start Syncing");
Ok(0)
pub async fn sync(&self) -> Result<()> {
debug!(target: "GATEWAY", "Start Syncing");
loop {
let local_last_index = self.slabstore.get_last_index()?;
// start syncing every 4 seconds
sleep(4).await;
self.get_last_index().await?;
let last_index = 0;
if last_index < local_last_index {
return Err(Error::SlabsStore(
"Local slabstore has higher index than gateway's slabstore.
Run \" darkfid -r \" to refresh the database."
.into(),
))
}
if last_index > 0 {
for index in (local_last_index + 1)..(last_index + 1) {
self.get_slab(index).await?
}
}
debug!(target: "GATEWAY","End Syncing");
}
}
pub async fn get_slab(&mut self, _index: u64) -> Result<Option<Slab>> {
debug!(target: "GATEWAY CLIENT","Get slab");
Ok(None)
pub async fn get_slab(&self, index: u64) -> Result<()> {
debug!(target: "GATEWAY","Send get slab msg");
let msg = GatewayMessage::new(GatewayCommand::GetSlab, serialize(&index));
self.publish(msg).await
}
pub async fn put_slab(&mut self, _slab: Slab) -> Result<()> {
debug!(target: "GATEWAY CLIENT","Put slab");
Ok(())
pub async fn put_slab(&self, slab: Slab) -> Result<()> {
debug!(target: "GATEWAY","Send put slab msg");
let msg = GatewayMessage::new(GatewayCommand::PutSlab, serialize(&slab));
self.publish(msg).await
}
pub async fn get_last_index(&self) -> Result<u64> {
debug!(target: "GATEWAY CLIENT","Get last index");
Ok(0)
pub async fn get_last_index(&self) -> Result<()> {
debug!(target: "GATEWAY","Send get last index msg");
let msg = GatewayMessage::new(GatewayCommand::PutSlab, vec![]);
self.publish(msg).await
}
pub fn get_slabstore(&self) -> Arc<SlabStore> {
@@ -107,25 +164,14 @@ pub enum GatewayCommand {
#[derive(Debug, PartialEq, Clone)]
pub struct GatewayMessage {
command: GatewayCommand,
id: u32,
payload: Vec<u8>,
}
impl GatewayMessage {
pub fn new(command: GatewayCommand, payload: Vec<u8>) -> Self {
let id = Self::gen_id();
Self { command, id, payload }
Self { command, payload }
}
fn gen_id() -> u32 {
let mut rng = rand::thread_rng();
rng.gen()
}
pub fn get_id(&self) -> u32 {
self.id
}
pub fn get_command(&self) -> GatewayCommand{
pub fn get_command(&self) -> GatewayCommand {
self.command.clone()
}
@@ -138,7 +184,6 @@ impl Encodable for GatewayMessage {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += (self.command.clone() as u8).encode(&mut s)?;
len += self.id.encode(&mut s)?;
len += self.payload.encode(&mut s)?;
Ok(len)
}
@@ -146,25 +191,19 @@ impl Encodable for GatewayMessage {
impl Decodable for GatewayMessage {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let command_code: u8 = Decodable::decode(&mut d)?;
let command = match command_code {
0 => GatewayCommand::GetSlab,
1 => GatewayCommand::PutSlab,
2 => GatewayCommand::GetLastIndex,
let command_code: u8 = Decodable::decode(&mut d)?;
let command = match command_code {
0 => GatewayCommand::PutSlab,
1 => GatewayCommand::GetSlab,
_ => GatewayCommand::GetLastIndex,
};
Ok(Self {
command,
id: Decodable::decode(&mut d)?,
payload: Decodable::decode(&mut d)?,
})
Ok(Self { command, payload: Decodable::decode(&mut d)? })
}
}
impl crate::net::Message for GatewayMessage {
impl net::Message for GatewayMessage {
fn name() -> &'static str {
"reply"
}
}

View File

@@ -1,5 +1,5 @@
pub mod gateway;
pub mod reqrep;
pub mod gateway_p2p;
pub mod reqrep;
pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber};