src/event_graph: request and reply multiple events

This commit is contained in:
Dastan-glitch
2024-01-13 03:25:02 +03:00
parent 7c9b3549cf
commit f46eb4c0e4
2 changed files with 139 additions and 127 deletions

View File

@@ -287,64 +287,67 @@ impl EventGraph {
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
let mut received_events_hashes = HashSet::new();
while !missing_parents.is_empty() {
for parent_id in missing_parents.clone().iter() {
let mut found_event = false;
let mut found_event = false;
for channel in channels.iter() {
let url = channel.address();
for channel in channels.iter() {
let url = channel.address();
debug!(
target: "event_graph::dag_sync()",
"Requesting {} from {}...", parent_id, url,
);
debug!(
target: "event_graph::dag_sync()",
"Requesting {:?} from {}...", missing_parents, url,
);
let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
Ok(v) => v,
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
url, e,
);
continue
}
};
if let Err(e) = channel.send(&EventReq(*parent_id)).await {
let multi_ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
Ok(v) => v,
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed communicating EventReq({}) to {}: {}",
parent_id, url, e,
"[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
url, e,
);
continue
}
};
let parent = match timeout(REPLY_TIMEOUT, ev_rep_sub.receive()).await {
Ok(parent) => parent,
Err(_) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Timeout waiting for parent {} from {}",
parent_id, url,
);
continue
}
};
let request_missing_events = missing_parents.clone().into_iter().collect();
if let Err(e) = channel.send(&EventReq(request_missing_events)).await {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed communicating MultiEventReq({:?}) to {}: {}",
missing_parents, url, e,
);
continue
}
let parent = match parent {
Ok(v) => v.0.clone(),
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed receiving parent {}: {}",
parent_id, e,
);
continue
}
};
let parent = match timeout(REPLY_TIMEOUT, multi_ev_rep_sub.receive()).await {
Ok(parent) => parent,
Err(_) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
missing_parents, url,
);
continue
}
};
if &parent.id() != parent_id {
let parents = match parent {
Ok(v) => v.0.clone(),
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed receiving parents {:?}: {}",
missing_parents, e,
);
continue
}
};
for parent in parents {
let parent_id = parent.id();
if !missing_parents.contains(&parent_id) {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Peer {} replied with a wrong event: {}",
@@ -364,9 +367,9 @@ impl EventGraph {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
received_events_hashes.insert(parent_id);
missing_parents.remove(parent_id);
missing_parents.remove(&parent_id);
found_event = true;
// See if we have the upper parents
@@ -386,18 +389,19 @@ impl EventGraph {
missing_parents.insert(*upper_parent);
}
}
break
}
if !found_event {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed to get all events",
);
return Err(Error::DagSyncFailed)
}
break
}
if !found_event {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed to get all events",
);
return Err(Error::DagSyncFailed)
}
// }
} // <-- while !missing_parents.is_empty
// At this point we should've got all the events.

View File

