net/connector: Do transport mixing inside connect()

This commit is contained in:
parazyd
2023-07-08 21:17:00 +02:00
parent cec104f1a7
commit 33408bb3a4
4 changed files with 32 additions and 16 deletions

View File

@@ -43,12 +43,28 @@ impl Connector {
}
/// Establish an outbound connection
pub async fn connect(&self, endpoint: Url) -> Result<ChannelPtr> {
pub async fn connect(&self, url: &Url) -> Result<(Url, ChannelPtr)> {
let mut endpoint = url.clone();
let transports = &self.settings.allowed_transports;
let scheme = endpoint.scheme();
if !transports.contains(&scheme.to_string()) && self.settings.transport_mixing {
if transports.contains(&"tor".to_string()) && scheme == "tcp" {
endpoint.set_scheme("tor")?;
} else if transports.contains(&"tor+tls".to_string()) && scheme == "tcp+tls" {
endpoint.set_scheme("tor+tls")?;
} else if transports.contains(&"nym".to_string()) && scheme == "tcp" {
endpoint.set_scheme("nym")?;
} else if transports.contains(&"nym+tls".to_string()) && scheme == "tcp+tls" {
endpoint.set_scheme("nym+tls")?;
}
}
let dialer = Dialer::new(endpoint.clone()).await?;
let timeout = Duration::from_secs(self.settings.outbound_connect_timeout);
let ptstream = dialer.dial(Some(timeout)).await?;
let channel = Channel::new(ptstream, endpoint, self.session.clone()).await;
Ok(channel)
let channel = Channel::new(ptstream, endpoint.clone(), self.session.clone()).await;
Ok((endpoint, channel))
}
}

View File

@@ -121,11 +121,11 @@ impl ManualSession {
"[P2P] Connecting to manual outbound [{}] (attempt #{})",
addr, tried_attempts,
);
match connector.connect(addr.clone()).await {
Ok(channel) => {
match connector.connect(&addr).await {
Ok((url, channel)) => {
info!(
target: "net::manual_session",
"[P2P] Manual outbound connected [{}]", addr,
"[P2P] Manual outbound connected [{}]", url,
);
let stop_sub =
@@ -147,7 +147,7 @@ impl ManualSession {
stop_sub.receive().await;
info!(
target: "net::manual_session",
"[P2P] Manual outbound disconnected [{}]", addr,
"[P2P] Manual outbound disconnected [{}]", url,
);
// DEV NOTE: Here we can choose to attempt reconnection again
return Ok(())

View File

@@ -88,7 +88,7 @@ impl OutboundInfo {
async fn dnet_info(&self, p2p: P2pPtr) -> Option<Self> {
let Some(ref addr) = self.addr else { return None };
let Some(chan) = p2p.channels().lock().await.get(&addr).cloned() else { return None };
let Some(chan) = p2p.channels().lock().await.get(addr).cloned() else { return None };
Some(Self {
addr: self.addr.clone(),
@@ -237,12 +237,12 @@ impl OutboundSession {
slot_number, addr,
);
match connector.connect(addr.clone()).await {
Ok(channel) => {
match connector.connect(&addr).await {
Ok((url, channel)) => {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
slot_number, addr
slot_number, url
);
let stop_sub =

View File

@@ -149,25 +149,25 @@ impl SeedSyncSession {
let parent = Arc::downgrade(&self);
let connector = Connector::new(settings.clone(), Arc::new(parent));
match connector.connect(seed.clone()).await {
Ok(ch) => {
match connector.connect(&seed).await {
Ok((url, ch)) => {
info!(
target: "net::session::seedsync_session",
"[P2P] Connected seed #{} [{}]", seed_index, seed,
"[P2P] Connected seed #{} [{}]", seed_index, url,
);
if let Err(e) = self.clone().register_channel(ch.clone(), ex.clone()).await {
warn!(
target: "net::session::seedsync_session",
"[P2P] Failure during sync seed session #{} [{}]: {}",
seed_index, seed, e,
seed_index, url, e,
);
}
info!(
target: "net::session::seedsync_session",
"[P2P] Disconnecting from seed #{} [{}]",
seed_index, seed,
seed_index, url,
);
ch.stop().await;
}