fud: add dht lookups & improvements

This commit is contained in:
darkfi
2025-03-09 21:56:00 +01:00
committed by epiphany1
parent e219954da2
commit 058faffa75
7 changed files with 1304 additions and 677 deletions

3
Cargo.lock generated
View File

@@ -3403,7 +3403,10 @@ dependencies = [
"darkfi",
"darkfi-serial",
"easy-parallel",
"futures",
"log",
"num-bigint",
"rand 0.8.5",
"serde",
"signal-hook",
"signal-hook-async-std",

View File

@@ -19,7 +19,7 @@
use clap::{Parser, Subcommand};
use log::info;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use url::Url;
use darkfi::{
@@ -49,8 +49,10 @@ struct Args {
enum Subcmd {
/// Retrieve provided file name from the fud network
Get {
/// File name
/// File hash
file: String,
/// File name
name: Option<String>,
},
/// Put a file onto the fud network
@@ -58,6 +60,12 @@ enum Subcmd {
/// File name
file: String,
},
/// Get the current node buckets
ListBuckets {},
/// Get the router state
ListSeeders {},
}
struct Fu {
@@ -69,61 +77,11 @@ impl Fu {
self.rpc_client.stop().await;
}
// async fn list(&self) -> Result<()> {
// let req = JsonRequest::new("list", JsonValue::Array(vec![]));
// let rep = self.rpc_client.request(req).await?;
//
// // Extract response
// let content: Vec<JsonValue> = rep[0].clone().try_into().unwrap();
// let new: Vec<JsonValue> = rep[1].clone().try_into().unwrap();
// let deleted: Vec<JsonValue> = rep[2].clone().try_into().unwrap();
//
// // Print info
// info!("----------Content-------------");
// if content.is_empty() {
// info!("No file records exists in DHT.");
// } else {
// for name in content {
// info!("\t{}", String::try_from(name).unwrap());
// }
// }
// info!("------------------------------");
//
// info!("----------New files-----------");
// if new.is_empty() {
// info!("No new files to import.");
// } else {
// for name in new {
// info!("\t{}", String::try_from(name).unwrap());
// }
// }
// info!("------------------------------");
//
// info!("----------Removed keys--------");
// if deleted.is_empty() {
// info!("No keys were removed.");
// } else {
// for key in deleted {
// info!("\t{}", String::try_from(key).unwrap());
// }
// }
// info!("------------------------------");
//
// Ok(())
// }
//
// async fn sync(&self) -> Result<()> {
// let req = JsonRequest::new("sync", JsonValue::Array(vec![]));
// self.rpc_client.request(req).await?;
// info!("Daemon synced successfully!");
// Ok(())
// }
async fn get(&self, file: String) -> Result<()> {
let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file)]));
async fn get(&self, file_hash: String, file_name: Option<String>) -> Result<()> {
let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file_hash), JsonValue::String(file_name.unwrap_or_default())]));
let rep = self.rpc_client.request(req).await?;
let path = rep.stringify().unwrap();
info!("File waits you at: {}", path);
let path: String = rep.try_into().unwrap();
println!("{}", path);
Ok(())
}
@@ -138,6 +96,55 @@ impl Fu {
_ => Err(Error::ParseFailed("File ID is not a string")),
}
}
async fn list_buckets(&self) -> Result<()> {
let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?;
let buckets: Vec<JsonValue> = rep.try_into().unwrap();
for (bucket_i, bucket) in buckets.into_iter().enumerate() {
let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
if nodes.len() == 0 {
continue
}
println!("Bucket {}", bucket_i);
for n in nodes.clone() {
let node: Vec<JsonValue> = n.try_into().unwrap();
let node_id: JsonValue = node[0].clone();
let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
let mut addrs: Vec<String> = vec![];
for addr in addresses {
addrs.push(addr.try_into().unwrap())
}
println!("\t{}: {}", node_id.stringify().unwrap(), addrs.join(", "));
}
}
Ok(())
}
async fn list_seeders(&self) -> Result<()> {
let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?;
let files: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
println!("Seeders:");
if files.is_empty() {
println!("No records");
} else {
for (file_hash, node_ids) in files {
println!("{}", file_hash);
let node_ids: Vec<JsonValue> = node_ids.try_into().unwrap();
for node_id in node_ids {
let node_id: String = node_id.try_into().unwrap();
println!("\t{}", node_id);
}
}
}
Ok(())
}
}
fn main() -> Result<()> {
@@ -156,8 +163,10 @@ fn main() -> Result<()> {
match args.command {
// Subcmd::List => fu.list().await,
// Subcmd::Sync => fu.sync().await,
Subcmd::Get { file } => fu.get(file).await,
Subcmd::Get { file, name } => fu.get(file, name).await,
Subcmd::Put { file } => fu.put(file).await,
Subcmd::ListBuckets { } => fu.list_buckets().await,
Subcmd::ListSeeders { } => fu.list_seeders().await,
}?;
fu.close_connection().await;

View File

@@ -15,12 +15,15 @@ darkfi-serial = {version = "0.4.2", features = ["hash"]}
# Misc
async-trait = "0.1.86"
blake3 = "1.6.0"
rand = "0.8.5"
log = "0.4.26"
tinyjson = "2.5.1"
url = "2.5.4"
num-bigint = "0.4.6"
# Daemon
easy-parallel = "3.3.1"
futures = "0.3.31"
signal-hook-async-std = "0.2.2"
signal-hook = "0.3.17"
simplelog = "0.12.2"

479
bin/fud/fud/src/dht.rs Normal file
View File

@@ -0,0 +1,479 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use async_trait::async_trait;
use darkfi::{
net::{connector::Connector, session::Session, ChannelPtr, Message, P2pPtr},
system::{sleep, ExecutorPtr},
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use futures::future::join_all;
use log::{debug, error};
use num_bigint::BigUint;
use smol::lock::RwLock;
use url::Url;
/// A node in the DHT: a unique identifier plus the addresses it can be reached at.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, PartialEq, Eq, Hash)]
pub struct DhtNode {
    /// Unique node identifier (blake3 hash)
    pub id: blake3::Hash,
    /// Addresses this node can be reached at
    pub addresses: Vec<Url>,
}
/// A single Kademlia-style routing bucket, ordered least-recently-seen first.
pub struct DhtBucket {
    /// Nodes in this bucket; index 0 is the least recently seen
    pub nodes: Vec<DhtNode>,
}
/// "Router" means: Key -> Set of nodes
pub type DhtRouter = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtNode>>>>;
// TODO: Add a DhtSettings
/// Kademlia-style DHT state: routing buckets plus caches mapping
/// channels <-> nodes and nodes -> announced keys.
pub struct Dht {
    pub p2p: P2pPtr,
    pub executor: ExecutorPtr,
    /// Whether the initial bootstrap has completed
    pub bootstrapped: Arc<RwLock<bool>>,
    /// Vec of buckets
    pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
    /// Number of parallel lookup requests
    pub alpha: usize,
    /// Number of nodes in a bucket
    pub k: usize,
    /// Number of buckets
    pub n_buckets: usize,
    /// Channel ID -> Node
    pub node_cache: Arc<RwLock<HashMap<u32, DhtNode>>>,
    /// Node ID -> Channel ID
    pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
    /// Node ID -> Set of keys
    pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
    /// Our own node
    pub node: DhtNode,
    /// Request timeout, in seconds
    pub timeout: u64,
}
impl Dht {
pub async fn new(
node_id: &blake3::Hash,
a: usize,
k: usize,
timeout: u64,
p2p: P2pPtr,
ex: ExecutorPtr,
) -> Self {
// Create empty buckets
let mut buckets = vec![];
for _ in 0..256 {
buckets.push(DhtBucket { nodes: vec![] })
}
Self {
p2p: p2p.clone(),
buckets: Arc::new(RwLock::new(buckets)),
bootstrapped: Arc::new(RwLock::new(false)),
alpha: a,
k,
n_buckets: 256,
node_cache: Arc::new(RwLock::new(HashMap::new())),
channel_cache: Arc::new(RwLock::new(HashMap::new())),
router_cache: Arc::new(RwLock::new(HashMap::new())),
executor: ex,
node: DhtNode { id: *node_id, addresses: p2p.clone().hosts().external_addrs().await },
timeout,
}
}
pub async fn is_bootstrapped(&self) -> bool {
let bootstrapped = self.bootstrapped.read().await;
return *bootstrapped;
}
pub async fn set_bootstrapped(&self) {
let mut bootstrapped = self.bootstrapped.write().await;
*bootstrapped = true;
}
// Get the distance between `key_1` and `key_2`
pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
let bytes1 = key_1.as_bytes();
let bytes2 = key_2.as_bytes();
let mut result_bytes = [0u8; 32];
for i in 0..32 {
result_bytes[i] = bytes1[i] ^ bytes2[i];
}
result_bytes
}
// Sort `nodes`
pub fn sort_by_distance(&self, nodes: &mut Vec<DhtNode>, key: &blake3::Hash) {
nodes.sort_by(|a, b| {
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
distance_a.cmp(&distance_b)
});
}
// key -> bucket index
pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
if key == &self.node.id {
return 0
}
let distance = self.distance(&self.node.id, key);
let mut leading_zeros = 0;
for &byte in &distance {
if byte == 0 {
leading_zeros += 8;
} else {
leading_zeros += byte.leading_zeros() as usize;
break;
}
}
let bucket_index = self.n_buckets - leading_zeros;
std::cmp::min(bucket_index, self.n_buckets - 1)
}
// Get `n` closest known nodes to a key
// TODO: Can be optimized
pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<DhtNode> {
let buckets_lock = self.buckets.clone();
let buckets = buckets_lock.read().await;
let mut neighbors = Vec::new();
for i in 0..self.n_buckets {
if let Some(bucket) = buckets.get(i) {
neighbors.extend(bucket.nodes.iter().cloned());
}
}
self.sort_by_distance(&mut neighbors, key);
neighbors.truncate(n);
neighbors
}
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
let node_cache_lock = self.node_cache.clone();
let node_cache = node_cache_lock.read().await;
node_cache.get(&channel_id).cloned()
}
}
#[async_trait]
pub trait DhtHandler {
    /// Accessor for the underlying [`Dht`] instance
    fn dht(&self) -> Arc<Dht>;

