mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-09 14:48:08 -05:00
event_graph: replace the rest of timeout() with receive_with_timeout()
This commit is contained in:
@@ -20,7 +20,6 @@ use std::{
|
|||||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use darkfi_serial::{deserialize_async, serialize_async};
|
use darkfi_serial::{deserialize_async, serialize_async};
|
||||||
@@ -40,10 +39,7 @@ use crate::{
|
|||||||
jsonrpc::{JsonResponse, JsonResult},
|
jsonrpc::{JsonResponse, JsonResult},
|
||||||
util::json_map,
|
util::json_map,
|
||||||
},
|
},
|
||||||
system::{
|
system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
|
||||||
msleep, timeout::timeout, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr,
|
|
||||||
Subscription,
|
|
||||||
},
|
|
||||||
Error, Result,
|
Error, Result,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -358,34 +354,20 @@ impl EventGraph {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
let parent = match timeout(
|
// Node waits for response
|
||||||
Duration::from_secs(self.p2p.settings().read().await.outbound_connect_timeout),
|
let Ok(parent) = ev_rep_sub
|
||||||
ev_rep_sub.receive(),
|
.receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
|
||||||
)
|
.await
|
||||||
.await
|
else {
|
||||||
{
|
error!(
|
||||||
Ok(parent) => parent,
|
target: "event_graph::dag_sync()",
|
||||||
Err(_) => {
|
"[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
|
||||||
error!(
|
missing_parents, url,
|
||||||
target: "event_graph::dag_sync()",
|
);
|
||||||
"[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
|
continue
|
||||||
missing_parents, url,
|
|
||||||
);
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let parents = match parent {
|
let parents = parent.0.clone();
|
||||||
Ok(v) => v.0.clone(),
|
|
||||||
Err(e) => {
|
|
||||||
error!(
|
|
||||||
target: "event_graph::dag_sync()",
|
|
||||||
"[EVENTGRAPH] Sync: Failed receiving parents {:?}: {}",
|
|
||||||
missing_parents, e,
|
|
||||||
);
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for parent in parents {
|
for parent in parents {
|
||||||
let parent_id = parent.id();
|
let parent_id = parent.id();
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ use std::{
|
|||||||
atomic::{AtomicUsize, Ordering::SeqCst},
|
atomic::{AtomicUsize, Ordering::SeqCst},
|
||||||
Arc,
|
Arc,
|
||||||
},
|
},
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
|
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
|
||||||
@@ -30,7 +29,7 @@ use log::{debug, error, trace, warn};
|
|||||||
use smol::Executor;
|
use smol::Executor;
|
||||||
|
|
||||||
use super::{Event, EventGraphPtr, NULL_ID};
|
use super::{Event, EventGraphPtr, NULL_ID};
|
||||||
use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result};
|
use crate::{impl_p2p_message, net::*, Error, Result};
|
||||||
|
|
||||||
/// Malicious behaviour threshold. If the threshold is reached, we will
|
/// Malicious behaviour threshold. If the threshold is reached, we will
|
||||||
/// drop the peer from our P2P connection.
|
/// drop the peer from our P2P connection.
|
||||||
@@ -256,25 +255,23 @@ impl ProtocolEventGraph {
|
|||||||
.send(&EventReq(missing_parents.clone().into_iter().collect()))
|
.send(&EventReq(missing_parents.clone().into_iter().collect()))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let parents = match timeout(
|
// Node waits for response
|
||||||
Duration::from_secs(
|
let Ok(parents) = self
|
||||||
|
.ev_rep_sub
|
||||||
|
.receive_with_timeout(
|
||||||
self.event_graph.p2p.settings().read().await.outbound_connect_timeout,
|
self.event_graph.p2p.settings().read().await.outbound_connect_timeout,
|
||||||
),
|
)
|
||||||
self.ev_rep_sub.receive(),
|
.await
|
||||||
)
|
else {
|
||||||
.await
|
error!(
|
||||||
{
|
target: "event_graph::protocol::handle_event_put()",
|
||||||
Ok(parent) => parent?,
|
"[EVENTGRAPH] Timeout while waiting for parents {:?} from {}",
|
||||||
Err(_) => {
|
missing_parents, self.channel.address(),
|
||||||
error!(
|
);
|
||||||
target: "event_graph::protocol::handle_event_put()",
|
self.channel.stop().await;
|
||||||
"[EVENTGRAPH] Timeout while waiting for parents {:?} from {}",
|
return Err(Error::ChannelStopped)
|
||||||
missing_parents, self.channel.address(),
|
|
||||||
);
|
|
||||||
self.channel.stop().await;
|
|
||||||
return Err(Error::ChannelStopped)
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let parents = parents.0.clone();
|
let parents = parents.0.clone();
|
||||||
|
|
||||||
for parent in parents {
|
for parent in parents {
|
||||||
|
|||||||
Reference in New Issue
Block a user