From 0de966c9c7bce62049ed3676ed1d6c4e4c171a67 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Wed, 21 Feb 2024 13:27:11 +0200 Subject: [PATCH] darkfid: forks sync logic implemented --- bin/darkfid/src/proto/mod.rs | 4 +- bin/darkfid/src/proto/protocol_proposal.rs | 65 +++++++++++++++-- bin/darkfid/src/proto/protocol_sync.rs | 85 +++++++++++++++++++++- bin/darkfid/src/task/sync.rs | 22 +++++- bin/darkfid/src/tests/harness.rs | 23 +++++- bin/darkfid/src/tests/mod.rs | 59 ++++++++++++++- src/validator/consensus.rs | 80 ++++++++++++++++++++ 7 files changed, 325 insertions(+), 13 deletions(-) diff --git a/bin/darkfid/src/proto/mod.rs b/bin/darkfid/src/proto/mod.rs index 2a9e43cab..6dd896067 100644 --- a/bin/darkfid/src/proto/mod.rs +++ b/bin/darkfid/src/proto/mod.rs @@ -22,7 +22,9 @@ pub use protocol_proposal::{ProposalMessage, ProtocolProposal}; /// Validator blockchain sync protocol mod protocol_sync; -pub use protocol_sync::{ProtocolSync, SyncRequest, SyncResponse}; +pub use protocol_sync::{ + ForkSyncRequest, ForkSyncResponse, ProtocolSync, SyncRequest, SyncResponse, +}; /// Transaction broadcast protocol mod protocol_tx; diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index 4f11802ee..93cd5a89e 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -22,7 +22,6 @@ use async_trait::async_trait; use log::debug; use smol::Executor; use tinyjson::JsonValue; -use url::Url; use darkfi::{ impl_p2p_message, @@ -33,10 +32,12 @@ use darkfi::{ rpc::jsonrpc::JsonSubscriber, util::encoding::base64, validator::{consensus::Proposal, ValidatorPtr}, - Result, + Error, Result, }; use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable}; +use crate::proto::{ForkSyncRequest, ForkSyncResponse}; + /// Auxiliary [`Proposal`] wrapper structure used for messaging. #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct ProposalMessage(pub Proposal); @@ -45,10 +46,11 @@ impl_p2p_message!(ProposalMessage, "proposal"); pub struct ProtocolProposal { proposal_sub: MessageSubscription, + proposals_response_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, validator: ValidatorPtr, p2p: P2pPtr, - channel_address: Url, + channel: ChannelPtr, subscriber: JsonSubscriber, } @@ -65,22 +67,25 @@ impl ProtocolProposal { ); let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; let proposal_sub = channel.subscribe_msg::().await?; + let proposals_response_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { proposal_sub, + proposals_response_sub, jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()), validator, p2p, - channel_address: channel.address().clone(), + channel, subscriber, })) } async fn handle_receive_proposal(self: Arc) -> Result<()> { debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START"); - let exclude_list = vec![self.channel_address.clone()]; + let exclude_list = vec![self.channel.address().clone()]; loop { let proposal = match self.proposal_sub.receive().await { Ok(v) => v, @@ -111,6 +116,7 @@ impl ProtocolProposal { let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await)); self.subscriber.notify(vec![enc_prop].into()).await; + continue } Err(e) => { debug!( @@ -118,8 +124,57 @@ impl ProtocolProposal { "append_proposal fail: {}", e ); + + match e { + Error::ExtendedChainIndexNotFound => { /* Do nothing */ } + _ => continue, + } } }; + + // If proposal fork chain was not found, we ask our peer for its sequence + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence"); + let last = self.validator.blockchain.last()?; + let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) }; + self.channel.send(&request).await?; + + // TODO: add a timeout here to retry + // Node waits for response + let response = self.proposals_response_sub.receive().await?; + + // Verify and store retrieved proposals + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals"); + + // Response should not be empty + if response.proposals.is_empty() { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence"); + continue + } + + // Sequence length must correspond to requested height + if response.proposals.len() as u64 != proposal_copy.0.block.header.height - last.0 { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous"); + continue + } + + // First proposal must extend canonical + if response.proposals[0].block.header.previous != last.1 { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical"); + continue + } + + // Last proposal must be the same as the one requested + if response.proposals.last().unwrap().hash != proposal_copy.0.hash { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip"); + continue + } + + for proposal in &response.proposals { + self.validator.consensus.append_proposal(proposal).await?; + // Notify subscriber + let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); + self.subscriber.notify(vec![enc_prop].into()).await; + } } } } diff --git a/bin/darkfid/src/proto/protocol_sync.rs b/bin/darkfid/src/proto/protocol_sync.rs index cce9485d4..a0ed25af9 100644 --- a/bin/darkfid/src/proto/protocol_sync.rs +++ b/bin/darkfid/src/proto/protocol_sync.rs @@ -29,7 +29,7 @@ use darkfi::{ ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, }, - validator::ValidatorPtr, + validator::{consensus::Proposal, ValidatorPtr}, Result, }; use darkfi_serial::{SerialDecodable, SerialEncodable}; @@ -55,8 +55,29 @@ pub struct SyncResponse { impl_p2p_message!(SyncResponse, "syncresponse"); +/// Auxiliary structure used for fork chain syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct ForkSyncRequest { + /// Canonical(finalized) tip block hash + pub tip: blake3::Hash, + /// Optional fork tip block hash + pub fork_tip: Option, +} + +impl_p2p_message!(ForkSyncRequest, "forksyncrequest"); + +/// Auxiliary structure used for fork chain syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct ForkSyncResponse { + /// Response fork proposals + pub proposals: Vec, +} + +impl_p2p_message!(ForkSyncResponse, "forksyncresponse"); + pub struct ProtocolSync { request_sub: MessageSubscription, + fork_request_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, validator: ValidatorPtr, channel: ChannelPtr, @@ -70,11 +91,14 @@ impl ProtocolSync { ); let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; let request_sub = channel.subscribe_msg::().await?; + let fork_request_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { request_sub, + fork_request_sub, jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()), validator, channel, @@ -127,6 +151,61 @@ impl ProtocolSync { }; } } + + async fn handle_receive_fork_request(self: Arc) -> Result<()> { + debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START"); + loop { + let request = match self.fork_request_sub.receive().await { + Ok(v) => v, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "recv fail: {}", + e + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*self.validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "Node still syncing blockchain, skipping..." + ); + continue + } + + // If a fork tip is provided, grab its fork proposals sequence. + // Otherwise, grab best fork proposals sequence. + let proposals = match request.fork_tip { + Some(fork_tip) => { + self.validator.consensus.get_fork_proposals(request.tip, fork_tip).await + } + None => self.validator.consensus.get_best_fork_proposals(request.tip).await, + }; + let proposals = match proposals { + Ok(p) => p, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_request", + "Getting fork proposals failed: {}", + e + ); + continue + } + }; + + let response = ForkSyncResponse { proposals }; + if let Err(e) = self.channel.send(&response).await { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "channel send fail: {}", + e + ) + }; + } + } } #[async_trait] @@ -135,6 +214,10 @@ impl ProtocolBase for ProtocolSync { debug!(target: "darkfid::proto::protocol_sync::start", "START"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; + self.jobsman + .clone() + .spawn(self.clone().handle_receive_fork_request(), executor.clone()) + .await; debug!(target: "darkfid::proto::protocol_sync::start", "END"); Ok(()) } diff --git a/bin/darkfid/src/task/sync.rs b/bin/darkfid/src/task/sync.rs index c081b9fa2..39602f863 100644 --- a/bin/darkfid/src/task/sync.rs +++ b/bin/darkfid/src/task/sync.rs @@ -22,7 +22,7 @@ use log::{debug, info, warn}; use tinyjson::JsonValue; use crate::{ - proto::{SyncRequest, SyncResponse}, + proto::{ForkSyncRequest, ForkSyncResponse, SyncRequest, SyncResponse}, Darkfid, }; @@ -44,8 +44,11 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { // Communication setup let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; let block_response_sub = channel.subscribe_msg::().await?; + let proposals_response_sub = channel.subscribe_msg::().await?; let notif_sub = node.subscribers.get("blocks").unwrap(); + let proposal_notif_sub = node.subscribers.get("proposals").unwrap(); // TODO: make this parallel and use a head selection method, // for example use a manual known head and only connect to nodes @@ -86,6 +89,23 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { last = last_received; } + // Node syncs current best fork + let request = ForkSyncRequest { tip: last.1, fork_tip: None }; + channel.send(&request).await?; + + // TODO: add a timeout here to retry + // Node waits for response + let response = proposals_response_sub.receive().await?; + + // Verify and store retrieved proposals + debug!(target: "darkfid::task::sync_task", "Processing received proposals"); + for proposal in &response.proposals { + node.validator.consensus.append_proposal(proposal).await?; + // Notify subscriber + let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); + proposal_notif_sub.notify(vec![enc_prop].into()).await; + } + *node.validator.synced.write().await = true; info!(target: "darkfid::task::sync_task", "Blockchain synced!"); Ok(()) diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index b84d9274b..f5163cf54 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -149,12 +149,28 @@ impl Harness { let alice_blockchain_len = alice.blockchain.len(); assert_eq!(alice_blockchain_len, bob.blockchain.len()); - // Last block is not finalized yet - assert_eq!(alice_blockchain_len, total_blocks - 1); + assert_eq!(alice_blockchain_len, total_blocks); Ok(()) } + pub async fn validate_fork_chains(&self, total_forks: usize, fork_sizes: Vec) { + let alice = &self.alice.validator.consensus.forks.read().await; + let bob = &self.bob.validator.consensus.forks.read().await; + + let alice_forks_len = alice.len(); + assert_eq!(alice_forks_len, bob.len()); + assert_eq!(alice_forks_len, total_forks); + + for (index, fork) in alice.iter().enumerate() { + assert_eq!(fork.proposals.len(), fork_sizes[index]); + } + + for (index, fork) in bob.iter().enumerate() { + assert_eq!(fork.proposals.len(), fork_sizes[index]); + } + } + pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> { // We append the block as a proposal to Alice, // and then we broadcast it to rest nodes @@ -163,11 +179,12 @@ impl Harness { self.alice.validator.consensus.append_proposal(&proposal).await?; let message = ProposalMessage(proposal); self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await; + self.alice.sync_p2p.as_ref().broadcast(&message).await; } // Sleep a bit so blocks can be propagated and then // trigger finalization check to Alice and Bob - sleep(1).await; + sleep(5).await; self.alice.validator.finalization().await?; self.bob.validator.finalization().await?; diff --git a/bin/darkfid/src/tests/mod.rs b/bin/darkfid/src/tests/mod.rs index b5b5a410b..3b624cb60 100644 --- a/bin/darkfid/src/tests/mod.rs +++ b/bin/darkfid/src/tests/mod.rs @@ -54,10 +54,20 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { let block4 = th.generate_next_block(&block3).await?; // Add them to nodes - th.add_blocks(&vec![block1, block2, block3, block4]).await?; + th.add_blocks(&vec![block1, block2, block3.clone(), block4.clone()]).await?; + + // Extend current fork sequence + let block5 = th.generate_next_block(&block4).await?; + // Create a new fork extending canonical + let block6 = th.generate_next_block(&block3).await?; + // Add them to nodes + th.add_blocks(&vec![block5, block6.clone()]).await?; // Validate chains - th.validate_chains(5).await?; + // Last blocks are not finalized yet + th.validate_chains(4).await?; + // Nodes must have one fork with 2 blocks and one with 1 block + th.validate_fork_chains(2, vec![2, 1]).await; // We are going to create a third node and try to sync from the previous two let mut sync_settings = @@ -73,8 +83,53 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { // Verify node synced let alice = &th.alice.validator; let charlie = &charlie.validator; + charlie.validate_blockchain(pow_target, pow_fixed_difficulty.clone()).await?; + assert_eq!(alice.blockchain.len(), charlie.blockchain.len()); + // Node must have one fork with 2 blocks + let charlie_forks = charlie.consensus.forks.read().await; + assert_eq!(charlie_forks.len(), 1); + assert_eq!(charlie_forks[0].proposals.len(), 2); + drop(charlie_forks); + + // Extend the small fork sequence and add it to nodes + let block7 = th.generate_next_block(&block6).await?; + th.add_blocks(&vec![block7.clone()]).await?; + + // Nodes must have two forks with 2 blocks each + th.validate_fork_chains(2, vec![2, 2]).await; + // Charlie didn't originaly have the fork, but it + // should be synced when its proposal was received + let charlie_forks = charlie.consensus.forks.read().await; + assert_eq!(charlie_forks.len(), 2); + assert_eq!(charlie_forks[0].proposals.len(), 2); + assert_eq!(charlie_forks[1].proposals.len(), 2); + drop(charlie_forks); + + // Extend the second fork and add it to nodes + let block8 = th.generate_next_block(&block7).await?; + th.add_blocks(&vec![block8.clone()]).await?; + + // Nodes must have executed finalization, so we validate their chains + th.validate_chains(6).await?; + let bob = &th.bob.validator; + let last = alice.blockchain.last()?.1; + assert_eq!(last, block7.hash()?); + assert_eq!(last, bob.blockchain.last()?.1); + // Nodes must have one fork with 1 block + th.validate_fork_chains(1, vec![1]).await; + let last_proposal = *alice.consensus.forks.read().await[0].proposals.last().unwrap(); + assert_eq!(last_proposal, block8.hash()?); + assert_eq!(&last_proposal, bob.consensus.forks.read().await[0].proposals.last().unwrap()); + + // Same for Charlie + charlie.finalization().await?; charlie.validate_blockchain(pow_target, pow_fixed_difficulty).await?; assert_eq!(alice.blockchain.len(), charlie.blockchain.len()); + assert_eq!(last, charlie.blockchain.last()?.1); + let charlie_forks = charlie.consensus.forks.read().await; + assert_eq!(charlie_forks.len(), 1); + assert_eq!(charlie_forks[0].proposals.len(), 1); + assert_eq!(last_proposal, charlie_forks[0].proposals[0]); // Thanks for reading Ok(()) diff --git a/src/validator/consensus.rs b/src/validator/consensus.rs index d7c723d4d..bcaabe345 100644 --- a/src/validator/consensus.rs +++ b/src/validator/consensus.rs @@ -146,6 +146,13 @@ impl Consensus { lock[i] = fork; } None => { + // Check if fork already exists + for f in lock.iter() { + if f.proposals == fork.proposals { + drop(lock); + return Err(Error::ProposalAlreadyExists) + } + } lock.push(fork); } } @@ -267,6 +274,79 @@ impl Consensus { Ok(finalized) } + + /// Auxilliary function to retrieve a fork proposals. + /// If provided tip is not the canonical(finalized), or fork doesn't exists, + /// an empty vector is returned. + pub async fn get_fork_proposals( + &self, + tip: blake3::Hash, + fork_tip: blake3::Hash, + ) -> Result> { + // Tip must be canonical(finalized) blockchain last + if self.blockchain.last()?.1 != tip { + return Ok(vec![]) + } + + // Grab a lock over current forks + let forks = self.forks.read().await; + + // Check if node has any forks + if forks.is_empty() { + drop(forks); + return Ok(vec![]) + } + + // Find fork by its tip + for fork in forks.iter() { + if fork.proposals.last() == Some(&fork_tip) { + // Grab its proposals + let blocks = fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?; + let mut ret = Vec::with_capacity(blocks.len()); + for block in blocks { + ret.push(Proposal::new(block)?); + } + drop(forks); + return Ok(ret) + } + } + + // Fork was not found + Ok(vec![]) + } + + /// Auxilliary function to retrieve current best fork proposals. + /// If multiple best forks exist, grab the proposals of the first one + /// If provided tip is not the canonical(finalized), or no forks exist, + /// an empty vector is returned. + pub async fn get_best_fork_proposals(&self, tip: blake3::Hash) -> Result> { + // Tip must be canonical(finalized) blockchain last + if self.blockchain.last()?.1 != tip { + return Ok(vec![]) + } + + // Grab a lock over current forks + let forks = self.forks.read().await; + + // Check if node has any forks + if forks.is_empty() { + drop(forks); + return Ok(vec![]) + } + + // Grab best fork + let forks_indexes = best_forks_indexes(&forks)?; + let fork = &forks[forks_indexes[0]]; + + // Grab its proposals + let blocks = fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?; + let mut ret = Vec::with_capacity(blocks.len()); + for block in blocks { + ret.push(Proposal::new(block)?); + } + + Ok(ret) + } } /// This struct represents a block proposal, used for consensus.