From 64a354fffaeb85620c6599ff0a6c06fbea62cef8 Mon Sep 17 00:00:00 2001 From: aggstam Date: Fri, 15 Jul 2022 22:56:40 +0300 Subject: [PATCH] script/research/dhtd: simple lookup table impl added --- script/research/dhtd/src/error.rs | 4 + script/research/dhtd/src/main.rs | 101 ++++++++++++++++++++---- script/research/dhtd/src/protocol.rs | 85 ++++++++++++++++++-- script/research/dhtd/src/structures.rs | 104 ++++++++++++++++++++++--- 4 files changed, 265 insertions(+), 29 deletions(-) diff --git a/script/research/dhtd/src/error.rs b/script/research/dhtd/src/error.rs index 7c599bdee..44e590dfd 100644 --- a/script/research/dhtd/src/error.rs +++ b/script/research/dhtd/src/error.rs @@ -6,6 +6,8 @@ pub enum RpcError { UnknownKey = -35107, QueryFailed = -35108, RequestBroadcastFail = -35109, + KeyInsertFail = -35110, + KeyRemoveFail = -35111, } fn to_tuple(e: RpcError) -> (i64, String) { @@ -13,6 +15,8 @@ fn to_tuple(e: RpcError) -> (i64, String) { RpcError::UnknownKey => "Did not find key", RpcError::QueryFailed => "Failed to query key", RpcError::RequestBroadcastFail => "Failed to broadcast request", + RpcError::KeyInsertFail => "Failed to insert key", + RpcError::KeyRemoveFail => "Failed to remove key", }; (e as i64, msg.to_string()) diff --git a/script/research/dhtd/src/main.rs b/script/research/dhtd/src/main.rs index 874420a4c..19f6a500a 100644 --- a/script/research/dhtd/src/main.rs +++ b/script/research/dhtd/src/main.rs @@ -34,7 +34,7 @@ mod error; use error::{server_error, RpcError}; mod structures; -use structures::{KeyRequest, KeyResponse, State, StatePtr}; +use structures::{KeyRequest, KeyResponse, LookupRequest, State, StatePtr}; mod protocol; use protocol::Protocol; @@ -82,7 +82,8 @@ struct Args { } /// Struct representing DHT daemon state. -/// In this example we store String data. +/// This example/temp-impl stores String data. +/// In final version everything will be in bytes (Vec {"jsonrpc": "2.0", "method": "get", "params": ["key"], "id": 1} // <-- {"jsonrpc": "2.0", "result": "value", "id": 1} @@ -114,16 +115,27 @@ impl Dhtd { return JsonError::new(InvalidParams, None, id).into() } + // Node verifies the key exist in the lookup map. + let key = params[0].to_string(); + let peers = match self.state.read().await.lookup.get(&key) { + Some(v) => v.clone(), + None => { + info!("Did not find key: {}", key); + return server_error(RpcError::UnknownKey, id).into() + } + }; + + debug!("Key is in peers: {:?}", peers); + // Each node holds a local map, acting as its cache. // When the node receives a request for a key it doesn't hold, // it will query the P2P network and saves the response in its local cache. - let key = params[0].to_string(); match self.state.read().await.map.get(&key) { Some(v) => { let string = std::str::from_utf8(&v).unwrap(); return JsonResponse::new(json!(string), id).into() } - None => info!("Requested key doesn't exist, querying the network..."), + None => info!("Requested key doesn't exist locally, querying the network..."), }; // We retrieve p2p network connected channels, to verify if we @@ -136,9 +148,11 @@ impl Dhtd { } // We create a key request, and broadcast it to the network - // TODO: this should be based on the lookup table, and ask peers directly let daemon = self.state.read().await.id.to_string(); - let request = KeyRequest::new(daemon, key.clone()); + // We choose last known peer as request recipient + let peer = peers.iter().last().unwrap().to_string(); + let request = KeyRequest::new(daemon.clone(), peer, key.clone()); + // TODO: ask connected peers directly, not broadcast if let Err(e) = self.p2p.broadcast(request).await { error!("Failed broadcasting request: {}", e); return server_error(RpcError::RequestBroadcastFail, id) @@ -149,9 +163,8 @@ impl Dhtd { Ok(resp) => match resp { Some(response) => { info!("Key found!"); - self.state.write().await.map.insert(response.key, response.value.clone()); - let string = std::str::from_utf8(&response.value).unwrap(); - JsonResponse::new(json!(string), id).into() + let string = std::str::from_utf8(&response.value).unwrap().to_string(); + self.insert_pair(id, response.key, string).await } None => { info!("Did not find key: {}", key); @@ -166,7 +179,6 @@ impl Dhtd { } // Auxilary function to wait for a key response from the P2P network. - // TODO: if no node holds the key, we shouldn't wait until the request timeout. async fn waiting_for_response(&self) -> Result> { let ex = Arc::new(async_executor::Executor::new()); let (timeout_s, timeout_r) = async_channel::unbounded::<()>(); @@ -201,12 +213,64 @@ impl Dhtd { let key = params[0].to_string(); let value = params[1].to_string(); - self.state.write().await.map.insert(key.clone(), value.as_bytes().to_vec()); - // TODO: inform network for the insert/update + self.insert_pair(id, key, value).await + } + + /// Auxilary function to handle pair insertion to state + async fn insert_pair(&self, id: Value, key: String, value: String) -> JsonResult { + if let Err(e) = self.state.write().await.insert(key.clone(), value.as_bytes().to_vec()) { + error!("Failed to insert key: {}", e); + return server_error(RpcError::KeyInsertFail, id) + } + + let daemon = self.state.read().await.id.to_string(); + let request = LookupRequest::new(daemon, key.clone(), 0); + if let Err(e) = self.p2p.broadcast(request).await { + error!("Failed broadcasting request: {}", e); + return server_error(RpcError::RequestBroadcastFail, id) + } JsonResponse::new(json!((key, value)), id).into() } + // RPCAPI: + // Remove key value pair from local map. + // --> {"jsonrpc": "2.0", "method": "remove", "params": ["key"], "id": 1} + // <-- {"jsonrpc": "2.0", "result": "key", "id": 1} + async fn remove(&self, id: Value, params: &[Value]) -> JsonResult { + if params.len() != 1 || !params[0].is_string() { + return JsonError::new(InvalidParams, None, id).into() + } + + let key = params[0].to_string(); + // Check if key value pair existed and act accordingly + let result = self.state.write().await.remove(key.clone()); + match result { + Ok(option) => match option { + Some(k) => { + info!("Key removed: {}", k); + + let daemon = self.state.read().await.id.to_string(); + let request = LookupRequest::new(daemon, key.clone(), 1); + if let Err(e) = self.p2p.broadcast(request).await { + error!("Failed broadcasting request: {}", e); + return server_error(RpcError::RequestBroadcastFail, id) + } + + JsonResponse::new(json!(k), id).into() + } + None => { + info!("Did not find key: {}", key); + server_error(RpcError::UnknownKey, id).into() + } + }, + Err(e) => { + error!("Failed to remove key: {}", e); + server_error(RpcError::KeyRemoveFail, id) + } + } + } + // RPCAPI: // Returns current local map. // --> {"jsonrpc": "2.0", "method": "map", "params": [], "id": 1} @@ -215,6 +279,15 @@ impl Dhtd { let map = self.state.read().await.map.clone(); JsonResponse::new(json!(map), id).into() } + + // RPCAPI: + // Returns current lookup map. + // --> {"jsonrpc": "2.0", "method": "lookup", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": "lookup", "id": 1} + pub async fn lookup(&self, id: Value, _params: &[Value]) -> JsonResult { + let lookup = self.state.read().await.lookup.clone(); + JsonResponse::new(json!(lookup), id).into() + } } #[async_trait] @@ -229,7 +302,9 @@ impl RequestHandler for Dhtd { match req.method.as_str() { Some("get") => return self.get(req.id, params).await, Some("insert") => return self.insert(req.id, params).await, + Some("remove") => return self.remove(req.id, params).await, Some("map") => return self.map(req.id, params).await, + Some("lookup") => return self.lookup(req.id, params).await, Some(_) | None => return JsonError::new(MethodNotFound, None, req.id).into(), } } diff --git a/script/research/dhtd/src/protocol.rs b/script/research/dhtd/src/protocol.rs index cfa46e3a8..84060e21d 100644 --- a/script/research/dhtd/src/protocol.rs +++ b/script/research/dhtd/src/protocol.rs @@ -12,13 +12,14 @@ use darkfi::{ Result, }; -use crate::structures::{KeyRequest, KeyResponse, StatePtr}; +use crate::structures::{KeyRequest, KeyResponse, LookupRequest, StatePtr}; pub struct Protocol { channel: ChannelPtr, notify_queue_sender: async_channel::Sender, req_sub: MessageSubscription, resp_sub: MessageSubscription, + lookup_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, state: StatePtr, p2p: P2pPtr, @@ -35,15 +36,18 @@ impl Protocol { let msg_subsystem = channel.get_message_subsystem(); msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; let req_sub = channel.subscribe_msg::().await?; let resp_sub = channel.subscribe_msg::().await?; + let lookup_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { channel: channel.clone(), notify_queue_sender, req_sub, resp_sub, + lookup_sub, jobsman: ProtocolJobsManager::new("Protocol", channel), state, p2p, @@ -72,9 +76,20 @@ impl Protocol { self.state.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + let daemon = self.state.read().await.id.to_string(); + if daemon != req_copy.to { + if let Err(e) = + self.p2p.broadcast_with_exclude(req_copy.clone(), &exclude_list).await + { + error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e); + continue + }; + } + match self.state.read().await.map.get(&req_copy.key) { Some(value) => { - let response = KeyResponse::new(req_copy.daemon, req_copy.key, value.clone()); + let response = + KeyResponse::new(daemon, req_copy.from, req_copy.key, value.clone()); debug!("Protocol::handle_receive_request(): sending response: {:?}", response); if let Err(e) = self.channel.send(response).await { error!("Protocol::handle_receive_request(): p2p broadcast of response failed: {}", e); @@ -82,10 +97,8 @@ impl Protocol { }; } None => { - if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await { - error!("Protocol::handle_receive_request(): p2p broadcast fail: {}", e); - continue - }; + error!("Protocol::handle_receive_request(): Requested key doesn't exist locally: {}", req_copy.key); + continue } } } @@ -113,7 +126,7 @@ impl Protocol { self.state.write().await.seen.insert(resp_copy.id.clone(), Utc::now().timestamp()); - if self.state.read().await.id.to_string() != resp_copy.daemon { + if self.state.read().await.id.to_string() != resp_copy.to { if let Err(e) = self.p2p.broadcast_with_exclude(resp_copy.clone(), &exclude_list).await { @@ -125,6 +138,60 @@ impl Protocol { self.notify_queue_sender.send(resp_copy).await?; } } + + async fn handle_receive_lookup_request(self: Arc) -> Result<()> { + debug!("Protocol::handle_receive_lookup_request() [START]"); + let exclude_list = vec![self.channel.address()]; + loop { + let req = match self.lookup_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("Protocol::handle_receive_lookup_request(): recv fail: {}", e); + continue + } + }; + + let req_copy = (*req).clone(); + debug!("Protocol::handle_receive_lookup_request(): req: {:?}", req_copy); + + if !(0..=1).contains(&req_copy.req_type) { + debug!("Protocol::handle_receive_lookup_request(): Unknown request type."); + continue + } + + if self.state.read().await.seen.contains_key(&req_copy.id) { + debug!( + "Protocol::handle_receive_lookup_request(): We have already seen this request." + ); + continue + } + + self.state.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + + let result = match req_copy.req_type { + 0 => self + .state + .write() + .await + .lookup_insert(req_copy.key.clone(), req_copy.daemon.clone()), + _ => self + .state + .write() + .await + .lookup_remove(req_copy.key.clone(), req_copy.daemon.clone()), + }; + + if let Err(e) = result { + error!("Protocol::handle_receive_lookup_request(): request action failed: {}", e); + continue + }; + + if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await { + error!("Protocol::handle_receive_lookup_request(): p2p broadcast fail: {}", e); + continue + }; + } + } } #[async_trait] @@ -134,6 +201,10 @@ impl ProtocolBase for Protocol { self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; self.jobsman.clone().spawn(self.clone().handle_receive_response(), executor.clone()).await; + self.jobsman + .clone() + .spawn(self.clone().handle_receive_lookup_request(), executor.clone()) + .await; debug!("Protocol::start() [END]"); Ok(()) } diff --git a/script/research/dhtd/src/structures.rs b/script/research/dhtd/src/structures.rs index 97546f8b2..6a7d850b0 100644 --- a/script/research/dhtd/src/structures.rs +++ b/script/research/dhtd/src/structures.rs @@ -1,6 +1,7 @@ use async_std::sync::{Arc, RwLock}; use fxhash::FxHashMap; use rand::Rng; +use std::collections::HashSet; use darkfi::{ net, @@ -11,7 +12,8 @@ use darkfi::{ /// Atomic pointer to DHT daemon state pub type StatePtr = Arc>; -// TODO: add lookup table +// TODO: lookup table to be based on directly connected peers, not broadcast based +// TODO: replace Strings with blake3 hashes // Using string in structures because we are at an external crate // and cant use blake3 serialization. To be replaced once merged with core src. @@ -21,6 +23,8 @@ pub struct State { pub id: blake3::Hash, /// Daemon hasmap pub map: FxHashMap>, + /// Network lookup map, containing nodes that holds each key + pub lookup: FxHashMap>, /// Daemon seen requests/responses ids and timestamp, /// to prevent rebroadcasting and loops pub seen: FxHashMap, @@ -33,12 +37,61 @@ impl State { let n: u16 = rng.gen(); let id = blake3::hash(&serialize(&n)); let map = FxHashMap::default(); + let lookup = FxHashMap::default(); let seen = FxHashMap::default(); - let state = Arc::new(RwLock::new(State { id, map, seen })); + let state = Arc::new(RwLock::new(State { id, map, lookup, seen })); Ok(state) } + + /// Store provided key value pair and update local lookup map + pub fn insert(&mut self, key: String, value: Vec) -> Result<()> { + self.map.insert(key.clone(), value); + self.lookup_insert(key, self.id.to_string()) + } + + /// Remove provided key value pair and update local lookup map + pub fn remove(&mut self, key: String) -> Result> { + // Check if key value pair existed and act accordingly + let result = match self.map.remove(&key) { + Some(_) => { + self.lookup_remove(key.clone(), self.id.to_string())?; + Some(key) + } + None => None, + }; + + Ok(result) + } + + /// Store provided key node pair in local lookup map + pub fn lookup_insert(&mut self, key: String, node_id: String) -> Result<()> { + let mut lookup_set = match self.lookup.get(&key) { + Some(s) => s.clone(), + None => HashSet::new(), + }; + + lookup_set.insert(node_id); + self.lookup.insert(key, lookup_set); + + Ok(()) + } + + /// Remove provided node id from keys set in local lookup map + pub fn lookup_remove(&mut self, key: String, node_id: String) -> Result<()> { + if let Some(s) = self.lookup.get(&key) { + let mut lookup_set = s.clone(); + lookup_set.remove(&node_id); + if lookup_set.is_empty() { + self.lookup.remove(&key); + } else { + self.lookup.insert(key, lookup_set); + } + } + + Ok(()) + } } /// This struct represents a DHT key request @@ -47,18 +100,20 @@ pub struct KeyRequest { /// Request id pub id: String, /// Daemon id requesting the key - pub daemon: String, + pub from: String, + /// Daemon id holding the key + pub to: String, /// Key entry pub key: String, } impl KeyRequest { - pub fn new(daemon: String, key: String) -> Self { + pub fn new(from: String, to: String, key: String) -> Self { // Generate a random id let mut rng = rand::thread_rng(); let n: u16 = rng.gen(); let id = blake3::hash(&serialize(&n)).to_string(); - Self { id, daemon, key } + Self { id, from, to, key } } } @@ -73,8 +128,10 @@ impl net::Message for KeyRequest { pub struct KeyResponse { /// Response id pub id: String, - /// Daemon id requested the key - pub daemon: String, + /// Daemon id holding the key + pub from: String, + /// Daemon id holding the key + pub to: String, /// Key entry pub key: String, /// Key value @@ -82,12 +139,12 @@ pub struct KeyResponse { } impl KeyResponse { - pub fn new(daemon: String, key: String, value: Vec) -> Self { + pub fn new(from: String, to: String, key: String, value: Vec) -> Self { // Generate a random id let mut rng = rand::thread_rng(); let n: u16 = rng.gen(); let id = blake3::hash(&serialize(&n)).to_string(); - Self { id, daemon, key, value } + Self { id, from, to, key, value } } } @@ -96,3 +153,32 @@ impl net::Message for KeyResponse { "keyresponse" } } + +/// This struct represents a lookup map request +#[derive(Debug, Clone, SerialDecodable, SerialEncodable)] +pub struct LookupRequest { + /// Request id + pub id: String, + /// Daemon id executing the request + pub daemon: String, + /// Key entry + pub key: String, + /// Request type + pub req_type: u8, // 0 for insert, 1 for remove +} + +impl LookupRequest { + pub fn new(daemon: String, key: String, req_type: u8) -> Self { + // Generate a random id + let mut rng = rand::thread_rng(); + let n: u16 = rng.gen(); + let id = blake3::hash(&serialize(&n)).to_string(); + Self { id, daemon, key, req_type } + } +} + +impl net::Message for LookupRequest { + fn name() -> &'static str { + "lookuprequest" + } +}