From ec7ebd09fd4803f2b401dc06f93f33700c796169 Mon Sep 17 00:00:00 2001 From: aggstam Date: Tue, 19 Jul 2022 16:23:39 +0300 Subject: [PATCH] script/research/dhtd: rewrite methods to prevent lock race conditions, protocol to use atomic check for seen messages --- script/research/dhtd/src/dht.rs | 91 +++++++++++++++------------- script/research/dhtd/src/error.rs | 2 + script/research/dhtd/src/main.rs | 67 ++++++++++++++------ script/research/dhtd/src/protocol.rs | 50 +++++++++------ 4 files changed, 130 insertions(+), 80 deletions(-) diff --git a/script/research/dhtd/src/dht.rs b/script/research/dhtd/src/dht.rs index 170e9e58d..87d414c06 100644 --- a/script/research/dhtd/src/dht.rs +++ b/script/research/dhtd/src/dht.rs @@ -11,6 +11,7 @@ use darkfi::{ net, net::P2pPtr, util::{serial::serialize, sleep}, + Error::TorError, Result, }; @@ -26,6 +27,7 @@ const SEEN_DURATION: i64 = 120; /// Atomic pointer to DHT state pub type DhtPtr = Arc>; +// TODO: proper errors // TODO: lookup table to be based on directly connected peers, not broadcast based // TODO: replace Strings with blake3 hashes // Using string in structures because we are at an external crate @@ -157,31 +159,39 @@ impl Dht { Ok(Some(key)) } - /// Check if provided key exists and retrieve it from the local map or query the network. - pub async fn get(&self, key: String) -> Result>> { + /// Verify if provided key exists and return flag if local or in network + pub fn contains_key(&self, key: String) -> Option { + match self.lookup.contains_key(&key) { + true => Some(self.map.contains_key(&key)), + false => None, + } + } + + /// Get key from local map, acting as daemon cache + pub fn get(&self, key: String) -> Option<&Vec> { + self.map.get(&key) + } + + /// Generate key request and broadcast it to the network + pub async fn request_key(&self, key: String) -> Result<()> { // Verify the key exist in the lookup map. let peers = match self.lookup.get(&key) { Some(v) => v.clone(), - None => return Ok(None), + None => { + error!("Key doesn't exist."); + return Err(TorError("Key doesn't exist.".to_string())) + } }; debug!("Key is in peers: {:?}", peers); - // Each node holds a local map, acting as its cache. - // When the node receives a request for a key it doesn't hold, - // it will query the P2P network and saves the response in its local cache. - match self.map.get(&key) { - Some(v) => return Ok(Some(v.clone())), - None => debug!("Requested key doesn't exist locally, querying the network..."), - }; - // We retrieve p2p network connected channels, to verify if we // are connected to a network. // Using len here because is_empty() uses unstable library feature // called 'exact_size_is_empty'. if self.p2p.channels().lock().await.values().len() == 0 { - debug!("Node is not connected to other nodes"); - return Ok(None) + error!("Node is not connected to other nodes."); + return Err(TorError("Node is not connected to other nodes.".to_string())) } // We create a key request, and broadcast it to the network @@ -195,38 +205,35 @@ impl Dht { return Err(e) } - // Waiting network response - match self.waiting_for_response().await { - Ok(resp) => match resp { - Some(response) => Ok(Some(response.value)), - None => Ok(None), + Ok(()) + } +} + +// Auxilary function to wait for a key response from the P2P network. +pub async fn waiting_for_response(dht: DhtPtr) -> Result> { + let (p2p_recv_channel, stop_signal) = { + let _dht = dht.read().await; + (_dht.p2p_recv_channel.clone(), _dht.stop_signal.clone()) + }; + let ex = Arc::new(async_executor::Executor::new()); + let (timeout_s, timeout_r) = async_channel::unbounded::<()>(); + ex.spawn(async move { + sleep(Duration::from_millis(REQUEST_TIMEOUT).as_secs()).await; + timeout_s.send(()).await.unwrap_or(()); + }) + .detach(); + + loop { + select! { + msg = p2p_recv_channel.recv().fuse() => { + let response = msg?; + return Ok(Some(response)) }, - Err(e) => Err(e), + _ = stop_signal.recv().fuse() => break, + _ = timeout_r.recv().fuse() => break, } } - - // Auxilary function to wait for a key response from the P2P network. - async fn waiting_for_response(&self) -> Result> { - let ex = Arc::new(async_executor::Executor::new()); - let (timeout_s, timeout_r) = async_channel::unbounded::<()>(); - ex.spawn(async move { - sleep(Duration::from_millis(REQUEST_TIMEOUT).as_secs()).await; - timeout_s.send(()).await.unwrap_or(()); - }) - .detach(); - - loop { - select! { - msg = self.p2p_recv_channel.recv().fuse() => { - let response = msg?; - return Ok(Some(response)) - }, - _ = self.stop_signal.recv().fuse() => break, - _ = timeout_r.recv().fuse() => break, - } - } - Ok(None) - } + Ok(None) } // Auxilary function to periodically prun seen messages, based on when they were received. diff --git a/script/research/dhtd/src/error.rs b/script/research/dhtd/src/error.rs index 0b8db66c2..cf83b6be0 100644 --- a/script/research/dhtd/src/error.rs +++ b/script/research/dhtd/src/error.rs @@ -7,6 +7,7 @@ pub enum RpcError { QueryFailed = -35108, KeyInsertFail = -35110, KeyRemoveFail = -35111, + WaitingNetworkError = -35112, } fn to_tuple(e: RpcError) -> (i64, String) { @@ -15,6 +16,7 @@ fn to_tuple(e: RpcError) -> (i64, String) { RpcError::QueryFailed => "Failed to query key", RpcError::KeyInsertFail => "Failed to insert key", RpcError::KeyRemoveFail => "Failed to remove key", + RpcError::WaitingNetworkError => "Error while waiting network response.", }; (e as i64, msg.to_string()) diff --git a/script/research/dhtd/src/main.rs b/script/research/dhtd/src/main.rs index d2194b0d1..cc22358fe 100644 --- a/script/research/dhtd/src/main.rs +++ b/script/research/dhtd/src/main.rs @@ -29,7 +29,7 @@ mod error; use error::{server_error, RpcError}; mod dht; -use dht::{Dht, DhtPtr}; +use dht::{waiting_for_response, Dht, DhtPtr}; mod messages; @@ -99,30 +99,61 @@ impl Dhtd { } let key = params[0].to_string(); - let result = self.dht.read().await.get(key.clone()).await; - match result { - Ok(res) => match res { + + // We execute this sequence to prevent lock races between threads + // Verify key exists + let exists = self.dht.read().await.contains_key(key.clone()); + if let None = exists { + info!("Did not find key: {}", key); + return server_error(RpcError::UnknownKey, id).into() + } + + // Check if key is local or shoud query network + let local = exists.unwrap(); + if local { + match self.dht.read().await.get(key.clone()) { Some(value) => { - info!("Key found!"); - // Optionally, we insert the key to our local map. - // This must happen here because we got blocking/race conditions - // if we try to insert the value in dht.get(). - if let Err(e) = self.dht.write().await.insert(key.clone(), value.clone()).await - { - error!("Failed to insert key: {}", e); - return server_error(RpcError::KeyInsertFail, id) - } let string = std::str::from_utf8(&value).unwrap().to_string(); - JsonResponse::new(json!((key, string)), id).into() + return JsonResponse::new(json!((key, string)), id).into() } None => { info!("Did not find key: {}", key); - server_error(RpcError::UnknownKey, id).into() + return server_error(RpcError::UnknownKey, id).into() } - }, + } + } + + info!("Key doesn't exist locally, querring network..."); + if let Err(e) = self.dht.read().await.request_key(key.clone()).await { + error!("Failed to query key: {}", e); + return server_error(RpcError::QueryFailed, id).into() + } + + info!("Waiting response..."); + match waiting_for_response(self.dht.clone()).await { + Ok(response) => { + match response { + Some(resp) => { + info!("Key found!"); + // Optionally, we insert the key to our local map + if let Err(e) = + self.dht.write().await.insert(resp.key, resp.value.clone()).await + { + error!("Failed to insert key: {}", e); + return server_error(RpcError::KeyInsertFail, id) + } + let string = std::str::from_utf8(&resp.value).unwrap().to_string(); + JsonResponse::new(json!((key, string)), id).into() + } + None => { + info!("Did not find key: {}", key); + server_error(RpcError::UnknownKey, id).into() + } + } + } Err(e) => { - error!("Failed to query key: {}", e); - server_error(RpcError::QueryFailed, id).into() + error!("Error while waiting network response: {}", e); + server_error(RpcError::WaitingNetworkError, id).into() } } } diff --git a/script/research/dhtd/src/protocol.rs b/script/research/dhtd/src/protocol.rs index c733a5927..dd5dc549a 100644 --- a/script/research/dhtd/src/protocol.rs +++ b/script/research/dhtd/src/protocol.rs @@ -72,12 +72,17 @@ impl Protocol { let req_copy = (*req).clone(); debug!("Protocol::handle_receive_request(): req: {:?}", req_copy); - if self.dht.read().await.seen.contains_key(&req_copy.id) { - debug!("Protocol::handle_receive_request(): We have already seen this request."); - continue - } + { + let dht = &mut self.dht.write().await; + if dht.seen.contains_key(&req_copy.id) { + debug!( + "Protocol::handle_receive_request(): We have already seen this request." + ); + continue + } - self.dht.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + dht.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + } let daemon = self.dht.read().await.id.to_string(); if daemon != req_copy.to { @@ -120,26 +125,28 @@ impl Protocol { let resp_copy = (*resp).clone(); debug!("Protocol::handle_receive_response(): resp: {:?}", resp_copy); - if self.dht.read().await.seen.contains_key(&resp_copy.id) { - error!("0.1"); - debug!("Protocol::handle_receive_response(): We have already seen this response."); - continue + { + let dht = &mut self.dht.write().await; + if dht.seen.contains_key(&resp_copy.id) { + debug!( + "Protocol::handle_receive_request(): We have already seen this request." + ); + continue + } + + dht.seen.insert(resp_copy.id.clone(), Utc::now().timestamp()); } if self.dht.read().await.id.to_string() != resp_copy.to { - error!("2.1"); if let Err(e) = self.p2p.broadcast_with_exclude(resp_copy.clone(), &exclude_list).await { error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e); }; - - self.dht.write().await.seen.insert(resp_copy.id, Utc::now().timestamp()); continue } self.notify_queue_sender.send(resp_copy.clone()).await?; - self.dht.write().await.seen.insert(resp_copy.id, Utc::now().timestamp()); } } @@ -163,14 +170,17 @@ impl Protocol { continue } - if self.dht.read().await.seen.contains_key(&req_copy.id) { - debug!( - "Protocol::handle_receive_lookup_request(): We have already seen this request." - ); - continue - } + { + let dht = &mut self.dht.write().await; + if dht.seen.contains_key(&req_copy.id) { + debug!( + "Protocol::handle_receive_request(): We have already seen this request." + ); + continue + } - self.dht.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + dht.seen.insert(req_copy.id.clone(), Utc::now().timestamp()); + } let result = match req_copy.req_type { 0 => {