net: error[E0038]: Weak pointer to parent trait from child

Provokes the following error:

 error[E0038]: the trait `Session` cannot be made into an object
  --> src/net/channel.rs:72:14
   |
72 |     session: SessionWeakPtr,
   |              ^^^^^^^^^^^^^^ `Session` cannot be made into an object
   |
note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit <https://doc.rust-lang.org/reference/items/traits.html#object-safety>
  --> src/net/session/mod.rs:79:14
   |
76 | pub trait Session: Sync {
   |           ------- this trait cannot be made into an object...
...
79 |     async fn register_channel(
   |              ^^^^^^^^^^^^^^^^ ...because associated function `register_channel` has no `self` parameter
help: consider turning `register_channel` into a method by giving it a `&self` argument
   |
79 |     async fn register_channel(&self,
   |                               ++++++
help: alternatively, consider constraining `register_channel` so it does not apply to trait objects
   |
82 |         executor: Arc<Executor<'_, Self: Sized>>,
   |                                  +++++++++++++
-----------------------
This commit is contained in:
lunar-mining
2022-07-01 21:57:57 +02:00
parent 5382a76eac
commit ab2a5af40b
9 changed files with 62 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
use async_std::sync::Arc;
use async_std::sync::{Arc, Mutex};
use std::{env, fs};
use log::{error, info};
@@ -11,7 +11,8 @@ use crate::{
};
use super::{
Channel, ChannelPtr, TcpTransport, TorTransport, Transport, TransportListener, TransportName,
Channel, ChannelPtr, SessionWeakPtr, TcpTransport, TorTransport, Transport, TransportListener,
TransportName,
};
/// Atomic pointer to Acceptor class.
@@ -21,12 +22,17 @@ pub type AcceptorPtr = Arc<Acceptor>;
pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
task: StoppableTaskPtr,
//pub session: Mutex<Option<SessionWeakPtr>>,
}
impl Acceptor {
/// Create new Acceptor object.
pub fn new() -> Arc<Self> {
Arc::new(Self { channel_subscriber: Subscriber::new(), task: StoppableTask::new() })
Arc::new(Self {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
//session,
})
}
/// Start accepting inbound socket connections. Creates a listener to start
/// listening on a local socket address. Then runs an accept loop in a new
@@ -91,7 +97,7 @@ impl Acceptor {
"Please set the env var DARKFI_TOR_COOKIE to the configured tor cookie file. \
For example: \
\'export DARKFI_TOR_COOKIE=\"/var/lib/tor/control_auth_cookie\"\'".to_string(),
))
));
}
let auth_cookie = auth_cookie.unwrap();

View File

@@ -19,7 +19,7 @@ use crate::{
use super::{
message,
message_subscriber::{MessageSubscription, MessageSubsystem},
TransportStream,
SessionWeakPtr, TransportStream,
};
/// Atomic pointer to async channel.
@@ -68,13 +68,18 @@ pub struct Channel {
receive_task: StoppableTaskPtr,
stopped: Mutex<bool>,
info: Mutex<ChannelInfo>,
//session: SessionWeakPtr,
}
impl Channel {
/// Sets up a new channel. Creates a reader and writer TCP stream and
/// summons the message subscriber subsystem. Performs a network
/// handshake on the subsystem dispatchers.
pub async fn new(stream: Box<dyn TransportStream>, address: Url) -> Arc<Self> {
pub async fn new(
stream: Box<dyn TransportStream>,
address: Url,
//session: SessionWeakPtr,
) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
let writer = Mutex::new(writer);
@@ -91,6 +96,7 @@ impl Channel {
receive_task: StoppableTask::new(),
stopped: Mutex::new(false),
info: Mutex::new(ChannelInfo::new()),
//session,
})
}

View File

@@ -7,18 +7,22 @@ use url::Url;
use crate::{Error, Result};
use super::{
Channel, ChannelPtr, SettingsPtr, TcpTransport, TorTransport, Transport, TransportName,
Channel, ChannelPtr, SessionWeakPtr, SettingsPtr, TcpTransport, TorTransport, Transport,
TransportName,
};
/// Create outbound socket connections.
pub struct Connector {
settings: SettingsPtr,
//pub session: SessionWeakPtr,
}
impl Connector {
/// Create a new connector with default network settings.
pub fn new(settings: SettingsPtr) -> Self {
Self { settings }
pub fn new(settings: SettingsPtr, // session: SessionWeakPtr
) -> Self {
Self { settings, // session
}
}
/// Establish an outbound connection.
@@ -53,6 +57,7 @@ impl Connector {
}
let channel = match $upgrade {
// session
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
Some(u) if u == "tls" => {
let stream = $transport.upgrade_dialer(stream?)?.await;

View File

@@ -96,7 +96,9 @@ pub use message::Message;
pub use message_subscriber::MessageSubscription;
pub use p2p::{P2p, P2pPtr};
pub use protocol::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr};
pub use session::{SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED};
pub use session::{
SessionWeakPtr, SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED,
};
pub use settings::{Settings, SettingsPtr};
pub use transport::{
TcpTransport, TorTransport, Transport, TransportListener, TransportName, TransportStream,

View File

@@ -38,14 +38,21 @@ pub struct InboundSession {
impl InboundSession {
/// Create a new inbound session.
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
//let acceptor = Acceptor::new(Mutex::new(None));
let acceptor = Acceptor::new();
Arc::new(Self {
let self_ = Arc::new(Self {
p2p,
acceptor,
accept_task: StoppableTask::new(),
connect_infos: Mutex::new(FxHashMap::default()),
})
});
//let parent = Arc::downgrade(&self_);
//*self_.acceptor.session.lock().await = Some(Arc::new(parent));
self_
}
/// Starts the inbound session. Begins by accepting connections and fails if

View File

@@ -56,7 +56,12 @@ impl ManualSession {
addr: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let connector = Connector::new(self.p2p().settings());
let parent = Arc::downgrade(&self);
let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
let settings = self.p2p().settings();
let attempts = settings.manual_attempt_limit;

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use log::debug;
@@ -46,6 +46,8 @@ pub use manual_session::ManualSession;
pub use outbound_session::OutboundSession;
pub use seed_session::SeedSession;
pub type SessionWeakPtr = Arc<Weak<dyn Session + Send + Sync + 'static>>;
/// Removes channel from the list of connected channels when a stop signal is
/// received.
async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
@@ -75,7 +77,7 @@ pub trait Session: Sync {
/// Registers a new channel with the session. Performs a network handshake
/// and starts the channel.
async fn register_channel(
self: Arc<Self>,
self_: Arc<dyn Session>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
@@ -85,14 +87,14 @@ pub trait Session: Sync {
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self.p2p();
let p2p = self_.p2p();
let protocols =
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await;
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// Switch on the channel
channel.start(executor.clone());

View File

@@ -135,7 +135,12 @@ impl OutboundSession {
slot_number: u32,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let connector = Connector::new(self.p2p().settings());
let parent = Arc::downgrade(&self);
let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
loop {
let addr = self.load_address(slot_number).await?;

View File

@@ -104,7 +104,10 @@ impl SeedSession {
(p2p.hosts(), p2p.settings())
};
let connector = Connector::new(settings.clone());
let parent = Arc::downgrade(&self);
let connector = Connector::new(
settings.clone(), //Arc::new(parent)
);
match connector.connect(seed.clone()).await {
Ok(channel) => {
// Blacklist goes here