    /// Send a DHT ping request over `channel` and return the peer's node
    async fn ping(&self, channel: ChannelPtr) -> Result<DhtNode>;

    /// Triggered when we find a new node
    async fn on_new_node(&self, node: &DhtNode) -> Result<()>;

    /// Send FIND NODES request to a peer to get nodes close to `key`
    async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>>;

    /// Announce message `message` for a key to the nodes closest to the key,
    /// and add ourselves to `router`
    async fn announce<M: Message>(
        &self,
        key: &blake3::Hash,
        message: &M,
        router: DhtRouter,
    ) -> Result<()> {
        self.add_to_router(router.clone(), key, vec![self.dht().node.clone()]).await;

        let nodes = self.lookup_nodes(key).await?;
        for node in nodes {
            let channel = self.get_channel(&node).await;
            if let Ok(ch) = channel {
                // Best-effort send; a failed announce to one node is not fatal
                let _ = ch.send(message).await;
            }
        }

        Ok(())
    }

    /// Send a DHT ping request when there is a new channel, to know the node id of the new peer,
    /// then fill the channel cache and the buckets
    async fn channel_task<M: Message>(&self) -> Result<()> {
        loop {
            let channel_sub = self.dht().p2p.hosts().subscribe_channel().await;
            let res = channel_sub.receive().await;
            if res.is_err() {
                continue;
            }
            let channel = res.unwrap();

            let channel_cache_lock = self.dht().channel_cache.clone();
            let mut channel_cache = channel_cache_lock.write().await;

            // Only ping channels we don't know a node for yet
            if !channel.is_stopped() && !channel_cache.values().any(|&v| v == channel.info.id) {
                let node = self.ping(channel.clone()).await;
                if let Ok(n) = node {
                    channel_cache.insert(n.id, channel.info.id);
                    drop(channel_cache);

                    let node_cache_lock = self.dht().node_cache.clone();
                    let mut node_cache = node_cache_lock.write().await;
                    node_cache.insert(channel.info.id, n.clone());
                    drop(node_cache);

                    self.add_node(n.clone()).await;
                    let _ = self.on_new_node(&n.clone()).await;
                }
            }
        }
    }

    /// Remove disconnected nodes from the channel cache
    async fn disconnect_task(&self) -> Result<()> {
        loop {
            sleep(15).await;

            let channel_cache_lock = self.dht().channel_cache.clone();
            let mut channel_cache = channel_cache_lock.write().await;
            for (node_id, channel_id) in channel_cache.clone() {
                // A node is considered disconnected when its channel is gone or stopped
                let disconnected = match self.dht().p2p.get_channel(channel_id) {
                    Some(channel) => channel.is_stopped(),
                    None => true,
                };
                if disconnected {
                    channel_cache.remove(&node_id);
                }
            }
        }
    }

    /// Add a node in the correct bucket
    async fn add_node(&self, node: DhtNode) {
        // Never add ourselves to our own buckets
        if node.id == self.dht().node.id {
            return;
        }

        let bucket_index = self.dht().get_bucket_index(&node.id).await;
        let buckets_lock = self.dht().buckets.clone();
        let mut buckets = buckets_lock.write().await;
        let bucket = &mut buckets[bucket_index];

        // Node is already in the bucket
        if bucket.nodes.iter().any(|n| n.id == node.id) {
            return;
        }

        // Bucket is full
        // NOTE(review): the buckets write lock is held across the ping below,
        // which blocks all other bucket operations until the ping resolves
        if bucket.nodes.len() >= self.dht().k {
            // Ping the least recently seen node
            let channel = self.get_channel(&bucket.nodes[0]).await;
            if channel.is_ok() {
                let ping_res = self.ping(channel.unwrap()).await;
                if ping_res.is_ok() {
                    // Ping was successful, move the least recently seen node to
                    // the tail; the new node is discarded (standard Kademlia)
                    let n = bucket.nodes.remove(0);
                    bucket.nodes.push(n);
                    return;
                }
            }

            // Ping was not successful, remove the least recently seen node and add the new node
            bucket.nodes.remove(0);
            bucket.nodes.push(node);
            return;
        }

        // Bucket is not full
        bucket.nodes.push(node);
    }

    /// Move a node to the tail in its bucket,
    /// to show that it is the most recently seen in the bucket.
    /// If the node is not in a bucket it will be added using `add_node`
    async fn update_node(&self, node: &DhtNode) {
        let bucket_index = self.dht().get_bucket_index(&node.id).await;
        let buckets_lock = self.dht().buckets.clone();
        let mut buckets = buckets_lock.write().await;
        let bucket = &mut buckets[bucket_index];

        match bucket.nodes.iter().position(|n| n.id == node.id) {
            Some(node_index) => {
                let n = bucket.nodes.remove(node_index);
                bucket.nodes.push(n);
            }
            None => {
                // Release the buckets write lock before calling `add_node`,
                // which acquires it again itself (holding it here would deadlock)
                drop(buckets);
                self.add_node(node.clone()).await;
            }
        }
    }

    /// Find the `k` nodes closest to a key (iterative Kademlia lookup)
    async fn lookup_nodes(&self, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
        debug!(target: "dht::DhtHandler::lookup_nodes()", "Starting node lookup for key {}", key);

        let k = self.dht().k;
        let a = self.dht().alpha;

        let mut visited_nodes = HashSet::new();
        let mut nodes_to_visit = self.dht().find_neighbors(key, k).await;
        let mut nearest_nodes: Vec<DhtNode> = vec![];

        while !nodes_to_visit.is_empty() {
            // Take up to `alpha` unvisited nodes from the front of
            // `nodes_to_visit`, which is sorted by distance (closest first)
            let mut queries: Vec<DhtNode> = Vec::with_capacity(a);
            while queries.len() < a && !nodes_to_visit.is_empty() {
                let node = nodes_to_visit.remove(0);
                // Avoid visiting the same node multiple times
                if visited_nodes.insert(node.id) {
                    queries.push(node);
                }
            }

            // Query each selected node for nodes close to `key`;
            // `tasks` stays aligned 1:1 with `queries` so each result can be
            // matched back to the node that produced it
            let mut tasks = Vec::with_capacity(queries.len());
            for node in &queries {
                tasks.push(self.fetch_nodes(node, key));
            }
            let results = join_all(tasks).await;

            for (i, value_result) in results.into_iter().enumerate() {
                match value_result {
                    Ok(mut nodes) => {
                        // Remove ourselves from the new nodes
                        nodes.retain(|node| node.id != self.dht().node.id);

                        // Add each new node to our buckets
                        for node in nodes.clone() {
                            self.add_node(node).await;
                        }

                        // Add nodes to the list of nodes to visit
                        nodes_to_visit.extend(nodes);
                        self.dht().sort_by_distance(&mut nodes_to_visit, key);

                        // The queried node answered, so it is a candidate result
                        nearest_nodes.push(queries[i].clone());
                        self.dht().sort_by_distance(&mut nearest_nodes, key);
                    }
                    Err(e) => {
                        error!(target: "dht::DhtHandler::lookup_nodes", "{}", e);
                    }
                }
            }

            // Early termination check
            // Stops if our furthest visited node is closer than the closest node we will query
            if let Some(furthest) = nearest_nodes.last() {
                if let Some(next_node) = nodes_to_visit.first() {
                    let furthest_dist =
                        BigUint::from_bytes_be(&self.dht().distance(key, &furthest.id));
                    let next_dist =
                        BigUint::from_bytes_be(&self.dht().distance(key, &next_node.id));
                    if furthest_dist < next_dist {
                        break;
                    }
                }
            }
        }

        nearest_nodes.truncate(k);
        Ok(nearest_nodes)
    }

