using sled datastore api to get logs and other required data

This commit is contained in:
ghassmo
2022-07-16 17:22:21 +04:00
parent e00bc7d967
commit 69772d8400
3 changed files with 93 additions and 91 deletions

View File

@@ -28,9 +28,9 @@ use super::{
const HEARTBEATTIMEOUT: u64 = 500;
const TIMEOUT: u64 = 6000;
const TIMEOUT_NODES: u64 = 1000;
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2400;
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000;
const SYNC_ATTEMPTS: u64 = 12;
const SYNC_ATTEMPTS: u64 = 30;
async fn load_node_ids_loop(
nodes: Arc<Mutex<HashMap<NodeId, Url>>>,
@@ -66,12 +66,6 @@ pub struct Raft<T> {
// this will be derived from the ip
pub id: Option<NodeId>,
// these four vars should be on local storage
current_term: u64,
voted_for: Option<NodeId>,
logs: Logs,
commit_length: u64,
role: Role,
current_leader: Option<NodeId>,
@@ -108,12 +102,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let datastore = DataStore::new(db_path.to_str().unwrap())?;
// load from sled datastore
let current_term = datastore.current_term.get_last()?.unwrap_or(0);
let voted_for = datastore.voted_for.get_last()?.flatten();
let logs = Logs(datastore.logs.get_all()?);
let commit_length = datastore.commits.get_all()?.len() as u64;
// broadcasting channels
let msgs_channel = async_channel::unbounded::<T>();
let commits_channel = async_channel::unbounded::<T>();
@@ -125,10 +113,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
Ok(Self {
id,
current_term,
voted_for,
logs,
commit_length,
role,
current_leader: None,
votes_received: vec![],
@@ -161,9 +145,9 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
// Sync listener node
if self.role == Role::Listener {
let last_term =
if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 };
if !self.is_logs_empty() { self.get_last_log()?.unwrap().term } else { 0 };
let sync_request = SyncRequest { logs_len: self.logs.len(), last_term };
let sync_request = SyncRequest { logs_len: self.get_logs_len(), last_term };
info!("Start Syncing...");
for _ in 0..SYNC_ATTEMPTS {
@@ -239,10 +223,10 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn broadcast_msg(&mut self, msg: &T, msg_id: Option<u64>) -> Result<()> {
if self.role == Role::Leader {
let msg = serialize(msg);
let log = Log { msg, term: self.current_term };
let log = Log { msg, term: self.get_current_term()? };
self.push_log(&log)?;
self.acked_length.insert(&self.id.clone().unwrap(), self.logs.len());
self.acked_length.insert(&self.id.clone().unwrap(), self.get_logs_len());
} else {
let b_msg = BroadcastMsgRequest(serialize(msg));
self.send(
@@ -300,19 +284,19 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut wipe = false;
let logs = if sr.logs_len == 0 {
self.logs.clone()
} else if self.logs.len() >= sr.logs_len &&
self.logs.get(sr.logs_len - 1)?.term == sr.last_term
self.get_logs()?.clone()
} else if self.get_logs_len() >= sr.logs_len &&
self.get_logs()?.get(sr.logs_len - 1)?.term == sr.last_term
{
self.logs.slice_from(sr.logs_len).unwrap()
self.get_logs()?.slice_from(sr.logs_len).unwrap()
} else {
wipe = true;
self.logs.clone()
self.get_logs()?.clone()
};
let sync_response = SyncResponse {
logs,
commit_length: self.commit_length,
commit_length: self.get_commits_len(),
leader_id: self.id.clone().unwrap(),
wipe,
};
@@ -335,7 +319,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> {
debug!(target: "raft", "Receive sync response");
if sr.wipe {
self.set_commit_length(&0)?;
self.push_logs(&sr.logs)?;
} else {
for log in sr.logs.0.iter() {
@@ -343,16 +326,14 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
}
if !self.logs.is_empty() {
self.set_current_term(&self.logs.0.last().unwrap().term.clone())?;
if !self.get_logs()?.is_empty() {
self.set_current_term(&self.get_logs()?.0.last().unwrap().term.clone())?;
}
for i in self.commit_length..sr.commit_length {
self.push_commit(&self.logs.get(i)?.msg).await?;
for i in self.get_commits_len()..sr.commit_length {
self.push_commit(&self.get_logs()?.get(i)?.msg).await?;
}
self.set_commit_length(&sr.commit_length)?;
self.current_leader = Some(sr.leader_id.clone());
Ok(())
@@ -426,18 +407,18 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let self_id = self.id.clone().unwrap();
self.set_current_term(&(self.current_term + 1))?;
self.set_current_term(&(self.get_current_term()? + 1))?;
self.role = Role::Candidate;
self.set_voted_for(&Some(self_id.clone()))?;
self.votes_received = vec![];
self.votes_received.push(self_id.clone());
self.reset_last_term();
self.reset_last_term()?;
let request = VoteRequest {
node_id: self_id,
current_term: self.current_term,
log_length: self.logs.len(),
current_term: self.get_current_term()?,
log_length: self.get_logs_len(),
last_term: self.last_term,
};
@@ -450,32 +431,32 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
return Ok(())
}
if vr.current_term > self.current_term {
if vr.current_term > self.get_current_term()? {
self.set_current_term(&vr.current_term)?;
self.set_voted_for(&None)?;
self.role = Role::Follower;
}
self.reset_last_term();
self.reset_last_term()?;
// check the logs of the candidate
let vote_ok = (vr.last_term > self.last_term) ||
(vr.last_term == self.last_term && vr.log_length >= self.logs.len());
(vr.last_term == self.last_term && vr.log_length >= self.get_logs_len());
// slef.voted_for equal to vr.node_id or is None or voted to someone else
let vote = if let Some(voted_for) = self.voted_for.as_ref() {
*voted_for == vr.node_id
let vote = if let Some(voted_for) = self.get_voted_for()? {
voted_for == vr.node_id
} else {
true
};
let mut response = VoteResponse {
node_id: self.id.clone().unwrap(),
current_term: self.current_term,
current_term: self.get_current_term()?,
ok: false,
};
if vr.current_term == self.current_term && vote_ok && vote {
if vr.current_term == self.get_current_term()? && vote_ok && vote {
self.set_voted_for(&Some(vr.node_id.clone()))?;
response.set_ok(true);
}
@@ -489,7 +470,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
return Ok(())
}
if self.role == Role::Candidate && vr.current_term == self.current_term && vr.ok {
if self.role == Role::Candidate && vr.current_term == self.get_current_term()? && vr.ok {
self.votes_received.push(vr.node_id);
let nodes = self.nodes.lock().await;
@@ -500,11 +481,11 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.role = Role::Leader;
self.current_leader = Some(self.id.clone().unwrap());
for node in nodes_cloned.iter() {
self.sent_length.insert(node.0, self.logs.len());
self.sent_length.insert(node.0, self.get_logs_len());
self.acked_length.insert(node.0, 0);
}
}
} else if vr.current_term > self.current_term {
} else if vr.current_term > self.get_current_term()? {
self.set_current_term(&vr.current_term)?;
self.role = Role::Follower;
self.set_voted_for(&None)?;
@@ -522,8 +503,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
};
let suffix: Logs = if self.logs.slice_from(prefix_len).is_some() {
self.logs.slice_from(prefix_len).unwrap()
let suffix: Logs = if self.get_logs()?.slice_from(prefix_len).is_some() {
self.get_logs()?.slice_from(prefix_len).unwrap()
} else {
return Ok(())
};
@@ -531,15 +512,15 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut prefix_term = 0;
if prefix_len > 0 {
prefix_term = self.logs.get(prefix_len - 1)?.term;
prefix_term = self.get_logs()?.get(prefix_len - 1)?.term;
}
let request = LogRequest {
leader_id: self.id.clone().unwrap(),
current_term: self.current_term,
current_term: self.get_current_term()?,
prefix_len,
prefix_term,
commit_length: self.commit_length,
commit_length: self.get_commits_len(),
suffix,
};
@@ -548,24 +529,25 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn receive_log_request(&mut self, lr: LogRequest) -> Result<()> {
if lr.current_term > self.current_term {
if lr.current_term > self.get_current_term()? {
self.set_current_term(&lr.current_term)?;
self.set_voted_for(&None)?;
}
if lr.current_term == self.current_term {
if lr.current_term == self.get_current_term()? {
if self.role != Role::Listener {
self.role = Role::Follower;
}
self.current_leader = Some(lr.leader_id.clone());
}
let mut ok = (self.logs.len() >= lr.prefix_len) &&
(lr.prefix_len == 0 || self.logs.get(lr.prefix_len - 1)?.term == lr.prefix_term);
let mut ok = (self.get_logs_len() >= lr.prefix_len) &&
(lr.prefix_len == 0 ||
self.get_logs()?.get(lr.prefix_len - 1)?.term == lr.prefix_term);
let mut ack = 0;
if lr.current_term == self.current_term && ok {
if lr.current_term == self.get_current_term()? && ok {
self.append_log(lr.prefix_len, lr.commit_length, &lr.suffix).await?;
ack = lr.prefix_len + lr.suffix.len();
} else {
@@ -578,7 +560,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let response = LogResponse {
node_id: self.id.clone().unwrap(),
current_term: self.current_term,
current_term: self.get_current_term()?,
ack,
ok,
};
@@ -588,7 +570,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn receive_log_response(&mut self, lr: LogResponse) -> Result<()> {
if lr.current_term == self.current_term && self.role == Role::Leader {
if lr.current_term == self.get_current_term()? && self.role == Role::Leader {
if lr.ok && lr.ack >= self.acked_length.get(&lr.node_id)? {
self.sent_length.insert(&lr.node_id, lr.ack);
self.acked_length.insert(&lr.node_id, lr.ack);
@@ -596,7 +578,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
} else if self.sent_length.get(&lr.node_id)? > 0 {
self.sent_length.insert(&lr.node_id, self.sent_length.get(&lr.node_id)? - 1);
}
} else if lr.current_term > self.current_term {
} else if lr.current_term > self.get_current_term()? {
self.set_current_term(&lr.current_term)?;
if self.role != Role::Listener {
self.role = Role::Follower;
@@ -607,12 +589,14 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
Ok(())
}
fn reset_last_term(&mut self) {
fn reset_last_term(&mut self) -> Result<()> {
self.last_term = 0;
if let Some(log) = self.logs.0.last() {
if let Some(log) = self.get_last_log()? {
self.last_term = log.term;
}
Ok(())
}
fn acks(&self, nodes: HashMap<NodeId, Url>, length: u64) -> HashMap<NodeId, Url> {
@@ -633,7 +617,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut ready: Vec<u64> = vec![];
for len in 1..(self.logs.len() + 1) {
for len in 1..(self.get_logs_len() + 1) {
if self.acks(nodes.clone(), len).len() >= min_acks {
ready.push(len);
}
@@ -645,13 +629,12 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let max_ready = *ready.iter().max().unwrap();
if max_ready > self.commit_length && self.logs.get(max_ready - 1)?.term == self.current_term
if max_ready > self.get_commits_len() &&
self.get_logs()?.get(max_ready - 1)?.term == self.get_current_term()?
{
for i in self.commit_length..max_ready {
self.push_commit(&self.logs.get(i)?.msg).await?;
for i in self.get_commits_len()..max_ready {
self.push_commit(&self.get_logs()?.get(i)?.msg).await?;
}
self.set_commit_length(&max_ready)?;
}
Ok(())
@@ -663,39 +646,33 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
leader_commit: u64,
suffix: &Logs,
) -> Result<()> {
if !suffix.is_empty() && self.logs.len() > prefix_len {
let index = min(self.logs.len(), prefix_len + suffix.len()) - 1;
if self.logs.get(index)?.term != suffix.get(index - prefix_len)?.term {
self.push_logs(&self.logs.slice_to(prefix_len))?;
if !suffix.is_empty() && self.get_logs_len() > prefix_len {
let index = min(self.get_logs_len(), prefix_len + suffix.len()) - 1;
if self.get_logs()?.get(index)?.term != suffix.get(index - prefix_len)?.term {
self.push_logs(&self.get_logs()?.slice_to(prefix_len))?;
}
}
if prefix_len + suffix.len() > self.logs.len() {
for i in (self.logs.len() - prefix_len)..suffix.len() {
if prefix_len + suffix.len() > self.get_logs_len() {
for i in (self.get_logs_len() - prefix_len)..suffix.len() {
self.push_log(&suffix.get(i)?)?;
}
}
if leader_commit > self.commit_length {
for i in self.commit_length..leader_commit {
self.push_commit(&self.logs.get(i)?.msg).await?;
if leader_commit > self.get_commits_len() {
for i in self.get_commits_len()..leader_commit {
self.push_commit(&self.get_logs()?.get(i)?.msg).await?;
}
self.set_commit_length(&leader_commit)?;
}
Ok(())
}
fn set_commit_length(&mut self, i: &u64) -> Result<()> {
self.commit_length = *i;
Ok(())
}
fn set_current_term(&mut self, i: &u64) -> Result<()> {
self.current_term = *i;
self.datastore.current_term.insert(i)
}
fn set_voted_for(&mut self, i: &Option<NodeId>) -> Result<()> {
self.voted_for = i.clone();
self.datastore.voted_for.insert(i)
}
async fn push_commit(&mut self, commit: &[u8]) -> Result<()> {
@@ -704,11 +681,31 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.datastore.commits.insert(&commit)
}
fn push_log(&mut self, log: &Log) -> Result<()> {
self.logs.push(log);
self.datastore.logs.insert(log)
}
fn push_logs(&mut self, logs: &Logs) -> Result<()> {
self.logs = logs.clone();
self.datastore.logs.wipe_insert_all(&logs.to_vec())
}
fn get_current_term(&self) -> Result<u64> {
Ok(self.datastore.current_term.get_last()?.unwrap_or(0))
}
fn get_voted_for(&self) -> Result<Option<NodeId>> {
Ok(self.datastore.voted_for.get_last()?.flatten())
}
fn get_commits_len(&self) -> u64 {
self.datastore.commits.len()
}
fn get_logs(&self) -> Result<Logs> {
Ok(Logs(self.datastore.logs.get_all()?))
}
fn get_logs_len(&self) -> u64 {
self.datastore.logs.len()
}
fn is_logs_empty(&self) -> bool {
self.datastore.logs.is_empty()
}
fn get_last_log(&self) -> Result<Option<Log>> {
self.datastore.logs.get_last()
}
}

View File

@@ -86,6 +86,10 @@ impl<T: Decodable + Encodable> DataTree<T> {
Ok(ret)
}
pub fn len(&self) -> u64 {
self.tree.len() as u64
}
pub fn get_last(&self) -> Result<Option<T>> {
if let Some(found) = self.tree.last()? {
let da = deserialize(&found.1)?;
@@ -93,4 +97,8 @@ impl<T: Decodable + Encodable> DataTree<T> {
}
Ok(None)
}
pub fn is_empty(&self) -> bool {
self.tree.is_empty()
}
}

View File

@@ -102,9 +102,6 @@ impl Logs {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn push(&mut self, d: &Log) {
self.0.push(d.clone());
}
pub fn slice_from(&self, start: u64) -> Option<Self> {
if self.len() >= start {