script/research/dhtd: decoupled structures, moved p2p actions inside Dht struct.

This commit is contained in:
aggstam
2022-07-19 00:05:59 +03:00
parent 7b0d19e8dd
commit 8e8ae97c9d
6 changed files with 398 additions and 365 deletions

View File

@@ -0,0 +1,256 @@
use async_executor::Executor;
use async_std::sync::{Arc, RwLock};
use chrono::Utc;
use futures::{select, FutureExt};
use fxhash::FxHashMap;
use log::{debug, error};
use rand::Rng;
use std::{collections::HashSet, time::Duration};
use darkfi::{
net,
net::P2pPtr,
util::{serial::serialize, sleep},
Result,
};
use crate::{
messages::{KeyRequest, KeyResponse, LookupRequest},
protocol::Protocol,
};
// Constants configuration
const REQUEST_TIMEOUT: u64 = 2400;
const SEEN_DURATION: i64 = 120;
/// Atomic pointer to DHT state
pub type DhtPtr = Arc<RwLock<Dht>>;
// TODO: lookup table to be based on directly connected peers, not broadcast based
// TODO: replace Strings with blake3 hashes
// Using string in structures because we are at an external crate
// and cant use blake3 serialization. To be replaced once merged with core src.
/// Struct representing DHT state.
pub struct Dht {
/// Daemon id
pub id: blake3::Hash,
/// Daemon hasmap
pub map: FxHashMap<String, Vec<u8>>,
/// Network lookup map, containing nodes that holds each key
pub lookup: FxHashMap<String, HashSet<String>>,
/// P2P network pointer
p2p: P2pPtr,
/// Channel to receive responses from P2P
p2p_recv_channel: async_channel::Receiver<KeyResponse>,
/// Stop signal channel to terminate background processes
stop_signal: async_channel::Receiver<()>,
/// Daemon seen requests/responses ids and timestamp,
/// to prevent rebroadcasting and loops
pub seen: FxHashMap<String, i64>,
}
impl Dht {
pub async fn new(
initial: Option<FxHashMap<String, HashSet<String>>>,
p2p_ptr: P2pPtr,
stop_signal: async_channel::Receiver<()>,
ex: Arc<Executor<'_>>,
) -> Result<DhtPtr> {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n));
let map = FxHashMap::default();
let lookup = match initial {
Some(l) => l,
None => FxHashMap::default(),
};
let p2p = p2p_ptr.clone();
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<KeyResponse>();
let seen = FxHashMap::default();
let dht = Arc::new(RwLock::new(Dht {
id,
map,
lookup,
p2p,
p2p_recv_channel,
stop_signal,
seen,
}));
// Registering P2P protocols
let registry = p2p_ptr.protocol_registry();
let _dht = dht.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p_ptr| {
let sender = p2p_send_channel.clone();
let dht = _dht.clone();
async move { Protocol::init(channel, sender, dht, p2p_ptr).await.unwrap() }
})
.await;
// Task to periodically clean up daemon seen messages
ex.spawn(prune_seen_messages(dht.clone())).detach();
Ok(dht)
}
/// Store provided key value pair and update lookup map
pub async fn insert(&mut self, key: String, value: Vec<u8>) -> Result<Option<String>> {
self.map.insert(key.clone(), value);
self.lookup_insert(key, self.id.to_string()).await
}
/// Remove provided key value pair and update lookup map
pub async fn remove(&mut self, key: String) -> Result<Option<String>> {
// Check if key value pair existed and act accordingly
match self.map.remove(&key) {
Some(_) => {
debug!("Key removed: {}", key);
let daemon = self.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 1);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return Err(e)
}
self.lookup_remove(key.clone(), self.id.to_string())
}
None => Ok(None),
}
}
/// Store provided key node pair in lookup map and update network
pub async fn lookup_insert(&mut self, key: String, node_id: String) -> Result<Option<String>> {
let mut lookup_set = match self.lookup.get(&key) {
Some(s) => s.clone(),
None => HashSet::new(),
};
lookup_set.insert(node_id);
self.lookup.insert(key.clone(), lookup_set);
let daemon = self.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 0);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return Err(e)
}
Ok(Some(key))
}
/// Remove provided node id from keys set in local lookup map
pub fn lookup_remove(&mut self, key: String, node_id: String) -> Result<Option<String>> {
if let Some(s) = self.lookup.get(&key) {
let mut lookup_set = s.clone();
lookup_set.remove(&node_id);
if lookup_set.is_empty() {
self.lookup.remove(&key);
} else {
self.lookup.insert(key.clone(), lookup_set);
}
}
Ok(Some(key))
}
/// Check if provided key exists and retrieve it from the local map or query the network.
pub async fn get(&self, key: String) -> Result<Option<Vec<u8>>> {
// Verify the key exist in the lookup map.
let peers = match self.lookup.get(&key) {
Some(v) => v.clone(),
None => return Ok(None),
};
debug!("Key is in peers: {:?}", peers);
// Each node holds a local map, acting as its cache.
// When the node receives a request for a key it doesn't hold,
// it will query the P2P network and saves the response in its local cache.
match self.map.get(&key) {
Some(v) => return Ok(Some(v.clone())),
None => debug!("Requested key doesn't exist locally, querying the network..."),
};
// We retrieve p2p network connected channels, to verify if we
// are connected to a network.
// Using len here because is_empty() uses unstable library feature
// called 'exact_size_is_empty'.
if self.p2p.channels().lock().await.values().len() == 0 {
debug!("Node is not connected to other nodes");
return Ok(None)
}
// We create a key request, and broadcast it to the network
let daemon = self.id.to_string();
// We choose last known peer as request recipient
let peer = peers.iter().last().unwrap().to_string();
let request = KeyRequest::new(daemon.clone(), peer, key.clone());
// TODO: ask connected peers directly, not broadcast
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return Err(e)
}
// Waiting network response
match self.waiting_for_response().await {
Ok(resp) => match resp {
Some(response) => Ok(Some(response.value)),
None => Ok(None),
},
Err(e) => Err(e),
}
}
// Auxilary function to wait for a key response from the P2P network.
async fn waiting_for_response(&self) -> Result<Option<KeyResponse>> {
let ex = Arc::new(async_executor::Executor::new());
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
ex.spawn(async move {
sleep(Duration::from_millis(REQUEST_TIMEOUT).as_secs()).await;
timeout_s.send(()).await.unwrap_or(());
})
.detach();
loop {
select! {
msg = self.p2p_recv_channel.recv().fuse() => {
let response = msg?;
return Ok(Some(response))
},
_ = self.stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
}
}
Ok(None)
}
}
// Auxilary function to periodically prun seen messages, based on when they were received.
// This helps us to prevent broadcasting loops.
async fn prune_seen_messages(dht: DhtPtr) {
loop {
sleep(SEEN_DURATION as u64).await;
debug!("Pruning seen messages");
let now = Utc::now().timestamp();
let mut prune = vec![];
let map = dht.read().await.seen.clone();
for (k, v) in map.iter() {
if now - v > SEEN_DURATION {
prune.push(k);
}
}
let mut map = map.clone();
for i in prune {
map.remove(i);
}
dht.write().await.seen = map;
}
}

