diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 1a3b55131..a53f8645d 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -146,7 +146,6 @@ impl Fud { } }; - // TODO: Broadcast file_hash and chunk_hashes as FudFilePut {}; let fud_file = FudFilePut { file_hash, chunk_hashes }; self.p2p.broadcast(&fud_file).await; diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 730429ac9..d83e99a59 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -68,6 +68,20 @@ pub struct FudChunkRoute { } impl_p2p_message!(FudChunkRoute, "FudChunkRoute"); +/// Message representing a file request from the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFileRequest { + pub file_hash: blake3::Hash, +} +impl_p2p_message!(FudFileRequest, "FudFileRequest"); + +/// Message representing a file reply from the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFileReply { + pub chunk_hashes: Vec, +} +impl_p2p_message!(FudFileReply, "FudFileReply"); + /// Message representing a chunk request from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkRequest { @@ -78,11 +92,21 @@ impl_p2p_message!(FudChunkRequest, "FudChunkRequest"); /// Message representing a chunk reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkReply { - // TODO: Perhaps this could be a chunk-sized array, but then we need padding? + // TODO: This sould be a chunk-sized array, but then we need padding? pub chunk: Vec, } impl_p2p_message!(FudChunkReply, "FudChunkReply"); +/// Message representing a chunk reply when a file is not found +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFileNotFound; +impl_p2p_message!(FudFileNotFound, "FudFileNotFound"); + +/// Message representing a chunk reply when a chunk is not found +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudChunkNotFound; +impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound"); + /// P2P protocol implementation for fud. pub struct ProtocolFud { channel: ChannelPtr, @@ -90,6 +114,7 @@ pub struct ProtocolFud { chunk_put_sub: MessageSubscription, file_route_sub: MessageSubscription, chunk_route_sub: MessageSubscription, + file_request_sub: MessageSubscription, chunk_request_sub: MessageSubscription, fud: Arc, p2p: P2pPtr, @@ -108,12 +133,14 @@ impl ProtocolFud { msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; let file_put_sub = channel.subscribe_msg::().await?; let chunk_put_sub = channel.subscribe_msg::().await?; let file_route_sub = channel.subscribe_msg::().await?; let chunk_route_sub = channel.subscribe_msg::().await?; + let file_request_sub = channel.subscribe_msg::().await?; let chunk_request_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { @@ -122,6 +149,7 @@ impl ProtocolFud { chunk_put_sub, file_route_sub, chunk_route_sub, + file_request_sub, chunk_request_sub, fud, p2p, @@ -331,6 +359,47 @@ impl ProtocolFud { } } + async fn handle_fud_file_request(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_file_request()", "START"); + + loop { + let file_request = match self.file_request_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + target: "fud::ProtocolFud::handle_fud_file_request()", + "recv fail: {}", e, + ); + continue + } + }; + + let chunked_file = match self.fud.geode.get(&file_request.file_hash).await { + Ok(v) => v, + Err(Error::GeodeNeedsGc) => { + // TODO: Run GC + continue + } + + Err(Error::GeodeFileNotFound) => match self.channel.send(&FudFileNotFound).await { + Ok(()) => continue, + Err(_e) => continue, + }, + + Err(_e) => continue, + }; + + let file_reply = FudFileReply { + chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(), + }; + + match self.channel.send(&file_reply).await { + Ok(()) => continue, + Err(_e) => continue, + } + } + } + async fn handle_fud_chunk_request(self: Arc) -> Result<()> { debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START"); @@ -354,13 +423,17 @@ impl ProtocolFud { } Err(Error::GeodeChunkNotFound) => { - // TODO: Maybe reply with NotFound so peer can be removed - continue + match self.channel.send(&FudChunkNotFound).await { + Ok(()) => continue, + Err(_e) => continue, + } } Err(_e) => continue, }; + // The consistency should already be checked in Geode, so we're + // fine not checking and unwrapping here. let mut buf = [0u8; MAX_CHUNK_SIZE]; let mut chunk_fd = File::open(&chunk_path).await.unwrap(); let bytes_read = chunk_fd.read(&mut buf).await.unwrap(); @@ -384,6 +457,7 @@ impl ProtocolBase for ProtocolFud { self.jobsman.clone().spawn(self.clone().handle_fud_chunk_put(), executor.clone()).await; self.jobsman.clone().spawn(self.clone().handle_fud_file_route(), executor.clone()).await; self.jobsman.clone().spawn(self.clone().handle_fud_chunk_route(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_file_request(), executor.clone()).await; self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await; debug!(target: "fud::ProtocolFud::start()", "END"); Ok(()) diff --git a/src/error.rs b/src/error.rs index 8e0e90b0d..a64bed4d9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -466,6 +466,9 @@ pub enum Error { #[error("Geode needs garbage collection")] GeodeNeedsGc, + #[error("Geode file not found")] + GeodeFileNotFound, + #[error("Geode chunk not found")] GeodeChunkNotFound,