fud: remove chunk seeders, change file hash, replace hex with bs58

darkfi
2025-03-23 16:19:20 +01:00
committed by epiphany1
parent aae5651501
commit f630e1cd7c
10 changed files with 376 additions and 267 deletions

Cargo.lock generated
View File

@@ -3400,6 +3400,7 @@ version = "0.4.1"
dependencies = [
"async-trait",
"blake3 1.6.0",
"bs58",
"darkfi",
"darkfi-serial",
"easy-parallel",

View File

@@ -171,6 +171,7 @@ validator = [
geode = [
"blake3",
"bs58",
"futures",
"smol",
]

View File

@@ -172,20 +172,11 @@ impl Fu {
return Err(Error::Custom(format!("Could not find file {}", file_hash)));
}
"chunk_not_found" => {
let info = params
.get("info")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
let chunk_hash =
info.get("chunk_hash").unwrap().get::<String>().unwrap();
println!();
return Err(Error::Custom(format!(
"Could not find chunk {}",
chunk_hash
)));
// A seeder does not have a chunk we are looking for;
// we will try another seeder, so there is nothing to do here
}
"missing_chunks" => {
// We tried all seeders and some chunks are still missing
println!();
return Err(Error::Custom("Missing chunks".to_string()));
}

View File

@@ -15,6 +15,7 @@ darkfi-serial = {version = "0.4.2", features = ["hash"]}
# Misc
async-trait = "0.1.86"
blake3 = "1.6.0"
bs58 = "0.5.1"
rand = "0.8.5"
log = "0.4.26"
tinyjson = "2.5.1"

View File

@@ -31,7 +31,7 @@ use darkfi::{
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use futures::future::join_all;
use log::{debug, error};
use log::{debug, error, warn};
use num_bigint::BigUint;
use smol::lock::RwLock;
use url::Url;
@@ -495,6 +495,7 @@ pub trait DhtHandler {
let connector = Connector::new(self.dht().p2p.settings(), session_weak);
let connect_res = connector.connect(&addr).await;
if connect_res.is_err() {
warn!(target: "dht::DhtHandler::get_channel()", "Error while connecting to {}: {}", addr, connect_res.unwrap_err());
continue;
}
let (_, channel) = connect_res.unwrap();
@@ -502,6 +503,7 @@ pub trait DhtHandler {
session_out.register_channel(channel.clone(), self.dht().executor.clone()).await;
if register_res.is_err() {
channel.clone().stop().await;
warn!(target: "dht::DhtHandler::get_channel()", "Error while registering channel {}: {}", channel.info.id, register_res.unwrap_err());
continue;
}

View File

@@ -24,6 +24,7 @@ use std::{
};
use num_bigint::BigUint;
use rpc::{ChunkDownloadCompleted, ChunkNotFound};
use tasks::FetchReply;
use crate::rpc::FudEvent;
@@ -31,7 +32,7 @@ use async_trait::async_trait;
use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr};
use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
use rand::{rngs::OsRng, RngCore};
use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore};
use smol::{
channel,
fs::{File, OpenOptions},
@@ -44,7 +45,7 @@ use structopt_toml::{structopt::StructOpt, StructOptToml};
use darkfi::{
async_daemonize, cli_desc,
geode::Geode,
geode::{hash_to_string, Geode},
net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr},
rpc::{
jsonrpc::JsonSubscriber,
@@ -122,10 +123,6 @@ pub struct Fud {
file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
file_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
chunk_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>,
chunk_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
chunk_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
chunk_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
@@ -168,7 +165,7 @@ impl DhtHandler for Fud {
// TODO: Optimize this
async fn on_new_node(&self, node: &DhtNode) -> Result<()> {
debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", node.id);
debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id));
// If this is the first node we know about, then bootstrap
if !self.dht().is_bootstrapped().await {
@@ -176,7 +173,7 @@ impl DhtHandler for Fud {
// Lookup our own node id
let self_node = self.dht().node.clone();
debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", self_node.id);
debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self_node.id));
let _ = self.lookup_nodes(&self_node.id).await;
}
@@ -200,7 +197,7 @@ impl DhtHandler for Fud {
}
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", key, node.id);
debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id));
let channel = self.get_channel(node).await?;
let msg_subsystem = channel.message_subsystem();
@@ -219,15 +216,14 @@ impl DhtHandler for Fud {
}
impl Fud {
/// Add ourselves to `seeders_router` for the files and chunks we already have.
/// Add ourselves to `seeders_router` for the files we already have.
/// Skipped if we have no external address.
async fn init(&self) -> Result<()> {
if self.dht().node.clone().addresses.is_empty() {
return Ok(());
}
let self_router_items: Vec<DhtRouterItem> = vec![self.dht().node.clone().into()];
let mut hashes = self.geode.list_chunks().await?;
hashes.extend(self.geode.list_files().await?);
let hashes = self.geode.list_files().await?;
for hash in hashes {
self.add_to_router(self.seeders_router.clone(), &hash, self_router_items.clone()).await;
@@ -236,13 +232,186 @@ impl Fud {
Ok(())
}
/// Fetch a file or chunk from the network
/// Query nodes close to `key` to find the seeders
async fn fetch_seeders(&self, key: blake3::Hash) -> HashSet<DhtRouterItem> {
let closest_nodes = self.lookup_nodes(&key).await; // Find the `k` closest nodes
if closest_nodes.is_err() {
return HashSet::new();
}
let mut seeders: HashSet<DhtRouterItem> = HashSet::new();
for node in closest_nodes.unwrap() {
let channel = match self.get_channel(&node).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e);
continue;
}
};
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
Ok(msg_subscriber) => msg_subscriber,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {}", e);
continue;
}
};
let send_res = channel.send(&FudFindSeedersRequest { key }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {}", e);
msg_subscriber.unsubscribe().await;
continue;
}
let reply = match msg_subscriber.receive_with_timeout(self.dht().timeout).await {
Ok(reply) => reply,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {}", e);
msg_subscriber.unsubscribe().await;
continue;
}
};
msg_subscriber.unsubscribe().await;
seeders.extend(reply.seeders.clone());
}
info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(&key));
seeders
}
/// Fetch chunks for a file from `seeders`
async fn fetch_chunks(
&self,
file_hash: &blake3::Hash,
chunk_hashes: &HashSet<blake3::Hash>,
seeders: &HashSet<DhtRouterItem>,
) {
let mut remaining_chunks = chunk_hashes.clone();
let mut shuffled_seeders = {
let mut vec: Vec<_> = seeders.iter().cloned().collect();
vec.shuffle(&mut OsRng);
vec
};
while let Some(seeder) = shuffled_seeders.pop() {
let channel = match self.get_channel(&seeder.node).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {}", hash_to_string(&seeder.node.id), e);
continue;
}
};
info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id));
loop {
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudChunkReply>().await;
msg_subsystem.add_dispatch::<FudNotFound>().await;
let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
let mut chunks_to_query = remaining_chunks.clone();
// Select a chunk to request
let mut chunk_hash: Option<blake3::Hash> = None;
if let Some(&random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
chunk_hash = Some(random_chunk);
}
if chunk_hash.is_none() {
// No more chunks to request from this seeder
break; // Switch to another seeder
}
let chunk_hash = chunk_hash.unwrap();
let send_res = channel.send(&FudFindRequest { key: chunk_hash }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {}", e);
break; // Switch to another seeder
}
let chunk_recv =
msg_subscriber_chunk.receive_with_timeout(self.dht().timeout).fuse();
let notfound_recv =
msg_subscriber_notfound.receive_with_timeout(self.dht().timeout).fuse();
pin_mut!(chunk_recv, notfound_recv);
// Wait for a FudChunkReply or FudNotFound
select! {
chunk_reply = chunk_recv => {
if let Err(e) = chunk_reply {
warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {}", e);
break; // Switch to another seeder
}
chunks_to_query.remove(&chunk_hash);
let reply = chunk_reply.unwrap();
match self.geode.insert_chunk(&reply.chunk).await {
Ok(inserted_hash) => {
if inserted_hash != chunk_hash {
warn!("Received chunk does not match requested chunk");
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
continue; // Skip to next chunk, will retry this chunk later
}
info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
self.download_publisher
.notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
file_hash: *file_hash,
chunk_hash,
}))
.await;
remaining_chunks.remove(&chunk_hash);
}
Err(e) => {
error!("Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e);
}
};
}
notfound_reply = notfound_recv => {
if let Err(e) = notfound_reply {
warn!(target: "fud::fetch_chunks()", "Error waiting for NOTFOUND reply: {}", e);
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
break; // Switch to another seeder
}
info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
self.download_publisher
.notify(FudEvent::ChunkNotFound(ChunkNotFound {
file_hash: *file_hash,
chunk_hash,
}))
.await;
chunks_to_query.remove(&chunk_hash);
}
};
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
}
// Stop when there are no missing chunks
if remaining_chunks.is_empty() {
break;
}
}
}
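
The loop above shuffles the seeder order, pops one seeder at a time, and requests random chunks from it until it fails or nothing is missing. A self-contained sketch of that selection strategy, with plain stand-ins for the DHT types (the real code sends a FudFindRequest per chunk and waits on the message subscribers shown above):

use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom};
use std::collections::HashSet;

fn main() {
    // Stand-ins: u32 ids instead of blake3 chunk hashes,
    // &str names instead of DhtRouterItem seeders.
    let mut remaining: HashSet<u32> = (0..5).collect();
    let mut seeders = vec!["seeder-a", "seeder-b"];
    seeders.shuffle(&mut OsRng);
    while let Some(seeder) = seeders.pop() {
        // Ask this seeder for random chunks until it fails;
        // here we assume every request succeeds.
        while let Some(&chunk) = remaining.iter().choose(&mut OsRng) {
            println!("requesting chunk {chunk} from {seeder}");
            remaining.remove(&chunk);
        }
        // Stop once nothing is missing.
        if remaining.is_empty() {
            break;
        }
    }
}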
/// Fetch a single file's metadata from the network.
/// If the file fits in a single chunk, the chunk itself is returned.
/// 1. Lookup nodes close to the key
/// 2. Request seeders for the file/chunk from those nodes
/// 3. Request the file/chunk from the seeders
async fn fetch(&self, key: blake3::Hash) -> Option<FetchReply> {
/// 2. Request seeders for the file from those nodes
/// 3. Request the file from the seeders
async fn fetch_file_metadata(&self, file_hash: blake3::Hash) -> Option<FetchReply> {
let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
let closest_nodes = self.lookup_nodes(&key).await; // 1
let closest_nodes = self.lookup_nodes(&file_hash).await; // 1
let mut result: Option<FetchReply> = None;
if closest_nodes.is_err() {
return None
@@ -253,7 +422,7 @@ impl Fud {
let channel = match self.get_channel(&node).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch()", "Could not get a channel for node {}: {}", node.id, e);
warn!(target: "fud::fetch_file_metadata()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e);
continue;
}
};
@@ -263,14 +432,14 @@ impl Fud {
let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
Ok(msg_subscriber) => msg_subscriber,
Err(e) => {
warn!(target: "fud::fetch()", "Error subscribing to msg: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error subscribing to msg: {}", e);
continue;
}
};
let send_res = channel.send(&FudFindSeedersRequest { key }).await;
let send_res = channel.send(&FudFindSeedersRequest { key: file_hash }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch()", "Error while sending FudFindSeedersRequest: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error while sending FudFindSeedersRequest: {}", e);
msg_subscriber.unsubscribe().await;
continue;
}
@@ -278,13 +447,14 @@ impl Fud {
let reply = match msg_subscriber.receive_with_timeout(self.dht().timeout).await {
Ok(reply) => reply,
Err(e) => {
warn!(target: "fud::fetch()", "Error waiting for reply: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error waiting for reply: {}", e);
msg_subscriber.unsubscribe().await;
continue;
}
};
let mut seeders = reply.seeders.clone();
info!(target: "fud::fetch()", "Found {} seeders for {}", seeders.len(), key);
info!(target: "fud::fetch_file_metadata()", "Found {} seeders for {}", seeders.len(), hash_to_string(&file_hash));
msg_subscriber.unsubscribe().await;
@@ -308,9 +478,9 @@ impl Fud {
let msg_subscriber_notfound =
channel.subscribe_msg::<FudNotFound>().await.unwrap();
let send_res = channel.send(&FudFindRequest { key }).await;
let send_res = channel.send(&FudFindRequest { key: file_hash }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch()", "Error while sending FudFindRequest: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error while sending FudFindRequest: {}", e);
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_file.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
@@ -326,43 +496,55 @@ impl Fud {
pin_mut!(chunk_recv, file_recv, notfound_recv);
let cleanup = async || {
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_file.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
};
// Wait for a FudChunkReply, FudFileReply, or FudNotFound
select! {
// Received a chunk while requesting a file; this is allowed to
// optimize fetching files that fit in a single chunk
chunk_reply = chunk_recv => {
cleanup().await;
if let Err(e) = chunk_reply {
warn!(target: "fud::fetch()", "Error waiting for chunk reply: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error waiting for chunk reply: {}", e);
continue;
}
let reply = chunk_reply.unwrap();
info!(target: "fud::fetch()", "Received chunk {} from seeder {}", key, seeder.node.id.to_hex().to_string());
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_file.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
let chunk_hash = blake3::hash(&reply.chunk);
// Check that this is the only chunk in the file
if !self.geode.verify_file(&file_hash, &[chunk_hash]) {
warn!(target: "fud::fetch_file_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash");
continue;
}
info!(target: "fud::fetch_file_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&file_hash), hash_to_string(&seeder.node.id));
result = Some(FetchReply::Chunk((*reply).clone()));
break;
}
file_reply = file_recv => {
cleanup().await;
if let Err(e) = file_reply {
warn!(target: "fud::fetch()", "Error waiting for file reply: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error waiting for file reply: {}", e);
continue;
}
let reply = file_reply.unwrap();
info!(target: "fud::fetch()", "Received file {} from seeder {}", key, seeder.node.id.to_hex().to_string());
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_file.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
if !self.geode.verify_file(&file_hash, &reply.chunk_hashes) {
warn!(target: "fud::fetch_file_metadata()", "Received invalid file metadata");
continue;
}
info!(target: "fud::fetch_file_metadata()", "Received file {} from seeder {}", hash_to_string(&file_hash), hash_to_string(&seeder.node.id));
result = Some(FetchReply::File((*reply).clone()));
break;
}
notfound_reply = notfound_recv => {
cleanup().await;
if let Err(e) = notfound_reply {
warn!(target: "fud::fetch()", "Error waiting for NOTFOUND reply: {}", e);
warn!(target: "fud::fetch_file_metadata()", "Error waiting for NOTFOUND reply: {}", e);
continue;
}
info!(target: "fud::fetch()", "Received NOTFOUND {} from seeder {}", key, seeder.node.id.to_hex().to_string());
msg_subscriber_chunk.unsubscribe().await;
msg_subscriber_file.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
info!(target: "fud::fetch_file_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(&file_hash), hash_to_string(&seeder.node.id));
}
};
}
@@ -438,15 +620,17 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
Ok(mut file) => {
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let buf: [u8; 64] = buffer.try_into().expect("Node ID must have 64 characters");
let node_id = blake3::Hash::from_hex(buf)?;
let buf: [u8; 44] = buffer.try_into().expect("Node ID must have 44 characters");
let mut out_buf = [0u8; 32];
bs58::decode(buf).onto(&mut out_buf)?;
let node_id = blake3::Hash::from_bytes(out_buf);
Ok(node_id)
}
Err(e) if e.kind() == ErrorKind::NotFound => {
let node_id = generate_node_id()?;
let mut file =
OpenOptions::new().write(true).create(true).open(node_id_path).await?;
file.write_all(node_id.to_hex().as_bytes()).await?;
file.write_all(&bs58::encode(node_id.as_bytes()).into_vec()).await?;
file.flush().await?;
Ok(node_id)
}
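
One encoding detail worth hedging: base58 has no padding, so a 32-byte hash encodes to 44 characters for most values but to 43 for roughly 6% of uniformly random hashes (those below 58^43). The fixed [u8; 44] buffer above assumes the common case; a length-tolerant reader could look like this sketch (parse_node_id is a hypothetical helper, not part of this commit):

use std::io;

// Hypothetical helper: decode a persisted node ID, accepting both
// 43- and 44-character base58 encodings.
fn parse_node_id(buffer: &[u8]) -> io::Result<blake3::Hash> {
    let mut out = [0u8; 32];
    let n = bs58::decode(buffer)
        .onto(&mut out)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    if n != 32 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "node ID must decode to 32 bytes"));
    }
    Ok(blake3::Hash::from_bytes(out))
}

fn main() -> io::Result<()> {
    let id = blake3::hash(b"seed");
    let encoded = bs58::encode(id.as_bytes()).into_vec();
    assert_eq!(parse_node_id(&encoded)?, id);
    Ok(())
}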
@@ -456,15 +640,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let node_id_ = node_id?;
info!(target: "fud", "Your node ID: {}", node_id_);
info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id_));
// Daemon instantiation
let download_sub = JsonSubscriber::new("get");
let (get_tx, get_rx) = smol::channel::unbounded();
let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded();
let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded();
let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded();
let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded();
// TODO: Add DHT settings in the config file
let dht = Arc::new(Dht::new(&node_id_, 4, 16, 60, p2p.clone(), ex.clone()).await);
let fud = Arc::new(Fud {
@@ -478,10 +660,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
file_fetch_rx,
file_fetch_end_tx,
file_fetch_end_rx,
chunk_fetch_tx,
chunk_fetch_rx,
chunk_fetch_end_tx,
chunk_fetch_end_rx,
rpc_connections: Mutex::new(HashSet::new()),
dnet_sub,
download_sub: download_sub.clone(),
@@ -526,20 +704,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!(target: "fud", "Starting fetch chunk task");
let chunk_task = StoppableTask::new();
chunk_task.clone().start(
tasks::fetch_chunk_task(fud.clone()),
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "fud", "Failed starting fetch chunk task: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
info!(target: "fud", "Starting get task");
let get_task_ = StoppableTask::new();
get_task_.clone().start(
@@ -643,9 +807,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Stopping fetch file task...");
file_task.stop().await;
info!(target: "fud", "Stopping fetch chunk task...");
chunk_task.stop().await;
info!(target: "fud", "Stopping get task...");
get_task_.stop().await;

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use darkfi::{
geode::{read_until_filled, MAX_CHUNK_SIZE},
geode::{hash_to_string, read_until_filled, MAX_CHUNK_SIZE},
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
@@ -227,6 +227,23 @@ impl ProtocolFud {
// TODO: Run geode GC
if let Ok(chunked_file) = file_res {
// If the file has a single chunk, just reply with the chunk
if chunked_file.len() == 1 {
let chunk_res =
self.fud.geode.get_chunk(&chunked_file.iter().next().unwrap().0).await;
if let Ok(chunk_path) = chunk_res {
let mut buf = vec![0u8; MAX_CHUNK_SIZE];
let mut chunk_fd = File::open(&chunk_path).await.unwrap();
let bytes_read =
read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
let chunk_slice = &buf[..bytes_read];
let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk (file has a single chunk)");
let _ = self.channel.send(&reply).await;
continue;
}
}
// Otherwise reply with the file metadata
let reply = FudFileReply {
chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(),
};
@@ -237,7 +254,7 @@ impl ProtocolFud {
}
let reply = FudNotFound {};
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", request.key.to_hex().to_string());
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
let _ = self.channel.send(&reply).await;
}
}
@@ -254,7 +271,7 @@ impl ProtocolFud {
continue
}
};
info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", &request.key);
info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
if let Some(node) = node {
@@ -283,7 +300,7 @@ impl ProtocolFud {
continue
}
};
info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", &request.key);
info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
if let Some(node) = node {
@@ -319,7 +336,7 @@ impl ProtocolFud {
continue
}
};
info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", &request.key);
info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
if let Some(node) = node {

View File

@@ -24,6 +24,7 @@ use std::{
use crate::{dht::DhtHandler, proto::FudAnnounce, Fud};
use async_trait::async_trait;
use darkfi::{
geode::hash_to_string,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
p2p_method::HandlerP2p,
@@ -96,7 +97,7 @@ impl Fud {
}
};
let (file_hash, chunk_hashes) = match self.geode.insert(fd).await {
let (file_hash, _) = match self.geode.insert(fd).await {
Ok(v) => v,
Err(e) => {
error!(target: "fud::put()", "Failed inserting file {:?} to geode: {}", path, e);
@@ -109,14 +110,7 @@ impl Fud {
let fud_announce = FudAnnounce { key: file_hash, seeders: vec![self_node.clone().into()] };
let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await;
// Announce chunks
for chunk_hash in chunk_hashes {
let fud_announce =
FudAnnounce { key: chunk_hash, seeders: vec![self_node.clone().into()] };
let _ = self.announce(&chunk_hash, &fud_announce, self.seeders_router.clone()).await;
}
JsonResponse::new(JsonValue::String(file_hash.to_hex().to_string()), id).into()
JsonResponse::new(JsonValue::String(hash_to_string(&file_hash)), id).into()
}
// RPCAPI:
@@ -138,10 +132,13 @@ impl Fud {
None => None,
};
let file_hash = match blake3::Hash::from_hex(params[0].get::<String>().unwrap()) {
Ok(v) => v,
let mut hash_buf = [0u8; 32];
match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
Ok(_) => {}
Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
};
}
let file_hash = blake3::Hash::from_bytes(hash_buf);
let _ = self.get_tx.send((id, file_hash, file_name, Ok(()))).await;
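
Client-side, `put` now returns the file hash as a base58 string and `get` expects the same format as params[0]. A sketch of building that parameter with crates this daemon already depends on (tinyjson, bs58); the JSON-RPC transport is elided:

use tinyjson::JsonValue;

fn main() {
    let file_hash = blake3::hash(b"some file");
    // Matches what `put` returns and what `get` decodes above.
    let param = JsonValue::String(bs58::encode(file_hash.as_bytes()).into_string());
    println!("{}", param.stringify().unwrap());
}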
@@ -207,7 +204,7 @@ impl Fud {
addresses.push(JsonValue::String(addr.to_string()));
}
nodes.push(JsonValue::Array(vec![
JsonValue::String(node.id.to_hex().to_string()),
JsonValue::String(hash_to_string(&node.id)),
JsonValue::Array(addresses),
]));
}
@@ -231,9 +228,9 @@ impl Fud {
for (hash, items) in self.seeders_router.read().await.iter() {
let mut node_ids = vec![];
for item in items {
node_ids.push(JsonValue::String(item.node.id.to_hex().to_string()));
node_ids.push(JsonValue::String(hash_to_string(&item.node.id)));
}
seeders_router.insert(hash.to_hex().to_string(), JsonValue::Array(node_ids));
seeders_router.insert(hash_to_string(hash), JsonValue::Array(node_ids));
}
let mut res: HashMap<String, JsonValue> = HashMap::new();
res.insert("seeders".to_string(), JsonValue::Object(seeders_router));
@@ -282,15 +279,15 @@ pub enum FudEvent {
impl From<ChunkDownloadCompleted> for JsonValue {
fn from(info: ChunkDownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())),
("chunk_hash", JsonValue::String(info.chunk_hash.to_hex().to_string())),
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
])
}
}
impl From<FileDownloadCompleted> for JsonValue {
fn from(info: FileDownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())),
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_count", JsonValue::Number(info.chunk_count as f64)),
])
}
@@ -298,7 +295,7 @@ impl From<FileDownloadCompleted> for JsonValue {
impl From<DownloadCompleted> for JsonValue {
fn from(info: DownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())),
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())),
])
}
@@ -306,14 +303,14 @@ impl From<DownloadCompleted> for JsonValue {
impl From<ChunkNotFound> for JsonValue {
fn from(info: ChunkNotFound) -> JsonValue {
json_map([
("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())),
("chunk_hash", JsonValue::String(info.chunk_hash.to_hex().to_string())),
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
])
}
}
impl From<FileNotFound> for JsonValue {
fn from(info: FileNotFound) -> JsonValue {
json_map([("file_hash", JsonValue::String(info.file_hash.to_hex().to_string()))])
json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))])
}
}
impl From<FudEvent> for JsonValue {
@@ -396,11 +393,13 @@ impl Fud {
};
}
// Fetch any missing chunks
let mut missing_chunks = vec![];
let seeders = self.fetch_seeders(file_hash).await;
// List missing chunks
let mut missing_chunks = HashSet::new();
for (chunk, path) in chunked_file.iter() {
if path.is_none() {
missing_chunks.push(*chunk);
missing_chunks.insert(*chunk);
} else {
self.download_publisher
.notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
@@ -411,44 +410,16 @@ impl Fud {
}
}
for chunk in missing_chunks {
self.chunk_fetch_tx.send((chunk, Ok(()))).await.unwrap();
let (i_chunk_hash, status) = self.chunk_fetch_end_rx.recv().await.unwrap();
match status {
Ok(()) => {
self.download_publisher
.notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
file_hash,
chunk_hash: i_chunk_hash,
}))
.await;
let self_announce =
FudAnnounce { key: i_chunk_hash, seeders: vec![self_node.clone().into()] };
let _ = self
.announce(&i_chunk_hash, &self_announce, self.seeders_router.clone())
.await;
}
Err(Error::GeodeChunkRouteNotFound) => {
self.download_publisher
.notify(FudEvent::ChunkNotFound(ChunkNotFound {
file_hash,
chunk_hash: i_chunk_hash,
}))
.await;
return;
}
Err(e) => panic!("{}", e),
};
}
// Fetch missing chunks from seeders
self.fetch_chunks(&file_hash, &missing_chunks, &seeders).await;
let chunked_file = match self.geode.get(&file_hash).await {
Ok(v) => v,
Err(e) => panic!("{}", e),
};
// We fetched all chunks, but the file is not complete?
// We fetched all chunks, but the file is not complete
// (some chunks were missing from all seeders)
if !chunked_file.is_complete() {
self.download_publisher.notify(FudEvent::MissingChunks(MissingChunks {})).await;
return;

View File

@@ -18,14 +18,14 @@
use std::sync::Arc;
use darkfi::{system::sleep, Error, Result};
use darkfi::{geode::hash_to_string, system::sleep, Error, Result};
use crate::{
dht::DhtHandler,
proto::{FudAnnounce, FudChunkReply, FudFileReply},
Fud,
};
use log::{error, info, warn};
use log::{error, info};
/// Triggered when calling the `get` RPC method
pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
@@ -48,32 +48,36 @@ pub async fn fetch_file_task(fud: Arc<Fud>) -> Result<()> {
info!(target: "fud::fetch_file_task()", "Started background file fetch task");
loop {
let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap();
info!(target: "fud::fetch_file_task()", "Fetching file {}", file_hash);
info!(target: "fud::fetch_file_task()", "Fetching file {}", hash_to_string(&file_hash));
let result = fud.fetch(file_hash).await;
let result = fud.fetch_file_metadata(file_hash).await;
match result {
Some(reply) => {
match reply {
FetchReply::File(FudFileReply { chunk_hashes }) => {
if let Err(e) = fud.geode.insert_file(&file_hash, &chunk_hashes).await {
error!("Failed inserting file {} to Geode: {}", file_hash, e);
error!(
"Failed inserting file {} to Geode: {}",
hash_to_string(&file_hash),
e
);
}
fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
}
// Looked for a file but got a chunk, meaning that file_hash = chunk_hash, the file fits in a single chunk
// Looked for a file but got a chunk: the entire file fits in a single chunk
FetchReply::Chunk(FudChunkReply { chunk }) => {
// TODO: Verify chunk
info!(target: "fud::fetch()", "File fits in a single chunk");
let _ = fud.geode.insert_file(&file_hash, &[file_hash]).await;
info!(target: "fud::fetch_file_task()", "File fits in a single chunk");
let chunk_hash = blake3::hash(&chunk);
let _ = fud.geode.insert_file(&file_hash, &[chunk_hash]).await;
match fud.geode.insert_chunk(&chunk).await {
Ok(inserted_hash) => {
if inserted_hash != file_hash {
warn!("Received chunk does not match requested file");
}
}
Ok(_) => {}
Err(e) => {
error!("Failed inserting chunk {} to Geode: {}", file_hash, e);
error!(
"Failed inserting chunk {} to Geode: {}",
hash_to_string(&file_hash),
e
);
}
};
fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
@@ -90,53 +94,6 @@ pub async fn fetch_file_task(fud: Arc<Fud>) -> Result<()> {
}
}
/// Background task that receives chunk fetch requests and tries to
/// fetch objects from the network using the routing table.
/// TODO: This can be optimised a lot for connection reuse, etc.
pub async fn fetch_chunk_task(fud: Arc<Fud>) -> Result<()> {
info!(target: "fud::fetch_chunk_task()", "Started background chunk fetch task");
loop {
let (chunk_hash, _) = fud.chunk_fetch_rx.recv().await.unwrap();
info!(target: "fud::fetch_chunk_task()", "Fetching chunk {}", chunk_hash);
let result = fud.fetch(chunk_hash).await;
match result {
Some(reply) => {
match reply {
FetchReply::Chunk(FudChunkReply { chunk }) => {
// TODO: Verify chunk
match fud.geode.insert_chunk(&chunk).await {
Ok(inserted_hash) => {
if inserted_hash != chunk_hash {
warn!("Received chunk does not match requested chunk");
}
}
Err(e) => {
error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e);
}
};
fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap();
}
_ => {
// Looked for a chunk but got a file instead, not supposed to happen
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
}
}
}
None => {
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
}
};
}
}
/// Background task that removes seeders that did not announce a file/chunk
/// for more than an hour.
pub async fn prune_seeders_task(fud: Arc<Fud>) -> Result<()> {
@@ -155,20 +112,6 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
let seeders = vec![fud.dht().node.clone().into()];
info!(target: "fud::announce_task()", "Announcing chunks...");
let chunk_hashes = fud.geode.list_chunks().await;
if let Ok(chunks) = chunk_hashes {
for chunk in chunks {
let _ = fud
.announce(
&chunk,
&FudAnnounce { key: chunk, seeders: seeders.clone() },
fud.seeders_router.clone(),
)
.await;
}
}
info!(target: "fud::announce_task()", "Announcing files...");
let file_hashes = fud.geode.list_files().await;
if let Ok(files) = file_hashes {

View File

@@ -81,6 +81,10 @@ const CHUNKS_PATH: &str = "chunks";
/// Path prefix where full files are stored
const DOWNLOADS_PATH: &str = "downloads";
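/// Encode a blake3 hash into a base58 string.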
pub fn hash_to_string(hash: &blake3::Hash) -> String {
bs58::encode(hash.as_bytes()).into_string()
}
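
A round-trip sketch of the new encoding, using the same bs58 calls this commit relies on elsewhere (base58 preserves leading zero bytes as '1' characters, so decoding a hash string always yields exactly 32 bytes):

fn main() -> Result<(), bs58::decode::Error> {
    let hash = blake3::hash(b"example");
    let s = bs58::encode(hash.as_bytes()).into_string(); // hash_to_string(&hash)
    let mut buf = [0u8; 32];
    bs58::decode(s.as_str()).onto(&mut buf)?; // fails on invalid characters
    assert_eq!(blake3::Hash::from_bytes(buf), hash);
    Ok(())
}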
/// `ChunkedFile` is a representation of a file we're trying to
/// retrieve from `Geode`.
///
@@ -176,7 +180,9 @@ impl Geode {
let mut lines = BufReader::new(fd).lines();
while let Some(line) = lines.next().await {
let line = line?;
let chunk_hash = blake3::Hash::from_hex(line)?;
let mut hash_buf = [0u8; 32];
bs58::decode(line).onto(&mut hash_buf)?;
let chunk_hash = blake3::Hash::from_bytes(hash_buf);
read_chunks.push(chunk_hash);
}
@@ -209,8 +215,9 @@ impl Geode {
Some(v) => v,
None => continue,
};
let chunk_hash = match blake3::Hash::from_hex(file_name) {
Ok(v) => v,
let mut hash_buf = [0u8; 32];
let chunk_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
Ok(_) => blake3::Hash::from_bytes(hash_buf),
Err(_) => continue,
};
@@ -270,8 +277,9 @@ impl Geode {
Some(v) => v,
None => continue,
};
let file_hash = match blake3::Hash::from_hex(file_name) {
Ok(v) => v,
let mut hash_buf = [0u8; 32];
let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
Ok(_) => blake3::Hash::from_bytes(hash_buf),
Err(_) => continue,
};
@@ -298,7 +306,7 @@ impl Geode {
/// Insert a file into Geode. The function expects any kind of byte stream, which
/// can either be another file on the filesystem, a buffer, etc.
/// Returns a tuple of `(blake3::Hash, Vec<blake3::Hash>)` which represents the
/// file name, and the file's chunks, respectively.
/// file hash, and the file's chunks, respectively.
pub async fn insert(
&self,
mut stream: impl AsyncRead + Unpin,
@@ -316,7 +324,7 @@ impl Geode {
let chunk_slice = &buf[..bytes_read];
let chunk_hash = blake3::hash(chunk_slice);
file_hasher.update(chunk_slice);
file_hasher.update(chunk_hash.as_bytes());
chunk_hashes.push(chunk_hash);
// Write the chunk to a file, if necessary. We first perform
@@ -324,7 +332,7 @@ impl Geode {
// to perform a write, which is usually more expensive than
// reading from disk.
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(chunk_hash.to_hex().as_str());
chunk_path.push(hash_to_string(&chunk_hash).as_str());
let chunk_fd =
OpenOptions::new().read(true).write(true).create(true).open(&chunk_path).await?;
@@ -361,15 +369,15 @@ impl Geode {
buf = [0u8; MAX_CHUNK_SIZE];
}
// This hash is the file's chunks hashed in order.
// This hash is the file's chunks hashes hashed in order.
let file_hash = file_hasher.finalize();
let mut file_path = self.files_path.clone();
file_path.push(file_hash.to_hex().as_str());
file_path.push(hash_to_string(&file_hash).as_str());
// We always overwrite the metadata.
let mut file_fd = File::create(&file_path).await?;
for ch in &chunk_hashes {
file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?;
file_fd.write(format!("{}\n", hash_to_string(ch).as_str()).as_bytes()).await?;
}
file_fd.flush().await?;
@@ -379,6 +387,7 @@ impl Geode {
/// Create and insert file metadata into Geode given a list of hashes.
/// Always overwrites any existing file.
/// Verifies that the file hash matches the chunk hashes.
pub async fn insert_file(
&self,
file_hash: &blake3::Hash,
@@ -386,12 +395,17 @@ impl Geode {
) -> Result<()> {
info!(target: "geode::insert_file()", "[Geode] Inserting file metadata");
if !self.verify_file(file_hash, chunk_hashes) {
// The chunk list or file hash is wrong
return Err(Error::GeodeNeedsGc)
}
let mut file_path = self.files_path.clone();
file_path.push(file_hash.to_hex().as_str());
file_path.push(hash_to_string(file_hash).as_str());
let mut file_fd = File::create(&file_path).await?;
for ch in chunk_hashes {
file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?;
file_fd.write(format!("{}\n", hash_to_string(ch).as_str()).as_bytes()).await?;
}
file_fd.flush().await?;
@@ -411,7 +425,7 @@ impl Geode {
let chunk_hash = blake3::hash(chunk_slice);
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(chunk_hash.to_hex().as_str());
chunk_path.push(hash_to_string(&chunk_hash).as_str());
let mut chunk_fd = File::create(&chunk_path).await?;
chunk_fd.write_all(chunk_slice).await?;
chunk_fd.flush().await?;
@@ -423,9 +437,10 @@ impl Geode {
/// of chunks and optionally file paths to the said chunks. Returns an error if
/// the read failed in any way (could also be the file does not exist).
pub async fn get(&self, file_hash: &blake3::Hash) -> Result<ChunkedFile> {
info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash);
let file_hash_str = hash_to_string(file_hash);
info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash_str);
let mut file_path = self.files_path.clone();
file_path.push(file_hash.to_hex().as_str());
file_path.push(file_hash_str);
// Try to read the file metadata. If it's corrupt, return an error signalling
// that garbage collection needs to run.
@@ -447,7 +462,7 @@ impl Geode {
let mut buf = vec![];
for (chunk_hash, chunk_path) in chunked_file.0.iter_mut() {
let mut c_path = self.chunks_path.clone();
c_path.push(chunk_hash.to_hex().as_str());
c_path.push(hash_to_string(chunk_hash).as_str());
if !c_path.exists() || !c_path.is_file() {
// TODO: We should be aggressive here and remove the non-file.
@@ -475,9 +490,10 @@ impl Geode {
/// Fetch a single chunk from Geode. Returns a `PathBuf` pointing to the chunk
/// if it is found.
pub async fn get_chunk(&self, chunk_hash: &blake3::Hash) -> Result<PathBuf> {
info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", chunk_hash);
let chunk_hash_str = hash_to_string(chunk_hash);
info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", chunk_hash_str);
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(chunk_hash.to_hex().as_str());
chunk_path.push(chunk_hash_str);
if !chunk_path.exists() || !chunk_path.is_file() {
// TODO: We should be aggressive here and remove the non-file.
@@ -488,9 +504,7 @@ impl Geode {
let mut buf = vec![];
let mut chunk_fd = File::open(&chunk_path).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
let chunk_slice = &buf[..bytes_read];
let hashed_chunk = blake3::hash(chunk_slice);
if &hashed_chunk != chunk_hash {
if !self.verify_chunk(chunk_hash, &buf[..bytes_read]) {
// The chunk is corrupted
return Err(Error::GeodeNeedsGc)
}
@@ -498,15 +512,38 @@ impl Geode {
Ok(chunk_path)
}
/// Verifies that the file hash matches the chunk hashes.
pub fn verify_file(&self, file_hash: &blake3::Hash, chunk_hashes: &[blake3::Hash]) -> bool {
info!(target: "geode::verify_file()", "[Geode] Verifying file metadata");
let mut file_hasher = blake3::Hasher::new();
for chunk_hash in chunk_hashes {
file_hasher.update(chunk_hash.as_bytes());
}
*file_hash == file_hasher.finalize()
}
/// Verifies that the chunk hash matches the content.
pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
blake3::hash(chunk_slice) == *chunk_hash
}
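
These two checks pin down the new file hash scheme: a file's hash is blake3 over its ordered chunk hashes, matching the `insert()` change above that feeds `file_hasher` with `chunk_hash.as_bytes()` instead of the chunk bytes. A worked sketch; the single-chunk case is also why `fetch_file_metadata` may accept a bare chunk in answer to a file request:

fn file_hash_of(chunk_hashes: &[blake3::Hash]) -> blake3::Hash {
    // Same computation as verify_file(): hash the chunk hashes in order.
    let mut hasher = blake3::Hasher::new();
    for ch in chunk_hashes {
        hasher.update(ch.as_bytes());
    }
    hasher.finalize()
}

fn main() {
    let chunks: [&[u8]; 2] = [b"chunk one", b"chunk two"];
    let chunk_hashes: Vec<blake3::Hash> = chunks.iter().map(|c| blake3::hash(c)).collect();
    println!("file hash: {}", file_hash_of(&chunk_hashes));
    // Single-chunk file: file_hash == blake3(chunk_hash.as_bytes()).
    println!("single-chunk file hash: {}", file_hash_of(&[blake3::hash(b"only chunk")]));
}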
/// Assemble chunks to create a file.
/// This method does NOT perform a consistency check.
pub async fn assemble_file(&self, file_hash: &blake3::Hash, chunked_file: &ChunkedFile, file_name: Option<String>) -> Result<PathBuf> {
info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash);
pub async fn assemble_file(
&self,
file_hash: &blake3::Hash,
chunked_file: &ChunkedFile,
file_name: Option<String>,
) -> Result<PathBuf> {
let file_hash_str = hash_to_string(file_hash);
info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash_str);
let mut file_path = self.downloads_path.clone();
file_path.push(file_hash.to_hex().as_str());
file_path.push(&file_hash_str);
fs::create_dir_all(&file_path).await?;
file_path.push(file_name.unwrap_or(file_hash.to_hex().to_string()));
file_path.push(file_name.unwrap_or(file_hash_str));
let mut file_fd = File::create(&file_path).await?;
for (_, chunk_path) in chunked_file.iter() {
@@ -531,31 +568,15 @@ impl Geode {
while let Some(file) = dir.try_next().await? {
let os_file_name = file.file_name();
let file_name = os_file_name.to_string_lossy();
if let Ok(file_hash) = blake3::Hash::from_hex(file_name.to_string()) {
file_hashes.push(file_hash);
}
let file_name = os_file_name.to_string_lossy().to_string();
let mut hash_buf = [0u8; 32];
let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
Ok(_) => blake3::Hash::from_bytes(hash_buf),
Err(_) => continue,
};
file_hashes.push(file_hash);
}
Ok(file_hashes)
}
/// List chunk hashes.
pub async fn list_chunks(&self) -> Result<Vec<blake3::Hash>> {
info!(target: "geode::list_chunks()", "[Geode] Listing chunks");
let mut dir = fs::read_dir(&self.chunks_path).await?;
let mut chunk_hashes = vec![];
while let Some(chunk) = dir.try_next().await? {
let os_file_name = chunk.file_name();
let file_name = os_file_name.to_string_lossy();
if let Ok(chunk_hash) = blake3::Hash::from_hex(file_name.to_string()) {
chunk_hashes.push(chunk_hash);
}
}
Ok(chunk_hashes)
}
}