    /// Get an existing channel to `node`, or create a new one using any of
    /// its known addresses
    async fn get_channel(&self, node: &DhtNode) -> Result<ChannelPtr> {
        // Reuse a cached channel when possible
        let channel_cache_lock = self.dht().channel_cache.clone();
        let channel_cache = channel_cache_lock.read().await;
        if let Some(channel_id) = channel_cache.get(&node.id) {
            if let Some(channel) = self.dht().p2p.get_channel(*channel_id) {
                if channel.is_stopped() {
                    channel.clone().start(self.dht().executor.clone());
                }
                return Ok(channel);
            }
        }

        // Create a channel: try each known address until one connects and registers
        for addr in node.addresses.clone() {
            let session_out = self.dht().p2p.session_outbound();
            let session_weak = Arc::downgrade(&self.dht().p2p.session_outbound());

            let connector = Connector::new(self.dht().p2p.settings(), session_weak);
            let connect_res = connector.connect(&addr).await;
            if connect_res.is_err() {
                continue;
            }
            let (_, channel) = connect_res.unwrap();

            let register_res =
                session_out.register_channel(channel.clone(), self.dht().executor.clone()).await;
            if register_res.is_err() {
                channel.clone().stop().await;
                continue;
            }

            return Ok(channel)
        }

        Err(Error::Custom("Could not create channel".to_string()))
    }

    /// Add nodes as a provider for a key, and remember the keys each node
    /// was inserted for in the router cache
    async fn add_to_router(&self, router: DhtRouter, key: &blake3::Hash, nodes: Vec<DhtNode>) {
        debug!(target: "dht::DhtHandler::add_to_router()", "Inserting {} nodes to key {}", nodes.len(), key);

        let mut router_write = router.write().await;
        let router_cache_lock = self.dht().router_cache.clone();
        let mut router_cache = router_cache_lock.write().await;

        // Add to router
        router_write.entry(*key).or_default().extend(nodes.clone());

        // Add to router_cache
        for node in nodes {
            router_cache.entry(node.id).or_default().insert(*key);
        }
    }
}

View File

