eventgraph: Use atomic database operations.

Additionally this changes the dag_insert API to take a slice of events.
This commit is contained in:
parazyd
2023-11-16 15:21:12 +01:00
parent cf4f85f7e9
commit 915520ea99
3 changed files with 79 additions and 44 deletions

View File

@@ -394,9 +394,9 @@ impl EventGraph {
// At this point we should've got all the events.
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
for event in received_events.iter().rev() {
self.dag_insert(event.clone()).await.unwrap();
}
// TODO: FIXME: This insert should also be atomic, dag_insert might need a rewrite
let received_events_rev: Vec<Event> = received_events.iter().rev().cloned().collect();
self.dag_insert(&received_events_rev).await.unwrap();
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
Ok(())
@@ -465,42 +465,77 @@ impl EventGraph {
}
}
/// Insert an event into the DAG.
/// This will append the new event into the unreferenced tips set, and
/// remove the event's parents from it. It will also append the event's
/// Atomically insert given events into the DAG and return the event IDs.
/// This will append the new events into the unreferenced tips set, and
/// remove the events' parents from it. It will also append the events'
/// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
/// knows that any requests for them are actually legitimate.
/// TODO: The `broadcasted_ids` set should periodically be pruned, when
/// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, event: Event) -> Result<blake3::Hash> {
let event_id = event.id();
debug!(target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id);
let s_event = serialize_async(&event).await;
// Update the unreferenced DAG tips set
pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
// Acquire exclusive locks to `unreferenced_tips` and `broadcasted_ids`
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut bcast_ids = self.broadcasted_ids.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
unreferenced_tips.remove(parent_id);
bcast_ids.insert(*parent_id);
// Here we keep the IDs to return
let mut ids = Vec::with_capacity(events.len());
// Create an atomic batch
let mut batch = sled::Batch::default();
// Iterate over given events
for event in events {
let event_id = event.id();
debug!(
target: "event_graph::dag_insert()",
"Inserting event {} into the DAG", event_id,
);
let event_se = serialize_async(event).await;
// Update the unreferenced DAG tips set
debug!(
target: "event_graph::dag_insert()",
"Event {} parents {:#?}", event_id, event.parents,
);
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
debug!(
target: "event_graph::dag_insert()",
"Removing {} from unreferenced_tips", parent_id,
);
unreferenced_tips.remove(parent_id);
broadcasted_ids.insert(*parent_id);
}
}
debug!(
target: "event_graph::dag_insert()",
"Adding {} to unreferenced tips", event_id,
);
unreferenced_tips.insert(event_id);
// Add the event to the atomic batch
batch.insert(event_id.as_bytes(), event_se);
// Note down the event ID to return
ids.push(event_id);
}
unreferenced_tips.insert(event_id);
self.dag.insert(event_id.as_bytes(), s_event).unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// We hold the write locks until this point because we insert the event
// into the database above, so we don't want anything to read these until
// that insertion is complete.
// Send out notifications about the new events
for event in events {
self.event_sub.notify(event.clone()).await;
}
// Drop the exclusive locks
drop(unreferenced_tips);
drop(bcast_ids);
drop(broadcasted_ids);
// Notify about the event on the event subscriber
self.event_sub.notify(event).await;
Ok(event_id)
Ok(ids)
}
/// Fetch an event from the DAG

View File

@@ -280,9 +280,9 @@ impl ProtocolEventGraph {
// At this point we should've got all the events.
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
for event in received_events.iter().rev() {
self.event_graph.dag_insert(event.clone()).await.unwrap();
}
let received_events_rev: Vec<Event> =
received_events.iter().rev().cloned().collect();
self.event_graph.dag_insert(&received_events_rev).await.unwrap();
} // <-- !missing_parents.is_empty()
// If we're here, we have all the parents, and we can now
@@ -291,7 +291,7 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion",
);
self.event_graph.dag_insert(event.clone()).await.unwrap();
self.event_graph.dag_insert(&[event.clone()]).await.unwrap();
// Relay the event to other peers.
self.event_graph

View File

@@ -160,7 +160,7 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let event = Event::new(vec![1, 2, 3, 4], random_node.clone()).await;
assert!(event.parents.contains(&genesis_event_id));
// The node adds it to their DAG.
let event_id = random_node.dag_insert(event.clone()).await.unwrap();
let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0];
let tips = random_node.unreferenced_tips.read().await;
assert!(tips.len() == 1);
assert!(tips.get(&event_id).is_some());
@@ -187,11 +187,11 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// ==============================================================
let random_node = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node.clone()).await;
let event0_id = random_node.dag_insert(event0.clone()).await.unwrap();
let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0];
let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node.clone()).await;
let event1_id = random_node.dag_insert(event1.clone()).await.unwrap();
let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0];
let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node.clone()).await;
let event2_id = random_node.dag_insert(event2.clone()).await.unwrap();
let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0];
// Genesis event + event from 2. + upper 3 events
assert!(random_node.dag.len() == 5);
let tips = random_node.unreferenced_tips.read().await;
@@ -225,15 +225,15 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node1 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1.clone()).await;
let _ = node1.dag_insert(event0_1.clone()).await.unwrap();
let _ = node1.dag_insert(&[event0_1.clone()]).await.unwrap()[0];
node1.p2p.broadcast(&EventPut(event0_1)).await;
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1.clone()).await;
let _ = node1.dag_insert(event1_1.clone()).await.unwrap();
let _ = node1.dag_insert(&[event1_1.clone()]).await.unwrap()[0];
node1.p2p.broadcast(&EventPut(event1_1)).await;
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1.clone()).await;
let _ = node1.dag_insert(event2_1.clone()).await.unwrap();
let _ = node1.dag_insert(&[event2_1.clone()]).await.unwrap()[0];
node1.p2p.broadcast(&EventPut(event2_1)).await;
// =======
@@ -241,14 +241,14 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node2 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2.clone()).await;
let _ = node2.dag_insert(event0_2.clone()).await.unwrap();
let _ = node2.dag_insert(&[event0_2.clone()]).await.unwrap()[0];
node2.p2p.broadcast(&EventPut(event0_2)).await;
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2.clone()).await;
let _ = node2.dag_insert(event1_2.clone()).await.unwrap();
let _ = node2.dag_insert(&[event1_2.clone()]).await.unwrap()[0];
node2.p2p.broadcast(&EventPut(event1_2)).await;
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2.clone()).await;
let _ = node2.dag_insert(event2_2.clone()).await.unwrap();
let _ = node2.dag_insert(&[event2_2.clone()]).await.unwrap()[0];
node2.p2p.broadcast(&EventPut(event2_2)).await;
// =======
@@ -256,15 +256,15 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node3 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3.clone()).await;
let _ = node3.dag_insert(event0_3.clone()).await.unwrap();
let _ = node3.dag_insert(&[event0_3.clone()]).await.unwrap()[0];
node2.p2p.broadcast(&EventPut(event0_3)).await;
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3.clone()).await;
let _ = node3.dag_insert(event1_3.clone()).await.unwrap();
let _ = node3.dag_insert(&[event1_3.clone()]).await.unwrap()[0];
node2.p2p.broadcast(&EventPut(event1_3)).await;
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3.clone()).await;
let event2_3_id = node3.dag_insert(event2_3.clone()).await.unwrap();
let event2_3_id = node3.dag_insert(&[event2_3.clone()]).await.unwrap()[0];
node3.p2p.broadcast(&EventPut(event2_3)).await;
info!("Waiting 10s for events propagation");