From 69772d84000a7f013977159182c88efca2b038cd Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 16 Jul 2022 17:22:21 +0400 Subject: [PATCH] using sled datastore api to get logs and other required data --- src/raft/consensus.rs | 173 ++++++++++++++++++++--------------------- src/raft/datastore.rs | 8 ++ src/raft/primitives.rs | 3 - 3 files changed, 93 insertions(+), 91 deletions(-) diff --git a/src/raft/consensus.rs b/src/raft/consensus.rs index 6fa2747b3..e31af1f33 100644 --- a/src/raft/consensus.rs +++ b/src/raft/consensus.rs @@ -28,9 +28,9 @@ use super::{ const HEARTBEATTIMEOUT: u64 = 500; const TIMEOUT: u64 = 6000; const TIMEOUT_NODES: u64 = 1000; -const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2400; +const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000; -const SYNC_ATTEMPTS: u64 = 12; +const SYNC_ATTEMPTS: u64 = 30; async fn load_node_ids_loop( nodes: Arc>>, @@ -66,12 +66,6 @@ pub struct Raft { // this will be derived from the ip pub id: Option, - // these four vars should be on local storage - current_term: u64, - voted_for: Option, - logs: Logs, - commit_length: u64, - role: Role, current_leader: Option, @@ -108,12 +102,6 @@ impl Raft { let datastore = DataStore::new(db_path.to_str().unwrap())?; - // load from sled datastore - let current_term = datastore.current_term.get_last()?.unwrap_or(0); - let voted_for = datastore.voted_for.get_last()?.flatten(); - let logs = Logs(datastore.logs.get_all()?); - let commit_length = datastore.commits.get_all()?.len() as u64; - // broadcasting channels let msgs_channel = async_channel::unbounded::(); let commits_channel = async_channel::unbounded::(); @@ -125,10 +113,6 @@ impl Raft { Ok(Self { id, - current_term, - voted_for, - logs, - commit_length, role, current_leader: None, votes_received: vec![], @@ -161,9 +145,9 @@ impl Raft { // Sync listener node if self.role == Role::Listener { let last_term = - if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 }; + if !self.is_logs_empty() { self.get_last_log()?.unwrap().term } else { 0 }; - let sync_request = SyncRequest { logs_len: self.logs.len(), last_term }; + let sync_request = SyncRequest { logs_len: self.get_logs_len(), last_term }; info!("Start Syncing..."); for _ in 0..SYNC_ATTEMPTS { @@ -239,10 +223,10 @@ impl Raft { async fn broadcast_msg(&mut self, msg: &T, msg_id: Option) -> Result<()> { if self.role == Role::Leader { let msg = serialize(msg); - let log = Log { msg, term: self.current_term }; + let log = Log { msg, term: self.get_current_term()? }; self.push_log(&log)?; - self.acked_length.insert(&self.id.clone().unwrap(), self.logs.len()); + self.acked_length.insert(&self.id.clone().unwrap(), self.get_logs_len()); } else { let b_msg = BroadcastMsgRequest(serialize(msg)); self.send( @@ -300,19 +284,19 @@ impl Raft { let mut wipe = false; let logs = if sr.logs_len == 0 { - self.logs.clone() - } else if self.logs.len() >= sr.logs_len && - self.logs.get(sr.logs_len - 1)?.term == sr.last_term + self.get_logs()?.clone() + } else if self.get_logs_len() >= sr.logs_len && + self.get_logs()?.get(sr.logs_len - 1)?.term == sr.last_term { - self.logs.slice_from(sr.logs_len).unwrap() + self.get_logs()?.slice_from(sr.logs_len).unwrap() } else { wipe = true; - self.logs.clone() + self.get_logs()?.clone() }; let sync_response = SyncResponse { logs, - commit_length: self.commit_length, + commit_length: self.get_commits_len(), leader_id: self.id.clone().unwrap(), wipe, }; @@ -335,7 +319,6 @@ impl Raft { async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> { debug!(target: "raft", "Receive sync response"); if sr.wipe { - self.set_commit_length(&0)?; self.push_logs(&sr.logs)?; } else { for log in sr.logs.0.iter() { @@ -343,16 +326,14 @@ impl Raft { } } - if !self.logs.is_empty() { - self.set_current_term(&self.logs.0.last().unwrap().term.clone())?; + if !self.get_logs()?.is_empty() { + self.set_current_term(&self.get_logs()?.0.last().unwrap().term.clone())?; } - for i in self.commit_length..sr.commit_length { - self.push_commit(&self.logs.get(i)?.msg).await?; + for i in self.get_commits_len()..sr.commit_length { + self.push_commit(&self.get_logs()?.get(i)?.msg).await?; } - self.set_commit_length(&sr.commit_length)?; - self.current_leader = Some(sr.leader_id.clone()); Ok(()) @@ -426,18 +407,18 @@ impl Raft { let self_id = self.id.clone().unwrap(); - self.set_current_term(&(self.current_term + 1))?; + self.set_current_term(&(self.get_current_term()? + 1))?; self.role = Role::Candidate; self.set_voted_for(&Some(self_id.clone()))?; self.votes_received = vec![]; self.votes_received.push(self_id.clone()); - self.reset_last_term(); + self.reset_last_term()?; let request = VoteRequest { node_id: self_id, - current_term: self.current_term, - log_length: self.logs.len(), + current_term: self.get_current_term()?, + log_length: self.get_logs_len(), last_term: self.last_term, }; @@ -450,32 +431,32 @@ impl Raft { return Ok(()) } - if vr.current_term > self.current_term { + if vr.current_term > self.get_current_term()? { self.set_current_term(&vr.current_term)?; self.set_voted_for(&None)?; self.role = Role::Follower; } - self.reset_last_term(); + self.reset_last_term()?; // check the logs of the candidate let vote_ok = (vr.last_term > self.last_term) || - (vr.last_term == self.last_term && vr.log_length >= self.logs.len()); + (vr.last_term == self.last_term && vr.log_length >= self.get_logs_len()); // slef.voted_for equal to vr.node_id or is None or voted to someone else - let vote = if let Some(voted_for) = self.voted_for.as_ref() { - *voted_for == vr.node_id + let vote = if let Some(voted_for) = self.get_voted_for()? { + voted_for == vr.node_id } else { true }; let mut response = VoteResponse { node_id: self.id.clone().unwrap(), - current_term: self.current_term, + current_term: self.get_current_term()?, ok: false, }; - if vr.current_term == self.current_term && vote_ok && vote { + if vr.current_term == self.get_current_term()? && vote_ok && vote { self.set_voted_for(&Some(vr.node_id.clone()))?; response.set_ok(true); } @@ -489,7 +470,7 @@ impl Raft { return Ok(()) } - if self.role == Role::Candidate && vr.current_term == self.current_term && vr.ok { + if self.role == Role::Candidate && vr.current_term == self.get_current_term()? && vr.ok { self.votes_received.push(vr.node_id); let nodes = self.nodes.lock().await; @@ -500,11 +481,11 @@ impl Raft { self.role = Role::Leader; self.current_leader = Some(self.id.clone().unwrap()); for node in nodes_cloned.iter() { - self.sent_length.insert(node.0, self.logs.len()); + self.sent_length.insert(node.0, self.get_logs_len()); self.acked_length.insert(node.0, 0); } } - } else if vr.current_term > self.current_term { + } else if vr.current_term > self.get_current_term()? { self.set_current_term(&vr.current_term)?; self.role = Role::Follower; self.set_voted_for(&None)?; @@ -522,8 +503,8 @@ impl Raft { } }; - let suffix: Logs = if self.logs.slice_from(prefix_len).is_some() { - self.logs.slice_from(prefix_len).unwrap() + let suffix: Logs = if self.get_logs()?.slice_from(prefix_len).is_some() { + self.get_logs()?.slice_from(prefix_len).unwrap() } else { return Ok(()) }; @@ -531,15 +512,15 @@ impl Raft { let mut prefix_term = 0; if prefix_len > 0 { - prefix_term = self.logs.get(prefix_len - 1)?.term; + prefix_term = self.get_logs()?.get(prefix_len - 1)?.term; } let request = LogRequest { leader_id: self.id.clone().unwrap(), - current_term: self.current_term, + current_term: self.get_current_term()?, prefix_len, prefix_term, - commit_length: self.commit_length, + commit_length: self.get_commits_len(), suffix, }; @@ -548,24 +529,25 @@ impl Raft { } async fn receive_log_request(&mut self, lr: LogRequest) -> Result<()> { - if lr.current_term > self.current_term { + if lr.current_term > self.get_current_term()? { self.set_current_term(&lr.current_term)?; self.set_voted_for(&None)?; } - if lr.current_term == self.current_term { + if lr.current_term == self.get_current_term()? { if self.role != Role::Listener { self.role = Role::Follower; } self.current_leader = Some(lr.leader_id.clone()); } - let mut ok = (self.logs.len() >= lr.prefix_len) && - (lr.prefix_len == 0 || self.logs.get(lr.prefix_len - 1)?.term == lr.prefix_term); + let mut ok = (self.get_logs_len() >= lr.prefix_len) && + (lr.prefix_len == 0 || + self.get_logs()?.get(lr.prefix_len - 1)?.term == lr.prefix_term); let mut ack = 0; - if lr.current_term == self.current_term && ok { + if lr.current_term == self.get_current_term()? && ok { self.append_log(lr.prefix_len, lr.commit_length, &lr.suffix).await?; ack = lr.prefix_len + lr.suffix.len(); } else { @@ -578,7 +560,7 @@ impl Raft { let response = LogResponse { node_id: self.id.clone().unwrap(), - current_term: self.current_term, + current_term: self.get_current_term()?, ack, ok, }; @@ -588,7 +570,7 @@ impl Raft { } async fn receive_log_response(&mut self, lr: LogResponse) -> Result<()> { - if lr.current_term == self.current_term && self.role == Role::Leader { + if lr.current_term == self.get_current_term()? && self.role == Role::Leader { if lr.ok && lr.ack >= self.acked_length.get(&lr.node_id)? { self.sent_length.insert(&lr.node_id, lr.ack); self.acked_length.insert(&lr.node_id, lr.ack); @@ -596,7 +578,7 @@ impl Raft { } else if self.sent_length.get(&lr.node_id)? > 0 { self.sent_length.insert(&lr.node_id, self.sent_length.get(&lr.node_id)? - 1); } - } else if lr.current_term > self.current_term { + } else if lr.current_term > self.get_current_term()? { self.set_current_term(&lr.current_term)?; if self.role != Role::Listener { self.role = Role::Follower; @@ -607,12 +589,14 @@ impl Raft { Ok(()) } - fn reset_last_term(&mut self) { + fn reset_last_term(&mut self) -> Result<()> { self.last_term = 0; - if let Some(log) = self.logs.0.last() { + if let Some(log) = self.get_last_log()? { self.last_term = log.term; } + + Ok(()) } fn acks(&self, nodes: HashMap, length: u64) -> HashMap { @@ -633,7 +617,7 @@ impl Raft { let mut ready: Vec = vec![]; - for len in 1..(self.logs.len() + 1) { + for len in 1..(self.get_logs_len() + 1) { if self.acks(nodes.clone(), len).len() >= min_acks { ready.push(len); } @@ -645,13 +629,12 @@ impl Raft { let max_ready = *ready.iter().max().unwrap(); - if max_ready > self.commit_length && self.logs.get(max_ready - 1)?.term == self.current_term + if max_ready > self.get_commits_len() && + self.get_logs()?.get(max_ready - 1)?.term == self.get_current_term()? { - for i in self.commit_length..max_ready { - self.push_commit(&self.logs.get(i)?.msg).await?; + for i in self.get_commits_len()..max_ready { + self.push_commit(&self.get_logs()?.get(i)?.msg).await?; } - - self.set_commit_length(&max_ready)?; } Ok(()) @@ -663,39 +646,33 @@ impl Raft { leader_commit: u64, suffix: &Logs, ) -> Result<()> { - if !suffix.is_empty() && self.logs.len() > prefix_len { - let index = min(self.logs.len(), prefix_len + suffix.len()) - 1; - if self.logs.get(index)?.term != suffix.get(index - prefix_len)?.term { - self.push_logs(&self.logs.slice_to(prefix_len))?; + if !suffix.is_empty() && self.get_logs_len() > prefix_len { + let index = min(self.get_logs_len(), prefix_len + suffix.len()) - 1; + if self.get_logs()?.get(index)?.term != suffix.get(index - prefix_len)?.term { + self.push_logs(&self.get_logs()?.slice_to(prefix_len))?; } } - if prefix_len + suffix.len() > self.logs.len() { - for i in (self.logs.len() - prefix_len)..suffix.len() { + if prefix_len + suffix.len() > self.get_logs_len() { + for i in (self.get_logs_len() - prefix_len)..suffix.len() { self.push_log(&suffix.get(i)?)?; } } - if leader_commit > self.commit_length { - for i in self.commit_length..leader_commit { - self.push_commit(&self.logs.get(i)?.msg).await?; + if leader_commit > self.get_commits_len() { + for i in self.get_commits_len()..leader_commit { + self.push_commit(&self.get_logs()?.get(i)?.msg).await?; } - self.set_commit_length(&leader_commit)?; } Ok(()) } - fn set_commit_length(&mut self, i: &u64) -> Result<()> { - self.commit_length = *i; - Ok(()) - } fn set_current_term(&mut self, i: &u64) -> Result<()> { - self.current_term = *i; self.datastore.current_term.insert(i) } + fn set_voted_for(&mut self, i: &Option) -> Result<()> { - self.voted_for = i.clone(); self.datastore.voted_for.insert(i) } async fn push_commit(&mut self, commit: &[u8]) -> Result<()> { @@ -704,11 +681,31 @@ impl Raft { self.datastore.commits.insert(&commit) } fn push_log(&mut self, log: &Log) -> Result<()> { - self.logs.push(log); self.datastore.logs.insert(log) } fn push_logs(&mut self, logs: &Logs) -> Result<()> { - self.logs = logs.clone(); self.datastore.logs.wipe_insert_all(&logs.to_vec()) } + fn get_current_term(&self) -> Result { + Ok(self.datastore.current_term.get_last()?.unwrap_or(0)) + } + fn get_voted_for(&self) -> Result> { + Ok(self.datastore.voted_for.get_last()?.flatten()) + } + fn get_commits_len(&self) -> u64 { + self.datastore.commits.len() + } + fn get_logs(&self) -> Result { + Ok(Logs(self.datastore.logs.get_all()?)) + } + fn get_logs_len(&self) -> u64 { + self.datastore.logs.len() + } + fn is_logs_empty(&self) -> bool { + self.datastore.logs.is_empty() + } + + fn get_last_log(&self) -> Result> { + self.datastore.logs.get_last() + } } diff --git a/src/raft/datastore.rs b/src/raft/datastore.rs index e82eaf0bd..60adfeeed 100644 --- a/src/raft/datastore.rs +++ b/src/raft/datastore.rs @@ -86,6 +86,10 @@ impl DataTree { Ok(ret) } + pub fn len(&self) -> u64 { + self.tree.len() as u64 + } + pub fn get_last(&self) -> Result> { if let Some(found) = self.tree.last()? { let da = deserialize(&found.1)?; @@ -93,4 +97,8 @@ impl DataTree { } Ok(None) } + + pub fn is_empty(&self) -> bool { + self.tree.is_empty() + } } diff --git a/src/raft/primitives.rs b/src/raft/primitives.rs index 15d198a8d..e5057a008 100644 --- a/src/raft/primitives.rs +++ b/src/raft/primitives.rs @@ -102,9 +102,6 @@ impl Logs { pub fn is_empty(&self) -> bool { self.0.is_empty() } - pub fn push(&mut self, d: &Log) { - self.0.push(d.clone()); - } pub fn slice_from(&self, start: u64) -> Option { if self.len() >= start {