From ccc5d9cf30b6325a556248792919b41037fa27ca Mon Sep 17 00:00:00 2001 From: parazyd Date: Tue, 2 Jul 2024 12:13:22 +0200 Subject: [PATCH] net/tor: Use P2P datastore for Tor's state and cache dirs --- src/net/acceptor.rs | 9 +++++++- src/net/connector.rs | 2 +- src/net/transport/mod.rs | 14 ++++++------ src/net/transport/tor.rs | 43 +++++++++++++++++++++++++++++-------- src/rpc/client.rs | 4 ++-- src/rpc/server.rs | 2 +- tests/network_transports.rs | 12 +++++------ 7 files changed, 59 insertions(+), 27 deletions(-) diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 1c88e9b8d..1de013e9d 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -63,7 +63,14 @@ impl Acceptor { /// Start accepting inbound socket connections pub async fn start(self: Arc, endpoint: Url, ex: Arc>) -> Result<()> { - let listener = Listener::new(endpoint).await?.listen().await?; + let listener = Listener::new( + endpoint, + self.session.upgrade().unwrap().p2p().settings().datastore.clone(), + ) + .await? + .listen() + .await?; + self.accept(listener, ex); Ok(()) } diff --git a/src/net/connector.rs b/src/net/connector.rs index 7f20dffae..84a391ef0 100644 --- a/src/net/connector.rs +++ b/src/net/connector.rs @@ -76,7 +76,7 @@ impl Connector { } } - let dialer = Dialer::new(endpoint.clone()).await?; + let dialer = Dialer::new(endpoint.clone(), self.settings.datastore.clone()).await?; let timeout = Duration::from_secs(self.settings.outbound_connect_timeout); let stop_fut = async { diff --git a/src/net/transport/mod.rs b/src/net/transport/mod.rs index 72da0a56a..127fcfa2b 100644 --- a/src/net/transport/mod.rs +++ b/src/net/transport/mod.rs @@ -126,8 +126,8 @@ macro_rules! enforce_abspath { } impl Dialer { - /// Instantiate a new [`Dialer`] with the given [`Url`]. - pub async fn new(endpoint: Url) -> io::Result { + /// Instantiate a new [`Dialer`] with the given [`Url`] and datastore path. + pub async fn new(endpoint: Url, datastore: Option) -> io::Result { match endpoint.scheme().to_lowercase().as_str() { #[cfg(feature = "p2p-tcp")] "tcp" => { @@ -151,7 +151,7 @@ impl Dialer { "tor" => { // Build a Tor dialer enforce_hostport!(endpoint); - let variant = tor::TorDialer::new().await?; + let variant = tor::TorDialer::new(datastore).await?; let variant = DialerVariant::Tor(variant); Ok(Self { endpoint, variant }) } @@ -160,7 +160,7 @@ impl Dialer { "tor+tls" => { // Build a Tor dialer wrapped with TLS enforce_hostport!(endpoint); - let variant = tor::TorDialer::new().await?; + let variant = tor::TorDialer::new(datastore).await?; let variant = DialerVariant::TorTls(variant); Ok(Self { endpoint, variant }) } @@ -287,9 +287,9 @@ pub struct Listener { } impl Listener { - /// Instantiate a new [`Listener`] with the given [`Url`]. + /// Instantiate a new [`Listener`] with the given [`Url`] and datastore path. /// Must contain a scheme, host string, and a port. - pub async fn new(endpoint: Url) -> io::Result { + pub async fn new(endpoint: Url, datastore: Option) -> io::Result { match endpoint.scheme().to_lowercase().as_str() { #[cfg(feature = "p2p-tcp")] "tcp" => { @@ -313,7 +313,7 @@ impl Listener { "tor" => { // Build a Tor Hidden Service listener enforce_hostport!(endpoint); - let variant = tor::TorListener::new().await?; + let variant = tor::TorListener::new(datastore).await?; let variant = ListenerVariant::Tor(variant); Ok(Self { endpoint, variant }) } diff --git a/src/net/transport/tor.rs b/src/net/transport/tor.rs index 513a1e8bb..748e4d9ae 100644 --- a/src/net/transport/tor.rs +++ b/src/net/transport/tor.rs @@ -24,7 +24,7 @@ use std::{ }; use arti_client::{ - config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto}, + config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto, TorClientConfigBuilder}, DataStream, StreamPrefs, TorClient, }; use async_trait::async_trait; @@ -47,18 +47,21 @@ use tor_rtcompat::PreferredRuntime; use url::Url; use super::{PtListener, PtStream}; +use crate::util::path::expand_path; /// A static for `TorClient` reusability static TOR_CLIENT: OnceCell> = OnceCell::new(); /// Tor Dialer implementation #[derive(Debug, Clone)] -pub struct TorDialer; +pub struct TorDialer { + datastore: Option, +} impl TorDialer { /// Instantiate a new [`TorDialer`] object - pub(crate) async fn new() -> io::Result { - Ok(Self {}) + pub(crate) async fn new(datastore: Option) -> io::Result { + Ok(Self { datastore }) } /// Internal dial function @@ -75,7 +78,17 @@ impl TorDialer { let client = match TOR_CLIENT .get_or_try_init(|| async { debug!(target: "net::tor::do_dial", "Bootstrapping..."); - TorClient::builder().create_bootstrapped().await + if let Some(datadir) = &self.datastore { + let datadir = expand_path(datadir).unwrap(); + + let config = TorClientConfigBuilder::from_directories(datadir.clone(), datadir) + .build() + .unwrap(); + + TorClient::create_bootstrapped(config).await + } else { + TorClient::builder().create_bootstrapped().await + } }) .await { @@ -139,12 +152,14 @@ impl TorDialer { /// Tor Listener implementation #[derive(Clone, Debug)] -pub struct TorListener; +pub struct TorListener { + datastore: Option, +} impl TorListener { /// Instantiate a new [`TorListener`] - pub async fn new() -> io::Result { - Ok(Self {}) + pub async fn new(datastore: Option) -> io::Result { + Ok(Self { datastore }) } /// Internal listen function @@ -154,7 +169,17 @@ impl TorListener { let client = match TOR_CLIENT .get_or_try_init(|| async { debug!(target: "net::tor::do_dial", "Bootstrapping..."); - TorClient::builder().create_bootstrapped().await + if let Some(datadir) = &self.datastore { + let datadir = expand_path(datadir).unwrap(); + + let config = TorClientConfigBuilder::from_directories(datadir.clone(), datadir) + .build() + .unwrap(); + + TorClient::create_bootstrapped(config).await + } else { + TorClient::builder().create_bootstrapped().await + } }) .await { diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 3c77a9c31..ff75c9683 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -58,7 +58,7 @@ impl RpcClient { // Instantiate Dialer and dial the server // TODO: Could add a timeout here - let dialer = Dialer::new(endpoint).await?; + let dialer = Dialer::new(endpoint, None).await?; let stream = dialer.dial(None).await?; // Create the StoppableTask running the request-reply loop. @@ -282,7 +282,7 @@ impl RpcChadClient { // Instantiate Dialer and dial the server // TODO: Could add a timeout here - let dialer = Dialer::new(endpoint).await?; + let dialer = Dialer::new(endpoint, None).await?; let stream = dialer.dial(None).await?; // Create the StoppableTask running the request-reply loop. diff --git a/src/rpc/server.rs b/src/rpc/server.rs index 4240dc9b4..169114264 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -340,7 +340,7 @@ pub async fn listen_and_serve( conn_limit: Option, ex: Arc>, ) -> Result<()> { - let listener = Listener::new(accept_url).await?.listen().await?; + let listener = Listener::new(accept_url, None).await?.listen().await?; run_accept_loop(listener, rh, conn_limit, ex.clone()).await } diff --git a/tests/network_transports.rs b/tests/network_transports.rs index a9a892822..9f7ebf78b 100644 --- a/tests/network_transports.rs +++ b/tests/network_transports.rs @@ -28,7 +28,7 @@ fn tcp_transport() { let url = Url::parse("tcp://127.0.0.1:5432").unwrap(); smol::block_on(executor.run(async { - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap(); executor .spawn(async move { let (stream, _) = listener.next().await.unwrap(); @@ -39,7 +39,7 @@ fn tcp_transport() { let payload = "ohai tcp"; - let dialer = Dialer::new(url).await.unwrap(); + let dialer = Dialer::new(url, None).await.unwrap(); let mut client = dialer.dial(None).await.unwrap(); payload.encode_async(&mut client).await.unwrap(); @@ -59,7 +59,7 @@ fn tcp_tls_transport() { let url = Url::parse("tcp+tls://127.0.0.1:5433").unwrap(); smol::block_on(executor.run(async { - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap(); executor .spawn(async move { let (stream, _) = listener.next().await.unwrap(); @@ -70,7 +70,7 @@ fn tcp_tls_transport() { let payload = "ohai tls"; - let dialer = Dialer::new(url).await.unwrap(); + let dialer = Dialer::new(url, None).await.unwrap(); let mut client = dialer.dial(None).await.unwrap(); payload.encode_async(&mut client).await.unwrap(); @@ -92,7 +92,7 @@ fn unix_transport() { .unwrap(); smol::block_on(executor.run(async { - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap(); executor .spawn(async move { let (stream, _) = listener.next().await.unwrap(); @@ -103,7 +103,7 @@ fn unix_transport() { let payload = "ohai unix"; - let dialer = Dialer::new(url).await.unwrap(); + let dialer = Dialer::new(url, None).await.unwrap(); let mut client = dialer.dial(None).await.unwrap(); payload.encode_async(&mut client).await.unwrap();