p2pnet: implemented caller signaling

This commit is contained in:
aggstam
2022-08-24 21:29:19 +03:00
parent 09c23315f5
commit 5b7306af71
16 changed files with 103 additions and 37 deletions

View File

@@ -395,16 +395,20 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, darkfid.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
sync_p2p.clone().unwrap().start(ex.clone()).await?;
let _ex = ex.clone();
let _sync_p2p = sync_p2p.clone();
ex.spawn(async move {
if let Err(e) = _sync_p2p.unwrap().run(_ex).await {
if let Err(e) = _sync_p2p.unwrap().run(_ex, Some(init_send)).await {
error!("Failed starting sync P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Sync P2P network initialized!");
match block_sync_task(sync_p2p.clone().unwrap(), state.clone()).await {
Ok(()) => *darkfid.synced.lock().await = true,
Err(e) => error!("Failed syncing blockchain: {}", e),
@@ -413,16 +417,20 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// Consensus protocol
if args.consensus && *darkfid.synced.lock().await {
info!("Starting consensus P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
consensus_p2p.clone().unwrap().start(ex.clone()).await?;
let _ex = ex.clone();
let _consensus_p2p = consensus_p2p.clone();
ex.spawn(async move {
if let Err(e) = _consensus_p2p.unwrap().run(_ex).await {
if let Err(e) = _consensus_p2p.unwrap().run(_ex, Some(init_send)).await {
error!("Failed starting consensus P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Consensus P2P network initialized!");
info!("Starting consensus protocol task");
ex.spawn(proposal_task(consensus_p2p.unwrap(), sync_p2p.unwrap(), state)).detach();
} else {

View File

@@ -551,7 +551,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone())).detach();
executor.spawn(p2p.clone().run(executor.clone(), None)).detach();
//
// Darkwiki start

View File

@@ -399,16 +399,20 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, faucetd.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
sync_p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _sync_p2p = sync_p2p.clone();
ex.spawn(async move {
if let Err(e) = _sync_p2p.run(_ex).await {
if let Err(e) = _sync_p2p.run(_ex, Some(init_send)).await {
error!("Failed starting sync P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Sync P2P network initialized!");
match block_sync_task(sync_p2p.clone(), state.clone()).await {
Ok(()) => *faucetd.synced.lock().await = true,
Err(e) => error!("Failed syncing blockchain: {}", e),

View File

@@ -196,7 +196,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
let executor_cloned = executor.clone();
let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone()));
let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone(), None));
//
// RPC interface

View File

@@ -219,7 +219,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
let executor_cloned = executor.clone();
executor_cloned.spawn(p2p.clone().run(executor.clone())).detach();
executor_cloned.spawn(p2p.clone().run(executor.clone(), None)).detach();
//
// RPC interface

View File

@@ -51,7 +51,7 @@ async fn spawn_network(
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
ex.spawn(async move {
if let Err(e) = p2p.run(_ex).await {
if let Err(e) = p2p.run(_ex, None).await {
error!("Failed starting P2P network seed: {}", e);
}
})

View File

@@ -217,7 +217,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone())).detach();
executor.spawn(p2p.clone().run(executor.clone(), None)).detach();
//
// Waiting Exit signal

View File

@@ -103,7 +103,7 @@ impl Dchat {
self.register_protocol(self.recv_msgs.clone()).await?;
self.p2p.clone().start(ex.clone()).await?;
ex2.spawn(self.p2p.clone().run(ex.clone())).detach();
ex2.spawn(self.p2p.clone().run(ex.clone(), None)).detach();
self.menu().await?;

View File

@@ -14,7 +14,7 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
let p2p = net::P2p::new(options.network_settings).await;
p2p.clone().start(executor.clone()).await?;
p2p.run(executor).await?;
p2p.run(executor, None).await?;
Ok(())
}

View File

@@ -287,7 +287,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
if let Err(e) = _p2p.run(_ex, None).await {
error!("Failed starting P2P network: {}", e);
}
})

View File

@@ -45,23 +45,9 @@ Run fud as follows:
13:23:04 [INFO] Entry: seedd_config.toml
13:23:04 [INFO] Starting 8 outbound connection slots.
13:23:04 [INFO] Entry: lt.py
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #0
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #1
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #2
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #3
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #6
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #4
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #5
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #7
13:23:07 [INFO] Caught termination signal, cleaning up and exiting...
```
After daemon has been initialized, execute network syncing as follows:
```
% fu sync
13:25:46 [INFO] Daemon synced successfully!
```
fu
=======

View File

@@ -102,6 +102,12 @@ impl Fud {
let entries = fs::read_dir(&self.folder).unwrap();
{
let mut lock = self.dht.write().await;
// Sync lookup map with network
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
for entry in entries {
let e = entry.unwrap();
let name = String::from(e.file_name().to_str().unwrap());
@@ -196,11 +202,6 @@ impl Fud {
let records = lock.map.clone();
let mut entries_hashes = HashSet::new();
// Sync lookup map with network
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
// We iterate files for new records
for entry in entries {
let e = entry.unwrap();
@@ -320,7 +321,7 @@ impl Fud {
}
}
}
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
@@ -395,16 +396,20 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, fud.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
if let Err(e) = _p2p.run(_ex, Some(init_send)).await {
error!("Failed starting P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("P2P network initialized!");
fud.init().await?;
// Wait for SIGINT

View File

@@ -153,7 +153,7 @@ async fn start(args: Args, executor: Arc<Executor<'_>>) -> Result<()> {
})
.await;
p2p.clone().start(executor.clone()).await?;
p2p.clone().start(executor.clone(), None).await?;
executor.spawn(p2p.clone().run(executor.clone())).detach();

View File

@@ -9,6 +9,7 @@ use url::Url;
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
util::async_util,
Result,
};
@@ -153,7 +154,11 @@ impl P2p {
/// Runs the network. Starts inbound, outbound and manual sessions.
/// Waits for a stop signal and stops the network if received.
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn run(
self: Arc<Self>,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
debug!(target: "net", "P2p::run() [BEGIN]");
*self.state.lock().await = P2pState::Run;
@@ -167,7 +172,34 @@ impl P2p {
inbound.clone().start(executor.clone()).await?;
let outbound = self.session_outbound().await;
outbound.clone().start(executor.clone()).await?;
// Check if caller should be signalled
match init_signal {
Some(send_signal) => {
// Start normal initialization with signalling
let (init_send, init_recv) = async_channel::bounded::<()>(1);
outbound.clone().start(executor.clone(), Some(init_send)).await?;
// To verify that the network needs initialization, we check if we have seeds or peers configured.
if !(self.settings.seeds.is_empty() && self.settings.peers.is_empty()) {
// Then we retrieve pending channels to check if they have been filled by the seed.
// If pending channels are empty, we sleep for 2 seconds to mitigate any network delays.
if self.pending().await.is_empty() {
debug!(target: "net", "P2p::run() sleeping waiting for hosts by seed.");
async_util::sleep(2).await;
}
// We check again to see if we have any pending connections.
if !self.pending().await.is_empty() {
// We wait for outbound connections to finish initilizing.
init_recv.recv().await?;
}
}
// Signal caller that network has been initialized
send_signal.send(()).await?;
}
None => outbound.clone().start(executor.clone(), None).await?,
}
let stop_sub = self.subscribe_stop().await;
// Wait for stop signal
@@ -265,7 +297,18 @@ impl P2p {
self.stop_subscriber.clone().subscribe().await
}
/// Retrieve channels
pub fn channels(&self) -> &ConnectedChannels {
&self.channels
}
/// Retrieve pending channels, without our own external addresses
pub async fn pending(&self) -> FxHashSet<Url> {
let self_inbound_addr = self.settings().external_addr.clone();
let mut pending = self.pending.lock().await.clone();
for addr in self_inbound_addr {
pending.remove(&addr);
}
pending
}
}

View File

@@ -91,7 +91,11 @@ impl OutboundSession {
}
/// Start the outbound session. Runs the channel connect loop.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn start(
self: Arc<Self>,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
let slots_count = self.p2p().settings().outbound_connections;
info!(target: "net", "Starting {} outbound connection slots.", slots_count);
// Activate mutex lock on connection slots.
@@ -103,7 +107,7 @@ impl OutboundSession {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(i, executor.clone()),
self.clone().channel_connect_loop(i, executor.clone(), init_signal.clone()),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
@@ -134,6 +138,7 @@ impl OutboundSession {
self: Arc<Self>,
slot_number: u32,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
let parent = Arc::downgrade(&self);
@@ -172,6 +177,8 @@ impl OutboundSession {
info.state = OutboundState::Connected;
}
async_util::notify_caller(init_signal.clone());
// Wait for channel to close
stop_sub.unwrap().receive().await;
}
@@ -183,6 +190,8 @@ impl OutboundSession {
info.channel = None;
info.state = OutboundState::Open;
}
async_util::notify_caller(init_signal.clone());
}
}
}

View File

@@ -1,3 +1,4 @@
use log::error;
use smol::Timer;
use std::time::Duration;
@@ -5,3 +6,13 @@ use std::time::Duration;
pub async fn sleep(seconds: u64) {
Timer::after(Duration::from_secs(seconds)).await;
}
/// Auxillary function to reduce boilerplate of sending
/// a message to an optional channel, to notify caller.
pub fn notify_caller(signal: Option<async_channel::Sender<()>>) {
if let Some(sender) = signal.clone() {
if let Err(err) = sender.try_send(()) {
error!(target: "net", "Init signal send error: {}", err);
}
}
}