script/research/dhtd: simple lookup table impl added

This commit is contained in:
aggstam
2022-07-15 22:56:40 +03:00
parent 455b349ebc
commit 64a354fffa
4 changed files with 265 additions and 29 deletions

View File

@@ -6,6 +6,8 @@ pub enum RpcError {
UnknownKey = -35107,
QueryFailed = -35108,
RequestBroadcastFail = -35109,
KeyInsertFail = -35110,
KeyRemoveFail = -35111,
}
fn to_tuple(e: RpcError) -> (i64, String) {
@@ -13,6 +15,8 @@ fn to_tuple(e: RpcError) -> (i64, String) {
RpcError::UnknownKey => "Did not find key",
RpcError::QueryFailed => "Failed to query key",
RpcError::RequestBroadcastFail => "Failed to broadcast request",
RpcError::KeyInsertFail => "Failed to insert key",
RpcError::KeyRemoveFail => "Failed to remove key",
};
(e as i64, msg.to_string())

View File

@@ -34,7 +34,7 @@ mod error;
use error::{server_error, RpcError};
mod structures;
use structures::{KeyRequest, KeyResponse, State, StatePtr};
use structures::{KeyRequest, KeyResponse, LookupRequest, State, StatePtr};
mod protocol;
use protocol::Protocol;
@@ -82,7 +82,8 @@ struct Args {
}
/// Struct representing DHT daemon state.
/// In this example we store String data.
/// This example/temp-impl stores String data.
/// In final version everything will be in bytes (Vec<u8).
pub struct Dhtd {
/// Daemon state
state: StatePtr,
@@ -105,7 +106,7 @@ impl Dhtd {
}
// RPCAPI:
// Checks if provided key exist in local map, otherwise queries the network.
// Checks if provided key exists and retrieve it from the local map or queries the network.
// Returns key value or not found message.
// --> {"jsonrpc": "2.0", "method": "get", "params": ["key"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "value", "id": 1}
@@ -114,16 +115,27 @@ impl Dhtd {
return JsonError::new(InvalidParams, None, id).into()
}
// Node verifies the key exist in the lookup map.
let key = params[0].to_string();
let peers = match self.state.read().await.lookup.get(&key) {
Some(v) => v.clone(),
None => {
info!("Did not find key: {}", key);
return server_error(RpcError::UnknownKey, id).into()
}
};
debug!("Key is in peers: {:?}", peers);
// Each node holds a local map, acting as its cache.
// When the node receives a request for a key it doesn't hold,
// it will query the P2P network and saves the response in its local cache.
let key = params[0].to_string();
match self.state.read().await.map.get(&key) {
Some(v) => {
let string = std::str::from_utf8(&v).unwrap();
return JsonResponse::new(json!(string), id).into()
}
None => info!("Requested key doesn't exist, querying the network..."),
None => info!("Requested key doesn't exist locally, querying the network..."),
};
// We retrieve p2p network connected channels, to verify if we
@@ -136,9 +148,11 @@ impl Dhtd {
}
// We create a key request, and broadcast it to the network
// TODO: this should be based on the lookup table, and ask peers directly
let daemon = self.state.read().await.id.to_string();
let request = KeyRequest::new(daemon, key.clone());
// We choose last known peer as request recipient
let peer = peers.iter().last().unwrap().to_string();
let request = KeyRequest::new(daemon.clone(), peer, key.clone());
// TODO: ask connected peers directly, not broadcast
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
@@ -149,9 +163,8 @@ impl Dhtd {
Ok(resp) => match resp {
Some(response) => {
info!("Key found!");
self.state.write().await.map.insert(response.key, response.value.clone());
let string = std::str::from_utf8(&response.value).unwrap();
JsonResponse::new(json!(string), id).into()
let string = std::str::from_utf8(&response.value).unwrap().to_string();
self.insert_pair(id, response.key, string).await
}
None => {
info!("Did not find key: {}", key);
@@ -166,7 +179,6 @@ impl Dhtd {
}
// Auxilary function to wait for a key response from the P2P network.
// TODO: if no node holds the key, we shouldn't wait until the request timeout.
async fn waiting_for_response(&self) -> Result<Option<KeyResponse>> {
let ex = Arc::new(async_executor::Executor::new());
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
@@ -201,12 +213,64 @@ impl Dhtd {
let key = params[0].to_string();
let value = params[1].to_string();
self.state.write().await.map.insert(key.clone(), value.as_bytes().to_vec());
// TODO: inform network for the insert/update
self.insert_pair(id, key, value).await
}
/// Auxilary function to handle pair insertion to state
async fn insert_pair(&self, id: Value, key: String, value: String) -> JsonResult {
if let Err(e) = self.state.write().await.insert(key.clone(), value.as_bytes().to_vec()) {
error!("Failed to insert key: {}", e);
return server_error(RpcError::KeyInsertFail, id)
}
let daemon = self.state.read().await.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 0);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
JsonResponse::new(json!((key, value)), id).into()
}
// RPCAPI:
// Remove key value pair from local map.
// --> {"jsonrpc": "2.0", "method": "remove", "params": ["key"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "key", "id": 1}
async fn remove(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
let key = params[0].to_string();
// Check if key value pair existed and act accordingly
let result = self.state.write().await.remove(key.clone());
match result {
Ok(option) => match option {
Some(k) => {
info!("Key removed: {}", k);
let daemon = self.state.read().await.id.to_string();
let request = LookupRequest::new(daemon, key.clone(), 1);
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
JsonResponse::new(json!(k), id).into()
}
None => {
info!("Did not find key: {}", key);
server_error(RpcError::UnknownKey, id).into()
}
},
Err(e) => {
error!("Failed to remove key: {}", e);
server_error(RpcError::KeyRemoveFail, id)
}
}
}
// RPCAPI:
// Returns current local map.
// --> {"jsonrpc": "2.0", "method": "map", "params": [], "id": 1}
@@ -215,6 +279,15 @@ impl Dhtd {
let map = self.state.read().await.map.clone();
JsonResponse::new(json!(map), id).into()
}
// RPCAPI:
// Returns current lookup map.
// --> {"jsonrpc": "2.0", "method": "lookup", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "lookup", "id": 1}
pub async fn lookup(&self, id: Value, _params: &[Value]) -> JsonResult {
let lookup = self.state.read().await.lookup.clone();
JsonResponse::new(json!(lookup), id).into()
}
}
#[async_trait]
@@ -229,7 +302,9 @@ impl RequestHandler for Dhtd {
match req.method.as_str() {
Some("get") => return self.get(req.id, params).await,
Some("insert") => return self.insert(req.id, params).await,
Some("remove") => return self.remove(req.id, params).await,
Some("map") => return self.map(req.id, params).await,
Some("lookup") => return self.lookup(req.id, params).await,
Some(_) | None => return JsonError::new(MethodNotFound, None, req.id).into(),
}
}

View File

@@ -12,13 +12,14 @@ use darkfi::{
Result,
};
use crate::structures::{KeyRequest, KeyResponse, StatePtr};
use crate::structures::{KeyRequest, KeyResponse, LookupRequest, StatePtr};
pub struct Protocol {
channel: ChannelPtr,
notify_queue_sender: async_channel::Sender<KeyResponse>,
req_sub: MessageSubscription<KeyRequest>,
resp_sub: MessageSubscription<KeyResponse>,
lookup_sub: MessageSubscription<LookupRequest>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
@@ -35,15 +36,18 @@ impl Protocol {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<KeyRequest>().await;
msg_subsystem.add_dispatch::<KeyResponse>().await;
msg_subsystem.add_dispatch::<LookupRequest>().await;
let req_sub = channel.subscribe_msg::<KeyRequest>().await?;
let resp_sub = channel.subscribe_msg::<KeyResponse>().await?;
let lookup_sub = channel.subscribe_msg::<LookupRequest>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
notify_queue_sender,
req_sub,
resp_sub,
lookup_sub,
jobsman: ProtocolJobsManager::new("Protocol", channel),
state,
p2p,
@@ -72,9 +76,20 @@ impl Protocol {
self.state.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp());
let daemon = self.state.read().await.id.to_string();
if daemon != req_copy.to {
if let Err(e) =
self.p2p.broadcast_with_exclude(req_copy.clone(), &exclude_list).await
{
error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e);
continue
};
}
match self.state.read().await.map.get(&req_copy.key) {
Some(value) => {
let response = KeyResponse::new(req_copy.daemon, req_copy.key, value.clone());
let response =
KeyResponse::new(daemon, req_copy.from, req_copy.key, value.clone());
debug!("Protocol::handle_receive_request(): sending response: {:?}", response);
if let Err(e) = self.channel.send(response).await {
error!("Protocol::handle_receive_request(): p2p broadcast of response failed: {}", e);
@@ -82,10 +97,8 @@ impl Protocol {
};
}
None => {
if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await {
error!("Protocol::handle_receive_request(): p2p broadcast fail: {}", e);
continue
};
error!("Protocol::handle_receive_request(): Requested key doesn't exist locally: {}", req_copy.key);
continue
}
}
}
@@ -113,7 +126,7 @@ impl Protocol {
self.state.write().await.seen.insert(resp_copy.id.clone(), Utc::now().timestamp());
if self.state.read().await.id.to_string() != resp_copy.daemon {
if self.state.read().await.id.to_string() != resp_copy.to {
if let Err(e) =
self.p2p.broadcast_with_exclude(resp_copy.clone(), &exclude_list).await
{
@@ -125,6 +138,60 @@ impl Protocol {
self.notify_queue_sender.send(resp_copy).await?;
}
}
async fn handle_receive_lookup_request(self: Arc<Self>) -> Result<()> {
debug!("Protocol::handle_receive_lookup_request() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let req = match self.lookup_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!("Protocol::handle_receive_lookup_request(): recv fail: {}", e);
continue
}
};
let req_copy = (*req).clone();
debug!("Protocol::handle_receive_lookup_request(): req: {:?}", req_copy);
if !(0..=1).contains(&req_copy.req_type) {
debug!("Protocol::handle_receive_lookup_request(): Unknown request type.");
continue
}
if self.state.read().await.seen.contains_key(&req_copy.id) {
debug!(
"Protocol::handle_receive_lookup_request(): We have already seen this request."
);
continue
}
self.state.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp());
let result = match req_copy.req_type {
0 => self
.state
.write()
.await
.lookup_insert(req_copy.key.clone(), req_copy.daemon.clone()),
_ => self
.state
.write()
.await
.lookup_remove(req_copy.key.clone(), req_copy.daemon.clone()),
};
if let Err(e) = result {
error!("Protocol::handle_receive_lookup_request(): request action failed: {}", e);
continue
};
if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await {
error!("Protocol::handle_receive_lookup_request(): p2p broadcast fail: {}", e);
continue
};
}
}
}
#[async_trait]
@@ -134,6 +201,10 @@ impl ProtocolBase for Protocol {
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_response(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_lookup_request(), executor.clone())
.await;
debug!("Protocol::start() [END]");
Ok(())
}

View File

@@ -1,6 +1,7 @@
use async_std::sync::{Arc, RwLock};
use fxhash::FxHashMap;
use rand::Rng;
use std::collections::HashSet;
use darkfi::{
net,
@@ -11,7 +12,8 @@ use darkfi::{
/// Atomic pointer to DHT daemon state
pub type StatePtr = Arc<RwLock<State>>;
// TODO: add lookup table
// TODO: lookup table to be based on directly connected peers, not broadcast based
// TODO: replace Strings with blake3 hashes
// Using string in structures because we are at an external crate
// and cant use blake3 serialization. To be replaced once merged with core src.
@@ -21,6 +23,8 @@ pub struct State {
pub id: blake3::Hash,
/// Daemon hasmap
pub map: FxHashMap<String, Vec<u8>>,
/// Network lookup map, containing nodes that holds each key
pub lookup: FxHashMap<String, HashSet<String>>,
/// Daemon seen requests/responses ids and timestamp,
/// to prevent rebroadcasting and loops
pub seen: FxHashMap<String, i64>,
@@ -33,12 +37,61 @@ impl State {
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n));
let map = FxHashMap::default();
let lookup = FxHashMap::default();
let seen = FxHashMap::default();
let state = Arc::new(RwLock::new(State { id, map, seen }));
let state = Arc::new(RwLock::new(State { id, map, lookup, seen }));
Ok(state)
}
/// Store provided key value pair and update local lookup map
pub fn insert(&mut self, key: String, value: Vec<u8>) -> Result<()> {
self.map.insert(key.clone(), value);
self.lookup_insert(key, self.id.to_string())
}
/// Remove provided key value pair and update local lookup map
pub fn remove(&mut self, key: String) -> Result<Option<String>> {
// Check if key value pair existed and act accordingly
let result = match self.map.remove(&key) {
Some(_) => {
self.lookup_remove(key.clone(), self.id.to_string())?;
Some(key)
}
None => None,
};
Ok(result)
}
/// Store provided key node pair in local lookup map
pub fn lookup_insert(&mut self, key: String, node_id: String) -> Result<()> {
let mut lookup_set = match self.lookup.get(&key) {
Some(s) => s.clone(),
None => HashSet::new(),
};
lookup_set.insert(node_id);
self.lookup.insert(key, lookup_set);
Ok(())
}
/// Remove provided node id from keys set in local lookup map
pub fn lookup_remove(&mut self, key: String, node_id: String) -> Result<()> {
if let Some(s) = self.lookup.get(&key) {
let mut lookup_set = s.clone();
lookup_set.remove(&node_id);
if lookup_set.is_empty() {
self.lookup.remove(&key);
} else {
self.lookup.insert(key, lookup_set);
}
}
Ok(())
}
}
/// This struct represents a DHT key request
@@ -47,18 +100,20 @@ pub struct KeyRequest {
/// Request id
pub id: String,
/// Daemon id requesting the key
pub daemon: String,
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
}
impl KeyRequest {
pub fn new(daemon: String, key: String) -> Self {
pub fn new(from: String, to: String, key: String) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key }
Self { id, from, to, key }
}
}
@@ -73,8 +128,10 @@ impl net::Message for KeyRequest {
pub struct KeyResponse {
/// Response id
pub id: String,
/// Daemon id requested the key
pub daemon: String,
/// Daemon id holding the key
pub from: String,
/// Daemon id holding the key
pub to: String,
/// Key entry
pub key: String,
/// Key value
@@ -82,12 +139,12 @@ pub struct KeyResponse {
}
impl KeyResponse {
pub fn new(daemon: String, key: String, value: Vec<u8>) -> Self {
pub fn new(from: String, to: String, key: String, value: Vec<u8>) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key, value }
Self { id, from, to, key, value }
}
}
@@ -96,3 +153,32 @@ impl net::Message for KeyResponse {
"keyresponse"
}
}
/// This struct represents a lookup map request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct LookupRequest {
/// Request id
pub id: String,
/// Daemon id executing the request
pub daemon: String,
/// Key entry
pub key: String,
/// Request type
pub req_type: u8, // 0 for insert, 1 for remove
}
impl LookupRequest {
pub fn new(daemon: String, key: String, req_type: u8) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key, req_type }
}
}
impl net::Message for LookupRequest {
fn name() -> &'static str {
"lookuprequest"
}
}