script/research/dhtd: implementation of a simple DHT.

This commit is contained in:
aggstam
2022-07-12 15:05:35 +03:00
parent 2db183418d
commit 016f30e62d
7 changed files with 650 additions and 0 deletions

2
script/research/dhtd/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
Cargo.lock

View File

@@ -0,0 +1,34 @@
[package]
name = "dhtd"
version = "0.3.0"
edition = "2021"
[dependencies.darkfi]
path = "../../../"
features = ["util"]
[dependencies]
async-channel = "1.6.1"
async-executor = "1.4.1"
async-std = "1.12.0"
async-trait = "0.1.56"
blake3 = "1.3.1"
chrono = "0.4.19"
ctrlc-async = {version = "3.2.2", default-features = false, features = ["async-std", "termination"]}
easy-parallel = "3.2.0"
futures = "0.3.21"
futures-lite = "1.12.0"
fxhash = "0.2.1"
log = "0.4.17"
rand = "0.8.5"
serde_json = "1.0.82"
simplelog = "0.12.0"
url = "2.2.2"
# Argument parsing
serde = "1.0.138"
serde_derive = "1.0.138"
structopt = "0.3.26"
structopt-toml = "0.5.0"
[workspace]

View File

@@ -0,0 +1,25 @@
## dht configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
# JSON-RPC listen URL
#rpc_listen = "tcp://127.0.0.1:9540"
# P2P accept address
#p2p_accept = "tls://127.0.0.1:9541"
# P2P external address
#p2p_external = "tls://127.0.0.1:9541"
# Connection slots
#slots = 8
# Seed nodes to connect to
#seed = []
# Peers to connect to
#peer = []

View File

@@ -0,0 +1,24 @@
use serde_json::Value;
use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult};
pub enum RpcError {
UnknownKey = -35107,
QueryFailed = -35108,
RequestBroadcastFail = -35109,
}
fn to_tuple(e: RpcError) -> (i64, String) {
let msg = match e {
RpcError::UnknownKey => "Did not find key",
RpcError::QueryFailed => "Failed to query key",
RpcError::RequestBroadcastFail => "Failed to broadcast request",
};
(e as i64, msg.to_string())
}
pub fn server_error(e: RpcError, id: Value) -> JsonResult {
let (code, msg) = to_tuple(e);
JsonError::new(ServerError(code), Some(msg), id).into()
}

View File

