Add Tor support to Cuprate (Arti, Tor Daemon, Dandelion router) (#509)

In `workspace`:
- New dependencies: `arti_client`, `Cuprate/tokio-socks.git`, `tor-cell`, `tor-config-path`, `tor-hsservice`, `tor-persist`, `tor-proto`, `tor-rtcompat` (yes nothing was exported).
- In `deny.toml`, whitelisted `Unlicense` license for `arti_client`.

In `cuprate-p2p-transport`:
- Implemented `Transport<ClearNet>` and `Transport<Tor>` for `Arti`. New `ArtiClientConfig`, `ArtiServerConfig` configuration. New `OnionListener` for accepting inbound connections from arti's generated onion service.
- Implemented `Transport<Tor>` for `Daemon`. New `DaemonClientConfig`, `DaemonServerConfig` configuration. New `DaemonInboundStream` listening for incoming TCP  connections from the tor daemon.
- `DisabledListener` as a polyfill for transports with inbound disabled, such as `Transport<ClearNet> for Arti` and in the future `Transport<ClearNet> for Socks5`.

In `cuprate-p2p-core`:
- Removed `Default` and `Debug` bound from `Transport::ClientConfig` and `Transport::ServerConfig`.
- Removed `Clone` bound from `Transport::ServerConfig`.

In `cuprate-p2p`:
- Changed some function visibility to `pub(super)` instead of `pub`.

In `cuprate-wire`:
- Added `borsh` dependency and `BorshSerialize` and `BorshDeserialize` derived implementation to `OnionAddr` for `BorshNetworkZone` requirement in address book.

In `cuprated`:
- New `tor` module containing the initialization of Arti, config helpers and context structure `TorContext` to pass down to p2p initialization function and other helpers.
- New `config/tor` module containing the `[tor]` configuration table. It define tor daemon related variables, as well as arti settings.
- Added `enable_inbound` field to `ClearNetConfig` to disable incoming listener by default.
- Added `proxy` field to `ClearNetConfig` for enabling clearnet over arti and in the future proxy urls.
- Added `TorNetConfig` for setting `Tor` network zone parameters such as listening ports, enabling arti inbound server, or setting an anonymous inbound onion address from an external daemon.
- Modified `initialize_zones_p2p` to now start Tor network zone and use the correct transport depending on user configuration.
- In `txpool/*`, generalized `DiffuseService`, `OutboundPeerStream` and `ConcreteDandelionRouter` for `Z: NetworkZone`. Created a new `MainDandelionRouter` service that will route local txs to a Tor router instead of clearnet if available. Adapted initialization to the changes.

---------

Co-authored-by: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
This commit is contained in:
SyntheticBird
2025-08-04 14:31:25 +00:00
committed by GitHub
parent 97e539559a
commit 9c2c942d2f
39 changed files with 4536 additions and 180 deletions

View File

@@ -155,8 +155,8 @@ jobs:
# TODO: support musl <https://github.com/Cuprate/cuprate/issues/336>
# - { image: alpine:latest, commands: "apk update && apk add alpine-sdk cmake curl bash" }
- { image: archlinux:latest, commands: "pacman -Syyu --noconfirm base-devel git cmake openssl" }
- { image: debian:stable, commands: "apt update && apt -y install build-essential curl cmake git libssl-dev" }
- { image: fedora:latest, commands: "dnf install --assumeyes @development-tools gcc gcc-c++ cmake git openssl-devel" }
- { image: debian:stable, commands: "apt update && apt -y install build-essential curl cmake git pkg-config libssl-dev" }
- { image: fedora:latest, commands: "dnf install --assumeyes @development-tools gcc gcc-c++ cmake git openssl-devel perl-core" }
steps:
- name: Checkout

3380
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -167,6 +167,7 @@ axum = { version = "0.7", default-features = false }
anyhow = { version = "1", default-features = false }
arc-swap = { version = "1", default-features = false }
arrayvec = { version = "0.7", default-features = false }
arti-client = { version = "0.32", default-features = false }
async-trait = { version = "0.1", default-features = false }
bitflags = { version = "2", default-features = false }
blake3 = { version = "1", default-features = false }
@@ -203,11 +204,18 @@ thiserror = { version = "1", default-features = false }
thread_local = { version = "1", default-features = false }
tokio-util = { version = "0.7", default-features = false }
tokio-stream = { version = "0.1", default-features = false }
tokio-socks = { git = "https://github.com/Cuprate/tokio-socks.git", rev = "8737caf", default-features = false }
tokio = { version = "1", default-features = false }
tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # <https://github.com/tower-rs/tower/pull/796>
tower-http = { version = "0.6", default-features = false }
toml = { version = "0.8", default-features = false }
toml_edit = { version = "0.22", default-features = false }
tor-cell = { version = "0.32", default-features = false }
tor-config-path = { version = "0.32", default-features = false }
tor-hsservice = { version = "0.32", default-features = false }
tor-persist = { version = "0.32", default-features = false }
tor-proto = { version = "0.32", default-features = false }
tor-rtcompat = { version = "0.32", default-features = false }
tracing-appender = { version = "0.2", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false }
tracing = { version = "0.1", default-features = false }

View File

@@ -28,6 +28,7 @@ cuprate-hex = { workspace = true }
cuprate-json-rpc = { workspace = true }
cuprate-levin = { workspace = true }
cuprate-p2p-core = { workspace = true }
cuprate-p2p-transport = { workspace = true }
cuprate-p2p = { workspace = true }
cuprate-pruning = { workspace = true }
cuprate-rpc-interface = { workspace = true, features = ["dummy"] }
@@ -41,6 +42,7 @@ cuprate-wire = { workspace = true }
# TODO: after v1.0.0, remove unneeded dependencies.
axum = { workspace = true, features = ["tokio", "http1", "http2"] }
anyhow = { workspace = true }
arti-client = { workspace = true, features = ["tokio", "native-tls", "onion-service-client", "onion-service-service", "static"] }
async-trait = { workspace = true }
bitflags = { workspace = true }
borsh = { workspace = true }
@@ -79,6 +81,9 @@ tokio-stream = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true, features = ["parse", "display"]}
toml_edit = { workspace = true }
tor-hsservice = { workspace = true }
tor-persist = { workspace = true }
tor-rtcompat = { workspace = true }
tower = { workspace = true, features = ["limit"] }
tower-http = { workspace = true, features = ["limit"] }
tracing-appender = { workspace = true }

View File

