From f630e1cd7c55a06e8f383b8d4a33c11dcad32094 Mon Sep 17 00:00:00 2001 From: darkfi Date: Sun, 23 Mar 2025 16:19:20 +0100 Subject: [PATCH] fud: remove chunk seeders, change file hash, replace hex with bs58 --- Cargo.lock | 1 + Cargo.toml | 1 + bin/fud/fu/src/main.rs | 15 +- bin/fud/fud/Cargo.toml | 1 + bin/fud/fud/src/dht.rs | 4 +- bin/fud/fud/src/main.rs | 295 ++++++++++++++++++++++++++++++--------- bin/fud/fud/src/proto.rs | 27 +++- bin/fud/fud/src/rpc.rs | 85 ++++------- bin/fud/fud/src/tasks.rs | 95 +++---------- src/geode/mod.rs | 119 +++++++++------- 10 files changed, 376 insertions(+), 267 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a319d0c2..ee88e0d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,6 +3400,7 @@ version = "0.4.1" dependencies = [ "async-trait", "blake3 1.6.0", + "bs58", "darkfi", "darkfi-serial", "easy-parallel", diff --git a/Cargo.toml b/Cargo.toml index c7b7daa16..b29cdcd5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -171,6 +171,7 @@ validator = [ geode = [ "blake3", + "bs58", "futures", "smol", ] diff --git a/bin/fud/fu/src/main.rs b/bin/fud/fu/src/main.rs index 50431f347..c12a6af23 100644 --- a/bin/fud/fu/src/main.rs +++ b/bin/fud/fu/src/main.rs @@ -172,20 +172,11 @@ impl Fu { return Err(Error::Custom(format!("Could not find file {}", file_hash))); } "chunk_not_found" => { - let info = params - .get("info") - .unwrap() - .get::>() - .unwrap(); - let chunk_hash = - info.get("chunk_hash").unwrap().get::().unwrap(); - println!(); - return Err(Error::Custom(format!( - "Could not find chunk {}", - chunk_hash - ))); + // A seeder does not have a chunk we are looking for, + // we will try another seeder so there is nothing to do } "missing_chunks" => { + // We tried all seeders and some chunks are still missing println!(); return Err(Error::Custom("Missing chunks".to_string())); } diff --git a/bin/fud/fud/Cargo.toml b/bin/fud/fud/Cargo.toml index 178dbabb6..25a32d4d5 100644 --- a/bin/fud/fud/Cargo.toml +++ b/bin/fud/fud/Cargo.toml @@ -15,6 +15,7 @@ darkfi-serial = {version = "0.4.2", features = ["hash"]} # Misc async-trait = "0.1.86" blake3 = "1.6.0" +bs58 = "0.5.1" rand = "0.8.5" log = "0.4.26" tinyjson = "2.5.1" diff --git a/bin/fud/fud/src/dht.rs b/bin/fud/fud/src/dht.rs index 8fed57e28..5e2ad8a39 100644 --- a/bin/fud/fud/src/dht.rs +++ b/bin/fud/fud/src/dht.rs @@ -31,7 +31,7 @@ use darkfi::{ }; use darkfi_serial::{SerialDecodable, SerialEncodable}; use futures::future::join_all; -use log::{debug, error}; +use log::{debug, error, warn}; use num_bigint::BigUint; use smol::lock::RwLock; use url::Url; @@ -495,6 +495,7 @@ pub trait DhtHandler { let connector = Connector::new(self.dht().p2p.settings(), session_weak); let connect_res = connector.connect(&addr).await; if connect_res.is_err() { + warn!(target: "dht::DhtHandler::get_channel()", "Error while connecting to {}: {}", addr, connect_res.unwrap_err()); continue; } let (_, channel) = connect_res.unwrap(); @@ -502,6 +503,7 @@ pub trait DhtHandler { session_out.register_channel(channel.clone(), self.dht().executor.clone()).await; if register_res.is_err() { channel.clone().stop().await; + warn!(target: "dht::DhtHandler::get_channel()", "Error while registering channel {}: {}", channel.info.id, register_res.unwrap_err()); continue; } diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index e0dfa4cc0..e0ecf3b28 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -24,6 +24,7 @@ use std::{ }; use num_bigint::BigUint; +use rpc::{ChunkDownloadCompleted, ChunkNotFound}; use tasks::FetchReply; use crate::rpc::FudEvent; @@ -31,7 +32,7 @@ use async_trait::async_trait; use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr}; use futures::{future::FutureExt, pin_mut, select}; use log::{debug, error, info, warn}; -use rand::{rngs::OsRng, RngCore}; +use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore}; use smol::{ channel, fs::{File, OpenOptions}, @@ -44,7 +45,7 @@ use structopt_toml::{structopt::StructOpt, StructOptToml}; use darkfi::{ async_daemonize, cli_desc, - geode::Geode, + geode::{hash_to_string, Geode}, net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr}, rpc::{ jsonrpc::JsonSubscriber, @@ -122,10 +123,6 @@ pub struct Fud { file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>, file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>, file_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>, - chunk_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>, - chunk_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>, - chunk_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>, - chunk_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>, rpc_connections: Mutex>, @@ -168,7 +165,7 @@ impl DhtHandler for Fud { // TODO: Optimize this async fn on_new_node(&self, node: &DhtNode) -> Result<()> { - debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", node.id); + debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id)); // If this is the first node we know about, then bootstrap if !self.dht().is_bootstrapped().await { @@ -176,7 +173,7 @@ impl DhtHandler for Fud { // Lookup our own node id let self_node = self.dht().node.clone(); - debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", self_node.id); + debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self_node.id)); let _ = self.lookup_nodes(&self_node.id).await; } @@ -200,7 +197,7 @@ impl DhtHandler for Fud { } async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result> { - debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", key, node.id); + debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id)); let channel = self.get_channel(node).await?; let msg_subsystem = channel.message_subsystem(); @@ -219,15 +216,14 @@ impl DhtHandler for Fud { } impl Fud { - /// Add ourselves to `seeders_router` for the files and chunks we already have. + /// Add ourselves to `seeders_router` for the files we already have. /// Skipped if we have no external address. async fn init(&self) -> Result<()> { if self.dht().node.clone().addresses.is_empty() { return Ok(()); } let self_router_items: Vec = vec![self.dht().node.clone().into()]; - let mut hashes = self.geode.list_chunks().await?; - hashes.extend(self.geode.list_files().await?); + let hashes = self.geode.list_files().await?; for hash in hashes { self.add_to_router(self.seeders_router.clone(), &hash, self_router_items.clone()).await; @@ -236,13 +232,186 @@ impl Fud { Ok(()) } - /// Fetch a file or chunk from the network + /// Query nodes close to `key` to find the seeders + async fn fetch_seeders(&self, key: blake3::Hash) -> HashSet { + let closest_nodes = self.lookup_nodes(&key).await; // Find the `k` closest nodes + if closest_nodes.is_err() { + return HashSet::new(); + } + + let mut seeders: HashSet = HashSet::new(); + + for node in closest_nodes.unwrap() { + let channel = match self.get_channel(&node).await { + Ok(channel) => channel, + Err(e) => { + warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e); + continue; + } + }; + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let msg_subscriber = match channel.subscribe_msg::().await { + Ok(msg_subscriber) => msg_subscriber, + Err(e) => { + warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {}", e); + continue; + } + }; + + let send_res = channel.send(&FudFindSeedersRequest { key }).await; + if let Err(e) = send_res { + warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {}", e); + msg_subscriber.unsubscribe().await; + continue; + } + + let reply = match msg_subscriber.receive_with_timeout(self.dht().timeout).await { + Ok(reply) => reply, + Err(e) => { + warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {}", e); + msg_subscriber.unsubscribe().await; + continue; + } + }; + + msg_subscriber.unsubscribe().await; + + seeders.extend(reply.seeders.clone()); + } + + info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(&key)); + seeders + } + + /// Fetch chunks for a file from `seeders` + async fn fetch_chunks( + &self, + file_hash: &blake3::Hash, + chunk_hashes: &HashSet, + seeders: &HashSet, + ) { + let mut remaining_chunks = chunk_hashes.clone(); + let mut shuffled_seeders = { + let mut vec: Vec<_> = seeders.iter().cloned().collect(); + vec.shuffle(&mut OsRng); + vec + }; + + while let Some(seeder) = shuffled_seeders.pop() { + let channel = match self.get_channel(&seeder.node).await { + Ok(channel) => channel, + Err(e) => { + warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {}", hash_to_string(&seeder.node.id), e); + continue; + } + }; + info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id)); + loop { + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + let msg_subscriber_chunk = channel.subscribe_msg::().await.unwrap(); + let msg_subscriber_notfound = channel.subscribe_msg::().await.unwrap(); + + let mut chunks_to_query = remaining_chunks.clone(); + + // Select a chunk to request + let mut chunk_hash: Option = None; + if let Some(&random_chunk) = chunks_to_query.iter().choose(&mut OsRng) { + chunk_hash = Some(random_chunk); + } + + if chunk_hash.is_none() { + // No more chunks to request from this seeder + break; // Switch to another seeder + } + let chunk_hash = chunk_hash.unwrap(); + + let send_res = channel.send(&FudFindRequest { key: chunk_hash }).await; + if let Err(e) = send_res { + warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {}", e); + break; // Switch to another seeder + } + + let chunk_recv = + msg_subscriber_chunk.receive_with_timeout(self.dht().timeout).fuse(); + let notfound_recv = + msg_subscriber_notfound.receive_with_timeout(self.dht().timeout).fuse(); + + pin_mut!(chunk_recv, notfound_recv); + + // Wait for a FudChunkReply or FudNotFound + select! { + chunk_reply = chunk_recv => { + if let Err(e) = chunk_reply { + warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {}", e); + break; // Switch to another seeder + } + chunks_to_query.remove(&chunk_hash); + let reply = chunk_reply.unwrap(); + + match self.geode.insert_chunk(&reply.chunk).await { + Ok(inserted_hash) => { + if inserted_hash != chunk_hash { + warn!("Received chunk does not match requested chunk"); + msg_subscriber_chunk.unsubscribe().await; + msg_subscriber_notfound.unsubscribe().await; + continue; // Skip to next chunk, will retry this chunk later + } + + info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); + self.download_publisher + .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted { + file_hash: *file_hash, + chunk_hash, + })) + .await; + remaining_chunks.remove(&chunk_hash); + } + Err(e) => { + error!("Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e); + } + }; + } + notfound_reply = notfound_recv => { + if let Err(e) = notfound_reply { + warn!(target: "fud::fetch_chunks()", "Error waiting for NOTFOUND reply: {}", e); + msg_subscriber_chunk.unsubscribe().await; + msg_subscriber_notfound.unsubscribe().await; + break; // Switch to another seeder + } + info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); + self.download_publisher + .notify(FudEvent::ChunkNotFound(ChunkNotFound { + file_hash: *file_hash, + chunk_hash, + })) + .await; + chunks_to_query.remove(&chunk_hash); + } + }; + + msg_subscriber_chunk.unsubscribe().await; + msg_subscriber_notfound.unsubscribe().await; + } + + // Stop when there are no missing chunks + if remaining_chunks.is_empty() { + break; + } + } + } + + /// Fetch a single file metadata from the network. + /// If the file is smaller than a single chunk then the chunk is returned. /// 1. Lookup nodes close to the key - /// 2. Request seeders for the file/chunk from those nodes - /// 3. Request the file/chunk from the seeders - async fn fetch(&self, key: blake3::Hash) -> Option { + /// 2. Request seeders for the file from those nodes + /// 3. Request the file from the seeders + async fn fetch_file_metadata(&self, file_hash: blake3::Hash) -> Option { let mut queried_seeders: HashSet = HashSet::new(); - let closest_nodes = self.lookup_nodes(&key).await; // 1 + let closest_nodes = self.lookup_nodes(&file_hash).await; // 1 let mut result: Option = None; if closest_nodes.is_err() { return None @@ -253,7 +422,7 @@ impl Fud { let channel = match self.get_channel(&node).await { Ok(channel) => channel, Err(e) => { - warn!(target: "fud::fetch()", "Could not get a channel for node {}: {}", node.id, e); + warn!(target: "fud::fetch_file_metadata()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e); continue; } }; @@ -263,14 +432,14 @@ impl Fud { let msg_subscriber = match channel.subscribe_msg::().await { Ok(msg_subscriber) => msg_subscriber, Err(e) => { - warn!(target: "fud::fetch()", "Error subscribing to msg: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error subscribing to msg: {}", e); continue; } }; - let send_res = channel.send(&FudFindSeedersRequest { key }).await; + let send_res = channel.send(&FudFindSeedersRequest { key: file_hash }).await; if let Err(e) = send_res { - warn!(target: "fud::fetch()", "Error while sending FudFindSeedersRequest: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error while sending FudFindSeedersRequest: {}", e); msg_subscriber.unsubscribe().await; continue; } @@ -278,13 +447,14 @@ impl Fud { let reply = match msg_subscriber.receive_with_timeout(self.dht().timeout).await { Ok(reply) => reply, Err(e) => { - warn!(target: "fud::fetch()", "Error waiting for reply: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error waiting for reply: {}", e); + msg_subscriber.unsubscribe().await; continue; } }; let mut seeders = reply.seeders.clone(); - info!(target: "fud::fetch()", "Found {} seeders for {}", seeders.len(), key); + info!(target: "fud::fetch_file_metadata()", "Found {} seeders for {}", seeders.len(), hash_to_string(&file_hash)); msg_subscriber.unsubscribe().await; @@ -308,9 +478,9 @@ impl Fud { let msg_subscriber_notfound = channel.subscribe_msg::().await.unwrap(); - let send_res = channel.send(&FudFindRequest { key }).await; + let send_res = channel.send(&FudFindRequest { key: file_hash }).await; if let Err(e) = send_res { - warn!(target: "fud::fetch()", "Error while sending FudFindRequest: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error while sending FudFindRequest: {}", e); msg_subscriber_chunk.unsubscribe().await; msg_subscriber_file.unsubscribe().await; msg_subscriber_notfound.unsubscribe().await; @@ -326,43 +496,55 @@ impl Fud { pin_mut!(chunk_recv, file_recv, notfound_recv); + let cleanup = async || { + msg_subscriber_chunk.unsubscribe().await; + msg_subscriber_file.unsubscribe().await; + msg_subscriber_notfound.unsubscribe().await; + }; + // Wait for a FudChunkReply, FudFileReply, or FudNotFound select! { + // Received a chunk while requesting a file, this is allowed to + // optimize fetching files smaller than a single chunk chunk_reply = chunk_recv => { + cleanup().await; if let Err(e) = chunk_reply { - warn!(target: "fud::fetch()", "Error waiting for chunk reply: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error waiting for chunk reply: {}", e); continue; } let reply = chunk_reply.unwrap(); - info!(target: "fud::fetch()", "Received chunk {} from seeder {}", key, seeder.node.id.to_hex().to_string()); - msg_subscriber_chunk.unsubscribe().await; - msg_subscriber_file.unsubscribe().await; - msg_subscriber_notfound.unsubscribe().await; + let chunk_hash = blake3::hash(&reply.chunk); + // Check that this is the only chunk in the file + if !self.geode.verify_file(&file_hash, &[chunk_hash]) { + warn!(target: "fud::fetch_file_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash"); + continue; + } + info!(target: "fud::fetch_file_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&file_hash), hash_to_string(&seeder.node.id)); result = Some(FetchReply::Chunk((*reply).clone())); break; } file_reply = file_recv => { + cleanup().await; if let Err(e) = file_reply { - warn!(target: "fud::fetch()", "Error waiting for file reply: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error waiting for file reply: {}", e); continue; } let reply = file_reply.unwrap(); - info!(target: "fud::fetch()", "Received file {} from seeder {}", key, seeder.node.id.to_hex().to_string()); - msg_subscriber_chunk.unsubscribe().await; - msg_subscriber_file.unsubscribe().await; - msg_subscriber_notfound.unsubscribe().await; + if !self.geode.verify_file(&file_hash, &reply.chunk_hashes) { + warn!(target: "fud::fetch_file_metadata()", "Received invalid file metadata"); + continue; + } + info!(target: "fud::fetch_file_metadata()", "Received file {} from seeder {}", hash_to_string(&file_hash), hash_to_string(&seeder.node.id)); result = Some(FetchReply::File((*reply).clone())); break; } notfound_reply = notfound_recv => { + cleanup().await; if let Err(e) = notfound_reply { - warn!(target: "fud::fetch()", "Error waiting for NOTFOUND reply: {}", e); + warn!(target: "fud::fetch_file_metadata()", "Error waiting for NOTFOUND reply: {}", e); continue; } - info!(target: "fud::fetch()", "Received NOTFOUND {} from seeder {}", key, seeder.node.id.to_hex().to_string()); - msg_subscriber_chunk.unsubscribe().await; - msg_subscriber_file.unsubscribe().await; - msg_subscriber_notfound.unsubscribe().await; + info!(target: "fud::fetch_file_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(&file_hash), hash_to_string(&seeder.node.id)); } }; } @@ -438,15 +620,17 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { Ok(mut file) => { let mut buffer = Vec::new(); file.read_to_end(&mut buffer).await?; - let buf: [u8; 64] = buffer.try_into().expect("Node ID must have 64 characters"); - let node_id = blake3::Hash::from_hex(buf)?; + let buf: [u8; 44] = buffer.try_into().expect("Node ID must have 44 characters"); + let mut out_buf = [0u8; 32]; + bs58::decode(buf).onto(&mut out_buf)?; + let node_id = blake3::Hash::from_bytes(out_buf); Ok(node_id) } Err(e) if e.kind() == ErrorKind::NotFound => { let node_id = generate_node_id()?; let mut file = OpenOptions::new().write(true).create(true).open(node_id_path).await?; - file.write_all(node_id.to_hex().as_bytes()).await?; + file.write_all(&bs58::encode(node_id.as_bytes()).into_vec()).await?; file.flush().await?; Ok(node_id) } @@ -456,15 +640,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let node_id_ = node_id?; - info!(target: "fud", "Your node ID: {}", node_id_); + info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id_)); // Daemon instantiation let download_sub = JsonSubscriber::new("get"); let (get_tx, get_rx) = smol::channel::unbounded(); let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded(); let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded(); - let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded(); - let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded(); // TODO: Add DHT settings in the config file let dht = Arc::new(Dht::new(&node_id_, 4, 16, 60, p2p.clone(), ex.clone()).await); let fud = Arc::new(Fud { @@ -478,10 +660,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { file_fetch_rx, file_fetch_end_tx, file_fetch_end_rx, - chunk_fetch_tx, - chunk_fetch_rx, - chunk_fetch_end_tx, - chunk_fetch_end_rx, rpc_connections: Mutex::new(HashSet::new()), dnet_sub, download_sub: download_sub.clone(), @@ -526,20 +704,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); - info!(target: "fud", "Starting fetch chunk task"); - let chunk_task = StoppableTask::new(); - chunk_task.clone().start( - tasks::fetch_chunk_task(fud.clone()), - |res| async { - match res { - Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } - Err(e) => error!(target: "fud", "Failed starting fetch chunk task: {}", e), - } - }, - Error::DetachedTaskStopped, - ex.clone(), - ); - info!(target: "fud", "Starting get task"); let get_task_ = StoppableTask::new(); get_task_.clone().start( @@ -643,9 +807,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "fud", "Stopping fetch file task..."); file_task.stop().await; - info!(target: "fud", "Stopping fetch chunk task..."); - chunk_task.stop().await; - info!(target: "fud", "Stopping get task..."); get_task_.stop().await; diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 898a5be86..398134f45 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use async_trait::async_trait; use darkfi::{ - geode::{read_until_filled, MAX_CHUNK_SIZE}, + geode::{hash_to_string, read_until_filled, MAX_CHUNK_SIZE}, impl_p2p_message, net::{ metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, @@ -227,6 +227,23 @@ impl ProtocolFud { // TODO: Run geode GC if let Ok(chunked_file) = file_res { + // If the file has a single chunk, just reply with the chunk + if chunked_file.len() == 1 { + let chunk_res = + self.fud.geode.get_chunk(&chunked_file.iter().next().unwrap().0).await; + if let Ok(chunk_path) = chunk_res { + let mut buf = vec![0u8; MAX_CHUNK_SIZE]; + let mut chunk_fd = File::open(&chunk_path).await.unwrap(); + let bytes_read = + read_until_filled(&mut chunk_fd, &mut buf).await.unwrap(); + let chunk_slice = &buf[..bytes_read]; + let reply = FudChunkReply { chunk: chunk_slice.to_vec() }; + info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk (file has a single chunk)"); + let _ = self.channel.send(&reply).await; + continue; + } + } + // Otherwise reply with the file metadata let reply = FudFileReply { chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(), }; @@ -237,7 +254,7 @@ impl ProtocolFud { } let reply = FudNotFound {}; - info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", request.key.to_hex().to_string()); + info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key)); let _ = self.channel.send(&reply).await; } } @@ -254,7 +271,7 @@ impl ProtocolFud { continue } }; - info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", &request.key); + info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key)); let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { @@ -283,7 +300,7 @@ impl ProtocolFud { continue } }; - info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", &request.key); + info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key)); let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { @@ -319,7 +336,7 @@ impl ProtocolFud { continue } }; - info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", &request.key); + info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key)); let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs index 515f3de1f..27aef12a8 100644 --- a/bin/fud/fud/src/rpc.rs +++ b/bin/fud/fud/src/rpc.rs @@ -24,6 +24,7 @@ use std::{ use crate::{dht::DhtHandler, proto::FudAnnounce, Fud}; use async_trait::async_trait; use darkfi::{ + geode::hash_to_string, rpc::{ jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, p2p_method::HandlerP2p, @@ -96,7 +97,7 @@ impl Fud { } }; - let (file_hash, chunk_hashes) = match self.geode.insert(fd).await { + let (file_hash, _) = match self.geode.insert(fd).await { Ok(v) => v, Err(e) => { error!(target: "fud::put()", "Failed inserting file {:?} to geode: {}", path, e); @@ -109,14 +110,7 @@ impl Fud { let fud_announce = FudAnnounce { key: file_hash, seeders: vec![self_node.clone().into()] }; let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await; - // Announce chunks - for chunk_hash in chunk_hashes { - let fud_announce = - FudAnnounce { key: chunk_hash, seeders: vec![self_node.clone().into()] }; - let _ = self.announce(&chunk_hash, &fud_announce, self.seeders_router.clone()).await; - } - - JsonResponse::new(JsonValue::String(file_hash.to_hex().to_string()), id).into() + JsonResponse::new(JsonValue::String(hash_to_string(&file_hash)), id).into() } // RPCAPI: @@ -138,10 +132,13 @@ impl Fud { None => None, }; - let file_hash = match blake3::Hash::from_hex(params[0].get::().unwrap()) { - Ok(v) => v, + let mut hash_buf = [0u8; 32]; + match bs58::decode(params[0].get::().unwrap().as_str()).onto(&mut hash_buf) { + Ok(_) => {} Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(), - }; + } + + let file_hash = blake3::Hash::from_bytes(hash_buf); let _ = self.get_tx.send((id, file_hash, file_name, Ok(()))).await; @@ -207,7 +204,7 @@ impl Fud { addresses.push(JsonValue::String(addr.to_string())); } nodes.push(JsonValue::Array(vec![ - JsonValue::String(node.id.to_hex().to_string()), + JsonValue::String(hash_to_string(&node.id)), JsonValue::Array(addresses), ])); } @@ -231,9 +228,9 @@ impl Fud { for (hash, items) in self.seeders_router.read().await.iter() { let mut node_ids = vec![]; for item in items { - node_ids.push(JsonValue::String(item.node.id.to_hex().to_string())); + node_ids.push(JsonValue::String(hash_to_string(&item.node.id))); } - seeders_router.insert(hash.to_hex().to_string(), JsonValue::Array(node_ids)); + seeders_router.insert(hash_to_string(hash), JsonValue::Array(node_ids)); } let mut res: HashMap = HashMap::new(); res.insert("seeders".to_string(), JsonValue::Object(seeders_router)); @@ -282,15 +279,15 @@ pub enum FudEvent { impl From for JsonValue { fn from(info: ChunkDownloadCompleted) -> JsonValue { json_map([ - ("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())), - ("chunk_hash", JsonValue::String(info.chunk_hash.to_hex().to_string())), + ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), + ("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))), ]) } } impl From for JsonValue { fn from(info: FileDownloadCompleted) -> JsonValue { json_map([ - ("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())), + ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), ("chunk_count", JsonValue::Number(info.chunk_count as f64)), ]) } @@ -298,7 +295,7 @@ impl From for JsonValue { impl From for JsonValue { fn from(info: DownloadCompleted) -> JsonValue { json_map([ - ("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())), + ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), ("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())), ]) } @@ -306,14 +303,14 @@ impl From for JsonValue { impl From for JsonValue { fn from(info: ChunkNotFound) -> JsonValue { json_map([ - ("file_hash", JsonValue::String(info.file_hash.to_hex().to_string())), - ("chunk_hash", JsonValue::String(info.chunk_hash.to_hex().to_string())), + ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), + ("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))), ]) } } impl From for JsonValue { fn from(info: FileNotFound) -> JsonValue { - json_map([("file_hash", JsonValue::String(info.file_hash.to_hex().to_string()))]) + json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))]) } } impl From for JsonValue { @@ -396,11 +393,13 @@ impl Fud { }; } - // Fetch any missing chunks - let mut missing_chunks = vec![]; + let seeders = self.fetch_seeders(file_hash).await; + + // List missing chunks + let mut missing_chunks = HashSet::new(); for (chunk, path) in chunked_file.iter() { if path.is_none() { - missing_chunks.push(*chunk); + missing_chunks.insert(*chunk); } else { self.download_publisher .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted { @@ -411,44 +410,16 @@ impl Fud { } } - for chunk in missing_chunks { - self.chunk_fetch_tx.send((chunk, Ok(()))).await.unwrap(); - let (i_chunk_hash, status) = self.chunk_fetch_end_rx.recv().await.unwrap(); - - match status { - Ok(()) => { - self.download_publisher - .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted { - file_hash, - chunk_hash: i_chunk_hash, - })) - .await; - let self_announce = - FudAnnounce { key: i_chunk_hash, seeders: vec![self_node.clone().into()] }; - let _ = self - .announce(&i_chunk_hash, &self_announce, self.seeders_router.clone()) - .await; - } - Err(Error::GeodeChunkRouteNotFound) => { - self.download_publisher - .notify(FudEvent::ChunkNotFound(ChunkNotFound { - file_hash, - chunk_hash: i_chunk_hash, - })) - .await; - return; - } - - Err(e) => panic!("{}", e), - }; - } + // Fetch missing chunks from seeders + self.fetch_chunks(&file_hash, &missing_chunks, &seeders).await; let chunked_file = match self.geode.get(&file_hash).await { Ok(v) => v, Err(e) => panic!("{}", e), }; - // We fetched all chunks, but the file is not complete? + // We fetched all chunks, but the file is not complete + // (some chunks were missing from all seeders) if !chunked_file.is_complete() { self.download_publisher.notify(FudEvent::MissingChunks(MissingChunks {})).await; return; diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs index 3aa2da8d8..878f2c9c2 100644 --- a/bin/fud/fud/src/tasks.rs +++ b/bin/fud/fud/src/tasks.rs @@ -18,14 +18,14 @@ use std::sync::Arc; -use darkfi::{system::sleep, Error, Result}; +use darkfi::{geode::hash_to_string, system::sleep, Error, Result}; use crate::{ dht::DhtHandler, proto::{FudAnnounce, FudChunkReply, FudFileReply}, Fud, }; -use log::{error, info, warn}; +use log::{error, info}; /// Triggered when calling the `get` RPC method pub async fn get_task(fud: Arc) -> Result<()> { @@ -48,32 +48,36 @@ pub async fn fetch_file_task(fud: Arc) -> Result<()> { info!(target: "fud::fetch_file_task()", "Started background file fetch task"); loop { let (file_hash, _) = fud.file_fetch_rx.recv().await.unwrap(); - info!(target: "fud::fetch_file_task()", "Fetching file {}", file_hash); + info!(target: "fud::fetch_file_task()", "Fetching file {}", hash_to_string(&file_hash)); - let result = fud.fetch(file_hash).await; + let result = fud.fetch_file_metadata(file_hash).await; match result { Some(reply) => { match reply { FetchReply::File(FudFileReply { chunk_hashes }) => { if let Err(e) = fud.geode.insert_file(&file_hash, &chunk_hashes).await { - error!("Failed inserting file {} to Geode: {}", file_hash, e); + error!( + "Failed inserting file {} to Geode: {}", + hash_to_string(&file_hash), + e + ); } fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); } - // Looked for a file but got a chunk, meaning that file_hash = chunk_hash, the file fits in a single chunk + // Looked for a file but got a chunk: the entire file fits in a single chunk FetchReply::Chunk(FudChunkReply { chunk }) => { - // TODO: Verify chunk - info!(target: "fud::fetch()", "File fits in a single chunk"); - let _ = fud.geode.insert_file(&file_hash, &[file_hash]).await; + info!(target: "fud::fetch_file_task()", "File fits in a single chunk"); + let chunk_hash = blake3::hash(&chunk); + let _ = fud.geode.insert_file(&file_hash, &[chunk_hash]).await; match fud.geode.insert_chunk(&chunk).await { - Ok(inserted_hash) => { - if inserted_hash != file_hash { - warn!("Received chunk does not match requested file"); - } - } + Ok(_) => {} Err(e) => { - error!("Failed inserting chunk {} to Geode: {}", file_hash, e); + error!( + "Failed inserting chunk {} to Geode: {}", + hash_to_string(&file_hash), + e + ); } }; fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); @@ -90,53 +94,6 @@ pub async fn fetch_file_task(fud: Arc) -> Result<()> { } } -/// Background task that receives chunk fetch requests and tries to -/// fetch objects from the network using the routing table. -/// TODO: This can be optimised a lot for connection reuse, etc. -pub async fn fetch_chunk_task(fud: Arc) -> Result<()> { - info!(target: "fud::fetch_chunk_task()", "Started background chunk fetch task"); - loop { - let (chunk_hash, _) = fud.chunk_fetch_rx.recv().await.unwrap(); - info!(target: "fud::fetch_chunk_task()", "Fetching chunk {}", chunk_hash); - - let result = fud.fetch(chunk_hash).await; - - match result { - Some(reply) => { - match reply { - FetchReply::Chunk(FudChunkReply { chunk }) => { - // TODO: Verify chunk - match fud.geode.insert_chunk(&chunk).await { - Ok(inserted_hash) => { - if inserted_hash != chunk_hash { - warn!("Received chunk does not match requested chunk"); - } - } - Err(e) => { - error!("Failed inserting chunk {} to Geode: {}", chunk_hash, e); - } - }; - fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap(); - } - _ => { - // Looked for a chunk but got a file instead, not supposed to happen - fud.chunk_fetch_end_tx - .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) - .await - .unwrap(); - } - } - } - None => { - fud.chunk_fetch_end_tx - .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) - .await - .unwrap(); - } - }; - } -} - /// Background task that removes seeders that did not announce a file/chunk /// for more than an hour. pub async fn prune_seeders_task(fud: Arc) -> Result<()> { @@ -155,20 +112,6 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { let seeders = vec![fud.dht().node.clone().into()]; - info!(target: "fud::announce_task()", "Announcing chunks..."); - let chunk_hashes = fud.geode.list_chunks().await; - if let Ok(chunks) = chunk_hashes { - for chunk in chunks { - let _ = fud - .announce( - &chunk, - &FudAnnounce { key: chunk, seeders: seeders.clone() }, - fud.seeders_router.clone(), - ) - .await; - } - } - info!(target: "fud::announce_task()", "Announcing files..."); let file_hashes = fud.geode.list_files().await; if let Ok(files) = file_hashes { diff --git a/src/geode/mod.rs b/src/geode/mod.rs index 1357b4934..fa0cb46de 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -81,6 +81,10 @@ const CHUNKS_PATH: &str = "chunks"; /// Path prefix where full files are stored const DOWNLOADS_PATH: &str = "downloads"; +pub fn hash_to_string(hash: &blake3::Hash) -> String { + bs58::encode(hash.as_bytes()).into_string() +} + /// `ChunkedFile` is a representation of a file we're trying to /// retrieve from `Geode`. /// @@ -176,7 +180,9 @@ impl Geode { let mut lines = BufReader::new(fd).lines(); while let Some(line) = lines.next().await { let line = line?; - let chunk_hash = blake3::Hash::from_hex(line)?; + let mut hash_buf = [0u8; 32]; + bs58::decode(line).onto(&mut hash_buf)?; + let chunk_hash = blake3::Hash::from_bytes(hash_buf); read_chunks.push(chunk_hash); } @@ -209,8 +215,9 @@ impl Geode { Some(v) => v, None => continue, }; - let chunk_hash = match blake3::Hash::from_hex(file_name) { - Ok(v) => v, + let mut hash_buf = [0u8; 32]; + let chunk_hash = match bs58::decode(file_name).onto(&mut hash_buf) { + Ok(_) => blake3::Hash::from_bytes(hash_buf), Err(_) => continue, }; @@ -270,8 +277,9 @@ impl Geode { Some(v) => v, None => continue, }; - let file_hash = match blake3::Hash::from_hex(file_name) { - Ok(v) => v, + let mut hash_buf = [0u8; 32]; + let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) { + Ok(_) => blake3::Hash::from_bytes(hash_buf), Err(_) => continue, }; @@ -298,7 +306,7 @@ impl Geode { /// Insert a file into Geode. The function expects any kind of byte stream, which /// can either be another file on the filesystem, a buffer, etc. /// Returns a tuple of `(blake3::Hash, Vec)` which represents the - /// file name, and the file's chunks, respectively. + /// file hash, and the file's chunks, respectively. pub async fn insert( &self, mut stream: impl AsyncRead + Unpin, @@ -316,7 +324,7 @@ impl Geode { let chunk_slice = &buf[..bytes_read]; let chunk_hash = blake3::hash(chunk_slice); - file_hasher.update(chunk_slice); + file_hasher.update(chunk_hash.as_bytes()); chunk_hashes.push(chunk_hash); // Write the chunk to a file, if necessary. We first perform @@ -324,7 +332,7 @@ impl Geode { // to perform a write, which is usually more expensive than // reading from disk. let mut chunk_path = self.chunks_path.clone(); - chunk_path.push(chunk_hash.to_hex().as_str()); + chunk_path.push(hash_to_string(&chunk_hash).as_str()); let chunk_fd = OpenOptions::new().read(true).write(true).create(true).open(&chunk_path).await?; @@ -361,15 +369,15 @@ impl Geode { buf = [0u8; MAX_CHUNK_SIZE]; } - // This hash is the file's chunks hashed in order. + // This hash is the file's chunks hashes hashed in order. let file_hash = file_hasher.finalize(); let mut file_path = self.files_path.clone(); - file_path.push(file_hash.to_hex().as_str()); + file_path.push(hash_to_string(&file_hash).as_str()); // We always overwrite the metadata. let mut file_fd = File::create(&file_path).await?; for ch in &chunk_hashes { - file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?; + file_fd.write(format!("{}\n", hash_to_string(ch).as_str()).as_bytes()).await?; } file_fd.flush().await?; @@ -379,6 +387,7 @@ impl Geode { /// Create and insert file metadata into Geode given a list of hashes. /// Always overwrites any existing file. + /// Verifies that the file hash matches the chunk hashes pub async fn insert_file( &self, file_hash: &blake3::Hash, @@ -386,12 +395,17 @@ impl Geode { ) -> Result<()> { info!(target: "geode::insert_file()", "[Geode] Inserting file metadata"); + if !self.verify_file(file_hash, chunk_hashes) { + // The chunk list or file hash is wrong + return Err(Error::GeodeNeedsGc) + } + let mut file_path = self.files_path.clone(); - file_path.push(file_hash.to_hex().as_str()); + file_path.push(hash_to_string(file_hash).as_str()); let mut file_fd = File::create(&file_path).await?; for ch in chunk_hashes { - file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?; + file_fd.write(format!("{}\n", hash_to_string(ch).as_str()).as_bytes()).await?; } file_fd.flush().await?; @@ -411,7 +425,7 @@ impl Geode { let chunk_hash = blake3::hash(chunk_slice); let mut chunk_path = self.chunks_path.clone(); - chunk_path.push(chunk_hash.to_hex().as_str()); + chunk_path.push(hash_to_string(&chunk_hash).as_str()); let mut chunk_fd = File::create(&chunk_path).await?; chunk_fd.write_all(chunk_slice).await?; chunk_fd.flush().await?; @@ -423,9 +437,10 @@ impl Geode { /// of chunks and optionally file paths to the said chunks. Returns an error if /// the read failed in any way (could also be the file does not exist). pub async fn get(&self, file_hash: &blake3::Hash) -> Result { - info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash); + let file_hash_str = hash_to_string(file_hash); + info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash_str); let mut file_path = self.files_path.clone(); - file_path.push(file_hash.to_hex().as_str()); + file_path.push(file_hash_str); // Try to read the file metadata. If it's corrupt, return an error signalling // that garbage collection needs to run. @@ -447,7 +462,7 @@ impl Geode { let mut buf = vec![]; for (chunk_hash, chunk_path) in chunked_file.0.iter_mut() { let mut c_path = self.chunks_path.clone(); - c_path.push(chunk_hash.to_hex().as_str()); + c_path.push(hash_to_string(chunk_hash).as_str()); if !c_path.exists() || !c_path.is_file() { // TODO: We should be aggressive here and remove the non-file. @@ -475,9 +490,10 @@ impl Geode { /// Fetch a single chunk from Geode. Returns a `PathBuf` pointing to the chunk /// if it is found. pub async fn get_chunk(&self, chunk_hash: &blake3::Hash) -> Result { - info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", chunk_hash); + let chunk_hash_str = hash_to_string(chunk_hash); + info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", chunk_hash_str); let mut chunk_path = self.chunks_path.clone(); - chunk_path.push(chunk_hash.to_hex().as_str()); + chunk_path.push(chunk_hash_str); if !chunk_path.exists() || !chunk_path.is_file() { // TODO: We should be aggressive here and remove the non-file. @@ -488,9 +504,7 @@ impl Geode { let mut buf = vec![]; let mut chunk_fd = File::open(&chunk_path).await?; let bytes_read = chunk_fd.read_to_end(&mut buf).await?; - let chunk_slice = &buf[..bytes_read]; - let hashed_chunk = blake3::hash(chunk_slice); - if &hashed_chunk != chunk_hash { + if !self.verify_chunk(chunk_hash, &buf[..bytes_read]) { // The chunk is corrupted return Err(Error::GeodeNeedsGc) } @@ -498,15 +512,38 @@ impl Geode { Ok(chunk_path) } + /// Verifies that the file hash matches the chunk hashes. + pub fn verify_file(&self, file_hash: &blake3::Hash, chunk_hashes: &[blake3::Hash]) -> bool { + info!(target: "geode::verify_file()", "[Geode] Verifying file metadata"); + + let mut file_hasher = blake3::Hasher::new(); + for chunk_hash in chunk_hashes { + file_hasher.update(chunk_hash.as_bytes()); + } + + *file_hash == file_hasher.finalize() + } + + /// Verifies that the chunk hash matches the content. + pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool { + blake3::hash(chunk_slice) == *chunk_hash + } + /// Assemble chunks to create a file. /// This method does NOT perform a consistency check. - pub async fn assemble_file(&self, file_hash: &blake3::Hash, chunked_file: &ChunkedFile, file_name: Option) -> Result { - info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash); + pub async fn assemble_file( + &self, + file_hash: &blake3::Hash, + chunked_file: &ChunkedFile, + file_name: Option, + ) -> Result { + let file_hash_str = hash_to_string(file_hash); + info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash_str); let mut file_path = self.downloads_path.clone(); - file_path.push(file_hash.to_hex().as_str()); + file_path.push(&file_hash_str); fs::create_dir_all(&file_path).await?; - file_path.push(file_name.unwrap_or(file_hash.to_hex().to_string())); + file_path.push(file_name.unwrap_or(file_hash_str)); let mut file_fd = File::create(&file_path).await?; for (_, chunk_path) in chunked_file.iter() { @@ -531,31 +568,15 @@ impl Geode { while let Some(file) = dir.try_next().await? { let os_file_name = file.file_name(); - let file_name = os_file_name.to_string_lossy(); - if let Ok(file_hash) = blake3::Hash::from_hex(file_name.to_string()) { - file_hashes.push(file_hash); - } + let file_name = os_file_name.to_string_lossy().to_string(); + let mut hash_buf = [0u8; 32]; + let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) { + Ok(_) => blake3::Hash::from_bytes(hash_buf), + Err(_) => continue, + }; + file_hashes.push(file_hash); } Ok(file_hashes) } - - /// List chunk hashes. - pub async fn list_chunks(&self) -> Result> { - info!(target: "geode::list_chunks()", "[Geode] Listing chunks"); - - let mut dir = fs::read_dir(&self.chunks_path).await?; - - let mut chunk_hashes = vec![]; - - while let Some(chunk) = dir.try_next().await? { - let os_file_name = chunk.file_name(); - let file_name = os_file_name.to_string_lossy(); - if let Ok(chunk_hash) = blake3::Hash::from_hex(file_name.to_string()) { - chunk_hashes.push(chunk_hash); - } - } - - Ok(chunk_hashes) - } }