From f46eb4c0e4d8a6cb53e1fc8a5b8b52fed1bc8547 Mon Sep 17 00:00:00 2001 From: Dastan-glitch Date: Sat, 13 Jan 2024 03:25:02 +0300 Subject: [PATCH] src/event_graph: request and reply multiple events --- src/event_graph/mod.rs | 118 ++++++++++++++++--------------- src/event_graph/proto.rs | 148 +++++++++++++++++++++------------------ 2 files changed, 139 insertions(+), 127 deletions(-) diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 50f123c1a..32c74f305 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -287,64 +287,67 @@ impl EventGraph { info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events"); let mut received_events: BTreeMap> = BTreeMap::new(); let mut received_events_hashes = HashSet::new(); + while !missing_parents.is_empty() { - for parent_id in missing_parents.clone().iter() { - let mut found_event = false; + let mut found_event = false; - for channel in channels.iter() { - let url = channel.address(); + for channel in channels.iter() { + let url = channel.address(); - debug!( - target: "event_graph::dag_sync()", - "Requesting {} from {}...", parent_id, url, - ); + debug!( + target: "event_graph::dag_sync()", + "Requesting {:?} from {}...", missing_parents, url, + ); - let ev_rep_sub = match channel.subscribe_msg::().await { - Ok(v) => v, - Err(e) => { - error!( - target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})", - url, e, - ); - continue - } - }; - - if let Err(e) = channel.send(&EventReq(*parent_id)).await { + let multi_ev_rep_sub = match channel.subscribe_msg::().await { + Ok(v) => v, + Err(e) => { error!( target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Failed communicating EventReq({}) to {}: {}", - parent_id, url, e, + "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})", + url, e, ); continue } + }; - let parent = match timeout(REPLY_TIMEOUT, ev_rep_sub.receive()).await { - Ok(parent) => parent, - Err(_) => { - error!( - target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Timeout waiting for parent {} from {}", - parent_id, url, - ); - continue - } - }; + let request_missing_events = missing_parents.clone().into_iter().collect(); + if let Err(e) = channel.send(&EventReq(request_missing_events)).await { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Failed communicating MultiEventReq({:?}) to {}: {}", + missing_parents, url, e, + ); + continue + } - let parent = match parent { - Ok(v) => v.0.clone(), - Err(e) => { - error!( - target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Failed receiving parent {}: {}", - parent_id, e, - ); - continue - } - }; + let parent = match timeout(REPLY_TIMEOUT, multi_ev_rep_sub.receive()).await { + Ok(parent) => parent, + Err(_) => { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}", + missing_parents, url, + ); + continue + } + }; - if &parent.id() != parent_id { + let parents = match parent { + Ok(v) => v.0.clone(), + Err(e) => { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Failed receiving parents {:?}: {}", + missing_parents, e, + ); + continue + } + }; + + for parent in parents { + let parent_id = parent.id(); + if !missing_parents.contains(&parent_id) { error!( target: "event_graph::dag_sync()", "[EVENTGRAPH] Sync: Peer {} replied with a wrong event: {}", @@ -364,9 +367,9 @@ impl EventGraph { let layer_events = vec![parent.clone()]; received_events.insert(parent.layer, layer_events); } - received_events_hashes.insert(*parent_id); + received_events_hashes.insert(parent_id); - missing_parents.remove(parent_id); + missing_parents.remove(&parent_id); found_event = true; // See if we have the upper parents @@ -386,18 +389,19 @@ impl EventGraph { missing_parents.insert(*upper_parent); } } - - break } - if !found_event { - error!( - target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Failed to get all events", - ); - return Err(Error::DagSyncFailed) - } + break } + + if !found_event { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Failed to get all events", + ); + return Err(Error::DagSyncFailed) + } + // } } // <-- while !missing_parents.is_empty // At this point we should've got all the events. diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index 02b72d15c..5d4caf2fb 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -69,12 +69,12 @@ impl_p2p_message!(EventPut, "EventGraph::EventPut"); /// A P2P message representing an event request #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct EventReq(pub blake3::Hash); +pub struct EventReq(pub Vec); impl_p2p_message!(EventReq, "EventGraph::EventReq"); /// A P2P message representing an event reply #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct EventRep(pub Event); +pub struct EventRep(pub Vec); impl_p2p_message!(EventRep, "EventGraph::EventRep"); /// A P2P message representing a request for a peer's DAG tips @@ -241,28 +241,32 @@ impl ProtocolEventGraph { ); while !missing_parents.is_empty() { - for parent_id in missing_parents.clone().iter() { - debug!( - target: "event_graph::protocol::handle_event_put()", - "Requesting {}...", parent_id, - ); + // for parent_id in missing_parents.clone().iter() { + debug!( + target: "event_graph::protocol::handle_event_put()", + "Requesting {:?}...", missing_parents, + ); - self.channel.send(&EventReq(*parent_id)).await?; - let parent = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await { - Ok(parent) => parent?, - Err(_) => { - error!( - target: "event_graph::protocol::handle_event_put()", - "[EVENTGRAPH] Timeout while waiting for parent {} from {}", - parent_id, self.channel.address(), - ); - self.channel.stop().await; - return Err(Error::ChannelStopped) - } - }; - let parent = parent.0.clone(); + self.channel + .send(&EventReq(missing_parents.clone().into_iter().collect())) + .await?; + let parents = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await { + Ok(parent) => parent?, + Err(_) => { + error!( + target: "event_graph::protocol::handle_event_put()", + "[EVENTGRAPH] Timeout while waiting for parents {:?} from {}", + missing_parents, self.channel.address(), + ); + self.channel.stop().await; + return Err(Error::ChannelStopped) + } + }; + let parents = parents.0.clone(); - if &parent.id() != parent_id { + for parent in parents { + let parent_id = parent.id(); + if !missing_parents.contains(&parent_id) { error!( target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Peer {} replied with a wrong event: {}", @@ -283,9 +287,9 @@ impl ProtocolEventGraph { let layer_events = vec![parent.clone()]; received_events.insert(parent.layer, layer_events); } - received_events_hashes.insert(*parent_id); + received_events_hashes.insert(parent_id); - missing_parents.remove(parent_id); + missing_parents.remove(&parent_id); // See if we have the upper parents for upper_parent in parent.parents.iter() { @@ -349,13 +353,13 @@ impl ProtocolEventGraph { /// This is triggered whenever someone requests an event from us. async fn handle_event_req(self: Arc) -> Result<()> { loop { - let event_id = match self.ev_req_sub.receive().await { - Ok(v) => v.0, + let event_ids = match self.ev_req_sub.receive().await { + Ok(v) => v.0.clone(), Err(_) => continue, }; trace!( - target: "event_graph::protocol::handle_event_req()", - "Got EventReq: {} [{}]", event_id, self.channel.address(), + target: "event_graph::protocol::handle_multi_event_req()", + "Got MultiEventReq: {:?} [{}]", event_ids, self.channel.address(), ); // Check if node has finished syncing its DAG @@ -378,63 +382,67 @@ impl ProtocolEventGraph { // I dunno if this is a good idea, but it seems it will help // against malicious event requests where they want us to keep // reading our db and steal our bandwidth. - if !self.event_graph.broadcasted_ids.read().await.contains(&event_id) { - let malicious_count = self.malicious_count.fetch_add(1, SeqCst); - if malicious_count + 1 == MALICIOUS_THRESHOLD { - error!( + let mut events = vec![]; + for event_id in event_ids { + if !self.event_graph.broadcasted_ids.read().await.contains(&event_id) { + let malicious_count = self.malicious_count.fetch_add(1, SeqCst); + if malicious_count + 1 == MALICIOUS_THRESHOLD { + error!( + target: "event_graph::protocol::handle_event_req()", + "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", + self.channel.address(), + ); + self.channel.stop().await; + return Err(Error::ChannelStopped) + } + + warn!( target: "event_graph::protocol::handle_event_req()", - "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", - self.channel.address(), + "[EVENTGRAPH] Peer {} requested an unexpected event {:?}", + self.channel.address(), event_id, ); - self.channel.stop().await; - return Err(Error::ChannelStopped) + continue } - warn!( + // At this point we should have it in our DAG. + // This code panics if this is not the case. + debug!( target: "event_graph::protocol::handle_event_req()", - "[EVENTGRAPH] Peer {} requested an unexpected event {}", - self.channel.address(), event_id, + "Fetching event {:?} from DAG", event_id, ); - continue + events.push(self.event_graph.dag_get(&event_id).await.unwrap().unwrap()); } - // At this point we should have it in our DAG. - // This code panics if this is not the case. - debug!( - target: "event_graph::protocol::handle_event_req()", - "Fetching event {} from DAG", event_id, - ); - let event = self.event_graph.dag_get(&event_id).await.unwrap().unwrap(); - // Check if the incoming event is older than the genesis event. If so, something // has gone wrong. The event should have been pruned during the last // rotation. - let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp; - if event.timestamp < genesis_timestamp { - error!( - target: "event_graph::protocol::handle_event_req()", - "Requested event by peer {} is older than previous rotation period. It should have been pruned. - Event timestamp: `{}`. Genesis timestamp: `{}`", - event.id(), event.timestamp, genesis_timestamp - ); - } - - // Now let's get the upper level of event IDs. When we reply, we could - // get requests for those IDs as well. - let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; - for parent_id in event.parents.iter() { - if parent_id != &NULL_ID { - bcast_ids.insert(*parent_id); + for event in events.clone() { + let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp; + if event.timestamp < genesis_timestamp { + error!( + target: "event_graph::protocol::handle_event_req()", + "Requested event by peer {} is older than previous rotation period. It should have been pruned. + Event timestamp: `{}`. Genesis timestamp: `{}`", + event.id(), event.timestamp, genesis_timestamp + ); } - } - // TODO: We should remove the reply from the bcast IDs for this specific channel. - // We can't remove them for everyone. - //bcast_ids.remove(&event_id); - drop(bcast_ids); + // Now let's get the upper level of event IDs. When we reply, we could + // get requests for those IDs as well. + let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; + for parent_id in event.parents.iter() { + if parent_id != &NULL_ID { + bcast_ids.insert(*parent_id); + } + } + // TODO: We should remove the reply from the bcast IDs for this specific channel. + // We can't remove them for everyone. + //bcast_ids.remove(&event_id); + drop(bcast_ids); + } // Reply with the event - self.channel.send(&EventRep(event)).await?; + self.channel.send(&EventRep(events)).await?; } }