src/dht: lookup map syncing implemented

This commit is contained in:
aggstam
2022-08-12 20:25:24 +03:00
parent eb1a9f1349
commit 0ef1a45086
6 changed files with 195 additions and 3 deletions

View File

@@ -56,6 +56,12 @@ Run fud as follows:
13:23:07 [INFO] Caught termination signal, cleaning up and exiting...
```
After daemon has been initialized, execute network syncing as follows:
```
% fu sync
13:25:46 [INFO] Daemon synced successfully!
```
fu
=======

View File

@@ -196,6 +196,11 @@ impl Fud {
let records = lock.map.clone();
let mut entries_hashes = HashSet::new();
// Sync lookup map with network
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
// We iterate files for new records
for entry in entries {
let e = entry.unwrap();

View File

@@ -3,7 +3,7 @@ use async_std::sync::{Arc, RwLock};
use chrono::Utc;
use futures::{select, FutureExt};
use fxhash::FxHashMap;
use log::{debug, error};
use log::{debug, error, warn};
use rand::Rng;
use std::{collections::HashSet, time::Duration};
@@ -16,7 +16,7 @@ use crate::{
};
use super::{
messages::{KeyRequest, KeyResponse, LookupRequest},
messages::{KeyRequest, KeyResponse, LookupMapRequest, LookupMapResponse, LookupRequest},
protocol::Protocol,
};
@@ -217,6 +217,43 @@ impl Dht {
Ok(())
}
/// Auxiliary function to sync lookup map with network.
///
/// Picks one connected channel (currently the last one), sends it a
/// `LookupMapRequest` and merges every record from the returned
/// `LookupMapResponse` into our own lookup map via `lookup_insert`.
/// If the node has no active channels, this is a no-op apart from a warning.
pub async fn sync_lookup_map(&mut self) -> Result<()> {
    debug!("Starting lookup map sync...");

    // Acquire the channels lock exactly once. Checking emptiness and then
    // fetching the channel under two separate lock acquisitions is racy:
    // a channel removed in between would make a later unwrap() panic.
    // (Using Option also sidesteps the unstable `exact_size_is_empty`
    // feature that `is_empty()` on the values iterator would need.)
    // Currently we just use the last channel.
    let channel = self.p2p.channels().lock().await.values().last().cloned();

    match channel {
        Some(channel) => {
            // Communication setup
            let msg_subsystem = channel.get_message_subsystem();
            msg_subsystem.add_dispatch::<LookupMapResponse>().await;
            let response_sub = channel.subscribe_msg::<LookupMapResponse>().await?;

            // Node creates a `LookupMapRequest` and sends it
            let order = LookupMapRequest::new(self.id);
            channel.send(order).await?;

            // Node stores response data.
            let resp = response_sub.receive().await?;

            // Store retrieved records
            debug!("Processing received records");
            for (key, nodes) in &resp.lookup {
                for node in nodes {
                    self.lookup_insert(*key, *node)?;
                }
            }

            // Only report success when a sync actually happened.
            debug!("Lookup map synced!");
        }
        None => warn!("Node is not connected to other nodes"),
    }

    Ok(())
}
}
// Auxiliary function to wait for a key response from the P2P network.

View File

@@ -1,4 +1,6 @@
use fxhash::FxHashMap;
use rand::Rng;
use std::collections::HashSet;
use crate::{
net,
@@ -93,3 +95,53 @@ impl net::Message for LookupRequest {
"lookuprequest"
}
}
/// Auxiliary structure used for lookup map syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct LookupMapRequest {
    /// Request id
    pub id: blake3::Hash,
    /// Daemon id executing the request
    pub daemon: blake3::Hash,
}

impl LookupMapRequest {
    /// Creates a new request for `daemon` with a randomly generated id.
    pub fn new(daemon: blake3::Hash) -> Self {
        // Generate a random id. We hash a random u64 rather than a u16:
        // a 16-bit nonce yields only 65,536 distinct ids, and the receiver's
        // duplicate-suppression (`seen`) table would start silently dropping
        // legitimate requests once ids collide.
        let mut rng = rand::thread_rng();
        let n: u64 = rng.gen();
        let id = blake3::hash(&serialize(&n));
        Self { id, daemon }
    }
}

impl net::Message for LookupMapRequest {
    fn name() -> &'static str {
        "lookupmaprequest"
    }
}
/// Auxiliary structure used for lookup map syncing.
/// (The previous doc said "consensus syncing" — a copy-paste leftover.)
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct LookupMapResponse {
    /// Request id
    pub id: blake3::Hash,
    /// Daemon lookup map, containing nodes that hold each key
    pub lookup: FxHashMap<blake3::Hash, HashSet<blake3::Hash>>,
}

impl LookupMapResponse {
    /// Wraps a lookup map snapshot in a response with a randomly generated id.
    pub fn new(lookup: FxHashMap<blake3::Hash, HashSet<blake3::Hash>>) -> Self {
        // Generate a random id. A u64 nonce is used instead of a u16 so the
        // id space is large enough that collisions are practically impossible.
        let mut rng = rand::thread_rng();
        let n: u64 = rng.gen();
        let id = blake3::hash(&serialize(&n));
        Self { id, lookup }
    }
}

impl net::Message for LookupMapResponse {
    fn name() -> &'static str {
        "lookupmapresponse"
    }
}

View File

@@ -14,7 +14,7 @@ use crate::{
use super::{
dht::DhtPtr,
messages::{KeyRequest, KeyResponse, LookupRequest},
messages::{KeyRequest, KeyResponse, LookupMapRequest, LookupMapResponse, LookupRequest},
};
pub struct Protocol {
@@ -23,6 +23,7 @@ pub struct Protocol {
req_sub: MessageSubscription<KeyRequest>,
resp_sub: MessageSubscription<KeyResponse>,
lookup_sub: MessageSubscription<LookupRequest>,
lookup_map_sub: MessageSubscription<LookupMapRequest>,
jobsman: ProtocolJobsManagerPtr,
dht: DhtPtr,
p2p: P2pPtr,
@@ -40,10 +41,12 @@ impl Protocol {
msg_subsystem.add_dispatch::<KeyRequest>().await;
msg_subsystem.add_dispatch::<KeyResponse>().await;
msg_subsystem.add_dispatch::<LookupRequest>().await;
msg_subsystem.add_dispatch::<LookupMapRequest>().await;
let req_sub = channel.subscribe_msg::<KeyRequest>().await?;
let resp_sub = channel.subscribe_msg::<KeyResponse>().await?;
let lookup_sub = channel.subscribe_msg::<LookupRequest>().await?;
let lookup_map_sub = channel.subscribe_msg::<LookupMapRequest>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
@@ -51,6 +54,7 @@ impl Protocol {
req_sub,
resp_sub,
lookup_sub,
lookup_map_sub,
jobsman: ProtocolJobsManager::new("Protocol", channel),
dht,
p2p,
@@ -205,6 +209,40 @@ impl Protocol {
};
}
}
/// Background task handling incoming `LookupMapRequest` messages.
///
/// For each request: drops duplicates via the `seen` table, records the
/// request id, then replies on the same channel with a snapshot of this
/// node's lookup map wrapped in a `LookupMapResponse`. Receive/send errors
/// are logged and the loop continues.
async fn handle_receive_lookup_map_request(self: Arc<Self>) -> Result<()> {
    debug!("Protocol::handle_receive_lookup_map_request() [START]");
    loop {
        let req = match self.lookup_map_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                error!("Protocol::handle_receive_lookup_map_request(): recv fail: {}", e);
                continue
            }
        };

        debug!("Protocol::handle_receive_lookup_map_request(): req: {:?}", req);

        // Deduplicate: skip requests whose id we have already processed.
        // The block scopes the write lock so it is released before we
        // snapshot the lookup map below.
        {
            let dht = &mut self.dht.write().await;
            if dht.seen.contains_key(&req.id) {
                debug!(
                    "Protocol::handle_receive_lookup_map_request(): We have already seen this request."
                );
                continue
            }
            // blake3::Hash is Copy, so no clone() is needed here.
            dht.seen.insert(req.id, Utc::now().timestamp());
        }

        // Extra validations can be added here.

        // Snapshot the lookup map under a read lock and send it back.
        let lookup = self.dht.read().await.lookup.clone();
        let response = LookupMapResponse::new(lookup);
        if let Err(e) = self.channel.send(response).await {
            error!("Protocol::handle_receive_lookup_map_request() channel send fail: {}", e);
        }
    }
}
}
#[async_trait]
@@ -218,6 +256,10 @@ impl ProtocolBase for Protocol {
.clone()
.spawn(self.clone().handle_receive_lookup_request(), executor.clone())
.await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_lookup_map_request(), executor.clone())
.await;
debug!("Protocol::start() [END]");
Ok(())
}

View File

@@ -1,5 +1,7 @@
use fxhash::FxHashMap;
use std::{
borrow::Cow,
collections::HashSet,
io,
io::{Cursor, Read, Write},
mem,
@@ -645,6 +647,54 @@ impl Decodable for blake3::Hash {
}
}
impl<T: Encodable> Encodable for HashSet<T> {
    /// Encodes the set as a VarInt length prefix followed by each element,
    /// returning the total number of bytes written.
    fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
        let mut written = VarInt(self.len() as u64).encode(&mut s)?;
        for item in self {
            written += item.encode(&mut s)?;
        }
        Ok(written)
    }
}
impl<T: Decodable + std::cmp::Eq + std::hash::Hash> Decodable for HashSet<T> {
    /// Decodes a VarInt element count, then that many elements into a set.
    /// Note: we intentionally do not preallocate from the decoded length,
    /// since it comes from untrusted input.
    fn decode<D: io::Read>(mut d: D) -> Result<Self> {
        let count = VarInt::decode(&mut d)?.0;
        let mut set = HashSet::new();
        for _ in 0..count {
            set.insert(T::decode(&mut d)?);
        }
        Ok(set)
    }
}
impl<T: Encodable, U: Encodable> Encodable for FxHashMap<T, U> {
    /// Encodes the map as a VarInt entry count followed by each key/value
    /// pair, returning the total number of bytes written.
    fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
        let mut written = VarInt(self.len() as u64).encode(&mut s)?;
        for (key, value) in self.iter() {
            written += key.encode(&mut s)?;
            written += value.encode(&mut s)?;
        }
        Ok(written)
    }
}
impl<T: Decodable + std::cmp::Eq + std::hash::Hash, U: Decodable> Decodable for FxHashMap<T, U> {
    /// Decodes a VarInt entry count, then that many key/value pairs into a
    /// map. No preallocation from the untrusted length.
    fn decode<D: io::Read>(mut d: D) -> Result<Self> {
        let count = VarInt::decode(&mut d)?.0;
        let mut map = FxHashMap::default();
        for _ in 0..count {
            let key = T::decode(&mut d)?;
            let value = U::decode(&mut d)?;
            map.insert(key, value);
        }
        Ok(map)
    }
}
// Tuples
macro_rules! tuple_encode {
($($x:ident),*) => (