p2pnet: 5b7306af71 reverted

This commit is contained in:
aggstam
2022-08-25 17:04:48 +03:00
parent 4f62cc5bcf
commit e8c516ec62
16 changed files with 37 additions and 103 deletions

View File

@@ -395,20 +395,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, darkfid.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
sync_p2p.clone().unwrap().start(ex.clone()).await?;
let _ex = ex.clone();
let _sync_p2p = sync_p2p.clone();
ex.spawn(async move {
if let Err(e) = _sync_p2p.unwrap().run(_ex, Some(init_send)).await {
if let Err(e) = _sync_p2p.unwrap().run(_ex).await {
error!("Failed starting sync P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Sync P2P network initialized!");
match block_sync_task(sync_p2p.clone().unwrap(), state.clone()).await {
Ok(()) => *darkfid.synced.lock().await = true,
Err(e) => error!("Failed syncing blockchain: {}", e),
@@ -417,20 +413,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// Consensus protocol
if args.consensus && *darkfid.synced.lock().await {
info!("Starting consensus P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
consensus_p2p.clone().unwrap().start(ex.clone()).await?;
let _ex = ex.clone();
let _consensus_p2p = consensus_p2p.clone();
ex.spawn(async move {
if let Err(e) = _consensus_p2p.unwrap().run(_ex, Some(init_send)).await {
if let Err(e) = _consensus_p2p.unwrap().run(_ex).await {
error!("Failed starting consensus P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Consensus P2P network initialized!");
info!("Starting consensus protocol task");
ex.spawn(proposal_task(consensus_p2p.unwrap(), sync_p2p.unwrap(), state)).detach();
} else {

View File

@@ -550,7 +550,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone(), None)).detach();
executor.spawn(p2p.clone().run(executor.clone())).detach();
//
// Darkwiki start

View File

@@ -399,20 +399,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, faucetd.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
sync_p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _sync_p2p = sync_p2p.clone();
ex.spawn(async move {
if let Err(e) = _sync_p2p.run(_ex, Some(init_send)).await {
if let Err(e) = _sync_p2p.run(_ex).await {
error!("Failed starting sync P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("Sync P2P network initialized!");
match block_sync_task(sync_p2p.clone(), state.clone()).await {
Ok(()) => *faucetd.synced.lock().await = true,
Err(e) => error!("Failed syncing blockchain: {}", e),

View File

@@ -196,7 +196,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
let executor_cloned = executor.clone();
let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone(), None));
let p2p_run_task = executor_cloned.spawn(p2p.clone().run(executor.clone()));
//
// RPC interface

View File

@@ -219,7 +219,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
let executor_cloned = executor.clone();
executor_cloned.spawn(p2p.clone().run(executor.clone(), None)).detach();
executor_cloned.spawn(p2p.clone().run(executor.clone())).detach();
//
// RPC interface

View File

@@ -52,7 +52,7 @@ async fn spawn_network(
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
ex.spawn(async move {
if let Err(e) = p2p.run(_ex, None).await {
if let Err(e) = p2p.run(_ex).await {
error!("Failed starting P2P network seed: {}", e);
}
})

View File

@@ -205,7 +205,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone(), None)).detach();
executor.spawn(p2p.clone().run(executor.clone())).detach();
//
// RPC interface

View File

@@ -103,7 +103,7 @@ impl Dchat {
self.register_protocol(self.recv_msgs.clone()).await?;
self.p2p.clone().start(ex.clone()).await?;
ex2.spawn(self.p2p.clone().run(ex.clone(), None)).detach();
ex2.spawn(self.p2p.clone().run(ex.clone())).detach();
self.menu().await?;

View File

@@ -14,7 +14,7 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
let p2p = net::P2p::new(options.network_settings).await;
p2p.clone().start(executor.clone()).await?;
p2p.run(executor, None).await?;
p2p.run(executor).await?;
Ok(())
}

View File

@@ -288,7 +288,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex, None).await {
if let Err(e) = _p2p.run(_ex).await {
error!("Failed starting P2P network: {}", e);
}
})

View File

@@ -45,9 +45,23 @@ Run fud as follows:
13:23:04 [INFO] Entry: seedd_config.toml
13:23:04 [INFO] Starting 8 outbound connection slots.
13:23:04 [INFO] Entry: lt.py
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #0
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #1
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #2
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #3
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #6
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #4
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #5
13:23:04 [WARN] Hosts address pool is empty. Retrying connect slot #7
13:23:07 [INFO] Caught termination signal, cleaning up and exiting...
```
After daemon has been initialized, execute network syncing as follows:
```
% fu sync
13:25:46 [INFO] Daemon synced successfully!
```
fu
=======

View File

@@ -102,12 +102,6 @@ impl Fud {
let entries = fs::read_dir(&self.folder).unwrap();
{
let mut lock = self.dht.write().await;
// Sync lookup map with network
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
for entry in entries {
let e = entry.unwrap();
let name = String::from(e.file_name().to_str().unwrap());
@@ -202,6 +196,11 @@ impl Fud {
let records = lock.map.clone();
let mut entries_hashes = HashSet::new();
// Sync lookup map with network
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
// We iterate files for new records
for entry in entries {
let e = entry.unwrap();
@@ -321,7 +320,7 @@ impl Fud {
}
}
}
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
@@ -396,20 +395,16 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
ex.spawn(listen_and_serve(args.rpc_listen, fud.clone())).detach();
info!("Starting sync P2P network");
let (init_send, init_recv) = async_channel::bounded::<()>(1);
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex, Some(init_send)).await {
if let Err(e) = _p2p.run(_ex).await {
error!("Failed starting P2P network: {}", e);
}
})
.detach();
init_recv.recv().await?;
info!("P2P network initialized!");
fud.init().await?;
// Wait for SIGINT

View File

@@ -153,7 +153,7 @@ async fn start(args: Args, executor: Arc<Executor<'_>>) -> Result<()> {
})
.await;
p2p.clone().start(executor.clone(), None).await?;
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone())).detach();

View File

@@ -9,7 +9,6 @@ use url::Url;
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
util::async_util,
Result,
};
@@ -154,11 +153,7 @@ impl P2p {
/// Runs the network. Starts inbound, outbound and manual sessions.
/// Waits for a stop signal and stops the network if received.
pub async fn run(
self: Arc<Self>,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net", "P2p::run() [BEGIN]");
*self.state.lock().await = P2pState::Run;
@@ -172,34 +167,7 @@ impl P2p {
inbound.clone().start(executor.clone()).await?;
let outbound = self.session_outbound().await;
// Check if caller should be signalled
match init_signal {
Some(send_signal) => {
// Start normal initialization with signalling
let (init_send, init_recv) = async_channel::bounded::<()>(1);
outbound.clone().start(executor.clone(), Some(init_send)).await?;
// To verify that the network needs initialization, we check if we have seeds or peers configured.
if !(self.settings.seeds.is_empty() && self.settings.peers.is_empty()) {
// Then we retrieve pending channels to check if they have been filled by the seed.
// If pending channels are empty, we sleep for 2 seconds to mitigate any network delays.
if self.pending().await.is_empty() {
debug!(target: "net", "P2p::run() sleeping waiting for hosts by seed.");
async_util::sleep(2).await;
}
// We check again to see if we have any pending connections.
if !self.pending().await.is_empty() {
// We wait for outbound connections to finish initilizing.
init_recv.recv().await?;
}
}
// Signal caller that network has been initialized
send_signal.send(()).await?;
}
None => outbound.clone().start(executor.clone(), None).await?,
}
outbound.clone().start(executor.clone()).await?;
let stop_sub = self.subscribe_stop().await;
// Wait for stop signal
@@ -297,18 +265,7 @@ impl P2p {
self.stop_subscriber.clone().subscribe().await
}
/// Retrieve channels
pub fn channels(&self) -> &ConnectedChannels {
&self.channels
}
/// Retrieve pending channels, without our own external addresses
pub async fn pending(&self) -> FxHashSet<Url> {
let self_inbound_addr = self.settings().external_addr.clone();
let mut pending = self.pending.lock().await.clone();
for addr in self_inbound_addr {
pending.remove(&addr);
}
pending
}
}

View File

@@ -91,11 +91,7 @@ impl OutboundSession {
}
/// Start the outbound session. Runs the channel connect loop.
pub async fn start(
self: Arc<Self>,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
let slots_count = self.p2p().settings().outbound_connections;
info!(target: "net", "Starting {} outbound connection slots.", slots_count);
// Activate mutex lock on connection slots.
@@ -107,7 +103,7 @@ impl OutboundSession {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(i, executor.clone(), init_signal.clone()),
self.clone().channel_connect_loop(i, executor.clone()),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
@@ -138,7 +134,6 @@ impl OutboundSession {
self: Arc<Self>,
slot_number: u32,
executor: Arc<Executor<'_>>,
init_signal: Option<async_channel::Sender<()>>,
) -> Result<()> {
let parent = Arc::downgrade(&self);
@@ -177,8 +172,6 @@ impl OutboundSession {
info.state = OutboundState::Connected;
}
async_util::notify_caller(init_signal.clone());
// Wait for channel to close
stop_sub.unwrap().receive().await;
}
@@ -190,8 +183,6 @@ impl OutboundSession {
info.channel = None;
info.state = OutboundState::Open;
}
async_util::notify_caller(init_signal.clone());
}
}
}

View File

@@ -1,4 +1,3 @@
use log::error;
use smol::Timer;
use std::time::Duration;
@@ -6,13 +5,3 @@ use std::time::Duration;
pub async fn sleep(seconds: u64) {
Timer::after(Duration::from_secs(seconds)).await;
}
/// Auxillary function to reduce boilerplate of sending
/// a message to an optional channel, to notify caller.
pub fn notify_caller(signal: Option<async_channel::Sender<()>>) {
if let Some(sender) = signal {
if let Err(err) = sender.try_send(()) {
error!(target: "net", "Init signal send error: {}", err);
}
}
}