net: channel hosts a pointer to Session

solved a bug which warned Session could not be made into an object.

documented here: https://github.com/rust-lang/rust/issues/51443
and here: https://stackoverflow.com/questions/72838225/rust-trait-warning-method-references-the-self-type-in-its-where-clause
This commit is contained in:
lunar-mining
2022-07-02 13:16:24 +02:00
parent ab2a5af40b
commit 967f9ec770
9 changed files with 33 additions and 36 deletions

View File

@@ -22,16 +22,16 @@ pub type AcceptorPtr = Arc<Acceptor>;
pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
task: StoppableTaskPtr,
//pub session: Mutex<Option<SessionWeakPtr>>,
pub session: Mutex<Option<SessionWeakPtr>>,
}
impl Acceptor {
/// Create new Acceptor object.
pub fn new() -> Arc<Self> {
pub fn new(session: Mutex<Option<SessionWeakPtr>>) -> Arc<Self> {
Arc::new(Self {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
//session,
session,
})
}
/// Start accepting inbound socket connections. Creates a listener to start
@@ -148,7 +148,8 @@ impl Acceptor {
loop {
match listener.next().await {
Ok((stream, url)) => {
let channel = Channel::new(stream, url).await;
let channel =
Channel::new(stream, url, self.session.lock().await.clone().unwrap()).await;
self.channel_subscriber.notify(Ok(channel)).await;
}
Err(e) => {

View File

@@ -68,7 +68,7 @@ pub struct Channel {
receive_task: StoppableTaskPtr,
stopped: Mutex<bool>,
info: Mutex<ChannelInfo>,
//session: SessionWeakPtr,
session: SessionWeakPtr,
}
impl Channel {
@@ -78,7 +78,7 @@ impl Channel {
pub async fn new(
stream: Box<dyn TransportStream>,
address: Url,
//session: SessionWeakPtr,
session: SessionWeakPtr,
) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
@@ -96,7 +96,7 @@ impl Channel {
receive_task: StoppableTask::new(),
stopped: Mutex::new(false),
info: Mutex::new(ChannelInfo::new()),
//session,
session,
})
}

View File

@@ -14,15 +14,13 @@ use super::{
/// Create outbound socket connections.
pub struct Connector {
settings: SettingsPtr,
//pub session: SessionWeakPtr,
pub session: SessionWeakPtr,
}
impl Connector {
/// Create a new connector with default network settings.
pub fn new(settings: SettingsPtr, // session: SessionWeakPtr
) -> Self {
Self { settings, // session
}
pub fn new(settings: SettingsPtr, session: SessionWeakPtr) -> Self {
Self { settings, session }
}
/// Establish an outbound connection.
@@ -58,10 +56,13 @@ impl Connector {
let channel = match $upgrade {
// session
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
None => {
Channel::new(Box::new(stream?), connect_url.clone(), self.session.clone())
.await
}
Some(u) if u == "tls" => {
let stream = $transport.upgrade_dialer(stream?)?.await;
Channel::new(Box::new(stream?), connect_url).await
Channel::new(Box::new(stream?), connect_url, self.session.clone()).await
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
};

View File

@@ -94,7 +94,7 @@ impl P2p {
let parent = Arc::downgrade(&self_);
*self_.session_manual.lock().await = Some(ManualSession::new(parent.clone()));
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()));
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()).await);
*self_.session_outbound.lock().await = Some(OutboundSession::new(parent));
register_default_protocols(self_.clone()).await;

View File

@@ -37,9 +37,8 @@ pub struct InboundSession {
impl InboundSession {
/// Create a new inbound session.
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
//let acceptor = Acceptor::new(Mutex::new(None));
let acceptor = Acceptor::new();
pub async fn new(p2p: Weak<P2p>) -> Arc<Self> {
let acceptor = Acceptor::new(Mutex::new(None));
let self_ = Arc::new(Self {
p2p,
@@ -48,9 +47,9 @@ impl InboundSession {
connect_infos: Mutex::new(FxHashMap::default()),
});
//let parent = Arc::downgrade(&self_);
let parent = Arc::downgrade(&self_);
//*self_.acceptor.session.lock().await = Some(Arc::new(parent));
*self_.acceptor.session.lock().await = Some(Arc::new(parent));
self_
}

View File

@@ -57,10 +57,7 @@ impl ManualSession {
executor: Arc<Executor<'_>>,
) -> Result<()> {
let parent = Arc::downgrade(&self);
let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
let settings = self.p2p().settings();

View File

@@ -76,8 +76,12 @@ async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
pub trait Session: Sync {
/// Registers a new channel with the session. Performs a network handshake
/// and starts the channel.
// if we need to pass Self as an Arc we can do so like this:
// pub trait MyTrait: Send + Sync {
// async fn foo(&self, self_: Arc<dyn MyTrait>) {}
// }
async fn register_channel(
self_: Arc<dyn Session>,
&self,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
@@ -87,14 +91,14 @@ pub trait Session: Sync {
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self_.p2p();
let p2p = self.p2p();
let protocols =
p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await;
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await;
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let handshake_task =
self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// Switch on the channel
channel.start(executor.clone());

View File

@@ -137,10 +137,7 @@ impl OutboundSession {
) -> Result<()> {
let parent = Arc::downgrade(&self);
let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
loop {
let addr = self.load_address(slot_number).await?;

View File

@@ -105,9 +105,7 @@ impl SeedSession {
};
let parent = Arc::downgrade(&self);
let connector = Connector::new(
settings.clone(), //Arc::new(parent)
);
let connector = Connector::new(settings.clone(), Arc::new(parent));
match connector.connect(seed.clone()).await {
Ok(channel) => {
// Blacklist goes here