net/tor: Use P2P datastore for Tor's state and cache dirs

This commit is contained in:
parazyd
2024-07-02 12:13:22 +02:00
parent c6aa5c1544
commit ccc5d9cf30
7 changed files with 59 additions and 27 deletions

View File

@@ -63,7 +63,14 @@ impl Acceptor {
/// Start accepting inbound socket connections
pub async fn start(self: Arc<Self>, endpoint: Url, ex: Arc<Executor<'_>>) -> Result<()> {
let listener = Listener::new(endpoint).await?.listen().await?;
let listener = Listener::new(
endpoint,
self.session.upgrade().unwrap().p2p().settings().datastore.clone(),
)
.await?
.listen()
.await?;
self.accept(listener, ex);
Ok(())
}

View File

@@ -76,7 +76,7 @@ impl Connector {
}
}
let dialer = Dialer::new(endpoint.clone()).await?;
let dialer = Dialer::new(endpoint.clone(), self.settings.datastore.clone()).await?;
let timeout = Duration::from_secs(self.settings.outbound_connect_timeout);
let stop_fut = async {

View File

@@ -126,8 +126,8 @@ macro_rules! enforce_abspath {
}
impl Dialer {
/// Instantiate a new [`Dialer`] with the given [`Url`].
pub async fn new(endpoint: Url) -> io::Result<Self> {
/// Instantiate a new [`Dialer`] with the given [`Url`] and datastore path.
pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
match endpoint.scheme().to_lowercase().as_str() {
#[cfg(feature = "p2p-tcp")]
"tcp" => {
@@ -151,7 +151,7 @@ impl Dialer {
"tor" => {
// Build a Tor dialer
enforce_hostport!(endpoint);
let variant = tor::TorDialer::new().await?;
let variant = tor::TorDialer::new(datastore).await?;
let variant = DialerVariant::Tor(variant);
Ok(Self { endpoint, variant })
}
@@ -160,7 +160,7 @@ impl Dialer {
"tor+tls" => {
// Build a Tor dialer wrapped with TLS
enforce_hostport!(endpoint);
let variant = tor::TorDialer::new().await?;
let variant = tor::TorDialer::new(datastore).await?;
let variant = DialerVariant::TorTls(variant);
Ok(Self { endpoint, variant })
}
@@ -287,9 +287,9 @@ pub struct Listener {
}
impl Listener {
/// Instantiate a new [`Listener`] with the given [`Url`].
/// Instantiate a new [`Listener`] with the given [`Url`] and datastore path.
/// Must contain a scheme, host string, and a port.
pub async fn new(endpoint: Url) -> io::Result<Self> {
pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
match endpoint.scheme().to_lowercase().as_str() {
#[cfg(feature = "p2p-tcp")]
"tcp" => {
@@ -313,7 +313,7 @@ impl Listener {
"tor" => {
// Build a Tor Hidden Service listener
enforce_hostport!(endpoint);
let variant = tor::TorListener::new().await?;
let variant = tor::TorListener::new(datastore).await?;
let variant = ListenerVariant::Tor(variant);
Ok(Self { endpoint, variant })
}

View File

@@ -24,7 +24,7 @@ use std::{
};
use arti_client::{
config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto},
config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto, TorClientConfigBuilder},
DataStream, StreamPrefs, TorClient,
};
use async_trait::async_trait;
@@ -47,18 +47,21 @@ use tor_rtcompat::PreferredRuntime;
use url::Url;
use super::{PtListener, PtStream};
use crate::util::path::expand_path;
/// A static for `TorClient` reusability
static TOR_CLIENT: OnceCell<TorClient<PreferredRuntime>> = OnceCell::new();
/// Tor Dialer implementation
#[derive(Debug, Clone)]
pub struct TorDialer;
pub struct TorDialer {
datastore: Option<String>,
}
impl TorDialer {
/// Instantiate a new [`TorDialer`] object
pub(crate) async fn new() -> io::Result<Self> {
Ok(Self {})
pub(crate) async fn new(datastore: Option<String>) -> io::Result<Self> {
Ok(Self { datastore })
}
/// Internal dial function
@@ -75,7 +78,17 @@ impl TorDialer {
let client = match TOR_CLIENT
.get_or_try_init(|| async {
debug!(target: "net::tor::do_dial", "Bootstrapping...");
TorClient::builder().create_bootstrapped().await
if let Some(datadir) = &self.datastore {
let datadir = expand_path(datadir).unwrap();
let config = TorClientConfigBuilder::from_directories(datadir.clone(), datadir)
.build()
.unwrap();
TorClient::create_bootstrapped(config).await
} else {
TorClient::builder().create_bootstrapped().await
}
})
.await
{
@@ -139,12 +152,14 @@ impl TorDialer {
/// Tor Listener implementation
#[derive(Clone, Debug)]
pub struct TorListener;
pub struct TorListener {
datastore: Option<String>,
}
impl TorListener {
/// Instantiate a new [`TorListener`]
pub async fn new() -> io::Result<Self> {
Ok(Self {})
pub async fn new(datastore: Option<String>) -> io::Result<Self> {
Ok(Self { datastore })
}
/// Internal listen function
@@ -154,7 +169,17 @@ impl TorListener {
let client = match TOR_CLIENT
.get_or_try_init(|| async {
debug!(target: "net::tor::do_dial", "Bootstrapping...");
TorClient::builder().create_bootstrapped().await
if let Some(datadir) = &self.datastore {
let datadir = expand_path(datadir).unwrap();
let config = TorClientConfigBuilder::from_directories(datadir.clone(), datadir)
.build()
.unwrap();
TorClient::create_bootstrapped(config).await
} else {
TorClient::builder().create_bootstrapped().await
}
})
.await
{

View File

@@ -58,7 +58,7 @@ impl RpcClient {
// Instantiate Dialer and dial the server
// TODO: Could add a timeout here
let dialer = Dialer::new(endpoint).await?;
let dialer = Dialer::new(endpoint, None).await?;
let stream = dialer.dial(None).await?;
// Create the StoppableTask running the request-reply loop.
@@ -282,7 +282,7 @@ impl RpcChadClient {
// Instantiate Dialer and dial the server
// TODO: Could add a timeout here
let dialer = Dialer::new(endpoint).await?;
let dialer = Dialer::new(endpoint, None).await?;
let stream = dialer.dial(None).await?;
// Create the StoppableTask running the request-reply loop.

View File

@@ -340,7 +340,7 @@ pub async fn listen_and_serve(
conn_limit: Option<usize>,
ex: Arc<smol::Executor<'_>>,
) -> Result<()> {
let listener = Listener::new(accept_url).await?.listen().await?;
let listener = Listener::new(accept_url, None).await?.listen().await?;
run_accept_loop(listener, rh, conn_limit, ex.clone()).await
}

View File

@@ -28,7 +28,7 @@ fn tcp_transport() {
let url = Url::parse("tcp://127.0.0.1:5432").unwrap();
smol::block_on(executor.run(async {
let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap();
let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap();
executor
.spawn(async move {
let (stream, _) = listener.next().await.unwrap();
@@ -39,7 +39,7 @@ fn tcp_transport() {
let payload = "ohai tcp";
let dialer = Dialer::new(url).await.unwrap();
let dialer = Dialer::new(url, None).await.unwrap();
let mut client = dialer.dial(None).await.unwrap();
payload.encode_async(&mut client).await.unwrap();
@@ -59,7 +59,7 @@ fn tcp_tls_transport() {
let url = Url::parse("tcp+tls://127.0.0.1:5433").unwrap();
smol::block_on(executor.run(async {
let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap();
let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap();
executor
.spawn(async move {
let (stream, _) = listener.next().await.unwrap();
@@ -70,7 +70,7 @@ fn tcp_tls_transport() {
let payload = "ohai tls";
let dialer = Dialer::new(url).await.unwrap();
let dialer = Dialer::new(url, None).await.unwrap();
let mut client = dialer.dial(None).await.unwrap();
payload.encode_async(&mut client).await.unwrap();
@@ -92,7 +92,7 @@ fn unix_transport() {
.unwrap();
smol::block_on(executor.run(async {
let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap();
let listener = Listener::new(url.clone(), None).await.unwrap().listen().await.unwrap();
executor
.spawn(async move {
let (stream, _) = listener.next().await.unwrap();
@@ -103,7 +103,7 @@ fn unix_transport() {
let payload = "ohai unix";
let dialer = Dialer::new(url).await.unwrap();
let dialer = Dialer::new(url, None).await.unwrap();
let mut client = dialer.dial(None).await.unwrap();
payload.encode_async(&mut client).await.unwrap();