@@ -18,32 +18,36 @@
use std::{
collections::{HashMap, HashSet},
io::ErrorKind,
path::PathBuf,
sync::Arc,
};
use num_bigint::BigUint;
use async_trait::async_trait;
use dht::{Dht, DhtHandler, DhtNode, DhtRouter};
use futures::{
future::{try_select, Either, FutureExt},
pin_mut,
};
use log::{debug, error, info, warn};
use rand::{rngs::OsRng, RngCore};
use smol::{
channel,
fs::File,
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
lock::{Mutex, MutexGuard, RwLock},
stream::StreamExt,
Executor,
};
use structopt_toml::{structopt::StructOpt, StructOptToml};
use tinyjson::JsonValue;
use url::Url;
use darkfi::{
async_daemonize, cli_desc,
geode::Geode,
net::{
connector::Connector,
protocol::ProtocolVersion,
session::{Session, SESSION_DEFAULT},
settings::SettingsOpt,
P2p, P2pPtr,
},
net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr},
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
p2p_method::HandlerP2p,
@@ -58,13 +62,18 @@ use darkfi::{
/// P2P protocols
mod proto;
use proto::{
FudChunkPut, FudChunkReply, FudChunkRequest, FudFilePut, FudFileReply, FudFileRequest,
FudAnnounce, FudChunkReply, FudFileReply, FudFindNodesReply, FudFindNodesRequest,
FudFindRequest, FudFindSeedersReply, FudFindSeedersRequest, FudPingReply, FudPingRequest,
ProtocolFud,
};
mod dht;
const CONFIG_FILE: &str = "fud_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml");
const NODE_ID_PATH: &str = "node_id";
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "fud", about = cli_desc!())]
@@ -95,15 +104,18 @@ struct Args {
}
pub struct Fud {
/// Routing table for file metadata
metadata_router: Arc<RwLock<HashMap<blake3::Hash, HashSet<Url>>>>,
/// Routing table for file chunks
chunks_router: Arc<RwLock<HashMap<blake3::Hash, HashSet<Url>>>>,
/// Key -> Seeders
seeders_router: DhtRouter,
/// Pointer to the P2P network instance
p2p: P2pPtr,
/// The Geode instance
geode: Geode,
/// The DHT instance
dht: Arc<Dht>,
file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>,
file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
@@ -131,6 +143,8 @@ impl RequestHandler<()> for Fud {
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
"list_buckets" => self.list_buckets(req.id, req.params).await,
"list_seeders" => self.list_seeders(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
@@ -177,24 +191,42 @@ impl Fud {
}
};
let fud_file = FudFilePut { file_hash, chunk_hashes };
self.p2p.broadcast(&fud_file).await;
// Announce file
let self_node = self.dht.node.clone();
let fud_announce = FudAnnounce { key: file_hash, nodes: vec![self_node.clone()] };
let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await;
// Announce chunks
for chunk_hash in chunk_hashes {
let fud_announce = FudAnnounce { key: chunk_hash, nodes: vec![self_node.clone()] };
let _ = self.announce(&chunk_hash, &fud_announce, self.seeders_router.clone()).await;
}
JsonResponse::new(JsonValue::String(file_hash.to_hex().to_string()), id).into()
}
// RPCAPI:
// Fetch a file from the network. Takes a file hash as parameter.
// Returns the paths to the local chunks of the file, if found/fetched.
// Returns the path to the assembled file, if found/fetched.
//
// --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd"], "id": 42}
// <-- {"jsonrpc": "2.0", "result: ["~/.local/share/darkfi/fud/chunks/fab1...2314", ...], "id": 42}
// <-- {"jsonrpc": "2.0", "result: "~/.local/share/darkfi/fud/downloads/fab1...2314", "id": 42}
async fn get(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_string() {
if params.len() != 2 || !params[0].is_string() || !params[1].is_string() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let file_name: Option<String> = match params[1].get::<String>() {
Some(name) => match name.is_empty() {
true => None,
false => Some(name.clone()),
},
None => None,
};
let self_node = self.dht.node.clone();
let file_hash = match blake3::Hash::from_hex(params[0].get::<String>().unwrap()) {
Ok(v) => v,
Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
@@ -209,18 +241,7 @@ impl Fud {
info!("Waiting for background file fetch task...");
let (i_file_hash, status) = self.file_fetch_end_rx.recv().await.unwrap();
match status {
Ok(()) => {
let ch_file = self.geode.get(&file_hash).await.unwrap();
let m = FudFilePut {
file_hash: i_file_hash,
chunk_hashes: ch_file.iter().map(|(h, _)| *h).collect(),
};
self.p2p.broadcast(&m).await;
ch_file
}
Ok(()) => self.geode.get(&i_file_hash).await.unwrap(),
Err(Error::GeodeFileRouteNotFound) => {
// TODO: Return FileNotFound error
@@ -235,16 +256,17 @@ impl Fud {
};
if chunked_file.is_complete() {
let chunks: Vec<JsonValue> = chunked_file
.iter()
.map(|(_, path)| {
JsonValue::String(
path.as_ref().unwrap().clone().into_os_string().into_string().unwrap(),
)
})
.collect();
let fud_announce = FudAnnounce { key: file_hash, nodes: vec![self_node.clone()] };
let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await;
return JsonResponse::new(JsonValue::Array(chunks), id).into()
return match self.geode.assemble_file(&file_hash, &chunked_file, file_name).await {
Ok(file_path) => JsonResponse::new(
JsonValue::String(file_path.to_string_lossy().to_string()),
id,
)
.into(),
Err(_) => JsonError::new(ErrorCode::InternalError, None, id).into(),
}
}
// Fetch any missing chunks
@@ -261,8 +283,11 @@ impl Fud {
match status {
Ok(()) => {
let m = FudChunkPut { chunk_hash: i_chunk_hash };
self.p2p.broadcast(&m).await;
let fud_announce =
FudAnnounce { key: i_chunk_hash, nodes: vec![self_node.clone()] };
let _ = self
.announce(&i_chunk_hash, &fud_announce, self.seeders_router.clone())
.await;
}
Err(Error::GeodeChunkRouteNotFound) => continue,
@@ -277,19 +302,16 @@ impl Fud {
if !chunked_file.is_complete() {
todo!();
// Return JsonError missing chunks
// TODO: Return JsonError missing chunks
}
let chunks: Vec<JsonValue> = chunked_file
.iter()
.map(|(_, path)| {
JsonValue::String(
path.as_ref().unwrap().clone().into_os_string().into_string().unwrap(),
)
})
.collect();
JsonResponse::new(JsonValue::Array(chunks), id).into()
return match self.geode.assemble_file(&file_hash, &chunked_file, file_name).await {
Ok(file_path) => {
JsonResponse::new(JsonValue::String(file_path.to_string_lossy().to_string()), id)
.into()
}
Err(_) => JsonError::new(ErrorCode::InternalError, None, id).into(),
}
}
// RPCAPI:
@@ -331,6 +353,59 @@ impl Fud {
self.dnet_sub.clone().into()
}
// RPCAPI:
// Returns the current buckets
//
// --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1}
pub async fn list_buckets(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let mut buckets = vec![];
for bucket in self.dht.buckets.read().await.iter() {
let mut nodes = vec![];
for node in bucket.nodes.clone() {
let mut addresses = vec![];
for addr in node.addresses {
addresses.push(JsonValue::String(addr.to_string()));
}
nodes.push(JsonValue::Array(vec![
JsonValue::String(node.id.to_hex().to_string()),
JsonValue::Array(addresses),
]));
}
buckets.push(JsonValue::Array(nodes));
}
JsonResponse::new(JsonValue::Array(buckets), id).into()
}
// RPCAPI:
// Returns the content of the seeders router
//
// --> {"jsonrpc": "2.0", "method": "list_routes", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdef": ["ghijkl"]}}, "id": 1}
    /// JSON-RPC handler for `list_seeders`: serialize the seeders router as
    /// `{"seeders": {key_hex: [node_id_hex, ...]}}`. Takes no parameters.
    pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult {
        let params = params.get::<Vec<JsonValue>>().unwrap();
        if !params.is_empty() {
            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
        }

        // Serialize the router: key hash -> array of seeder node IDs
        let mut seeders_router: HashMap<String, JsonValue> = HashMap::new();
        for (hash, nodes) in self.seeders_router.read().await.iter() {
            let mut node_ids = vec![];
            for node in nodes {
                node_ids.push(JsonValue::String(node.id.to_hex().to_string()));
            }
            seeders_router.insert(hash.to_hex().to_string(), JsonValue::Array(node_ids));
        }

        // Wrap the map under a "seeders" key to match the documented reply shape
        let mut res: HashMap<String, JsonValue> = HashMap::new();
        res.insert("seeders".to_string(), JsonValue::Object(seeders_router));

        JsonResponse::new(JsonValue::Object(res), id).into()
    }
}
impl HandlerP2p for Fud {
@@ -339,229 +414,293 @@ impl HandlerP2p for Fud {
}
}
/// Background task that receives file fetch requests and tries to
/// fetch objects from the network using the routing table.
/// TODO: This can be optimised a lot for connection reuse, etc.
async fn fetch_file_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<()> {
info!("Started background file fetch task");
loop {
let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap();
info!("fetch_file_task: Received {}", file_hash);
enum FetchReply {
File(FudFileReply),
Chunk(FudChunkReply),
}
let mut metadata_router = fud.metadata_router.write().await;
let peers = metadata_router.get_mut(&file_hash);
/// Fetch a file or chunk from the network
/// 1. Lookup nodes close to the key
/// 2. Request seeders for the file/chunk from those nodes
/// 3. Request the file/chunk from the seeders
async fn fetch(fud: Arc<Fud>, key: blake3::Hash) -> Option<FetchReply> {
let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
let closest_nodes = fud.lookup_nodes(&key).await; // 1
let mut result: Option<FetchReply> = None;
if closest_nodes.is_err() {
return None
}
if peers.is_none() {
warn!("File {} not in routing table, cannot fetch", file_hash);
fud.file_fetch_end_tx
.send((file_hash, Err(Error::GeodeFileRouteNotFound)))
.await
.unwrap();
continue
}
for node in closest_nodes.unwrap() {
// 2. Request list of seeders
let channel = match fud.get_channel(&node).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch()", "Could not get a channel for node {}: {}", node.id, e);
continue;
}
};
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
let mut found = false;
let peers = peers.unwrap();
let mut invalid_file_routes = vec![];
let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
Ok(msg_subscriber) => msg_subscriber,
Err(e) => {
warn!(target: "fud::fetch()", "Error subscribing to msg: {}", e);
continue;
}
};
for peer in peers.iter() {
let session_out = fud.p2p.session_outbound();
let session_weak = Arc::downgrade(&fud.p2p.session_outbound());
let _ = channel.send(&FudFindSeedersRequest { key }).await;
info!("Connecting to {} to fetch {}", peer, file_hash);
let connector = Connector::new(fud.p2p.settings(), session_weak);
match connector.connect(peer).await {
Ok((url, channel)) => {
let proto_ver =
ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await;
let reply = match msg_subscriber.receive_with_timeout(fud.dht().timeout).await {
Ok(reply) => reply,
Err(e) => {
warn!(target: "fud::fetch()", "Error waiting for reply: {}", e);
continue;
}
};
let handshake_task = session_out.perform_handshake_protocols(
proto_ver,
channel.clone(),
executor.clone(),
);
let mut seeders = reply.nodes.clone();
info!(target: "fud::fetch()", "Found seeders for {}: {:?}", key, seeders);
channel.clone().start(executor.clone());
msg_subscriber.unsubscribe().await;
if let Err(e) = handshake_task.await {
error!("Handshake with {} failed: {}", url, e);
// Delete peer from router
invalid_file_routes.push(peer.clone());
continue
// 3. Request the file/chunk from the seeders
while let Some(seeder) = seeders.pop() {
// Only query a seeder once
if queried_seeders.iter().any(|s| *s == seeder.id) {
continue;
}
queried_seeders.insert(seeder.id);
if let Ok(channel) = fud.get_channel(&seeder).await {
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudChunkReply>().await;
msg_subsystem.add_dispatch::<FudFileReply>().await;
let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
let _ = channel.send(&FudFindRequest { key }).await;
let chunk_recv =
msg_subscriber_chunk.receive_with_timeout(fud.dht().timeout).fuse();
let file_recv = msg_subscriber_file.receive_with_timeout(fud.dht().timeout).fuse();
pin_mut!(chunk_recv, file_recv);
// Wait for a FudChunkReply or a FudFileReply
match try_select(chunk_recv, file_recv).await {
Ok(Either::Left((chunk_reply, _))) => {
info!(target: "fud::fetch()", "Received chunk {} from seeder {:?}", key, seeder.id);
msg_subscriber.unsubscribe().await;
result = Some(FetchReply::Chunk((*chunk_reply).clone()));
break;
}
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFileReply>().await;
let msg_subscriber = channel.subscribe_msg::<FudFileReply>().await.unwrap();
let request = FudFileRequest { file_hash };
if let Err(e) = channel.send(&request).await {
error!("Failed sending FudFileRequest({}) to {}: {}", file_hash, url, e);
continue
Ok(Either::Right((file_reply, _))) => {
info!(target: "fud::fetch()", "Received file {} from seeder {:?}", key, seeder.id);
msg_subscriber.unsubscribe().await;
result = Some(FetchReply::File((*file_reply).clone()));
break;
}
// TODO: With timeout!
let reply = match msg_subscriber.receive().await {
Ok(v) => v,
Err(e) => {
error!("Error receiving FudFileReply from subscriber: {}", e);
continue
}
};
msg_subscriber.unsubscribe().await;
channel.stop().await;
if let Err(e) = fud.geode.insert_file(&file_hash, &reply.chunk_hashes).await {
error!("Failed inserting file {} to Geode: {}", file_hash, e);
continue
Err(e) => {
match e {
Either::Left((chunk_err, _)) => {
warn!(target: "fud::fetch()", "Error waiting for chunk reply: {}", chunk_err);
}
Either::Right((file_err, _)) => {
warn!(target: "fud::fetch()", "Error waiting for file reply: {}", file_err);
}
};
msg_subscriber.unsubscribe().await;
continue;
}
found = true;
break
}
Err(e) => {
error!("Failed to connect to {}: {}", peer, e);
continue
}
};
}
}
for peer in invalid_file_routes {
debug!("Removing peer {} from {} file router", peer, file_hash);
peers.remove(&peer);
if result.is_some() {
break;
}
}
if !found {
warn!("Did not manage to fetch {} file metadata", file_hash);
fud.file_fetch_end_tx
.send((file_hash, Err(Error::GeodeFileRouteNotFound)))
.await
.unwrap();
continue
}
result
}
info!("Successfully fetched {} file metadata", file_hash);
fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
/// Background task that receives file fetch requests and tries to
/// fetch objects from the network using the routing table.
///
/// Receives `(file_hash, _)` pairs on `fud.file_fetch_rx` and reports the
/// outcome on `fud.file_fetch_end_tx` so the requester can unblock.
/// TODO: This can be optimised a lot for connection reuse, etc.
async fn fetch_file_task(fud: Arc<Fud>, _: Arc<Executor<'_>>) -> Result<()> {
    info!(target: "fud::fetch_file_task()", "Started background file fetch task");
    loop {
        // Block until another task queues a file hash to fetch.
        let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap();
        info!(target: "fud::fetch_file_task()", "Fetching file {}", file_hash);
        let result = fetch(fud.clone(), file_hash).await;
        match result {
            Some(reply) => {
                match reply {
                    // Normal case: received the file metadata (its chunk hash list).
                    FetchReply::File(FudFileReply { chunk_hashes }) => {
                        if let Err(e) = fud.geode.insert_file(&file_hash, &chunk_hashes).await {
                            error!("Failed inserting file {} to Geode: {}", file_hash, e);
                        }
                        fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
                    }
                    // Looked for a file but got a chunk, meaning that file_hash = chunk_hash, the file fits in a single chunk
                    FetchReply::Chunk(FudChunkReply { chunk }) => {
                        // TODO: Verify chunk
                        info!(target: "fud::fetch()", "File fits in a single chunk");
                        // Record the single-chunk layout, then store the chunk itself.
                        let _ = fud.geode.insert_file(&file_hash, &[file_hash]).await;
                        match fud.geode.insert_chunk(&chunk).await {
                            Ok(inserted_hash) => {
                                // insert_chunk returns the content hash; mismatch means
                                // the seeder sent us bogus data.
                                if inserted_hash != file_hash {
                                    warn!("Received chunk does not match requested file");
                                }
                            }
                            Err(e) => {
                                error!("Failed inserting chunk {} to Geode: {}", file_hash, e);
                            }
                        };
                        fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
                    }
                }
            }
            // No seeder answered; report the failure to the requester.
            None => {
                fud.file_fetch_end_tx
                    .send((file_hash, Err(Error::GeodeFileRouteNotFound)))
                    .await
                    .unwrap();
            }
        };
    }
}
/// Background task that receives chunk fetch requests and tries to
/// fetch objects from the network using the routing table.
/// TODO: This can be optimised a lot for connection reuse, etc.
async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<()> {
info!("Started background chunk fetch task");
async fn fetch_chunk_task(fud: Arc<Fud>, _: Arc<Executor<'_>>) -> Result<()> {
info!(target: "fud::fetch_chunk_task()", "Started background chunk fetch task");
loop {
let (chunk_hash, _) = fud.chunk_fetch_rx.recv().await.unwrap();
info!("fetch_chunk_task: Received {}", chunk_hash);
info!(target: "fud::fetch_chunk_task()", "Fetching chunk {}", chunk_hash);
let mut chunk_router = fud.chunks_router.write().await;
let peers = chunk_router.get_mut(&chunk_hash);
let result = fetch(fud.clone(), chunk_hash).await;
if peers.is_none() {
warn!("Chunk {} not in routing table, cannot fetch", chunk_hash);
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
continue
match result {
Some(reply) => {
match reply {
FetchReply::Chunk(FudChunkReply { chunk }) => {
// TODO: Verify chunk
match fud.geode.insert_chunk(&chunk).await {
Ok(inserted_hash) => {
if inserted_hash != chunk_hash {
warn!("Received chunk does not match requested chunk");
}
}
Err(e) => {
error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e);
}
};
fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap();
}
_ => {
// Looked for a chunk but got a file instead, not supposed to happen
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
}
}
}
None => {
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
}
};
}
}
#[async_trait]
impl DhtHandler for Fud {
/// Hand out a shared reference to this daemon's DHT instance.
fn dht(&self) -> Arc<Dht> {
    Arc::clone(&self.dht)
}
/// Ping a peer over `channel` and return the DHT node it reports itself as.
///
/// Registers the `FudPingReply` dispatcher, sends a `FudPingRequest`, then
/// waits (bounded by the DHT timeout) for the reply before unsubscribing.
async fn ping(&self, channel: ChannelPtr) -> Result<dht::DhtNode> {
    debug!(target: "fud::Fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);

    // Make sure the channel can dispatch ping replies before we send.
    let subsystem = channel.message_subsystem();
    subsystem.add_dispatch::<FudPingReply>().await;
    let sub = channel.subscribe_msg::<FudPingReply>().await.unwrap();

    channel.send(&FudPingRequest {}).await?;

    let reply = sub.receive_with_timeout(self.dht().timeout).await?;
    sub.unsubscribe().await;

    Ok(reply.node.clone())
}
// TODO: Optimize this
async fn on_new_node(&self, node: &DhtNode) -> Result<()> {
debug!(target: "fud::Fud::DhtHandler::on_new_node()", "New node {}", node.id);
// If this is the first node we know about, then bootstrap
if !self.dht().is_bootstrapped().await {
self.dht().set_bootstrapped().await;
// Lookup our own node id
let self_node = self.dht().node.clone();
debug!(target: "fud::Fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", self_node.id);
let _ = self.lookup_nodes(&self_node.id).await;
}
let mut found = false;
let peers = peers.unwrap();
let mut invalid_chunk_routes = vec![];
for peer in peers.iter() {
let session_out = fud.p2p.session_outbound();
let session_weak = Arc::downgrade(&fud.p2p.session_outbound());
info!("Connecting to {} to fetch {}", peer, chunk_hash);
let connector = Connector::new(fud.p2p.settings(), session_weak);
match connector.connect(peer).await {
Ok((url, channel)) => {
let proto_ver =
ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await;
let handshake_task = session_out.perform_handshake_protocols(
proto_ver,
channel.clone(),
executor.clone(),
);
channel.clone().start(executor.clone());
if let Err(e) = handshake_task.await {
error!("Handshake with {} failed: {}", url, e);
// Delete peer from router
invalid_chunk_routes.push(peer.clone());
continue
}
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudChunkReply>().await;
let msg_subscriber = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
let request = FudChunkRequest { chunk_hash };
if let Err(e) = channel.send(&request).await {
error!("Failed sending FudChunkRequest({}) to {}: {}", chunk_hash, url, e);
continue
}
// TODO: With timeout!
let reply = match msg_subscriber.receive().await {
Ok(v) => v,
Err(e) => {
error!("Error receiving FudChunkReply from subscriber: {}", e);
continue
}
};
msg_subscriber.unsubscribe().await;
channel.stop().await;
match fud.geode.insert_chunk(&reply.chunk).await {
Ok(inserted_hash) => {
if inserted_hash != chunk_hash {
warn!("Received chunk does not match requested chunk");
invalid_chunk_routes.push(peer.clone());
continue
}
}
Err(e) => {
error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e);
continue
}
}
found = true;
break
}
Err(e) => {
error!("Failed to connect to {}: {}", peer, e);
continue
}
// Send keys that are closer to this node than we are
let self_id = self.dht().node.id;
let channel = self.get_channel(node).await?;
for (key, nodes) in self.seeders_router.read().await.iter() {
let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id));
let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
if node_distance <= self_distance {
let _ = channel
.send(&FudAnnounce { key: *key, nodes: nodes.iter().cloned().collect() })
.await;
}
}
for peer in invalid_chunk_routes {
debug!("Removing peer {} from {} chunk router", peer, chunk_hash);
peers.remove(&peer);
}
if !found {
warn!("Did not manage to fetch {} chunk", chunk_hash);
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
continue
}
info!("Successfully fetched {} chunk", chunk_hash);
fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap();
Ok(())
}
/// Ask `node` for the DHT nodes it knows that are closest to `key`.
///
/// Sends a `FudFindNodesRequest` and waits (bounded by the DHT timeout)
/// for the matching `FudFindNodesReply`.
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
    // Fixed copy/paste bug: the log target previously said `fetch_value()`,
    // mislabeling these debug lines.
    debug!(target: "fud::Fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", key, node.id);

    let channel = self.get_channel(node).await?;
    let msg_subsystem = channel.message_subsystem();
    msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
    let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();

    let request = FudFindNodesRequest { key: *key };
    channel.send(&request).await?;

    let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().timeout).await?;
    msg_subscriber_nodes.unsubscribe().await;

    Ok(reply.nodes.clone())
}
}
// TODO: This is not Sybil-resistant
/// Generate a fresh random node ID from 32 bytes of OS-provided entropy.
fn generate_node_id() -> Result<blake3::Hash> {
    let mut seed = [0u8; 32];
    OsRng.fill_bytes(&mut seed);
    Ok(blake3::Hash::from_bytes(seed))
}
async_daemonize!(realmain);
@@ -569,9 +708,8 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// The working directory for this daemon and geode.
let basedir = expand_path(&args.base_dir)?;
// Hashmaps used for routing
let metadata_router = Arc::new(RwLock::new(HashMap::new()));
let chunks_router = Arc::new(RwLock::new(HashMap::new()));
// Hashmap used for routing
let seeders_router = Arc::new(RwLock::new(HashMap::new()));
info!("Instantiating Geode instance");
let geode = Geode::new(&basedir).await?;
@@ -579,6 +717,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Instantiating P2P network");
let p2p = P2p::new(args.net.into(), ex.clone()).await?;
let external_addrs = p2p.hosts().external_addrs().await;
if external_addrs.is_empty() {
error!(
target: "fud::realmain",
"External addrs not configured. Stopping",
);
return Ok(())
}
info!("Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
@@ -603,16 +751,46 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
// Get or generate the node id
let node_id: Result<blake3::Hash> = {
let mut node_id_path: PathBuf = basedir.clone();
node_id_path.push(NODE_ID_PATH);
match File::open(node_id_path.clone()).await {
Ok(mut file) => {
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let buf: [u8; 64] = buffer.try_into().expect("Node ID must have 64 characters");
let node_id = blake3::Hash::from_hex(buf)?;
Ok(node_id)
}
Err(e) if e.kind() == ErrorKind::NotFound => {
let node_id = generate_node_id()?;
let mut file =
OpenOptions::new().write(true).create(true).open(node_id_path).await?;
file.write_all(node_id.to_hex().as_bytes()).await?;
file.flush().await?;
Ok(node_id)
}
Err(e) => Err(e.into()),
}
};
let node_id_ = node_id?;
info!(target: "fud", "Your node ID: {}", node_id_);
// Daemon instantiation
let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded();
let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded();
let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded();
let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded();
// TODO: Add DHT settings in the config file
let dht = Arc::new(Dht::new(&node_id_, 4, 16, 15, p2p.clone(), ex.clone()).await);
let fud = Arc::new(Fud {
metadata_router,
chunks_router,
seeders_router,
p2p: p2p.clone(),
geode,
dht: dht.clone(),
file_fetch_tx,
file_fetch_rx,
file_fetch_end_tx,
@@ -680,6 +858,34 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
.await;
p2p.clone().start().await?;
info!(target: "fud", "Starting DHT tasks");
let dht_channel_task = StoppableTask::new();
let fud_ = fud.clone();
dht_channel_task.clone().start(
async move { fud_.channel_task::<FudFindNodesReply>().await },
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "fud", "Failed starting dht channel task: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
let dht_disconnect_task = StoppableTask::new();
let fud_ = fud.clone();
dht_disconnect_task.clone().start(
async move { fud_.disconnect_task().await },
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "fud", "Failed starting dht disconnect task: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
@@ -694,9 +900,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Stopping JSON-RPC server...");
rpc_task.stop().await;
info!("Stopping P2P network");
info!(target: "fud", "Stopping P2P network...");
p2p.stop().await;
info!(target: "fud", "Stopping DHT tasks");
dht_channel_task.stop().await;
dht_disconnect_task.stop().await;
info!("Bye!");
Ok(())
}

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashSet, sync::Arc};
use std::sync::Arc;
use async_trait::async_trait;
use darkfi::{
@@ -30,50 +30,11 @@ use darkfi::{
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use log::{debug, error};
use log::{debug, error, info};
use smol::{fs::File, Executor};
use url::Url;
use super::Fud;
/// Message representing a new file on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFilePut {
    /// Hash identifying the announced file
    pub file_hash: blake3::Hash,
    /// Hashes of the chunks that make up the file
    pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFilePut, "FudFilePut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new chunk on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkPut {
    /// Hash identifying the announced chunk
    pub chunk_hash: blake3::Hash,
}
impl_p2p_message!(FudChunkPut, "FudChunkPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new route for a file on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileRoute {
    /// Hash identifying the file
    pub file_hash: blake3::Hash,
    /// Hashes of the chunks that make up the file
    pub chunk_hashes: Vec<blake3::Hash>,
    /// Address of the peer known to hold the file
    pub peer: Url,
}
impl_p2p_message!(FudFileRoute, "FudFileRoute", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new route for a chunk on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRoute {
    /// Hash identifying the chunk
    pub chunk_hash: blake3::Hash,
    /// Address of the peer known to hold the chunk
    pub peer: Url,
}
impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a file request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileRequest {
    /// Hash of the requested file
    pub file_hash: blake3::Hash,
}
impl_p2p_message!(FudFileRequest, "FudFileRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
use crate::dht::{DhtHandler, DhtNode};
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -82,17 +43,18 @@ pub struct FudFileReply {
}
impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk request from the network
/// Message representing a node announcing a key on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRequest {
pub chunk_hash: blake3::Hash,
pub struct FudAnnounce {
pub key: blake3::Hash,
pub nodes: Vec<DhtNode>,
}
impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkReply {
    // TODO: This should be a chunk-sized array, but then we need padding?
    /// Raw chunk bytes (at most `MAX_CHUNK_SIZE` — the sender reads into a
    /// buffer of that size and sends only the bytes actually read)
    pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -107,360 +69,284 @@ impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0, 0, DEFAULT_METERING_CON
/// Message sent in reply to a chunk request when the chunk is not found locally
pub struct FudChunkNotFound;
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a seeders reply when seeders are not found
///
/// Sent in response to a `FudFindSeedersRequest` when the local seeders
/// router has no entry for the requested key.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudSeedersNotFound;
impl_p2p_message!(FudSeedersNotFound, "FudSeedersNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a ping request on the network
///
/// Carries no payload; the responder answers with a `FudPingReply`
/// describing its own DHT node.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingRequest {}
impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a ping reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingReply {
    /// The responder's own DHT node entry
    pub node: DhtNode,
}
impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a find file/chunk request from the network
///
/// The responder replies with the best match it has for `key`: the chunk
/// itself, the file metadata, known seeders, or its closest nodes.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindRequest {
    /// File or chunk hash being looked up
    pub key: blake3::Hash,
}
impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a find nodes request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindNodesRequest {
    /// Key to find the closest known DHT nodes for
    pub key: blake3::Hash,
}
impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a find nodes reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindNodesReply {
    /// The responder's closest known nodes to the requested key
    pub nodes: Vec<DhtNode>,
}
impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a find seeders request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindSeedersRequest {
    /// Key to find seeders for
    pub key: blake3::Hash,
}
impl_p2p_message!(FudFindSeedersRequest, "FudFindSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a find seeders reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindSeedersReply {
    /// Nodes known to seed the requested key
    pub nodes: Vec<DhtNode>,
}
impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// P2P protocol implementation for fud.
pub struct ProtocolFud {
    /// The channel this protocol instance is attached to
    channel: ChannelPtr,
    // NOTE(review): the *_put/*_route/*_request subscriptions below belong to
    // the older gossip-routing scheme, while the ping/find/announce ones belong
    // to the DHT scheme — confirm both sets are still meant to coexist here.
    file_put_sub: MessageSubscription<FudFilePut>,
    chunk_put_sub: MessageSubscription<FudChunkPut>,
    file_route_sub: MessageSubscription<FudFileRoute>,
    chunk_route_sub: MessageSubscription<FudChunkRoute>,
    file_request_sub: MessageSubscription<FudFileRequest>,
    chunk_request_sub: MessageSubscription<FudChunkRequest>,
    ping_request_sub: MessageSubscription<FudPingRequest>,
    find_request_sub: MessageSubscription<FudFindRequest>,
    find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
    find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
    announce_sub: MessageSubscription<FudAnnounce>,
    /// Handle to the fud daemon state (geode storage, DHT, routers)
    fud: Arc<Fud>,
    p2p: P2pPtr,
    /// Manages the long-running handler tasks spawned in `start()`
    jobsman: ProtocolJobsManagerPtr,
}
impl ProtocolFud {
pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, p2p: P2pPtr) -> Result<ProtocolBasePtr> {
pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
debug!(
target: "fud::proto::ProtocolFud::init()",
"Adding ProtocolFud to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFilePut>().await;
msg_subsystem.add_dispatch::<FudChunkPut>().await;
msg_subsystem.add_dispatch::<FudFileRoute>().await;
msg_subsystem.add_dispatch::<FudChunkRoute>().await;
msg_subsystem.add_dispatch::<FudFileRequest>().await;
msg_subsystem.add_dispatch::<FudChunkRequest>().await;
msg_subsystem.add_dispatch::<FudPingRequest>().await;
msg_subsystem.add_dispatch::<FudFindRequest>().await;
msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
msg_subsystem.add_dispatch::<FudAnnounce>().await;
let file_put_sub = channel.subscribe_msg::<FudFilePut>().await?;
let chunk_put_sub = channel.subscribe_msg::<FudChunkPut>().await?;
let file_route_sub = channel.subscribe_msg::<FudFileRoute>().await?;
let chunk_route_sub = channel.subscribe_msg::<FudChunkRoute>().await?;
let file_request_sub = channel.subscribe_msg::<FudFileRequest>().await?;
let chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
file_put_sub,
chunk_put_sub,
file_route_sub,
chunk_route_sub,
file_request_sub,
chunk_request_sub,
ping_request_sub,
find_request_sub,
find_nodes_request_sub,
find_seeders_request_sub,
announce_sub,
fud,
p2p,
jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
}))
}
async fn handle_fud_file_put(self: Arc<Self>) -> Result<()> {
debug!(target: "fud::ProtocolFud::handle_fud_file_put()", "START");
async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
loop {
let fud_file = match self.file_put_sub.receive().await {
let _ = match self.ping_request_sub.receive().await {
Ok(v) => v,
Err(Error::ChannelStopped) => continue,
Err(e) => {
error!(
target: "fud::ProtocolFud::handle_fud_file_put()",
"recv fail: {}", e,
);
error!("{}", e);
continue
}
};
info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING");
// TODO: This approach is naive and optimistic. Needs to be fixed.
let mut metadata_lock = self.fud.metadata_router.write().await;
let file_route = metadata_lock.get_mut(&fud_file.file_hash);
match file_route {
Some(peers) => {
peers.insert(self.channel.address().clone());
}
None => {
let mut peers = HashSet::new();
peers.insert(self.channel.address().clone());
metadata_lock.insert(fud_file.file_hash, peers);
}
}
drop(metadata_lock);
let mut chunks_lock = self.fud.chunks_router.write().await;
for chunk in &fud_file.chunk_hashes {
let chunk_route = chunks_lock.get_mut(chunk);
match chunk_route {
Some(peers) => {
peers.insert(self.channel.address().clone());
}
None => {
let mut peers = HashSet::new();
peers.insert(self.channel.address().clone());
chunks_lock.insert(*chunk, peers);
}
}
}
drop(chunks_lock);
// Relay this knowledge of the new route
let route = FudFileRoute {
file_hash: fud_file.file_hash,
chunk_hashes: fud_file.chunk_hashes.clone(),
peer: self.channel.address().clone(),
};
self.p2p.broadcast_with_exclude(&route, &[self.channel.address().clone()]).await;
}
}
/// Handle incoming `FudChunkPut` announcements: record the sending peer as a
/// route for the announced chunk, then relay the route to other peers.
async fn handle_fud_chunk_put(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_chunk_put()", "START");
    loop {
        let fud_chunk = match self.chunk_put_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "fud::ProtocolFud::handle_fud_chunk_put()",
                    "recv fail: {}", e,
                );
                continue
            }
        };
        // TODO: This approach is naive and optimistic. Needs to be fixed.
        // Record the sender's address as a known holder of this chunk.
        let mut chunks_lock = self.fud.chunks_router.write().await;
        let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash);
        match chunk_route {
            Some(peers) => {
                peers.insert(self.channel.address().clone());
            }
            None => {
                let mut peers = HashSet::new();
                peers.insert(self.channel.address().clone());
                chunks_lock.insert(fud_chunk.chunk_hash, peers);
            }
        }
        drop(chunks_lock);
        // Relay this knowledge of the new route
        let route = FudChunkRoute {
            chunk_hash: fud_chunk.chunk_hash,
            peer: self.channel.address().clone(),
        };
        // Exclude the originating peer so we don't echo its own announce back.
        self.p2p.broadcast_with_exclude(&route, &[self.channel.address().clone()]).await;
    }
}
/// Handle relayed `FudFileRoute` messages: learn that `peer` holds the file
/// (and all its chunks), then re-broadcast the route, excluding peers that
/// already know it.
async fn handle_fud_file_route(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_file_route()", "START");
    loop {
        let fud_file = match self.file_route_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "fud::ProtocolFud::handle_fud_file_route()",
                    "recv fail: {}", e,
                );
                continue
            }
        };
        // TODO: This approach is naive and optimistic. Needs to be fixed.
        // Record the announced peer as a metadata route for this file.
        let mut metadata_lock = self.fud.metadata_router.write().await;
        let file_route = metadata_lock.get_mut(&fud_file.file_hash);
        match file_route {
            Some(peers) => {
                peers.insert(fud_file.peer.clone());
            }
            None => {
                let mut peers = HashSet::new();
                peers.insert(fud_file.peer.clone());
                metadata_lock.insert(fud_file.file_hash, peers);
            }
        }
        // Peers that already route this file don't need the relay.
        let excluded_peers: Vec<Url> = metadata_lock
            .get(&fud_file.file_hash)
            .unwrap_or(&HashSet::new())
            .iter()
            .cloned()
            .collect();
        drop(metadata_lock);
        // Also record the peer as a route for every chunk of the file.
        let mut chunks_lock = self.fud.chunks_router.write().await;
        for chunk in &fud_file.chunk_hashes {
            let chunk_route = chunks_lock.get_mut(chunk);
            match chunk_route {
                Some(peers) => {
                    peers.insert(fud_file.peer.clone());
                }
                None => {
                    let mut peers = HashSet::new();
                    peers.insert(fud_file.peer.clone());
                    chunks_lock.insert(*chunk, peers);
                }
            }
        }
        drop(chunks_lock);
        // Relay this knowledge of the new route
        let route = FudFileRoute {
            file_hash: fud_file.file_hash,
            chunk_hashes: fud_file.chunk_hashes.clone(),
            peer: fud_file.peer.clone(),
        };
        // Exclude the sender, the announced peer itself, and known routers.
        self.p2p
            .broadcast_with_exclude(
                &route,
                &[vec![self.channel.address().clone(), fud_file.peer.clone()], excluded_peers]
                    .concat(),
            )
            .await;
    }
}
/// Handle relayed `FudChunkRoute` messages: learn that `peer` holds the
/// announced chunk, then re-broadcast the route, excluding peers that
/// already know it.
async fn handle_fud_chunk_route(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_chunk_route()", "START");
    loop {
        let fud_chunk = match self.chunk_route_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                // Fixed copy/paste bug: this error target previously said
                // `handle_fud_chunk_put()`, mislabeling errors from this handler.
                error!(
                    target: "fud::ProtocolFud::handle_fud_chunk_route()",
                    "recv fail: {}", e,
                );
                continue
            }
        };
        // TODO: This approach is naive and optimistic. Needs to be fixed.
        // Record the announced peer as a known holder of this chunk.
        let mut chunks_lock = self.fud.chunks_router.write().await;
        let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash);
        match chunk_route {
            Some(peers) => {
                peers.insert(fud_chunk.peer.clone());
            }
            None => {
                let mut peers = HashSet::new();
                peers.insert(fud_chunk.peer.clone());
                chunks_lock.insert(fud_chunk.chunk_hash, peers);
            }
        }
        // Peers that already route this chunk don't need the relay.
        let excluded_peers: Vec<Url> = chunks_lock
            .get(&fud_chunk.chunk_hash)
            .unwrap_or(&HashSet::new())
            .iter()
            .cloned()
            .collect();
        drop(chunks_lock);
        // Relay this knowledge of the new route
        let route =
            FudChunkRoute { chunk_hash: fud_chunk.chunk_hash, peer: fud_chunk.peer.clone() };
        // Exclude the sender, the announced peer itself, and known routers.
        self.p2p
            .broadcast_with_exclude(
                &route,
                &[vec![self.channel.address().clone(), fud_chunk.peer.clone()], excluded_peers]
                    .concat(),
            )
            .await;
    }
}
/// Handle `FudFileRequest`s: look up the file in Geode and reply with its
/// chunk hashes, or `FudFileNotFound` when we don't have it.
async fn handle_fud_file_request(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_file_request()", "START");
    loop {
        let file_request = match self.file_request_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "fud::ProtocolFud::handle_fud_file_request()",
                    "recv fail: {}", e,
                );
                continue
            }
        };
        let chunked_file = match self.fud.geode.get(&file_request.file_hash).await {
            Ok(v) => v,
            Err(Error::GeodeNeedsGc) => {
                // TODO: Run GC
                continue
            }
            // Tell the requester we don't have the file; send errors ignored.
            Err(Error::GeodeFileNotFound) => match self.channel.send(&FudFileNotFound).await {
                Ok(()) => continue,
                Err(_e) => continue,
            },
            Err(_e) => continue,
        };
        // Reply with the list of chunk hashes that make up the file.
        let file_reply = FudFileReply {
            chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(),
        };
        match self.channel.send(&file_reply).await {
            Ok(()) => continue,
            Err(_e) => continue,
        }
    }
}
/// Handle `FudChunkRequest`s: look up the chunk in Geode and send it back,
/// or `FudChunkNotFound` when we don't have it.
async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
    loop {
        let chunk_request = match self.chunk_request_sub.receive().await {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "fud::ProtocolFud::handle_fud_chunk_request()",
                    "recv fail: {}", e,
                );
                continue
            }
        };
        let chunk_path = match self.fud.geode.get_chunk(&chunk_request.chunk_hash).await {
            Ok(v) => v,
            Err(Error::GeodeNeedsGc) => {
                // TODO: Run GC
                continue
            }
            Err(Error::GeodeChunkNotFound) => {
                match self.channel.send(&FudChunkNotFound).await {
                    Ok(()) => continue,
                    Err(_e) => continue,
                }
            }
            Err(_e) => continue,
        };
        // The consistency should already be checked in Geode, so we're
        // fine not checking and unwrapping here.
        let mut buf = vec![0u8; MAX_CHUNK_SIZE];
        let mut chunk_fd = File::open(&chunk_path).await.unwrap();
        let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
        let chunk_slice = &buf[..bytes_read];
        let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
        // NOTE(review): the binding below shadows the chunk reply built just
        // above, so the chunk that was read is never sent — this looks like
        // merge residue; confirm which reply is intended here.
        let reply = FudPingReply { node: self.fud.dht.node.clone() };
        match self.channel.send(&reply).await {
            Ok(()) => continue,
            Err(_e) => continue,
        }
    }
}
/// Handle `FudFindRequest`s: answer with the best thing we have for the key,
/// trying in order: the chunk itself, the file metadata, known seeders,
/// and finally our closest known DHT nodes.
async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
    loop {
        let request = match self.find_request_sub.receive().await {
            Ok(v) => v,
            Err(Error::ChannelStopped) => continue,
            Err(e) => {
                error!("{}", e);
                continue
            }
        };
        info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND");
        // Refresh the requester's entry in our routing table, if we know it.
        let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
        if let Some(node) = node {
            self.fud.update_node(&node).await;
        }
        // Chunk
        {
            let chunk_res = self.fud.geode.get_chunk(&request.key).await;
            // TODO: Run geode GC
            if let Ok(chunk_path) = chunk_res {
                // Read into a MAX_CHUNK_SIZE buffer and send only the bytes
                // actually read.
                let mut buf = vec![0u8; MAX_CHUNK_SIZE];
                let mut chunk_fd = File::open(&chunk_path).await.unwrap();
                let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
                let chunk_slice = &buf[..bytes_read];
                let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
                let _ = self.channel.send(&reply).await;
                continue;
            }
        }
        // File
        {
            let file_res = self.fud.geode.get(&request.key).await;
            // TODO: Run geode GC
            if let Ok(chunked_file) = file_res {
                let reply = FudFileReply {
                    chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(),
                };
                let _ = self.channel.send(&reply).await;
                continue;
            }
        }
        // Peers
        {
            let router = self.fud.seeders_router.read().await;
            let peers = router.get(&request.key);
            if let Some(nodes) = peers {
                let reply = FudFindNodesReply { nodes: nodes.clone().into_iter().collect() };
                let _ = self.channel.send(&reply).await;
                continue;
            }
        }
        // Nodes
        let reply = FudFindNodesReply {
            nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().k).await,
        };
        let _ = self.channel.send(&reply).await;
    }
}
/// Serve `FudFindNodesRequest`s: reply with our closest known nodes to the key.
async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
    loop {
        let request = match self.find_nodes_request_sub.receive().await {
            Ok(req) => req,
            Err(Error::ChannelStopped) => continue,
            Err(e) => {
                error!("{}", e);
                continue
            }
        };
        info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", &request.key);

        // Refresh the requester's entry in our routing table, if we know it.
        if let Some(node) = self.fud.dht().get_node_from_channel(self.channel.info.id).await {
            self.fud.update_node(&node).await;
        }

        // Reply with our k closest known nodes; send errors are ignored and
        // we keep serving the next request either way.
        let neighbors = self.fud.dht().find_neighbors(&request.key, self.fud.dht().k).await;
        let _ = self.channel.send(&FudFindNodesReply { nodes: neighbors }).await;
    }
}
/// Handle `FudFindSeedersRequest`s: reply with the seeders we know for the
/// key, or `FudSeedersNotFound` when the router has no entry.
async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
    loop {
        let request = match self.find_seeders_request_sub.receive().await {
            Ok(v) => v,
            Err(Error::ChannelStopped) => continue,
            Err(e) => {
                error!("{}", e);
                continue
            }
        };
        info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", &request.key);
        // Refresh the requester's entry in our routing table, if we know it.
        let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
        if let Some(node) = node {
            self.fud.update_node(&node).await;
        }
        // NOTE(review): the read lock on seeders_router is held across the
        // channel send below — confirm this cannot stall writers.
        let router = self.fud.seeders_router.read().await;
        let peers = router.get(&request.key);
        match peers {
            Some(nodes) => {
                let _ = self
                    .channel
                    .send(&FudFindSeedersReply { nodes: nodes.iter().cloned().collect() })
                    .await;
            }
            None => {
                let _ = self.channel.send(&FudSeedersNotFound {}).await;
            }
        };
    }
}
/// Long-running task consuming ANNOUNCE messages received on this
/// channel: the announced nodes are recorded in the seeders router
/// under the announced key.
async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
    loop {
        // Block until the next ANNOUNCE arrives.
        let request = match self.announce_sub.receive().await {
            Err(Error::ChannelStopped) => continue,
            Err(e) => {
                error!("{}", e);
                continue
            }
            Ok(v) => v,
        };
        info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", &request.key);

        // Refresh the DHT entry for the node behind this channel, if known.
        if let Some(node) = self.fud.dht().get_node_from_channel(self.channel.info.id).await {
            self.fud.update_node(&node).await;
        }

        // Record the announced nodes as seeders for this key.
        let announced = request.nodes.clone();
        self.fud.add_to_router(self.fud.seeders_router.clone(), &request.key, announced).await;
    }
}
}
#[async_trait]
@@ -468,12 +354,17 @@ impl ProtocolBase for ProtocolFud {
/// Start the fud protocol: spawns one long-running handler task per
/// fud message type on the jobs manager.
/// NOTE(review): this span is commit-diff residue and may interleave
/// removed and added spawn lines — verify the final set of handlers
/// against the repository before relying on it.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
    debug!(target: "fud::ProtocolFud::start()", "START");
    // Start the jobs manager itself before spawning tasks on it.
    self.jobsman.clone().start(executor.clone());
    self.jobsman.clone().spawn(self.clone().handle_fud_file_put(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_chunk_put(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_file_route(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_chunk_route(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_file_request(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
    self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
    self.jobsman
        .clone()
        .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
        .await;
    self.jobsman
        .clone()
        .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
        .await;
    self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
    debug!(target: "fud::ProtocolFud::start()", "END");
    Ok(())
}

View File

@@ -78,6 +78,8 @@ pub const MAX_CHUNK_SIZE: usize = 262_144;
/// Path prefix where files are stored
const FILES_PATH: &str = "files";
/// Path prefix where file chunks are stored
const CHUNKS_PATH: &str = "chunks";
/// Path prefix where full files are stored
const DOWNLOADS_PATH: &str = "downloads";
/// `ChunkedFile` is a representation of a file we're trying to
/// retrieve from `Geode`.
@@ -111,8 +113,13 @@ pub struct Geode {
files_path: PathBuf,
/// Path to the filesystem directory where file chunks are stored
chunks_path: PathBuf,
/// Path to the filesystem directory where full files are stored
downloads_path: PathBuf,
}
/// smol::fs::File::read does not guarantee that the buffer will be filled, even if the buffer is
/// smaller than the file. This is a workaround.
/// This reads the stream until the buffer is full or until we reached the end of the stream.
pub async fn read_until_filled(
mut stream: impl AsyncRead + Unpin,
buffer: &mut [u8],
@@ -137,14 +144,17 @@ impl Geode {
/// Instantiate a new [`Geode`] rooted at `base_path`, creating the
/// `files`, `chunks` and `downloads` subdirectories if they do not
/// already exist.
pub async fn new(base_path: &PathBuf) -> Result<Self> {
    let mut files_path: PathBuf = base_path.into();
    let mut chunks_path: PathBuf = base_path.into();
    let mut downloads_path: PathBuf = base_path.into();
    files_path.push(FILES_PATH);
    chunks_path.push(CHUNKS_PATH);
    downloads_path.push(DOWNLOADS_PATH);

    // Create necessary directory structure if needed
    fs::create_dir_all(&files_path).await?;
    fs::create_dir_all(&chunks_path).await?;
    fs::create_dir_all(&downloads_path).await?;

    // Fix: the flattened diff left a stale second return without
    // `downloads_path`; only the complete initializer is kept.
    Ok(Self { files_path, chunks_path, downloads_path })
}
/// Attempt to read chunk hashes from a given file path and return
@@ -477,4 +487,26 @@ impl Geode {
Ok(chunk_path)
}
/// Assemble chunks to create a file in the downloads directory.
/// This method does NOT perform a consistency check on the chunks.
///
/// The file is written to `<downloads>/<file_hash>/<file_name>`, using
/// the hex-encoded hash as the file name when `file_name` is `None`.
/// Returns the path of the assembled file.
pub async fn assemble_file(
    &self,
    file_hash: &blake3::Hash,
    chunked_file: &ChunkedFile,
    file_name: Option<String>,
) -> Result<PathBuf> {
    info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash);

    // Each assembled file lives in its own directory named by its hash.
    let mut file_path = self.downloads_path.clone();
    file_path.push(file_hash.to_hex().as_str());
    fs::create_dir_all(&file_path).await?;
    // unwrap_or_else: avoid allocating the hex string when a name is given.
    file_path.push(file_name.unwrap_or_else(|| file_hash.to_hex().to_string()));

    let mut file_fd = File::create(&file_path).await?;

    // Concatenate the chunks in order, reusing a single read buffer.
    let mut buf = Vec::new();
    for (_, chunk_path) in chunked_file.iter() {
        buf.clear();
        // A missing chunk path is a broken invariant: callers must only
        // assemble fully downloaded files.
        let path = chunk_path.clone().expect("chunk path must be known when assembling");
        let mut chunk_fd = File::open(path).await?;
        chunk_fd.read_to_end(&mut buf).await?;
        // write() may write fewer bytes than requested; write_all
        // guarantees the whole chunk is persisted.
        file_fd.write_all(&buf).await?;
    }
    file_fd.flush().await?;

    Ok(file_path)
}
}