From 94dc672a15d55da0fd4302cd4741cce857c31999 Mon Sep 17 00:00:00 2001 From: epiphany Date: Thu, 1 Jan 2026 18:14:18 +0000 Subject: [PATCH] geode, fud: replace `smol::fs::File` with `std::fs::File`, add `Chunk` struct --- bin/fud/fud/src/download.rs | 4 +-- bin/fud/fud/src/lib.rs | 62 +++++++++++++++++++++--------------- bin/fud/fud/src/proto.rs | 14 ++------ bin/fud/fud/src/resource.rs | 4 +-- src/geode/chunked_storage.rs | 56 +++++++++++++++++++++++++------- src/geode/file_sequence.rs | 56 ++++++++++++++++++++------------ src/geode/mod.rs | 8 ++--- 7 files changed, 128 insertions(+), 76 deletions(-) diff --git a/bin/fud/fud/src/download.rs b/bin/fud/fud/src/download.rs index 30a833302..9b8e89269 100644 --- a/bin/fud/fud/src/download.rs +++ b/bin/fud/fud/src/download.rs @@ -299,8 +299,8 @@ async fn handle_chunk_reply( // `total_bytes_size` (and `target_bytes_size`) again, // as `geode.write_chunk()` updated the FileSequence // to the exact file size. - if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() { - if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash { + if let Some(last_chunk) = ctx.chunked.iter().last() { + if matches!(resource.rtype, ResourceType::File) && last_chunk.hash == *chunk_hash { resource.total_bytes_size = ctx.chunked.get_fileseq().len(); resource.target_bytes_size = resource.total_bytes_size; } diff --git a/bin/fud/fud/src/lib.rs b/bin/fud/fud/src/lib.rs index dcf7451e3..67ecca1e5 100644 --- a/bin/fud/fud/src/lib.rs +++ b/bin/fud/fud/src/lib.rs @@ -33,7 +33,7 @@ use tracing::{error, info, warn}; use darkfi::{ dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings}, - geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE}, + geode::{hash_to_string, Chunk, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE}, net::P2pPtr, system::{ExecutorPtr, PublisherPtr, StoppableTask}, util::{path::expand_path, time::Timestamp}, @@ -476,7 +476,7 @@ impl Fud { resource.target_chunks_downloaded = 
match chunked { Some(chunked) => chunked .iter() - .filter(|(hash, available)| chunk_hashes.contains(hash) && *available) + .filter(|chunk| chunk_hashes.contains(&chunk.hash) && chunk.available) .count() as u64, None => 0, }; @@ -775,12 +775,12 @@ impl Fud { } // Set of all chunks we need locally and their current availability - let chunks: HashSet<(blake3::Hash, bool)> = - chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect(); + let chunks: HashSet = + chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).cloned().collect(); // Set of the chunks we need to download let mut missing_chunks: HashSet = - chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect(); + chunks.iter().filter(|&c| !c.available).map(|c| c.hash).collect(); // Update the resource with the chunks/bytes counts update_resource!(hash, { @@ -858,10 +858,8 @@ impl Fud { // Verify all chunks self.verify_chunks(&resource, &mut chunked).await?; - let is_complete = chunked - .iter() - .filter(|(hash, _)| chunk_hashes.contains(hash)) - .all(|(_, available)| *available); + let is_complete = + chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).all(|c| c.available); // We fetched all chunks, but the resource is not complete // (some chunks were missing from all seeders) @@ -953,9 +951,9 @@ impl Fud { let mut bytes: HashMap = HashMap::new(); // Gather all available chunks - for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() { + for (chunk_index, chunk) in chunks.iter().enumerate() { // Read the chunk using the `FileSequence` - let chunk = + let chunk_data = match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await { Ok(c) => c, Err(Error::Io(ErrorKind::NotFound)) => continue, @@ -966,23 +964,34 @@ impl Fud { }; // Perform chunk consistency check - if self.geode.verify_chunk(chunk_hash, &chunk) { - chunked.get_chunk_mut(chunk_index).1 = true; + if self.geode.verify_chunk(&chunk.hash, &chunk_data) { + 
chunked.get_chunk_mut(chunk_index).available = true; + chunked.get_chunk_mut(chunk_index).size = chunk_data.len(); bytes.insert( - *chunk_hash, - (chunk.len(), resource.get_selected_bytes(chunked, &chunk)), + chunk.hash, + ( + chunk_data.len(), + resource.get_bytes_of_selection( + chunked, + file_selection, + &chunk.hash, + chunk_data.len(), + ), + ), ); + } else { + chunked.get_chunk_mut(chunk_index).available = false; } } // Look for the chunks that are not on the filesystem let chunks = chunked.get_chunks().clone(); let missing_on_fs: Vec<_> = - chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect(); + chunks.iter().enumerate().filter(|(_, c)| !c.available).collect(); // Look for scraps - for (chunk_index, (chunk_hash, _)) in missing_on_fs { - let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?; + for (chunk_index, chunk) in missing_on_fs { + let scrap = self.scrap_tree.get(chunk.hash.as_bytes())?; if scrap.is_none() { continue; } @@ -993,7 +1002,7 @@ impl Fud { continue; } let scrap: Scrap = scrap.unwrap(); - if blake3::hash(&scrap.chunk) != *chunk_hash { + if blake3::hash(&scrap.chunk) != chunk.hash { continue; } @@ -1011,7 +1020,8 @@ impl Fud { } // Mark the chunk as available - chunked.get_chunk_mut(chunk_index).1 = true; + chunked.get_chunk_mut(chunk_index).available = true; + chunked.get_chunk_mut(chunk_index).size = scrap.chunk.len(); // Update the sums of locally available data bytes.insert( @@ -1023,9 +1033,11 @@ impl Fud { // If the resource is a file: make the `FileSequence`'s file the // exact file size if we know the last chunk's size. This is not // needed for directories. 
- if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() { - if !chunked.is_dir() && *last_chunk_available { - if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) { + let is_dir = chunked.is_dir(); + if let Some(last_chunk) = chunked.iter_mut().last() { + if !is_dir && last_chunk.available { + if let Some((last_chunk_size, _)) = bytes.get(&last_chunk.hash) { + last_chunk.size = *last_chunk_size; let exact_file_size = chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size); chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64); @@ -1172,8 +1184,8 @@ impl Fud { let chunked = self.geode.get(hash, &path).await; if let Ok(chunked) = chunked { - for (chunk_hash, _) in chunked.iter() { - let _ = self.scrap_tree.remove(chunk_hash.as_bytes()); + for chunk in chunked.iter() { + let _ = self.scrap_tree.remove(chunk.hash.as_bytes()); } } } diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 51e179fa9..f6b00e3d5 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -321,7 +321,7 @@ impl ProtocolFud { // If it's a file with a single chunk, just reply with the chunk if chunked_file.len() == 1 && !chunked_file.is_dir() { - let chunk_hash = chunked_file.get_chunks()[0].0; + let chunk_hash = chunked_file.get_chunks()[0].hash; let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await; if let Ok(chunk) = chunk { if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource { @@ -342,11 +342,7 @@ impl ProtocolFud { false => { let reply = FudFileReply { resource: request.resource, - chunk_hashes: chunked_file - .get_chunks() - .iter() - .map(|(chunk, _)| *chunk) - .collect(), + chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(), }; info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource)); let _ = self.channel.send(&reply).await; @@ -367,11 +363,7 @@ impl ProtocolFud { } let 
reply = FudDirectoryReply { resource: request.resource, - chunk_hashes: chunked_file - .get_chunks() - .iter() - .map(|(chunk, _)| *chunk) - .collect(), + chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(), files: files.unwrap(), }; info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource)); diff --git a/bin/fud/fud/src/resource.rs b/bin/fud/fud/src/resource.rs index 866bd39c6..47295d65e 100644 --- a/bin/fud/fud/src/resource.rs +++ b/bin/fud/fud/src/resource.rs @@ -185,7 +185,7 @@ impl Resource { } chunks } - FileSelection::All => chunked.iter().cloned().map(|(hash, _)| hash).collect(), + FileSelection::All => chunked.iter().cloned().map(|c| c.hash).collect(), } } @@ -199,7 +199,7 @@ impl Resource { }; let chunk_hash = blake3::hash(chunk); - let chunk_index = match chunked.iter().position(|(h, _)| *h == chunk_hash) { + let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) { Some(index) => index, None => { return 0; diff --git a/src/geode/chunked_storage.rs b/src/geode/chunked_storage.rs index 981a341f6..7cf27bf05 100644 --- a/src/geode/chunked_storage.rs +++ b/src/geode/chunked_storage.rs @@ -16,17 +16,39 @@ * along with this program. If not, see . */ -use std::path::{Path, PathBuf}; +use std::{ + hash::{Hash, Hasher}, + path::{Path, PathBuf}, +}; use crate::geode::{file_sequence::FileSequence, MAX_CHUNK_SIZE}; +#[derive(Clone, Debug, Eq)] +pub struct Chunk { + pub hash: blake3::Hash, + pub available: bool, + pub size: usize, +} + +impl Hash for Chunk { + fn hash(&self, state: &mut H) { + self.hash.hash(state); + } +} + +impl PartialEq for Chunk { + fn eq(&self, other: &Self) -> bool { + self.hash == other.hash + } +} + /// `ChunkedStorage` is a representation of a file or directory we're trying to /// retrieve from `Geode`. 
-#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChunkedStorage { /// Vector of chunk hashes and a bool which is `true` if the chunk is /// available locally. - chunks: Vec<(blake3::Hash, bool)>, + chunks: Vec<Chunk>, /// FileSequence containing the list of file paths and file sizes, it has /// a single item if this is not a directory but a single file. fileseq: FileSequence, @@ -38,7 +60,10 @@ pub struct ChunkedStorage { impl ChunkedStorage { pub fn new(hashes: &[blake3::Hash], files: &[(PathBuf, u64)], is_dir: bool) -> Self { Self { - chunks: hashes.iter().map(|x| (*x, false)).collect(), + chunks: hashes + .iter() + .map(|x| Chunk { hash: *x, available: false, size: MAX_CHUNK_SIZE }) + .collect(), fileseq: FileSequence::new(files, is_dir), is_dir, } @@ -46,16 +71,16 @@ impl ChunkedStorage { /// Check whether we have all the chunks available locally. pub fn is_complete(&self) -> bool { - !self.chunks.iter().any(|(_, available)| !available) + !self.chunks.iter().any(|c| !c.available) } /// Return an iterator over the chunks and their availability. - pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, bool)> { + pub fn iter(&self) -> core::slice::Iter<'_, Chunk> { self.chunks.iter() } /// Return an mutable iterator over the chunks and their availability. - pub fn iter_mut(&mut self) -> core::slice::IterMut<'_, (blake3::Hash, bool)> { + pub fn iter_mut(&mut self) -> core::slice::IterMut<'_, Chunk> { self.chunks.iter_mut() } @@ -71,19 +96,28 @@ impl ChunkedStorage { /// Return the number of chunks available locally. pub fn local_chunks(&self) -> usize { - self.chunks.iter().filter(|(_, p)| *p).count() + self.chunks.iter().filter(|c| c.available).count() } /// Return `chunks`. - pub fn get_chunks(&self) -> &Vec<(blake3::Hash, bool)> { + pub fn get_chunks(&self) -> &Vec<Chunk> { &self.chunks } /// Return a mutable chunk from `chunks`.
- pub fn get_chunk_mut(&mut self, index: usize) -> &mut (blake3::Hash, bool) { + pub fn get_chunk_mut(&mut self, index: usize) -> &mut Chunk { &mut self.chunks[index] } + pub fn get_chunk_index(&self, hash: &blake3::Hash) -> Option<usize> { + for (i, chunk) in self.chunks.iter().enumerate() { + if chunk.hash == *hash { + return Some(i) + } + } + None + } + /// Return the list of files from the `reader`. pub fn get_files(&self) -> &Vec<(PathBuf, u64)> { self.fileseq.get_files() @@ -125,7 +159,7 @@ impl ChunkedStorage { chunk_indexes .iter() .filter_map(|&index| self.chunks.get(index)) - .map(|(hash, _)| hash) + .map(|c| &c.hash) .cloned() .collect() } diff --git a/src/geode/file_sequence.rs b/src/geode/file_sequence.rs index 99debb148..29b2dab75 100644 --- a/src/geode/file_sequence.rs +++ b/src/geode/file_sequence.rs @@ -16,15 +16,19 @@ * along with this program. If not, see . */ +use std::{ + collections::HashSet, + fs::{File, OpenOptions}, + io::{Read, Seek, Write}, + path::PathBuf, + pin::Pin, +}; + use futures::{ task::{Context, Poll}, AsyncRead, AsyncSeek, AsyncWrite, }; -use smol::{ - fs::{File, OpenOptions}, - io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, -}; -use std::{collections::HashSet, path::PathBuf, pin::Pin}; +use smol::io::{self, SeekFrom}; /// `FileSequence` is an object that implements `AsyncRead`, `AsyncSeek`, and /// `AsyncWrite` for an ordered list of (file path, file size). @@ -109,7 +113,7 @@ impl FileSequence { /// Open the file at (`current_file_index` + 1). /// If no file is currently open (`current_file_index` is None), it opens /// the first file.
- async fn open_next_file(&mut self) -> io::Result<()> { + fn open_next_file(&mut self) -> io::Result<()> { self.current_file = None; self.current_file_index = match self.current_file_index { Some(i) => Some(i + 1), @@ -122,22 +126,20 @@ impl FileSequence { .read(true) .write(true) .create(false) - .open(self.files[self.current_file_index.unwrap()].0.clone()) - .await?; + .open(self.files[self.current_file_index.unwrap()].0.clone())?; self.current_file = Some(file); Ok(()) } /// Open the file at `file_index`. - async fn open_file(&mut self, file_index: usize) -> io::Result<()> { + fn open_file(&mut self, file_index: usize) -> io::Result<()> { self.current_file = None; self.current_file_index = Some(file_index); let file = OpenOptions::new() .read(true) .write(true) .create(false) - .open(self.files[file_index].0.clone()) - .await?; + .open(self.files[file_index].0.clone())?; self.current_file = Some(file); Ok(()) } @@ -169,7 +171,7 @@ impl AsyncRead for FileSequence { } // Open the next file - match smol::block_on(this.open_next_file()) { + match this.open_next_file() { Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { return Poll::Ready(Ok(total_read)); @@ -184,7 +186,7 @@ impl AsyncRead for FileSequence { // Read from the current file let file = this.current_file.as_mut().unwrap(); - match smol::block_on(file.read(&mut buf[total_read..])) { + match file.read(&mut buf[total_read..]) { Ok(bytes_read) => { if bytes_read == 0 { this.current_file = None; // Move to the next file @@ -239,7 +241,7 @@ impl AsyncSeek for FileSequence { if this.current_file.is_none() || this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index { - match smol::block_on(this.open_file(file_index)) { + match this.open_file(file_index) { Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::NotFound => { // If the file does not exist, return without actually seeking it @@ -253,7 +255,7 @@ impl AsyncSeek for FileSequence { let file_pos = abs_pos - 
bytes_offset; // Seek in the current file - match smol::block_on(file.seek(SeekFrom::Start(file_pos))) { + match file.seek(SeekFrom::Start(file_pos)) { Ok(_) => Poll::Ready(Ok(this.position)), Err(e) => Poll::Ready(Err(e)), } @@ -273,9 +275,9 @@ impl AsyncWrite for FileSequence { let finalize_current_file = |file: &mut File, max_size: u64| { if auto_set_len { - smol::block_on(file.set_len(max_size))?; + file.set_len(max_size)?; } - smol::block_on(file.flush())?; + file.flush()?; Ok(()) }; @@ -299,7 +301,7 @@ impl AsyncWrite for FileSequence { } // Switch to the next file - match smol::block_on(this.open_next_file()) { + match this.open_next_file() { Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::NotFound => { this.current_file = None; @@ -313,7 +315,7 @@ impl AsyncWrite for FileSequence { let max_size = this.files[this.current_file_index.unwrap()].1; // Check how much space is left in the current file - let current_position = smol::block_on(file.seek(io::SeekFrom::Current(0)))?; + let current_position = file.stream_position()?; let space_left = max_size - current_position; let bytes_to_write = remaining_buf.len().min(space_left as usize); @@ -327,7 +329,7 @@ impl AsyncWrite for FileSequence { } // Write to the current file - match smol::block_on(file.write(&remaining_buf[..bytes_to_write])) { + match file.write(&remaining_buf[..bytes_to_write]) { Ok(bytes_written) => { total_bytes_written += bytes_written; this.position += bytes_written as u64; @@ -361,7 +363,7 @@ impl AsyncWrite for FileSequence { fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { let this = self.get_mut(); if let Some(file) = this.current_file.take() { - match smol::block_on(file.sync_all()) { + match file.sync_all() { Ok(()) => Poll::Ready(Ok(())), Err(e) => Poll::Ready(Err(e)), } @@ -370,3 +372,15 @@ impl AsyncWrite for FileSequence { } } } + +impl Clone for FileSequence { + fn clone(&self) -> Self { + Self { + files: self.files.clone(), + current_file: None, + 
current_file_index: None, + position: 0, + auto_set_len: self.auto_set_len, + } + } +} diff --git a/src/geode/mod.rs b/src/geode/mod.rs index cd003fd2c..f4f8ca1de 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -95,7 +95,7 @@ use tracing::{debug, info, warn}; use crate::{Error, Result}; mod chunked_storage; -pub use chunked_storage::ChunkedStorage; +pub use chunked_storage::{Chunk, ChunkedStorage}; mod file_sequence; pub use file_sequence::FileSequence; @@ -354,7 +354,7 @@ impl Geode { let chunk_hash = blake3::hash(chunk_slice); // Get the chunk index in the file/directory from the chunk hash - let chunk_index = match chunked.iter().position(|(h, _)| *h == chunk_hash) { + let chunk_index = match chunked.iter().position(|c| c.hash == chunk_hash) { Some(index) => index, None => { return Err(Error::GeodeNeedsGc); @@ -382,7 +382,7 @@ impl Geode { let exact_file_size = chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - chunk_slice.len()); if let Some(file) = &chunked.get_fileseq_mut().get_current_file() { - let _ = file.set_len(exact_file_size as u64).await; + let _ = file.set_len(exact_file_size as u64); } chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64); } @@ -456,7 +456,7 @@ impl Geode { info!(target: "geode::get_chunk", "[Geode] Getting chunk {}", hash_to_string(chunk_hash)); // Get the chunk index in the file from the chunk hash - let chunk_index = match chunked.iter().position(|(h, _)| *h == *chunk_hash) { + let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) { Some(index) => index, None => return Err(Error::GeodeChunkNotFound), };