diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 42be878cb..4e7d52fd7 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -395,20 +395,16 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, darkfid.clone())).detach(); info!("Starting sync P2P network"); - let (init_send, init_recv) = async_channel::bounded::<()>(1); sync_p2p.clone().unwrap().start(ex.clone()).await?; let _ex = ex.clone(); let _sync_p2p = sync_p2p.clone(); ex.spawn(async move { - if let Err(e) = _sync_p2p.unwrap().run(_ex, Some(init_send)).await { + if let Err(e) = _sync_p2p.unwrap().run(_ex).await { error!("Failed starting sync P2P network: {}", e); } }) .detach(); - init_recv.recv().await?; - info!("Sync P2P network initialized!"); - match block_sync_task(sync_p2p.clone().unwrap(), state.clone()).await { Ok(()) => *darkfid.synced.lock().await = true, Err(e) => error!("Failed syncing blockchain: {}", e), @@ -417,20 +413,16 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Consensus protocol if args.consensus && *darkfid.synced.lock().await { info!("Starting consensus P2P network"); - let (init_send, init_recv) = async_channel::bounded::<()>(1); consensus_p2p.clone().unwrap().start(ex.clone()).await?; let _ex = ex.clone(); let _consensus_p2p = consensus_p2p.clone(); ex.spawn(async move { - if let Err(e) = _consensus_p2p.unwrap().run(_ex, Some(init_send)).await { + if let Err(e) = _consensus_p2p.unwrap().run(_ex).await { error!("Failed starting consensus P2P network: {}", e); } }) .detach(); - init_recv.recv().await?; - info!("Consensus P2P network initialized!"); - info!("Starting consensus protocol task"); ex.spawn(proposal_task(consensus_p2p.unwrap(), sync_p2p.unwrap(), state)).detach(); } else { diff --git a/bin/darkwikid/src/main.rs b/bin/darkwikid/src/main.rs index 1822da041..ae97dabd9 100644 --- a/bin/darkwikid/src/main.rs +++ b/bin/darkwikid/src/main.rs @@ -550,7 +550,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; - executor.spawn(p2p.clone().run(executor.clone(), None)).detach(); + executor.spawn(p2p.clone().run(executor.clone())).detach(); // // Darkwiki start diff --git a/bin/faucetd/src/main.rs b/bin/faucetd/src/main.rs index c66882377..0a249bd4d 100644 --- a/bin/faucetd/src/main.rs +++ b/bin/faucetd/src/main.rs @@ -399,20 +399,16 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, faucetd.clone())).detach(); info!("Starting sync P2P network"); - let (init_send, init_recv) = async_channel::bounded::<()>(1); sync_p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); let _sync_p2p = sync_p2p.clone(); ex.spawn(async move { - if let Err(e) = _sync_p2p.run(_ex, Some(init_send)).await { + if let Err(e) = _sync_p2p.run(_ex).await { error!("Failed starting sync P2P network: {}", e); } }) .detach(); - init_recv.recv().await?; - info!("Sync P2P network initialized!"); - match block_sync_task(sync_p2p.clone(), state.clone()).await { Ok(()) => *faucetd.synced.lock().await = true, Err(e) => error!("Failed syncing blockchain: {}", e), diff --git a/bin/irc-raft/src/main.rs b/bin/irc-raft/src/main.rs index e2e134e69..7733355aa 100644 --- a/bin/irc-raft/src/main.rs +++ b/bin/irc-raft/src/main.rs @@ -196,7 +196,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; let executor_cloned = executor.clone(); - let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone(), None)); + let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone())); // // RPC interface diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 32625fd90..e82e37e77 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -219,7 +219,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; let executor_cloned = executor.clone(); - executor_cloned.spawn(p2p.clone().run(executor.clone(), None)).detach(); + executor_cloned.spawn(p2p.clone().run(executor.clone())).detach(); // // RPC interface diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index a3fcc7743..9203f5d97 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -52,7 +52,7 @@ async fn spawn_network( p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); ex.spawn(async move { - if let Err(e) = p2p.run(_ex, None).await { + if let Err(e) = p2p.run(_ex).await { error!("Failed starting P2P network seed: {}", e); } }) diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index ec749ac19..99bbd4a4a 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -205,7 +205,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { p2p.clone().start(executor.clone()).await?; - executor.spawn(p2p.clone().run(executor.clone(), None)).detach(); + executor.spawn(p2p.clone().run(executor.clone())).detach(); // // RPC interface diff --git a/example/dchat/src/main.rs b/example/dchat/src/main.rs index 042f03ffe..16e9b4912 100644 --- a/example/dchat/src/main.rs +++ b/example/dchat/src/main.rs @@ -103,7 +103,7 @@ impl Dchat { self.register_protocol(self.recv_msgs.clone()).await?; self.p2p.clone().start(ex.clone()).await?; - ex2.spawn(self.p2p.clone().run(ex.clone(), None)).detach(); + ex2.spawn(self.p2p.clone().run(ex.clone())).detach(); self.menu().await?; diff --git a/example/net.rs b/example/net.rs index f60db5625..d3d2ab9e0 100644 --- a/example/net.rs +++ b/example/net.rs @@ -14,7 +14,7 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( let p2p = net::P2p::new(options.network_settings).await; p2p.clone().start(executor.clone()).await?; - p2p.run(executor, None).await?; + p2p.run(executor).await?; Ok(()) } diff --git a/script/research/dhtd/src/main.rs b/script/research/dhtd/src/main.rs index fbb869c58..cde536180 100644 --- a/script/research/dhtd/src/main.rs +++ b/script/research/dhtd/src/main.rs @@ -288,7 +288,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let _ex = ex.clone(); let _p2p = p2p.clone(); ex.spawn(async move { - if let Err(e) = _p2p.run(_ex, None).await { + if let Err(e) = _p2p.run(_ex).await { error!("Failed starting P2P network: {}", e); } }) diff --git a/script/research/fud/README.md b/script/research/fud/README.md index d5cc76bdd..f23c09e47 100644 --- a/script/research/fud/README.md +++ b/script/research/fud/README.md @@ -45,9 +45,23 @@ Run fud as follows: 13:23:04 [INFO] Entry: seedd_config.toml 13:23:04 [INFO] Starting 8 outbound connection slots. 13:23:04 [INFO] Entry: lt.py +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #0 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #1 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #2 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #3 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #6 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #4 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #5 +13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #7 13:23:07 [INFO] Caught termination signal, cleaning up and exiting... ``` +After daemon has been initialized, execute network syncing as follows: +``` +% fu sync +13:25:46 [INFO] Daemon synced successfully! +``` + fu ======= diff --git a/script/research/fud/fud/src/main.rs b/script/research/fud/fud/src/main.rs index f7178b990..be247a57a 100644 --- a/script/research/fud/fud/src/main.rs +++ b/script/research/fud/fud/src/main.rs @@ -102,12 +102,6 @@ impl Fud { let entries = fs::read_dir(&self.folder).unwrap(); { let mut lock = self.dht.write().await; - - // Sync lookup map with network - if let Err(e) = lock.sync_lookup_map().await { - error!("Failed to sync lookup map: {}", e); - } - for entry in entries { let e = entry.unwrap(); let name = String::from(e.file_name().to_str().unwrap()); @@ -202,6 +196,11 @@ impl Fud { let records = lock.map.clone(); let mut entries_hashes = HashSet::new(); + // Sync lookup map with network + if let Err(e) = lock.sync_lookup_map().await { + error!("Failed to sync lookup map: {}", e); + } + // We iterate files for new records for entry in entries { let e = entry.unwrap(); @@ -321,7 +320,7 @@ impl Fud { } } } - + // RPCAPI: // Replies to a ping method. // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} @@ -396,20 +395,16 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.spawn(listen_and_serve(args.rpc_listen, fud.clone())).detach(); info!("Starting sync P2P network"); - let (init_send, init_recv) = async_channel::bounded::<()>(1); p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); let _p2p = p2p.clone(); ex.spawn(async move { - if let Err(e) = _p2p.run(_ex, Some(init_send)).await { + if let Err(e) = _p2p.run(_ex).await { error!("Failed starting P2P network: {}", e); } }) .detach(); - init_recv.recv().await?; - info!("P2P network initialized!"); - fud.init().await?; // Wait for SIGINT diff --git a/script/research/raft-diag/src/main.rs b/script/research/raft-diag/src/main.rs index c7fadbe5e..677b73ee7 100644 --- a/script/research/raft-diag/src/main.rs +++ b/script/research/raft-diag/src/main.rs @@ -153,7 +153,7 @@ async fn start(args: Args, executor: Arc>) -> Result<()> { }) .await; - p2p.clone().start(executor.clone(), None).await?; + p2p.clone().start(executor.clone()).await?; executor.spawn(p2p.clone().run(executor.clone())).detach(); diff --git a/src/net/p2p.rs b/src/net/p2p.rs index ebd279234..8ff76b8b2 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -9,7 +9,6 @@ use url::Url; use crate::{ system::{Subscriber, SubscriberPtr, Subscription}, - util::async_util, Result, }; @@ -154,11 +153,7 @@ impl P2p { /// Runs the network. Starts inbound, outbound and manual sessions. /// Waits for a stop signal and stops the network if received. - pub async fn run( - self: Arc, - executor: Arc>, - init_signal: Option>, - ) -> Result<()> { + pub async fn run(self: Arc, executor: Arc>) -> Result<()> { debug!(target: "net", "P2p::run() [BEGIN]"); *self.state.lock().await = P2pState::Run; @@ -172,34 +167,7 @@ impl P2p { inbound.clone().start(executor.clone()).await?; let outbound = self.session_outbound().await; - - // Check if caller should be signalled - match init_signal { - Some(send_signal) => { - // Start normal initialization with signalling - let (init_send, init_recv) = async_channel::bounded::<()>(1); - outbound.clone().start(executor.clone(), Some(init_send)).await?; - // To verify that the network needs initialization, we check if we have seeds or peers configured. - if !(self.settings.seeds.is_empty() && self.settings.peers.is_empty()) { - // Then we retrieve pending channels to check if they have been filled by the seed. - // If pending channels are empty, we sleep for 2 seconds to mitigate any network delays. - if self.pending().await.is_empty() { - debug!(target: "net", "P2p::run() sleeping waiting for hosts by seed."); - async_util::sleep(2).await; - } - - // We check again to see if we have any pending connections. - if !self.pending().await.is_empty() { - // We wait for outbound connections to finish initilizing. - init_recv.recv().await?; - } - } - - // Signal caller that network has been initialized - send_signal.send(()).await?; - } - None => outbound.clone().start(executor.clone(), None).await?, - } + outbound.clone().start(executor.clone()).await?; let stop_sub = self.subscribe_stop().await; // Wait for stop signal @@ -297,18 +265,7 @@ impl P2p { self.stop_subscriber.clone().subscribe().await } - /// Retrieve channels pub fn channels(&self) -> &ConnectedChannels { &self.channels } - - /// Retrieve pending channels, without our own external addresses - pub async fn pending(&self) -> FxHashSet { - let self_inbound_addr = self.settings().external_addr.clone(); - let mut pending = self.pending.lock().await.clone(); - for addr in self_inbound_addr { - pending.remove(&addr); - } - pending - } } diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 7309eb72a..f6053627e 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -91,11 +91,7 @@ impl OutboundSession { } /// Start the outbound session. Runs the channel connect loop. - pub async fn start( - self: Arc, - executor: Arc>, - init_signal: Option>, - ) -> Result<()> { + pub async fn start(self: Arc, executor: Arc>) -> Result<()> { let slots_count = self.p2p().settings().outbound_connections; info!(target: "net", "Starting {} outbound connection slots.", slots_count); // Activate mutex lock on connection slots. @@ -107,7 +103,7 @@ impl OutboundSession { let task = StoppableTask::new(); task.clone().start( - self.clone().channel_connect_loop(i, executor.clone(), init_signal.clone()), + self.clone().channel_connect_loop(i, executor.clone()), // Ignore stop handler |_| async {}, Error::NetworkServiceStopped, @@ -138,7 +134,6 @@ impl OutboundSession { self: Arc, slot_number: u32, executor: Arc>, - init_signal: Option>, ) -> Result<()> { let parent = Arc::downgrade(&self); @@ -177,8 +172,6 @@ impl OutboundSession { info.state = OutboundState::Connected; } - async_util::notify_caller(init_signal.clone()); - // Wait for channel to close stop_sub.unwrap().receive().await; } @@ -190,8 +183,6 @@ impl OutboundSession { info.channel = None; info.state = OutboundState::Open; } - - async_util::notify_caller(init_signal.clone()); } } } diff --git a/src/util/async_util.rs b/src/util/async_util.rs index 237f4343c..6ae95336e 100644 --- a/src/util/async_util.rs +++ b/src/util/async_util.rs @@ -1,4 +1,3 @@ -use log::error; use smol::Timer; use std::time::Duration; @@ -6,13 +5,3 @@ use std::time::Duration; pub async fn sleep(seconds: u64) { Timer::after(Duration::from_secs(seconds)).await; } - -/// Auxillary function to reduce boilerplate of sending -/// a message to an optional channel, to notify caller. -pub fn notify_caller(signal: Option>) { - if let Some(sender) = signal { - if let Err(err) = sender.try_send(()) { - error!(target: "net", "Init signal send error: {}", err); - } - } -}