@@ -0,0 +1,325 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use futures::{select, FutureExt};
use futures_lite::future;
use log::{debug, error, info, warn};
use serde_derive::Deserialize;
use serde_json::{json, Value};
use std::time::Duration;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use url::Url;
use darkfi::{
async_daemonize, cli_desc, net,
net::P2pPtr,
rpc::{
jsonrpc::{
ErrorCode::{InvalidParams, MethodNotFound},
JsonError, JsonRequest, JsonResponse, JsonResult,
},
server::{listen_and_serve, RequestHandler},
},
util::{
cli::{get_log_config, get_log_level, spawn_config},
path::get_config_path,
sleep,
},
Result,
};
mod error;
use error::{server_error, RpcError};
mod structures;
use structures::{KeyRequest, KeyResponse, State, StatePtr};
mod protocol;
use protocol::Protocol;
const CONFIG_FILE: &str = "dhtd_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../dhtd_config.toml");
const REQUEST_TIMEOUT: u64 = 2400;
const SEEN_DURATION: i64 = 120;
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "dhtd", about = cli_desc!())]
struct Args {
#[structopt(short, long)]
/// Configuration file to use
config: Option<String>,
#[structopt(long, default_value = "tcp://127.0.0.1:9540")]
/// JSON-RPC listen URL
rpc_listen: Url,
#[structopt(long)]
/// P2P accept address
p2p_accept: Option<Url>,
#[structopt(long)]
/// P2P external address
p2p_external: Option<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots
slots: u32,
#[structopt(long)]
/// Connect to seed (repeatable flag)
p2p_seed: Vec<Url>,
#[structopt(long)]
/// Connect to peer (repeatable flag)
p2p_peer: Vec<Url>,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
}
/// Struct representing DHT daemon state
pub struct Dhtd {
/// Daemon state
state: StatePtr,
/// P2P network pointer
p2p: P2pPtr,
/// Channel to receive responses from P2P
p2p_recv_channel: async_channel::Receiver<KeyResponse>,
/// Stop signal channel to terminate background processes
stop_signal: async_channel::Receiver<()>,
}
impl Dhtd {
pub async fn new(
state: StatePtr,
p2p: P2pPtr,
p2p_recv_channel: async_channel::Receiver<KeyResponse>,
stop_signal: async_channel::Receiver<()>,
) -> Result<Self> {
Ok(Self { state, p2p, p2p_recv_channel, stop_signal })
}
// RPCAPI:
// Checks if provided key exist in local map, otherwise queries the network.
// Returns key value or not found message.
// --> {"jsonrpc": "2.0", "method": "get", "params": ["key"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "value", "id": 1}
async fn get(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
// Each node holds a local map, acting as its cache.
// When the node receives a request for a key it doesn't hold,
// it will query the P2P network and saves the response in its local cache.
let key = params[0].to_string();
match self.state.read().await.map.get(&key) {
Some(v) => return JsonResponse::new(json!(v), id).into(),
None => info!("Requested key doesn't exist, querying the network..."),
};
// We retrieve p2p network connected channels, to verify if we
// are connected to a network.
// Using len here because is_empty() uses unstable library feature
// called 'exact_size_is_empty'.
if self.p2p.channels().lock().await.values().len() == 0 {
warn!("Node is not connected to other nodes");
return server_error(RpcError::UnknownKey, id).into()
}
// We create a key request, and broadcast it to the network
// TODO: this should be based on the lookup table, and ask peers directly
let daemon = self.state.read().await.id.to_string();
let request = KeyRequest::new(daemon, key.clone());
if let Err(e) = self.p2p.broadcast(request).await {
error!("Failed broadcasting request: {}", e);
return server_error(RpcError::RequestBroadcastFail, id)
}
// Waiting network response
match self.waiting_for_response().await {
Ok(resp) => match resp {
Some(response) => {
info!("Key found!");
self.state.write().await.map.insert(response.key, response.value.clone());
JsonResponse::new(json!(response.value), id).into()
}
None => {
info!("Did not find key: {}", key);
server_error(RpcError::UnknownKey, id).into()
}
},
Err(e) => {
error!("Failed to query key: {}", e);
server_error(RpcError::QueryFailed, id).into()
}
}
}
// Auxilary function to wait for a key response from the P2P network.
// TODO: if no node holds the key, we shouldn't wait until the request timeout.
async fn waiting_for_response(&self) -> Result<Option<KeyResponse>> {
let ex = Arc::new(async_executor::Executor::new());
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
ex.spawn(async move {
sleep(Duration::from_millis(REQUEST_TIMEOUT).as_secs()).await;
timeout_s.send(()).await.unwrap_or(());
})
.detach();
loop {
select! {
msg = self.p2p_recv_channel.recv().fuse() => {
let response = msg?;
return Ok(Some(response))
},
_ = self.stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
}
}
Ok(None)
}
// RPCAPI:
// Insert key value pair in local map.
// --> {"jsonrpc": "2.0", "method": "insert", "params": ["key", "value"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "(key, value)", "id": 1}
async fn insert(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 2 || !params[0].is_string() || !params[1].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
let key = params[0].to_string();
let value = params[1].to_string();
self.state.write().await.map.insert(key.clone(), value.clone());
// TODO: inform network for the insert/update
JsonResponse::new(json!((key, value)), id).into()
}
// RPCAPI:
// Returns current local map.
// --> {"jsonrpc": "2.0", "method": "map", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "map", "id": 1}
pub async fn map(&self, id: Value, _params: &[Value]) -> JsonResult {
let map = self.state.read().await.map.clone();
JsonResponse::new(json!(map), id).into()
}
}
#[async_trait]
impl RequestHandler for Dhtd {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if !req.params.is_array() {
return JsonError::new(InvalidParams, None, req.id).into()
}
let params = req.params.as_array().unwrap();
match req.method.as_str() {
Some("get") => return self.get(req.id, params).await,
Some("insert") => return self.insert(req.id, params).await,
Some("map") => return self.map(req.id, params).await,
Some(_) | None => return JsonError::new(MethodNotFound, None, req.id).into(),
}
}
}
// Auxilary function to periodically prun seen messages, based on when they were received.
// This helps us to prevent broadcasting loops.
async fn prune_seen_messages(state: StatePtr) {
loop {
sleep(SEEN_DURATION as u64).await;
debug!("Pruning seen messages");
let now = Utc::now().timestamp();
let mut prune = vec![];
let map = state.read().await.seen.clone();
for (k, v) in map.iter() {
if now - v > SEEN_DURATION {
prune.push(k);
}
}
let mut map = map.clone();
for i in prune {
map.remove(i);
}
state.write().await.seen = map;
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// We use this handler to block this function after detaching all
// tasks, and to catch a shutdown signal, where we can clean up and
// exit gracefully.
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
signal.send(()).await.unwrap();
})
.unwrap();
// Initialize daemon state
let state = State::new().await?;
// P2P network
let network_settings = net::Settings {
inbound: args.p2p_accept,
outbound_connections: args.slots,
external_addr: args.p2p_external,
peers: args.p2p_seed.clone(),
seeds: args.p2p_seed.clone(),
..Default::default()
};
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<KeyResponse>();
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
info!("Registering P2P protocols...");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let sender = p2p_send_channel.clone();
let state = _state.clone();
async move { Protocol::init(channel, sender, state, p2p).await.unwrap() }
})
.await;
// Initialize program state
let dhtd = Dhtd::new(state.clone(), p2p.clone(), p2p_recv_channel, shutdown.clone()).await?;
let dhtd = Arc::new(dhtd);
// Task to periodically clean up daemon seen messages
ex.spawn(prune_seen_messages(state.clone())).detach();
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen, dhtd.clone())).detach();
info!("Starting sync P2P network");
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
error!("Failed starting P2P network: {}", e);
}
})
.detach();
// Wait for SIGINT
shutdown.recv().await?;
print!("\r");
info!("Caught termination signal, cleaning up and exiting...");
Ok(())
}

