diff --git a/Cargo.lock b/Cargo.lock index 0b7fd8d75..553505988 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1847,6 +1847,7 @@ dependencies = [ "monero", "nu-ansi-term", "num-bigint", + "oxy-upnp-igd", "parking_lot 0.12.5", "pin-project-lite", "plotters", @@ -3352,6 +3353,17 @@ dependencies = [ "url", ] +[[package]] +name = "getifaddrs" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06f770241660efeeedb30b48ccbe0a8e6f649322148237539e71f3ccd14a4461" +dependencies = [ + "bitflags 2.10.0", + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -4862,6 +4874,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "oxy-upnp-igd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d27818d61515cd7f5592635320b7fe8981ff2c0a133e5ba94bb4707b59627f0d" +dependencies = [ + "async-channel 2.5.0", + "async-trait", + "futures", + "getifaddrs", + "quick-xml", + "smol", + "thiserror 2.0.17", + "tracing", + "url", +] + [[package]] name = "p256" version = "0.13.2" @@ -5566,6 +5595,15 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2e3bf4aa9d243beeb01a7b3bc30b77cfe2c44e24ec02d751a7104a53c2c49a1" +dependencies = [ + "memchr", +] + [[package]] name = "quinn-proto-smol" version = "0.12.0" diff --git a/Cargo.toml b/Cargo.toml index 29f9c0e88..1c9088376 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,9 @@ ed25519-compact = {version = "2.2.0", optional = true} rcgen = {version = "0.14.6", optional = true, features = ["pem"]} x509-parser = {version = "0.18.0", features = ["validate", "verify"], optional = true} +# UPnP IGD for NAT traversal +oxy-upnp-igd = {version = "0.1", optional = true} + # Encoding bs58 = {version = "0.5.1", optional = true} hex = {version = "0.4.3", optional = true} @@ -205,6 +208,10 @@ p2p-tor = [ "tor-cell", ] +upnp-igd = [ + "oxy-upnp-igd", +] + net-defaults = [ "async-trait", "ed25519-compact", diff --git a/bin/darkirc/Cargo.toml b/bin/darkirc/Cargo.toml index 3912c4a12..62f5e224a 100644 --- a/bin/darkirc/Cargo.toml +++ b/bin/darkirc/Cargo.toml @@ -17,6 +17,10 @@ path = "src/lib.rs" name = "darkirc" path = "src/main.rs" +[features] +default = [] +upnp-igd = ["darkfi/upnp-igd"] + [dependencies] darkfi = {path = "../../", features = ["async-daemonize", "event-graph", "rpc", "zk"]} darkfi-sdk = {path = "../../src/sdk", features = ["async"]} diff --git a/src/error.rs b/src/error.rs index 46536ca83..9a6600498 100644 --- a/src/error.rs +++ b/src/error.rs @@ -202,6 +202,10 @@ pub enum Error { #[error("Invalid state transition: current_state={0}, end_state={1}")] HostStateBlocked(String, String), + #[cfg(feature = "upnp-igd")] + #[error(transparent)] + UpnpError(#[from] oxy_upnp_igd::Error), + // ============= // Crypto errors // ============= diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 7cf2c2d71..c02041943 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -24,18 +24,27 @@ use std::{ }, }; -use smol::Executor; use tracing::warn; use url::Url; +#[cfg(feature = "upnp-igd")] +use smol::lock::Mutex as AsyncMutex; + use super::{ channel::{Channel, ChannelPtr}, hosts::HostColor, session::SessionWeakPtr, transport::{Listener, PtListener}, }; + +#[cfg(feature = "upnp-igd")] +use super::upnp::{setup_port_mappings, PortMapping}; + use crate::{ - system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, + system::{ + CondVar, ExecutorPtr, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, + Subscription, + }, util::logger::verbose, Error, Result, }; @@ -49,6 +58,8 @@ pub struct Acceptor { task: StoppableTaskPtr, session: SessionWeakPtr, conn_count: AtomicUsize, + #[cfg(feature = "upnp-igd")] + port_mappings: AsyncMutex>>, } impl Acceptor { @@ -59,11 +70,13 @@ impl Acceptor { task: StoppableTask::new(), session, conn_count: AtomicUsize::new(0), + #[cfg(feature = "upnp-igd")] + port_mappings: AsyncMutex::new(Vec::new()), }) } /// Start accepting inbound socket connections - pub async fn start(self: Arc, endpoint: Url, ex: Arc>) -> Result<()> { + pub async fn start(self: Arc, endpoint: Url, ex: ExecutorPtr) -> Result<()> { let datastore = self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone(); @@ -88,12 +101,29 @@ impl Acceptor { .push(onion_addr); } + #[cfg(feature = "upnp-igd")] + { + let actual_endpoint = listener.endpoint().await; + let settings = self.session.upgrade().unwrap().p2p().settings(); + let mappings = setup_port_mappings(&actual_endpoint, settings, ex.clone()); + self.port_mappings.lock().await.extend(mappings); + } + self.accept(ptlistener, ex); Ok(()) } /// Stop accepting inbound socket connections pub async fn stop(&self) { + // Stop all port mappings + #[cfg(feature = "upnp-igd")] + { + let mappings = std::mem::take(&mut *self.port_mappings.lock().await); + for mapping in mappings { + mapping.stop(); + } + } + // Send stop signal self.task.stop().await; } @@ -104,7 +134,7 @@ impl Acceptor { } /// Run the accept loop in a new thread and error if a connection problem occurs - fn accept(self: Arc, listener: Box, ex: Arc>) { + fn accept(self: Arc, listener: Box, ex: ExecutorPtr) { let self_ = self.clone(); self.task.clone().start( self.run_accept_loop(listener, ex.clone()), @@ -118,7 +148,7 @@ impl Acceptor { async fn run_accept_loop( self: Arc, listener: Box, - ex: Arc>, + ex: ExecutorPtr, ) -> Result<()> { // CondVar used to notify the loop to recheck if new connections can // be accepted by the listener. diff --git a/src/net/mod.rs b/src/net/mod.rs index 265182479..a5dead558 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -55,6 +55,10 @@ pub use message_publisher::MessageSubscription; /// Exposes agnostic dialers and agnostic listeners. pub mod transport; +/// Port mapping protocols (UPnP, NAT-PMP, PCP) +#[cfg(feature = "upnp-igd")] +pub mod upnp; + /// Hosts are a list of network addresses used when establishing outbound /// connections. /// diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index ce61c62f6..c340de442 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use smol::{lock::RwLock as AsyncRwLock, Executor}; use std::{sync::Arc, time::UNIX_EPOCH}; use tracing::debug; +use url::Url; use super::{ super::{ @@ -76,6 +77,16 @@ const PROTO_NAME: &str = "ProtocolAddress"; const TRANSPORT_COMBOS: [&str; 9] = ["tor", "tls", "tcp", "nym", "i2p", "tor+tls", "nym+tls", "tcp+tls", "i2p+tls"]; +/// Strip query parameters from a URL before broadcasting. +/// +/// This prevents leaking internal tracking identifiers (e.g., UPnP cookies) +/// that could be used for fingerprinting nodes on the P2P network. +fn strip_query_params(url: &Url) -> Url { + let mut stripped = url.clone(); + stripped.set_query(None); + stripped +} + impl ProtocolAddress { /// Creates a new address protocol. Makes an address, an external address /// and a get-address subscription and adds them to the address protocol @@ -244,8 +255,9 @@ impl ProtocolAddress { let mut addrs = vec![]; for addr in external_addrs { + let stripped_addr = strip_query_params(&addr); let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - addrs.push((addr, last_seen)); + addrs.push((stripped_addr, last_seen)); } debug!( diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index f910b0634..628019acd 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -26,7 +26,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; -use smol::{lock::Mutex, Executor}; +use smol::lock::Mutex; use tracing::{debug, error, warn}; use url::Url; @@ -40,7 +40,7 @@ use super::{ Session, SessionBitFlag, SESSION_INBOUND, }; use crate::{ - system::{StoppableTask, StoppableTaskPtr, Subscription}, + system::{ExecutorPtr, StoppableTask, StoppableTaskPtr, Subscription}, util::logger::verbose, Error, Result, }; @@ -136,7 +136,7 @@ impl InboundSession { index: usize, accept_addr: Url, acceptor: AcceptorPtr, - ex: Arc>, + ex: ExecutorPtr, ) -> Result<()> { verbose!(target: "net::inbound_session", "[P2P] Starting Inbound session #{index} on {accept_addr}"); // Start listener @@ -156,7 +156,7 @@ impl InboundSession { self: Arc, channel_sub: Subscription>, index: usize, - ex: Arc>, + ex: ExecutorPtr, ) -> Result<()> { loop { let channel = channel_sub.receive().await?; @@ -169,12 +169,7 @@ impl InboundSession { /// Registers the channel. First performs a network handshake and starts the channel. /// Then starts sending keep-alive and address messages across the channel. - async fn setup_channel( - self: Arc, - index: usize, - channel: ChannelPtr, - ex: Arc>, - ) { + async fn setup_channel(self: Arc, index: usize, channel: ChannelPtr, ex: ExecutorPtr) { verbose!( target: "net::inbound_session::setup_channel", "[P2P] Connected Inbound #{index} [{}]", channel.display_address() diff --git a/src/net/upnp.rs b/src/net/upnp.rs new file mode 100644 index 000000000..a2563e1fe --- /dev/null +++ b/src/net/upnp.rs @@ -0,0 +1,419 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2026 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//! UPnP IGD port mapping implementation +//! +//! This module provides UPnP Internet Gateway Device (IGD) port mapping +//! with automatic lease renewal and persistent retry for roaming support. + +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + sync::Arc, + time::Duration, +}; + +use async_trait::async_trait; +use oxy_upnp_igd::{add_port_mapping_lazy, Protocol, RenewalHandle}; +use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; +use tracing::error; +use url::Url; + +use crate::{ + net::settings::Settings, + system::{sleep, ExecutorPtr, StoppableTask, StoppableTaskPtr}, + util::logger::verbose, + Error, Result, +}; + +/// Trait for port mapping protocols (UPnP, NAT-PMP, PCP) +/// +/// Each protocol runs its own persistent task that: +/// 1. Attempts to discover a gateway +/// 2. Creates port mappings when gateway is found +/// 3. Periodically refreshes the external address +/// 4. Retries discovery on failures (supports roaming) +pub trait PortMapping: Send + Sync { + /// Start the port mapping protocol - runs forever with retries + fn start( + self: Arc, + settings: Arc>, + executor: ExecutorPtr, + ) -> Result<()>; + + /// Stop the port mapping protocol + fn stop(self: Arc); +} + +/// UPnP port mapping configuration +#[derive(Clone, Debug)] +pub struct UpnpConfig { + /// Port mapping lease duration in seconds + pub lease_duration: u32, + /// Gateway discovery timeout in seconds + pub discovery_timeout_secs: u64, + /// Description for port mapping (visible in router admin panel) + pub mapping_description: String, + /// External address refresh interval in seconds + pub ext_addr_refresh: u64, + /// How often to retry discovery if gateway not found (roaming support) + pub retry_interval_secs: u64, +} + +impl Default for UpnpConfig { + fn default() -> Self { + Self { + lease_duration: 300, + discovery_timeout_secs: 3, + mapping_description: "DarkFi".to_string(), + ext_addr_refresh: 120, + retry_interval_secs: 60, + } + } +} + +/// UPnP IGD port mapping protocol +/// +/// Maintains a persistent task that: +/// - Discovers UPnP gateway +/// - Creates port mappings +/// - Periodically refreshes the external address +/// - Retries discovery on failures (supports roaming devices) +pub struct UpnpPortMapping { + config: UpnpConfig, + internal_endpoint: Url, + handle: AsyncMutex>, + task: StoppableTaskPtr, +} + +impl UpnpPortMapping { + /// Create a new UPnP port mapping instance + pub fn new(config: UpnpConfig, internal_endpoint: Url) -> Self { + Self { + config, + internal_endpoint, + handle: AsyncMutex::new(None), + task: StoppableTask::new(), + } + } + + /// Main protocol loop - runs forever with retries + async fn run(&self, settings: Arc>, ex: &ExecutorPtr) -> Result<()> { + loop { + if self.try_create_mapping(ex).await.is_err() { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Gateway discovery failed, retrying in {}s", + self.config.retry_interval_secs + ); + sleep(self.config.retry_interval_secs).await; + continue; + } + + verbose!( + target: "net::upnp", + "[P2P] UPnP: Gateway discovered, mapping active for {}", + self.internal_endpoint + ); + + if self.run_refresh_loop(settings.clone()).await.is_err() { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Gateway lost, retrying discovery in {}s", + self.config.retry_interval_secs + ); + sleep(self.config.retry_interval_secs).await; + continue; + } + + unreachable!("UPnP refresh loop should never complete normally"); + } + } + + /// Attempt to discover gateway and create initial port mapping + async fn try_create_mapping(&self, ex: &ExecutorPtr) -> Result<()> { + let protocol = match self.internal_endpoint.scheme() { + "tcp" | "tcp+tls" => Protocol::TCP, + "quic" => Protocol::UDP, + s => { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Unsupported scheme '{s}', skipping" + ); + return Err(Error::NetworkServiceStopped); + } + }; + + // UPnP IGD port mapping is IPv4-only + let is_ipv4 = match self.internal_endpoint.host() { + Some(url::Host::Ipv4(_)) => true, + Some(url::Host::Ipv6(_)) => false, + // Treating domains as IPv4 is safe and generally useful + Some(url::Host::Domain(_)) => true, + None => false, + }; + + if !is_ipv4 { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Skipping IPv6 endpoint {} (IGD pinhole not implemented)", + self.internal_endpoint + ); + return Err(Error::NetworkServiceStopped); + } + + let internal_port = match self.internal_endpoint.port() { + Some(port) => port, + None => { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Invalid endpoint (missing port): {}", + self.internal_endpoint + ); + return Err(Error::NetworkServiceStopped); + } + }; + + let timeout = Duration::from_secs(self.config.discovery_timeout_secs); + + verbose!( + target: "net::upnp", + "[P2P] UPnP: Attempting port mapping for internal port {}", + internal_port + ); + + // This will return immediately with a lazy handle + let handle = add_port_mapping_lazy( + ex.clone(), + internal_port, + protocol, + &self.config.mapping_description, + self.config.lease_duration, + timeout, + ) + .await?; + + *self.handle.lock().await = Some(handle); + Ok(()) + } + + /// Refresh loop - updates external address periodically + async fn run_refresh_loop(&self, settings: Arc>) -> Result<()> { + loop { + sleep(self.config.ext_addr_refresh).await; + + let Some(external_url) = self.get_external_address().await else { + verbose!( + target: "net::upnp", + "[P2P] UPnP: Gateway no longer available" + ); + return Err(Error::NetworkServiceStopped); + }; + + // Update settings with new external address + let mut settings = settings.write().await; + + // Remove our old address (avoid duplicates) + let internal_id = format_address_id(&self.internal_endpoint, "upnp"); + settings.external_addrs.retain(|addr: &Url| { + if let Some(query) = addr.query() { + !query.contains(internal_id.as_str()) + } else { + true // Keep manually configured addresses + } + }); + + // Add new external address + settings.external_addrs.push(external_url.clone()); + + verbose!( + target: "net::upnp", + "[P2P] UPnP: Updated external address: {}", + external_url + ); + } + } + + /// Get current external address from UPnP handle + async fn get_external_address(&self) -> Option { + let handle = self.handle.lock().await; + let handle = handle.as_ref()?; + + let external_ip = handle.external_ip().await; + if external_ip.is_unspecified() { + return None; + } + + let external_port = handle.external_port(); + if external_port == 0 { + return None; + } + + let scheme = self.internal_endpoint.scheme(); + let internal_id = format_address_id(&self.internal_endpoint, "upnp"); + + Url::parse(&format!( + "{}://{}:{}?source=upnp&{}", + scheme, external_ip, external_port, internal_id + )) + .ok() + } +} + +#[async_trait] +impl PortMapping for UpnpPortMapping { + fn start(self: Arc, settings: Arc>, ex: ExecutorPtr) -> Result<()> { + let self_ = self.clone(); + let settings_ = settings.clone(); + let ex_ = ex.clone(); + self.task.clone().start( + async move { self_.run(settings_, &ex_).await }, + |result| async move { + match result { + Ok(()) => { + // Should never complete normally + error!("[P2P] UPnP task completed unexpectedly"); + } + Err(Error::NetworkServiceStopped) => { + // Expected when stopping + } + Err(e) => { + error!("[P2P] UPnP task error: {e}"); + } + } + }, + Error::NetworkServiceStopped, + ex, + ); + Ok(()) + } + + fn stop(self: Arc) { + // Stop the task (synchronous, signals the task to stop) + self.task.stop_nowait(); + // Handle dropped - mapping expires naturally + verbose!( + target: "net::upnp", + "[P2P] UPnP: Stopped port mapping for {}", + self.internal_endpoint + ); + } +} + +/// Format an identifier for this listener + protocol combination +/// +/// This utility is shared across all port mapping protocols (UPnP, NAT-PMP, PCP) +/// to create consistent, unique identifiers for external addresses. +pub fn format_address_id(endpoint: &Url, protocol: &str) -> String { + // Hash the endpoint URL to create a unique alphanumeric identifier + let mut hasher = DefaultHasher::new(); + endpoint.hash(&mut hasher); + let hash = hasher.finish(); + + format!("{}_cookie={:016x}", protocol, hash) +} + +/// Create UPnP port mapping from URL query parameters +pub fn create_upnp_from_url(url: &Url) -> Option> { + // Check if UPnP is explicitly enabled + if !url.query_pairs().any(|(key, value)| key == "upnp_igd" && value == "true") { + return None; + } + + // Parse configuration from URL query parameters using safe URL library methods + let mut config = UpnpConfig::default(); + + for (key, value) in url.query_pairs() { + match key.as_ref() { + "upnp_igd_lease_duration" => { + if let Ok(val) = value.parse::() { + config.lease_duration = val; + } + } + "upnp_igd_timeout" => { + if let Ok(val) = value.parse::() { + config.discovery_timeout_secs = val; + } + } + "upnp_igd_description" => { + config.mapping_description = value.into_owned(); + } + "upnp_igd_ext_addr_refresh" => { + if let Ok(val) = value.parse::() { + config.ext_addr_refresh = val; + } + } + _ => {} + } + } + + Some(Arc::new(UpnpPortMapping::new(config, url.clone()))) +} + +/// Initialize port mappings from URL query parameters. +/// +/// This function parses the endpoint URL for port mapping configuration, +/// creates the appropriate port mapping instances, and starts them. +/// Each port mapping runs its own persistent task for lease renewal +/// and external address updates. +/// +/// # Examples +/// ```text +/// // Enable UPnP with defaults +/// ?upnp_igd=true +/// +/// // UPnP with custom settings +/// ?upnp_igd=true&upnp_igd_lease_duration=600 +/// +/// // Multiple protocols +/// ?upnp_igd=true&pcp=true +/// ``` +/// +/// # Arguments +/// * `endpoint` - The actual endpoint URL with query parameters and +/// *assigned port* +/// * `settings` - P2P settings for updating external addresses +/// * `ex` - Executor for running async tasks +/// +/// # Returns +/// A vector of started port mappings (they auto-clean on drop) +pub fn setup_port_mappings( + actual_endpoint: &Url, + settings: Arc>, + ex: ExecutorPtr, +) -> Vec> { + let Some(mapping) = create_upnp_from_url(actual_endpoint) else { return vec![] }; + + if let Err(e) = Arc::clone(&mapping).start(settings.clone(), ex.clone()) { + error!( + target: "net::upnp", + "[P2P] UPnP port mapping: Failed to start for {}: {e}", + actual_endpoint + ); + return vec![] + } + + verbose!( + target: "net::upnp", + "[P2P] UPnP: Port mapping started for {}", + actual_endpoint + ); + vec![mapping] + + // Future: Add NAT-PMP, PCP here with similar patterns +}