From 0ef1a450862b77280c238ae79b237ccc548cf52b Mon Sep 17 00:00:00 2001 From: aggstam Date: Fri, 12 Aug 2022 20:25:24 +0300 Subject: [PATCH] src/dht: lookup map syncing implemented --- script/research/fud/README.md | 6 ++++ script/research/fud/fud/src/main.rs | 5 +++ src/dht/dht.rs | 41 +++++++++++++++++++++-- src/dht/messages.rs | 52 +++++++++++++++++++++++++++++ src/dht/protocol.rs | 44 +++++++++++++++++++++++- src/util/serial.rs | 50 +++++++++++++++++++++++++++ 6 files changed, 195 insertions(+), 3 deletions(-) diff --git a/script/research/fud/README.md b/script/research/fud/README.md index bb2678294..f23c09e47 100644 --- a/script/research/fud/README.md +++ b/script/research/fud/README.md @@ -56,6 +56,12 @@ Run fud as follows: 13:23:07 [INFO] Caught termination signal, cleaning up and exiting... ``` +After daemon has been initialized, execute network syncing as follows: +``` +% fu sync +13:25:46 [INFO] Daemon synced successfully! +``` + fu ======= diff --git a/script/research/fud/fud/src/main.rs b/script/research/fud/fud/src/main.rs index 7ee4a999f..5c26396f0 100644 --- a/script/research/fud/fud/src/main.rs +++ b/script/research/fud/fud/src/main.rs @@ -196,6 +196,11 @@ impl Fud { let records = lock.map.clone(); let mut entries_hashes = HashSet::new(); + // Sync lookup map with network + if let Err(e) = lock.sync_lookup_map().await { + error!("Failed to sync lookup map: {}", e); + } + // We iterate files for new records for entry in entries { let e = entry.unwrap(); diff --git a/src/dht/dht.rs b/src/dht/dht.rs index f90bcd4fb..ed620b199 100644 --- a/src/dht/dht.rs +++ b/src/dht/dht.rs @@ -3,7 +3,7 @@ use async_std::sync::{Arc, RwLock}; use chrono::Utc; use futures::{select, FutureExt}; use fxhash::FxHashMap; -use log::{debug, error}; +use log::{debug, error, warn}; use rand::Rng; use std::{collections::HashSet, time::Duration}; @@ -16,7 +16,7 @@ use crate::{ }; use super::{ - messages::{KeyRequest, KeyResponse, LookupRequest}, + messages::{KeyRequest, KeyResponse, LookupMapRequest, LookupMapResponse, LookupRequest}, protocol::Protocol, }; @@ -217,6 +217,43 @@ impl Dht { Ok(()) } + + /// Auxilary function to sync lookup map with network + pub async fn sync_lookup_map(&mut self) -> Result<()> { + debug!("Starting lookup map sync..."); + + // Using len here because is_empty() uses unstable library feature + // called 'exact_size_is_empty'. + if self.p2p.channels().lock().await.values().len() != 0 { + // Currently we will just use the last channel + let channel = self.p2p.channels().lock().await.values().last().unwrap().clone(); + + // Communication setup + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + let response_sub = channel.subscribe_msg::().await?; + + // Node creates a `LookupMapRequest` and sends it + let order = LookupMapRequest::new(self.id); + channel.send(order).await?; + + // Node stores response data. + let resp = response_sub.receive().await?; + + // Store retrieved records + debug!("Processing received records"); + for (k, v) in &resp.lookup { + for node in v { + self.lookup_insert(*k, *node)?; + } + } + } else { + warn!("Node is not connected to other nodes"); + } + + debug!("Lookup map synced!"); + Ok(()) + } } // Auxilary function to wait for a key response from the P2P network. diff --git a/src/dht/messages.rs b/src/dht/messages.rs index 3ffe35183..dfc4b95ca 100644 --- a/src/dht/messages.rs +++ b/src/dht/messages.rs @@ -1,4 +1,6 @@ +use fxhash::FxHashMap; use rand::Rng; +use std::collections::HashSet; use crate::{ net, @@ -93,3 +95,53 @@ impl net::Message for LookupRequest { "lookuprequest" } } + +/// Auxiliary structure used for lookup map syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct LookupMapRequest { + /// Request id + pub id: blake3::Hash, + /// Daemon id executing the request + pub daemon: blake3::Hash, +} + +impl LookupMapRequest { + pub fn new(daemon: blake3::Hash) -> Self { + // Generate a random id + let mut rng = rand::thread_rng(); + let n: u16 = rng.gen(); + let id = blake3::hash(&serialize(&n)); + Self { id, daemon } + } +} + +impl net::Message for LookupMapRequest { + fn name() -> &'static str { + "lookupmaprequest" + } +} + +/// Auxiliary structure used for consensus syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct LookupMapResponse { + /// Request id + pub id: blake3::Hash, + /// Daemon lookup map, containing nodes that holds each key + pub lookup: FxHashMap>, +} + +impl LookupMapResponse { + pub fn new(lookup: FxHashMap>) -> Self { + // Generate a random id + let mut rng = rand::thread_rng(); + let n: u16 = rng.gen(); + let id = blake3::hash(&serialize(&n)); + Self { id, lookup } + } +} + +impl net::Message for LookupMapResponse { + fn name() -> &'static str { + "lookupmapresponse" + } +} diff --git a/src/dht/protocol.rs b/src/dht/protocol.rs index fe98f42aa..0d7769542 100644 --- a/src/dht/protocol.rs +++ b/src/dht/protocol.rs @@ -14,7 +14,7 @@ use crate::{ use super::{ dht::DhtPtr, - messages::{KeyRequest, KeyResponse, LookupRequest}, + messages::{KeyRequest, KeyResponse, LookupMapRequest, LookupMapResponse, LookupRequest}, }; pub struct Protocol { @@ -23,6 +23,7 @@ pub struct Protocol { req_sub: MessageSubscription, resp_sub: MessageSubscription, lookup_sub: MessageSubscription, + lookup_map_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, dht: DhtPtr, p2p: P2pPtr, @@ -40,10 +41,12 @@ impl Protocol { msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; let req_sub = channel.subscribe_msg::().await?; let resp_sub = channel.subscribe_msg::().await?; let lookup_sub = channel.subscribe_msg::().await?; + let lookup_map_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { channel: channel.clone(), @@ -51,6 +54,7 @@ impl Protocol { req_sub, resp_sub, lookup_sub, + lookup_map_sub, jobsman: ProtocolJobsManager::new("Protocol", channel), dht, p2p, @@ -205,6 +209,40 @@ impl Protocol { }; } } + + async fn handle_receive_lookup_map_request(self: Arc) -> Result<()> { + debug!("Protocol::handle_receive_lookup_map_request() [START]"); + loop { + let req = match self.lookup_map_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("Protocol::handle_receive_lookup_map_request(): recv fail: {}", e); + continue + } + }; + + debug!("Protocol::handle_receive_lookup_map_request(): req: {:?}", req); + + { + let dht = &mut self.dht.write().await; + if dht.seen.contains_key(&req.id) { + debug!( + "Protocol::handle_receive_lookup_map_request(): We have already seen this request." + ); + continue + } + + dht.seen.insert(req.id.clone(), Utc::now().timestamp()); + } + + // Extra validations can be added here. + let lookup = self.dht.read().await.lookup.clone(); + let response = LookupMapResponse::new(lookup); + if let Err(e) = self.channel.send(response).await { + error!("Protocol::handle_receive_lookup_map_request() channel send fail: {}", e); + }; + } + } } #[async_trait] @@ -218,6 +256,10 @@ impl ProtocolBase for Protocol { .clone() .spawn(self.clone().handle_receive_lookup_request(), executor.clone()) .await; + self.jobsman + .clone() + .spawn(self.clone().handle_receive_lookup_map_request(), executor.clone()) + .await; debug!("Protocol::start() [END]"); Ok(()) } diff --git a/src/util/serial.rs b/src/util/serial.rs index 8e6d0fad3..79d8c9d36 100644 --- a/src/util/serial.rs +++ b/src/util/serial.rs @@ -1,5 +1,7 @@ +use fxhash::FxHashMap; use std::{ borrow::Cow, + collections::HashSet, io, io::{Cursor, Read, Write}, mem, @@ -645,6 +647,54 @@ impl Decodable for blake3::Hash { } } +impl Encodable for HashSet { + fn encode(&self, mut s: S) -> Result { + let mut len = 0; + len += VarInt(self.len() as u64).encode(&mut s)?; + for c in self.iter() { + len += c.encode(&mut s)?; + } + Ok(len) + } +} + +impl Decodable for HashSet { + fn decode(mut d: D) -> Result { + let len = VarInt::decode(&mut d)?.0; + let mut ret = HashSet::new(); + for _ in 0..len { + let entry: T = Decodable::decode(&mut d)?; + ret.insert(entry); + } + Ok(ret) + } +} + +impl Encodable for FxHashMap { + fn encode(&self, mut s: S) -> Result { + let mut len = 0; + len += VarInt(self.len() as u64).encode(&mut s)?; + for c in self.iter() { + len += c.0.encode(&mut s)?; + len += c.1.encode(&mut s)?; + } + Ok(len) + } +} + +impl Decodable for FxHashMap { + fn decode(mut d: D) -> Result { + let len = VarInt::decode(&mut d)?.0; + let mut ret = FxHashMap::default(); + for _ in 0..len { + let key: T = Decodable::decode(&mut d)?; + let entry: U = Decodable::decode(&mut d)?; + ret.insert(key, entry); + } + Ok(ret) + } +} + // Tuples macro_rules! tuple_encode { ($($x:ident),*) => (