fud, geode: track downloaded bytes and download speed, add eta, improve scraps
@@ -205,3 +205,27 @@ impl From<FudEvent> for JsonValue {
        }
    }
}

/// Macro calling `fud.event_publisher.notify()`
macro_rules! notify_event {
    // This is for any `FudEvent`
    ($fud:ident, $event:ident, { $($fields:tt)* }) => {
        $fud
            .event_publisher
            .notify(FudEvent::$event(event::$event {
                $($fields)*
            }))
            .await;
    };
    // This is for `FudEvent`s that only have a hash and resource
    ($fud:ident, $event:ident, $resource:expr) => {
        $fud
            .event_publisher
            .notify(FudEvent::$event(event::$event {
                hash: $resource.hash,
                resource: $resource.clone(),
            }))
            .await;
    };
}
pub(crate) use notify_event;
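
For orientation, a sketch of how the two arms might be invoked from fud's async code. The event name `DownloadCompleted` and the surrounding bindings are hypothetical, not taken from this commit:

// Hypothetical call sites, assuming an `event::DownloadCompleted` struct with
// `hash` and `resource` fields and a `fud` handle in scope.
//
// First arm: spell out the event fields explicitly.
notify_event!(fud, DownloadCompleted, {
    hash: resource.hash,
    resource: resource.clone(),
});
//
// Second arm: shorthand for events that only carry a hash and a resource.
notify_event!(fud, DownloadCompleted, resource);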

File diff suppressed because it is too large
@@ -246,9 +246,12 @@ impl ProtocolFud {
                    return false;
                }

                let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key, &path).await;
                let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
                if let Ok(chunk) = chunk {
                    // TODO: Run geode GC
                    if !self.fud.geode.verify_chunk(&request.key, &chunk) {
                        // TODO: Run geode GC
                        return false;
                    }
                    let reply = FudChunkReply { chunk };
                    info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk {}", hash_to_string(&request.key));
                    let _ = self.channel.send(&reply).await;

@@ -276,9 +279,12 @@ impl ProtocolFud {
            // If it's a file with a single chunk, just reply with the chunk
            if chunked_file.len() == 1 && !chunked_file.is_dir() {
                let chunk_hash = chunked_file.get_chunks()[0].0;
                let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash, &path).await;
                let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
                if let Ok(chunk) = chunk {
                    // TODO: Run geode GC
                    if !self.fud.geode.verify_chunk(&request.key, &chunk) {
                        // TODO: Run geode GC
                        return false;
                    }
                    let reply = FudChunkReply { chunk };
                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
                    let _ = self.channel.send(&reply).await;

@@ -16,10 +16,20 @@
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

use darkfi::{geode::hash_to_string, rpc::util::json_map};
use std::path::PathBuf;
use std::{
    collections::HashSet,
    path::{Path, PathBuf},
};
use tinyjson::JsonValue;

use darkfi::{
    geode::{hash_to_string, ChunkedStorage, MAX_CHUNK_SIZE},
    rpc::util::json_map,
    Error, Result,
};

use crate::FileSelection;

#[derive(Clone, Debug)]
pub enum ResourceStatus {
    Downloading,

@@ -29,62 +39,302 @@ pub enum ResourceStatus {
    Verifying,
}

#[derive(Clone, Debug)]
impl ResourceStatus {
    pub fn as_str(&self) -> &str {
        match self {
            ResourceStatus::Downloading => "downloading",
            ResourceStatus::Seeding => "seeding",
            ResourceStatus::Discovering => "discovering",
            ResourceStatus::Incomplete => "incomplete",
            ResourceStatus::Verifying => "verifying",
        }
    }
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "downloading" => Ok(ResourceStatus::Downloading),
            "seeding" => Ok(ResourceStatus::Seeding),
            "discovering" => Ok(ResourceStatus::Discovering),
            "incomplete" => Ok(ResourceStatus::Incomplete),
            "verifying" => Ok(ResourceStatus::Verifying),
            _ => Err(Error::Custom("Invalid resource status".to_string())),
        }
    }
}

#[derive(Clone, Debug, PartialEq)]
pub enum ResourceType {
    Unknown,
    File,
    Directory,
}

impl ResourceType {
    pub fn as_str(&self) -> &str {
        match self {
            ResourceType::Unknown => "unknown",
            ResourceType::File => "file",
            ResourceType::Directory => "directory",
        }
    }
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "unknown" => Ok(ResourceType::Unknown),
            "file" => Ok(ResourceType::File),
            "directory" => Ok(ResourceType::Directory),
            _ => Err(Error::Custom("Invalid resource type".to_string())),
        }
    }
}

/// Structure representing the current state of a file or directory on fud.
/// It is used in most `FudEvent`s.
#[derive(Clone, Debug)]
pub struct Resource {
    /// Resource hash (used as key in the DHT)
    pub hash: blake3::Hash,
    /// Resource type (file or directory)
    pub rtype: ResourceType,
    /// Path of the resource on the filesystem
    pub path: PathBuf,
    /// Current status of the resource
    pub status: ResourceStatus,
    pub chunks_total: u64,
    pub chunks_downloaded: u64,
    pub chunks_target: u64,
    /// The files the user wants to download
    pub file_selection: FileSelection,

    /// Total number of chunks
    pub total_chunks_count: u64,
    /// Number of chunks we want to download
    pub target_chunks_count: u64,
    /// Number of chunks we already downloaded
    pub total_chunks_downloaded: u64,
    /// Number of chunks we already downloaded,
    /// but only those we want to download on the last fetch request
    pub target_chunks_downloaded: u64,

    /// Total size (in bytes) of the resource
    pub total_bytes_size: u64,
    /// Data (in bytes) we want to download
    pub target_bytes_size: u64,
    /// Data (in bytes) we already downloaded
    pub total_bytes_downloaded: u64,
    /// Data (in bytes) we already downloaded,
    /// but only data we want to download on the last fetch request
    pub target_bytes_downloaded: u64,

    /// Recent speeds in bytes/sec, used to compute the download ETA.
    pub speeds: Vec<f64>,
}

impl Resource {
    pub fn new(
        hash: blake3::Hash,
        rtype: ResourceType,
        path: &Path,
        status: ResourceStatus,
        file_selection: FileSelection,
    ) -> Self {
        Self {
            hash,
            rtype,
            path: path.to_path_buf(),
            status,
            file_selection,
            total_chunks_count: 0,
            target_chunks_count: 0,
            total_chunks_downloaded: 0,
            target_chunks_downloaded: 0,
            total_bytes_size: 0,
            target_bytes_size: 0,
            total_bytes_downloaded: 0,
            target_bytes_downloaded: 0,
            speeds: vec![],
        }
    }

    /// Computes and returns download ETA in seconds using the `speeds` list.
    pub fn get_eta(&self) -> u64 {
        if self.speeds.is_empty() {
            return 0
        }

        let remaining_chunks = self.target_chunks_count - self.target_chunks_downloaded;
        let mean_speed = self.speeds.iter().sum::<f64>() / self.speeds.len() as f64;

        ((remaining_chunks * MAX_CHUNK_SIZE as u64) as f64 / mean_speed) as u64
    }
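
A quick worked example of the formula above (the numbers, including the MAX_CHUNK_SIZE value, are assumed only for illustration):

    // Assume MAX_CHUNK_SIZE = 262144 bytes, 40 chunks still missing from the
    // selection, and a mean of `speeds` of 524288 bytes/sec. Then get_eta()
    // returns (40 * 262144) / 524288 = 20 seconds.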

    /// Returns the list of selected files (absolute paths).
    pub fn get_selected_files(&self, chunked: &ChunkedStorage) -> Vec<PathBuf> {
        match &self.file_selection {
            FileSelection::Set(files) => files
                .iter()
                .map(|file| self.path.join(file))
                .filter(|abs| chunked.get_files().iter().any(|(f, _)| f == abs))
                .collect(),
            FileSelection::All => chunked.get_files().iter().map(|(f, _)| f.clone()).collect(),
        }
    }

    /// Returns the (sub)set of chunk hashes in a ChunkedStorage for a file selection.
    pub fn get_selected_chunks(&self, chunked: &ChunkedStorage) -> HashSet<blake3::Hash> {
        match &self.file_selection {
            FileSelection::Set(files) => {
                let mut chunks = HashSet::new();
                for file in files {
                    chunks.extend(chunked.get_chunks_of_file(&self.path.join(file)));
                }
                chunks
            }
            FileSelection::All => chunked.iter().cloned().map(|(hash, _)| hash).collect(),
        }
    }

    /// Returns the number of bytes we want from a chunk (depends on the file selection).
    pub fn get_selected_bytes(&self, chunked: &ChunkedStorage, chunk: &[u8]) -> usize {
        // If `FileSelection` is not a set, we want all bytes from a chunk
        let file_set = if let FileSelection::Set(files) = &self.file_selection {
            files
        } else {
            return chunk.len();
        };

        let chunk_hash = blake3::hash(chunk);
        let chunk_index = match chunked.iter().position(|(h, _)| *h == chunk_hash) {
            Some(index) => index,
            None => {
                return 0;
            }
        };

        let files = chunked.get_files();
        let chunk_length = chunk.len();
        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
        let mut total_selected_bytes = 0;

        // Find the starting file index based on the position
        let mut file_index = 0;
        let mut file_start_pos = 0;

        while file_index < files.len() {
            if file_start_pos + files[file_index].1 > position {
                break;
            }
            file_start_pos += files[file_index].1;
            file_index += 1;
        }

        if file_index >= files.len() {
            // Out of bounds
            return 0;
        }

        // Calculate the end position of the chunk
        let end_position = position + chunk_length as u64;

        // Iterate through the files and count selected bytes
        while file_index < files.len() {
            let (file_path, file_size) = &files[file_index];
            let file_end_pos = file_start_pos + *file_size;

            // Check if the file is in the selection
            if let Ok(rel_file_path) = file_path.strip_prefix(&self.path) {
                if file_set.contains(rel_file_path) {
                    // Calculate the overlap with the chunk
                    let overlap_start = position.max(file_start_pos);
                    let overlap_end = end_position.min(file_end_pos);

                    if overlap_start < overlap_end {
                        total_selected_bytes += (overlap_end - overlap_start) as usize;
                    }
                }
            }

            // Move to the next file
            file_start_pos += *file_size;
            file_index += 1;

            // Stop if we've reached the end of the chunk
            if file_start_pos >= end_position {
                break;
            }
        }

        total_selected_bytes
    }
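
The byte-counting loop above is an interval intersection: the chunk covers [position, position + chunk.len()) in the concatenated file sequence, each selected file covers [file_start_pos, file_end_pos), and only the overlap between the two ranges is counted. A minimal standalone sketch of that rule (illustrative, not part of this commit):

    /// Number of bytes shared by a chunk spanning [chunk_start, chunk_end)
    /// and a file spanning [file_start, file_end).
    fn overlap_len(chunk_start: u64, chunk_end: u64, file_start: u64, file_end: u64) -> u64 {
        let start = chunk_start.max(file_start);
        let end = chunk_end.min(file_end);
        end.saturating_sub(start)
    }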
}

impl From<Resource> for JsonValue {
    fn from(rs: Resource) -> JsonValue {
        json_map([
            ("hash", JsonValue::String(hash_to_string(&rs.hash))),
            (
                "type",
                JsonValue::String(
                    match rs.rtype {
                        ResourceType::Unknown => "unknown",
                        ResourceType::File => "file",
                        ResourceType::Directory => "directory",
                    }
                    .to_string(),
                ),
            ),
            ("type", JsonValue::String(rs.rtype.as_str().to_string())),
            (
                "path",
                JsonValue::String(match rs.path.into_os_string().into_string() {
                JsonValue::String(match rs.path.clone().into_os_string().into_string() {
                    Ok(path) => path,
                    Err(_) => "".to_string(),
                }),
            ),
            (
                "status",
                JsonValue::String(
                    match rs.status {
                        ResourceStatus::Downloading => "downloading",
                        ResourceStatus::Seeding => "seeding",
                        ResourceStatus::Discovering => "discovering",
                        ResourceStatus::Incomplete => "incomplete",
                        ResourceStatus::Verifying => "verifying",
                    }
                    .to_string(),
                ),
            ),
            ("chunks_total", JsonValue::Number(rs.chunks_total as f64)),
            ("chunks_downloaded", JsonValue::Number(rs.chunks_downloaded as f64)),
            ("chunks_target", JsonValue::Number(rs.chunks_target as f64)),
            ("status", JsonValue::String(rs.status.as_str().to_string())),
            ("total_chunks_count", JsonValue::Number(rs.total_chunks_count as f64)),
            ("target_chunks_count", JsonValue::Number(rs.target_chunks_count as f64)),
            ("total_chunks_downloaded", JsonValue::Number(rs.total_chunks_downloaded as f64)),
            ("target_chunks_downloaded", JsonValue::Number(rs.target_chunks_downloaded as f64)),
            ("total_bytes_size", JsonValue::Number(rs.total_bytes_size as f64)),
            ("target_bytes_size", JsonValue::Number(rs.target_bytes_size as f64)),
            ("total_bytes_downloaded", JsonValue::Number(rs.total_bytes_downloaded as f64)),
            ("target_bytes_downloaded", JsonValue::Number(rs.target_bytes_downloaded as f64)),
            ("speeds", JsonValue::Array(rs.speeds.into_iter().map(JsonValue::Number).collect())),
        ])
    }
}

impl From<JsonValue> for Resource {
    fn from(value: JsonValue) -> Self {
        let mut hash_buf = vec![];
        let _ = bs58::decode(value["hash"].get::<String>().unwrap().as_str()).onto(&mut hash_buf);
        let mut hash_buf_arr = [0u8; 32];
        hash_buf_arr.copy_from_slice(&hash_buf);
        let hash = blake3::Hash::from_bytes(hash_buf_arr);

        let rtype = ResourceType::from_str(value["type"].get::<String>().unwrap()).unwrap();
        let path = PathBuf::from(value["path"].get::<String>().unwrap());
        let status = ResourceStatus::from_str(value["status"].get::<String>().unwrap()).unwrap();

        let total_chunks_count = *value["total_chunks_count"].get::<f64>().unwrap() as u64;
        let target_chunks_count = *value["target_chunks_count"].get::<f64>().unwrap() as u64;
        let total_chunks_downloaded =
            *value["total_chunks_downloaded"].get::<f64>().unwrap() as u64;
        let target_chunks_downloaded =
            *value["target_chunks_downloaded"].get::<f64>().unwrap() as u64;
        let total_bytes_size = *value["total_bytes_size"].get::<f64>().unwrap() as u64;
        let target_bytes_size = *value["target_bytes_size"].get::<f64>().unwrap() as u64;
        let total_bytes_downloaded = *value["total_bytes_downloaded"].get::<f64>().unwrap() as u64;
        let target_bytes_downloaded =
            *value["target_bytes_downloaded"].get::<f64>().unwrap() as u64;

        let speeds = value["speeds"]
            .get::<Vec<JsonValue>>()
            .unwrap()
            .iter()
            .map(|s| *s.get::<f64>().unwrap())
            .collect::<Vec<f64>>();

        Resource {
            hash,
            rtype,
            path,
            status,
            file_selection: FileSelection::All, // TODO
            total_chunks_count,
            target_chunks_count,
            total_chunks_downloaded,
            target_chunks_downloaded,
            total_bytes_size,
            target_bytes_size,
            total_bytes_downloaded,
            target_bytes_downloaded,
            speeds,
        }
    }
}

bin/fud/fud/src/scrap.rs (new file, 49 lines)
@@ -0,0 +1,49 @@
/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

use darkfi_serial::{async_trait, AsyncDecodable, AsyncEncodable, AsyncRead, AsyncWrite};
use std::io::Result;

#[derive(Clone, Debug)]
pub struct Scrap {
    pub chunk: Vec<u8>,
    /// Hash of the data that was last written to the file system (parts of `chunk`).
    /// Used to check if the data on the filesystem changed and the scrap should be rewritten.
    pub hash_written: blake3::Hash,
}

#[async_trait]
impl AsyncEncodable for Scrap {
    #[inline]
    async fn encode_async<S: AsyncWrite + Unpin + Send>(&self, s: &mut S) -> Result<usize> {
        let mut len = 0;
        len += self.chunk.encode_async(s).await?;
        len += self.hash_written.encode_async(s).await?;
        Ok(len)
    }
}

#[async_trait]
impl AsyncDecodable for Scrap {
    async fn decode_async<D: AsyncRead + Unpin + Send>(d: &mut D) -> Result<Self> {
        Ok(Self {
            chunk: <Vec<u8>>::decode_async(d).await?,
            hash_written: blake3::Hash::decode_async(d).await?,
        })
    }
}
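
A hedged round-trip sketch for the encoding above (not part of this commit; it assumes `futures::io::Cursor` satisfies the AsyncRead/AsyncWrite bounds re-exported by darkfi_serial):

    // Illustrative usage with minimal error handling.
    async fn scrap_roundtrip() -> std::io::Result<()> {
        let data = vec![1u8, 2, 3];
        let scrap = Scrap { chunk: data.clone(), hash_written: blake3::hash(&data) };

        // Encode into an in-memory writer.
        let mut writer = futures::io::Cursor::new(Vec::new());
        scrap.encode_async(&mut writer).await?;

        // Decode back from the same bytes.
        let mut reader = futures::io::Cursor::new(writer.into_inner());
        let decoded = Scrap::decode_async(&mut reader).await?;
        assert_eq!(decoded.hash_written, scrap.hash_written);
        Ok(())
    }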

@@ -26,8 +26,10 @@ use darkfi::{
};

use crate::{
    event,
    event::notify_event,
    proto::{FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply},
    Fud,
    Fud, FudEvent,
};

pub enum FetchReply {

@@ -16,13 +16,15 @@
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

use darkfi::Result;
use smol::{fs, stream::StreamExt};
use std::{
    collections::HashSet,
    path::{Path, PathBuf},
};

pub use darkfi::geode::hash_to_string;
use darkfi::Result;

pub async fn get_all_files(dir: &Path) -> Result<Vec<(PathBuf, u64)>> {
    let mut files = Vec::new();

@@ -45,7 +47,8 @@ pub async fn get_all_files(dir: &Path) -> Result<Vec<(PathBuf, u64)>> {

/// An enum to represent a set of files, where you can use `All` if you want
/// all files without having to specify all of them.
/// We could use an Option<HashSet<PathBuf>>, but this is more explicit.
/// We could use an `Option<HashSet<PathBuf>>`, but this is more explicit.
#[derive(Clone, Debug)]
pub enum FileSelection {
    All,
    Set(HashSet<PathBuf>),

@@ -90,7 +90,12 @@ impl ChunkedStorage {
    }

    /// Return `fileseq`.
    pub fn get_fileseq(&mut self) -> &mut FileSequence {
    pub fn get_fileseq(&self) -> &FileSequence {
        &self.fileseq
    }

    /// Return a mutable `fileseq`.
    pub fn get_fileseq_mut(&mut self) -> &mut FileSequence {
        &mut self.fileseq
    }

@@ -100,7 +105,7 @@ impl ChunkedStorage {
    }

    /// Return all chunks that contain parts of `file`.
    pub fn get_chunks_of_file(&self, file: &Path) -> Vec<(blake3::Hash, bool)> {
    pub fn get_chunks_of_file(&self, file: &Path) -> Vec<blake3::Hash> {
        let files = self.fileseq.get_files();
        let file_index = files.iter().position(|(f, _)| f == file);
        if file_index.is_none() {

@@ -117,6 +122,11 @@ impl ChunkedStorage {

        let chunk_indexes: Vec<usize> = (start_index as usize..=end_index as usize).collect();

        chunk_indexes.iter().filter_map(|&index| self.chunks.get(index)).cloned().collect()
        chunk_indexes
            .iter()
            .filter_map(|&index| self.chunks.get(index))
            .map(|(hash, _)| hash)
            .cloned()
            .collect()
    }
}

@@ -24,7 +24,7 @@ use smol::{
    fs::{File, OpenOptions},
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
};
use std::{path::PathBuf, pin::Pin};
use std::{collections::HashSet, path::PathBuf, pin::Pin};

/// `FileSequence` is an object that implements `AsyncRead`, `AsyncSeek`, and
/// `AsyncWrite` for an ordered list of (file path, file size).

@@ -81,6 +81,21 @@ impl FileSequence {
        &self.files
    }

    /// Return the combined file size of all files.
    pub fn len(&self) -> u64 {
        self.files.iter().map(|(_, size)| size).sum()
    }

    /// Return `true` if the `FileSequence` contains no file.
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }

    /// Return the combined file size of the files in the given subset.
    pub fn subset_len(&self, files: HashSet<PathBuf>) -> u64 {
        self.files.iter().filter(|(path, _)| files.contains(path)).map(|(_, size)| size).sum()
    }
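
A small worked example for the two length helpers (paths and sizes made up for illustration):

    // With files [("a.txt", 100), ("b.txt", 200), ("c.txt", 300)]:
    // - len() sums every entry and returns 600
    // - subset_len() with the set {"a.txt", "c.txt"} only counts the matching
    //   paths and returns 400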

    /// Compute the starting position of the file (in bytes) by summing up
    /// the size of the previous files.
    pub fn get_file_position(&self, file_index: usize) -> u64 {

@@ -139,15 +154,31 @@ impl AsyncRead for FileSequence {

        while total_read < buf.len() {
            if this.current_file.is_none() {
                // Stop if there are no more files to read
                if let Some(file_index) = this.current_file_index {
                    // Stop if there are no more files to read
                    if file_index >= this.files.len() - 1 {
                        return Poll::Ready(Ok(total_read));
                    }
                    let start_pos = this.get_file_position(file_index);
                    let file_size = this.files[file_index].1 as usize;
                    let file_pos = this.position - start_pos;
                    let space_left = file_size - file_pos as usize;
                    let skip_bytes = (buf.len() - total_read).min(space_left);
                    this.position += skip_bytes as u64;
                    total_read += skip_bytes;
                }

                // Open the next file
                if let Err(e) = smol::block_on(this.open_next_file()) {
                    return Poll::Ready(Err(e));
                match smol::block_on(this.open_next_file()) {
                    Ok(_) => {}
                    Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                        return Poll::Ready(Ok(total_read));
                    }
                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
                        this.current_file = None;
                        continue; // Skip to next file
                    }
                    Err(e) => return Poll::Ready(Err(e)),
                }
            }

@@ -159,6 +190,7 @@ impl AsyncRead for FileSequence {
                        this.current_file = None; // Move to the next file
                    } else {
                        total_read += bytes_read;
                        this.position += bytes_read as u64;
                    }
                }
                Err(e) => return Poll::Ready(Err(e)),

@@ -76,7 +76,10 @@
//! The full file is not copied, and individual chunks are not stored by
//! geode. Additionally it does not keep track of the full files path.

use std::{collections::HashSet, path::PathBuf};
use std::{
    collections::HashSet,
    path::{Path, PathBuf},
};

use futures::{AsyncRead, AsyncSeek};
use log::{debug, info, warn};

@@ -88,7 +91,6 @@ use smol::{
    },
    stream::StreamExt,
};
use std::path::Path;

use crate::{Error, Result};

@@ -363,7 +365,7 @@ impl Geode {
        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);

        // Seek to the correct position
        let fileseq = &mut chunked.get_fileseq();
        let fileseq = &mut chunked.get_fileseq_mut();
        fileseq.seek(SeekFrom::Start(position)).await?;

        // This will write the chunk, and truncate files if `chunked` is a directory.

@@ -371,53 +373,24 @@ impl Geode {

        // If it's the last chunk of a file (and it's *not* a directory),
        // truncate the file to the correct length.
        // This is because contrary to directories, for a file shared on fud we
        // do not know the exact file size from its metadata, we only know the
        // number of chunks. Therefore we only know the exact size once we know
        // the size of the last chunk.
        // This is because contrary to directories, we do not know the exact
        // file size from its metadata, we only know the number of chunks.
        // Therefore we only know the exact size once we know the size of the
        // last chunk.
        // We also update the `FileSequence` to the exact size.
        if !chunked.is_dir() && chunk_index == chunked.len() - 1 {
            let exact_file_size =
                chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - chunk_slice.len());
            if let Some(file) = &chunked.get_fileseq().get_current_file() {
            if let Some(file) = &chunked.get_fileseq_mut().get_current_file() {
                let _ = file.set_len(exact_file_size as u64).await;
            }
            chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
        }

        Ok((chunk_hash, bytes_written))
    }
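
A worked example of the truncation arithmetic above (the MAX_CHUNK_SIZE value is assumed only for illustration):

    // Assume MAX_CHUNK_SIZE = 262144 bytes and a file split into 3 chunks whose
    // last chunk holds 1000 bytes:
    //   exact_file_size = 3 * 262144 - (262144 - 1000) = 2 * 262144 + 1000 = 525288
    // i.e. two full chunks plus the partial last chunk.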

    /// Iterate over chunks and find which chunks are available locally.
    pub async fn verify_chunks(&self, chunked_file: &mut ChunkedStorage) -> Result<()> {
        let chunks = chunked_file.get_chunks().clone();
        let mut available_chunks = vec![];

        // Gather all available chunks
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            // Read the chunk using the FileSequence
            let chunk = match self.read_chunk(&mut chunked_file.get_fileseq(), &chunk_index).await {
                Ok(c) => c,
                Err(Error::Io(ErrorKind::NotFound)) => continue,
                Err(e) => {
                    warn!(target: "geode::verify_chunks()", "Error while verifying chunks: {e}");
                    break
                }
            };

            // Perform chunk consistency check
            if self.verify_chunk(chunk_hash, &chunk) {
                available_chunks.push(chunk_index);
            }
        }

        // Update available chunks
        for chunk_index in available_chunks {
            chunked_file.get_chunk_mut(chunk_index).1 = true;
        }

        Ok(())
    }

    /// Fetch file/directory metadata from Geode. Returns [`Chunked`]. Returns an error if
    /// Fetch file/directory metadata from Geode. Returns [`ChunkedStorage`]. Returns an error if
    /// the read failed in any way (could also be the file does not exist).
    pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<ChunkedStorage> {
        let hash_str = hash_to_string(hash);

@@ -474,18 +447,14 @@ impl Geode {

    /// Fetch a single chunk from Geode. Returns a Vec containing the chunk content
    /// if it is found.
    /// The returned chunk is NOT verified.
    pub async fn get_chunk(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hash: &blake3::Hash,
        path: &Path,
    ) -> Result<Vec<u8>> {
        info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));

        if !path.exists() {
            return Err(Error::GeodeChunkNotFound)
        }

        // Get the chunk index in the file from the chunk hash
        let chunk_index = match chunked.iter().position(|(h, _)| *h == *chunk_hash) {
            Some(index) => index,
@@ -493,19 +462,14 @@ impl Geode {
        };

        // Read the file to get the chunk content
        let chunk = self.read_chunk(&mut chunked.get_fileseq(), &chunk_index).await?;

        // Perform chunk consistency check
        if !self.verify_chunk(chunk_hash, &chunk) {
            return Err(Error::GeodeNeedsGc)
        }
        let chunk = self.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await?;

        Ok(chunk)
    }
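
Since `get_chunk` no longer verifies what it reads, callers are expected to perform the consistency check themselves, as the ProtocolFud hunks above now do. A minimal sketch of that caller-side pattern (illustrative):

    // let chunk = geode.get_chunk(&mut chunked, &chunk_hash).await?;
    // if !geode.verify_chunk(&chunk_hash, &chunk) {
    //     // Stale or corrupt data on disk: refetch the chunk or schedule geode GC.
    // }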

    /// Read the file at `file_path` to get its chunk with index `chunk_index`.
    /// Returns the chunk content in a Vec.
    async fn read_chunk(
    pub async fn read_chunk(
        &self,
        mut stream: impl AsyncRead + Unpin + AsyncSeek,
        chunk_index: &usize,