fud/proto: Implement file request handler

This commit is contained in:
parazyd
2023-07-29 14:07:40 +02:00
parent 63a408c777
commit 98a78098bc
3 changed files with 80 additions and 4 deletions

View File

@@ -146,7 +146,6 @@ impl Fud {
}
};
// TODO: Broadcast file_hash and chunk_hashes as FudFilePut {};
let fud_file = FudFilePut { file_hash, chunk_hashes };
self.p2p.broadcast(&fud_file).await;

View File

@@ -68,6 +68,20 @@ pub struct FudChunkRoute {
}
impl_p2p_message!(FudChunkRoute, "FudChunkRoute");
/// Message representing a file request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileRequest {
pub file_hash: blake3::Hash,
}
impl_p2p_message!(FudFileRequest, "FudFileRequest");
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileReply {
pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFileReply, "FudFileReply");
/// Message representing a chunk request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRequest {
@@ -78,11 +92,21 @@ impl_p2p_message!(FudChunkRequest, "FudChunkRequest");
/// Message representing a chunk reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkReply {
// TODO: Perhaps this could be a chunk-sized array, but then we need padding?
// TODO: This sould be a chunk-sized array, but then we need padding?
pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply");
/// Message representing a chunk reply when a file is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileNotFound;
impl_p2p_message!(FudFileNotFound, "FudFileNotFound");
/// Message representing a chunk reply when a chunk is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkNotFound;
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound");
/// P2P protocol implementation for fud.
pub struct ProtocolFud {
channel: ChannelPtr,
@@ -90,6 +114,7 @@ pub struct ProtocolFud {
chunk_put_sub: MessageSubscription<FudChunkPut>,
file_route_sub: MessageSubscription<FudFileRoute>,
chunk_route_sub: MessageSubscription<FudChunkRoute>,
file_request_sub: MessageSubscription<FudFileRequest>,
chunk_request_sub: MessageSubscription<FudChunkRequest>,
fud: Arc<Fud>,
p2p: P2pPtr,
@@ -108,12 +133,14 @@ impl ProtocolFud {
msg_subsystem.add_dispatch::<FudChunkPut>().await;
msg_subsystem.add_dispatch::<FudFileRoute>().await;
msg_subsystem.add_dispatch::<FudChunkRoute>().await;
msg_subsystem.add_dispatch::<FudFileRequest>().await;
msg_subsystem.add_dispatch::<FudChunkRequest>().await;
let file_put_sub = channel.subscribe_msg::<FudFilePut>().await?;
let chunk_put_sub = channel.subscribe_msg::<FudChunkPut>().await?;
let file_route_sub = channel.subscribe_msg::<FudFileRoute>().await?;
let chunk_route_sub = channel.subscribe_msg::<FudChunkRoute>().await?;
let file_request_sub = channel.subscribe_msg::<FudFileRequest>().await?;
let chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
Ok(Arc::new(Self {
@@ -122,6 +149,7 @@ impl ProtocolFud {
chunk_put_sub,
file_route_sub,
chunk_route_sub,
file_request_sub,
chunk_request_sub,
fud,
p2p,
@@ -331,6 +359,47 @@ impl ProtocolFud {
}
}
async fn handle_fud_file_request(self: Arc<Self>) -> Result<()> {
debug!(target: "fud::ProtocolFud::handle_fud_file_request()", "START");
loop {
let file_request = match self.file_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!(
target: "fud::ProtocolFud::handle_fud_file_request()",
"recv fail: {}", e,
);
continue
}
};
let chunked_file = match self.fud.geode.get(&file_request.file_hash).await {
Ok(v) => v,
Err(Error::GeodeNeedsGc) => {
// TODO: Run GC
continue
}
Err(Error::GeodeFileNotFound) => match self.channel.send(&FudFileNotFound).await {
Ok(()) => continue,
Err(_e) => continue,
},
Err(_e) => continue,
};
let file_reply = FudFileReply {
chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(),
};
match self.channel.send(&file_reply).await {
Ok(()) => continue,
Err(_e) => continue,
}
}
}
async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
@@ -354,13 +423,17 @@ impl ProtocolFud {
}
Err(Error::GeodeChunkNotFound) => {
// TODO: Maybe reply with NotFound so peer can be removed
continue
match self.channel.send(&FudChunkNotFound).await {
Ok(()) => continue,
Err(_e) => continue,
}
}
Err(_e) => continue,
};
// The consistency should already be checked in Geode, so we're
// fine not checking and unwrapping here.
let mut buf = [0u8; MAX_CHUNK_SIZE];
let mut chunk_fd = File::open(&chunk_path).await.unwrap();
let bytes_read = chunk_fd.read(&mut buf).await.unwrap();
@@ -384,6 +457,7 @@ impl ProtocolBase for ProtocolFud {
self.jobsman.clone().spawn(self.clone().handle_fud_chunk_put(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_fud_file_route(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_fud_chunk_route(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_fud_file_request(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
debug!(target: "fud::ProtocolFud::start()", "END");
Ok(())

View File

@@ -466,6 +466,9 @@ pub enum Error {
#[error("Geode needs garbage collection")]
GeodeNeedsGc,
#[error("Geode file not found")]
GeodeFileNotFound,
#[error("Geode chunk not found")]
GeodeChunkNotFound,