net: use the version exchange to auto-discover our own address when connecting to outbound (protocol_address) or seed (protocol_seed), AND our inbound is set to tcp or tcp+tls with host = [::] (ipv6).

This commit is contained in:
darkfi
2025-02-08 11:04:06 +01:00
parent 06ac53c5c7
commit bbaa871971
6 changed files with 90 additions and 28 deletions

View File

@@ -3,10 +3,10 @@ verbose = 2
[net]
hostlist = "example/node1.tsv"
inbound = ["tcp://127.0.0.1:10202"]
external_addrs = ["tcp://127.0.0.1:10202"]
inbound = ["tcp://[::]:10202"]
allowed_transports = ["tcp"]
seeds = ["tcp://127.0.0.1:10200"]
external_addrs = ["tcp://[::1]:10202"]
seeds = ["tcp://[::1]:10200"]
outbound_connections = 5
magic_bytes = [127, 64, 12, 201]
localnet = true

View File

@@ -3,11 +3,10 @@ verbose = 2
[net]
hostlist = "example/node2.tsv"
inbound = ["tcp://127.0.0.1:10201"]
#auto_advertise = true
external_addrs=["tcp://127.0.0.1:10201"]
inbound = ["tcp://[::]:10201"]
allowed_transports = ["tcp"]
seeds = ["tcp://127.0.0.1:10200"]
external_addrs=["tcp://[::1]:10201"]
seeds = ["tcp://[::1]:10200"]
outbound_connections = 5
magic_bytes = [127, 64, 12, 201]
localnet = true

View File

@@ -3,11 +3,10 @@ verbose = 2
[net]
hostlist = "example/seed.tsv"
inbound=["tcp://127.0.0.1:10200"]
external_addrs=["tcp://127.0.0.1:10200"]
inbound=["tcp://[::]:10200"]
allowed_transports = ["tcp"]
seeds=[]
outbound_connections = 5
# no outbounds for seed node
magic_bytes=[127, 64, 12, 201]
localnet = true

View File

