geode, fud: replace smol::fs::File with std::fs::File, add Chunk struct

This commit is contained in:
epiphany
2026-01-01 18:14:18 +00:00
parent d18de935ac
commit 94dc672a15
7 changed files with 128 additions and 76 deletions

View File

@@ -299,8 +299,8 @@ async fn handle_chunk_reply(
// `total_bytes_size` (and `target_bytes_size`) again,
// as `geode.write_chunk()` updated the FileSequence
// to the exact file size.
if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
if let Some(last_chunk) = ctx.chunked.iter().last() {
if matches!(resource.rtype, ResourceType::File) && last_chunk.hash == *chunk_hash {
resource.total_bytes_size = ctx.chunked.get_fileseq().len();
resource.target_bytes_size = resource.total_bytes_size;
}

View File

@@ -33,7 +33,7 @@ use tracing::{error, info, warn};
use darkfi::{
dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
geode::{hash_to_string, Chunk, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
net::P2pPtr,
system::{ExecutorPtr, PublisherPtr, StoppableTask},
util::{path::expand_path, time::Timestamp},
@@ -476,7 +476,7 @@ impl Fud {
resource.target_chunks_downloaded = match chunked {
Some(chunked) => chunked
.iter()
.filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
.filter(|chunk| chunk_hashes.contains(&chunk.hash) && chunk.available)
.count() as u64,
None => 0,
};
@@ -775,12 +775,12 @@ impl Fud {
}
// Set of all chunks we need locally and their current availability
let chunks: HashSet<(blake3::Hash, bool)> =
chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();
let chunks: HashSet<Chunk> =
chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).cloned().collect();
// Set of the chunks we need to download
let mut missing_chunks: HashSet<blake3::Hash> =
chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();
chunks.iter().filter(|&c| !c.available).map(|c| c.hash).collect();
// Update the resource with the chunks/bytes counts
update_resource!(hash, {
@@ -858,10 +858,8 @@ impl Fud {
// Verify all chunks
self.verify_chunks(&resource, &mut chunked).await?;
let is_complete = chunked
.iter()
.filter(|(hash, _)| chunk_hashes.contains(hash))
.all(|(_, available)| *available);
let is_complete =
chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).all(|c| c.available);
// We fetched all chunks, but the resource is not complete
// (some chunks were missing from all seeders)
@@ -953,9 +951,9 @@ impl Fud {
let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();
// Gather all available chunks
for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
for (chunk_index, chunk) in chunks.iter().enumerate() {
// Read the chunk using the `FileSequence`
let chunk =
let chunk_data =
match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
Ok(c) => c,
Err(Error::Io(ErrorKind::NotFound)) => continue,
@@ -966,23 +964,34 @@ impl Fud {
};
// Perform chunk consistency check
if self.geode.verify_chunk(chunk_hash, &chunk) {
chunked.get_chunk_mut(chunk_index).1 = true;
if self.geode.verify_chunk(&chunk.hash, &chunk_data) {
chunked.get_chunk_mut(chunk_index).available = true;
chunked.get_chunk_mut(chunk_index).size = chunk_data.len();
bytes.insert(
*chunk_hash,
(chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
chunk.hash,
(
chunk_data.len(),
resource.get_bytes_of_selection(
chunked,
file_selection,
&chunk.hash,
chunk_data.len(),
),
),
);
} else {
chunked.get_chunk_mut(chunk_index).available = false;
}
}
// Look for the chunks that are not on the filesystem
let chunks = chunked.get_chunks().clone();
let missing_on_fs: Vec<_> =
chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();
chunks.iter().enumerate().filter(|(_, c)| !c.available).collect();
// Look for scraps
for (chunk_index, (chunk_hash, _)) in missing_on_fs {
let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
for (chunk_index, chunk) in missing_on_fs {
let scrap = self.scrap_tree.get(chunk.hash.as_bytes())?;
if scrap.is_none() {
continue;
}
@@ -993,7 +1002,7 @@ impl Fud {
continue;
}
let scrap: Scrap = scrap.unwrap();
if blake3::hash(&scrap.chunk) != *chunk_hash {
if blake3::hash(&scrap.chunk) != chunk.hash {
continue;
}
@@ -1011,7 +1020,8 @@ impl Fud {
}
// Mark the chunk as available
chunked.get_chunk_mut(chunk_index).1 = true;
chunked.get_chunk_mut(chunk_index).available = true;
chunked.get_chunk_mut(chunk_index).size = scrap.chunk.len();
// Update the sums of locally available data
bytes.insert(
@@ -1023,9 +1033,11 @@ impl Fud {
// If the resource is a file: make the `FileSequence`'s file the
// exact file size if we know the last chunk's size. This is not
// needed for directories.
if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
if !chunked.is_dir() && *last_chunk_available {
if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
let is_dir = chunked.is_dir();
if let Some(last_chunk) = chunked.iter_mut().last() {
if !is_dir && last_chunk.available {
if let Some((last_chunk_size, _)) = bytes.get(&last_chunk.hash) {
last_chunk.size = *last_chunk_size;
let exact_file_size =
chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
@@ -1172,8 +1184,8 @@ impl Fud {
let chunked = self.geode.get(hash, &path).await;
if let Ok(chunked) = chunked {
for (chunk_hash, _) in chunked.iter() {
let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
for chunk in chunked.iter() {
let _ = self.scrap_tree.remove(chunk.hash.as_bytes());
}
}
}

View File

@@ -321,7 +321,7 @@ impl ProtocolFud {
// If it's a file with a single chunk, just reply with the chunk
if chunked_file.len() == 1 && !chunked_file.is_dir() {
let chunk_hash = chunked_file.get_chunks()[0].0;
let chunk_hash = chunked_file.get_chunks()[0].hash;
let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
if let Ok(chunk) = chunk {
if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource {
@@ -342,11 +342,7 @@ impl ProtocolFud {
false => {
let reply = FudFileReply {
resource: request.resource,
chunk_hashes: chunked_file
.get_chunks()
.iter()
.map(|(chunk, _)| *chunk)
.collect(),
chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
};
info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource));
let _ = self.channel.send(&reply).await;
@@ -367,11 +363,7 @@ impl ProtocolFud {
}
let reply = FudDirectoryReply {
resource: request.resource,
chunk_hashes: chunked_file
.get_chunks()
.iter()
.map(|(chunk, _)| *chunk)
.collect(),
chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
files: files.unwrap(),
};
info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource));

View File

@@ -185,7 +185,7 @@ impl Resource {
}
chunks
}
FileSelection::All => chunked.iter().cloned().map(|(hash, _)| hash).collect(),
FileSelection::All => chunked.iter().cloned().map(|c| c.hash).collect(),
}
}
@@ -199,7 +199,7 @@ impl Resource {
};
let chunk_hash = blake3::hash(chunk);
let chunk_index = match chunked.iter().position(|(h, _)| *h == chunk_hash) {
let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) {
Some(index) => index,
None => {
return 0;

View File

@@ -16,17 +16,39 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::path::{Path, PathBuf};
use std::{
hash::{Hash, Hasher},
path::{Path, PathBuf},
};
use crate::geode::{file_sequence::FileSequence, MAX_CHUNK_SIZE};
#[derive(Clone, Debug, Eq)]
pub struct Chunk {
pub hash: blake3::Hash,
pub available: bool,
pub size: usize,
}
impl Hash for Chunk {
fn hash<H: Hasher>(&self, state: &mut H) {
self.hash.hash(state);
}
}
impl PartialEq for Chunk {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash
}
}
/// `ChunkedStorage` is a representation of a file or directory we're trying to
/// retrieve from `Geode`.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ChunkedStorage {
/// Vector of chunks; each `Chunk` carries its hash, whether it is
/// available locally, and its size.
chunks: Vec<(blake3::Hash, bool)>,
chunks: Vec<Chunk>,
/// FileSequence containing the list of file paths and file sizes, it has
/// a single item if this is not a directory but a single file.
fileseq: FileSequence,
@@ -38,7 +60,10 @@ pub struct ChunkedStorage {
impl ChunkedStorage {
pub fn new(hashes: &[blake3::Hash], files: &[(PathBuf, u64)], is_dir: bool) -> Self {
Self {
chunks: hashes.iter().map(|x| (*x, false)).collect(),
chunks: hashes
.iter()
.map(|x| Chunk { hash: *x, available: false, size: MAX_CHUNK_SIZE })
.collect(),
fileseq: FileSequence::new(files, is_dir),
is_dir,
}
@@ -46,16 +71,16 @@ impl ChunkedStorage {
/// Check whether we have all the chunks available locally.
pub fn is_complete(&self) -> bool {
!self.chunks.iter().any(|(_, available)| !available)
!self.chunks.iter().any(|c| !c.available)
}
/// Return an iterator over the chunks and their availability.
pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, bool)> {
pub fn iter(&self) -> core::slice::Iter<'_, Chunk> {
self.chunks.iter()
}
/// Return a mutable iterator over the chunks and their availability.
pub fn iter_mut(&mut self) -> core::slice::IterMut<'_, (blake3::Hash, bool)> {
pub fn iter_mut(&mut self) -> core::slice::IterMut<'_, Chunk> {
self.chunks.iter_mut()
}
@@ -71,19 +96,28 @@ impl ChunkedStorage {
/// Return the number of chunks available locally.
pub fn local_chunks(&self) -> usize {
self.chunks.iter().filter(|(_, p)| *p).count()
self.chunks.iter().filter(|c| c.available).count()
}
/// Return `chunks`.
pub fn get_chunks(&self) -> &Vec<(blake3::Hash, bool)> {
pub fn get_chunks(&self) -> &Vec<Chunk> {
&self.chunks
}
/// Return a mutable chunk from `chunks`.
pub fn get_chunk_mut(&mut self, index: usize) -> &mut (blake3::Hash, bool) {
pub fn get_chunk_mut(&mut self, index: usize) -> &mut Chunk {
&mut self.chunks[index]
}
pub fn get_chunk_index(&self, hash: &blake3::Hash) -> Option<usize> {
for (i, chunk) in self.chunks.iter().enumerate() {
if chunk.hash == *hash {
return Some(i)
}
}
None
}
/// Return the list of files from the `fileseq`.
pub fn get_files(&self) -> &Vec<(PathBuf, u64)> {
self.fileseq.get_files()
@@ -125,7 +159,7 @@ impl ChunkedStorage {
chunk_indexes
.iter()
.filter_map(|&index| self.chunks.get(index))
.map(|(hash, _)| hash)
.map(|c| &c.hash)
.cloned()
.collect()
}

View File

@@ -16,15 +16,19 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::HashSet,
fs::{File, OpenOptions},
io::{Read, Seek, Write},
path::PathBuf,
pin::Pin,
};
use futures::{
task::{Context, Poll},
AsyncRead, AsyncSeek, AsyncWrite,
};
use smol::{
fs::{File, OpenOptions},
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
};
use std::{collections::HashSet, path::PathBuf, pin::Pin};
use smol::io::{self, SeekFrom};
/// `FileSequence` is an object that implements `AsyncRead`, `AsyncSeek`, and
/// `AsyncWrite` for an ordered list of (file path, file size).
@@ -109,7 +113,7 @@ impl FileSequence {
/// Open the file at (`current_file_index` + 1).
/// If no file is currently open (`current_file_index` is None), it opens
/// the first file.
async fn open_next_file(&mut self) -> io::Result<()> {
fn open_next_file(&mut self) -> io::Result<()> {
self.current_file = None;
self.current_file_index = match self.current_file_index {
Some(i) => Some(i + 1),
@@ -122,22 +126,20 @@ impl FileSequence {
.read(true)
.write(true)
.create(false)
.open(self.files[self.current_file_index.unwrap()].0.clone())
.await?;
.open(self.files[self.current_file_index.unwrap()].0.clone())?;
self.current_file = Some(file);
Ok(())
}
/// Open the file at `file_index`.
async fn open_file(&mut self, file_index: usize) -> io::Result<()> {
fn open_file(&mut self, file_index: usize) -> io::Result<()> {
self.current_file = None;
self.current_file_index = Some(file_index);
let file = OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(self.files[file_index].0.clone())
.await?;
.open(self.files[file_index].0.clone())?;
self.current_file = Some(file);
Ok(())
}
@@ -169,7 +171,7 @@ impl AsyncRead for FileSequence {
}
// Open the next file
match smol::block_on(this.open_next_file()) {
match this.open_next_file() {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Poll::Ready(Ok(total_read));
@@ -184,7 +186,7 @@ impl AsyncRead for FileSequence {
// Read from the current file
let file = this.current_file.as_mut().unwrap();
match smol::block_on(file.read(&mut buf[total_read..])) {
match file.read(&mut buf[total_read..]) {
Ok(bytes_read) => {
if bytes_read == 0 {
this.current_file = None; // Move to the next file
@@ -239,7 +241,7 @@ impl AsyncSeek for FileSequence {
if this.current_file.is_none() ||
this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index
{
match smol::block_on(this.open_file(file_index)) {
match this.open_file(file_index) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
// If the file does not exist, return without actually seeking it
@@ -253,7 +255,7 @@ impl AsyncSeek for FileSequence {
let file_pos = abs_pos - bytes_offset;
// Seek in the current file
match smol::block_on(file.seek(SeekFrom::Start(file_pos))) {
match file.seek(SeekFrom::Start(file_pos)) {
Ok(_) => Poll::Ready(Ok(this.position)),
Err(e) => Poll::Ready(Err(e)),
}
@@ -273,9 +275,9 @@ impl AsyncWrite for FileSequence {
let finalize_current_file = |file: &mut File, max_size: u64| {
if auto_set_len {
smol::block_on(file.set_len(max_size))?;
file.set_len(max_size)?;
}
smol::block_on(file.flush())?;
file.flush()?;
Ok(())
};
@@ -299,7 +301,7 @@ impl AsyncWrite for FileSequence {
}
// Switch to the next file
match smol::block_on(this.open_next_file()) {
match this.open_next_file() {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
this.current_file = None;
@@ -313,7 +315,7 @@ impl AsyncWrite for FileSequence {
let max_size = this.files[this.current_file_index.unwrap()].1;
// Check how much space is left in the current file
let current_position = smol::block_on(file.seek(io::SeekFrom::Current(0)))?;
let current_position = file.stream_position()?;
let space_left = max_size - current_position;
let bytes_to_write = remaining_buf.len().min(space_left as usize);
@@ -327,7 +329,7 @@ impl AsyncWrite for FileSequence {
}
// Write to the current file
match smol::block_on(file.write(&remaining_buf[..bytes_to_write])) {
match file.write(&remaining_buf[..bytes_to_write]) {
Ok(bytes_written) => {
total_bytes_written += bytes_written;
this.position += bytes_written as u64;
@@ -361,7 +363,7 @@ impl AsyncWrite for FileSequence {
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
if let Some(file) = this.current_file.take() {
match smol::block_on(file.sync_all()) {
match file.sync_all() {
Ok(()) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
}
@@ -370,3 +372,15 @@ impl AsyncWrite for FileSequence {
}
}
}
impl Clone for FileSequence {
fn clone(&self) -> Self {
Self {
files: self.files.clone(),
current_file: None,
current_file_index: None,
position: 0,
auto_set_len: self.auto_set_len,
}
}
}

View File

@@ -95,7 +95,7 @@ use tracing::{debug, info, warn};
use crate::{Error, Result};
mod chunked_storage;
pub use chunked_storage::ChunkedStorage;
pub use chunked_storage::{Chunk, ChunkedStorage};
mod file_sequence;
pub use file_sequence::FileSequence;
@@ -354,7 +354,7 @@ impl Geode {
let chunk_hash = blake3::hash(chunk_slice);
// Get the chunk index in the file/directory from the chunk hash
let chunk_index = match chunked.iter().position(|(h, _)| *h == chunk_hash) {
let chunk_index = match chunked.iter().position(|c| c.hash == chunk_hash) {
Some(index) => index,
None => {
return Err(Error::GeodeNeedsGc);
@@ -382,7 +382,7 @@ impl Geode {
let exact_file_size =
chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - chunk_slice.len());
if let Some(file) = &chunked.get_fileseq_mut().get_current_file() {
let _ = file.set_len(exact_file_size as u64).await;
let _ = file.set_len(exact_file_size as u64);
}
chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
}
@@ -456,7 +456,7 @@ impl Geode {
info!(target: "geode::get_chunk", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));
// Get the chunk index in the file from the chunk hash
let chunk_index = match chunked.iter().position(|(h, _)| *h == *chunk_hash) {
let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) {
Some(index) => index,
None => return Err(Error::GeodeChunkNotFound),
};