diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 63bff9462..e086b8e24 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -22,16 +22,16 @@ pub type AcceptorPtr = Arc; pub struct Acceptor { channel_subscriber: SubscriberPtr>, task: StoppableTaskPtr, - //pub session: Mutex>, + pub session: Mutex>, } impl Acceptor { /// Create new Acceptor object. - pub fn new() -> Arc { + pub fn new(session: Mutex>) -> Arc { Arc::new(Self { channel_subscriber: Subscriber::new(), task: StoppableTask::new(), - //session, + session, }) } /// Start accepting inbound socket connections. Creates a listener to start @@ -148,7 +148,8 @@ impl Acceptor { loop { match listener.next().await { Ok((stream, url)) => { - let channel = Channel::new(stream, url).await; + let channel = + Channel::new(stream, url, self.session.lock().await.clone().unwrap()).await; self.channel_subscriber.notify(Ok(channel)).await; } Err(e) => { diff --git a/src/net/channel.rs b/src/net/channel.rs index 2a61491cc..2d1c3c498 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -68,7 +68,7 @@ pub struct Channel { receive_task: StoppableTaskPtr, stopped: Mutex, info: Mutex, - //session: SessionWeakPtr, + session: SessionWeakPtr, } impl Channel { @@ -78,7 +78,7 @@ impl Channel { pub async fn new( stream: Box, address: Url, - //session: SessionWeakPtr, + session: SessionWeakPtr, ) -> Arc { let (reader, writer) = stream.split(); let reader = Mutex::new(reader); @@ -96,7 +96,7 @@ impl Channel { receive_task: StoppableTask::new(), stopped: Mutex::new(false), info: Mutex::new(ChannelInfo::new()), - //session, + session, }) } diff --git a/src/net/connector.rs b/src/net/connector.rs index b51cb4cab..fe544f3c4 100644 --- a/src/net/connector.rs +++ b/src/net/connector.rs @@ -14,15 +14,13 @@ use super::{ /// Create outbound socket connections. pub struct Connector { settings: SettingsPtr, - //pub session: SessionWeakPtr, + pub session: SessionWeakPtr, } impl Connector { /// Create a new connector with default network settings. - pub fn new(settings: SettingsPtr, // session: SessionWeakPtr - ) -> Self { - Self { settings, // session - } + pub fn new(settings: SettingsPtr, session: SessionWeakPtr) -> Self { + Self { settings, session } } /// Establish an outbound connection. @@ -58,10 +56,13 @@ impl Connector { let channel = match $upgrade { // session - None => Channel::new(Box::new(stream?), connect_url.clone()).await, + None => { + Channel::new(Box::new(stream?), connect_url.clone(), self.session.clone()) + .await + } Some(u) if u == "tls" => { let stream = $transport.upgrade_dialer(stream?)?.await; - Channel::new(Box::new(stream?), connect_url).await + Channel::new(Box::new(stream?), connect_url, self.session.clone()).await } Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)), }; diff --git a/src/net/p2p.rs b/src/net/p2p.rs index a9ec253f7..a72137428 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -94,7 +94,7 @@ impl P2p { let parent = Arc::downgrade(&self_); *self_.session_manual.lock().await = Some(ManualSession::new(parent.clone())); - *self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone())); + *self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()).await); *self_.session_outbound.lock().await = Some(OutboundSession::new(parent)); register_default_protocols(self_.clone()).await; diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index 4b016171c..b826f402e 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -37,9 +37,8 @@ pub struct InboundSession { impl InboundSession { /// Create a new inbound session. - pub fn new(p2p: Weak) -> Arc { - //let acceptor = Acceptor::new(Mutex::new(None)); - let acceptor = Acceptor::new(); + pub async fn new(p2p: Weak) -> Arc { + let acceptor = Acceptor::new(Mutex::new(None)); let self_ = Arc::new(Self { p2p, @@ -48,9 +47,9 @@ impl InboundSession { connect_infos: Mutex::new(FxHashMap::default()), }); - //let parent = Arc::downgrade(&self_); + let parent = Arc::downgrade(&self_); - //*self_.acceptor.session.lock().await = Some(Arc::new(parent)); + *self_.acceptor.session.lock().await = Some(Arc::new(parent)); self_ } diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 99dab2a83..0580c431e 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -57,10 +57,7 @@ impl ManualSession { executor: Arc>, ) -> Result<()> { let parent = Arc::downgrade(&self); - let connector = Connector::new( - self.p2p().settings(), - //Arc::new(parent) - ); + let connector = Connector::new(self.p2p().settings(), Arc::new(parent)); let settings = self.p2p().settings(); diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 18d5371b8..efcc7bdc6 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -76,8 +76,12 @@ async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { pub trait Session: Sync { /// Registers a new channel with the session. Performs a network handshake /// and starts the channel. + // if we need to pass Self as an Arc we can do so like this: + // pub trait MyTrait: Send + Sync { + // async fn foo(&self, self_: Arc) {} + // } async fn register_channel( - self_: Arc, + &self, channel: ChannelPtr, executor: Arc>, ) -> Result<()> { @@ -87,14 +91,14 @@ pub trait Session: Sync { // We do this so that the protocols can begin receiving and buffering messages // while the handshake protocol is ongoing. // They are currently in sleep mode. - let p2p = self_.p2p(); + let p2p = self.p2p(); let protocols = - p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await; + p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await; // Perform the handshake protocol - let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await; + let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; let handshake_task = - self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); + self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); // Switch on the channel channel.start(executor.clone()); diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 09bed710f..d72cdb8b6 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -137,10 +137,7 @@ impl OutboundSession { ) -> Result<()> { let parent = Arc::downgrade(&self); - let connector = Connector::new( - self.p2p().settings(), - //Arc::new(parent) - ); + let connector = Connector::new(self.p2p().settings(), Arc::new(parent)); loop { let addr = self.load_address(slot_number).await?; diff --git a/src/net/session/seed_session.rs b/src/net/session/seed_session.rs index 6950b76cd..8fba64a0f 100644 --- a/src/net/session/seed_session.rs +++ b/src/net/session/seed_session.rs @@ -105,9 +105,7 @@ impl SeedSession { }; let parent = Arc::downgrade(&self); - let connector = Connector::new( - settings.clone(), //Arc::new(parent) - ); + let connector = Connector::new(settings.clone(), Arc::new(parent)); match connector.connect(seed.clone()).await { Ok(channel) => { // Blacklist goes here