diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 449949281..63bff9462 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -1,4 +1,4 @@ -use async_std::sync::Arc; +use async_std::sync::{Arc, Mutex}; use std::{env, fs}; use log::{error, info}; @@ -11,7 +11,8 @@ use crate::{ }; use super::{ - Channel, ChannelPtr, TcpTransport, TorTransport, Transport, TransportListener, TransportName, + Channel, ChannelPtr, SessionWeakPtr, TcpTransport, TorTransport, Transport, TransportListener, + TransportName, }; /// Atomic pointer to Acceptor class. @@ -21,12 +22,17 @@ pub type AcceptorPtr = Arc; pub struct Acceptor { channel_subscriber: SubscriberPtr>, task: StoppableTaskPtr, + //pub session: Mutex>, } impl Acceptor { /// Create new Acceptor object. pub fn new() -> Arc { - Arc::new(Self { channel_subscriber: Subscriber::new(), task: StoppableTask::new() }) + Arc::new(Self { + channel_subscriber: Subscriber::new(), + task: StoppableTask::new(), + //session, + }) } /// Start accepting inbound socket connections. Creates a listener to start /// listening on a local socket address. Then runs an accept loop in a new @@ -91,7 +97,7 @@ impl Acceptor { "Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \ For example: \ \'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'".to_string(), - )) + )); } let auth_cookie = auth_cookie.unwrap(); diff --git a/src/net/channel.rs b/src/net/channel.rs index 8c26da6a4..2a61491cc 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -19,7 +19,7 @@ use crate::{ use super::{ message, message_subscriber::{MessageSubscription, MessageSubsystem}, - TransportStream, + SessionWeakPtr, TransportStream, }; /// Atomic pointer to async channel. @@ -68,13 +68,18 @@ pub struct Channel { receive_task: StoppableTaskPtr, stopped: Mutex, info: Mutex, + //session: SessionWeakPtr, } impl Channel { /// Sets up a new channel. Creates a reader and writer TCP stream and /// summons the message subscriber subsystem. Performs a network /// handshake on the subsystem dispatchers. - pub async fn new(stream: Box, address: Url) -> Arc { + pub async fn new( + stream: Box, + address: Url, + //session: SessionWeakPtr, + ) -> Arc { let (reader, writer) = stream.split(); let reader = Mutex::new(reader); let writer = Mutex::new(writer); @@ -91,6 +96,7 @@ impl Channel { receive_task: StoppableTask::new(), stopped: Mutex::new(false), info: Mutex::new(ChannelInfo::new()), + //session, }) } diff --git a/src/net/connector.rs b/src/net/connector.rs index 0bd16c631..b51cb4cab 100644 --- a/src/net/connector.rs +++ b/src/net/connector.rs @@ -7,18 +7,22 @@ use url::Url; use crate::{Error, Result}; use super::{ - Channel, ChannelPtr, SettingsPtr, TcpTransport, TorTransport, Transport, TransportName, + Channel, ChannelPtr, SessionWeakPtr, SettingsPtr, TcpTransport, TorTransport, Transport, + TransportName, }; /// Create outbound socket connections. pub struct Connector { settings: SettingsPtr, + //pub session: SessionWeakPtr, } impl Connector { /// Create a new connector with default network settings. - pub fn new(settings: SettingsPtr) -> Self { - Self { settings } + pub fn new(settings: SettingsPtr, // session: SessionWeakPtr + ) -> Self { + Self { settings, // session + } } /// Establish an outbound connection. @@ -53,6 +57,7 @@ impl Connector { } let channel = match $upgrade { + // session None => Channel::new(Box::new(stream?), connect_url.clone()).await, Some(u) if u == "tls" => { let stream = $transport.upgrade_dialer(stream?)?.await; diff --git a/src/net/mod.rs b/src/net/mod.rs index c125d3723..d059884bf 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -96,7 +96,9 @@ pub use message::Message; pub use message_subscriber::MessageSubscription; pub use p2p::{P2p, P2pPtr}; pub use protocol::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr}; -pub use session::{SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED}; +pub use session::{ + SessionWeakPtr, SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED, +}; pub use settings::{Settings, SettingsPtr}; pub use transport::{ TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream, diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index 2f375b497..4b016171c 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -38,14 +38,21 @@ pub struct InboundSession { impl InboundSession { /// Create a new inbound session. pub fn new(p2p: Weak) -> Arc { + //let acceptor = Acceptor::new(Mutex::new(None)); let acceptor = Acceptor::new(); - Arc::new(Self { + let self_ = Arc::new(Self { p2p, acceptor, accept_task: StoppableTask::new(), connect_infos: Mutex::new(FxHashMap::default()), - }) + }); + + //let parent = Arc::downgrade(&self_); + + //*self_.acceptor.session.lock().await = Some(Arc::new(parent)); + + self_ } /// Starts the inbound session. Begins by accepting connections and fails if diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index de741776f..99dab2a83 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -56,7 +56,12 @@ impl ManualSession { addr: Url, executor: Arc>, ) -> Result<()> { - let connector = Connector::new(self.p2p().settings()); + let parent = Arc::downgrade(&self); + let connector = Connector::new( + self.p2p().settings(), + //Arc::new(parent) + ); + let settings = self.p2p().settings(); let attempts = settings.manual_attempt_limit; diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 6fcb125c3..18d5371b8 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use async_trait::async_trait; use log::debug; @@ -46,6 +46,8 @@ pub use manual_session::ManualSession; pub use outbound_session::OutboundSession; pub use seed_session::SeedSession; +pub type SessionWeakPtr = Arc>; + /// Removes channel from the list of connected channels when a stop signal is /// received. async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { @@ -75,7 +77,7 @@ pub trait Session: Sync { /// Registers a new channel with the session. Performs a network handshake /// and starts the channel. async fn register_channel( - self: Arc, + self_: Arc, channel: ChannelPtr, executor: Arc>, ) -> Result<()> { @@ -85,14 +87,14 @@ pub trait Session: Sync { // We do this so that the protocols can begin receiving and buffering messages // while the handshake protocol is ongoing. // They are currently in sleep mode. - let p2p = self.p2p(); + let p2p = self_.p2p(); let protocols = - p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await; + p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await; // Perform the handshake protocol - let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; + let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await; let handshake_task = - self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); + self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); // Switch on the channel channel.start(executor.clone()); diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 4f7dd0fa3..09bed710f 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -135,7 +135,12 @@ impl OutboundSession { slot_number: u32, executor: Arc>, ) -> Result<()> { - let connector = Connector::new(self.p2p().settings()); + let parent = Arc::downgrade(&self); + + let connector = Connector::new( + self.p2p().settings(), + //Arc::new(parent) + ); loop { let addr = self.load_address(slot_number).await?; diff --git a/src/net/session/seed_session.rs b/src/net/session/seed_session.rs index 06bab8839..6950b76cd 100644 --- a/src/net/session/seed_session.rs +++ b/src/net/session/seed_session.rs @@ -104,7 +104,10 @@ impl SeedSession { (p2p.hosts(), p2p.settings()) }; - let connector = Connector::new(settings.clone()); + let parent = Arc::downgrade(&self); + let connector = Connector::new( + settings.clone(), //Arc::new(parent) + ); match connector.connect(seed.clone()).await { Ok(channel) => { // Blacklist goes here