From b7bfa7e4ff3aefcbffdd0ab945ec8d00a281fd79 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 7 May 2022 15:22:46 +0300 Subject: [PATCH] net3: WIP implement tor protocol --- example/p2pdebug/src/main.rs | 108 ++++++++++++++++++++++------------- src/net3/acceptor.rs | 71 +++++++++++++++++++++-- src/net3/connector.rs | 44 ++++++++++++-- src/net3/transport/tor.rs | 26 +++++---- 4 files changed, 188 insertions(+), 61 deletions(-) diff --git a/example/p2pdebug/src/main.rs b/example/p2pdebug/src/main.rs index 5a4df40a9..1beddd7cf 100644 --- a/example/p2pdebug/src/main.rs +++ b/example/p2pdebug/src/main.rs @@ -25,7 +25,7 @@ struct Args { /// Verbosity level #[clap(short, parse(from_occurrences))] verbose: u8, - /// node number: + /// node number: /// 0-2 is for seed nodes /// 3-20 is for inbound connections nodes /// 21- is for outbound connections nodes @@ -37,9 +37,15 @@ struct Args { /// communicate using tls protocol by default is tcp #[clap(long)] tls: bool, + /// communicate using tor protocol + #[clap(long)] + tor: bool, /// communicate using tls protocol by default is tcp #[clap(long, default_value = "127.0.0.1:11055")] rpc: SocketAddr, + /// open manual connection + #[clap(long)] + connect: Option, } #[derive(Debug, Clone)] @@ -57,7 +63,12 @@ struct MockP2p { } impl MockP2p { - async fn new(node_number: u8, _broadcast: bool, scheme: &str) -> Result<(net::P2pPtr, Self)> { + async fn new( + node_number: u8, + _broadcast: bool, + scheme: &str, + connect: Option, + ) -> Result<(net::P2pPtr, Self)> { let seed_addrs: Vec = vec![ Url::parse(&format!("{}://127.0.0.1:11001", scheme))?, Url::parse(&format!("{}://127.0.0.1:11002", scheme))?, @@ -69,49 +80,62 @@ impl MockP2p { let mut broadcast = _broadcast; - let p2p = match node_number { - 0..=2 => { - address = Some(seed_addrs[node_number as usize].clone()); + let p2p = if connect.is_none() { + match node_number { + 0..=2 => { + address = Some(seed_addrs[node_number as usize].clone()); - let net_settings = net::Settings { inbound: address.clone(), ..Default::default() }; - let p2p = net::P2p::new(net_settings).await; + let net_settings = + net::Settings { inbound: address.clone(), ..Default::default() }; + let p2p = net::P2p::new(net_settings).await; - broadcast = false; - state = State::Seed; + broadcast = false; + state = State::Seed; - p2p + p2p + } + 3..=20 => { + let random_port: u32 = rand::thread_rng().gen_range(11007..49151); + address = Some(format!("{}://127.0.0.1:{}", scheme, random_port).parse()?); + + let net_settings = net::Settings { + inbound: address.clone(), + external_addr: address.clone(), + seeds: seed_addrs, + ..Default::default() + }; + + let p2p = net::P2p::new(net_settings).await; + + state = State::Inbound; + + p2p + } + _ => { + address = None; + + let net_settings = net::Settings { + outbound_connections: 3, + seeds: seed_addrs, + ..Default::default() + }; + + let p2p = net::P2p::new(net_settings).await; + state = State::Outbound; + + p2p + } } - 3..=20 => { - let random_port: u32 = rand::thread_rng().gen_range(11007..49151); - address = Some(format!("{}://127.0.0.1:{}", scheme, random_port).parse()?); + } else { + address = None; - let net_settings = net::Settings { - inbound: address.clone(), - external_addr: address.clone(), - seeds: seed_addrs, - ..Default::default() - }; + let net_settings = + net::Settings { peers: vec![Url::parse(&connect.unwrap())?], ..Default::default() }; - let p2p = net::P2p::new(net_settings).await; + let p2p = net::P2p::new(net_settings).await; + state = State::Outbound; - state = State::Inbound; - - p2p - } - _ => { - address = None; - - let net_settings = net::Settings { - outbound_connections: 3, - seeds: seed_addrs, - ..Default::default() - }; - - let p2p = net::P2p::new(net_settings).await; - state = State::Outbound; - - p2p - } + p2p }; println!("start {:?} node #{} address {:?}", state, node_number, address); @@ -212,9 +236,13 @@ impl MockP2p { } async fn start(executor: Arc>, args: Args) -> Result<()> { - let scheme = if args.tls { "tls" } else { "tcp" }; + let mut scheme = (if args.tor { "tor" } else { "tcp" }).to_string(); - let (p2p, mock_p2p) = MockP2p::new(args.node, args.broadcast, scheme).await?; + if args.tls { + scheme = format!("{}+tls", scheme); + } + + let (p2p, mock_p2p) = MockP2p::new(args.node, args.broadcast, &scheme, args.connect).await?; mock_p2p.run(p2p.clone(), args.rpc.clone(), executor).await } diff --git a/src/net3/acceptor.rs b/src/net3/acceptor.rs index 7d2c5d733..162626218 100644 --- a/src/net3/acceptor.rs +++ b/src/net3/acceptor.rs @@ -1,6 +1,7 @@ use async_std::sync::Arc; +use std::{env, fs}; -use log::error; +use log::{error, info}; use smol::Executor; use url::Url; @@ -9,7 +10,9 @@ use crate::{ Error, Result, }; -use super::{Channel, ChannelPtr, TcpTransport, Transport, TransportListener, TransportName}; +use super::{ + Channel, ChannelPtr, TcpTransport, TorTransport, Transport, TransportListener, TransportName, +}; /// Atomic pointer to Acceptor class. pub type AcceptorPtr = Arc; @@ -40,14 +43,73 @@ impl Acceptor { let listener = transport.listen_on(accept_url.clone()); if let Err(err) = listener { - error!("Setup failed: {}", err); + error!("TCP Setup failed: {}", err); return Err(Error::BindFailed(accept_url.clone().to_string())) } let listener = listener?.await; if let Err(err) = listener { - error!("Bind listener failed: {}", err); + error!("TCP Bind listener failed: {}", err); + return Err(Error::BindFailed(accept_url.to_string())) + } + + let listener = listener?; + + match upgrade { + None => { + self.accept(Box::new(listener), executor); + } + Some(u) if u == "tls" => { + let tls_listener = transport.upgrade_listener(listener)?.await?; + self.accept(Box::new(tls_listener), executor); + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + } + } + TransportName::Tor(upgrade) => { + let socks5_url = Url::parse( + &env::var("DARKFI_TOR_SOCKS5_URL") + .unwrap_or("socks5://127.0.0.1:9050".to_string()), + )?; + + let torc_url = Url::parse( + &env::var("DARKFI_TOR_CONTROL_URL") + .unwrap_or("tcp://127.0.0.1:9051".to_string()), + )?; + + let auth_cookie = env::var("DARKFI_TOR_COOKIE"); + + if auth_cookie.is_err() { + return Err(Error::TorError( + "Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \ + For example: \ + \'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'".to_string(), + )) + } + + let auth_cookie = auth_cookie.unwrap(); + + let auth_cookie = hex::encode(&fs::read(auth_cookie).unwrap()); + + let transport = TorTransport::new(socks5_url, Some((torc_url, auth_cookie)))?; + + // generate EHS pointing to local address + let hurl = transport.create_ehs(accept_url.clone())?; + + info!("EHS TOR: {}", hurl.to_string()); + + let listener = transport.clone().listen_on(accept_url.clone()); + + if let Err(err) = listener { + error!("TOR Setup failed: {}", err); + return Err(Error::BindFailed(accept_url.clone().to_string())) + } + + let listener = listener?.await; + + if let Err(err) = listener { + error!("TOR Bind listener failed: {}", err); return Err(Error::BindFailed(accept_url.to_string())) } @@ -64,7 +126,6 @@ impl Acceptor { Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), } } - TransportName::Tor(_upgrade) => todo!(), } Ok(()) } diff --git a/src/net3/connector.rs b/src/net3/connector.rs index 2dcfc0db5..5e14b668f 100644 --- a/src/net3/connector.rs +++ b/src/net3/connector.rs @@ -1,12 +1,14 @@ use async_std::{future::timeout, sync::Arc}; -use std::time::Duration; +use std::{env, time::Duration}; use log::error; use url::Url; use crate::{Error, Result}; -use super::{Channel, ChannelPtr, SettingsPtr, TcpTransport, Transport, TransportName}; +use super::{ + Channel, ChannelPtr, SettingsPtr, TcpTransport, TorTransport, Transport, TransportName, +}; /// Create outbound socket connections. pub struct Connector { @@ -41,14 +43,47 @@ impl Connector { let stream = transport.dial(connect_url.clone()); if let Err(err) = stream { - error!("Setup failed: {}", err); + error!("TCP Setup failed: {}", err); return Err(Error::ConnectFailed) } let stream = stream?.await; if let Err(err) = stream { - error!("Connection failed: {}", err); + error!("TCP Connection failed: {}", err); + return Err(Error::ConnectFailed) + } + + let channel = match upgrade { + None => Channel::new(Box::new(stream?), connect_url.clone()).await, + Some(u) if u == "tls" => { + let stream = transport.upgrade_dialer(stream?)?.await; + Channel::new(Box::new(stream?), connect_url).await + } + Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), + }; + + Ok(channel) + } + TransportName::Tor(upgrade) => { + let socks5_url = Url::parse( + &env::var("DARKFI_TOR_SOCKS5_URL") + .unwrap_or("socks5://127.0.0.1:9050".to_string()), + )?; + + let transport = TorTransport::new(socks5_url, None)?; + + let stream = transport.clone().dial(connect_url.clone()); + + if let Err(err) = stream { + error!("TOR Setup failed: {}", err); + return Err(Error::ConnectFailed) + } + + let stream = stream?.await; + + if let Err(err) = stream { + error!("TOR Connection failed: {}", err); return Err(Error::ConnectFailed) } @@ -63,7 +98,6 @@ impl Connector { Ok(channel) } - TransportName::Tor(_upgrade) => todo!(), } } } diff --git a/src/net3/transport/tor.rs b/src/net3/transport/tor.rs index 2533119a4..e7b373c15 100644 --- a/src/net3/transport/tor.rs +++ b/src/net3/transport/tor.rs @@ -1,3 +1,7 @@ +use async_std::{ + net::{TcpListener, TcpStream}, + sync::Arc, +}; use std::{ io, io::{BufRead, BufReader, Write}, @@ -6,10 +10,6 @@ use std::{ time::Duration, }; -use async_std::{ - net::{TcpListener, TcpStream}, - sync::Arc, -}; use fast_socks5::client::{Config, Socks5Stream}; use futures::prelude::*; use futures_rustls::{TlsAcceptor, TlsStream}; @@ -17,9 +17,10 @@ use regex::Regex; use socket2::{Domain, Socket, Type}; use url::Url; -use super::{TlsUpgrade, Transport}; use crate::{Error, Result}; +use super::{TlsUpgrade, Transport, TransportStream}; + /// Implements communication through the tor proxy service. /// /// ## Dialing @@ -162,12 +163,13 @@ impl TorTransport { /// /// * `url` - url that the hidden service maps to. pub fn create_ehs(&self, url: Url) -> Result { - self.tor_controller - .as_ref() - .ok_or_else(|| { - Error::TorError("No controller configured for this transport".to_string()) - })? - .create_ehs(url) + let tor_controller = self.tor_controller.as_ref(); + + if tor_controller.is_none() { + return Err(Error::TorError("No controller configured for this transport".to_string())) + }; + + tor_controller.unwrap().create_ehs(url) } pub async fn do_dial(self, url: Url) -> Result> { @@ -212,6 +214,8 @@ impl TorTransport { } } +impl TransportStream for Socks5Stream {} + impl Transport for TorTransport { type Acceptor = TcpListener; type Connector = Socks5Stream;