View File

@@ -5,7 +5,6 @@ use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult};
pub enum RpcError {
UnknownKey = -35107,
QueryFailed = -35108,
RequestBroadcastFail = -35109,
KeyInsertFail = -35110,
KeyRemoveFail = -35111,
}
@@ -14,7 +13,6 @@ fn to_tuple(e: RpcError) -> (i64, String) {
let msg = match e {
RpcError::UnknownKey => "Did not find key",
RpcError::QueryFailed => "Failed to query key",
RpcError::RequestBroadcastFail => "Failed to broadcast request",
RpcError::KeyInsertFail => "Failed to insert key",
RpcError::KeyRemoveFail => "Failed to remove key",
};

View File

@@ -1,20 +1,16 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use futures::{select, FutureExt};
use futures_lite::future;
use log::{debug, error, info, warn};
use log::{error, info};
use serde_derive::Deserialize;
use serde_json::{json, Value};
use std::time::Duration;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use url::Url;
use darkfi::{
async_daemonize, cli_desc, net,
net::P2pPtr,
rpc::{
jsonrpc::{
ErrorCode::{InvalidParams, MethodNotFound},
@@ -25,7 +21,6 @@ use darkfi::{
util::{
cli::{get_log_config, get_log_level, spawn_config},
path::get_config_path,
sleep,
},
Result,
};
@@ -33,16 +28,15 @@ use darkfi::{
mod error;
use error::{server_error, RpcError};
mod structures;
use structures::{Dht, DhtPtr, KeyRequest, KeyResponse, LookupRequest};
mod dht;
use dht::{Dht, DhtPtr};
mod messages;
mod protocol;
use protocol::Protocol;
const CONFIG_FILE: &str = "dhtd_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../dhtd_config.toml");
const REQUEST_TIMEOUT: u64 = 2400;
const SEEN_DURATION: i64 = 120;
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
@@ -87,22 +81,11 @@ struct Args {
pub struct Dhtd {
/// Daemon dht state
dht: DhtPtr,
/// P2P network pointer
p2p: P2pPtr,
/// Channel to receive responses from P2P
p2p_recv_channel: async_channel::Receiver<KeyResponse>,
/// Stop signal channel to terminate background processes
stop_signal: async_channel::Receiver<()>,
}
impl Dhtd {
pub async fn new(
dht: DhtPtr,
p2p: P2pPtr,
p2p_recv_channel: async_channel::Receiver<KeyResponse>,
stop_signal: async_channel::Receiver<()>,
) -> Result<Self> {
Ok(Self { dht, p2p, p2p_recv_channel, stop_signal })
pub async fn new(dht: DhtPtr) -> Result<Self> {
Ok(Self { dht })
}
// RPCAPI:
@@ -115,56 +98,22 @@ impl Dhtd {
return JsonError::new(InvalidParams, None, id).into()
}
// Node verifies the key exist in the lookup map.
let key = params[0].to_string();
let peers = match self.dht.read().await.lookup.get(&key) {
Some(v) => v.clone(),
None => {
info!("Did not find key: {}", key);
return server_error(RpcError::UnknownKey, id).into()
}
};
debug!("Key is in peers: {:?}", peers);
// Each node holds a local map, acting as its cache.
// When the node receives a request for a key it doesn't hold,
// it will query the P2P network and saves the response in its local cache.
match self.dht.read().await.map.get(&key) {
Some(v) => {
let string = std::str::from_utf8(&v).unwrap();
return JsonResponse::new(json!(string), id).into()
}
None => info!("Requested key doesn't exist locally, querying the network..."),
};
// We retrieve p2p network connected channels, to verify if we
// are connected to a network.
// Using len here because is_empty() uses unstable library feature
// called 'exact_size_is_empty'.
if self.p2p.channels().lock().await.values().len() == 0 {
warn!("Node is not connected to other nodes");
return server_error(RpcError::UnknownKey, id).into()
}
// We create a key request, and broadcast it to the network
let daemon = self.dht.read().await.id.to_string();
// We choose last known peer as request recipient
let peer = peers.iter().last().unwrap().to_string();
let request = KeyRequest::new(daemon.clone(), peer, key.clone());
// TODO: ask connected peers directly, not broadcast
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
// Waiting network response
match self.waiting_for_response().await {
Ok(resp) => match resp {
Some(response) => {
let result = self.dht.read().await.get(key.clone()).await;
match result {
Ok(res) => match res {
Some(value) => {
info!("Key found!");
let string = std::str::from_utf8(&response.value).unwrap().to_string();
self.insert_pair(id, response.key, string).await
// Optionally, we insert the key to our local map.
// This must happen here because we got blocking/race conditions
// if we try to insert the value in dht.get().
if let Err(e) = self.dht.write().await.insert(key.clone(), value.clone()).await
{
error!("Failed to insert key: {}", e);
return server_error(RpcError::KeyInsertFail, id)
}
let string = std::str::from_utf8(&value).unwrap().to_string();
JsonResponse::new(json!((key, string)), id).into()
}
None => {
info!("Did not find key: {}", key);
@@ -178,31 +127,8 @@ impl Dhtd {
}
}
// Auxilary function to wait for a key response from the P2P network.
async fn waiting_for_response(&self) -> Result<Option<KeyResponse>> {
let ex = Arc::new(async_executor::Executor::new());
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
ex.spawn(async move {
sleep(Duration::from_millis(REQUEST_TIMEOUT).as_secs()).await;
timeout_s.send(()).await.unwrap_or(());
})
.detach();
loop {
select! {
msg = self.p2p_recv_channel.recv().fuse() => {
let response = msg?;
return Ok(Some(response))
},
_ = self.stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
}
}
Ok(None)
}
// RPCAPI:
// Insert key value pair in local map.
// Insert key value pair in dht.
// --> {"jsonrpc": "2.0", "method": "insert", "params": ["key", "value"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "(key, value)", "id": 1}
async fn insert(&self, id: Value, params: &[Value]) -> JsonResult {
@@ -213,23 +139,12 @@ impl Dhtd {
let key = params[0].to_string();
let value = params[1].to_string();
self.insert_pair(id, key, value).await
}
/// Auxilary function to handle pair insertion to dht
async fn insert_pair(&self, id: Value, key: String, value: String) -> JsonResult {
if let Err(e) = self.dht.write().await.insert(key.clone(), value.as_bytes().to_vec()) {
if let Err(e) = self.dht.write().await.insert(key.clone(), value.as_bytes().to_vec()).await
{
error!("Failed to insert key: {}", e);
return server_error(RpcError::KeyInsertFail, id)
}
let daemon = self.dht.read().await.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 0);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
JsonResponse::new(json!((key, value)), id).into()
}
@@ -244,19 +159,11 @@ impl Dhtd {
let key = params[0].to_string();
// Check if key value pair existed and act accordingly
let result = self.dht.write().await.remove(key.clone());
let result = self.dht.write().await.remove(key.clone()).await;
match result {
Ok(option) => match option {
Some(k) => {
info!("Key removed: {}", k);
let daemon = self.dht.read().await.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 1);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
JsonResponse::new(json!(k), id).into()
}
None => {
@@ -310,32 +217,6 @@ impl RequestHandler for Dhtd {
}
}
// Auxilary function to periodically prun seen messages, based on when they were received.
// This helps us to prevent broadcasting loops.
async fn prune_seen_messages(dht: DhtPtr) {
loop {
sleep(SEEN_DURATION as u64).await;
debug!("Pruning seen messages");
let now = Utc::now().timestamp();
let mut prune = vec![];
let map = dht.read().await.seen.clone();
for (k, v) in map.iter() {
if now - v > SEEN_DURATION {
prune.push(k);
}
}
let mut map = map.clone();
for i in prune {
map.remove(i);
}
dht.write().await.seen = map;
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// We use this handler to block this function after detaching all
@@ -347,9 +228,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
})
.unwrap();
// Initialize daemon dht
let dht = Dht::new(None).await?;
// P2P network
let network_settings = net::Settings {
inbound: args.p2p_accept,
@@ -360,27 +238,15 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
..Default::default()
};
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<KeyResponse>();
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
info!("Registering P2P protocols...");
let _dht = dht.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let sender = p2p_send_channel.clone();
let dht = _dht.clone();
async move { Protocol::init(channel, sender, dht, p2p).await.unwrap() }
})
.await;
// Initialize daemon dht
let dht = Dht::new(None, p2p.clone(), shutdown.clone(), ex.clone()).await?;
// Initialize daemon
let dhtd = Dhtd::new(dht.clone(), p2p.clone(), p2p_recv_channel, shutdown.clone()).await?;
let dhtd = Dhtd::new(dht.clone()).await?;
let dhtd = Arc::new(dhtd);
// Task to periodically clean up daemon seen messages
ex.spawn(prune_seen_messages(dht.clone())).detach();
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen, dhtd.clone())).detach();

View File

@@ -0,0 +1,95 @@
use rand::Rng;
use darkfi::{
net,
util::serial::{serialize, SerialDecodable, SerialEncodable},
};
/// This struct represents a DHT key request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyRequest {
/// Request id
pub id: String,
/// Daemon id requesting the key
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
}
impl KeyRequest {
pub fn new(from: String, to: String, key: String) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, from, to, key }
}
}
impl net::Message for KeyRequest {
fn name() -> &'static str {
"keyrequest"
}
}
/// This struct represents a DHT key request response
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyResponse {
/// Response id
pub id: String,
/// Daemon id holding the key
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
/// Key value
pub value: Vec<u8>,
}
impl KeyResponse {
pub fn new(from: String, to: String, key: String, value: Vec<u8>) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, from, to, key, value }
}
}
impl net::Message for KeyResponse {
fn name() -> &'static str {
"keyresponse"
}
}
/// This struct represents a lookup map request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct LookupRequest {
/// Request id
pub id: String,
/// Daemon id executing the request
pub daemon: String,
/// Key entry
pub key: String,
/// Request type
pub req_type: u8, // 0 for insert, 1 for remove
}
impl LookupRequest {
pub fn new(daemon: String, key: String, req_type: u8) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key, req_type }
}
}
impl net::Message for LookupRequest {
fn name() -> &'static str {
"lookuprequest"
}
}