@@ -8,6 +8,7 @@ use std::{
time::Duration,
};
use arti_client::KeystoreSelector;
use clap::Parser;
use serde::{Deserialize, Serialize};
@@ -17,11 +18,13 @@ use cuprate_helper::{
network::Network,
};
use cuprate_p2p::block_downloader::BlockDownloaderConfig;
use cuprate_p2p_core::ClearNet;
use cuprate_p2p_core::{ClearNet, Tor};
use cuprate_wire::OnionAddr;
use crate::{
constants::{DEFAULT_CONFIG_STARTUP_DELAY, DEFAULT_CONFIG_WARNING},
logging::eprintln_red,
tor::{TorContext, TorMode},
};
mod args;
@@ -31,6 +34,7 @@ mod rayon;
mod rpc;
mod storage;
mod tokio;
mod tor;
mod tracing_config;
#[macro_use]
@@ -42,6 +46,7 @@ use rayon::RayonConfig;
pub use rpc::RpcConfig;
use storage::StorageConfig;
use tokio::TokioConfig;
use tor::TorConfig;
use tracing_config::TracingConfig;
/// Header to put at the start of the generated config file.
@@ -144,6 +149,10 @@ config_struct! {
/// Configuration for cuprated's P2P system.
pub p2p: P2PConfig,
#[child = true]
/// Configuration for cuprated's Tor component
pub tor: TorConfig,
#[child = true]
/// Configuration for cuprated's RPC system.
pub rpc: RpcConfig,
@@ -165,6 +174,7 @@ impl Default for Config {
fast_sync: true,
tracing: Default::default(),
tokio: Default::default(),
tor: Default::default(),
rayon: Default::default(),
p2p: Default::default(),
rpc: Default::default(),
@@ -227,6 +237,45 @@ impl Config {
}
}
/// The [`Tor`], [`cuprate_p2p::P2PConfig`].
pub fn tor_p2p_config(&self, ctx: &TorContext) -> cuprate_p2p::P2PConfig<Tor> {
let inbound_enabled = self.p2p.tor_net.inbound_onion;
let our_onion_address = match ctx.mode {
TorMode::Off => None,
TorMode::Daemon => inbound_enabled.then(||
OnionAddr::new(
&self.tor.daemon.anonymous_inbound,
self.p2p.tor_net.p2p_port
).expect("Unable to parse supplied `anonymous_inbound` onion address. Please make sure the address is correct.")),
TorMode::Arti => inbound_enabled.then(|| {
let addr = ctx.arti_onion_service
.as_ref()
.unwrap()
.generate_identity_key(KeystoreSelector::Primary)
.unwrap()
.to_string();
OnionAddr::new(&addr, self.p2p.tor_net.p2p_port).unwrap()
})
};
cuprate_p2p::P2PConfig {
network: self.network,
seeds: p2p::tor_net_seed_nodes(self.network),
outbound_connections: self.p2p.tor_net.outbound_connections,
extra_outbound_connections: self.p2p.tor_net.extra_outbound_connections,
max_inbound_connections: self.p2p.tor_net.max_inbound_connections,
gray_peers_percent: self.p2p.tor_net.gray_peers_percent,
p2p_port: self.p2p.tor_net.p2p_port,
rpc_port: 0,
address_book_config: self.p2p.tor_net.address_book_config.address_book_config(
&self.fs.cache_directory,
self.network,
our_onion_address,
),
}
}
/// The [`ContextConfig`].
pub const fn context_config(&self) -> ContextConfig {
match self.network {

View File

@@ -5,16 +5,24 @@ use std::{
time::Duration,
};
use arti_client::{
config::onion_service::{OnionServiceConfig, OnionServiceConfigBuilder},
TorClient, TorClientBuilder, TorClientConfig,
};
use serde::{Deserialize, Serialize};
use tor_rtcompat::PreferredRuntime;
use cuprate_helper::{fs::address_book_path, network::Network};
use cuprate_p2p::config::TransportConfig;
use cuprate_p2p_core::{
transports::{Tcp, TcpServerConfig},
ClearNet, NetworkZone, Transport,
ClearNet, NetworkZone, Tor, Transport,
};
use cuprate_p2p_transport::{Arti, ArtiClientConfig, ArtiServerConfig};
use cuprate_wire::OnionAddr;
use crate::{p2p::ProxySettings, tor::TorMode};
use super::macros::config_struct;
config_struct! {
@@ -26,6 +34,10 @@ config_struct! {
/// The clear-net P2P config.
pub clear_net: ClearNetConfig,
#[child = true]
/// The tor-net P2P config.
pub tor_net: TorNetConfig,
#[child = true]
/// Block downloader config.
///
@@ -147,9 +159,10 @@ config_struct! {
/// Examples | 0.0, 0.5, 0.123, 0.999, 1.0
pub gray_peers_percent: f64,
/// The port to use to accept incoming IPv4 P2P connections.
/// The port bind to this network zone.
///
/// Setting this to 0 will disable incoming P2P connections.
/// This port will be bind to if the incoming P2P
/// server for this zone has been enabled.
///
/// Type | Number
/// Valid values | 0..65534
@@ -165,6 +178,17 @@ config_struct! {
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct ClearNetConfig {
/// Enable IPv4 inbound server.
///
/// The inbound server will listen on port `p2p.clear_net.p2p_port`.
/// Setting this to `false` will disable incoming IPv4 P2P connections.
///
/// Type | boolean
/// Valid values | false, true
/// Examples | false
pub enable_inbound: bool,
/// The IPv4 address to bind and listen for connections on.
///
/// Type | IPv4 address
@@ -173,6 +197,7 @@ config_struct! {
/// Enable IPv6 inbound server.
///
/// The inbound server will listen on port `p2p.clear_net.p2p_port`.
/// Setting this to `false` will disable incoming IPv6 P2P connections.
///
/// Type | boolean
@@ -185,20 +210,83 @@ config_struct! {
/// Type | IPv6 address
/// Examples | "::", "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
pub listen_on_v6: Ipv6Addr,
#[comment_out = true]
/// The proxy to use for outgoing P2P connections
///
/// This setting can only take "Tor" at the moment.
/// This will anonymise clearnet connections through Tor.
///
/// Setting this to "" (an empty string) will disable the proxy.
///
/// Enabling this setting will disable inbound connections.
///
/// Type | String
/// Valid values | "Tor"
/// Examples | "Tor"
pub proxy: ProxySettings,
}
/// The config values for P2P tor.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct TorNetConfig {
#[comment_out = true]
/// Enable the Tor P2P network.
///
/// Type | boolean
/// Valid values | false, true
/// Examples | false
pub enabled: bool,
#[comment_out = true]
/// Enable Tor inbound onion server.
///
/// In Arti mode, setting this to `true` will enable Arti's onion service for accepting inbound
/// Tor P2P connections. The keypair and therefore onion address is generated randomly on first run.
///
/// In Daemon mode, setting this to `true` will enable a TCP server listening for inbound connections
/// from your Tor daemon. Refer to the `tor.anonymous_inbound` and `tor.listening_addr` field for onion address
/// and listening configuration.
///
/// The server will listen on port `p2p.tor_net.p2p_port`
///
/// Type | boolean
/// Valid values | false, true
/// Examples | false
pub inbound_onion: bool,
}
}
impl Default for ClearNetConfig {
fn default() -> Self {
Self {
p2p_port: 18080,
enable_inbound: true,
listen_on: Ipv4Addr::UNSPECIFIED,
enable_inbound_v6: false,
listen_on_v6: Ipv6Addr::UNSPECIFIED,
proxy: ProxySettings::Socks(String::new()),
outbound_connections: 32,
extra_outbound_connections: 8,
max_inbound_connections: 128,
gray_peers_percent: 0.7,
address_book_config: AddressBookConfig::default(),
}
}
}
impl Default for TorNetConfig {
fn default() -> Self {
Self {
enabled: false,
inbound_onion: false,
p2p_port: 18080,
outbound_connections: 12,
extra_outbound_connections: 2,
max_inbound_connections: 128,
gray_peers_percent: 0.7,
address_book_config: AddressBookConfig::default(),
}
}

View File

@@ -0,0 +1,133 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
};
use serde::{Deserialize, Serialize};
use cuprate_helper::fs::CUPRATE_DATA_DIR;
use crate::{config::macros::config_struct, tor::TorMode};
config_struct! {
/// Arti config
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
#[allow(rustdoc::broken_intra_doc_links)]
pub struct ArtiConfig {
/// Path to the arti state directory.
///
/// The default directories for each OS:
///
/// | OS | Path |
/// |---------|-----------------------------------------------------|
/// | Windows | "C:\Users\Alice\AppData\Roaming\Cuprate\" |
/// | macOS | "/Users/Alice/Library/Application Support/Cuprate/" |
/// | Linux | "/home/alice/.local/share/cuprate/" |
pub directory_path: PathBuf,
/// Enable isolated circuits for Arti.
///
/// If set, Arti will use different tor circuits for each connections. This can
/// cause stability issues if the connection count is important.
///
/// Type | boolean
/// Valid values | false, true
/// Examples | false
pub isolated_circuit: bool,
/// Enable PoW security for Arti.
///
/// If set, Arti will enforce an EquiX PoW to be resolved for
/// other nodes to complete a rendez-vous request when under
/// heavy load.
///
/// Type | boolean
/// Valid values | false, true
/// Examples | false
pub onion_service_pow: bool,
}
/// Tor config
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
#[allow(rustdoc::broken_intra_doc_links)]
pub struct TorDaemonConfig {
/// The IP address and port of the external Tor daemon to use for outgoing connections.
///
/// Type | Socket address
/// Examples | "[::1]:9050", "127.0.0.1:9050"
pub address: SocketAddr,
#[comment_out = true]
/// Enable inbound connections for Daemon mode
///
/// This string specify the onion address that should be advertized to the Tor network
/// and that your daemon should be expecting connections from.
///
/// When this is set, `p2p.tor_net.p2p_port` is not used for host listening, but as the source
/// port of your hidden service in your torrc configuration file. For setting Cuprate's
/// listening port see `tor.listening_addr` field
///
/// Type | String
/// Valid values | "<56 character domain>.onion"
/// Examples | "monerotoruzizulg5ttgat2emf4d6fbmiea25detrmmy7erypseyteyd.onion"
pub anonymous_inbound: String,
/// The IP address and port to bind and listen on for anonymous inbound connections from Tor Daemon.
///
/// Type | Socket address
/// Examples | "0.0.0.0:18083", "192.168.1.50:2000", "[::]:5000", "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:18082"
pub listening_addr: SocketAddr,
}
/// Tor config
#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
#[allow(rustdoc::broken_intra_doc_links)]
pub struct TorConfig {
#[comment_out = true]
/// Enable Tor network by specifying how to connect to it.
///
/// When "Daemon" is set, the Tor daemon address to use can be
/// specified in `tor.daemon_addr`.
///
/// Type | String
/// Valid values | "Arti", "Daemon", "Off"
/// Examples | "Arti"
pub mode: TorMode,
#[child = true]
/// Arti config
///
/// Only relevant if `tor.mode` is set to "Arti"
pub arti: ArtiConfig,
#[child = true]
/// Tor Daemon config
///
/// Only relevant if `tor.mode` is set to "Daemon"
pub daemon: TorDaemonConfig,
}
}
impl Default for TorDaemonConfig {
fn default() -> Self {
Self {
address: "127.0.0.1:9050".parse().unwrap(),
anonymous_inbound: String::new(),
listening_addr: SocketAddrV4::new(Ipv4Addr::LOCALHOST, 18083).into(),
}
}
}
impl Default for ArtiConfig {
fn default() -> Self {
Self {
directory_path: CUPRATE_DATA_DIR.join("arti"),
isolated_circuit: false,
onion_service_pow: false,
}
}
}

View File

@@ -34,7 +34,10 @@ use cuprate_types::blockchain::BlockchainWriteRequest;
use txpool::IncomingTxHandler;
use crate::{
config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter,
config::Config,
constants::PANIC_CRITICAL_SERVICE_ERROR,
logging::CupratedTracingFilter,
tor::{initialize_tor_if_enabled, TorMode},
};
mod blockchain;
@@ -47,6 +50,7 @@ mod p2p;
mod rpc;
mod signals;
mod statics;
mod tor;
mod txpool;
mod version;
@@ -118,20 +122,23 @@ fn main() {
.await
.unwrap();
// Bootstrap or configure Tor if enabled.
let tor_context = initialize_tor_if_enabled(&config).await;
// Start p2p network zones
let (network_interfaces, tx_handler_subscribers) = p2p::initialize_zones_p2p(
&config,
context_svc.clone(),
blockchain_write_handle.clone(),
blockchain_read_handle.clone(),
txpool_write_handle.clone(),
txpool_read_handle.clone(),
tor_context,
)
.await;
// Create the incoming tx handler service.
let tx_handler = IncomingTxHandler::init(
network_interfaces.clearnet_network_interface.clone(),
network_interfaces.tor_network_interface,
txpool_write_handle.clone(),
txpool_read_handle.clone(),
context_svc.clone(),

View File

@@ -4,14 +4,20 @@
use std::convert::From;
use arti_client::TorClient;
use futures::{FutureExt, TryFutureExt};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::{self, Sender};
use tor_rtcompat::PreferredRuntime;
use tower::{Service, ServiceExt};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::BlockchainContextService;
use cuprate_p2p::{config::TransportConfig, NetworkInterface, P2PConfig};
use cuprate_p2p_core::{client::InternalPeerID, transports::Tcp, ClearNet, NetworkZone, Transport};
use cuprate_p2p_core::{
client::InternalPeerID, transports::Tcp, ClearNet, NetworkZone, Tor, Transport,
};
use cuprate_p2p_transport::{Arti, ArtiClientConfig, Daemon};
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use cuprate_types::blockchain::BlockchainWriteRequest;
@@ -19,6 +25,10 @@ use crate::{
blockchain,
config::Config,
constants::PANIC_CRITICAL_SERVICE_ERROR,
tor::{
transport_arti_config, transport_clearnet_arti_config, transport_daemon_config, TorContext,
TorMode,
},
txpool::{self, IncomingTxHandler},
};
@@ -28,10 +38,20 @@ pub mod request_handler;
pub use network_address::CrossNetworkInternalPeerId;
/// A simple parsing enum for the `p2p.clear_net.proxy` field
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum ProxySettings {
Tor,
#[serde(untagged)]
Socks(String),
}
/// This struct collect all supported and optional network zone interfaces.
pub struct NetworkInterfaces {
/// Mandatory clearnet network interface
pub clearnet_network_interface: NetworkInterface<ClearNet>,
/// Optional tor network interface
pub tor_network_interface: Option<NetworkInterface<Tor>>,
// ...one can dream for more!
}
@@ -39,6 +59,7 @@ impl NetworkInterfaces {
pub const fn new(clearnet_network_interface: NetworkInterface<ClearNet>) -> Self {
Self {
clearnet_network_interface,
tor_network_interface: None,
}
}
}
@@ -48,25 +69,93 @@ impl NetworkInterfaces {
pub async fn initialize_zones_p2p(
config: &Config,
context_svc: BlockchainContextService,
mut blockchain_write_handle: BlockchainWriteHandle,
mut blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
tor_ctx: TorContext,
) -> (NetworkInterfaces, Vec<Sender<IncomingTxHandler>>) {
// Start TCP clearnet P2P.
let (clearnet, incoming_tx_handler_tx) = start_zone_p2p::<ClearNet, Tcp>(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.clearnet_p2p_config(),
(&config.p2p.clear_net).into(),
)
.await
.unwrap();
// Start clearnet P2P.
let (clearnet, incoming_tx_handler_tx) = {
// If proxy is set
match config.p2p.clear_net.proxy {
ProxySettings::Tor => match tor_ctx.mode {
TorMode::Arti => {
tracing::info!("Anonymizing clearnet connections through Arti.");
start_zone_p2p::<ClearNet, Arti>(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.clearnet_p2p_config(),
transport_clearnet_arti_config(&tor_ctx),
)
.await
.unwrap()
}
TorMode::Daemon => {
tracing::error!("Anonymizing clearnet connections through the Tor daemon is not yet supported.");
std::process::exit(0);
}
TorMode::Off => {
tracing::error!("Clearnet proxy set to \"tor\" but Tor is actually off. Please be sure to set a mode in the configuration or command line");
std::process::exit(0);
}
},
ProxySettings::Socks(ref s) => {
if !s.is_empty() {
tracing::error!("Socks proxy is not yet supported.");
std::process::exit(0);
}
start_zone_p2p::<ClearNet, Tcp>(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.clearnet_p2p_config(),
(&config.p2p.clear_net).into(),
)
.await
.unwrap()
}
}
};
// Create network interface collection
let network_interfaces = NetworkInterfaces::new(clearnet);
let tx_handler_subscribers = vec![incoming_tx_handler_tx];
let mut network_interfaces = NetworkInterfaces::new(clearnet);
let mut tx_handler_subscribers = vec![incoming_tx_handler_tx];
// Start Tor P2P (if enabled)
let tor = if config.p2p.tor_net.enabled {
match tor_ctx.mode {
TorMode::Off => None,
TorMode::Daemon => Some(
start_zone_p2p::<Tor, Daemon>(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.tor_p2p_config(&tor_ctx),
transport_daemon_config(config),
)
.await
.unwrap(),
),
TorMode::Arti => Some(
start_zone_p2p::<Tor, Arti>(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.tor_p2p_config(&tor_ctx),
transport_arti_config(config, tor_ctx),
)
.await
.unwrap(),
),
}
} else {
None
};
if let Some((tor, incoming_tx_handler_tx)) = tor {
network_interfaces.tor_network_interface = Some(tor);
tx_handler_subscribers.push(incoming_tx_handler_tx);
}
(network_interfaces, tx_handler_subscribers)
}

View File

@@ -386,12 +386,14 @@ where
return Ok(ProtocolResponse::NA);
}
let state = if request.dandelionpp_fluff {
let addr = peer_information.id.into();
let anon_zone = matches!(addr, CrossNetworkInternalPeerId::Tor(_));
let state = if request.dandelionpp_fluff && !anon_zone {
TxState::Fluff
} else {
TxState::Stem {
from: peer_information.id.into(),
}
TxState::Stem { from: addr }
};
// Drop all the data except the stuff we still need.

View File

@@ -0,0 +1,197 @@
//! Tor initialization
//!
//! Extract configuration and initialize Arti.
//---------------------------------------------------------------------------------------------------- Imports
use std::{default, sync::Arc};
use arti_client::{
config::{onion_service::OnionServiceConfigBuilder, CfgPath, TorClientConfigBuilder},
KeystoreSelector, StreamPrefs, TorClient, TorClientBuilder, TorClientConfig,
};
use futures::Stream;
use serde::{Deserialize, Serialize};
use tor_hsservice::{OnionService, RendRequest, RunningOnionService};
use tor_persist::hsnickname::HsNickname;
use tor_rtcompat::PreferredRuntime;
use tracing::info;
use cuprate_helper::fs::CUPRATE_DATA_DIR;
use cuprate_p2p::TransportConfig;
use cuprate_p2p_core::{ClearNet, Tor};
use cuprate_p2p_transport::{
Arti, ArtiClientConfig, ArtiServerConfig, Daemon, DaemonClientConfig, DaemonServerConfig,
};
use cuprate_wire::OnionAddr;
use crate::{config::Config, p2p::ProxySettings};
//---------------------------------------------------------------------------------------------------- Initialization
#[derive(Clone, Default, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// Describe if Tor is enabled and how
pub enum TorMode {
/// Use of the [`arti_client`] library.
Arti,
/// Use of external tor daemon
Daemon,
#[default]
/// Tor is disabled
Off,
}
/// Contains the necessary Tor configuration or structures
/// for initializing P2P.
pub struct TorContext {
/// Which mode are we using.
pub mode: TorMode,
// -------- Only in Arti mode
/// Arti bootstrapped [`TorClient`].
pub bootstrapped_client: Option<TorClient<PreferredRuntime>>,
/// Arti bootstrapped client config
pub arti_client_config: Option<TorClientConfig>,
/// Arti onion service address.
pub arti_onion_service: Option<OnionService>,
}
/// Initialize the Tor network if enabled in configuration
///
/// This function will bootstrap Arti if needed by Tor network zone or
/// clearnet as a proxy.
pub async fn initialize_tor_if_enabled(config: &Config) -> TorContext {
let mode = config.tor.mode;
let anonymize_clearnet = matches!(config.p2p.clear_net.proxy, ProxySettings::Tor);
// Start Arti client
let (bootstrapped_client, arti_client_config) =
if mode == TorMode::Arti && (config.p2p.tor_net.enabled || anonymize_clearnet) {
Some(initialize_arti_client(config).await)
} else {
None
}
.unzip();
// Start Arti onion service
let arti_onion_service = arti_client_config
.as_ref()
.map(|client_config| initialize_arti_onion_service(client_config, config));
TorContext {
mode,
bootstrapped_client,
arti_client_config,
arti_onion_service,
}
}
/// Initialize Arti Tor client.
async fn initialize_arti_client(config: &Config) -> (TorClient<PreferredRuntime>, TorClientConfig) {
// Configuration
let mut tor_config = TorClientConfig::builder();
// Storage
tor_config
.storage()
.state_dir(CfgPath::new_literal(config.tor.arti.directory_path.clone()));
let tor_config = tor_config
.build()
.expect("Failed to build Tor client configuration.");
// Bootstrapping
info!("Bootstrapping Arti's TorClient...");
let mut tor_client = TorClient::builder()
.config(tor_config.clone())
.create_bootstrapped()
.await
.inspect_err(|err| tracing::error!("Unable to bootstrap arti: {err}"))
.unwrap();
// Isolation
if config.tor.arti.isolated_circuit {
let mut stream_prefs = StreamPrefs::new();
stream_prefs.isolate_every_stream();
tor_client.set_stream_prefs(stream_prefs);
}
(tor_client, tor_config)
}
fn initialize_arti_onion_service(client_config: &TorClientConfig, config: &Config) -> OnionService {
let onion_svc_config = OnionServiceConfigBuilder::default()
.enable_pow(config.tor.arti.onion_service_pow)
.nickname(HsNickname::new("cuprate".into()).unwrap())
.build()
.unwrap();
TorClient::<PreferredRuntime>::create_onion_service(client_config, onion_svc_config)
.expect("Unable to start Arti onion service.")
}
//---------------------------------------------------------------------------------------------------- Transport configuration
pub fn transport_arti_config(config: &Config, ctx: TorContext) -> TransportConfig<Tor, Arti> {
// Extracting
let (Some(bootstrapped_client), Some(client_config)) =
(ctx.bootstrapped_client, ctx.arti_client_config)
else {
panic!("Arti client should be initialized");
};
let server_config = config.p2p.tor_net.inbound_onion.then(|| {
let Some(onion_svc) = ctx.arti_onion_service else {
panic!("inbound onion enabled, but no onion service initialized!");
};
ArtiServerConfig::new(
onion_svc,
config.p2p.tor_net.p2p_port,
&bootstrapped_client,
&client_config,
)
});
TransportConfig::<Tor, Arti> {
client_config: ArtiClientConfig {
client: bootstrapped_client,
},
server_config,
}
}
pub fn transport_clearnet_arti_config(ctx: &TorContext) -> TransportConfig<ClearNet, Arti> {
let Some(bootstrapped_client) = &ctx.bootstrapped_client else {
panic!("Arti enabled but no TorClient initialized!");
};
TransportConfig::<ClearNet, Arti> {
client_config: ArtiClientConfig {
client: bootstrapped_client.clone(),
},
server_config: None,
}
}
pub fn transport_daemon_config(config: &Config) -> TransportConfig<Tor, Daemon> {
let mut invalid_onion = false;
if config.p2p.tor_net.inbound_onion && config.tor.daemon.anonymous_inbound.is_empty() {
invalid_onion = true;
tracing::warn!("Onion inbound is enabled yet no onion host has been defined in configuration. Inbound server disabled.");
}
TransportConfig::<Tor, Daemon> {
client_config: DaemonClientConfig {
tor_daemon: config.tor.daemon.address,
},
server_config: (config.p2p.tor_net.inbound_onion && !invalid_onion).then_some(
DaemonServerConfig {
ip: config.tor.daemon.listening_addr.ip(),
port: config.tor.daemon.listening_addr.port(),
},
),
}
}

View File

@@ -1,10 +1,17 @@
use std::time::Duration;
use std::{
task::{ready, Poll},
time::Duration,
};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::{Service, ServiceExt};
use cuprate_dandelion_tower::{
pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
pool::DandelionPoolService, traits::StemRequest, DandelionConfig, DandelionRouteReq,
DandelionRouter, DandelionRouterError, Graph, State, TxState,
};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone, Tor};
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use crate::{
@@ -12,10 +19,13 @@ use crate::{
txpool::incoming_tx::{DandelionTx, TxId},
};
mod anon_net_service;
mod diffuse_service;
mod stem_service;
mod tx_store;
pub use anon_net_service::AnonTxService;
/// The configuration used for [`cuprate_dandelion_tower`].
///
/// TODO: should we expose this to users of cuprated? probably not.
@@ -27,17 +37,74 @@ const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
};
/// A [`DandelionRouter`] with all generic types defined.
type ConcreteDandelionRouter = DandelionRouter<
stem_service::OutboundPeerStream,
diffuse_service::DiffuseService,
pub(super) type ConcreteDandelionRouter<Z> = DandelionRouter<
stem_service::OutboundPeerStream<Z>,
diffuse_service::DiffuseService<Z>,
CrossNetworkInternalPeerId,
stem_service::StemPeerService<ClearNet>,
stem_service::StemPeerService<Z>,
DandelionTx,
>;
/// The dandelion router used to send transactions to the network.
pub(super) struct MainDandelionRouter {
clearnet_router: ConcreteDandelionRouter<ClearNet>,
tor_router: Option<AnonTxService<Tor>>,
}
impl MainDandelionRouter {
pub const fn new(
clearnet_router: ConcreteDandelionRouter<ClearNet>,
tor_router: Option<AnonTxService<Tor>>,
) -> Self {
Self {
clearnet_router,
tor_router,
}
}
}
impl Service<DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>> for MainDandelionRouter {
type Response = State;
type Error = DandelionRouterError;
type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(tor_router) = self.tor_router.as_mut() {
ready!(tor_router.poll_ready(cx))?;
}
self.clearnet_router.poll_ready(cx)
}
fn call(
&mut self,
req: DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>,
) -> Self::Future {
// TODO: is this the best way to use anonymity networks?
if req.state == TxState::Local {
if let Some(tor_router) = self.tor_router.as_mut() {
if let Some(mut peer) = tor_router.peer.take() {
tracing::debug!("routing tx over Tor");
return peer
.call(StemRequest(req.tx))
.map_ok(|_| State::Stem)
.map_err(DandelionRouterError::PeerError)
.boxed();
}
tracing::warn!(
"failed to route tx over Tor, no connections, falling back to Clearnet"
);
}
}
self.clearnet_router.call(req)
}
}
/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
pub fn start_dandelion_pool_manager(
router: ConcreteDandelionRouter,
router: MainDandelionRouter,
txpool_read_handle: TxpoolReadHandle,
txpool_write_handle: TxpoolWriteHandle,
) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
@@ -54,12 +121,17 @@ pub fn start_dandelion_pool_manager(
}
/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
pub fn dandelion_router<Z: NetworkZone>(
network_interface: NetworkInterface<Z>,
) -> ConcreteDandelionRouter<Z>
where
InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
{
DandelionRouter::new(
diffuse_service::DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(),
clear_net_broadcast_service: network_interface.broadcast_svc(),
},
stem_service::OutboundPeerStream::new(clear_net),
stem_service::OutboundPeerStream::<Z>::new(network_interface),
DANDELION_CONFIG,
)
}

View File

@@ -0,0 +1,68 @@
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::{Stream, StreamExt, TryStream};
use tower::Service;
use cuprate_dandelion_tower::{DandelionRouterError, OutboundPeer};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::{client::InternalPeerID, NetworkZone};
use crate::{
p2p::CrossNetworkInternalPeerId,
txpool::dandelion::stem_service::{OutboundPeerStream, StemPeerService},
};
/// The service to prepare peers on anonymous network zones for sending transactions.
pub struct AnonTxService<Z: NetworkZone> {
outbound_peer_discover: Pin<Box<OutboundPeerStream<Z>>>,
pub peer: Option<StemPeerService<Z>>,
}
impl<Z: NetworkZone> AnonTxService<Z>
where
InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
{
pub fn new(network_interface: NetworkInterface<Z>) -> Self {
Self {
outbound_peer_discover: Box::pin(OutboundPeerStream::new(network_interface)),
peer: None,
}
}
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DandelionRouterError>> {
loop {
if let Some(peer) = &mut self.peer {
if ready!(peer.poll_ready(cx)).is_err() {
self.peer = None;
continue;
}
return Poll::Ready(Ok(()));
}
let ret = ready!(self
.outbound_peer_discover
.as_mut()
.try_poll_next(cx)
.map_err(DandelionRouterError::OutboundPeerStreamError))
.ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??;
match ret {
OutboundPeer::Peer(_, mut svc) => {
let poll = svc.poll_ready(cx);
self.peer = Some(svc);
if ready!(poll).is_err() {
self.peer = None;
}
}
OutboundPeer::Exhausted => return Poll::Ready(Ok(())),
}
}
Poll::Ready(Ok(()))
}
}

View File

@@ -8,16 +8,16 @@ use tower::Service;
use cuprate_dandelion_tower::traits::DiffuseRequest;
use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
use cuprate_p2p_core::ClearNet;
use cuprate_p2p_core::{ClearNet, NetworkZone};
use crate::txpool::dandelion::DandelionTx;
/// The dandelion diffusion service.
pub struct DiffuseService {
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
pub struct DiffuseService<N: NetworkZone> {
pub clear_net_broadcast_service: BroadcastSvc<N>,
}
impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
impl<N: NetworkZone> Service<DiffuseRequest<DandelionTx>> for DiffuseService<N> {
type Response = ();
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;

View File

@@ -12,38 +12,39 @@ use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest, Tor,
};
use cuprate_wire::protocol::NewTransactions;
use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {
clear_net: NetworkInterface<ClearNet>,
state: OutboundPeerStreamState,
pub struct OutboundPeerStream<Z: NetworkZone> {
network_interface: NetworkInterface<Z>,
state: OutboundPeerStreamState<Z>,
}
impl OutboundPeerStream {
pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
impl<Z: NetworkZone> OutboundPeerStream<Z> {
pub const fn new(network_interface: NetworkInterface<Z>) -> Self {
Self {
clear_net,
network_interface,
state: OutboundPeerStreamState::Standby,
}
}
}
impl Stream for OutboundPeerStream {
type Item = Result<
OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
tower::BoxError,
>;
impl<Z: NetworkZone> Stream for OutboundPeerStream<Z>
where
InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
{
type Item =
Result<OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<Z>>, tower::BoxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
OutboundPeerStreamState::Standby => {
let peer_set = self.clear_net.peer_set();
let peer_set = self.network_interface.peer_set();
let res = ready!(peer_set.poll_ready(cx));
self.state = OutboundPeerStreamState::AwaitingPeer(
@@ -61,10 +62,9 @@ impl Stream for OutboundPeerStream {
};
match stem_peer {
Some(peer) => OutboundPeer::Peer(
CrossNetworkInternalPeerId::ClearNet(peer.info.id),
StemPeerService(peer),
),
Some(peer) => {
OutboundPeer::Peer(peer.info.id.into(), StemPeerService(peer))
}
None => OutboundPeer::Exhausted,
}
})));
@@ -75,11 +75,11 @@ impl Stream for OutboundPeerStream {
}
/// The state of the [`OutboundPeerStream`].
enum OutboundPeerStreamState {
enum OutboundPeerStreamState<Z: NetworkZone> {
/// Standby state.
Standby,
/// Awaiting a response from the peer-set.
AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<Z>, tower::BoxError>>),
}
/// The stem service, used to send stem txs.

View File

@@ -21,7 +21,7 @@ use cuprate_dandelion_tower::{
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_p2p_core::{ClearNet, Tor};
use cuprate_txpool::{
service::{
interface::{
@@ -39,7 +39,7 @@ use crate::{
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
txpool::{
dandelion,
dandelion::{self, AnonTxService, ConcreteDandelionRouter, MainDandelionRouter},
relay_rules::{check_tx_relay_rules, RelayRuleError},
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
},
@@ -105,12 +105,16 @@ impl IncomingTxHandler {
#[expect(clippy::significant_drop_tightening)]
pub fn init(
clear_net: NetworkInterface<ClearNet>,
tor_net: Option<NetworkInterface<Tor>>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockchainContextService,
blockchain_read_handle: BlockchainReadHandle,
) -> Self {
let dandelion_router = dandelion::dandelion_router(clear_net);
let clearnet_router = dandelion::dandelion_router(clear_net);
let tor_router = tor_net.map(AnonTxService::new);
let dandelion_router = MainDandelionRouter::new(clearnet_router, tor_router);
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
dandelion_router,

View File

@@ -303,7 +303,7 @@ mod tests {
let entry = ChainEntry {
ids,
peer: InternalPeerID::Unknown(1),
peer: InternalPeerID::Unknown([1; 16]),
handle: handle.1
};
@@ -335,7 +335,7 @@ mod tests {
let entries = (0..len).map(|i| {
ChainEntry {
ids: vec![HASHES.get(i).copied().unwrap_or_default()],
peer: InternalPeerID::Unknown(1),
peer: InternalPeerID::Unknown([1; 16]),
handle: handle.1.clone()
}
}).collect();
@@ -369,7 +369,7 @@ mod tests {
let handle = HandleBuilder::new().build();
let entry = ChainEntry {
ids: HASHES[hashes_start_height..(hashes_start_height + len)].to_vec(),
peer: InternalPeerID::Unknown(1),
peer: InternalPeerID::Unknown([1; 16]),
handle: handle.1
};

View File

@@ -83,7 +83,8 @@ ignore = [
#{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" },
# TODO: check this is sorted before a beta release.
{ id = "RUSTSEC-2024-0436", reason = "`paste` unmaintained, not necessarily vulnerable yet." }
{ id = "RUSTSEC-2024-0436", reason = "`paste` unmaintained, not necessarily vulnerable yet." },
{ id = "RUSTSEC-2023-0071", reason = "rsa is used in arti." }
]
# If this is true, then cargo deny will use the git executable to fetch advisory database.
# If this is false, then it uses a built-in git library.
@@ -101,6 +102,7 @@ ignore = [
allow = [
# Nothing required - free to use without permission.
"CC0-1.0", # https://creativecommons.org/publicdomain/zero/1.0/
"Unlicense",
# Must include copyright notice.
"BSD-2-Clause", # https://tldrlegal.com/license/bsd-2-clause-license-(freebsd)

View File

@@ -14,7 +14,7 @@ pub use paste::paste;
///
/// struct Example {
/// a: u8
/// }
/// }
///
/// epee_object!(
/// Example,
@@ -132,6 +132,7 @@ macro_rules! epee_object {
use super::*;
#[derive(Default)]
#[allow(clippy::empty_structs_with_brackets)]
pub struct [<__Builder $obj>] {
$($field: Option<cuprate_epee_encoding::epee_object!(@internal_field_type $ty, $($ty_as)?)>,)*
$($flat_field: <$flat_ty as cuprate_epee_encoding::EpeeObject>::Builder,)*

View File

@@ -18,6 +18,7 @@ cuprate-types = { workspace = true, default-features = false, features
cuprate-helper = { workspace = true, default-features = false, features = ["map"] }
bitflags = { workspace = true, features = ["std"] }
borsh = { workspace = true, features = ["derive"] }
bytes = { workspace = true, features = ["std"] }
thiserror = { workspace = true }

View File

@@ -8,12 +8,13 @@ use std::{
str::{self, FromStr},
};
use borsh::{BorshDeserialize, BorshSerialize};
use thiserror::Error;
use super::{NetworkAddress, NetworkAddressIncorrectZone};
/// A v3, `Copy`able onion address.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash, BorshSerialize, BorshDeserialize)]
pub struct OnionAddr {
/// 56 characters encoded onion v3 domain without the .onion suffix
/// <https://spec.torproject.org/rend-spec/encoding-onion-addresses.html>

View File

@@ -12,7 +12,7 @@ use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
};
use tokio_util::time::DelayQueue;
use tokio_util::time::{delay_queue, DelayQueue};
use tower::{Service, ServiceExt};
use crate::{
@@ -41,6 +41,7 @@ pub struct DandelionPoolManager<P, R, Tx, TxId, PeerId> {
/// Current stem pool embargo timers.
pub(crate) embargo_timers: DelayQueue<TxId>,
pub(crate) embargo_timer_keys: HashMap<TxId, delay_queue::Key>,
/// The distrobution to sample to get embargo timers.
pub(crate) embargo_dist: Exp<f64>,
@@ -68,8 +69,10 @@ where
embargo_timer
);
self.embargo_timers
.insert(tx_id, Duration::from_secs_f64(embargo_timer));
let key = self
.embargo_timers
.insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer));
self.embargo_timer_keys.insert(tx_id, key);
}
/// Stems the tx, setting the stem origin, if it wasn't already set.
@@ -164,9 +167,9 @@ where
// Remove the tx from the maps used during the stem phase.
self.stem_origins.remove(&tx_id);
// The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the
// map. These timers should be relatively short, so it shouldn't be a problem.
//self.embargo_timers.try_remove(&tx_id);
if let Some(key) = self.embargo_timer_keys.remove(&tx_id) {
self.embargo_timers.try_remove(&key);
}
self.backing_pool
.ready()

View File

@@ -87,6 +87,7 @@ where
routing_set: JoinSet::new(),
stem_origins: HashMap::new(),
embargo_timers: DelayQueue::new(),
embargo_timer_keys: HashMap::new(),
embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
config,
_tx: PhantomData,

View File

@@ -183,8 +183,14 @@ where
.map_err(DandelionRouterError::OutboundPeerStreamError))
.ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
{
OutboundPeer::Peer(key, svc) => {
self.stem_peers.insert(key, svc);
OutboundPeer::Peer(key, mut svc) => {
let poll = svc.poll_ready(cx);
self.stem_peers.insert(key.clone(), svc);
if ready!(poll).is_err() {
self.stem_peers.remove(&key);
}
}
OutboundPeer::Exhausted => {
tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded.");

View File

@@ -28,6 +28,8 @@ rand = { workspace = true, features = ["std", "std_rng"] }
tracing = { workspace = true, features = ["std", "attributes"] }
hex-literal = { workspace = true }
hex = { workspace = true }
borsh = { workspace = true, features = ["derive", "std"], optional = true }
[dev-dependencies]

View File

@@ -40,14 +40,14 @@ pub enum InternalPeerID<A> {
/// A known address.
KnownAddr(A),
/// An unknown address (probably an inbound anonymity network connection).
Unknown(u128),
Unknown([u8; 16]),
}
impl<A: Display> Display for InternalPeerID<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::KnownAddr(addr) => addr.fmt(f),
Self::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")),
Self::Unknown(id) => f.write_str(&format!("Unknown, ID: {}", hex::encode(id))),
}
}
}

View File

@@ -28,7 +28,7 @@ use crate::{
fields(addr = %peer_information.id),
skip_all,
)]
pub async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync>(
pub(super) async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync>(
peer_information: PeerInformation<N::Addr>,
connection_tx: mpsc::Sender<ConnectionTaskRequest>,

View File

@@ -168,9 +168,9 @@ pub trait Transport<Z: NetworkZone>: Clone + Send + 'static {
/// Note: Currently, this client config is considered immutable during operational runtime. If one
/// wish to apply modifications on the fly, they will need to make use of an inner shared and mutable
/// reference to do so.
type ClientConfig: Default + Clone + Debug + Send + Sync + 'static;
type ClientConfig: Clone + Send + Sync + 'static;
/// Server configuration necessary when instantiating a listener for inbound connections.
type ServerConfig: Default + Clone + Debug + Send + Sync + 'static;
type ServerConfig: Send + Sync + 'static;
/// The stream (incoming data) type of this transport method.
type Stream: Stream<Item = Result<Message, BucketError>> + Unpin + Send + 'static;

View File

@@ -5,7 +5,27 @@ edition = "2021"
license = "MIT"
authors = ["SyntheticBird"]
[features]
default = ["static"]
static = ["arti-client/static"]
[dependencies]
cuprate-p2p-core = { workspace = true }
cuprate-wire = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true, features = ["net"] }
tokio-socks = { workspace = true, features = ["tokio"] }
tokio-util = { workspace = true, features = ["codec"] }
arti-client = { workspace = true, features = ["tokio", "native-tls", "onion-service-client", "onion-service-service", "experimental-api", "static-sqlite"] }
tor-config-path = { workspace = true }
tor-cell = { workspace = true }
tor-hsservice = { workspace = true }
tor-proto = { workspace = true }
tor-rtcompat = { workspace = true }
tracing = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,221 @@
//! Arti Transport
//!
//! This module defines a transport method for the `Tor` network zone using the `arti_client` library.
//!
//---------------------------------------------------------------------------------------------------- Imports
use std::{
io::{self, ErrorKind},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use arti_client::{DataReader, DataWriter, TorClient, TorClientConfig};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tokio_util::codec::{FramedRead, FramedWrite};
use tor_cell::relaycell::msg::Connected;
use tor_config_path::CfgPathResolver;
use tor_hsservice::{handle_rend_requests, OnionService, RunningOnionService};
use tor_proto::stream::IncomingStreamRequest;
use tor_rtcompat::PreferredRuntime;
use cuprate_p2p_core::{ClearNet, NetworkZone, Tor, Transport};
use cuprate_wire::MoneroWireCodec;
use crate::DisabledListener;
//---------------------------------------------------------------------------------------------------- Configuration
#[derive(Clone)]
pub struct ArtiClientConfig {
/// Arti bootstrapped client
pub client: TorClient<PreferredRuntime>,
}
pub struct ArtiServerConfig {
/// Arti onion service
pub onion_svc: OnionService,
/// Listening port
pub port: u16,
// Mandatory resources for launching the onion service
client: TorClient<PreferredRuntime>,
path_resolver: Arc<CfgPathResolver>,
}
impl ArtiServerConfig {
pub fn new(
onion_svc: OnionService,
port: u16,
client: &TorClient<PreferredRuntime>,
config: &TorClientConfig,
) -> Self {
let path_resolver: &CfgPathResolver = config.as_ref();
Self {
onion_svc,
port,
client: client.clone(),
path_resolver: Arc::new(path_resolver.clone()),
}
}
}
//---------------------------------------------------------------------------------------------------- Transport
type PinnedStream<I> = Pin<Box<dyn Stream<Item = I> + Send>>;
/// An onion service listening for incoming peer connections.
pub struct OnionListener {
/// A handle to the onion service instance.
_onion_svc: Arc<RunningOnionService>,
/// A modified stream that produce a data stream and sink from rendez-vous requests.
listener: PinnedStream<Result<(DataReader, DataWriter), io::Error>>,
}
impl Stream for OnionListener {
type Item = Result<
(
Option<<Tor as NetworkZone>::Addr>,
FramedRead<DataReader, MoneroWireCodec>,
FramedWrite<DataWriter, MoneroWireCodec>,
),
io::Error,
>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.listener.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(req) => Poll::Ready(req.map(|r| {
r.map(|(stream, sink)| {
(
None, // Inbound is anonymous
FramedRead::new(stream, MoneroWireCodec::default()),
FramedWrite::new(sink, MoneroWireCodec::default()),
)
})
})),
}
}
}
#[derive(Clone, Copy)]
pub struct Arti;
#[async_trait]
impl Transport<Tor> for Arti {
type ClientConfig = ArtiClientConfig;
type ServerConfig = ArtiServerConfig;
type Stream = FramedRead<DataReader, MoneroWireCodec>;
type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
type Listener = OnionListener;
async fn connect_to_peer(
addr: <Tor as NetworkZone>::Addr,
config: &Self::ClientConfig,
) -> Result<(Self::Stream, Self::Sink), io::Error> {
config
.client
.connect((addr.addr_string(), addr.port()))
.await
.map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
.map(|stream| {
let (stream, sink) = stream.split();
(
FramedRead::new(stream, MoneroWireCodec::default()),
FramedWrite::new(sink, MoneroWireCodec::default()),
)
})
}
async fn incoming_connection_listener(
config: Self::ServerConfig,
) -> Result<Self::Listener, io::Error> {
tracing::info!(
"Listening for incoming Tor P2P connections on address: {}:{}",
config.onion_svc.onion_address().unwrap(),
config.port
);
// Launch onion service
#[expect(clippy::clone_on_ref_ptr)]
let (svc, rdv_stream) = config
.onion_svc
.launch(
config.client.runtime().clone(),
config.client.dirmgr().clone(),
config.client.hs_circ_pool().clone(),
config.path_resolver,
)
.unwrap();
// Accept all rendez-vous and await correct stream request
let req_stream = handle_rend_requests(rdv_stream).then(move |sreq| async move {
match sreq.request() {
// As specified in: <https://spec.torproject.org/rend-spec/managing-streams.html>
//
// A client that wishes to open a data stream with us needs to send a BEGIN message with an empty address
// and no flags. We additionally filter requests to the correct port configured and advertised on P2P.
IncomingStreamRequest::Begin(r)
if r.port() == config.port && r.addr().is_empty() && r.flags().is_empty() =>
{
let stream = sreq
.accept(Connected::new_empty())
.await
.map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e.to_string()))?;
Ok(stream.split())
}
_ => {
sreq.shutdown_circuit()
.expect("Should never panic, unless programming error from arti's end.");
Err(io::Error::other("Received invalid command"))
}
}
});
Ok(OnionListener {
_onion_svc: svc,
listener: Box::pin(req_stream),
})
}
}
#[async_trait]
impl Transport<ClearNet> for Arti {
type ClientConfig = ArtiClientConfig;
type ServerConfig = ();
type Stream = FramedRead<DataReader, MoneroWireCodec>;
type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
type Listener = DisabledListener<ClearNet, DataReader, DataWriter>;
async fn connect_to_peer(
addr: <ClearNet as NetworkZone>::Addr,
config: &Self::ClientConfig,
) -> Result<(Self::Stream, Self::Sink), io::Error> {
config
.client
.connect(addr.to_string())
.await
.map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
.map(|stream| {
let (stream, sink) = stream.split();
(
FramedRead::new(stream, MoneroWireCodec::default()),
FramedWrite::new(sink, MoneroWireCodec::default()),
)
})
}
async fn incoming_connection_listener(
_config: Self::ServerConfig,
) -> Result<Self::Listener, io::Error> {
panic!("In anonymized clearnet mode, inbound is disabled!");
}
}

View File

@@ -0,0 +1,35 @@
use std::{
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
use cuprate_p2p_core::NetworkZone;
use cuprate_wire::MoneroWireCodec;
/// In proxied clearnet mode, inbound is disabled.
pub struct DisabledListener<Z: NetworkZone, R, W> {
_zone: PhantomData<Z>,
_reader: PhantomData<R>,
_writer: PhantomData<W>,
}
impl<Z: NetworkZone, R, W> Stream for DisabledListener<Z, R, W> {
type Item = Result<
(
Option<Z::Addr>,
FramedRead<R, MoneroWireCodec>,
FramedWrite<W, MoneroWireCodec>,
),
io::Error,
>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Panic within [`Transport::incoming_connection_listener`]
unreachable!()
}
}

View File

@@ -1,3 +1,15 @@
//! ## P2P Transports
//!
//! This crate will welcome additional transport implementation for Cuprate.
//! This crate implement additional transports for Cuprate.
/// Arti library implementation.
mod arti;
pub use arti::{Arti, ArtiClientConfig, ArtiServerConfig};
/// Tor daemon (SOCKS5) implementation
mod tor;
pub use tor::{Daemon, DaemonClientConfig, DaemonServerConfig};
/// Disabled listener
mod disabled;
pub(crate) use disabled::DisabledListener;

View File

@@ -0,0 +1,112 @@
//! Tor Daemon Transport
//!
//! This module defines a transport method for the `Tor` network zone using an external Tor daemon supporting SOCKS5.
//!
//---------------------------------------------------------------------------------------------------- Imports
use std::{
io::{self, ErrorKind},
net::{IpAddr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
use async_trait::async_trait;
use futures::Stream;
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener,
};
use tokio_socks::tcp::Socks5Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
use cuprate_p2p_core::{NetworkZone, Tor, Transport};
use cuprate_wire::MoneroWireCodec;
//---------------------------------------------------------------------------------------------------- Configuration
#[derive(Clone, Copy)]
pub struct DaemonClientConfig {
/// Socket address of the external Tor daemon
pub tor_daemon: SocketAddr,
}
#[derive(Clone, Copy)]
pub struct DaemonServerConfig {
/// Listening IP Address.
pub ip: IpAddr,
/// Listening TCP Port.
pub port: u16,
}
//---------------------------------------------------------------------------------------------------- Transport
/// A simple TCP server waiting for connections from the Tor daemon
pub struct DaemonInboundStream {
listener: TcpListener,
}
impl Stream for DaemonInboundStream {
type Item = Result<
(
Option<<Tor as NetworkZone>::Addr>,
FramedRead<OwnedReadHalf, MoneroWireCodec>,
FramedWrite<OwnedWriteHalf, MoneroWireCodec>,
),
io::Error,
>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.listener
.poll_accept(cx)
.map_ok(|(stream, _)| {
let (stream, sink) = stream.into_split();
(
None, // Inbound is anonymous
FramedRead::new(stream, MoneroWireCodec::default()),
FramedWrite::new(sink, MoneroWireCodec::default()),
)
})
.map(Some)
}
}
#[derive(Clone, Copy)]
pub struct Daemon;
#[async_trait]
impl Transport<Tor> for Daemon {
type ClientConfig = DaemonClientConfig;
type ServerConfig = DaemonServerConfig;
type Stream = FramedRead<OwnedReadHalf, MoneroWireCodec>;
type Sink = FramedWrite<OwnedWriteHalf, MoneroWireCodec>;
type Listener = DaemonInboundStream;
async fn connect_to_peer(
addr: <Tor as NetworkZone>::Addr,
config: &Self::ClientConfig,
) -> Result<(Self::Stream, Self::Sink), io::Error> {
Socks5Stream::connect(config.tor_daemon, addr.to_string())
.await
.map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
.map(|stream| {
let (stream, sink) = stream.into_inner().into_split();
(
FramedRead::new(stream, MoneroWireCodec::default()),
FramedWrite::new(sink, MoneroWireCodec::default()),
)
})
}
async fn incoming_connection_listener(
config: Self::ServerConfig,
) -> Result<Self::Listener, io::Error> {
let listener = TcpListener::bind((config.ip, config.port)).await?;
Ok(DaemonInboundStream { listener })
}
}

View File

@@ -30,7 +30,7 @@ use crate::{
)
)]
#[expect(clippy::used_underscore_binding)]
pub async fn download_batch_task<N: NetworkZone>(
pub(super) async fn download_batch_task<N: NetworkZone>(
client: ClientDropGuard<N>,
ids: ByteArrayVec<32>,
previous_id: [u8; 32],

View File

@@ -79,7 +79,7 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
///
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
#[instrument(level = "error", skip_all)]
pub async fn initial_chain_search<N: NetworkZone, C>(
pub(super) async fn initial_chain_search<N: NetworkZone, C>(
peer_set: &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError>

View File

@@ -424,8 +424,8 @@ mod tests {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown([1; 16])));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown([1; 16])));
// Outbound should get 1 and 3, inbound should get 2 and 3.
@@ -483,8 +483,8 @@ mod tests {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown([1; 16])));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown([1; 16])));
brcst
.ready()
@@ -509,11 +509,11 @@ mod tests {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut outbound_stream_from = pin!(outbound_mkr(InternalPeerID::Unknown(0)));
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown([1; 16])));
let mut outbound_stream_from = pin!(outbound_mkr(InternalPeerID::Unknown([0; 16])));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream_from = pin!(inbound_mkr(InternalPeerID::Unknown(0)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown([1; 16])));
let mut inbound_stream_from = pin!(inbound_mkr(InternalPeerID::Unknown([0; 16])));
brcst
.ready()
@@ -522,7 +522,7 @@ mod tests {
.call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[1]),
direction: None,
received_from: Some(InternalPeerID::Unknown(0)),
received_from: Some(InternalPeerID::Unknown([0; 16])),
})
.await
.unwrap();

View File

@@ -34,7 +34,7 @@ use crate::{
/// Starts the inbound server. This function will listen to all incoming connections
/// and initiate handshake if needed, after verifying the address isn't banned.
#[instrument(level = "warn", skip_all)]
pub async fn inbound_server<Z, T, HS, A>(
pub(super) async fn inbound_server<Z, T, HS, A>(
new_connection_tx: mpsc::Sender<Client<Z>>,
mut handshaker: HS,
mut address_book: A,

View File

@@ -39,7 +39,7 @@ pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
/// You must provide:
/// - A protocol request handler, which is given to each connection
/// - A core sync service, which keeps track of the sync state of our node
#[instrument(level = "debug", name = "net", skip_all, fields(zone = Z::NAME))]
#[instrument(level = "error", name = "net", skip_all, fields(zone = Z::NAME))]
pub async fn initialize_network<Z, T, PR, CS>(
protocol_request_handler_maker: PR,
core_sync_svc: CS,

View File

@@ -107,6 +107,7 @@ macro_rules! define_request_and_response {
///
$( #[$type_attr] )*
///
#[allow(clippy::empty_structs_with_brackets)]
$( #[$request_type_attr] )*
[<$type_name Request>] $(($restricted $(, $empty)?))? {
$(