net: Remove async-std dependency and prefer smol.

Also we now take the global executor and keep it around in P2p.
This commit is contained in:
parazyd
2023-08-22 15:02:05 +02:00
parent 15203f1615
commit 0f125fa2db
37 changed files with 371 additions and 1031 deletions

782
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -62,6 +62,7 @@ async-std = {version = "1.12.0", features = ["attributes"], optional = true}
async-trait = {version = "0.1.73", optional = true}
futures = {version = "0.3.28", optional = true}
smol = {version = "1.3.0", optional = true}
pin-project-lite = {version = "0.2.12", optional = true}
# Networking
async-rustls = {version = "0.4.0", features = ["dangerous_configuration"], optional = true}
@@ -186,18 +187,19 @@ event-graph = [
net = [
"async-rustls",
"async-trait",
"ed25519-compact",
"futures",
"rand",
"rcgen",
"rustls-pemfile",
"semver",
"socket2",
"smol",
"structopt",
"structopt-toml",
"url",
"x509-parser",
"async-runtime",
"darkfi-serial",
"darkfi-serial/async",
"darkfi-serial/url",
@@ -220,9 +222,9 @@ rpc = [
]
system = [
"pin-project-lite",
"rand",
"async-runtime",
"smol",
]
tx = [

View File

@@ -28,11 +28,11 @@ use darkfi::{
Result,
};
async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<()> {
let p2p = net::P2p::new(options.network_settings).await;
async fn start(executor: Arc<Executor<'static>>, options: ProgramOptions) -> Result<()> {
let p2p = net::P2p::new(options.network_settings, executor.clone()).await;
p2p.clone().start(executor.clone()).await?;
p2p.run(executor).await?;
p2p.clone().start().await?;
p2p.run().await?;
Ok(())
}

View File

@@ -24,7 +24,7 @@ use crate::{
Float10, ValidatorStatePtr,
},
net::P2pPtr,
util::async_util::sleep,
system::sleep,
Result,
};

View File

@@ -23,7 +23,8 @@ use super::consensus_sync_task;
use crate::{
consensus::{constants, ValidatorStatePtr},
net::P2pPtr,
util::{async_util::sleep, time::Timestamp},
system::sleep,
util::time::Timestamp,
Result,
};

View File

@@ -127,8 +127,9 @@ pub enum Error {
#[error("Connection failed")]
ConnectFailed,
#[error("Timeout Error")]
TimeoutError,
#[cfg(feature = "system")]
#[error(transparent)]
TimeoutError(#[from] crate::system::timeout::TimeoutError),
#[error("Connection timed out")]
ConnectTimeout,
@@ -616,13 +617,6 @@ impl From<std::io::Error> for ClientFailed {
}
}
#[cfg(feature = "async-std")]
impl From<async_std::future::TimeoutError> for Error {
fn from(_err: async_std::future::TimeoutError) -> Self {
Self::TimeoutError
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::Io(err.kind())

View File

@@ -428,7 +428,7 @@ mod tests {
};
use super::*;
use crate::{event_graph::events_queue::EventsQueue, util::async_util::sleep, Result};
use crate::{event_graph::events_queue::EventsQueue, system::sleep, Result};
#[derive(SerialEncodable, SerialDecodable, Clone, Debug)]
pub struct PrivMsgEvent {

View File

@@ -28,7 +28,8 @@ use crate::{
event_graph::model::{Event, EventId, ModelPtr},
impl_p2p_message, net,
net::Message,
util::{async_util::sleep, ringbuffer::RingBuffer},
system::sleep,
util::ringbuffer::RingBuffer,
Result,
};

View File

@@ -16,9 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::{Arc, Mutex};
use std::sync::Arc;
use log::error;
use smol::Executor;
use smol::{lock::Mutex, Executor};
use url::Url;
use super::{

View File

@@ -16,15 +16,16 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::{Arc, Mutex};
use std::sync::Arc;
use darkfi_serial::{serialize, SerialDecodable, SerialEncodable};
use futures::{
io::{ReadHalf, WriteHalf},
AsyncReadExt,
};
use log::{debug, error, info};
use rand::{rngs::OsRng, Rng};
use smol::Executor;
use smol::{
io::{self, ReadHalf, WriteHalf},
lock::Mutex,
Executor,
};
use url::Url;
use super::{
@@ -89,7 +90,7 @@ impl Channel {
/// summons the message subscriber subsystem. Performs a network handshake
/// on the subsystem dispatchers.
pub async fn new(stream: Box<dyn PtStream>, addr: Url, session: SessionWeakPtr) -> Arc<Self> {
let (reader, writer) = stream.split();
let (reader, writer) = io::split(stream);
let reader = Mutex::new(reader);
let writer = Mutex::new(writer);

View File

@@ -16,9 +16,11 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi_serial::{SerialDecodable, SerialEncodable};
use url::Url;
use super::channel::ChannelInfo;
use crate::util::time::NanoTimestamp;
use url::Url;
macro_rules! dnetev {
($self:expr, $event_name:ident, $($code:tt)*) => {

View File

@@ -16,11 +16,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use async_std::sync::{Arc, RwLock};
use log::debug;
use rand::{prelude::IteratorRandom, rngs::OsRng};
use smol::lock::RwLock;
use url::Url;
use super::settings::SettingsPtr;
@@ -243,8 +246,9 @@ impl Hosts {
mod tests {
use super::{super::settings::Settings, *};
#[async_std::test]
async fn test_store_localnet() {
#[test]
fn test_store_localnet() {
smol::block_on(async {
let mut settings = Settings::default();
settings.localnet = true;
settings.external_addrs = vec![
@@ -280,10 +284,12 @@ mod tests {
for i in remote_hosts {
assert!(hosts.contains(&i).await);
}
});
}
#[async_std::test]
async fn test_store() {
#[test]
fn test_store() {
smol::block_on(async {
let mut settings = Settings::default();
settings.localnet = false;
settings.external_addrs = vec![
@@ -315,5 +321,6 @@ mod tests {
assert!(hosts.contains(&remote_hosts[0]).await);
assert!(hosts.contains(&remote_hosts[1]).await);
assert!(!hosts.contains(&remote_hosts[2]).await);
});
}
}

View File

@@ -17,8 +17,8 @@
*/
use darkfi_serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use log::trace;
use smol::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use url::Url;
use crate::{Error, Result};

View File

@@ -16,13 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{any::Any, collections::HashMap, io::Cursor};
use std::{any::Any, collections::HashMap, io::Cursor, sync::Arc};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, warn};
use rand::{rngs::OsRng, Rng};
use smol::lock::Mutex;
use super::message::Message;
use crate::{Error, Result};
@@ -277,12 +277,13 @@ mod tests {
use super::*;
use darkfi_serial::{serialize, SerialDecodable, SerialEncodable};
#[async_std::test]
async fn message_subscriber_test() {
#[test]
fn message_subscriber_test() {
#[derive(SerialEncodable, SerialDecodable)]
struct MyVersionMessage(pub u32);
crate::impl_p2p_message!(MyVersionMessage, "verver");
smol::block_on(async {
let subsystem = MessageSubsystem::new();
subsystem.add_dispatch::<MyVersionMessage>().await;
@@ -311,5 +312,6 @@ mod tests {
assert!(msg2.is_err());
sub.unsubscribe().await;
});
}
}

View File

@@ -16,16 +16,15 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::{HashMap, HashSet};
use async_std::{
stream::StreamExt,
sync::{Arc, Mutex},
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use futures::{stream::FuturesUnordered, TryFutureExt};
use log::{debug, error, info, warn};
use rand::{prelude::IteratorRandom, rngs::OsRng};
use smol::Executor;
use smol::{lock::Mutex, stream::StreamExt, Executor};
use url::Url;
use super::{
@@ -54,6 +53,8 @@ pub type P2pPtr = Arc<P2p>;
/// Toplevel peer-to-peer networking interface
pub struct P2p {
/// Global multithreaded executor reference
executor: Arc<Executor<'static>>,
/// Channels pending connection
pending: PendingChannels,
/// Connected channels
@@ -93,10 +94,11 @@ impl P2p {
///
/// Creates a weak pointer to self that is used by all sessions to access the
/// p2p parent class.
pub async fn new(settings: Settings) -> P2pPtr {
pub async fn new(settings: Settings, executor: Arc<Executor<'static>>) -> P2pPtr {
let settings = Arc::new(settings);
let self_ = Arc::new(Self {
executor,
pending: Mutex::new(HashSet::new()),
channels: Mutex::new(HashMap::new()),
channel_subscriber: Subscriber::new(),
@@ -126,28 +128,28 @@ impl P2p {
}
/// Invoke startup and seeding sequence. Call from constructing thread.
pub async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>) -> Result<()> {
debug!(target: "net::p2p::start()", "P2P::start() [BEGIN]");
info!(target: "net::p2p::start()", "[P2P] Seeding P2P subsystem");
// Start seed session
let seed = SeedSyncSession::new(Arc::downgrade(&self));
// This will block until all seed queries have finished
seed.start(ex.clone()).await?;
seed.start().await?;
debug!(target: "net::p2p::start()", "P2P::start() [END]");
Ok(())
}
/// Reseed the P2P network.
pub async fn reseed(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
pub async fn reseed(self: Arc<Self>) -> Result<()> {
debug!(target: "net::p2p::reseed()", "P2P::reseed() [BEGIN]");
info!(target: "net::p2p::reseed()", "[P2P] Reseeding P2P subsystem");
// Start seed session
let seed = SeedSyncSession::new(Arc::downgrade(&self));
// This will block until all seed queries have finished
seed.start(ex.clone()).await?;
seed.start().await?;
debug!(target: "net::p2p::reseed()", "P2P::reseed() [END]");
Ok(())
@@ -155,23 +157,23 @@ impl P2p {
/// Runs the network. Starts inbound, outbound, and manual sessions.
/// Waits for a stop signal and stops the network if received.
pub async fn run(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
pub async fn run(self: Arc<Self>) -> Result<()> {
debug!(target: "net::p2p::run()", "P2P::run() [BEGIN]");
info!(target: "net::p2p::run()", "[P2P] Running P2P subsystem");
// First attempt any set manual connections
let manual = self.session_manual().await;
for peer in &self.settings.peers {
manual.clone().connect(peer.clone(), ex.clone()).await;
manual.clone().connect(peer.clone()).await;
}
// Start the inbound session
let inbound = self.session_inbound().await;
inbound.clone().start(ex.clone()).await?;
inbound.clone().start().await?;
// Start the outbound session
let outbound = self.session_outbound().await;
outbound.clone().start(ex.clone()).await?;
outbound.clone().start().await?;
info!(target: "net::p2p::run()", "[P2P] P2P subsystem started");
@@ -287,6 +289,11 @@ impl P2p {
self.hosts.clone()
}
/// Reference the global executor
pub(super) fn executor(&self) -> Arc<Executor<'static>> {
self.executor.clone()
}
/// Return a reference to the internal protocol registry
pub fn protocol_registry(&self) -> &ProtocolRegistry {
&self.protocol_registry

View File

@@ -16,7 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Arc;
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use smol::Executor;
@@ -34,7 +35,7 @@ use super::{
protocol_base::{ProtocolBase, ProtocolBasePtr},
protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
};
use crate::{util::async_util::sleep, Result};
use crate::{system::sleep, Result};
/// Defines address and get-address messages
pub struct ProtocolAddress {

View File

@@ -16,7 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Arc;
use std::sync::Arc;
use async_trait::async_trait;
use smol::Executor;

View File

@@ -16,10 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::{Arc, Mutex};
use futures::Future;
use std::sync::Arc;
use log::{debug, trace};
use smol::{Executor, Task};
use smol::{future::Future, lock::Mutex, Executor, Task};
use super::super::channel::ChannelPtr;
use crate::Result;

View File

@@ -16,9 +16,11 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::{Duration, Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use async_std::{future::timeout, sync::Arc};
use async_trait::async_trait;
use log::{debug, error, warn};
use rand::{rngs::OsRng, Rng};
@@ -35,7 +37,10 @@ use super::{
protocol_base::{ProtocolBase, ProtocolBasePtr},
protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
};
use crate::{util::async_util::sleep, Error, Result};
use crate::{
system::{sleep, timeout::timeout},
Error, Result,
};
/// Defines ping and pong messages
pub struct ProtocolPing {

View File

@@ -16,17 +16,18 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Mutex;
use futures::{future::BoxFuture, Future};
use log::debug;
use smol::{
future::{Boxed, Future},
lock::Mutex,
};
use super::{
super::{channel::ChannelPtr, p2p::P2pPtr, session::SessionBitFlag},
protocol_base::ProtocolBasePtr,
};
type Constructor =
Box<dyn Fn(ChannelPtr, P2pPtr) -> BoxFuture<'static, ProtocolBasePtr> + Send + Sync>;
type Constructor = Box<dyn Fn(ChannelPtr, P2pPtr) -> Boxed<ProtocolBasePtr> + Send + Sync>;
#[derive(Default)]
pub struct ProtocolRegistry {
@@ -45,9 +46,8 @@ impl ProtocolRegistry {
C: 'static + Fn(ChannelPtr, P2pPtr) -> F + Send + Sync,
F: 'static + Future<Output = ProtocolBasePtr> + Send,
{
let constructor = move |channel, p2p| {
Box::pin(constructor(channel, p2p)) as BoxFuture<'static, ProtocolBasePtr>
};
let constructor =
move |channel, p2p| Box::pin(constructor(channel, p2p)) as Boxed<ProtocolBasePtr>;
self.constructors.lock().await.push((session_flags, Box::new(constructor)));
}

View File

@@ -16,7 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Arc;
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use smol::Executor;

View File

@@ -16,9 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use async_std::{future::timeout, sync::Arc};
use futures::future::join_all;
use log::{debug, error};
use smol::Executor;
@@ -30,7 +29,7 @@ use super::super::{
message_subscriber::MessageSubscription,
settings::SettingsPtr,
};
use crate::{Error, Result};
use crate::{system::timeout::timeout, Error, Result};
/// Implements the protocol version handshake sent out by nodes at
/// the beginning of a connection.

View File

@@ -23,10 +23,11 @@
//! an acceptor pointer, and a stoppable task pointer. Using a weak pointer
//! to P2P allows us to avoid circular dependencies.
use async_std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use log::{debug, error, info};
use smol::Executor;
use smol::{lock::Mutex, Executor};
use url::Url;
use super::{
@@ -60,12 +61,14 @@ impl InboundSession {
/// Starts the inbound session. Begins by accepting connections and fails
/// if the addresses are not configured. Then runs the channel subscription
/// loop.
pub async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>) -> Result<()> {
if self.p2p().settings().inbound_addrs.is_empty() {
info!(target: "net::inbound_session", "[P2P] Not configured for inbound connections.");
return Ok(())
}
let ex = self.p2p().executor();
// Activate mutex lock on accept tasks.
let mut accept_tasks = self.accept_tasks.lock().await;

View File

@@ -29,10 +29,11 @@
//! and insures that no other part of the program uses the slots at the
//! same time.
use async_std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use log::{info, warn};
use smol::Executor;
use smol::lock::Mutex;
use url::Url;
use super::{
@@ -44,8 +45,7 @@ use super::{
Session, SessionBitFlag, SESSION_MANUAL,
};
use crate::{
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr},
util::async_util::sleep,
system::{sleep, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr},
Error, Result,
};
@@ -82,11 +82,12 @@ impl ManualSession {
}
/// Connect the manual session to the given address
pub async fn connect(self: Arc<Self>, addr: Url, ex: Arc<Executor<'_>>) {
pub async fn connect(self: Arc<Self>, addr: Url) {
let ex = self.p2p().executor();
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(addr, ex.clone()),
self.clone().channel_connect_loop(addr),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
@@ -97,11 +98,8 @@ impl ManualSession {
}
/// Creates a connector object and tries to connect using it
pub async fn channel_connect_loop(
self: Arc<Self>,
addr: Url,
ex: Arc<Executor<'_>>,
) -> Result<()> {
pub async fn channel_connect_loop(self: Arc<Self>, addr: Url) -> Result<()> {
let ex = self.p2p().executor();
let parent = Arc::downgrade(&self);
let settings = self.p2p().settings();
let connector = Connector::new(settings.clone(), Arc::new(parent));

View File

@@ -16,7 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::{Arc, Weak};
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use log::debug;
use smol::Executor;

View File

@@ -26,12 +26,14 @@
//! and insures that no other part of the program uses the slots at the
//! same time.
use std::collections::HashSet;
use std::{
collections::HashSet,
sync::{Arc, Weak},
};
use async_std::sync::{Arc, Mutex, Weak};
use async_trait::async_trait;
use log::{debug, error, info, warn};
use smol::Executor;
use smol::{lock::Mutex, Executor};
use url::Url;
use super::{
@@ -45,8 +47,7 @@ use super::{
Session, SessionBitFlag, SESSION_OUTBOUND,
};
use crate::{
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr},
util::async_util::sleep,
system::{sleep, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr},
Error, Result,
};
@@ -98,7 +99,8 @@ impl OutboundSession {
}
/// Start the outbound session. Runs the channel connect loop.
pub async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>) -> Result<()> {
let ex = self.p2p().executor();
let n_slots = self.p2p().settings().outbound_connections;
info!(target: "net::outbound_session", "[P2P] Starting {} outbound connection slots.", n_slots);
// Activate mutex lock on connection slots.
@@ -108,7 +110,7 @@ impl OutboundSession {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(i, ex.clone()),
self.clone().channel_connect_loop(i),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
@@ -131,11 +133,8 @@ impl OutboundSession {
}
/// Creates a connector object and tries to connect using it.
pub async fn channel_connect_loop(
self: Arc<Self>,
slot: u32,
ex: Arc<Executor<'_>>,
) -> Result<()> {
pub async fn channel_connect_loop(self: Arc<Self>, slot: u32) -> Result<()> {
let ex = self.p2p().executor();
let parent = Arc::downgrade(&self);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
@@ -199,7 +198,7 @@ impl OutboundSession {
);
// Find an address to connect to. We also do peer discovery here if needed.
let addr = self.load_address(slot, transports, ex.clone()).await?;
let addr = self.load_address(slot, transports).await?;
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
@@ -270,12 +269,7 @@ impl OutboundSession {
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending). If no address was found, we'll attempt
/// to do peer discovery and try to fill the slot again.
async fn load_address(
&self,
slot: u32,
transports: &[String],
ex: Arc<Executor<'_>>,
) -> Result<Url> {
async fn load_address(&self, slot: u32, transports: &[String]) -> Result<Url> {
loop {
let p2p = self.p2p();
let retry_sleep = p2p.settings().outbound_connect_timeout;
@@ -349,7 +343,7 @@ impl OutboundSession {
// inside peer_discovery, it will block here in the slot sessions, while
// other slots can keep trying to find hosts. This is also why we sleep
// in the beginning of this loop if peer discovery is currently active.
self.peer_discovery(slot, ex.clone()).await;
self.peer_discovery(slot).await;
}
}
@@ -360,7 +354,7 @@ impl OutboundSession {
/// This function will also sleep `Settings::outbound_connect_timeout` seconds
/// after broadcasting in order to let the P2P stack receive and work through
/// the addresses it is expecting.
async fn peer_discovery(&self, slot: u32, ex: Arc<Executor<'_>>) {
async fn peer_discovery(&self, slot: u32) {
let p2p = self.p2p();
if *p2p.peer_discovery_running.lock().await {
@@ -395,7 +389,7 @@ impl OutboundSession {
"[P2P] No connected channels found for peer discovery. Reseeding.",
);
if let Err(e) = p2p.clone().reseed(ex.clone()).await {
if let Err(e) = p2p.clone().reseed().await {
error!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Network reseed failed: {}", e,

View File

@@ -37,7 +37,8 @@
//! function. This runs the version exchange protocol, stores the channel in the
//! p2p list of channels, and subscribes to a stop signal.
use async_std::sync::{Arc, Weak};
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use futures::future::join_all;
use log::{debug, info, warn};
@@ -68,7 +69,7 @@ impl SeedSyncSession {
/// Start the seed sync session. Creates a new task for every seed
/// connection and starts the seed on each task.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(self: Arc<Self>) -> Result<()> {
debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [START]");
let settings = self.p2p().settings();
@@ -82,6 +83,7 @@ impl SeedSyncSession {
}
// Gather tasks so we can execute concurrently
let executor = self.p2p().executor();
let mut tasks = Vec::with_capacity(settings.seeds.len());
for (i, seed) in settings.seeds.iter().enumerate() {
let ex_ = executor.clone();

View File

@@ -16,7 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Arc;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

View File

@@ -19,7 +19,7 @@
use std::time::Duration;
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite};
use smol::io::{AsyncRead, AsyncWrite};
use url::Url;
use crate::{Error, Result};
@@ -270,7 +270,7 @@ impl Listener {
"tcp" => {
// Build a TCP listener
enforce_hostport!(endpoint);
let variant = tcp::TcpListener::new(1024).await?;
let variant = tcp::TcpListener::new().await?;
let variant = ListenerVariant::Tcp(variant);
Ok(Self { endpoint, variant })
}
@@ -279,7 +279,7 @@ impl Listener {
"tcp+tls" => {
// Build a TCP listener wrapped with TLS
enforce_hostport!(endpoint);
let variant = tcp::TcpListener::new(1024).await?;
let variant = tcp::TcpListener::new().await?;
let variant = ListenerVariant::TcpTls(variant);
Ok(Self { endpoint, variant })
}
@@ -334,10 +334,10 @@ impl Listener {
pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}
#[cfg(feature = "p2p-transport-tcp")]
impl PtStream for async_std::net::TcpStream {}
impl PtStream for smol::net::TcpStream {}
#[cfg(feature = "p2p-transport-tcp")]
impl PtStream for async_rustls::TlsStream<async_std::net::TcpStream> {}
impl PtStream for async_rustls::TlsStream<smol::net::TcpStream> {}
#[cfg(feature = "p2p-transport-tor")]
impl PtStream for arti_client::DataStream {}
@@ -346,7 +346,7 @@ impl PtStream for arti_client::DataStream {}
impl PtStream for async_rustls::TlsStream<arti_client::DataStream> {}
#[cfg(feature = "p2p-transport-unix")]
impl PtStream for async_std::os::unix::net::UnixStream {}
impl PtStream for smol::net::unix::UnixStream {}
/// Wrapper trait for async listeners
#[async_trait]

View File

@@ -16,17 +16,16 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{io, time::Duration};
use std::time::Duration;
use async_rustls::{TlsAcceptor, TlsStream};
use async_std::net::{SocketAddr, TcpListener as AsyncStdTcpListener, TcpStream};
use async_trait::async_trait;
use log::debug;
use socket2::{Domain, Socket, TcpKeepalive, Type};
use smol::net::{SocketAddr, TcpListener as SmolTcpListener, TcpStream};
use url::Url;
use super::{PtListener, PtStream};
use crate::Result;
use crate::{system::io_timeout, Result};
/// TCP Dialer implementation
#[derive(Debug, Clone)]
@@ -41,97 +40,48 @@ impl TcpDialer {
Ok(Self { ttl })
}
/// Internal helper function to create a TCP socket.
async fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
let domain = if socket_addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 };
let socket = Socket::new(domain, Type::STREAM, Some(socket2::Protocol::TCP))?;
if socket_addr.is_ipv6() {
socket.set_only_v6(true)?;
}
if let Some(ttl) = self.ttl {
socket.set_ttl(ttl)?;
}
socket.set_nodelay(true)?;
let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(20));
socket.set_tcp_keepalive(&keepalive)?;
socket.set_reuse_port(true)?;
Ok(socket)
}
/// Internal dial function
pub(crate) async fn do_dial(
&self,
socket_addr: SocketAddr,
timeout: Option<Duration>,
conn_timeout: Option<Duration>,
) -> Result<TcpStream> {
debug!(target: "net::tcp::do_dial", "Dialing {} with TCP...", socket_addr);
let socket = self.create_socket(socket_addr).await?;
let connection = if let Some(timeout) = timeout {
socket.connect_timeout(&socket_addr.into(), timeout)
let stream = if let Some(conn_timeout) = conn_timeout {
io_timeout(conn_timeout, TcpStream::connect(socket_addr)).await?
} else {
socket.connect(&socket_addr.into())
TcpStream::connect(socket_addr).await?
};
match connection {
Ok(()) => {}
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
Err(err) => return Err(err.into()),
if let Some(ttl) = self.ttl {
stream.set_ttl(ttl)?;
}
socket.set_nonblocking(true)?;
let stream = TcpStream::from(std::net::TcpStream::from(socket));
stream.set_nodelay(true)?;
Ok(stream)
}
}
/// TCP Listener implementation
#[derive(Debug, Clone)]
pub struct TcpListener {
/// Size of the listen backlog for listen sockets
backlog: i32,
}
pub struct TcpListener;
impl TcpListener {
/// Instantiate a new [`TcpListener`] with given backlog size.
pub async fn new(backlog: i32) -> Result<Self> {
Ok(Self { backlog })
}
/// Internal helper function to create a TCP socket.
async fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
let domain = if socket_addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 };
let socket = Socket::new(domain, Type::STREAM, Some(socket2::Protocol::TCP))?;
if socket_addr.is_ipv6() {
socket.set_only_v6(true)?;
}
socket.set_nodelay(true)?;
let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(20));
socket.set_tcp_keepalive(&keepalive)?;
socket.set_reuse_port(true)?;
Ok(socket)
pub async fn new() -> Result<Self> {
Ok(Self {})
}
/// Internal listen function
pub(crate) async fn do_listen(&self, socket_addr: SocketAddr) -> Result<AsyncStdTcpListener> {
let socket = self.create_socket(socket_addr).await?;
socket.bind(&socket_addr.into())?;
socket.listen(self.backlog)?;
socket.set_nonblocking(true)?;
Ok(AsyncStdTcpListener::from(std::net::TcpListener::from(socket)))
pub(crate) async fn do_listen(&self, socket_addr: SocketAddr) -> Result<SmolTcpListener> {
let listener = SmolTcpListener::bind(socket_addr).await?;
Ok(listener)
}
}
#[async_trait]
impl PtListener for AsyncStdTcpListener {
impl PtListener for SmolTcpListener {
async fn next(&self) -> Result<(Box<dyn PtStream>, Url)> {
let (stream, peer_addr) = match self.accept().await {
Ok((s, a)) => (s, a),
@@ -144,7 +94,7 @@ impl PtListener for AsyncStdTcpListener {
}
#[async_trait]
impl PtListener for (TlsAcceptor, AsyncStdTcpListener) {
impl PtListener for (TlsAcceptor, SmolTcpListener) {
async fn next(&self) -> Result<(Box<dyn PtStream>, Url)> {
let (stream, peer_addr) = match self.1.accept().await {
Ok((s, a)) => (s, a),

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::SystemTime;
use std::{sync::Arc, time::SystemTime};
use async_rustls::{
rustls,
@@ -29,7 +29,6 @@ use async_rustls::{
},
TlsAcceptor, TlsConnector, TlsStream,
};
use async_std::sync::Arc;
use log::error;
use rustls_pemfile::pkcs8_private_keys;
use x509_parser::{
@@ -255,8 +254,8 @@ impl TlsUpgrade {
#[cfg(feature = "p2p-transport-tcp")]
pub async fn upgrade_listener_tcp_tls(
self,
listener: async_std::net::TcpListener,
) -> Result<(TlsAcceptor, async_std::net::TcpListener)> {
listener: smol::net::TcpListener,
) -> Result<(TlsAcceptor, smol::net::TcpListener)> {
Ok((TlsAcceptor::from(self.server_config), listener))
}
}

View File

@@ -19,10 +19,9 @@
use std::time::Duration;
use arti_client::{config::BoolOrAuto, DataStream, StreamPrefs, TorClient};
use async_std::future;
use log::debug;
use crate::Result;
use crate::{system::timeout::timeout, Result};
/// Tor Dialer implementation
#[derive(Debug, Clone)]
@@ -39,7 +38,7 @@ impl TorDialer {
&self,
host: &str,
port: u16,
timeout: Option<Duration>,
conn_timeout: Option<Duration>,
) -> Result<DataStream> {
debug!(target: "net::tor::do_dial", "Dialing {}:{} with Tor...", host, port);
debug!(target: "net::tor::do_dial", "Bootstrapping...");
@@ -47,15 +46,12 @@ impl TorDialer {
let mut stream_prefs = StreamPrefs::new();
stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true));
if timeout.is_some() {
let res = future::timeout(
timeout.unwrap(),
client.connect_with_prefs((host, port), &stream_prefs),
)
.await?;
return Ok(res?)
}
let stream = if let Some(conn_timeout) = conn_timeout {
timeout(conn_timeout, client.connect_with_prefs((host, port), &stream_prefs)).await?
} else {
Ok(client.connect_with_prefs((host, port), &stream_prefs).await?)
};
Ok(stream?)
}
}

View File

@@ -15,13 +15,15 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::{
fs,
os::unix::net::{UnixListener as AsyncStdUnixListener, UnixStream},
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use log::debug;
use smol::{
fs,
net::unix::{UnixListener as SmolUnixListener, UnixStream},
};
use url::Url;
use super::{PtListener, PtStream};
@@ -59,16 +61,16 @@ impl UnixListener {
}
/// Internal listen function
pub(crate) async fn do_listen(&self, path: &PathBuf) -> Result<AsyncStdUnixListener> {
pub(crate) async fn do_listen(&self, path: &PathBuf) -> Result<SmolUnixListener> {
// This rm is a bit aggressive, but c'est la vie.
let _ = fs::remove_file(path).await;
let listener = AsyncStdUnixListener::bind(path).await?;
let listener = SmolUnixListener::bind(path)?;
Ok(listener)
}
}
#[async_trait]
impl PtListener for AsyncStdUnixListener {
impl PtListener for SmolUnixListener {
async fn next(&self) -> Result<(Box<dyn PtStream>, Url)> {
let (stream, _peer_addr) = match self.accept().await {
Ok((s, a)) => (s, a),

View File

@@ -16,6 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::Duration;
use smol::Timer;
/// Implementation of async background task spawning which are stoppable
/// using channel signalling.
pub mod stoppable_task;
@@ -28,3 +32,13 @@ pub use subscriber::{Subscriber, SubscriberPtr, Subscription};
/// Async timeout implementations
pub mod timeout;
pub use timeout::io_timeout;
/// Sleep for any number of seconds.
pub async fn sleep(seconds: u64) {
Timer::after(Duration::from_secs(seconds)).await;
}
/// Sleep for any number of milliseconds.
pub async fn msleep(millis: u64) {
Timer::after(Duration::from_millis(millis)).await;
}

View File

@@ -1,30 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use smol::Timer;
use std::time::Duration;
/// Sleep for any number of seconds.
pub async fn sleep(seconds: u64) {
Timer::after(Duration::from_secs(seconds)).await;
}
/// Sleep for any number of milliseconds.
pub async fn msleep(millis: u64) {
Timer::after(Duration::from_millis(millis)).await;
}

View File

@@ -118,7 +118,6 @@ pub fn get_log_config(verbosity_level: u8) -> simplelog::Config {
///
/// The Cargo.toml dependencies needed for this are:
/// ```text
/// async-std = "1.12.0"
/// darkfi = { path = "../../", features = ["util"] }
/// easy-parallel = "3.2.0"
/// signal-hook-async-std = "0.2.2"
@@ -159,7 +158,7 @@ pub fn get_log_config(verbosity_level: u8) -> simplelog::Config {
/// }
///
/// async_daemonize!(realmain);
/// async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
/// async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
/// println!("Hello, world!");
/// Ok(())
/// }
@@ -201,7 +200,7 @@ macro_rules! async_daemonize {
// https://docs.rs/smol/latest/smol/struct.Executor.html#examples
let n_threads = std::thread::available_parallelism().unwrap().get();
let ex = async_std::sync::Arc::new(smol::Executor::new());
let ex = std::sync::Arc::new(smol::Executor::new());
let (signal, shutdown) = smol::channel::unbounded::<()>();
let (_, result) = easy_parallel::Parallel::new()
// Run four executor threads

View File

@@ -16,10 +16,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#[cfg(feature = "async-runtime")]
/// async utility functions
pub mod async_util;
/// Command-line interface utilities
pub mod cli;