View File

@@ -12,7 +12,10 @@ use darkfi::{
Result,
};
use crate::structures::{DhtPtr, KeyRequest, KeyResponse, LookupRequest};
use crate::{
dht::DhtPtr,
messages::{KeyRequest, KeyResponse, LookupRequest},
};
pub struct Protocol {
channel: ChannelPtr,
@@ -82,8 +85,8 @@ impl Protocol {
self.p2p.broadcast_with_exclude(req_copy.clone(), &exclude_list).await
{
error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e);
continue
};
continue
}
match self.dht.read().await.map.get(&req_copy.key) {
@@ -93,12 +96,10 @@ impl Protocol {
debug!("Protocol::handle_receive_request(): sending response: {:?}", response);
if let Err(e) = self.channel.send(response).await {
error!("Protocol::handle_receive_request(): p2p broadcast of response failed: {}", e);
continue
};
}
None => {
error!("Protocol::handle_receive_request(): Requested key doesn't exist locally: {}", req_copy.key);
continue
}
}
}
@@ -120,22 +121,25 @@ impl Protocol {
debug!("Protocol::handle_receive_response(): resp: {:?}", resp_copy);
if self.dht.read().await.seen.contains_key(&resp_copy.id) {
error!("0.1");
debug!("Protocol::handle_receive_response(): We have already seen this response.");
continue
}
self.dht.write().await.seen.insert(resp_copy.id.clone(), Utc::now().timestamp());
if self.dht.read().await.id.to_string() != resp_copy.to {
error!("2.1");
if let Err(e) =
self.p2p.broadcast_with_exclude(resp_copy.clone(), &exclude_list).await
{
error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e);
continue
};
self.dht.write().await.seen.insert(resp_copy.id, Utc::now().timestamp());
continue
}
self.notify_queue_sender.send(resp_copy).await?;
self.notify_queue_sender.send(resp_copy.clone()).await?;
self.dht.write().await.seen.insert(resp_copy.id, Utc::now().timestamp());
}
}
@@ -169,11 +173,13 @@ impl Protocol {
self.dht.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp());
let result = match req_copy.req_type {
0 => self
.dht
.write()
.await
.lookup_insert(req_copy.key.clone(), req_copy.daemon.clone()),
0 => {
self.dht
.write()
.await
.lookup_insert(req_copy.key.clone(), req_copy.daemon.clone())
.await
}
_ => self
.dht
.write()
@@ -188,7 +194,6 @@ impl Protocol {
if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await {
error!("Protocol::handle_receive_lookup_request(): p2p broadcast fail: {}", e);
continue
};
}
}