@@ -32,7 +32,7 @@ use log::{debug, error, info, trace, warn};
use rand::{rngs::OsRng, Rng};
use smol::{
io::{self, AsyncRead, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
lock::Mutex,
lock::{Mutex as AsyncMutex, OnceCell},
Executor,
};
use url::Url;
@@ -77,9 +77,9 @@ impl ChannelInfo {
/// Async channel for communication between nodes.
pub struct Channel {
/// The reading half of the transport stream
reader: Mutex<ReadHalf<Box<dyn PtStream>>>,
reader: AsyncMutex<ReadHalf<Box<dyn PtStream>>>,
/// The writing half of the transport stream
writer: Mutex<WriteHalf<Box<dyn PtStream>>>,
writer: AsyncMutex<WriteHalf<Box<dyn PtStream>>>,
/// The message subsystem instance for this channel
message_subsystem: MessageSubsystem,
/// Publisher listening for stop signal for closing this channel
@@ -93,7 +93,7 @@ pub struct Channel {
/// The version message of the node we are connected to.
/// Some if the version exchange has already occurred, None
/// otherwise.
pub version: Mutex<Option<Arc<VersionMessage>>>,
pub version: OnceCell<Arc<VersionMessage>>,
/// Channel debug info
pub info: ChannelInfo,
}
@@ -109,13 +109,12 @@ impl Channel {
session: SessionWeakPtr,
) -> Arc<Self> {
let (reader, writer) = io::split(stream);
let reader = Mutex::new(reader);
let writer = Mutex::new(writer);
let reader = AsyncMutex::new(reader);
let writer = AsyncMutex::new(writer);
let message_subsystem = MessageSubsystem::new();
Self::setup_dispatchers(&message_subsystem).await;
let version = Mutex::new(None);
let start_time = UNIX_EPOCH.elapsed().unwrap().as_secs();
let info = ChannelInfo::new(resolve_addr, connect_addr.clone(), start_time);
@@ -127,7 +126,7 @@ impl Channel {
receive_task: StoppableTask::new(),
stopped: AtomicBool::new(false),
session,
version,
version: OnceCell::new(),
info,
})
}
@@ -487,7 +486,11 @@ impl Channel {
/// Set the VersionMessage of the node this channel is connected
/// to. Called on receiving a version message in `ProtocolVersion`.
pub(crate) async fn set_version(&self, version: Arc<VersionMessage>) {
*self.version.lock().await = Some(version);
self.version.set(version).await.unwrap();
}
/// Should only be called after the version exchange has been completed.
pub fn get_version(&self) -> Arc<VersionMessage> {
self.version.get().unwrap().clone()
}
/// Returns the inner [`MessageSubsystem`] reference

View File

@@ -16,15 +16,15 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use async_trait::async_trait;
use log::debug;
use smol::{lock::RwLock as AsyncRwLock, Executor};
use std::{sync::Arc, time::UNIX_EPOCH};
use url::{Host, Url};
use super::{
super::{
channel::ChannelPtr,
channel::{Channel, ChannelPtr},
hosts::{HostColor, HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_publisher::MessageSubscription,
@@ -218,8 +218,7 @@ impl ProtocolAddress {
"[START] channel address={}", self.channel.address(),
);
let type_id = self.channel.session_type_id();
if type_id != SESSION_OUTBOUND {
if self.channel.session_type_id() != SESSION_OUTBOUND {
debug!(
target: "net::protocol_address::send_my_addrs",
"Not an outbound session. Stopping",
@@ -227,7 +226,16 @@ impl ProtocolAddress {
return Ok(())
}
let external_addrs = self.settings.read().await.external_addrs.clone();
let settings = self.settings.read().await;
let mut external_addrs = settings.external_addrs.clone();
// Auto-advertise the node's inbound address using the address that
// was sent to use by the node in the version exchange.
for inbound in settings.inbound_addrs.clone() {
let Some(inbound) = Self::patch_inbound(&self.channel, inbound) else { continue };
external_addrs.push(inbound);
}
drop(settings);
if external_addrs.is_empty() {
debug!(
@@ -259,6 +267,48 @@ impl ProtocolAddress {
Ok(())
}
/// If the inbound is an Ipv6 address, then replace it with the ip address reported to
/// us by the version exchange.
///
/// Also used by ProtocolSeed.
pub(super) fn patch_inbound(channel: &Channel, mut inbound: Url) -> Option<Url> {
if inbound.scheme() != "tcp" && inbound.scheme() != "tcp+tls" {
return None
}
let inbound_host = inbound.host()?;
// Is it an Ipv6 listener?
match inbound_host {
Host::Ipv6(addr) => {
// We are only interested if it's localhost
if !addr.is_loopback() {
return None
}
}
_ => return None,
}
// We should loop over the endpoints from the listeners
// But inbound session should be changed so the acceptors and listeners
// are accessible.
/*
let Some(mut port) = inbound.port() else { continue };
if port == 0 {
}
*/
// Get our auto-discovered IP
let version = channel.get_version();
let discover_host = version.connect_recv_addr.host()?;
// Check the reported address is Ipv6
let _ = match discover_host {
Host::Ipv6(_) => {}
_ => return None,
};
inbound.set_host(version.connect_recv_addr.host_str()).ok()?;
Some(inbound)
}
}
#[async_trait]

View File

@@ -16,11 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use async_trait::async_trait;
use log::debug;
use smol::{lock::RwLock as AsyncRwLock, Executor};
use std::{sync::Arc, time::UNIX_EPOCH};
use super::{
super::{
@@ -31,6 +30,7 @@ use super::{
p2p::P2pPtr,
settings::Settings,
},
protocol_address::ProtocolAddress,
protocol_base::{ProtocolBase, ProtocolBasePtr},
};
use crate::Result;
@@ -63,7 +63,18 @@ impl ProtocolSeed {
"[START] channel address={}", self.channel.address(),
);
let external_addrs = self.settings.read().await.external_addrs.clone();
let settings = self.settings.read().await;
let mut external_addrs = settings.external_addrs.clone();
// Auto-advertise the node's inbound address using the address that
// was sent to use by the node in the version exchange.
for inbound in settings.inbound_addrs.clone() {
let Some(inbound) = ProtocolAddress::patch_inbound(&self.channel, inbound) else {
continue
};
external_addrs.push(inbound);
}
drop(settings);
if external_addrs.is_empty() {
debug!(