@@ -69,12 +69,12 @@ impl_p2p_message!(EventPut, "EventGraph::EventPut");
/// A P2P message representing an event request
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub blake3::Hash);
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq");
/// A P2P message representing an event reply
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Event);
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep");
/// A P2P message representing a request for a peer's DAG tips
@@ -241,28 +241,32 @@ impl ProtocolEventGraph {
);
while !missing_parents.is_empty() {
for parent_id in missing_parents.clone().iter() {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Requesting {}...", parent_id,
);
// for parent_id in missing_parents.clone().iter() {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Requesting {:?}...", missing_parents,
);
self.channel.send(&EventReq(*parent_id)).await?;
let parent = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await {
Ok(parent) => parent?,
Err(_) => {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Timeout while waiting for parent {} from {}",
parent_id, self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
};
let parent = parent.0.clone();
self.channel
.send(&EventReq(missing_parents.clone().into_iter().collect()))
.await?;
let parents = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await {
Ok(parent) => parent?,
Err(_) => {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Timeout while waiting for parents {:?} from {}",
missing_parents, self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
};
let parents = parents.0.clone();
if &parent.id() != parent_id {
for parent in parents {
let parent_id = parent.id();
if !missing_parents.contains(&parent_id) {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} replied with a wrong event: {}",
@@ -283,9 +287,9 @@ impl ProtocolEventGraph {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
received_events_hashes.insert(parent_id);
missing_parents.remove(parent_id);
missing_parents.remove(&parent_id);
// See if we have the upper parents
for upper_parent in parent.parents.iter() {
@@ -349,13 +353,13 @@ impl ProtocolEventGraph {
/// This is triggered whenever someone requests an event from us.
async fn handle_event_req(self: Arc<Self>) -> Result<()> {
loop {
let event_id = match self.ev_req_sub.receive().await {
Ok(v) => v.0,
let event_ids = match self.ev_req_sub.receive().await {
Ok(v) => v.0.clone(),
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_event_req()",
"Got EventReq: {} [{}]", event_id, self.channel.address(),
target: "event_graph::protocol::handle_multi_event_req()",
"Got MultiEventReq: {:?} [{}]", event_ids, self.channel.address(),
);
// Check if node has finished syncing its DAG
@@ -378,63 +382,67 @@ impl ProtocolEventGraph {
// I dunno if this is a good idea, but it seems it will help
// against malicious event requests where they want us to keep
// reading our db and steal our bandwidth.
if !self.event_graph.broadcasted_ids.read().await.contains(&event_id) {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
let mut events = vec![];
for event_id in event_ids {
if !self.event_graph.broadcasted_ids.read().await.contains(&event_id) {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::protocol::handle_event_req()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_req()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
"[EVENTGRAPH] Peer {} requested an unexpected event {:?}",
self.channel.address(), event_id,
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
continue
}
warn!(
// At this point we should have it in our DAG.
// This code panics if this is not the case.
debug!(
target: "event_graph::protocol::handle_event_req()",
"[EVENTGRAPH] Peer {} requested an unexpected event {}",
self.channel.address(), event_id,
"Fetching event {:?} from DAG", event_id,
);
continue
events.push(self.event_graph.dag_get(&event_id).await.unwrap().unwrap());
}
// At this point we should have it in our DAG.
// This code panics if this is not the case.
debug!(
target: "event_graph::protocol::handle_event_req()",
"Fetching event {} from DAG", event_id,
);
let event = self.event_graph.dag_get(&event_id).await.unwrap().unwrap();
// Check if the incoming event is older than the genesis event. If so, something
// has gone wrong. The event should have been pruned during the last
// rotation.
let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
if event.timestamp < genesis_timestamp {
error!(
target: "event_graph::protocol::handle_event_req()",
"Requested event by peer {} is older than previous rotation period. It should have been pruned.
Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
// Now let's get the upper level of event IDs. When we reply, we could
// get requests for those IDs as well.
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
bcast_ids.insert(*parent_id);
for event in events.clone() {
let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
if event.timestamp < genesis_timestamp {
error!(
target: "event_graph::protocol::handle_event_req()",
"Requested event by peer {} is older than previous rotation period. It should have been pruned.
Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
}
// TODO: We should remove the reply from the bcast IDs for this specific channel.
// We can't remove them for everyone.
//bcast_ids.remove(&event_id);
drop(bcast_ids);
// Now let's get the upper level of event IDs. When we reply, we could
// get requests for those IDs as well.
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
bcast_ids.insert(*parent_id);
}
}
// TODO: We should remove the reply from the bcast IDs for this specific channel.
// We can't remove them for everyone.
//bcast_ids.remove(&event_id);
drop(bcast_ids);
}
// Reply with the event
self.channel.send(&EventRep(event)).await?;
self.channel.send(&EventRep(events)).await?;
}
}