View File

@@ -0,0 +1,144 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use log::{debug, error};
use darkfi::{
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
use crate::structures::{KeyRequest, KeyResponse, StatePtr};
pub struct Protocol {
channel: ChannelPtr,
notify_queue_sender: async_channel::Sender<KeyResponse>,
req_sub: MessageSubscription<KeyRequest>,
resp_sub: MessageSubscription<KeyResponse>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
}
impl Protocol {
pub async fn init(
channel: ChannelPtr,
notify_queue_sender: async_channel::Sender<KeyResponse>,
state: StatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
debug!("Adding Protocol to the protocol registry");
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<KeyRequest>().await;
msg_subsystem.add_dispatch::<KeyResponse>().await;
let req_sub = channel.subscribe_msg::<KeyRequest>().await?;
let resp_sub = channel.subscribe_msg::<KeyResponse>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
notify_queue_sender,
req_sub,
resp_sub,
jobsman: ProtocolJobsManager::new("Protocol", channel),
state,
p2p,
}))
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!("Protocol::handle_receive_request() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let req = match self.req_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!("Protocol::handle_receive_request(): recv fail: {}", e);
continue
}
};
let req_copy = (*req).clone();
debug!("Protocol::handle_receive_request(): req: {:?}", req_copy);
if self.state.read().await.seen.contains_key(&req_copy.id) {
debug!("Protocol::handle_receive_request(): We have already seen this request.");
continue
}
self.state.write().await.seen.insert(req_copy.id.clone(), Utc::now().timestamp());
match self.state.read().await.map.get(&req_copy.key) {
Some(value) => {
let response = KeyResponse::new(req_copy.daemon, req_copy.key, value.clone());
debug!("Protocol::handle_receive_request(): sending response: {:?}", response);
if let Err(e) = self.channel.send(response).await {
error!("Protocol::handle_receive_request(): p2p broadcast of response failed: {}", e);
continue
};
}
None => {
if let Err(e) = self.p2p.broadcast_with_exclude(req_copy, &exclude_list).await {
error!("Protocol::handle_receive_request(): p2p broadcast fail: {}", e);
continue
};
}
}
}
}
async fn handle_receive_response(self: Arc<Self>) -> Result<()> {
debug!("Protocol::handle_receive_response() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let resp = match self.resp_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!("Protocol::handle_receive_response(): recv fail: {}", e);
continue
}
};
let resp_copy = (*resp).clone();
debug!("Protocol::handle_receive_response(): resp: {:?}", resp_copy);
if self.state.read().await.seen.contains_key(&resp_copy.id) {
debug!("Protocol::handle_receive_response(): We have already seen this response.");
continue
}
self.state.write().await.seen.insert(resp_copy.id.clone(), Utc::now().timestamp());
if self.state.read().await.id.to_string() != resp_copy.daemon {
if let Err(e) =
self.p2p.broadcast_with_exclude(resp_copy.clone(), &exclude_list).await
{
error!("Protocol::handle_receive_response(): p2p broadcast fail: {}", e);
continue
};
}
self.notify_queue_sender.send(resp_copy).await?;
}
}
}
#[async_trait]
impl ProtocolBase for Protocol {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("Protocol::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_response(), executor.clone()).await;
debug!("Protocol::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"Protocol"
}
}

View File

@@ -0,0 +1,96 @@
use async_std::sync::{Arc, RwLock};
use fxhash::FxHashMap;
use rand::Rng;
use darkfi::{
net,
util::serial::{serialize, SerialDecodable, SerialEncodable},
Result,
};
/// Atomic pointer to DHT daemon state
pub type StatePtr = Arc<RwLock<State>>;
// TODO: add lookup table
/// Struct representing DHT daemon state.
pub struct State {
/// Daemon id
pub id: blake3::Hash,
/// Daemon hasmap, using String as key and value for simplicity
pub map: FxHashMap<String, String>,
/// Daemon seen requests/responses ids, to prevent rebroadcasting and loops
pub seen: FxHashMap<String, i64>,
}
impl State {
pub async fn new() -> Result<StatePtr> {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n));
let map = FxHashMap::default();
let seen = FxHashMap::default();
let state = Arc::new(RwLock::new(State { id, map, seen }));
Ok(state)
}
}
/// This struct represents a DHT key request
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyRequest {
/// Request id
// Using string here because we are at an external crate
// and cant use blake3 serialization
pub id: String,
/// Daemon id requesting the key
pub daemon: String,
/// Key entry
pub key: String,
}
impl KeyRequest {
pub fn new(daemon: String, key: String) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key }
}
}
impl net::Message for KeyRequest {
fn name() -> &'static str {
"keyrequest"
}
}
/// This struct represents a DHT key request response
#[derive(Debug, Clone, SerialDecodable, SerialEncodable)]
pub struct KeyResponse {
/// Response id
pub id: String,
/// Daemon id requested the key
pub daemon: String,
/// Key entry
pub key: String,
/// Key value
pub value: String,
}
impl KeyResponse {
pub fn new(daemon: String, key: String, value: String) -> Self {
// Generate a random id
let mut rng = rand::thread_rng();
let n: u16 = rng.gen();
let id = blake3::hash(&serialize(&n)).to_string();
Self { id, daemon, key, value }
}
}
impl net::Message for KeyResponse {
fn name() -> &'static str {
"keyresponse"
}
}