fud, fu: handle auto ipv6 external addr, improve fu get, improve fu watch

This commit is contained in:
epiphany
2025-04-11 17:58:05 +02:00
committed by epiphany1
parent 8d22f69cf8
commit b8331f71b9
6 changed files with 96 additions and 78 deletions

View File

@@ -149,7 +149,7 @@ impl Fu {
JsonValue::String(file_name.unwrap_or_default()),
]),
);
let _ = self.rpc_client.request(req).await;
let _ = self.rpc_client.request(req).await?;
loop {
match subscription.receive().await {
@@ -428,31 +428,12 @@ impl Fu {
let info =
params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
match params.get("event").unwrap().get::<String>().unwrap().as_str() {
"download_started" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"file_download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"chunk_download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"download_completed" => {
"download_started" |
"file_download_completed" |
"chunk_download_completed" |
"download_completed" |
"missing_chunks" |
"file_not_found" => {
let resource = info
.get("resource")
.unwrap()
@@ -478,14 +459,6 @@ impl Fu {
update_resource(resource).await;
}
}
"missing_chunks" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"download_error" => {
// An error that caused the download to be unsuccessful
}

View File

@@ -87,11 +87,10 @@ impl From<DhtNode> for DhtRouterItem {
// TODO: Add a DhtSettings
pub struct Dht {
pub p2p: P2pPtr,
pub executor: ExecutorPtr,
/// Our own node id
pub node_id: blake3::Hash,
/// Are we bootstrapped?
pub bootstrapped: Arc<RwLock<bool>>,
/// Vec of buckets
pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
/// Number of parallel lookup requests
@@ -106,10 +105,11 @@ pub struct Dht {
pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
/// Node ID -> Set of keys
pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
/// Our own node
pub node: DhtNode,
/// Seconds
pub timeout: u64,
pub p2p: P2pPtr,
pub executor: ExecutorPtr,
}
impl Dht {
pub async fn new(
@@ -127,7 +127,7 @@ impl Dht {
}
Self {
p2p: p2p.clone(),
node_id: *node_id,
buckets: Arc::new(RwLock::new(buckets)),
bootstrapped: Arc::new(RwLock::new(false)),
alpha: a,
@@ -136,9 +136,10 @@ impl Dht {
node_cache: Arc::new(RwLock::new(HashMap::new())),
channel_cache: Arc::new(RwLock::new(HashMap::new())),
router_cache: Arc::new(RwLock::new(HashMap::new())),
executor: ex,
node: DhtNode { id: *node_id, addresses: p2p.clone().hosts().external_addrs().await },
timeout,
p2p: p2p.clone(),
executor: ex,
}
}
@@ -152,6 +153,23 @@ impl Dht {
*bootstrapped = true;
}
/// Get own node
pub async fn node(&self) -> DhtNode {
DhtNode {
id: self.node_id,
addresses: self
.p2p
.clone()
.hosts()
.external_addrs()
.await
.iter()
.filter(|addr| !addr.to_string().contains("[::]"))
.cloned()
.collect(),
}
}
// Get the distance between `key_1` and `key_2`
pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
let bytes1 = key_1.as_bytes();
@@ -166,7 +184,7 @@ impl Dht {
result_bytes
}
// Sort `nodes`
// Sort `nodes` by distance from `key`
pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
nodes.sort_by(|a, b| {
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
@@ -177,10 +195,10 @@ impl Dht {
// key -> bucket index
pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
if key == &self.node.id {
if key == &self.node_id {
return 0
}
let distance = self.distance(&self.node.id, key);
let distance = self.distance(&self.node_id, key);
let mut leading_zeros = 0;
for &byte in &distance {
@@ -261,11 +279,12 @@ pub trait DhtHandler {
message: &M,
router: DhtRouterPtr,
) -> Result<()> {
if self.dht().node.addresses.is_empty() {
let self_node = self.dht().node().await;
if self_node.addresses.is_empty() {
return Err(().into()); // TODO
}
self.add_to_router(router.clone(), key, vec![self.dht().node.clone().into()]).await;
self.add_to_router(router.clone(), key, vec![self_node.clone().into()]).await;
let nodes = self.lookup_nodes(key).await?;
for node in nodes {
@@ -331,7 +350,7 @@ pub trait DhtHandler {
// Add a node in the correct bucket
async fn add_node(&self, node: DhtNode) {
// Do not add ourselves to the buckets
if node.id == self.dht().node.id {
if node.id == self.dht().node_id {
return;
}
@@ -434,7 +453,7 @@ pub trait DhtHandler {
match value_result {
Ok(mut nodes) => {
// Remove ourselves from the new nodes
nodes.retain(|node| node.id != self.dht().node.id);
nodes.retain(|node| node.id != self.dht().node_id);
// Add each new node to our buckets
for node in nodes.clone() {

View File

@@ -23,16 +23,10 @@ use std::{
sync::Arc,
};
use num_bigint::BigUint;
use resource::{Resource, ResourceStatus};
use rpc::{ChunkDownloadCompleted, ChunkNotFound};
use tasks::FetchReply;
use crate::rpc::FudEvent;
use async_trait::async_trait;
use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr};
use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
use num_bigint::BigUint;
use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore};
use smol::{
channel,
@@ -44,10 +38,14 @@ use smol::{
};
use structopt_toml::{structopt::StructOpt, StructOptToml};
use crate::rpc::FudEvent;
use darkfi::{
async_daemonize, cli_desc,
geode::{hash_to_string, ChunkedFile, Geode},
net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr},
net::{
session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr,
Settings as NetSettings,
},
rpc::{
jsonrpc::JsonSubscriber,
p2p_method::HandlerP2p,
@@ -58,6 +56,10 @@ use darkfi::{
util::path::expand_path,
Error, Result,
};
use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr};
use resource::{Resource, ResourceStatus};
use rpc::{ChunkDownloadCompleted, ChunkNotFound};
use tasks::FetchReply;
/// P2P protocols
mod proto;
@@ -184,13 +186,12 @@ impl DhtHandler for Fud {
self.dht().set_bootstrapped().await;
// Lookup our own node id
let self_node = self.dht().node.clone();
debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self_node.id));
let _ = self.lookup_nodes(&self_node.id).await;
debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self.dht().node_id));
let _ = self.lookup_nodes(&self.dht().node_id).await;
}
// Send keys that are closer to this node than we are
let self_id = self.dht().node.id;
let self_id = self.dht().node_id;
let channel = self.get_channel(node).await?;
for (key, seeders) in self.seeders_router.read().await.iter() {
let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id));
@@ -250,12 +251,14 @@ impl Fud {
info!(target: "fud::init()", "Verifying resources...");
let resources = self.get_seeding_resources().await?;
if self.dht().node.clone().addresses.is_empty() {
let self_node = self.dht().node().await;
if self_node.addresses.is_empty() {
return Ok(());
}
info!(target: "fud::init()", "Start seeding...");
let self_router_items: Vec<DhtRouterItem> = vec![self.dht().node.clone().into()];
let self_router_items: Vec<DhtRouterItem> = vec![self_node.into()];
for resource in resources {
self.add_to_router(
@@ -689,9 +692,10 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let geode = Geode::new(&basedir).await?;
info!("Instantiating P2P network");
let p2p = P2p::new(args.net.into(), ex.clone()).await?;
let net_settings: NetSettings = args.net.into();
let p2p = P2p::new(net_settings.clone(), ex.clone()).await?;
let external_addrs = p2p.hosts().external_addrs().await;
let external_addrs = net_settings.external_addrs;
if external_addrs.is_empty() {
warn!(target: "fud::realmain", "No external addresses, you won't be able to seed")

View File

@@ -175,7 +175,7 @@ impl ProtocolFud {
};
info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING");
let reply = FudPingReply { node: self.fud.dht.node.clone() };
let reply = FudPingReply { node: self.fud.dht.node().await };
match self.channel.send(&reply).await {
Ok(()) => continue,
Err(_e) => continue,

View File

@@ -82,7 +82,9 @@ impl Fud {
// --> {"jsonrpc": "2.0", "method": "put", "params": ["/foo.txt"], "id": 42}
// <-- {"jsonrpc": "2.0", "result: "df4...3db7", "id": 42}
async fn put(&self, id: u16, params: JsonValue) -> JsonResult {
if self.dht().node.addresses.is_empty() {
let self_node = self.dht.node().await;
if self_node.addresses.is_empty() {
error!(target: "fud::put()", "Cannot put file, you don't have any external address");
return JsonError::new(
ErrorCode::InternalError,
@@ -123,8 +125,7 @@ impl Fud {
};
// Announce file
let self_node = self.dht.node.clone();
let fud_announce = FudAnnounce { key: file_hash, seeders: vec![self_node.clone().into()] };
let fud_announce = FudAnnounce { key: file_hash, seeders: vec![self_node.into()] };
let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await;
JsonResponse::new(JsonValue::String(hash_to_string(&file_hash)), id).into()
@@ -142,13 +143,20 @@ impl Fud {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let mut hash_buf = [0u8; 32];
let mut hash_buf = vec![];
match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
Ok(_) => {}
Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
}
let file_hash = blake3::Hash::from_bytes(hash_buf);
if hash_buf.len() != 32 {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let mut hash_buf_arr = [0u8; 32];
hash_buf_arr.copy_from_slice(&hash_buf);
let file_hash = blake3::Hash::from_bytes(hash_buf_arr);
let file_hash_str = hash_to_string(&file_hash);
let file_path = match params[1].get::<String>() {
@@ -359,6 +367,7 @@ pub struct ChunkNotFound {
#[derive(Clone, Debug)]
pub struct FileNotFound {
pub file_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct MissingChunks {
@@ -434,7 +443,10 @@ impl From<ChunkNotFound> for JsonValue {
}
impl From<FileNotFound> for JsonValue {
fn from(info: FileNotFound) -> JsonValue {
json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))])
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("resource", info.resource.into()),
])
}
}
impl From<MissingChunks> for JsonValue {
@@ -490,7 +502,7 @@ impl From<FudEvent> for JsonValue {
impl Fud {
/// Handle `get` RPC request
pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<()> {
let self_node = self.dht().node.clone();
let self_node = self.dht().node().await;
// Add resource to `self.resources`
let resource = Resource {
@@ -523,9 +535,19 @@ impl Fud {
Ok(()) => self.geode.get(&i_file_hash).await.unwrap(),
Err(Error::GeodeFileRouteNotFound) => {
self.event_publisher
.notify(FudEvent::FileNotFound(FileNotFound { file_hash: *file_hash }))
.await;
// Set resource status to `Incomplete` and send FudEvent::FileNotFound
let mut resources_write = self.resources.write().await;
if let Some(resource) = resources_write.get_mut(file_hash) {
resource.status = ResourceStatus::Incomplete;
self.event_publisher
.notify(FudEvent::FileNotFound(FileNotFound {
file_hash: *file_hash,
resource: resource.clone(),
}))
.await;
}
drop(resources_write);
return Err(Error::GeodeFileRouteNotFound);
}

View File

@@ -102,7 +102,7 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
loop {
sleep(interval).await;
let seeders = vec![fud.dht().node.clone().into()];
let seeders = vec![fud.dht().node().await.into()];
info!(target: "fud::announce_task()", "Verifying seeds...");
let seeding_resources = match fud.get_seeding_resources().await {