diff --git a/Cargo.lock b/Cargo.lock index 0ae812e5a..c87d21c77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,6 +783,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + [[package]] name = "blake2b_simd" version = "1.0.3" @@ -2680,6 +2689,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "equix" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89178c5241f5cc0c8f2b5ac5008f3c7a32caad341b1ec747a6e1e51d2e877110" +dependencies = [ + "arrayvec", + "hashx", + "num-traits", + "thiserror 2.0.12", + "visibility", +] + [[package]] name = "errno" version = "0.3.13" @@ -2823,6 +2845,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fixed-capacity-vec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b31a14f5ee08ed1a40e1252b35af18bed062e3f39b69aab34decde36bc43e40" + [[package]] name = "fixed-hash" version = "0.8.0" @@ -3036,16 +3064,20 @@ name = "fud" version = "0.5.0" dependencies = [ "async-trait", + "blake2", "blake3", "bs58", "darkfi", + "darkfi-sdk", "darkfi-serial", "easy-parallel", + "equix", "futures", "log", "num-bigint", "rand 0.8.5", "serde", + "sha2", "signal-hook", "signal-hook-async-std", "simplelog", @@ -3437,6 +3469,21 @@ dependencies = [ "hashbrown 0.15.4", ] +[[package]] +name = "hashx" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cb639748a589a17df2126f8015897ab416e81113afb82f56df5d47fa1486ab1" +dependencies = [ + "arrayvec", + "blake2", + "dynasmrt", + "fixed-capacity-vec", + "hex", + "rand_core 0.9.3", + "thiserror 2.0.12", +] + [[package]] name = "heck" version = "0.3.3" diff --git a/bin/fud/fud/Cargo.toml b/bin/fud/fud/Cargo.toml index 536c51547..24028a079 100644 --- a/bin/fud/fud/Cargo.toml +++ b/bin/fud/fud/Cargo.toml @@ -18,12 +18,16 @@ path = "src/main.rs" [dependencies] darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht", "sled-overlay"]} +darkfi-sdk = {path = "../../../src/sdk"} darkfi-serial = {version = "0.5.0", features = ["hash"]} +# Encoding +bs58 = "0.5.1" +sha2 = "0.10.9" + # Misc async-trait = "0.1.88" blake3 = "1.8.2" -bs58 = "0.5.1" rand = "0.8.5" log = "0.4.27" tinyjson = "2.5.1" @@ -38,6 +42,10 @@ signal-hook = "0.3.18" simplelog = "0.12.2" smol = "2.0.2" +# Equi-X +blake2 = "0.10.6" +equix = "0.2.5" + # Database sled-overlay = "0.1.9" diff --git a/bin/fud/fud/fud_config.toml b/bin/fud/fud/fud_config.toml index cc09036ef..a1653d4fe 100644 --- a/bin/fud/fud/fud_config.toml +++ b/bin/fud/fud/fud_config.toml @@ -12,6 +12,31 @@ base_dir = "~/.local/share/darkfi/fud" ## Chunk transfer timeout in seconds #chunk_timeout = 60 +# PoW settings (to generate a valid node id) +[pow] +## Equi-X effort value +#equix_value = 10000 + +## Number of latest BTC block hashes that are valid for fud's PoW +#btc_hash_count = 144 + +## Electrum nodes timeout in seconds +#btc_timeout = 15 + +# Electrum nodes used to fetch the latest BTC block hashes (used in fud's PoW) +btc_electrum_nodes = [ + "tcp://ax102.blockeng.ch:50001", + "tcp://fulcrum-core.1209k.com:50001", + "tcp://electrum.blockstream.info:50001", + "tcp://bitcoin.aranguren.org:50001", + "tcp://bitcoin.grey.pw:50001", + #"tor://4vrz2q62yxlfmcntnotzdjahpqh2joirp2vrcdsayyioxthffimbp2ad.onion:50001", + #"tor://k23xxwk6xipyfdqey4ylsfeetmcajjro63odwihzbmx5m6xabbwzp4yd.onion:50001", + #"tor://sysaw6aecffum4ghlbukauf6g7l3hzh3rffafmfak5bxnfowrynd56ad.onion:50001", + #"tor://udfpzbte2hommnvag5f3qlouqkhvp3xybhlus2yvfeqdwlhjroe4bbyd.onion:60001", + #"tor://lukebtcygzrosjtcklev2fhlvpbyu25saienzorhbf3vwc2fpa475qyd.onion:50001", +] + # DHT settings [dht] ## Number of nodes in a bucket diff --git a/bin/fud/fud/src/bitcoin.rs b/bin/fud/fud/src/bitcoin.rs new file mode 100644 index 000000000..e5da66625 --- /dev/null +++ b/bin/fud/fud/src/bitcoin.rs @@ -0,0 +1,203 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::{ + collections::HashMap, + io::{Cursor, Read}, + sync::Arc, + time::Duration, +}; + +use log::{error, info, warn}; +use rand::{prelude::IteratorRandom, rngs::OsRng}; +use sha2::{Digest, Sha256}; +use smol::lock::RwLock; +use tinyjson::JsonValue; +use url::Url; + +use darkfi::{ + rpc::{client::RpcClient, jsonrpc::JsonRequest}, + system::{timeout::timeout, ExecutorPtr}, + Error, Result, +}; +use darkfi_sdk::{hex::decode_hex, GenericResult}; + +use crate::pow::PowSettings; + +pub type BitcoinBlockHash = [u8; 32]; + +/// A struct that can fetch and store recent Bitcoin block hashes, using Electrum nodes. +/// This is only used to evaluate and verify fud's Equi-X PoW. +/// Bitcoin block hashes are used in the challenge, to make Equi-X solution +/// expirable and unpredictable. +/// It's meant to be swapped with DarkFi block hashes once it is stable enough. +/// TODO: It should ask for new Electrum nodes, and build a local database of them +/// instead of relying only on the list defined in the settings. +pub struct BitcoinHashCache { + /// PoW settings which includes BTC/Electrum settings + settings: Arc>, + /// Current list of block hashes, the most recent block is at the end of the list + pub block_hashes: Vec, + /// Global multithreaded executor reference + ex: ExecutorPtr, +} + +impl BitcoinHashCache { + pub fn new(settings: Arc>, ex: ExecutorPtr) -> Self { + Self { settings, block_hashes: vec![], ex } + } + + /// Fetch block hashes from Electrum nodes, and update [`BitcoinHashCache::block_hashes`]. + pub async fn update(&mut self) -> Result> { + info!(target: "fud::BitcoinHashCache::update()", "[BTC] Updating block hashes..."); + + let mut block_hashes = vec![]; + let btc_electrum_nodes = self.settings.read().await.btc_electrum_nodes.clone(); + + let mut rng = OsRng; + let random_nodes: Vec<_> = + btc_electrum_nodes.iter().choose_multiple(&mut rng, btc_electrum_nodes.len()); + + for addr in random_nodes { + // Connect to the Electrum node + let client = match self.create_rpc_client(addr).await { + Ok(client) => client, + Err(e) => { + warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while creating RPC client for Electrum node {addr}: {e}"); + continue + } + }; + info!(target: "fud::BitcoinHashCache::update()", "[BTC] Connected to {addr}"); + + // Fetch the current BTC height + let current_height = match self.fetch_current_height(&client).await { + Ok(height) => height, + Err(e) => { + warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while fetching current height: {e}"); + client.stop().await; + continue + } + }; + info!(target: "fud::BitcoinHashCache::update()", "[BTC] Found current height {current_height}"); + + // Fetch the latest block hashes + match self.fetch_hashes(current_height, &client).await { + Ok(hashes) => { + client.stop().await; + if !hashes.is_empty() { + block_hashes = hashes; + break + } + warn!(target: "fud::BitcoinHashCache::update()", "[BTC] The Electrum node replied with an empty list of block headers"); + continue + } + Err(e) => { + warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while fetching block hashes: {e}"); + client.stop().await; + continue + } + }; + } + + if block_hashes.is_empty() { + let err_str = "Could not find any block hash"; + error!(target: "fud::BitcoinHashCache::update()", "[BTC] {err_str}"); + return Err(Error::Custom(err_str.to_string())) + } + + info!(target: "fud::BitcoinHashCache::update()", "[BTC] Found {} block hashes", block_hashes.len()); + + self.block_hashes = block_hashes.clone(); + Ok(block_hashes) + } + + async fn create_rpc_client(&self, addr: &Url) -> Result { + let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout); + let client = timeout(btc_timeout, RpcClient::new(addr.clone(), self.ex.clone())).await??; + Ok(client) + } + + /// Fetch the current BTC height using an Electrum node RPC. + async fn fetch_current_height(&self, client: &RpcClient) -> Result { + let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout); + let req = JsonRequest::new("blockchain.headers.subscribe", vec![].into()); + let rep = timeout(btc_timeout, client.request(req)).await??; + + rep.get::>() + .and_then(|res| res.get("height")) + .and_then(|h| h.get::()) + .map(|h| *h as u64) + .ok_or_else(|| { + Error::JsonParseError( + "Failed to parse `blockchain.headers.subscribe` response".into(), + ) + }) + } + + /// Fetch `self.count` BTC block hashes from `height` using an Electrum node RPC. + async fn fetch_hashes(&self, height: u64, client: &RpcClient) -> Result> { + let count = self.settings.read().await.btc_hash_count; + let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout); + let req = JsonRequest::new( + "blockchain.block.headers", + vec![ + JsonValue::Number((height as f64) - (count as f64)), + JsonValue::Number(count as f64), + ] + .into(), + ); + let rep = timeout(btc_timeout, client.request(req)).await??; + + let hex: &String = rep + .get::>() + .and_then(|res| res.get("hex")) + .and_then(|h| h.get::()) + .ok_or_else(|| { + Error::JsonParseError("Failed to parse `blockchain.block.headers` response".into()) + })?; + + let decoded_bytes = decode_hex(hex.as_str()).collect::>>()?; + Self::decode_block_hashes(decoded_bytes) + } + + /// Convert concatenated BTC block headers to a list of block hashes. + fn decode_block_hashes(data: Vec) -> Result> { + let mut cursor = Cursor::new(&data); + let count = data.len() / 80; + + let mut hashes = Vec::with_capacity(count); + for _ in 0..count { + // Read the 80-byte header + let mut header = [0u8; 80]; + cursor.read_exact(&mut header)?; + + // Compute double SHA-256 + let first_hash = Sha256::digest(header); + let second_hash = Sha256::digest(first_hash); + + // Convert to big-endian hash + let mut be_hash = [0u8; 32]; + be_hash.copy_from_slice(&second_hash); + be_hash.reverse(); + + hashes.push(be_hash); + } + + Ok(hashes) + } +} diff --git a/bin/fud/fud/src/equix.rs b/bin/fud/fud/src/equix.rs new file mode 100644 index 000000000..11bffa6eb --- /dev/null +++ b/bin/fud/fud/src/equix.rs @@ -0,0 +1,151 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//! Proof-of-Work using Equi-X +//! +//! +//! +//! + +use std::convert::AsRef; + +use blake2::{digest::consts::U4, Blake2b, Digest}; +pub use equix::{EquiXBuilder, HashError, RuntimeOption, Solution, SolverMemory}; + +use darkfi::{Error, Result}; + +/// Algorithm personalization string +const P_STRING: &[u8] = b"DarkFi Equi-X\0"; + +/// Length of the personalization string, in bytes +const P_STRING_LEN: usize = 14; + +/// Length of the nonce value generated by clients and included in the solution +pub const NONCE_LEN: usize = 16; + +/// A challenge string +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Challenge(Vec); + +impl Challenge { + /// Build a new [`Challenge`]. + /// + /// Copies `input` and `nonce` values into + /// a new byte vector. + pub fn new(input: &[u8], nonce: &[u8; NONCE_LEN]) -> Self { + let mut result = Vec::::new(); + result.extend_from_slice(P_STRING); + result.extend_from_slice(input.as_ref()); + result.extend_from_slice(nonce.as_ref()); + + Self(result) + } + + pub fn to_bytes(&self) -> Vec { + self.0.clone() + } + + /// Clone the input portion of this challenge. + pub fn input(&self) -> Vec { + self.0[P_STRING_LEN..(self.0.len() - NONCE_LEN)].into() + } + + /// Clone the nonce portion of this challenge. + pub fn nonce(&self) -> [u8; NONCE_LEN] { + self.0[(self.0.len() - NONCE_LEN)..].try_into().expect("slice length correct") + } + + /// Increment the nonce value inside this challenge. + pub fn increment_nonce(&mut self) { + fn inc_le_bytes(slice: &mut [u8]) { + for byte in slice { + let (value, overflow) = (*byte).overflowing_add(1); + *byte = value; + if !overflow { + break; + } + } + } + let len = self.0.len(); + inc_le_bytes(&mut self.0[(len - NONCE_LEN)..]); + } + + /// Verify that a solution proof passes the effort test. + pub fn check_effort(&self, proof: &equix::SolutionByteArray, effort: u32) -> bool { + let mut hasher = Blake2b::::new(); + hasher.update(self.as_ref()); + hasher.update(proof.as_ref()); + let value = u32::from_be_bytes(hasher.finalize().into()); + value.checked_mul(effort).is_some() + } +} + +impl AsRef<[u8]> for Challenge { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +pub struct EquiXPow { + /// Target effort + pub effort: u32, + /// The next [`Challenge`] to try + pub challenge: Challenge, + /// Configuration settings for Equi-X + pub equix: EquiXBuilder, + /// Temporary memory for Equi-X to use + pub mem: SolverMemory, +} + +impl EquiXPow { + pub fn run(&mut self) -> Result { + loop { + if let Some(solution) = self.run_step()? { + return Ok(solution); + } + } + } + + pub fn run_step(&mut self) -> Result> { + match self.equix.build(self.challenge.as_ref()) { + Ok(equix) => { + for candidate in equix.solve_with_memory(&mut self.mem) { + if self.challenge.check_effort(&candidate.to_bytes(), self.effort) { + return Ok(Some(candidate)) + } + } + } + Err(equix::Error::Hash(HashError::ProgramConstraints)) => (), + Err(e) => { + return Err(Error::Custom(e.to_string())); + } + }; + self.challenge.increment_nonce(); + Ok(None) + } + + pub fn verify(&self, challenge: &Challenge, solution: &Solution) -> Result<()> { + if challenge.check_effort(&solution.to_bytes(), self.effort) { + return self + .equix + .verify(challenge.as_ref(), solution) + .map_err(|e| Error::Custom(e.to_string())) + } + Err(Error::Custom("Equi-X solution has insufficient effort".into())) + } +} diff --git a/bin/fud/fud/src/lib.rs b/bin/fud/fud/src/lib.rs index ea2727324..169f82c70 100644 --- a/bin/fud/fud/src/lib.rs +++ b/bin/fud/fud/src/lib.rs @@ -16,18 +16,6 @@ * along with this program. If not, see . */ -use async_trait::async_trait; -use futures::{future::FutureExt, pin_mut, select}; -use log::{debug, error, info, warn}; -use num_bigint::BigUint; -use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore}; -use sled_overlay::sled; -use smol::{ - channel, - fs::{self, File, OpenOptions}, - io::{AsyncReadExt, AsyncWriteExt}, - lock::RwLock, -}; use std::{ collections::{HashMap, HashSet}, io::ErrorKind, @@ -36,15 +24,31 @@ use std::{ time::Instant, }; +use async_trait::async_trait; +use futures::{future::FutureExt, pin_mut, select}; +use log::{debug, error, info, warn}; +use num_bigint::BigUint; +use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, Rng}; +use sled_overlay::sled; +use smol::{ + channel, + fs::{self, File, OpenOptions}, + lock::RwLock, +}; +use url::Url; + use darkfi::{ - dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr}, + dht::{ + impl_dht_node_defaults, Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr, DhtSettings, + }, geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE}, net::{ChannelPtr, P2pPtr}, - system::{PublisherPtr, StoppableTask}, + system::{ExecutorPtr, PublisherPtr, StoppableTask}, util::path::expand_path, Error, Result, }; -use darkfi_serial::{deserialize_async, serialize_async}; +use darkfi_sdk::crypto::{schnorr::SchnorrPublic, SecretKey}; +use darkfi_serial::{deserialize_async, serialize_async, SerialDecodable, SerialEncodable}; /// P2P protocols pub mod proto; @@ -73,6 +77,20 @@ pub mod rpc; pub mod tasks; use tasks::FetchReply; +/// Bitcoin +pub mod bitcoin; + +/// PoW +pub mod pow; +use pow::{FudPow, VerifiableNodeData}; + +/// Equi-X +pub mod equix; + +/// Settings and args +pub mod settings; +use settings::Args; + /// Utils pub mod util; use util::{get_all_files, FileSelection}; @@ -81,41 +99,31 @@ const SLED_PATH_TREE: &[u8] = b"_fud_paths"; const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections"; const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps"; -// TODO: This is not Sybil-resistant -fn generate_node_id() -> Result { - let mut rng = OsRng; - let mut random_data = [0u8; 32]; - rng.fill_bytes(&mut random_data); - let node_id = blake3::Hash::from_bytes(random_data); - Ok(node_id) +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudNode { + data: VerifiableNodeData, + addresses: Vec, } +impl_dht_node_defaults!(FudNode); -/// Get or generate the node id. -/// Fetches and saves the node id from/to a file. -pub async fn get_node_id(node_id_path: &Path) -> Result { - match File::open(node_id_path).await { - Ok(mut file) => { - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).await?; - let mut out_buf = [0u8; 32]; - bs58::decode(buffer).onto(&mut out_buf)?; - let node_id = blake3::Hash::from_bytes(out_buf); - Ok(node_id) - } - Err(e) if e.kind() == ErrorKind::NotFound => { - let node_id = generate_node_id()?; - let mut file = OpenOptions::new().write(true).create(true).open(node_id_path).await?; - file.write_all(&bs58::encode(node_id.as_bytes()).into_vec()).await?; - file.flush().await?; - Ok(node_id) - } - Err(e) => Err(e.into()), +impl DhtNode for FudNode { + fn id(&self) -> blake3::Hash { + self.data.id() + } + fn addresses(&self) -> Vec { + self.addresses.clone() } } pub struct Fud { + /// Our own [`VerifiableNodeData`] + pub node_data: Arc>, + + /// Our secret key (the public key is in `node_data`) + pub secret_key: Arc>, + /// Key -> Seeders - seeders_router: DhtRouterPtr, + pub seeders_router: DhtRouterPtr, /// Pointer to the P2P network instance p2p: P2pPtr, @@ -129,8 +137,11 @@ pub struct Fud { /// Chunk transfer timeout in seconds chunk_timeout: u64, + /// The [`FudPow`] instance + pub pow: Arc>, + /// The DHT instance - dht: Arc, + dht: Arc>, /// Resources (current status of all downloads/seeds) resources: Arc>>, @@ -152,7 +163,7 @@ pub struct Fud { get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>, get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>, - /// Currently active downloading tasks (running the `fud.fetch_resource()` method) + /// Currently active includingdownloading tasks (running the `fud.fetch_resource()` method) fetch_tasks: Arc>>>, /// Used to send events to fud clients @@ -160,45 +171,71 @@ pub struct Fud { } #[async_trait] -impl DhtHandler for Fud { - fn dht(&self) -> Arc { +impl DhtHandler for Fud { + fn dht(&self) -> Arc> { self.dht.clone() } - async fn ping(&self, channel: ChannelPtr) -> Result { + async fn node(&self) -> FudNode { + FudNode { + data: self.node_data.read().await.clone(), + addresses: self + .p2p + .clone() + .hosts() + .external_addrs() + .await + .iter() + .filter(|addr| !addr.to_string().contains("[::]")) + .cloned() + .collect(), + } + } + + async fn ping(&self, channel: ChannelPtr) -> Result { debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id); let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; let msg_subscriber = channel.subscribe_msg::().await.unwrap(); - let request = FudPingRequest {}; + // Send `FudPingRequest` + let mut rng = OsRng; + let request = FudPingRequest { random: rng.gen() }; channel.send(&request).await?; + // Wait for `FudPingReply` let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await?; - msg_subscriber.unsubscribe().await; + // Verify the signature + if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) { + channel.ban().await; + return Err(Error::InvalidSignature) + } + + // Verify PoW + if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await { + channel.ban().await; + return Err(e) + } + Ok(reply.node.clone()) } // TODO: Optimize this - async fn on_new_node(&self, node: &DhtNode) -> Result<()> { - debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id)); + async fn on_new_node(&self, node: &FudNode) -> Result<()> { + debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id())); - // If this is the first node we know about, then bootstrap + // If this is the first node we know about, then bootstrap and announce our files if !self.dht().is_bootstrapped().await { - self.dht().set_bootstrapped().await; - - // Lookup our own node id - debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self.dht().node_id)); - let _ = self.lookup_nodes(&self.dht().node_id).await; + let _ = self.init().await; } // Send keys that are closer to this node than we are - let self_id = self.dht().node_id; + let self_id = self.node_data.read().await.id(); let channel = self.get_channel(node, None).await?; for (key, seeders) in self.seeders_router.read().await.iter() { - let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id)); + let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id())); let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id)); if node_distance <= self_distance { let _ = channel @@ -214,8 +251,8 @@ impl DhtHandler for Fud { Ok(()) } - async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result> { - debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id)); + async fn fetch_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result> { + debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id())); let channel = self.get_channel(node, None).await?; let msg_subsystem = channel.message_subsystem(); @@ -236,30 +273,43 @@ impl DhtHandler for Fud { impl Fud { pub async fn new( + settings: Args, p2p: P2pPtr, - basedir: PathBuf, - downloads_path: PathBuf, - chunk_timeout: u64, - dht: Arc, sled_db: &sled::Db, event_publisher: PublisherPtr, + executor: ExecutorPtr, ) -> Result { - let (get_tx, get_rx) = smol::channel::unbounded(); + let basedir = expand_path(&settings.base_dir)?; + let downloads_path = match settings.downloads_path { + Some(downloads_path) => expand_path(&downloads_path)?, + None => basedir.join("downloads"), + }; - // Hashmap used for routing - let seeders_router = Arc::new(RwLock::new(HashMap::new())); + // Run the PoW and generate a `VerifiableNodeData` + let mut pow = FudPow::new(settings.pow.into(), executor.clone()); + pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes + let (node_data, secret_key) = pow.generate_node().await?; + info!(target: "fud", "Your node ID: {}", hash_to_string(&node_data.id())); + // Geode info!("Instantiating Geode instance"); let geode = Geode::new(&basedir).await?; - info!("Instantiating DHT instance"); + // DHT + let dht_settings: DhtSettings = settings.dht.into(); + let dht: Arc> = + Arc::new(Dht::::new(&dht_settings, p2p.clone(), executor.clone()).await); + let (get_tx, get_rx) = smol::channel::unbounded(); let fud = Self { - seeders_router, + node_data: Arc::new(RwLock::new(node_data)), + secret_key: Arc::new(RwLock::new(secret_key)), + seeders_router: Arc::new(RwLock::new(HashMap::new())), p2p, geode, downloads_path, - chunk_timeout, + chunk_timeout: settings.chunk_timeout, + pow: Arc::new(RwLock::new(pow)), dht, path_tree: sled_db.open_tree(SLED_PATH_TREE)?, file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?, @@ -271,14 +321,15 @@ impl Fud { event_publisher, }; - fud.init().await?; - Ok(fud) } - /// Add ourselves to `seeders_router` for the files we already have. - /// Skipped if we have no external address. + /// Bootstrap the DHT, verify our resources, add ourselves to + /// `seeders_router` for the resources we already have, announce our files. async fn init(&self) -> Result<()> { + info!(target: "fud::init()", "Bootstrapping the DHT..."); + self.bootstrap().await; + info!(target: "fud::init()", "Finding resources..."); let mut resources_write = self.resources.write().await; for result in self.path_tree.iter() { @@ -339,16 +390,16 @@ impl Fud { info!(target: "fud::init()", "Verifying resources..."); let resources = self.verify_resources(None).await?; - let self_node = self.dht().node().await; + let self_node = self.node().await; + // Stop here if we have no external address if self_node.addresses.is_empty() { return Ok(()); } - info!(target: "fud::init()", "Start seeding..."); - let self_router_items: Vec = vec![self_node.into()]; - - for resource in resources { + // Add our own node as a seeder for the resources we are seeding + let self_router_items: Vec> = vec![self_node.into()]; + for resource in &resources { self.add_to_router( self.seeders_router.clone(), &resource.hash, @@ -357,6 +408,18 @@ impl Fud { .await; } + info!(target: "fud::init()", "Announcing resources..."); + let seeders = vec![self.node().await.into()]; + for resource in resources { + let _ = self + .announce( + &resource.hash, + &FudAnnounce { key: resource.hash, seeders: seeders.clone() }, + self.seeders_router.clone(), + ) + .await; + } + Ok(()) } @@ -530,17 +593,17 @@ impl Fud { /// Query `nodes` to find the seeders for `key` async fn fetch_seeders( &self, - nodes: &Vec, + nodes: &Vec, key: &blake3::Hash, - ) -> HashSet { - let self_node = self.dht().node().await; - let mut seeders: HashSet = HashSet::new(); + ) -> HashSet> { + let self_node = self.node().await; + let mut seeders: HashSet> = HashSet::new(); for node in nodes { let channel = match self.get_channel(node, None).await { Ok(channel) => channel, Err(e) => { - warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id)); + warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id())); continue; } }; @@ -581,7 +644,8 @@ impl Fud { seeders.extend(reply.seeders.clone()); } - seeders = seeders.iter().filter(|seeder| seeder.node.id != self_node.id).cloned().collect(); + seeders = + seeders.iter().filter(|seeder| seeder.node.id() != self_node.id()).cloned().collect(); info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(key)); seeders @@ -592,7 +656,7 @@ impl Fud { &self, hash: &blake3::Hash, chunked: &mut ChunkedStorage, - seeders: &HashSet, + seeders: &HashSet>, chunks: &HashSet, ) -> Result<()> { let mut remaining_chunks = chunks.clone(); @@ -606,12 +670,12 @@ impl Fud { let channel = match self.get_channel(&seeder.node, Some(*hash)).await { Ok(channel) => channel, Err(e) => { - warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id)); + warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id())); continue; } }; let mut chunks_to_query = remaining_chunks.clone(); - info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id)); + info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id())); loop { let start_time = Instant::now(); let msg_subsystem = channel.message_subsystem(); @@ -665,7 +729,7 @@ impl Fud { continue; // Skip to next chunk, will retry this chunk later } - info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id())); // If we did not write the whole chunk to the filesystem, // save the chunk in the scraps. @@ -726,7 +790,7 @@ impl Fud { msg_subscriber_notfound.unsubscribe().await; break; // Switch to another seeder } - info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id())); notify_event!(self, ChunkNotFound, { hash: *hash, chunk_hash }); } }; @@ -755,7 +819,7 @@ impl Fud { pub async fn fetch_metadata( &self, hash: &blake3::Hash, - nodes: &Vec, + nodes: &Vec, path: &Path, ) -> Result<()> { let mut queried_seeders: HashSet = HashSet::new(); @@ -766,7 +830,7 @@ impl Fud { let channel = match self.get_channel(node, Some(*hash)).await { Ok(channel) => channel, Err(e) => { - warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id)); + warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id())); continue; } }; @@ -801,7 +865,7 @@ impl Fud { }; let mut seeders = reply.seeders.clone(); - info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id)); + info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id())); msg_subscriber.unsubscribe().await; self.cleanup_channel(channel).await; @@ -809,10 +873,10 @@ impl Fud { // 2. Request the file/chunk from the seeders while let Some(seeder) = seeders.pop() { // Only query a seeder once - if queried_seeders.iter().any(|s| *s == seeder.node.id) { + if queried_seeders.iter().any(|s| *s == seeder.node.id()) { continue; } - queried_seeders.insert(seeder.node.id); + queried_seeders.insert(seeder.node.id()); if let Ok(channel) = self.get_channel(&seeder.node, Some(*hash)).await { let msg_subsystem = channel.message_subsystem(); @@ -876,7 +940,7 @@ impl Fud { warn!(target: "fud::fetch_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash"); continue; } - info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id())); result = Some(FetchReply::Chunk((*reply).clone())); break; } @@ -891,7 +955,7 @@ impl Fud { warn!(target: "fud::fetch_metadata()", "Received invalid file metadata"); continue; } - info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id())); result = Some(FetchReply::File((*reply).clone())); break; } @@ -912,7 +976,7 @@ impl Fud { warn!(target: "fud::fetch_metadata()", "Received invalid directory metadata"); continue; } - info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id())); result = Some(FetchReply::Directory((*reply).clone())); break; } @@ -922,7 +986,7 @@ impl Fud { warn!(target: "fud::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}"); continue; } - info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id)); + info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id())); } }; } @@ -1005,7 +1069,7 @@ impl Fud { &self, hash: &blake3::Hash, path: &Path, - ) -> Result<(ChunkedStorage, Vec)> { + ) -> Result<(ChunkedStorage, Vec)> { match self.geode.get(hash, path).await { // We already know the metadata Ok(v) => Ok((v, vec![])), @@ -1041,7 +1105,7 @@ impl Fud { path: &Path, files: &FileSelection, ) -> Result<()> { - let self_node = self.dht().node().await; + let self_node = self.node().await; let hash_bytes = hash.as_bytes(); let path_string = path.to_string_lossy().to_string(); @@ -1456,7 +1520,7 @@ impl Fud { /// Add a resource from the file system. pub async fn put(&self, path: &PathBuf) -> Result { - let self_node = self.dht.node().await; + let self_node = self.node().await; if self_node.addresses.is_empty() { return Err(Error::Custom( diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 43263062f..294af96e6 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -20,96 +20,46 @@ use log::{debug, error, info, warn}; use sled_overlay::sled; use smol::{stream::StreamExt, Executor}; use std::sync::Arc; -use structopt_toml::{structopt::StructOpt, StructOptToml}; +use structopt_toml::StructOptToml; use darkfi::{ - async_daemonize, cli_desc, - dht::{Dht, DhtHandler, DhtSettings, DhtSettingsOpt}, - geode::hash_to_string, - net::{session::SESSION_DEFAULT, settings::SettingsOpt, P2p, Settings as NetSettings}, + async_daemonize, + dht::DhtHandler, + net::{session::SESSION_DEFAULT, P2p, Settings as NetSettings}, rpc::{ jsonrpc::JsonSubscriber, server::{listen_and_serve, RequestHandler}, - settings::{RpcSettings, RpcSettingsOpt}, + settings::RpcSettings, }, system::{Publisher, StoppableTask}, util::path::expand_path, Error, Result, }; - use fud::{ - get_node_id, proto::{FudFindNodesReply, ProtocolFud}, rpc::JsonRpcInterface, - tasks::{announce_seed_task, get_task}, + settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, + tasks::{announce_seed_task, get_task, node_id_task}, Fud, }; -const CONFIG_FILE: &str = "fud_config.toml"; -const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml"); -const NODE_ID_PATH: &str = "node_id"; - -#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)] -#[serde(default)] -#[structopt(name = "fud", about = cli_desc!())] -struct Args { - #[structopt(short, parse(from_occurrences))] - /// Increase verbosity (-vvv supported) - verbose: u8, - - #[structopt(short, long)] - /// Configuration file to use - config: Option, - - #[structopt(long)] - /// Set log file path to output daemon logs into - log: Option, - - #[structopt(long, default_value = "~/.local/share/darkfi/fud")] - /// Base directory for filesystem storage - base_dir: String, - - #[structopt(short, long)] - /// Default path to store downloaded files (defaults to /downloads) - downloads_path: Option, - - #[structopt(long, default_value = "60")] - /// Chunk transfer timeout in seconds - chunk_timeout: u64, - - #[structopt(flatten)] - /// Network settings - net: SettingsOpt, - - #[structopt(flatten)] - /// JSON-RPC settings - rpc: RpcSettingsOpt, - - #[structopt(flatten)] - /// DHT settings - dht: DhtSettingsOpt, -} - async_daemonize!(realmain); async fn realmain(args: Args, ex: Arc>) -> Result<()> { // The working directory for this daemon and geode. let basedir = expand_path(&args.base_dir)?; - // The directory to store the downloaded files - let downloads_path = match args.downloads_path { - Some(downloads_path) => expand_path(&downloads_path)?, - None => basedir.join("downloads"), - }; + // Cloned args + let args_ = args.clone(); // Sled database init - info!("Instantiating database"); + info!(target: "fud", "Instantiating database"); let sled_db = sled::open(basedir.join("db"))?; - info!("Instantiating P2P network"); + info!(target: "fud", "Instantiating P2P network"); let net_settings: NetSettings = args.net.into(); let p2p = P2p::new(net_settings.clone(), ex.clone()).await?; - info!("Starting dnet subs task"); + info!(target: "fud", "Starting dnet subs task"); let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); let dnet_sub_ = dnet_sub.clone(); let p2p_ = p2p.clone(); @@ -133,28 +83,11 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); - let mut node_id_path = basedir.to_path_buf(); - node_id_path.push(NODE_ID_PATH); - let node_id = get_node_id(&node_id_path).await?; - - info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id)); - // Daemon instantiation let event_pub = Publisher::new(); - let dht_settings: DhtSettings = args.dht.into(); - let dht: Arc = Arc::new(Dht::new(&node_id, &dht_settings, p2p.clone(), ex.clone()).await); - let fud: Arc = Arc::new( - Fud::new( - p2p.clone(), - basedir, - downloads_path, - args.chunk_timeout, - dht.clone(), - &sled_db, - event_pub.clone(), - ) - .await?, - ); + + let fud: Arc = + Arc::new(Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?); info!(target: "fud", "Starting download subs task"); let event_sub = JsonSubscriber::new("event"); @@ -210,7 +143,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); - info!("Starting P2P protocols"); + info!(target: "fud", "Starting P2P protocols"); let registry = p2p.protocol_registry(); let fud_ = fud.clone(); registry @@ -256,6 +189,21 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); + info!(target: "fud", "Starting node ID task"); + let node_task = StoppableTask::new(); + let fud_ = fud.clone(); + node_task.clone().start( + async move { node_id_task(fud_.clone()).await }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "fud", "Failed starting node ID task: {e}"), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?; @@ -273,10 +221,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "fud", "Stopping P2P network..."); p2p.stop().await; - info!(target: "fud", "Stopping DHT tasks"); + info!(target: "fud", "Stopping DHT tasks..."); dht_channel_task.stop().await; announce_task.stop().await; + info!(target: "fud", "Stopping node ID task..."); + node_task.stop().await; + info!(target: "fud", "Flushing sled database..."); let flushed_bytes = sled_db.flush_async().await?; info!(target: "fud", "Flushed {flushed_bytes} bytes"); diff --git a/bin/fud/fud/src/pow.rs b/bin/fud/fud/src/pow.rs new file mode 100644 index 000000000..cadf223e9 --- /dev/null +++ b/bin/fud/fud/src/pow.rs @@ -0,0 +1,265 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::{ + io::{Error as IoError, Read, Result as IoResult, Write}, + sync::Arc, +}; + +use log::info; +use rand::rngs::OsRng; +use smol::lock::RwLock; +use structopt::StructOpt; +use url::Url; + +use darkfi::{system::ExecutorPtr, Error, Result}; +use darkfi_sdk::crypto::{Keypair, PublicKey, SecretKey}; +use darkfi_serial::{ + async_trait, AsyncDecodable, AsyncEncodable, AsyncRead, AsyncWrite, Decodable, Encodable, +}; + +use crate::{ + bitcoin::{BitcoinBlockHash, BitcoinHashCache}, + equix::{Challenge, EquiXBuilder, EquiXPow, Solution, SolverMemory, NONCE_LEN}, +}; + +#[derive(Clone, Debug)] +pub struct PowSettings { + /// Equi-X effort value + pub equix_effort: u32, + /// Number of latest BTC block hashes that are valid for fud's PoW + pub btc_hash_count: usize, + /// Electrum nodes timeout in seconds + pub btc_timeout: u64, + /// Electrum nodes used to fetch the latest block hashes (used in fud's PoW) + pub btc_electrum_nodes: Vec, +} + +impl Default for PowSettings { + fn default() -> Self { + Self { + equix_effort: 10000, + btc_hash_count: 144, + btc_timeout: 15, + btc_electrum_nodes: vec![], + } + } +} + +#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)] +#[structopt()] +#[serde(rename = "pow")] +pub struct PowSettingsOpt { + /// Equi-X effort value + #[structopt(long)] + pub equix_effort: Option, + + /// Number of latest BTC block hashes that are valid for fud's PoW + #[structopt(long)] + pub btc_hash_count: Option, + + /// Electrum nodes timeout in seconds + #[structopt(long)] + pub btc_timeout: Option, + + /// Electrum nodes used to fetch the latest block hashes (used in fud's PoW) + #[structopt(long, use_delimiter = true)] + pub btc_electrum_nodes: Vec, +} + +impl From for PowSettings { + fn from(opt: PowSettingsOpt) -> Self { + let def = PowSettings::default(); + + Self { + equix_effort: opt.equix_effort.unwrap_or(def.equix_effort), + btc_hash_count: opt.btc_hash_count.unwrap_or(def.btc_hash_count), + btc_timeout: opt.btc_timeout.unwrap_or(def.btc_timeout), + btc_electrum_nodes: opt.btc_electrum_nodes, + } + } +} + +/// Struct handling a [`EquiXPow`] instance to generate and verify [`VerifiableNodeData`]. +pub struct FudPow { + pub settings: Arc>, + pub bitcoin_hash_cache: BitcoinHashCache, + equix_pow: EquiXPow, +} +impl FudPow { + pub fn new(settings: PowSettings, ex: ExecutorPtr) -> Self { + let pow_settings: Arc> = Arc::new(RwLock::new(settings)); + let bitcoin_hash_cache = BitcoinHashCache::new(pow_settings.clone(), ex.clone()); + + Self { + settings: pow_settings, + bitcoin_hash_cache, + equix_pow: EquiXPow { + effort: 0, // will be set when we call `generate_node()` + challenge: Challenge::new(&[], &[0u8; NONCE_LEN]), + equix: EquiXBuilder::default(), + mem: SolverMemory::default(), + }, + } + } + + /// Generate a random keypair and run the PoW to get a [`VerifiableNodeData`]. + pub async fn generate_node(&mut self) -> Result<(VerifiableNodeData, SecretKey)> { + info!(target: "fud::FudPow::generate_node()", "Generating a new node id..."); + + // Generate a random keypair + let keypair = Keypair::random(&mut OsRng); + + // Get a recent Bitcoin block hash + let n = 3; + let btc_block_hash = { + let block_hashes = &self.bitcoin_hash_cache.block_hashes; + if block_hashes.is_empty() { + return Err(Error::Custom( + "Can't generate a node id without BTC block hashes".into(), + )); + } + + let block_hash = if n > block_hashes.len() { + block_hashes.last() + } else { + block_hashes.get(block_hashes.len() - 1 - n) + }; + + if block_hash.is_none() { + return Err(Error::Custom("Could not find a recent BTC block hash".into())); + } + *block_hash.unwrap() + }; + + // Update the effort using the value from `self.settings` + self.equix_pow.effort = self.settings.read().await.equix_effort; + + // Construct Equi-X challenge + self.equix_pow.challenge = Challenge::new( + &[keypair.public.to_bytes(), btc_block_hash].concat(), + &[0u8; NONCE_LEN], + ); + + // Evaluate PoW + info!(target: "fud::FudPow::generate_node()", "Equi-X Proof-of-Work starts..."); + let solution = + self.equix_pow.run().map_err(|e| Error::Custom(format!("Equi-X error: {e}")))?; + info!(target: "fud::FudPow::generate_node()", "Equi-X Proof-of-Work is done"); + + // Create the VerifiableNodeData + Ok(( + VerifiableNodeData { + public_key: keypair.public, + btc_block_hash, + nonce: self.equix_pow.challenge.nonce(), + solution, + }, + keypair.secret, + )) + } + + /// Check if the Equi-X solution in a [`VerifiableNodeData`] is valid and has enough effort. + pub async fn verify_node(&mut self, node_data: &VerifiableNodeData) -> Result<()> { + // Update the effort using the value from `self.settings` + self.equix_pow.effort = self.settings.read().await.equix_effort; + + // Verify if the Bitcoin block hash is known + if !self.bitcoin_hash_cache.block_hashes.contains(&node_data.btc_block_hash) { + return Err(Error::Custom( + "Error verifying node data: the BTC block hash is unknown".into(), + )) + } + + // Verify the solution + self.equix_pow + .verify(&node_data.challenge(), &node_data.solution) + .map_err(|e| Error::Custom(format!("Error verifying Equi-X solution: {e}"))) + } +} + +/// The data needed to verify a fud PoW. +#[derive(Debug, Clone)] +pub struct VerifiableNodeData { + pub public_key: PublicKey, + pub btc_block_hash: BitcoinBlockHash, + pub nonce: [u8; NONCE_LEN], + pub solution: Solution, +} + +impl VerifiableNodeData { + /// The node id on the DHT. + pub fn id(&self) -> blake3::Hash { + blake3::hash(&[self.challenge().to_bytes(), self.solution.to_bytes().to_vec()].concat()) + } + + /// The Equi-X challenge. + pub fn challenge(&self) -> Challenge { + Challenge::new(&[self.public_key.to_bytes(), self.btc_block_hash].concat(), &self.nonce) + } +} + +impl Encodable for VerifiableNodeData { + fn encode(&self, s: &mut S) -> IoResult { + let mut len = 0; + len += self.public_key.encode(s)?; + len += self.btc_block_hash.encode(s)?; + len += self.nonce.encode(s)?; + len += self.solution.to_bytes().encode(s)?; + Ok(len) + } +} + +#[async_trait] +impl AsyncEncodable for VerifiableNodeData { + async fn encode_async(&self, s: &mut S) -> IoResult { + let mut len = 0; + len += self.public_key.encode_async(s).await?; + len += self.btc_block_hash.encode_async(s).await?; + len += self.nonce.encode_async(s).await?; + len += self.solution.to_bytes().encode_async(s).await?; + Ok(len) + } +} + +impl Decodable for VerifiableNodeData { + fn decode(d: &mut D) -> IoResult { + Ok(Self { + public_key: PublicKey::decode(d)?, + btc_block_hash: BitcoinBlockHash::decode(d)?, + nonce: <[u8; NONCE_LEN]>::decode(d)?, + solution: Solution::try_from_bytes(&<[u8; Solution::NUM_BYTES]>::decode(d)?) + .map_err(|e| IoError::other(format!("Error parsing Equi-X solution: {e}")))?, + }) + } +} + +#[async_trait] +impl AsyncDecodable for VerifiableNodeData { + async fn decode_async(d: &mut D) -> IoResult { + Ok(Self { + public_key: PublicKey::decode_async(d).await?, + btc_block_hash: BitcoinBlockHash::decode_async(d).await?, + nonce: <[u8; NONCE_LEN]>::decode_async(d).await?, + solution: Solution::try_from_bytes( + &<[u8; Solution::NUM_BYTES]>::decode_async(d).await?, + ) + .map_err(|e| IoError::other(format!("Error parsing Equi-X solution: {e}")))?, + }) + } +} diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 4d32ac2e5..b4841dd8e 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -22,7 +22,7 @@ use smol::Executor; use std::{path::StripPrefixError, sync::Arc}; use darkfi::{ - dht::{DhtHandler, DhtNode, DhtRouterItem}, + dht::{DhtHandler, DhtRouterItem}, geode::hash_to_string, impl_p2p_message, net::{ @@ -32,9 +32,10 @@ use darkfi::{ }, Error, Result, }; +use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature}; use darkfi_serial::{SerialDecodable, SerialEncodable}; -use super::Fud; +use super::{Fud, FudNode}; /// Message representing a file reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -55,7 +56,7 @@ impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudAnnounce { pub key: blake3::Hash, - pub seeders: Vec, + pub seeders: Vec>, } impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -74,13 +75,17 @@ impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATI /// Message representing a ping request on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudPingRequest; +pub struct FudPingRequest { + pub random: u64, +} impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a ping reply on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudPingReply { - pub node: DhtNode, + pub node: FudNode, + /// Signature of the random u64 from the ping request + pub sig: Signature, } impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -102,7 +107,7 @@ impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METE /// Message representing a find nodes reply on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFindNodesReply { - pub nodes: Vec, + pub nodes: Vec, } impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -122,7 +127,7 @@ impl_p2p_message!( /// Message representing a find seeders reply on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFindSeedersReply { - pub seeders: Vec, + pub seeders: Vec>, } impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -174,7 +179,7 @@ impl ProtocolFud { debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START"); loop { - let _ = match self.ping_request_sub.receive().await { + let ping_req = match self.ping_request_sub.receive().await { Ok(v) => v, Err(Error::ChannelStopped) => continue, Err(e) => { @@ -182,9 +187,12 @@ impl ProtocolFud { continue } }; - info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING"); + info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST"); - let reply = FudPingReply { node: self.fud.dht.node().await }; + let reply = FudPingReply { + node: self.fud.node().await, + sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()), + }; match self.channel.send(&reply).await { Ok(()) => continue, Err(_e) => continue, @@ -253,7 +261,7 @@ impl ProtocolFud { return false; } let reply = FudChunkReply { chunk }; - info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk {}", hash_to_string(&request.key)); + info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key)); let _ = self.channel.send(&reply).await; return true; } diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs index 4b98734a1..49d28cb74 100644 --- a/bin/fud/fud/src/rpc.rs +++ b/bin/fud/fud/src/rpc.rs @@ -27,6 +27,7 @@ use std::{ use tinyjson::JsonValue; use darkfi::{ + dht::DhtNode, geode::hash_to_string, net::P2pPtr, rpc::{ @@ -265,11 +266,11 @@ impl JsonRpcInterface { let mut nodes = vec![]; for node in bucket.nodes.clone() { let mut addresses = vec![]; - for addr in node.addresses { + for addr in &node.addresses { addresses.push(JsonValue::String(addr.to_string())); } nodes.push(JsonValue::Array(vec![ - JsonValue::String(hash_to_string(&node.id)), + JsonValue::String(hash_to_string(&node.id())), JsonValue::Array(addresses), ])); } @@ -293,7 +294,7 @@ impl JsonRpcInterface { for (hash, items) in self.fud.seeders_router.read().await.iter() { let mut node_ids = vec![]; for item in items { - node_ids.push(JsonValue::String(hash_to_string(&item.node.id))); + node_ids.push(JsonValue::String(hash_to_string(&item.node.id()))); } seeders_router.insert(hash_to_string(hash), JsonValue::Array(node_ids)); } diff --git a/bin/fud/fud/src/settings.rs b/bin/fud/fud/src/settings.rs new file mode 100644 index 000000000..badfbb356 --- /dev/null +++ b/bin/fud/fud/src/settings.rs @@ -0,0 +1,74 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use structopt::StructOpt; +use structopt_toml::{serde::Deserialize, StructOptToml}; + +use darkfi::{ + cli_desc, dht::DhtSettingsOpt, net::settings::SettingsOpt, rpc::settings::RpcSettingsOpt, +}; + +use crate::pow::PowSettingsOpt; + +pub const CONFIG_FILE: &str = "fud_config.toml"; +pub const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml"); + +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "fud", about = cli_desc!())] +pub struct Args { + #[structopt(short, parse(from_occurrences))] + /// Increase verbosity (-vvv supported) + pub verbose: u8, + + #[structopt(short, long)] + /// Configuration file to use + pub config: Option, + + #[structopt(long)] + /// Set log file path to output daemon logs into + pub log: Option, + + #[structopt(long, default_value = "~/.local/share/darkfi/fud")] + /// Base directory for filesystem storage + pub base_dir: String, + + #[structopt(short, long)] + /// Default path to store downloaded files (defaults to /downloads) + pub downloads_path: Option, + + #[structopt(long, default_value = "60")] + /// Chunk transfer timeout in seconds + pub chunk_timeout: u64, + + #[structopt(flatten)] + /// Network settings + pub net: SettingsOpt, + + #[structopt(flatten)] + /// JSON-RPC settings + pub rpc: RpcSettingsOpt, + + #[structopt(flatten)] + /// DHT settings + pub dht: DhtSettingsOpt, + + #[structopt(flatten)] + /// PoW settings + pub pow: PowSettingsOpt, +} diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs index 39c47a195..d6b4f132a 100644 --- a/bin/fud/fud/src/tasks.rs +++ b/bin/fud/fud/src/tasks.rs @@ -16,11 +16,13 @@ * along with this program. If not, see . */ -use log::{error, info}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; + +use log::{error, info, warn}; use darkfi::{ - dht::DhtHandler, + dht::{DhtHandler, DhtNode}, + geode::hash_to_string, system::{sleep, ExecutorPtr, StoppableTask}, Error, Result, }; @@ -89,7 +91,7 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { loop { sleep(interval).await; - let seeders = vec![fud.dht().node().await.into()]; + let seeders = vec![fud.node().await.into()]; info!(target: "fud::announce_seed_task()", "Verifying seeds..."); let seeding_resources = match fud.verify_resources(None).await { @@ -115,3 +117,90 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await; } } + +/// Background task that: +/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`] +/// 2. Removes old nodes from the DHT +/// 3. Removes old nodes from the seeders router +/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT +pub async fn node_id_task(fud: Arc) -> Result<()> { + let interval = 600; // TODO: Make a setting + + loop { + sleep(interval).await; + + let mut pow = fud.pow.write().await; + let btc = &mut pow.bitcoin_hash_cache; + + if btc.update().await.is_err() { + continue + } + + let block = fud.node_data.read().await.btc_block_hash; + let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) { + Some(i) => i < 6, + None => true, + }; + + if !needs_dht_reset { + // Removes nodes in the DHT with unknown BTC block hashes. + let dht = fud.dht(); + let mut buckets = dht.buckets.write().await; + for bucket in buckets.iter_mut() { + for (i, node) in bucket.nodes.clone().iter().enumerate().rev() { + // If this node's BTC block hash is unknown, remove it from the bucket + if !btc.block_hashes.contains(&node.data.btc_block_hash) { + bucket.nodes.remove(i); + info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id())); + } + } + } + drop(buckets); + + // Removes nodes in the seeders router with unknown BTC block hashes + let mut seeders_router = fud.seeders_router.write().await; + for (key, seeders) in seeders_router.iter_mut() { + for seeder in seeders.clone().iter() { + if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) { + seeders.remove(seeder); + info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key)); + } + } + } + + continue + } + + info!(target: "fud::node_id_task()", "Creating a new node id..."); + let (node_data, secret_key) = match pow.generate_node().await { + Ok(res) => res, + Err(e) => { + warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}"); + continue + } + }; + drop(pow); + info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id())); + + // Close all channels + let dht = fud.dht(); + let mut channel_cache = dht.channel_cache.write().await; + for channel in dht.p2p.hosts().channels().clone() { + channel.stop().await; + channel_cache.remove(&channel.info.id); + } + drop(channel_cache); + + // Reset the DHT + dht.reset().await; + + // Reset the seeders router + *fud.seeders_router.write().await = HashMap::new(); + + // Update our node data and our secret key + *fud.node_data.write().await = node_data; + *fud.secret_key.write().await = secret_key; + + // DHT will be bootstrapped on the next channel connection + } +} diff --git a/src/dht/handler.rs b/src/dht/handler.rs index bdc40ba17..607b463f7 100644 --- a/src/dht/handler.rs +++ b/src/dht/handler.rs @@ -40,27 +40,33 @@ use crate::{ }; #[async_trait] -pub trait DhtHandler { - fn dht(&self) -> Arc; +pub trait DhtHandler { + fn dht(&self) -> Arc>; + + /// Get our own node + async fn node(&self) -> N; /// Send a DHT ping request - async fn ping(&self, channel: ChannelPtr) -> Result; + async fn ping(&self, channel: ChannelPtr) -> Result; /// Triggered when we find a new node - async fn on_new_node(&self, node: &DhtNode) -> Result<()>; + async fn on_new_node(&self, node: &N) -> Result<()>; /// Send FIND NODES request to a peer to get nodes close to `key` - async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result>; + async fn fetch_nodes(&self, node: &N, key: &blake3::Hash) -> Result>; /// Announce message for a key, and add ourselves to router async fn announce( &self, key: &blake3::Hash, message: &M, - router: DhtRouterPtr, - ) -> Result<()> { - let self_node = self.dht().node().await; - if self_node.addresses.is_empty() { + router: DhtRouterPtr, + ) -> Result<()> + where + N: 'async_trait, + { + let self_node = self.node().await; + if self_node.addresses().is_empty() { return Err(().into()); // TODO } @@ -79,6 +85,19 @@ pub trait DhtHandler { Ok(()) } + /// Lookup our own node id to bootstrap our DHT + async fn bootstrap(&self) { + self.dht().set_bootstrapped(true).await; + + let self_node_id = self.node().await.id(); + debug!(target: "dht::DhtHandler::bootstrap()", "DHT bootstrapping {}", hash_to_string(&self_node_id)); + let nodes = self.lookup_nodes(&self_node_id).await; + + if nodes.is_err() || nodes.map_or(true, |v| v.is_empty()) { + self.dht().set_bootstrapped(false).await; + } + } + /// Send a DHT ping request when there is a new channel, to know the node id of the new peer, /// Then fill the channel cache and the buckets async fn channel_task(&self) -> Result<()> { @@ -106,7 +125,7 @@ pub trait DhtHandler { if let Err(e) = ping_res { warn!(target: "dht::DhtHandler::channel_task()", "Error while pinging (requesting node id) {}: {e}", channel.address()); - channel.stop().await; + // channel.stop().await; continue; } @@ -119,7 +138,7 @@ pub trait DhtHandler { }); drop(channel_cache); - if !node.addresses.is_empty() { + if !node.addresses().is_empty() { self.add_node(node.clone()).await; let _ = self.on_new_node(&node.clone()).await; } @@ -127,24 +146,35 @@ pub trait DhtHandler { } /// Add a node in the correct bucket - async fn add_node(&self, node: DhtNode) { + async fn add_node(&self, node: N) + where + N: 'async_trait, + { + let self_node = self.node().await; + // Do not add ourselves to the buckets - if node.id == self.dht().node_id { + if node.id() == self_node.id() { + return; + } + + // Don't add this node if it has any external address that is the same as one of ours + let node_addresses = node.addresses(); + if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) { return; } // Do not add a node to the buckets if it does not have an address - if node.addresses.is_empty() { + if node.addresses().is_empty() { return; } - let bucket_index = self.dht().get_bucket_index(&node.id).await; + let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await; let buckets_lock = self.dht().buckets.clone(); let mut buckets = buckets_lock.write().await; let bucket = &mut buckets[bucket_index]; // Node is already in the bucket - if bucket.nodes.iter().any(|n| n.id == node.id) { + if bucket.nodes.iter().any(|n| n.id() == node.id()) { return; } @@ -175,14 +205,15 @@ pub trait DhtHandler { /// Move a node to the tail in its bucket, /// to show that it is the most recently seen in the bucket. /// If the node is not in a bucket it will be added using `add_node` - async fn update_node(&self, node: &DhtNode) { - let bucket_index = self.dht().get_bucket_index(&node.id).await; + async fn update_node(&self, node: &N) { + let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await; let buckets_lock = self.dht().buckets.clone(); let mut buckets = buckets_lock.write().await; let bucket = &mut buckets[bucket_index]; - let node_index = bucket.nodes.iter().position(|n| n.id == node.id); + let node_index = bucket.nodes.iter().position(|n| n.id() == node.id()); if node_index.is_none() { + drop(buckets); self.add_node(node.clone()).await; return; } @@ -196,17 +227,21 @@ pub trait DhtHandler { async fn fetch_nodes_sp( &self, semaphore: Arc, - node: DhtNode, + node: N, key: &blake3::Hash, - ) -> (DhtNode, Result>) { + ) -> (N, Result>) + where + N: 'async_trait, + { let _permit = semaphore.acquire().await; (node.clone(), self.fetch_nodes(&node, key).await) } /// Find `k` nodes closest to a key - async fn lookup_nodes(&self, key: &blake3::Hash) -> Result> { + async fn lookup_nodes(&self, key: &blake3::Hash) -> Result> { info!(target: "dht::DhtHandler::lookup_nodes()", "Starting node lookup for key {}", bs58::encode(key.as_bytes()).into_string()); + let self_node_id = self.node().await.id(); let k = self.dht().settings.k; let a = self.dht().settings.alpha; let semaphore = Arc::new(Semaphore::new(self.dht().settings.concurrency)); @@ -217,13 +252,13 @@ pub trait DhtHandler { // Nodes with a pending request or a request completed let mut visited_nodes = HashSet::::new(); // Nodes that responded to our request, sorted by distance from `key` - let mut result = Vec::::new(); + let mut result = Vec::::new(); // Create the first `alpha` tasks for _ in 0..a { match nodes_to_visit.pop() { Some(node) => { - visited_nodes.insert(node.id); + visited_nodes.insert(node.id()); futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key)); } None => { @@ -235,13 +270,13 @@ pub trait DhtHandler { while let Some((queried_node, value_result)) = futures.next().await { match value_result { Ok(mut nodes) => { - info!(target: "dht::DhtHandler::lookup_nodes", "Queried {}, got {} nodes", bs58::encode(queried_node.id.as_bytes()).into_string(), nodes.len()); + info!(target: "dht::DhtHandler::lookup_nodes", "Queried {}, got {} nodes", bs58::encode(queried_node.id().as_bytes()).into_string(), nodes.len()); // Remove ourselves and already known nodes from the new nodes nodes.retain(|node| { - node.id != self.dht().node_id && - !visited_nodes.contains(&node.id) && - !nodes_to_visit.iter().any(|n| n.id == node.id) + node.id() != self_node_id && + !visited_nodes.contains(&node.id()) && + !nodes_to_visit.iter().any(|n| n.id() == node.id()) }); // Add new nodes to our buckets @@ -263,10 +298,11 @@ pub trait DhtHandler { if result.len() >= k { if let Some(furthest) = result.last() { if let Some(next_node) = nodes_to_visit.first() { - let furthest_dist = - BigUint::from_bytes_be(&self.dht().distance(key, &furthest.id)); + let furthest_dist = BigUint::from_bytes_be( + &self.dht().distance(key, &furthest.id()), + ); let next_dist = BigUint::from_bytes_be( - &self.dht().distance(key, &next_node.id), + &self.dht().distance(key, &next_node.id()), ); if furthest_dist < next_dist { info!(target: "dht::DhtHandler::lookup_nodes", "Early termination for lookup nodes"); @@ -280,7 +316,7 @@ pub trait DhtHandler { for _ in 0..a { match nodes_to_visit.pop() { Some(node) => { - visited_nodes.insert(node.id); + visited_nodes.insert(node.id()); futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key)); } None => { @@ -301,12 +337,12 @@ pub trait DhtHandler { /// Get a channel (existing or create a new one) to `node` about `topic`. /// Don't forget to call `cleanup_channel()` once you are done with it. - async fn get_channel(&self, node: &DhtNode, topic: Option) -> Result { + async fn get_channel(&self, node: &N, topic: Option) -> Result { let channel_cache_lock = self.dht().channel_cache.clone(); let mut channel_cache = channel_cache_lock.write().await; // Get existing channels for this node, regardless of topic - let channels: HashMap = channel_cache + let channels: HashMap> = channel_cache .iter() .filter(|&(_, item)| item.node == *node) .map(|(&key, item)| (key, item.clone())) @@ -358,7 +394,7 @@ pub trait DhtHandler { drop(channel_cache); // Create a channel - for addr in node.addresses.clone() { + for addr in node.addresses().clone() { let session_out = self.dht().p2p.session_outbound(); let session_weak = Arc::downgrade(&self.dht().p2p.session_outbound()); @@ -422,12 +458,14 @@ pub trait DhtHandler { /// Add nodes as a provider for a key async fn add_to_router( &self, - router: DhtRouterPtr, + router: DhtRouterPtr, key: &blake3::Hash, - router_items: Vec, - ) { + router_items: Vec>, + ) where + N: 'async_trait, + { let mut router_items = router_items.clone(); - router_items.retain(|item| !item.node.addresses.is_empty()); + router_items.retain(|item| !item.node.addresses().is_empty()); debug!(target: "dht::DhtHandler::add_to_router()", "Inserting {} nodes to key {}", router_items.len(), bs58::encode(key.as_bytes()).into_string()); @@ -449,13 +487,13 @@ pub trait DhtHandler { // Add to router_cache for router_item in router_items { - let keys = router_cache.get_mut(&router_item.node.id); + let keys = router_cache.get_mut(&router_item.node.id()); if let Some(k) = keys { k.insert(*key); } else { let mut keys = HashSet::new(); keys.insert(*key); - router_cache.insert(router_item.node.id, keys); + router_cache.insert(router_item.node.id(), keys); } } } diff --git a/src/dht/mod.rs b/src/dht/mod.rs index 68a577319..628c8fc59 100644 --- a/src/dht/mod.rs +++ b/src/dht/mod.rs @@ -16,14 +16,18 @@ * along with this program. If not, see . */ +use std::{ + cmp::Eq, + collections::{HashMap, HashSet}, + fmt::Debug, + hash::{Hash, Hasher}, + marker::{Send, Sync}, + sync::Arc, +}; + use async_trait::async_trait; use num_bigint::BigUint; use smol::lock::RwLock; -use std::{ - collections::{HashMap, HashSet}, - hash::{Hash, Hasher}, - sync::Arc, -}; use url::Url; use darkfi_serial::{SerialDecodable, SerialEncodable}; @@ -36,59 +40,65 @@ pub use settings::{DhtSettings, DhtSettingsOpt}; pub mod handler; pub use handler::DhtHandler; -#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)] -pub struct DhtNode { - pub id: blake3::Hash, - pub addresses: Vec, +pub trait DhtNode: Debug + Clone + Send + Sync + PartialEq + Eq + Hash { + fn id(&self) -> blake3::Hash; + fn addresses(&self) -> Vec; } -impl Hash for DhtNode { - fn hash(&self, state: &mut H) { - self.id.hash(state); - } +/// Implements default Hash, PartialEq, and Eq for a struct implementing [`DhtNode`] +#[macro_export] +macro_rules! impl_dht_node_defaults { + ($t:ty) => { + impl std::hash::Hash for $t { + fn hash(&self, state: &mut H) { + self.id().hash(state); + } + } + impl std::cmp::PartialEq for $t { + fn eq(&self, other: &Self) -> bool { + self.id() == other.id() + } + } + impl std::cmp::Eq for $t {} + }; } +pub use impl_dht_node_defaults; -impl PartialEq for DhtNode { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -pub struct DhtBucket { - pub nodes: Vec, +pub struct DhtBucket { + pub nodes: Vec, } /// "Router" means: Key -> Set of nodes (+ additional data for each node) -pub type DhtRouterPtr = Arc>>>; +pub type DhtRouterPtr = Arc>>>>; #[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)] -pub struct DhtRouterItem { - pub node: DhtNode, +pub struct DhtRouterItem { + pub node: N, pub timestamp: u64, } -impl Hash for DhtRouterItem { +impl Hash for DhtRouterItem { fn hash(&self, state: &mut H) { - self.node.id.hash(state); + self.node.id().hash(state); } } -impl PartialEq for DhtRouterItem { +impl PartialEq for DhtRouterItem { fn eq(&self, other: &Self) -> bool { - self.node.id == other.node.id + self.node.id() == other.node.id() } } -impl From for DhtRouterItem { - fn from(node: DhtNode) -> Self { +impl From for DhtRouterItem { + fn from(node: N) -> Self { DhtRouterItem { node, timestamp: Timestamp::current_time().inner() } } } #[derive(Clone)] -pub struct ChannelCacheItem { +pub struct ChannelCacheItem { /// The DHT node the channel is connected to. - node: DhtNode, + pub node: N, /// Topic is a hash that you set to remember what the channel is about, /// it's not shared with the peer. If you ask for a channel (with @@ -103,17 +113,15 @@ pub struct ChannelCacheItem { usage_count: u32, } -pub struct Dht { - /// Our own node id - pub node_id: blake3::Hash, +pub struct Dht { /// Are we bootstrapped? pub bootstrapped: Arc>, /// Vec of buckets - pub buckets: Arc>>, + pub buckets: Arc>>>, /// Number of buckets pub n_buckets: usize, /// Channel ID -> ChannelCacheItem - pub channel_cache: Arc>>, + pub channel_cache: Arc>>>, /// Node ID -> Set of keys pub router_cache: Arc>>>, @@ -123,13 +131,8 @@ pub struct Dht { pub executor: ExecutorPtr, } -impl Dht { - pub async fn new( - node_id: &blake3::Hash, - settings: &DhtSettings, - p2p: P2pPtr, - ex: ExecutorPtr, - ) -> Self { +impl Dht { + pub async fn new(settings: &DhtSettings, p2p: P2pPtr, ex: ExecutorPtr) -> Self { // Create empty buckets let mut buckets = vec![]; for _ in 0..256 { @@ -137,7 +140,6 @@ impl Dht { } Self { - node_id: *node_id, buckets: Arc::new(RwLock::new(buckets)), n_buckets: 256, bootstrapped: Arc::new(RwLock::new(false)), @@ -156,26 +158,9 @@ impl Dht { *bootstrapped } - pub async fn set_bootstrapped(&self) { + pub async fn set_bootstrapped(&self, value: bool) { let mut bootstrapped = self.bootstrapped.write().await; - *bootstrapped = true; - } - - /// Get own node - pub async fn node(&self) -> DhtNode { - DhtNode { - id: self.node_id, - addresses: self - .p2p - .clone() - .hosts() - .external_addrs() - .await - .iter() - .filter(|addr| !addr.to_string().contains("[::]")) - .cloned() - .collect(), - } + *bootstrapped = value; } /// Get the distance between `key_1` and `key_2` @@ -193,20 +178,20 @@ impl Dht { } /// Sort `nodes` by distance from `key` - pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) { + pub fn sort_by_distance(&self, nodes: &mut [N], key: &blake3::Hash) { nodes.sort_by(|a, b| { - let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id)); - let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id)); + let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id())); + let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id())); distance_a.cmp(&distance_b) }); } /// `key` -> bucket index - pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize { - if key == &self.node_id { + pub async fn get_bucket_index(&self, self_node_id: &blake3::Hash, key: &blake3::Hash) -> usize { + if key == self_node_id { return 0 } - let distance = self.distance(&self.node_id, key); + let distance = self.distance(self_node_id, key); let mut leading_zeros = 0; for &byte in &distance { @@ -224,7 +209,7 @@ impl Dht { /// Get `n` closest known nodes to a key /// TODO: Can be optimized - pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec { + pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec { let buckets_lock = self.buckets.clone(); let buckets = buckets_lock.read().await; @@ -244,7 +229,7 @@ impl Dht { } /// Channel ID -> DhtNode - pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { + pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { let channel_cache_lock = self.channel_cache.clone(); let channel_cache = channel_cache_lock.read().await; if let Some(cached) = channel_cache.get(&channel_id).cloned() { @@ -255,7 +240,7 @@ impl Dht { } /// Remove nodes in router that are older than expiry_secs - pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) { + pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) { let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64); let mut router_write = router.write().await; @@ -269,4 +254,17 @@ impl Dht { } } } + + /// Reset the DHT state + pub async fn reset(&self) { + let mut bootstrapped = self.bootstrapped.write().await; + *bootstrapped = false; + + let mut buckets = vec![]; + for _ in 0..256 { + buckets.push(DhtBucket { nodes: vec![] }) + } + + *self.buckets.write().await = buckets; + } }