View File

@@ -1,187 +0,0 @@
use async_std::sync::{Arc, RwLock};
use fxhash::FxHashMap;
use rand::Rng;
use std::collections::HashSet;
use darkfi::{
net,
util::serial::{serialize, SerialDecodable, SerialEncodable},
Result,
};
/// Atomic pointer to DHT state
pub type DhtPtr = Arc<RwLock<Dht>>;
// TODO: lookup table to be based on directly connected peers, not broadcast based
// TODO: replace Strings with blake3 hashes
// Using string in structures because we are at an external crate
// and cant use blake3 serialization. To be replaced once merged with core src.
/// Struct representing DHT state.
pub struct Dht {
/// Daemon id
pub id: blake3::Hash,
/// Daemon hasmap
pub map: FxHashMap<String, Vec<u8>>,
/// Network lookup map, containing nodes that holds each key
pub lookup: FxHashMap<String, HashSet<String>>,
/// Daemon seen requests/responses ids and timestamp,
/// to prevent rebroadcasting and loops
pub seen: FxHashMap<String, i64>,
}
impl Dht {
pub async fn new(initial: Option<FxHashMap<String, HashSet<String>>>) -> Result<DhtPtr> {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n));
let map = FxHashMap::default();
let lookup = match initial {
Some(l) => l,
None => FxHashMap::default(),
};
let seen = FxHashMap::default();
let state = Arc::new(RwLock::new(Dht { id, map, lookup, seen }));
Ok(state)
}
/// Store provided key value pair and update local lookup map
pub fn insert(&mut self, key: String, value: Vec<u8>) -> Result<()> {
self.map.insert(key.clone(), value);
self.lookup_insert(key, self.id.to_string())
}
/// Remove provided key value pair and update local lookup map
pub fn remove(&mut self, key: String) -> Result<Option<String>> {
// Check if key value pair existed and act accordingly
let result = match self.map.remove(&key) {
Some(_) => {
self.lookup_remove(key.clone(), self.id.to_string())?;
Some(key)
}
None => None,
};
Ok(result)
}
/// Store provided key node pair in local lookup map
pub fn lookup_insert(&mut self, key: String, node_id: String) -> Result<()> {
let mut lookup_set = match self.lookup.get(&key) {
Some(s) => s.clone(),
None => HashSet::new(),
};
lookup_set.insert(node_id);
self.lookup.insert(key, lookup_set);
Ok(())
}
/// Remove provided node id from keys set in local lookup map
pub fn lookup_remove(&mut self, key: String, node_id: String) -> Result<()> {
if let Some(s) = self.lookup.get(&key) {
let mut lookup_set = s.clone();
lookup_set.remove(&node_id);
if lookup_set.is_empty() {
self.lookup.remove(&key);
} else {
self.lookup.insert(key, lookup_set);
}
}
Ok(())
}
}
/// This struct represents a DHT key request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyRequest {
/// Request id
pub id: String,
/// Daemon id requesting the key
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
}
impl KeyRequest {
pub fn new(from: String, to: String, key: String) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, from, to, key }
}
}
impl net::Message for KeyRequest {
fn name() -> &'static str {
"keyrequest"
}
}
/// This struct represents a DHT key request response
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyResponse {
/// Response id
pub id: String,
/// Daemon id holding the key
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
/// Key value
pub value: Vec<u8>,
}
impl KeyResponse {
pub fn new(from: String, to: String, key: String, value: Vec<u8>) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, from, to, key, value }
}
}
impl net::Message for KeyResponse {
fn name() -> &'static str {
"keyresponse"
}
}
/// This struct represents a lookup map request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct LookupRequest {
/// Request id
pub id: String,
/// Daemon id executing the request
pub daemon: String,
/// Key entry
pub key: String,
/// Request type
pub req_type: u8, // 0 for insert, 1 for remove
}
impl LookupRequest {
pub fn new(daemon: String, key: String, req_type: u8) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key, req_type }
}
}
impl net::Message for LookupRequest {
fn name() -> &'static str {
"lookuprequest"
}
}