diff --git a/Cargo.lock b/Cargo.lock index 0775166b6..6a319d0c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3403,7 +3403,10 @@ dependencies = [ "darkfi", "darkfi-serial", "easy-parallel", + "futures", "log", + "num-bigint", + "rand 0.8.5", "serde", "signal-hook", "signal-hook-async-std", diff --git a/bin/fud/fu/src/main.rs b/bin/fud/fu/src/main.rs index fd920b2b3..61ff0124b 100644 --- a/bin/fud/fu/src/main.rs +++ b/bin/fud/fu/src/main.rs @@ -19,7 +19,7 @@ use clap::{Parser, Subcommand}; use log::info; use simplelog::{ColorChoice, TermLogger, TerminalMode}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use url::Url; use darkfi::{ @@ -49,8 +49,10 @@ struct Args { enum Subcmd { /// Retrieve provided file name from the fud network Get { - /// File name + /// File hash file: String, + /// File name + name: Option, }, /// Put a file onto the fud network @@ -58,6 +60,12 @@ enum Subcmd { /// File name file: String, }, + + /// Get the current node buckets + ListBuckets {}, + + /// Get the router state + ListSeeders {}, } struct Fu { @@ -69,61 +77,11 @@ impl Fu { self.rpc_client.stop().await; } - // async fn list(&self) -> Result<()> { - // let req = JsonRequest::new("list", JsonValue::Array(vec![])); - // let rep = self.rpc_client.request(req).await?; - // - // // Extract response - // let content: Vec = rep[0].clone().try_into().unwrap(); - // let new: Vec = rep[1].clone().try_into().unwrap(); - // let deleted: Vec = rep[2].clone().try_into().unwrap(); - // - // // Print info - // info!("----------Content-------------"); - // if content.is_empty() { - // info!("No file records exists in DHT."); - // } else { - // for name in content { - // info!("\t{}", String::try_from(name).unwrap()); - // } - // } - // info!("------------------------------"); - // - // info!("----------New files-----------"); - // if new.is_empty() { - // info!("No new files to import."); - // } else { - // for name in new { - // info!("\t{}", String::try_from(name).unwrap()); - // } - // } - // info!("------------------------------"); - // - // info!("----------Removed keys--------"); - // if deleted.is_empty() { - // info!("No keys were removed."); - // } else { - // for key in deleted { - // info!("\t{}", String::try_from(key).unwrap()); - // } - // } - // info!("------------------------------"); - // - // Ok(()) - // } - // - // async fn sync(&self) -> Result<()> { - // let req = JsonRequest::new("sync", JsonValue::Array(vec![])); - // self.rpc_client.request(req).await?; - // info!("Daemon synced successfully!"); - // Ok(()) - // } - - async fn get(&self, file: String) -> Result<()> { - let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file)])); + async fn get(&self, file_hash: String, file_name: Option) -> Result<()> { + let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file_hash), JsonValue::String(file_name.unwrap_or_default())])); let rep = self.rpc_client.request(req).await?; - let path = rep.stringify().unwrap(); - info!("File waits you at: {}", path); + let path: String = rep.try_into().unwrap(); + println!("{}", path); Ok(()) } @@ -138,6 +96,55 @@ impl Fu { _ => Err(Error::ParseFailed("File ID is not a string")), } } + + async fn list_buckets(&self) -> Result<()> { + let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![])); + let rep = self.rpc_client.request(req).await?; + let buckets: Vec = rep.try_into().unwrap(); + for (bucket_i, bucket) in buckets.into_iter().enumerate() { + let nodes: Vec = bucket.try_into().unwrap(); + if nodes.len() == 0 { + continue + } + + println!("Bucket {}", bucket_i); + for n in nodes.clone() { + let node: Vec = n.try_into().unwrap(); + let node_id: JsonValue = node[0].clone(); + let addresses: Vec = node[1].clone().try_into().unwrap(); + let mut addrs: Vec = vec![]; + for addr in addresses { + addrs.push(addr.try_into().unwrap()) + } + println!("\t{}: {}", node_id.stringify().unwrap(), addrs.join(", ")); + } + } + + Ok(()) + } + + async fn list_seeders(&self) -> Result<()> { + let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![])); + let rep = self.rpc_client.request(req).await?; + + let files: HashMap = rep["seeders"].clone().try_into().unwrap(); + + println!("Seeders:"); + if files.is_empty() { + println!("No records"); + } else { + for (file_hash, node_ids) in files { + println!("{}", file_hash); + let node_ids: Vec = node_ids.try_into().unwrap(); + for node_id in node_ids { + let node_id: String = node_id.try_into().unwrap(); + println!("\t{}", node_id); + } + } + } + + Ok(()) + } } fn main() -> Result<()> { @@ -156,8 +163,10 @@ fn main() -> Result<()> { match args.command { // Subcmd::List => fu.list().await, // Subcmd::Sync => fu.sync().await, - Subcmd::Get { file } => fu.get(file).await, + Subcmd::Get { file, name } => fu.get(file, name).await, Subcmd::Put { file } => fu.put(file).await, + Subcmd::ListBuckets { } => fu.list_buckets().await, + Subcmd::ListSeeders { } => fu.list_seeders().await, }?; fu.close_connection().await; diff --git a/bin/fud/fud/Cargo.toml b/bin/fud/fud/Cargo.toml index 5c582c1b8..178dbabb6 100644 --- a/bin/fud/fud/Cargo.toml +++ b/bin/fud/fud/Cargo.toml @@ -15,12 +15,15 @@ darkfi-serial = {version = "0.4.2", features = ["hash"]} # Misc async-trait = "0.1.86" blake3 = "1.6.0" +rand = "0.8.5" log = "0.4.26" tinyjson = "2.5.1" url = "2.5.4" +num-bigint = "0.4.6" # Daemon easy-parallel = "3.3.1" +futures = "0.3.31" signal-hook-async-std = "0.2.2" signal-hook = "0.3.17" simplelog = "0.12.2" diff --git a/bin/fud/fud/src/dht.rs b/bin/fud/fud/src/dht.rs new file mode 100644 index 000000000..afa847ad3 --- /dev/null +++ b/bin/fud/fud/src/dht.rs @@ -0,0 +1,479 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use async_trait::async_trait; +use darkfi::{ + net::{connector::Connector, session::Session, ChannelPtr, Message, P2pPtr}, + system::{sleep, ExecutorPtr}, + Error, Result, +}; +use darkfi_serial::{SerialDecodable, SerialEncodable}; +use futures::future::join_all; +use log::{debug, error}; +use num_bigint::BigUint; +use smol::lock::RwLock; +use url::Url; + +#[derive(Debug, Clone, SerialEncodable, SerialDecodable, PartialEq, Eq, Hash)] +pub struct DhtNode { + pub id: blake3::Hash, + pub addresses: Vec, +} + +pub struct DhtBucket { + pub nodes: Vec, +} + +/// "Router" means: Key -> Set of nodes +pub type DhtRouter = Arc>>>; + +// TODO: Add a DhtSettings +pub struct Dht { + pub p2p: P2pPtr, + pub executor: ExecutorPtr, + + pub bootstrapped: Arc>, + + /// Vec of buckets + pub buckets: Arc>>, + /// Number of parallel lookup requests + pub alpha: usize, + /// Number of nodes in a bucket + pub k: usize, + /// Number of buckets + pub n_buckets: usize, + /// Channel ID -> Node ID + pub node_cache: Arc>>, + /// Node ID -> Channel ID + pub channel_cache: Arc>>, + /// Node ID -> Set of keys + pub router_cache: Arc>>>, + /// Our own node + pub node: DhtNode, + /// Seconds + pub timeout: u64, +} +impl Dht { + pub async fn new( + node_id: &blake3::Hash, + a: usize, + k: usize, + timeout: u64, + p2p: P2pPtr, + ex: ExecutorPtr, + ) -> Self { + // Create empty buckets + let mut buckets = vec![]; + for _ in 0..256 { + buckets.push(DhtBucket { nodes: vec![] }) + } + + Self { + p2p: p2p.clone(), + buckets: Arc::new(RwLock::new(buckets)), + bootstrapped: Arc::new(RwLock::new(false)), + alpha: a, + k, + n_buckets: 256, + node_cache: Arc::new(RwLock::new(HashMap::new())), + channel_cache: Arc::new(RwLock::new(HashMap::new())), + router_cache: Arc::new(RwLock::new(HashMap::new())), + executor: ex, + node: DhtNode { id: *node_id, addresses: p2p.clone().hosts().external_addrs().await }, + timeout, + } + } + + pub async fn is_bootstrapped(&self) -> bool { + let bootstrapped = self.bootstrapped.read().await; + return *bootstrapped; + } + + pub async fn set_bootstrapped(&self) { + let mut bootstrapped = self.bootstrapped.write().await; + *bootstrapped = true; + } + + // Get the distance between `key_1` and `key_2` + pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] { + let bytes1 = key_1.as_bytes(); + let bytes2 = key_2.as_bytes(); + + let mut result_bytes = [0u8; 32]; + + for i in 0..32 { + result_bytes[i] = bytes1[i] ^ bytes2[i]; + } + + result_bytes + } + + // Sort `nodes` + pub fn sort_by_distance(&self, nodes: &mut Vec, key: &blake3::Hash) { + nodes.sort_by(|a, b| { + let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id)); + let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id)); + distance_a.cmp(&distance_b) + }); + } + + // key -> bucket index + pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize { + if key == &self.node.id { + return 0 + } + let distance = self.distance(&self.node.id, key); + let mut leading_zeros = 0; + + for &byte in &distance { + if byte == 0 { + leading_zeros += 8; + } else { + leading_zeros += byte.leading_zeros() as usize; + break; + } + } + + let bucket_index = self.n_buckets - leading_zeros; + std::cmp::min(bucket_index, self.n_buckets - 1) + } + + // Get `n` closest known nodes to a key + // TODO: Can be optimized + pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec { + let buckets_lock = self.buckets.clone(); + let buckets = buckets_lock.read().await; + + let mut neighbors = Vec::new(); + + for i in 0..self.n_buckets { + if let Some(bucket) = buckets.get(i) { + neighbors.extend(bucket.nodes.iter().cloned()); + } + } + + self.sort_by_distance(&mut neighbors, key); + + neighbors.truncate(n); + + neighbors + } + + pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { + let node_cache_lock = self.node_cache.clone(); + let node_cache = node_cache_lock.read().await; + node_cache.get(&channel_id).cloned() + } +} + +#[async_trait] +pub trait DhtHandler { + fn dht(&self) -> Arc; + + // Send a DHT ping request + async fn ping(&self, channel: ChannelPtr) -> Result; + + // Triggered when we find a new node + async fn on_new_node(&self, node: &DhtNode) -> Result<()>; + + // Send FIND NODES request to a peer to get nodes close to `key` + async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result>; + + // Announce message `m` for a key, and add ourselves to router + async fn announce( + &self, + key: &blake3::Hash, + message: &M, + router: DhtRouter, + ) -> Result<()> { + self.add_to_router(router.clone(), key, vec![self.dht().node.clone()]).await; + let nodes = self.lookup_nodes(key).await?; + + for node in nodes { + let channel = self.get_channel(&node).await; + if let Ok(ch) = channel { + let _ = ch.send(message).await; + } + } + + Ok(()) + } + + // Send a DHT ping request when there is a new channel, to know the node id of the new peer, + // Then fill the channel cache and the buckets + async fn channel_task(&self) -> Result<()> { + loop { + let channel_sub = self.dht().p2p.hosts().subscribe_channel().await; + let res = channel_sub.receive().await; + if res.is_err() { + continue; + } + let channel = res.unwrap(); + let channel_cache_lock = self.dht().channel_cache.clone(); + let mut channel_cache = channel_cache_lock.write().await; + if !channel.is_stopped() && !channel_cache.values().any(|&v| v == channel.info.id) { + let node = self.ping(channel.clone()).await; + + if let Ok(n) = node { + channel_cache.insert(n.id, channel.info.id); + drop(channel_cache); + + let node_cache_lock = self.dht().node_cache.clone(); + let mut node_cache = node_cache_lock.write().await; + node_cache.insert(channel.info.id, n.clone()); + drop(node_cache); + + self.add_node(n.clone()).await; + + let _ = self.on_new_node(&n.clone()).await; + } + } + } + } + + // Remove disconnected nodes from the channel cache + async fn disconnect_task(&self) -> Result<()> { + loop { + sleep(15).await; + + let channel_cache_lock = self.dht().channel_cache.clone(); + let mut channel_cache = channel_cache_lock.write().await; + for (node_id, channel_id) in channel_cache.clone() { + let channel = self.dht().p2p.get_channel(channel_id); + if channel.is_none() || (channel.is_some() && channel.unwrap().is_stopped()) { + channel_cache.remove(&node_id); + } + } + } + } + + // Add a node in the correct bucket + async fn add_node(&self, node: DhtNode) { + if node.id == self.dht().node.id { + return; + } + + let bucket_index = self.dht().get_bucket_index(&node.id).await; + let buckets_lock = self.dht().buckets.clone(); + let mut buckets = buckets_lock.write().await; + let bucket = &mut buckets[bucket_index]; + + // Node is already in the bucket + if bucket.nodes.iter().any(|n| n.id == node.id) { + return; + } + + // Bucket is full + if bucket.nodes.len() >= self.dht().k { + // Ping the least recently seen node + let channel = self.get_channel(&bucket.nodes[0]).await; + if channel.is_ok() { + let ping_res = self.ping(channel.unwrap()).await; + if ping_res.is_ok() { + // Ping was successful, move the least recently seen node to the tail + let n = bucket.nodes.remove(0); + bucket.nodes.push(n); + return; + } + } + + // Ping was not successful, remove the least recently seen node and add the new node + bucket.nodes.remove(0); + bucket.nodes.push(node); + return; + } + + // Bucket is not full + bucket.nodes.push(node); + } + + /// Move a node to the tail in its bucket, + /// to show that it is the most recently seen in the bucket. + /// If the node is not in a bucket it will be added using `add_node` + async fn update_node(&self, node: &DhtNode) { + let bucket_index = self.dht().get_bucket_index(&node.id).await; + let buckets_lock = self.dht().buckets.clone(); + let mut buckets = buckets_lock.write().await; + let bucket = &mut buckets[bucket_index]; + + let node_index = bucket.nodes.iter().position(|n| n.id == node.id); + if node_index.is_none() { + self.add_node(node.clone()).await; + return; + } + + let n = bucket.nodes.remove(node_index.unwrap()); + bucket.nodes.push(n); + } + + // Find nodes closest to a key + async fn lookup_nodes(&self, key: &blake3::Hash) -> Result> { + debug!(target: "dht::DhtHandler::lookup_nodes()", "Starting node lookup for key {}", key); + + let k = self.dht().k; + let a = self.dht().alpha; + let mut visited_nodes = HashSet::new(); + let mut nodes_to_visit = self.dht().find_neighbors(key, k).await; + let mut nearest_nodes: Vec = vec![]; + + while !nodes_to_visit.is_empty() { + let mut queries: Vec = Vec::with_capacity(a); + + // Get `alpha` nodes from `nodes_to_visit` which is sorted by distance + for _ in 0..a { + match nodes_to_visit.pop() { + Some(node) => { + queries.push(node); + } + None => { + break; + } + } + } + + let mut tasks = Vec::with_capacity(queries.len()); + for node in &queries { + // Avoid visiting the same node multiple times + if !visited_nodes.insert(node.id) { + continue; + } + + // Query the node for the value associated with the key + tasks.push(self.fetch_nodes(node, key)); + } + + let results = join_all(tasks).await; + for (i, value_result) in results.into_iter().enumerate() { + match value_result { + Ok(mut nodes) => { + // Remove ourselves from the new nodes + nodes.retain(|node| node.id != self.dht().node.id); + + // Add each new node to our buckets + for node in nodes.clone() { + self.add_node(node).await; + } + + // Add nodes to the list of nodes to visit + nodes_to_visit.extend(nodes); + self.dht().sort_by_distance(&mut nodes_to_visit, key); + + // Update nearest_nodes + nearest_nodes.push(queries[i].clone()); + self.dht().sort_by_distance(&mut nearest_nodes, key); + } + Err(e) => { + error!(target: "dht::DhtHandler::lookup_nodes", "{}", e); + } + } + } + + // Early termination check + // Stops if our furthest visited node is closer than the closest node we will query + if let Some(furthest) = nearest_nodes.last() { + if let Some(next_node) = nodes_to_visit.first() { + let furthest_dist = + BigUint::from_bytes_be(&self.dht().distance(key, &furthest.id)); + let next_dist = + BigUint::from_bytes_be(&self.dht().distance(key, &next_node.id)); + if furthest_dist < next_dist { + break; + } + } + } + } + + nearest_nodes.truncate(k); + return Ok(nearest_nodes) + } + + // Get an existing channel, or create a new one + async fn get_channel(&self, node: &DhtNode) -> Result { + let channel_cache_lock = self.dht().channel_cache.clone(); + let channel_cache = channel_cache_lock.read().await; + + if let Some(channel_id) = channel_cache.get(&node.id) { + if let Some(channel) = self.dht().p2p.get_channel(*channel_id) { + if channel.is_stopped() { + channel.clone().start(self.dht().executor.clone()); + } + return Ok(channel); + } + } + + // Create a channel + for addr in node.addresses.clone() { + let session_out = self.dht().p2p.session_outbound(); + let session_weak = Arc::downgrade(&self.dht().p2p.session_outbound()); + + let connector = Connector::new(self.dht().p2p.settings(), session_weak); + let connect_res = connector.connect(&addr).await; + if connect_res.is_err() { + continue; + } + let (_, channel) = connect_res.unwrap(); + let register_res = + session_out.register_channel(channel.clone(), self.dht().executor.clone()).await; + if register_res.is_err() { + channel.clone().stop().await; + continue; + } + + return Ok(channel) + } + + Err(Error::Custom("Could not create channel".to_string())) + } + + // Add nodes as a provider for a key + async fn add_to_router(&self, router: DhtRouter, key: &blake3::Hash, nodes: Vec) { + debug!(target: "dht::DhtHandler::add_to_router()", "Inserting {} nodes to key {}", nodes.len(), key); + + let mut router_write = router.write().await; + let key_r = router_write.get_mut(key); + + let router_cache_lock = self.dht().router_cache.clone(); + let mut router_cache = router_cache_lock.write().await; + + // Add to router + if let Some(k) = key_r { + k.extend(nodes.clone()); + } else { + let mut hs = HashSet::new(); + hs.extend(nodes.clone()); + router_write.insert(*key, hs); + } + + // Add to router_cache + for node in nodes { + let keys = router_cache.get_mut(&node.id); + if let Some(k) = keys { + k.insert(*key); + } else { + let mut keys = HashSet::new(); + keys.insert(*key); + router_cache.insert(node.id, keys); + } + } + } +} diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index af6d7087d..c2315eb6d 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -18,32 +18,36 @@ use std::{ collections::{HashMap, HashSet}, + io::ErrorKind, + path::PathBuf, sync::Arc, }; +use num_bigint::BigUint; + use async_trait::async_trait; +use dht::{Dht, DhtHandler, DhtNode, DhtRouter}; +use futures::{ + future::{try_select, Either, FutureExt}, + pin_mut, +}; use log::{debug, error, info, warn}; +use rand::{rngs::OsRng, RngCore}; use smol::{ channel, - fs::File, + fs::{File, OpenOptions}, + io::{AsyncReadExt, AsyncWriteExt}, lock::{Mutex, MutexGuard, RwLock}, stream::StreamExt, Executor, }; use structopt_toml::{structopt::StructOpt, StructOptToml}; use tinyjson::JsonValue; -use url::Url; use darkfi::{ async_daemonize, cli_desc, geode::Geode, - net::{ - connector::Connector, - protocol::ProtocolVersion, - session::{Session, SESSION_DEFAULT}, - settings::SettingsOpt, - P2p, P2pPtr, - }, + net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr}, rpc::{ jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber}, p2p_method::HandlerP2p, @@ -58,13 +62,18 @@ use darkfi::{ /// P2P protocols mod proto; use proto::{ - FudChunkPut, FudChunkReply, FudChunkRequest, FudFilePut, FudFileReply, FudFileRequest, + FudAnnounce, FudChunkReply, FudFileReply, FudFindNodesReply, FudFindNodesRequest, + FudFindRequest, FudFindSeedersReply, FudFindSeedersRequest, FudPingReply, FudPingRequest, ProtocolFud, }; +mod dht; + const CONFIG_FILE: &str = "fud_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml"); +const NODE_ID_PATH: &str = "node_id"; + #[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)] #[serde(default)] #[structopt(name = "fud", about = cli_desc!())] @@ -95,15 +104,18 @@ struct Args { } pub struct Fud { - /// Routing table for file metadata - metadata_router: Arc>>>, - /// Routing table for file chunks - chunks_router: Arc>>>, + /// Key -> Seeders + seeders_router: DhtRouter, + /// Pointer to the P2P network instance p2p: P2pPtr, + /// The Geode instance geode: Geode, + /// The DHT instance + dht: Arc, + file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>, file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>, file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>, @@ -131,6 +143,8 @@ impl RequestHandler<()> for Fud { "dnet.switch" => self.dnet_switch(req.id, req.params).await, "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, + "list_buckets" => self.list_buckets(req.id, req.params).await, + "list_seeders" => self.list_seeders(req.id, req.params).await, _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } } @@ -177,24 +191,42 @@ impl Fud { } }; - let fud_file = FudFilePut { file_hash, chunk_hashes }; - self.p2p.broadcast(&fud_file).await; + // Announce file + let self_node = self.dht.node.clone(); + let fud_announce = FudAnnounce { key: file_hash, nodes: vec![self_node.clone()] }; + let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await; + + // Announce chunks + for chunk_hash in chunk_hashes { + let fud_announce = FudAnnounce { key: chunk_hash, nodes: vec![self_node.clone()] }; + let _ = self.announce(&chunk_hash, &fud_announce, self.seeders_router.clone()).await; + } JsonResponse::new(JsonValue::String(file_hash.to_hex().to_string()), id).into() } // RPCAPI: // Fetch a file from the network. Takes a file hash as parameter. - // Returns the paths to the local chunks of the file, if found/fetched. + // Returns the path to the assembled file, if found/fetched. // // --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd"], "id": 42} - // <-- {"jsonrpc": "2.0", "result: ["~/.local/share/darkfi/fud/chunks/fab1...2314", ...], "id": 42} + // <-- {"jsonrpc": "2.0", "result: "~/.local/share/darkfi/fud/downloads/fab1...2314", "id": 42} async fn get(&self, id: u16, params: JsonValue) -> JsonResult { let params = params.get::>().unwrap(); - if params.len() != 1 || !params[0].is_string() { + if params.len() != 2 || !params[0].is_string() || !params[1].is_string() { return JsonError::new(ErrorCode::InvalidParams, None, id).into() } + let file_name: Option = match params[1].get::() { + Some(name) => match name.is_empty() { + true => None, + false => Some(name.clone()), + }, + None => None, + }; + + let self_node = self.dht.node.clone(); + let file_hash = match blake3::Hash::from_hex(params[0].get::().unwrap()) { Ok(v) => v, Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(), @@ -209,18 +241,7 @@ impl Fud { info!("Waiting for background file fetch task..."); let (i_file_hash, status) = self.file_fetch_end_rx.recv().await.unwrap(); match status { - Ok(()) => { - let ch_file = self.geode.get(&file_hash).await.unwrap(); - - let m = FudFilePut { - file_hash: i_file_hash, - chunk_hashes: ch_file.iter().map(|(h, _)| *h).collect(), - }; - - self.p2p.broadcast(&m).await; - - ch_file - } + Ok(()) => self.geode.get(&i_file_hash).await.unwrap(), Err(Error::GeodeFileRouteNotFound) => { // TODO: Return FileNotFound error @@ -235,16 +256,17 @@ impl Fud { }; if chunked_file.is_complete() { - let chunks: Vec = chunked_file - .iter() - .map(|(_, path)| { - JsonValue::String( - path.as_ref().unwrap().clone().into_os_string().into_string().unwrap(), - ) - }) - .collect(); + let fud_announce = FudAnnounce { key: file_hash, nodes: vec![self_node.clone()] }; + let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await; - return JsonResponse::new(JsonValue::Array(chunks), id).into() + return match self.geode.assemble_file(&file_hash, &chunked_file, file_name).await { + Ok(file_path) => JsonResponse::new( + JsonValue::String(file_path.to_string_lossy().to_string()), + id, + ) + .into(), + Err(_) => JsonError::new(ErrorCode::InternalError, None, id).into(), + } } // Fetch any missing chunks @@ -261,8 +283,11 @@ impl Fud { match status { Ok(()) => { - let m = FudChunkPut { chunk_hash: i_chunk_hash }; - self.p2p.broadcast(&m).await; + let fud_announce = + FudAnnounce { key: i_chunk_hash, nodes: vec![self_node.clone()] }; + let _ = self + .announce(&i_chunk_hash, &fud_announce, self.seeders_router.clone()) + .await; } Err(Error::GeodeChunkRouteNotFound) => continue, @@ -277,19 +302,16 @@ impl Fud { if !chunked_file.is_complete() { todo!(); - // Return JsonError missing chunks + // TODO: Return JsonError missing chunks } - let chunks: Vec = chunked_file - .iter() - .map(|(_, path)| { - JsonValue::String( - path.as_ref().unwrap().clone().into_os_string().into_string().unwrap(), - ) - }) - .collect(); - - JsonResponse::new(JsonValue::Array(chunks), id).into() + return match self.geode.assemble_file(&file_hash, &chunked_file, file_name).await { + Ok(file_path) => { + JsonResponse::new(JsonValue::String(file_path.to_string_lossy().to_string()), id) + .into() + } + Err(_) => JsonError::new(ErrorCode::InternalError, None, id).into(), + } } // RPCAPI: @@ -331,6 +353,59 @@ impl Fud { self.dnet_sub.clone().into() } + + // RPCAPI: + // Returns the current buckets + // + // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1} + pub async fn list_buckets(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + let mut buckets = vec![]; + for bucket in self.dht.buckets.read().await.iter() { + let mut nodes = vec![]; + for node in bucket.nodes.clone() { + let mut addresses = vec![]; + for addr in node.addresses { + addresses.push(JsonValue::String(addr.to_string())); + } + nodes.push(JsonValue::Array(vec![ + JsonValue::String(node.id.to_hex().to_string()), + JsonValue::Array(addresses), + ])); + } + buckets.push(JsonValue::Array(nodes)); + } + + JsonResponse::new(JsonValue::Array(buckets), id).into() + } + + // RPCAPI: + // Returns the content of the seeders router + // + // --> {"jsonrpc": "2.0", "method": "list_routes", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdef": ["ghijkl"]}}, "id": 1} + pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + let mut seeders_router: HashMap = HashMap::new(); + for (hash, nodes) in self.seeders_router.read().await.iter() { + let mut node_ids = vec![]; + for node in nodes { + node_ids.push(JsonValue::String(node.id.to_hex().to_string())); + } + seeders_router.insert(hash.to_hex().to_string(), JsonValue::Array(node_ids)); + } + let mut res: HashMap = HashMap::new(); + res.insert("seeders".to_string(), JsonValue::Object(seeders_router)); + + JsonResponse::new(JsonValue::Object(res), id).into() + } } impl HandlerP2p for Fud { @@ -339,229 +414,293 @@ impl HandlerP2p for Fud { } } -/// Background task that receives file fetch requests and tries to -/// fetch objects from the network using the routing table. -/// TODO: This can be optimised a lot for connection reuse, etc. -async fn fetch_file_task(fud: Arc, executor: Arc>) -> Result<()> { - info!("Started background file fetch task"); - loop { - let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap(); - info!("fetch_file_task: Received {}", file_hash); +enum FetchReply { + File(FudFileReply), + Chunk(FudChunkReply), +} - let mut metadata_router = fud.metadata_router.write().await; - let peers = metadata_router.get_mut(&file_hash); +/// Fetch a file or chunk from the network +/// 1. Lookup nodes close to the key +/// 2. Request seeders for the file/chunk from those nodes +/// 3. Request the file/chunk from the seeders +async fn fetch(fud: Arc, key: blake3::Hash) -> Option { + let mut queried_seeders: HashSet = HashSet::new(); + let closest_nodes = fud.lookup_nodes(&key).await; // 1 + let mut result: Option = None; + if closest_nodes.is_err() { + return None + } - if peers.is_none() { - warn!("File {} not in routing table, cannot fetch", file_hash); - fud.file_fetch_end_tx - .send((file_hash, Err(Error::GeodeFileRouteNotFound))) - .await - .unwrap(); - continue - } + for node in closest_nodes.unwrap() { + // 2. Request list of seeders + let channel = match fud.get_channel(&node).await { + Ok(channel) => channel, + Err(e) => { + warn!(target: "fud::fetch()", "Could not get a channel for node {}: {}", node.id, e); + continue; + } + }; + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; - let mut found = false; - let peers = peers.unwrap(); - let mut invalid_file_routes = vec![]; + let msg_subscriber = match channel.subscribe_msg::().await { + Ok(msg_subscriber) => msg_subscriber, + Err(e) => { + warn!(target: "fud::fetch()", "Error subscribing to msg: {}", e); + continue; + } + }; - for peer in peers.iter() { - let session_out = fud.p2p.session_outbound(); - let session_weak = Arc::downgrade(&fud.p2p.session_outbound()); + let _ = channel.send(&FudFindSeedersRequest { key }).await; - info!("Connecting to {} to fetch {}", peer, file_hash); - let connector = Connector::new(fud.p2p.settings(), session_weak); - match connector.connect(peer).await { - Ok((url, channel)) => { - let proto_ver = - ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await; + let reply = match msg_subscriber.receive_with_timeout(fud.dht().timeout).await { + Ok(reply) => reply, + Err(e) => { + warn!(target: "fud::fetch()", "Error waiting for reply: {}", e); + continue; + } + }; - let handshake_task = session_out.perform_handshake_protocols( - proto_ver, - channel.clone(), - executor.clone(), - ); + let mut seeders = reply.nodes.clone(); + info!(target: "fud::fetch()", "Found seeders for {}: {:?}", key, seeders); - channel.clone().start(executor.clone()); + msg_subscriber.unsubscribe().await; - if let Err(e) = handshake_task.await { - error!("Handshake with {} failed: {}", url, e); - // Delete peer from router - invalid_file_routes.push(peer.clone()); - continue + // 3. Request the file/chunk from the seeders + while let Some(seeder) = seeders.pop() { + // Only query a seeder once + if queried_seeders.iter().any(|s| *s == seeder.id) { + continue; + } + queried_seeders.insert(seeder.id); + + if let Ok(channel) = fud.get_channel(&seeder).await { + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + let msg_subscriber_chunk = channel.subscribe_msg::().await.unwrap(); + let msg_subscriber_file = channel.subscribe_msg::().await.unwrap(); + + let _ = channel.send(&FudFindRequest { key }).await; + + let chunk_recv = + msg_subscriber_chunk.receive_with_timeout(fud.dht().timeout).fuse(); + let file_recv = msg_subscriber_file.receive_with_timeout(fud.dht().timeout).fuse(); + + pin_mut!(chunk_recv, file_recv); + + // Wait for a FudChunkReply or a FudFileReply + match try_select(chunk_recv, file_recv).await { + Ok(Either::Left((chunk_reply, _))) => { + info!(target: "fud::fetch()", "Received chunk {} from seeder {:?}", key, seeder.id); + msg_subscriber.unsubscribe().await; + result = Some(FetchReply::Chunk((*chunk_reply).clone())); + break; } - - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let msg_subscriber = channel.subscribe_msg::().await.unwrap(); - let request = FudFileRequest { file_hash }; - - if let Err(e) = channel.send(&request).await { - error!("Failed sending FudFileRequest({}) to {}: {}", file_hash, url, e); - continue + Ok(Either::Right((file_reply, _))) => { + info!(target: "fud::fetch()", "Received file {} from seeder {:?}", key, seeder.id); + msg_subscriber.unsubscribe().await; + result = Some(FetchReply::File((*file_reply).clone())); + break; } - - // TODO: With timeout! - let reply = match msg_subscriber.receive().await { - Ok(v) => v, - Err(e) => { - error!("Error receiving FudFileReply from subscriber: {}", e); - continue - } - }; - - msg_subscriber.unsubscribe().await; - channel.stop().await; - - if let Err(e) = fud.geode.insert_file(&file_hash, &reply.chunk_hashes).await { - error!("Failed inserting file {} to Geode: {}", file_hash, e); - continue + Err(e) => { + match e { + Either::Left((chunk_err, _)) => { + warn!(target: "fud::fetch()", "Error waiting for chunk reply: {}", chunk_err); + } + Either::Right((file_err, _)) => { + warn!(target: "fud::fetch()", "Error waiting for file reply: {}", file_err); + } + }; + msg_subscriber.unsubscribe().await; + continue; } - - found = true; - break - } - - Err(e) => { - error!("Failed to connect to {}: {}", peer, e); - continue - } + }; } } - for peer in invalid_file_routes { - debug!("Removing peer {} from {} file router", peer, file_hash); - peers.remove(&peer); + if result.is_some() { + break; } + } - if !found { - warn!("Did not manage to fetch {} file metadata", file_hash); - fud.file_fetch_end_tx - .send((file_hash, Err(Error::GeodeFileRouteNotFound))) - .await - .unwrap(); - continue - } + result +} - info!("Successfully fetched {} file metadata", file_hash); - fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); +/// Background task that receives file fetch requests and tries to +/// fetch objects from the network using the routing table. +/// TODO: This can be optimised a lot for connection reuse, etc. +async fn fetch_file_task(fud: Arc, _: Arc>) -> Result<()> { + info!(target: "fud::fetch_file_task()", "Started background file fetch task"); + loop { + let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap(); + info!(target: "fud::fetch_file_task()", "Fetching file {}", file_hash); + + let result = fetch(fud.clone(), file_hash).await; + + match result { + Some(reply) => { + match reply { + FetchReply::File(FudFileReply { chunk_hashes }) => { + if let Err(e) = fud.geode.insert_file(&file_hash, &chunk_hashes).await { + error!("Failed inserting file {} to Geode: {}", file_hash, e); + } + fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); + } + // Looked for a file but got a chunk, meaning that file_hash = chunk_hash, the file fits in a single chunk + FetchReply::Chunk(FudChunkReply { chunk }) => { + // TODO: Verify chunk + info!(target: "fud::fetch()", "File fits in a single chunk"); + let _ = fud.geode.insert_file(&file_hash, &[file_hash]).await; + match fud.geode.insert_chunk(&chunk).await { + Ok(inserted_hash) => { + if inserted_hash != file_hash { + warn!("Received chunk does not match requested file"); + } + } + Err(e) => { + error!("Failed inserting chunk {} to Geode: {}", file_hash, e); + } + }; + fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); + } + } + } + None => { + fud.file_fetch_end_tx + .send((file_hash, Err(Error::GeodeFileRouteNotFound))) + .await + .unwrap(); + } + }; } } /// Background task that receives chunk fetch requests and tries to /// fetch objects from the network using the routing table. /// TODO: This can be optimised a lot for connection reuse, etc. -async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result<()> { - info!("Started background chunk fetch task"); +async fn fetch_chunk_task(fud: Arc, _: Arc>) -> Result<()> { + info!(target: "fud::fetch_chunk_task()", "Started background chunk fetch task"); loop { let (chunk_hash, _) = fud.chunk_fetch_rx.recv().await.unwrap(); - info!("fetch_chunk_task: Received {}", chunk_hash); + info!(target: "fud::fetch_chunk_task()", "Fetching chunk {}", chunk_hash); - let mut chunk_router = fud.chunks_router.write().await; - let peers = chunk_router.get_mut(&chunk_hash); + let result = fetch(fud.clone(), chunk_hash).await; - if peers.is_none() { - warn!("Chunk {} not in routing table, cannot fetch", chunk_hash); - fud.chunk_fetch_end_tx - .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) - .await - .unwrap(); - continue + match result { + Some(reply) => { + match reply { + FetchReply::Chunk(FudChunkReply { chunk }) => { + // TODO: Verify chunk + match fud.geode.insert_chunk(&chunk).await { + Ok(inserted_hash) => { + if inserted_hash != chunk_hash { + warn!("Received chunk does not match requested chunk"); + } + } + Err(e) => { + error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e); + } + }; + fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap(); + } + _ => { + // Looked for a chunk but got a file instead, not supposed to happen + fud.chunk_fetch_end_tx + .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) + .await + .unwrap(); + } + } + } + None => { + fud.chunk_fetch_end_tx + .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) + .await + .unwrap(); + } + }; + } +} + +#[async_trait] +impl DhtHandler for Fud { + fn dht(&self) -> Arc { + self.dht.clone() + } + + async fn ping(&self, channel: ChannelPtr) -> Result { + debug!(target: "fud::Fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id); + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + let msg_subscriber = channel.subscribe_msg::().await.unwrap(); + let request = FudPingRequest {}; + + channel.send(&request).await?; + + let reply = msg_subscriber.receive_with_timeout(self.dht().timeout).await?; + + msg_subscriber.unsubscribe().await; + + Ok(reply.node.clone()) + } + + // TODO: Optimize this + async fn on_new_node(&self, node: &DhtNode) -> Result<()> { + debug!(target: "fud::Fud::DhtHandler::on_new_node()", "New node {}", node.id); + + // If this is the first node we know about, then bootstrap + if !self.dht().is_bootstrapped().await { + self.dht().set_bootstrapped().await; + + // Lookup our own node id + let self_node = self.dht().node.clone(); + debug!(target: "fud::Fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", self_node.id); + let _ = self.lookup_nodes(&self_node.id).await; } - let mut found = false; - let peers = peers.unwrap(); - let mut invalid_chunk_routes = vec![]; - - for peer in peers.iter() { - let session_out = fud.p2p.session_outbound(); - let session_weak = Arc::downgrade(&fud.p2p.session_outbound()); - - info!("Connecting to {} to fetch {}", peer, chunk_hash); - let connector = Connector::new(fud.p2p.settings(), session_weak); - match connector.connect(peer).await { - Ok((url, channel)) => { - let proto_ver = - ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await; - - let handshake_task = session_out.perform_handshake_protocols( - proto_ver, - channel.clone(), - executor.clone(), - ); - - channel.clone().start(executor.clone()); - - if let Err(e) = handshake_task.await { - error!("Handshake with {} failed: {}", url, e); - // Delete peer from router - invalid_chunk_routes.push(peer.clone()); - continue - } - - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let msg_subscriber = channel.subscribe_msg::().await.unwrap(); - let request = FudChunkRequest { chunk_hash }; - - if let Err(e) = channel.send(&request).await { - error!("Failed sending FudChunkRequest({}) to {}: {}", chunk_hash, url, e); - continue - } - - // TODO: With timeout! - let reply = match msg_subscriber.receive().await { - Ok(v) => v, - Err(e) => { - error!("Error receiving FudChunkReply from subscriber: {}", e); - continue - } - }; - - msg_subscriber.unsubscribe().await; - channel.stop().await; - - match fud.geode.insert_chunk(&reply.chunk).await { - Ok(inserted_hash) => { - if inserted_hash != chunk_hash { - warn!("Received chunk does not match requested chunk"); - invalid_chunk_routes.push(peer.clone()); - continue - } - } - Err(e) => { - error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e); - continue - } - } - - found = true; - break - } - - Err(e) => { - error!("Failed to connect to {}: {}", peer, e); - continue - } + // Send keys that are closer to this node than we are + let self_id = self.dht().node.id; + let channel = self.get_channel(node).await?; + for (key, nodes) in self.seeders_router.read().await.iter() { + let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id)); + let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id)); + if node_distance <= self_distance { + let _ = channel + .send(&FudAnnounce { key: *key, nodes: nodes.iter().cloned().collect() }) + .await; } } - for peer in invalid_chunk_routes { - debug!("Removing peer {} from {} chunk router", peer, chunk_hash); - peers.remove(&peer); - } - - if !found { - warn!("Did not manage to fetch {} chunk", chunk_hash); - fud.chunk_fetch_end_tx - .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) - .await - .unwrap(); - continue - } - - info!("Successfully fetched {} chunk", chunk_hash); - fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap(); + Ok(()) } + + async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result> { + debug!(target: "fud::Fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", key, node.id); + + let channel = self.get_channel(node).await?; + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + let msg_subscriber_nodes = channel.subscribe_msg::().await.unwrap(); + + let request = FudFindNodesRequest { key: *key }; + channel.send(&request).await?; + + let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().timeout).await?; + + msg_subscriber_nodes.unsubscribe().await; + + Ok(reply.nodes.clone()) + } +} + +// TODO: This is not Sybil-resistant +fn generate_node_id() -> Result { + let mut rng = OsRng; + let mut random_data = [0u8; 32]; + rng.fill_bytes(&mut random_data); + let node_id = blake3::Hash::from_bytes(random_data); + Ok(node_id) } async_daemonize!(realmain); @@ -569,9 +708,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // The working directory for this daemon and geode. let basedir = expand_path(&args.base_dir)?; - // Hashmaps used for routing - let metadata_router = Arc::new(RwLock::new(HashMap::new())); - let chunks_router = Arc::new(RwLock::new(HashMap::new())); + // Hashmap used for routing + let seeders_router = Arc::new(RwLock::new(HashMap::new())); info!("Instantiating Geode instance"); let geode = Geode::new(&basedir).await?; @@ -579,6 +717,16 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!("Instantiating P2P network"); let p2p = P2p::new(args.net.into(), ex.clone()).await?; + let external_addrs = p2p.hosts().external_addrs().await; + + if external_addrs.is_empty() { + error!( + target: "fud::realmain", + "External addrs not configured. Stopping", + ); + return Ok(()) + } + info!("Starting dnet subs task"); let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); let dnet_sub_ = dnet_sub.clone(); @@ -603,16 +751,46 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); + // Get or generate the node id + let node_id: Result = { + let mut node_id_path: PathBuf = basedir.clone(); + node_id_path.push(NODE_ID_PATH); + match File::open(node_id_path.clone()).await { + Ok(mut file) => { + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).await?; + let buf: [u8; 64] = buffer.try_into().expect("Node ID must have 64 characters"); + let node_id = blake3::Hash::from_hex(buf)?; + Ok(node_id) + } + Err(e) if e.kind() == ErrorKind::NotFound => { + let node_id = generate_node_id()?; + let mut file = + OpenOptions::new().write(true).create(true).open(node_id_path).await?; + file.write_all(node_id.to_hex().as_bytes()).await?; + file.flush().await?; + Ok(node_id) + } + Err(e) => Err(e.into()), + } + }; + + let node_id_ = node_id?; + + info!(target: "fud", "Your node ID: {}", node_id_); + // Daemon instantiation let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded(); let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded(); let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded(); let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded(); + // TODO: Add DHT settings in the config file + let dht = Arc::new(Dht::new(&node_id_, 4, 16, 15, p2p.clone(), ex.clone()).await); let fud = Arc::new(Fud { - metadata_router, - chunks_router, + seeders_router, p2p: p2p.clone(), geode, + dht: dht.clone(), file_fetch_tx, file_fetch_rx, file_fetch_end_tx, @@ -680,6 +858,34 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { .await; p2p.clone().start().await?; + info!(target: "fud", "Starting DHT tasks"); + let dht_channel_task = StoppableTask::new(); + let fud_ = fud.clone(); + dht_channel_task.clone().start( + async move { fud_.channel_task::().await }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "fud", "Failed starting dht channel task: {}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + let dht_disconnect_task = StoppableTask::new(); + let fud_ = fud.clone(); + dht_disconnect_task.clone().start( + async move { fud_.disconnect_task().await }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "fud", "Failed starting dht disconnect task: {}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?; @@ -694,9 +900,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "fud", "Stopping JSON-RPC server..."); rpc_task.stop().await; - info!("Stopping P2P network"); + info!(target: "fud", "Stopping P2P network..."); p2p.stop().await; + info!(target: "fud", "Stopping DHT tasks"); + dht_channel_task.stop().await; + dht_disconnect_task.stop().await; + info!("Bye!"); Ok(()) } diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 22a848e38..4d7dce216 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use async_trait::async_trait; use darkfi::{ @@ -30,50 +30,11 @@ use darkfi::{ Error, Result, }; use darkfi_serial::{SerialDecodable, SerialEncodable}; -use log::{debug, error}; +use log::{debug, error, info}; use smol::{fs::File, Executor}; -use url::Url; use super::Fud; - -/// Message representing a new file on the network -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudFilePut { - pub file_hash: blake3::Hash, - pub chunk_hashes: Vec, -} -impl_p2p_message!(FudFilePut, "FudFilePut", 0, 0, DEFAULT_METERING_CONFIGURATION); - -/// Message representing a new chunk on the network -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudChunkPut { - pub chunk_hash: blake3::Hash, -} -impl_p2p_message!(FudChunkPut, "FudChunkPut", 0, 0, DEFAULT_METERING_CONFIGURATION); - -/// Message representing a new route for a file on the network -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudFileRoute { - pub file_hash: blake3::Hash, - pub chunk_hashes: Vec, - pub peer: Url, -} -impl_p2p_message!(FudFileRoute, "FudFileRoute", 0, 0, DEFAULT_METERING_CONFIGURATION); - -/// Message representing a new route for a chunk on the network -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudChunkRoute { - pub chunk_hash: blake3::Hash, - pub peer: Url, -} -impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0, 0, DEFAULT_METERING_CONFIGURATION); - -/// Message representing a file request from the network -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudFileRequest { - pub file_hash: blake3::Hash, -} -impl_p2p_message!(FudFileRequest, "FudFileRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); +use crate::dht::{DhtHandler, DhtNode}; /// Message representing a file reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -82,17 +43,18 @@ pub struct FudFileReply { } impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION); -/// Message representing a chunk request from the network +/// Message representing a node announcing a key on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct FudChunkRequest { - pub chunk_hash: blake3::Hash, +pub struct FudAnnounce { + pub key: blake3::Hash, + pub nodes: Vec, } -impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); +impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a chunk reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkReply { - // TODO: This sould be a chunk-sized array, but then we need padding? + // TODO: This should be a chunk-sized array, but then we need padding? pub chunk: Vec, } impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -107,360 +69,284 @@ impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0, 0, DEFAULT_METERING_CON pub struct FudChunkNotFound; impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION); +/// Message representing a seeders reply when seeders are not found +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudSeedersNotFound; +impl_p2p_message!(FudSeedersNotFound, "FudSeedersNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a ping request on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudPingRequest {} +impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a ping reply on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudPingReply { + pub node: DhtNode, +} +impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a find file/chunk request from the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFindRequest { + pub key: blake3::Hash, +} +impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a find nodes request on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFindNodesRequest { + pub key: blake3::Hash, +} +impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a find nodes reply on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFindNodesReply { + pub nodes: Vec, +} +impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a find seeders request on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFindSeedersRequest { + pub key: blake3::Hash, +} +impl_p2p_message!(FudFindSeedersRequest, "FudFindSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); + +/// Message representing a find seeders reply on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFindSeedersReply { + pub nodes: Vec, +} +impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION); + /// P2P protocol implementation for fud. pub struct ProtocolFud { channel: ChannelPtr, - file_put_sub: MessageSubscription, - chunk_put_sub: MessageSubscription, - file_route_sub: MessageSubscription, - chunk_route_sub: MessageSubscription, - file_request_sub: MessageSubscription, - chunk_request_sub: MessageSubscription, + ping_request_sub: MessageSubscription, + find_request_sub: MessageSubscription, + find_nodes_request_sub: MessageSubscription, + find_seeders_request_sub: MessageSubscription, + announce_sub: MessageSubscription, fud: Arc, - p2p: P2pPtr, jobsman: ProtocolJobsManagerPtr, } impl ProtocolFud { - pub async fn init(fud: Arc, channel: ChannelPtr, p2p: P2pPtr) -> Result { + pub async fn init(fud: Arc, channel: ChannelPtr, _: P2pPtr) -> Result { debug!( target: "fud::proto::ProtocolFud::init()", "Adding ProtocolFud to the protocol registry" ); let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; - let file_put_sub = channel.subscribe_msg::().await?; - let chunk_put_sub = channel.subscribe_msg::().await?; - let file_route_sub = channel.subscribe_msg::().await?; - let chunk_route_sub = channel.subscribe_msg::().await?; - let file_request_sub = channel.subscribe_msg::().await?; - let chunk_request_sub = channel.subscribe_msg::().await?; + let ping_request_sub = channel.subscribe_msg::().await?; + let find_request_sub = channel.subscribe_msg::().await?; + let find_nodes_request_sub = channel.subscribe_msg::().await?; + let find_seeders_request_sub = channel.subscribe_msg::().await?; + let announce_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { channel: channel.clone(), - file_put_sub, - chunk_put_sub, - file_route_sub, - chunk_route_sub, - file_request_sub, - chunk_request_sub, + ping_request_sub, + find_request_sub, + find_nodes_request_sub, + find_seeders_request_sub, + announce_sub, fud, - p2p, jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()), })) } - async fn handle_fud_file_put(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_file_put()", "START"); + async fn handle_fud_ping_request(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START"); loop { - let fud_file = match self.file_put_sub.receive().await { + let _ = match self.ping_request_sub.receive().await { Ok(v) => v, + Err(Error::ChannelStopped) => continue, Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_file_put()", - "recv fail: {}", e, - ); + error!("{}", e); continue } }; + info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING"); - // TODO: This approach is naive and optimistic. Needs to be fixed. - let mut metadata_lock = self.fud.metadata_router.write().await; - let file_route = metadata_lock.get_mut(&fud_file.file_hash); - match file_route { - Some(peers) => { - peers.insert(self.channel.address().clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(self.channel.address().clone()); - metadata_lock.insert(fud_file.file_hash, peers); - } - } - drop(metadata_lock); - - let mut chunks_lock = self.fud.chunks_router.write().await; - for chunk in &fud_file.chunk_hashes { - let chunk_route = chunks_lock.get_mut(chunk); - match chunk_route { - Some(peers) => { - peers.insert(self.channel.address().clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(self.channel.address().clone()); - chunks_lock.insert(*chunk, peers); - } - } - } - drop(chunks_lock); - - // Relay this knowledge of the new route - let route = FudFileRoute { - file_hash: fud_file.file_hash, - chunk_hashes: fud_file.chunk_hashes.clone(), - peer: self.channel.address().clone(), - }; - - self.p2p.broadcast_with_exclude(&route, &[self.channel.address().clone()]).await; - } - } - - async fn handle_fud_chunk_put(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_chunk_put()", "START"); - - loop { - let fud_chunk = match self.chunk_put_sub.receive().await { - Ok(v) => v, - Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_chunk_put()", - "recv fail: {}", e, - ); - continue - } - }; - - // TODO: This approach is naive and optimistic. Needs to be fixed. - let mut chunks_lock = self.fud.chunks_router.write().await; - let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash); - match chunk_route { - Some(peers) => { - peers.insert(self.channel.address().clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(self.channel.address().clone()); - chunks_lock.insert(fud_chunk.chunk_hash, peers); - } - } - drop(chunks_lock); - - // Relay this knowledge of the new route - let route = FudChunkRoute { - chunk_hash: fud_chunk.chunk_hash, - peer: self.channel.address().clone(), - }; - - self.p2p.broadcast_with_exclude(&route, &[self.channel.address().clone()]).await; - } - } - - async fn handle_fud_file_route(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_file_route()", "START"); - - loop { - let fud_file = match self.file_route_sub.receive().await { - Ok(v) => v, - Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_file_route()", - "recv fail: {}", e, - ); - continue - } - }; - - // TODO: This approach is naive and optimistic. Needs to be fixed. - let mut metadata_lock = self.fud.metadata_router.write().await; - let file_route = metadata_lock.get_mut(&fud_file.file_hash); - match file_route { - Some(peers) => { - peers.insert(fud_file.peer.clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(fud_file.peer.clone()); - metadata_lock.insert(fud_file.file_hash, peers); - } - } - - let excluded_peers: Vec = metadata_lock - .get(&fud_file.file_hash) - .unwrap_or(&HashSet::new()) - .iter() - .cloned() - .collect(); - drop(metadata_lock); - - let mut chunks_lock = self.fud.chunks_router.write().await; - for chunk in &fud_file.chunk_hashes { - let chunk_route = chunks_lock.get_mut(chunk); - match chunk_route { - Some(peers) => { - peers.insert(fud_file.peer.clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(fud_file.peer.clone()); - chunks_lock.insert(*chunk, peers); - } - } - } - drop(chunks_lock); - - // Relay this knowledge of the new route - let route = FudFileRoute { - file_hash: fud_file.file_hash, - chunk_hashes: fud_file.chunk_hashes.clone(), - peer: fud_file.peer.clone(), - }; - - self.p2p - .broadcast_with_exclude( - &route, - &[vec![self.channel.address().clone(), fud_file.peer.clone()], excluded_peers] - .concat(), - ) - .await; - } - } - - async fn handle_fud_chunk_route(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_chunk_route()", "START"); - - loop { - let fud_chunk = match self.chunk_route_sub.receive().await { - Ok(v) => v, - Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_chunk_put()", - "recv fail: {}", e, - ); - continue - } - }; - - // TODO: This approach is naive and optimistic. Needs to be fixed. - let mut chunks_lock = self.fud.chunks_router.write().await; - let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash); - match chunk_route { - Some(peers) => { - peers.insert(fud_chunk.peer.clone()); - } - None => { - let mut peers = HashSet::new(); - peers.insert(fud_chunk.peer.clone()); - chunks_lock.insert(fud_chunk.chunk_hash, peers); - } - } - let excluded_peers: Vec = chunks_lock - .get(&fud_chunk.chunk_hash) - .unwrap_or(&HashSet::new()) - .iter() - .cloned() - .collect(); - drop(chunks_lock); - - // Relay this knowledge of the new route - let route = - FudChunkRoute { chunk_hash: fud_chunk.chunk_hash, peer: fud_chunk.peer.clone() }; - - self.p2p - .broadcast_with_exclude( - &route, - &[vec![self.channel.address().clone(), fud_chunk.peer.clone()], excluded_peers] - .concat(), - ) - .await; - } - } - - async fn handle_fud_file_request(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_file_request()", "START"); - - loop { - let file_request = match self.file_request_sub.receive().await { - Ok(v) => v, - Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_file_request()", - "recv fail: {}", e, - ); - continue - } - }; - - let chunked_file = match self.fud.geode.get(&file_request.file_hash).await { - Ok(v) => v, - Err(Error::GeodeNeedsGc) => { - // TODO: Run GC - continue - } - - Err(Error::GeodeFileNotFound) => match self.channel.send(&FudFileNotFound).await { - Ok(()) => continue, - Err(_e) => continue, - }, - - Err(_e) => continue, - }; - - let file_reply = FudFileReply { - chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(), - }; - - match self.channel.send(&file_reply).await { - Ok(()) => continue, - Err(_e) => continue, - } - } - } - - async fn handle_fud_chunk_request(self: Arc) -> Result<()> { - debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START"); - - loop { - let chunk_request = match self.chunk_request_sub.receive().await { - Ok(v) => v, - Err(e) => { - error!( - target: "fud::ProtocolFud::handle_fud_chunk_request()", - "recv fail: {}", e, - ); - continue - } - }; - - let chunk_path = match self.fud.geode.get_chunk(&chunk_request.chunk_hash).await { - Ok(v) => v, - Err(Error::GeodeNeedsGc) => { - // TODO: Run GC - continue - } - - Err(Error::GeodeChunkNotFound) => { - match self.channel.send(&FudChunkNotFound).await { - Ok(()) => continue, - Err(_e) => continue, - } - } - - Err(_e) => continue, - }; - - // The consistency should already be checked in Geode, so we're - // fine not checking and unwrapping here. - let mut buf = vec![0u8; MAX_CHUNK_SIZE]; - let mut chunk_fd = File::open(&chunk_path).await.unwrap(); - let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap(); - let chunk_slice = &buf[..bytes_read]; - - let reply = FudChunkReply { chunk: chunk_slice.to_vec() }; + let reply = FudPingReply { node: self.fud.dht.node.clone() }; match self.channel.send(&reply).await { Ok(()) => continue, Err(_e) => continue, } } } + + async fn handle_fud_find_request(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START"); + + loop { + let request = match self.find_request_sub.receive().await { + Ok(v) => v, + Err(Error::ChannelStopped) => continue, + Err(e) => { + error!("{}", e); + continue + } + }; + info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND"); + + let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; + if let Some(node) = node { + self.fud.update_node(&node).await; + } + + // Chunk + { + let chunk_res = self.fud.geode.get_chunk(&request.key).await; + + // TODO: Run geode GC + + if let Ok(chunk_path) = chunk_res { + let mut buf = vec![0u8; MAX_CHUNK_SIZE]; + let mut chunk_fd = File::open(&chunk_path).await.unwrap(); + let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap(); + let chunk_slice = &buf[..bytes_read]; + let reply = FudChunkReply { chunk: chunk_slice.to_vec() }; + let _ = self.channel.send(&reply).await; + continue; + } + } + + // File + { + let file_res = self.fud.geode.get(&request.key).await; + + // TODO: Run geode GC + + if let Ok(chunked_file) = file_res { + let reply = FudFileReply { + chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(), + }; + let _ = self.channel.send(&reply).await; + continue; + } + } + + // Peers + { + let router = self.fud.seeders_router.read().await; + let peers = router.get(&request.key); + + if let Some(nodes) = peers { + let reply = FudFindNodesReply { nodes: nodes.clone().into_iter().collect() }; + let _ = self.channel.send(&reply).await; + continue; + } + } + + // Nodes + let reply = FudFindNodesReply { + nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().k).await, + }; + let _ = self.channel.send(&reply).await; + } + } + + async fn handle_fud_find_nodes_request(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START"); + + loop { + let request = match self.find_nodes_request_sub.receive().await { + Ok(v) => v, + Err(Error::ChannelStopped) => continue, + Err(e) => { + error!("{}", e); + continue + } + }; + info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", &request.key); + + let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; + if let Some(node) = node { + self.fud.update_node(&node).await; + } + + let reply = FudFindNodesReply { + nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().k).await, + }; + match self.channel.send(&reply).await { + Ok(()) => continue, + Err(_e) => continue, + } + } + } + + async fn handle_fud_find_seeders_request(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START"); + + loop { + let request = match self.find_seeders_request_sub.receive().await { + Ok(v) => v, + Err(Error::ChannelStopped) => continue, + Err(e) => { + error!("{}", e); + continue + } + }; + info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", &request.key); + + let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; + if let Some(node) = node { + self.fud.update_node(&node).await; + } + + let router = self.fud.seeders_router.read().await; + let peers = router.get(&request.key); + + match peers { + Some(nodes) => { + let _ = self + .channel + .send(&FudFindSeedersReply { nodes: nodes.iter().cloned().collect() }) + .await; + } + None => { + let _ = self.channel.send(&FudSeedersNotFound {}).await; + } + }; + } + } + + async fn handle_fud_announce(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START"); + + loop { + let request = match self.announce_sub.receive().await { + Ok(v) => v, + Err(Error::ChannelStopped) => continue, + Err(e) => { + error!("{}", e); + continue + } + }; + info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", &request.key); + + let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; + if let Some(node) = node { + self.fud.update_node(&node).await; + } + + self.fud + .add_to_router(self.fud.seeders_router.clone(), &request.key, request.nodes.clone()) + .await; + } + } } #[async_trait] @@ -468,12 +354,17 @@ impl ProtocolBase for ProtocolFud { async fn start(self: Arc, executor: Arc>) -> Result<()> { debug!(target: "fud::ProtocolFud::start()", "START"); self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_fud_file_put(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_fud_chunk_put(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_fud_file_route(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_fud_chunk_route(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_fud_file_request(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await; + self.jobsman + .clone() + .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone()) + .await; + self.jobsman + .clone() + .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone()) + .await; + self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await; debug!(target: "fud::ProtocolFud::start()", "END"); Ok(()) } diff --git a/src/geode/mod.rs b/src/geode/mod.rs index 6c6f9e11e..420170e70 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -78,6 +78,8 @@ pub const MAX_CHUNK_SIZE: usize = 262_144; const FILES_PATH: &str = "files"; /// Path prefix where file chunks are stored const CHUNKS_PATH: &str = "chunks"; +/// Path prefix where full files are stored +const DOWNLOADS_PATH: &str = "downloads"; /// `ChunkedFile` is a representation of a file we're trying to /// retrieve from `Geode`. @@ -111,8 +113,13 @@ pub struct Geode { files_path: PathBuf, /// Path to the filesystem directory where file chunks are stored chunks_path: PathBuf, + /// Path to the filesystem directory where full files are stored + downloads_path: PathBuf, } +/// smol::fs::File::read does not guarantee that the buffer will be filled, even if the buffer is +/// smaller than the file. This is a workaround. +/// This reads the stream until the buffer is full or until we reached the end of the stream. pub async fn read_until_filled( mut stream: impl AsyncRead + Unpin, buffer: &mut [u8], @@ -137,14 +144,17 @@ impl Geode { pub async fn new(base_path: &PathBuf) -> Result { let mut files_path: PathBuf = base_path.into(); let mut chunks_path: PathBuf = base_path.into(); + let mut downloads_path: PathBuf = base_path.into(); files_path.push(FILES_PATH); chunks_path.push(CHUNKS_PATH); + downloads_path.push(DOWNLOADS_PATH); // Create necessary directory structure if needed fs::create_dir_all(&files_path).await?; fs::create_dir_all(&chunks_path).await?; + fs::create_dir_all(&downloads_path).await?; - Ok(Self { files_path, chunks_path }) + Ok(Self { files_path, chunks_path, downloads_path }) } /// Attempt to read chunk hashes from a given file path and return @@ -477,4 +487,26 @@ impl Geode { Ok(chunk_path) } + + /// Assemble chunks to create a file. + /// This method does NOT perform a consistency check. + pub async fn assemble_file(&self, file_hash: &blake3::Hash, chunked_file: &ChunkedFile, file_name: Option) -> Result { + info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash); + + let mut file_path = self.downloads_path.clone(); + file_path.push(file_hash.to_hex().as_str()); + fs::create_dir_all(&file_path).await?; + file_path.push(file_name.unwrap_or(file_hash.to_hex().to_string())); + + let mut file_fd = File::create(&file_path).await?; + for (_, chunk_path) in chunked_file.iter() { + let mut buf = vec![]; + let mut chunk_fd = File::open(chunk_path.clone().unwrap()).await?; + let bytes_read = chunk_fd.read_to_end(&mut buf).await?; + let chunk_slice = &buf[..bytes_read]; + file_fd.write(chunk_slice).await?; + } + + Ok(file_path) + } }