event-graph: DAG sync implementation.

parazyd
2023-09-11 12:40:58 +02:00
parent 4b6d85b340
commit 6661faf8c4
4 changed files with 213 additions and 18 deletions


@@ -137,15 +137,15 @@ mod tests {
let mut event_empty_content = e.clone();
event_empty_content.content = vec![];
assert_eq!(event_empty_content.validate(), false);
assert!(!event_empty_content.validate());
let mut event_timestamp_too_old = e.clone();
event_timestamp_too_old.timestamp = 0;
assert_eq!(event_timestamp_too_old.validate(), false);
assert!(!event_timestamp_too_old.validate());
let mut event_timestamp_too_new = e.clone();
event_timestamp_too_new.timestamp = u64::MAX;
assert_eq!(event_timestamp_too_new.validate(), false);
assert!(!event_timestamp_too_new.validate());
let mut event_duplicated_parents = e.clone();
let duplicated_parents = [
@@ -156,11 +156,11 @@ mod tests {
blake3::hash(b"5"),
];
event_duplicated_parents.parents = duplicated_parents;
assert_eq!(event_duplicated_parents.validate(), false);
assert!(!event_duplicated_parents.validate());
let mut event_null_parents = e.clone();
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID];
event_null_parents.parents = all_null_parents;
assert_eq!(event_null_parents.validate(), false);
assert!(!event_null_parents.validate());
}
}
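
The changes above replace assert_eq!(x.validate(), false) with the more idiomatic assert!(!x.validate()); the cases exercised stay the same: empty content, out-of-range timestamps, duplicated parents, and all-null parents must all fail validation. Below is a minimal sketch of a validator consistent with these assertions; the timestamp window and the ALLOWED_DRIFT constant are assumptions for illustration, not the actual event.rs code.

// Sketch only, not the commit's implementation. Field names (content,
// timestamp, parents) and NULL_ID are taken from the tests above; the
// timestamp bounds are illustrative.
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};

/// Illustrative validity window around the current time, in seconds.
const ALLOWED_DRIFT: u64 = 60 * 60;

impl Event {
    pub fn validate(&self) -> bool {
        // Empty content is invalid.
        if self.content.is_empty() {
            return false
        }

        // Timestamps far in the past (e.g. 0) or future (e.g. u64::MAX)
        // are invalid.
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        if self.timestamp < now.saturating_sub(ALLOWED_DRIFT) ||
            self.timestamp > now + ALLOWED_DRIFT
        {
            return false
        }

        // Parents must not repeat, and at least one must be non-null.
        let mut seen = HashSet::new();
        let mut have_parent = false;
        for parent in self.parents.iter() {
            if parent == &NULL_ID {
                continue
            }
            if !seen.insert(*parent.as_bytes()) {
                return false
            }
            have_parent = true;
        }

        have_parent
    }
}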


@@ -34,7 +34,7 @@ use smol::{
use crate::{
net::P2pPtr,
system::{sleep, StoppableTask, StoppableTaskPtr},
system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr},
Error, Result,
};
@@ -44,7 +44,7 @@ pub use event::Event;
/// P2P protocol implementation for the Event Graph
pub mod proto;
use proto::TipReq;
use proto::{EventRep, EventReq, TipRep, TipReq, REPLY_TIMEOUT};
/// Utility functions
mod util;
@@ -184,7 +184,7 @@ impl EventGraph {
let mut communicated_peers = channels.len();
info!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH]Syncing DAG from {} peers...", communicated_peers,
"[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers,
);
// Here we keep track of the tips and how many times we've seen them.
@@ -193,6 +193,19 @@ impl EventGraph {
// Let's first ask all of our peers for their tips and collect them
// in our hashmap above.
for (url, channel) in channels.iter() {
let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
Ok(v) => v,
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {}, skipping ({})",
url, e,
);
communicated_peers -= 1;
continue
}
};
if let Err(e) = channel.send(&TipReq {}).await {
error!(
target: "event_graph::dag_sync()",
@@ -202,22 +215,18 @@ impl EventGraph {
continue
};
// TODO: I can't access tip_rep_sub from here.
/*
let peer_tips = match timeout(REPLY_TIMEOUT, tip_rep_sub.receive()).await {
Ok(peer_tips) => peer_tips?,
Err(_) => {
error!(
target: "event_graph::dag_sync()",
"Peer {} didn't reply with tips in time, skipping", url,
"[EVENTGRAPH] Sync: Peer {} didn't reply with tips in time, skipping", url,
);
communicated_peers -= 1;
continue
}
};
let peer_tips = &peer_tips.0;
*/
let peer_tips = vec![];
// Note down the seen tips
for tip in peer_tips {
@@ -250,9 +259,132 @@ impl EventGraph {
drop(tips);
// Now begin fetching the events backwards.
// TODO: Reuse the recursive logic from EventPut handler.
let mut missing_parents = vec![];
for tip in considered_tips.iter() {
assert!(tip != &NULL_ID);
todo!()
if !self.dag.contains_key(tip.as_bytes()).unwrap() {
missing_parents.push(*tip);
}
}
if missing_parents.is_empty() {
return Ok(())
}
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let mut received_events = vec![];
while !missing_parents.is_empty() {
for parent_id in missing_parents.clone().iter() {
let mut found_event = false;
for (url, channel) in channels.iter() {
debug!(
target: "event_graph::dag_sync()",
"Requesting {} from {}...", parent_id, url,
);
let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
Ok(v) => v,
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
url, e,
);
continue
}
};
if let Err(e) = channel.send(&EventReq(*parent_id)).await {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed communicating EventReq({}) to {}: {}",
parent_id, url, e,
);
continue
}
let parent = match timeout(REPLY_TIMEOUT, ev_rep_sub.receive()).await {
Ok(parent) => parent,
Err(_) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Timeout waiting for parent {} from {}",
parent_id, url,
);
continue
}
};
let parent = match parent {
Ok(v) => v.0.clone(),
Err(e) => {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed receiving parent {}: {}",
parent_id, e,
);
continue
}
};
if &parent.id() != parent_id {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Peer {} replied with a wrong event: {}",
url, parent.id(),
);
continue
}
debug!(
target: "event_graph::dag_sync()",
"Got correct parent event {}", parent_id,
);
received_events.push(parent.clone());
let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap();
missing_parents.remove(pos);
found_event = true;
// See if we have the upper parents
for upper_parent in parent.parents.iter() {
if upper_parent == &NULL_ID {
continue
}
if !self.dag.contains_key(upper_parent.as_bytes()).unwrap() {
debug!(
target: "event_graph::dag_sync()",
"Found upper missing parent event{}", upper_parent,
);
missing_parents.push(*upper_parent);
}
}
break
}
if !found_event {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Failed to get all events",
);
return Err(Error::DagSyncFailed)
}
}
} // <-- while !missing_parents.is_empty
// At this point we should've got all the events.
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
for event in received_events.iter().rev() {
self.dag_insert(event).await.unwrap();
}
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
Ok(())
}
/// Background task periodically pruning the DAG.
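
Both round trips added above, TipReq/TipRep per peer and then EventReq/EventRep per missing parent, follow the same shape: subscribe for the reply type on the channel, send the request, then wait on the subscription under REPLY_TIMEOUT. Below is a condensed sketch of that pattern, assuming the channel handle is net::ChannelPtr and that TipRep wraps the peer's tip list; the helper itself is illustrative and not part of the commit.

// Illustrative helper showing the subscribe -> send -> timed-receive
// round trip used by dag_sync(). Error::DagSyncFailed stands in for a
// dedicated timeout error here.
use crate::{net::ChannelPtr, system::timeout::timeout, Error, Result};
use proto::{TipRep, TipReq, REPLY_TIMEOUT};

async fn request_peer_tips(channel: &ChannelPtr) -> Result<Vec<blake3::Hash>> {
    // Subscribe before sending so the reply cannot be missed.
    let tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;

    // Fire the request.
    channel.send(&TipReq {}).await?;

    // Wait for the reply, but give up after REPLY_TIMEOUT.
    let rep = match timeout(REPLY_TIMEOUT, tip_rep_sub.receive()).await {
        Ok(rep) => rep?,
        Err(_) => return Err(Error::DagSyncFailed),
    };

    Ok(rep.0.clone())
}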


@@ -52,7 +52,7 @@ pub struct ProtocolEventGraph {
/// `MessageSubscriber` for `TipReq`
tip_req_sub: MessageSubscription<TipReq>,
/// `MessageSubscriber` for `TipRep`
tip_rep_sub: MessageSubscription<TipRep>,
_tip_rep_sub: MessageSubscription<TipRep>,
/// Peer malicious message count
malicious_count: AtomicUsize,
/// P2P jobs manager pointer
@@ -112,7 +112,7 @@ impl ProtocolEventGraph {
let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
let tip_req_sub = channel.subscribe_msg::<TipReq>().await?;
let tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;
let _tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
@@ -121,7 +121,7 @@ impl ProtocolEventGraph {
ev_req_sub,
ev_rep_sub,
tip_req_sub,
tip_rep_sub,
_tip_rep_sub,
malicious_count: AtomicUsize::new(0),
jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
}))
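
The rename to _tip_rep_sub above reflects that dag_sync() now takes its own per-channel TipRep and EventRep subscriptions instead of going through the protocol handler's copy. For orientation, the message shapes below are inferred from how dag_sync() uses them; the actual proto.rs definitions (and the real REPLY_TIMEOUT value) may differ, and the p2p Message/serialization impls are omitted.

// Inferred shapes only, not the commit's proto.rs.
use std::time::Duration;

/// Ask a peer for its current unreferenced DAG tips.
pub struct TipReq {}

/// Reply carrying the peer's tips.
pub struct TipRep(pub Vec<blake3::Hash>);

/// Ask a peer for a single event by its ID.
pub struct EventReq(pub blake3::Hash);

/// Reply carrying the requested event.
pub struct EventRep(pub Event);

/// How long dag_sync() waits for any single reply (placeholder value).
pub const REPLY_TIMEOUT: Duration = Duration::from_secs(5);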


@@ -41,6 +41,7 @@ const N_CONNS: usize = 2;
//const N_CONNS: usize = N_NODES / 3;
#[test]
#[ignore]
fn eventgraph_propagation() {
let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("sled".to_string());
@@ -279,6 +280,68 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id);
}
// ============================================================
// 8. Start a new node and try to sync the DAG from other peers
// ============================================================
{
// Connect to N_CONNS random peers.
let mut peers = vec![];
for _ in 0..N_CONNS {
let port = 13200 + rng.gen_range(0..N_NODES);
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let settings = Settings {
localnet: true,
inbound_addrs: vec![
Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap()
],
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
peers,
allowed_transports: vec!["tcp".to_string()],
..Default::default()
};
let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph =
EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
let event_graph_ = event_graph.clone();
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
eg_instances.push(event_graph.clone());
event_graph.p2p.clone().start().await.unwrap();
info!("Waiting 10s for new node connection");
sleep(10).await;
event_graph.dag_sync().await.unwrap();
}
info!("Waiting 10s for things to settle");
sleep(10).await;
// ============================================================
// 9. Assert the new synced DAG has the same contents as others
// ============================================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 14, "Node {}, expected 14 events, have {}", i, eg.dag.len());
// 5 events from steps 2 and 4 + 9 events from step 6 = the 14 above
assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id);
}
// Stop the P2P network
for eg in eg_instances.iter() {
eg.p2p.clone().stop().await;