From 5b7306af7182f07aa02c2e465a2cc31a3642768d Mon Sep 17 00:00:00 2001 From: aggstam Date: Wed, 24 Aug 2022 21:29:19 +0300 Subject: [PATCH] p2pnet: implemented caller signaling --- bin/darkfid/src/main.rs | 12 +++++-- bin/darkwikid/src/main.rs | 2 +- bin/faucetd/src/main.rs | 6 +++- bin/irc-raft/src/main.rs | 2 +- bin/ircd/src/main.rs | 2 +- bin/lilith/src/main.rs | 2 +- bin/tau/taud/src/main.rs | 2 +- example/dchat/src/main.rs | 2 +- example/net.rs | 2 +- script/research/dhtd/src/main.rs | 2 +- script/research/fud/README.md | 14 -------- script/research/fud/fud/src/main.rs | 19 +++++++---- script/research/raft-diag/src/main.rs | 2 +- src/net/p2p.rs | 47 +++++++++++++++++++++++++-- src/net/session/outbound_session.rs | 13 ++++++-- src/util/async_util.rs | 11 +++++++ 16 files changed, 103 insertions(+), 37 deletions(-) diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 4e7d52fd7..42be878cb 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -395,16 +395,20 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, darkfid.clone())).detach(); info!("Starting sync P2P network"); + let (init_send, init_recv) = async_channel::bounded::<()>(1); sync_p2p.clone().unwrap().start(ex.clone()).await?; let _ex = ex.clone(); let _sync_p2p = sync_p2p.clone(); ex.spawn(async move { - if let Err(e) = _sync_p2p.unwrap().run(_ex).await { + if let Err(e) = _sync_p2p.unwrap().run(_ex, Some(init_send)).await { error!("Failed starting sync P2P network: {}", e); } }) .detach(); + init_recv.recv().await?; + info!("Sync P2P network initialized!"); + match block_sync_task(sync_p2p.clone().unwrap(), state.clone()).await { Ok(()) => *darkfid.synced.lock().await = true, Err(e) => error!("Failed syncing blockchain: {}", e), @@ -413,16 +417,20 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Consensus protocol if args.consensus && *darkfid.synced.lock().await { info!("Starting consensus P2P network"); + let (init_send, init_recv) = async_channel::bounded::<()>(1); consensus_p2p.clone().unwrap().start(ex.clone()).await?; let _ex = ex.clone(); let _consensus_p2p = consensus_p2p.clone(); ex.spawn(async move { - if let Err(e) = _consensus_p2p.unwrap().run(_ex).await { + if let Err(e) = _consensus_p2p.unwrap().run(_ex, Some(init_send)).await { error!("Failed starting consensus P2P network: {}", e); } }) .detach(); + init_recv.recv().await?; + info!("Consensus P2P network initialized!"); + info!("Starting consensus protocol task"); ex.spawn(proposal_task(consensus_p2p.unwrap(), sync_p2p.unwrap(), state)).detach(); } else { diff --git a/bin/darkwikid/src/main.rs b/bin/darkwikid/src/main.rs index 07a2a33c5..bfc980f73 100644 --- a/bin/darkwikid/src/main.rs +++ b/bin/darkwikid/src/main.rs @@ -551,7 +551,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; - executor.spawn(p2p.clone().run(executor.clone())).detach(); + executor.spawn(p2p.clone().run(executor.clone(), None)).detach(); // // Darkwiki start diff --git a/bin/faucetd/src/main.rs b/bin/faucetd/src/main.rs index 0a249bd4d..c66882377 100644 --- a/bin/faucetd/src/main.rs +++ b/bin/faucetd/src/main.rs @@ -399,16 +399,20 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, faucetd.clone())).detach(); info!("Starting sync P2P network"); + let (init_send, init_recv) = async_channel::bounded::<()>(1); sync_p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); let _sync_p2p = sync_p2p.clone(); ex.spawn(async move { - if let Err(e) = _sync_p2p.run(_ex).await { + if let Err(e) = _sync_p2p.run(_ex, Some(init_send)).await { error!("Failed starting sync P2P network: {}", e); } }) .detach(); + init_recv.recv().await?; + info!("Sync P2P network initialized!"); + match block_sync_task(sync_p2p.clone(), state.clone()).await { Ok(()) => *faucetd.synced.lock().await = true, Err(e) => error!("Failed syncing blockchain: {}", e), diff --git a/bin/irc-raft/src/main.rs b/bin/irc-raft/src/main.rs index 7733355aa..e2e134e69 100644 --- a/bin/irc-raft/src/main.rs +++ b/bin/irc-raft/src/main.rs @@ -196,7 +196,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; let executor_cloned = executor.clone(); - let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone())); + let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone(), None)); // // RPC interface diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index e82e37e77..32625fd90 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -219,7 +219,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; let executor_cloned = executor.clone(); - executor_cloned.spawn(p2p.clone().run(executor.clone())).detach(); + executor_cloned.spawn(p2p.clone().run(executor.clone(), None)).detach(); // // RPC interface diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index 277e62487..430e96b50 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -51,7 +51,7 @@ async fn spawn_network( p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); ex.spawn(async move { - if let Err(e) = p2p.run(_ex).await { + if let Err(e) = p2p.run(_ex, None).await { error!("Failed starting P2P network seed: {}", e); } }) diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 2afd6aca3..4d6dc0d09 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -217,7 +217,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; - executor.spawn(p2p.clone().run(executor.clone())).detach(); + executor.spawn(p2p.clone().run(executor.clone(), None)).detach(); // // Waiting Exit signal diff --git a/example/dchat/src/main.rs b/example/dchat/src/main.rs index ec8943e60..cc2c49a21 100644 --- a/example/dchat/src/main.rs +++ b/example/dchat/src/main.rs @@ -103,7 +103,7 @@ impl Dchat { self.register_protocol(self.recv_msgs.clone()).await?; self.p2p.clone().start(ex.clone()).await?; - ex2.spawn(self.p2p.clone().run(ex.clone())).detach(); + ex2.spawn(self.p2p.clone().run(ex.clone(), None)).detach(); self.menu().await?; diff --git a/example/net.rs b/example/net.rs index d3d2ab9e0..f60db5625 100644 --- a/example/net.rs +++ b/example/net.rs @@ -14,7 +14,7 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( let p2p = net::P2p::new(options.network_settings).await; p2p.clone().start(executor.clone()).await?; - p2p.run(executor).await?; + p2p.run(executor, None).await?; Ok(()) } diff --git a/script/research/dhtd/src/main.rs b/script/research/dhtd/src/main.rs index a91e69889..4d8cb1bb2 100644 --- a/script/research/dhtd/src/main.rs +++ b/script/research/dhtd/src/main.rs @@ -287,7 +287,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let _ex = ex.clone(); let _p2p = p2p.clone(); ex.spawn(async move { - if let Err(e) = _p2p.run(_ex).await { + if let Err(e) = _p2p.run(_ex, None).await { error!("Failed starting P2P network: {}", e); } }) diff --git a/script/research/fud/README.md b/script/research/fud/README.md index f23c09e47..d5cc76bdd 100644 --- a/script/research/fud/README.md +++ b/script/research/fud/README.md @@ -45,23 +45,9 @@ Run fud as follows: 13:23:04 [INFO] Entry: seedd_config.toml 13:23:04 [INFO] Starting 8 outbound connection slots. 13:23:04 [INFO] Entry: lt.py -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #0 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #1 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #2 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #3 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #6 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #4 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #5 -13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #7 13:23:07 [INFO] Caught termination signal, cleaning up and exiting... ``` -After daemon has been initialized, execute network syncing as follows: -``` -% fu sync -13:25:46 [INFO] Daemon synced successfully! -``` - fu ======= diff --git a/script/research/fud/fud/src/main.rs b/script/research/fud/fud/src/main.rs index be247a57a..f7178b990 100644 --- a/script/research/fud/fud/src/main.rs +++ b/script/research/fud/fud/src/main.rs @@ -102,6 +102,12 @@ impl Fud { let entries = fs::read_dir(&self.folder).unwrap(); { let mut lock = self.dht.write().await; + + // Sync lookup map with network + if let Err(e) = lock.sync_lookup_map().await { + error!("Failed to sync lookup map: {}", e); + } + for entry in entries { let e = entry.unwrap(); let name = String::from(e.file_name().to_str().unwrap()); @@ -196,11 +202,6 @@ impl Fud { let records = lock.map.clone(); let mut entries_hashes = HashSet::new(); - // Sync lookup map with network - if let Err(e) = lock.sync_lookup_map().await { - error!("Failed to sync lookup map: {}", e); - } - // We iterate files for new records for entry in entries { let e = entry.unwrap(); @@ -320,7 +321,7 @@ impl Fud { } } } - + // RPCAPI: // Replies to a ping method. // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} @@ -395,16 +396,20 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, fud.clone())).detach(); info!("Starting sync P2P network"); + let (init_send, init_recv) = async_channel::bounded::<()>(1); p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); let _p2p = p2p.clone(); ex.spawn(async move { - if let Err(e) = _p2p.run(_ex).await { + if let Err(e) = _p2p.run(_ex, Some(init_send)).await { error!("Failed starting P2P network: {}", e); } }) .detach(); + init_recv.recv().await?; + info!("P2P network initialized!"); + fud.init().await?; // Wait for SIGINT diff --git a/script/research/raft-diag/src/main.rs b/script/research/raft-diag/src/main.rs index 677b73ee7..c7fadbe5e 100644 --- a/script/research/raft-diag/src/main.rs +++ b/script/research/raft-diag/src/main.rs @@ -153,7 +153,7 @@ async fn start(args: Args, executor: Arc>) -> Result<()> { }) .await; - p2p.clone().start(executor.clone()).await?; + p2p.clone().start(executor.clone(), None).await?; executor.spawn(p2p.clone().run(executor.clone())).detach(); diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 8ff76b8b2..ebd279234 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -9,6 +9,7 @@ use url::Url; use crate::{ system::{Subscriber, SubscriberPtr, Subscription}, + util::async_util, Result, }; @@ -153,7 +154,11 @@ impl P2p { /// Runs the network. Starts inbound, outbound and manual sessions. /// Waits for a stop signal and stops the network if received. - pub async fn run(self: Arc, executor: Arc>) -> Result<()> { + pub async fn run( + self: Arc, + executor: Arc>, + init_signal: Option>, + ) -> Result<()> { debug!(target: "net", "P2p::run() [BEGIN]"); *self.state.lock().await = P2pState::Run; @@ -167,7 +172,34 @@ impl P2p { inbound.clone().start(executor.clone()).await?; let outbound = self.session_outbound().await; - outbound.clone().start(executor.clone()).await?; + + // Check if caller should be signalled + match init_signal { + Some(send_signal) => { + // Start normal initialization with signalling + let (init_send, init_recv) = async_channel::bounded::<()>(1); + outbound.clone().start(executor.clone(), Some(init_send)).await?; + // To verify that the network needs initialization, we check if we have seeds or peers configured. + if !(self.settings.seeds.is_empty() && self.settings.peers.is_empty()) { + // Then we retrieve pending channels to check if they have been filled by the seed. + // If pending channels are empty, we sleep for 2 seconds to mitigate any network delays. + if self.pending().await.is_empty() { + debug!(target: "net", "P2p::run() sleeping waiting for hosts by seed."); + async_util::sleep(2).await; + } + + // We check again to see if we have any pending connections. + if !self.pending().await.is_empty() { + // We wait for outbound connections to finish initilizing. + init_recv.recv().await?; + } + } + + // Signal caller that network has been initialized + send_signal.send(()).await?; + } + None => outbound.clone().start(executor.clone(), None).await?, + } let stop_sub = self.subscribe_stop().await; // Wait for stop signal @@ -265,7 +297,18 @@ impl P2p { self.stop_subscriber.clone().subscribe().await } + /// Retrieve channels pub fn channels(&self) -> &ConnectedChannels { &self.channels } + + /// Retrieve pending channels, without our own external addresses + pub async fn pending(&self) -> FxHashSet { + let self_inbound_addr = self.settings().external_addr.clone(); + let mut pending = self.pending.lock().await.clone(); + for addr in self_inbound_addr { + pending.remove(&addr); + } + pending + } } diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index f6053627e..7309eb72a 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -91,7 +91,11 @@ impl OutboundSession { } /// Start the outbound session. Runs the channel connect loop. - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { + pub async fn start( + self: Arc, + executor: Arc>, + init_signal: Option>, + ) -> Result<()> { let slots_count = self.p2p().settings().outbound_connections; info!(target: "net", "Starting {} outbound connection slots.", slots_count); // Activate mutex lock on connection slots. @@ -103,7 +107,7 @@ impl OutboundSession { let task = StoppableTask::new(); task.clone().start( - self.clone().channel_connect_loop(i, executor.clone()), + self.clone().channel_connect_loop(i, executor.clone(), init_signal.clone()), // Ignore stop handler |_| async {}, Error::NetworkServiceStopped, @@ -134,6 +138,7 @@ impl OutboundSession { self: Arc, slot_number: u32, executor: Arc>, + init_signal: Option>, ) -> Result<()> { let parent = Arc::downgrade(&self); @@ -172,6 +177,8 @@ impl OutboundSession { info.state = OutboundState::Connected; } + async_util::notify_caller(init_signal.clone()); + // Wait for channel to close stop_sub.unwrap().receive().await; } @@ -183,6 +190,8 @@ impl OutboundSession { info.channel = None; info.state = OutboundState::Open; } + + async_util::notify_caller(init_signal.clone()); } } } diff --git a/src/util/async_util.rs b/src/util/async_util.rs index 6ae95336e..4b4cf3f95 100644 --- a/src/util/async_util.rs +++ b/src/util/async_util.rs @@ -1,3 +1,4 @@ +use log::error; use smol::Timer; use std::time::Duration; @@ -5,3 +6,13 @@ use std::time::Duration; pub async fn sleep(seconds: u64) { Timer::after(Duration::from_secs(seconds)).await; } + +/// Auxillary function to reduce boilerplate of sending +/// a message to an optional channel, to notify caller. +pub fn notify_caller(signal: Option>) { + if let Some(sender) = signal.clone() { + if let Err(err) = sender.try_send(()) { + error!(target: "net", "Init signal send error: {}", err); + } + } +}