darkfid: forks sync logic implemented

This commit is contained in:
skoupidi
2024-02-21 13:27:11 +02:00
parent ffcefada18
commit 0de966c9c7
7 changed files with 325 additions and 13 deletions

View File

@@ -22,7 +22,9 @@ pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
/// Validator blockchain sync protocol
mod protocol_sync;
pub use protocol_sync::{ProtocolSync, SyncRequest, SyncResponse};
pub use protocol_sync::{
ForkSyncRequest, ForkSyncResponse, ProtocolSync, SyncRequest, SyncResponse,
};
/// Transaction broadcast protocol
mod protocol_tx;

View File

@@ -22,7 +22,6 @@ use async_trait::async_trait;
use log::debug;
use smol::Executor;
use tinyjson::JsonValue;
use url::Url;
use darkfi::{
impl_p2p_message,
@@ -33,10 +32,12 @@ use darkfi::{
rpc::jsonrpc::JsonSubscriber,
util::encoding::base64,
validator::{consensus::Proposal, ValidatorPtr},
Result,
Error, Result,
};
use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
use crate::proto::{ForkSyncRequest, ForkSyncResponse};
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
@@ -45,10 +46,11 @@ impl_p2p_message!(ProposalMessage, "proposal");
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<ProposalMessage>,
proposals_response_sub: MessageSubscription<ForkSyncResponse>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
channel: ChannelPtr,
subscriber: JsonSubscriber,
}
@@ -65,22 +67,25 @@ impl ProtocolProposal {
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<ProposalMessage>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let proposal_sub = channel.subscribe_msg::<ProposalMessage>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
Ok(Arc::new(Self {
proposal_sub,
proposals_response_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()),
validator,
p2p,
channel_address: channel.address().clone(),
channel,
subscriber,
}))
}
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
let exclude_list = vec![self.channel_address.clone()];
let exclude_list = vec![self.channel.address().clone()];
loop {
let proposal = match self.proposal_sub.receive().await {
Ok(v) => v,
@@ -111,6 +116,7 @@ impl ProtocolProposal {
let enc_prop =
JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await));
self.subscriber.notify(vec![enc_prop].into()).await;
continue
}
Err(e) => {
debug!(
@@ -118,8 +124,57 @@ impl ProtocolProposal {
"append_proposal fail: {}",
e
);
match e {
Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
_ => continue,
}
}
};
// If proposal fork chain was not found, we ask our peer for its sequence
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence");
let last = self.validator.blockchain.last()?;
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) };
self.channel.send(&request).await?;
// TODO: add a timeout here to retry
// Node waits for response
let response = self.proposals_response_sub.receive().await?;
// Verify and store retrieved proposals
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals");
// Response should not be empty
if response.proposals.is_empty() {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence");
continue
}
// Sequence length must correspond to requested height
if response.proposals.len() as u64 != proposal_copy.0.block.header.height - last.0 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous");
continue
}
// First proposal must extend canonical
if response.proposals[0].block.header.previous != last.1 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical");
continue
}
// Last proposal must be the same as the one requested
if response.proposals.last().unwrap().hash != proposal_copy.0.hash {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip");
continue
}
for proposal in &response.proposals {
self.validator.consensus.append_proposal(proposal).await?;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
self.subscriber.notify(vec![enc_prop].into()).await;
}
}
}
}

View File

@@ -29,7 +29,7 @@ use darkfi::{
ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
validator::ValidatorPtr,
validator::{consensus::Proposal, ValidatorPtr},
Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
@@ -55,8 +55,29 @@ pub struct SyncResponse {
impl_p2p_message!(SyncResponse, "syncresponse");
/// Auxiliary structure used for fork chain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
/// Canonical(finalized) tip block hash
pub tip: blake3::Hash,
/// Optional fork tip block hash
pub fork_tip: Option<blake3::Hash>,
}
impl_p2p_message!(ForkSyncRequest, "forksyncrequest");
/// Auxiliary structure used for fork chain syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
/// Response fork proposals
pub proposals: Vec<Proposal>,
}
impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
pub struct ProtocolSync {
request_sub: MessageSubscription<SyncRequest>,
fork_request_sub: MessageSubscription<ForkSyncRequest>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
channel: ChannelPtr,
@@ -70,11 +91,14 @@ impl ProtocolSync {
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<SyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
let request_sub = channel.subscribe_msg::<SyncRequest>().await?;
let fork_request_sub = channel.subscribe_msg::<ForkSyncRequest>().await?;
Ok(Arc::new(Self {
request_sub,
fork_request_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()),
validator,
channel,
@@ -127,6 +151,61 @@ impl ProtocolSync {
};
}
}
async fn handle_receive_fork_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
loop {
let request = match self.fork_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Node still syncing blockchain, skipping..."
);
continue
}
// If a fork tip is provided, grab its fork proposals sequence.
// Otherwise, grab best fork proposals sequence.
let proposals = match request.fork_tip {
Some(fork_tip) => {
self.validator.consensus.get_fork_proposals(request.tip, fork_tip).await
}
None => self.validator.consensus.get_best_fork_proposals(request.tip).await,
};
let proposals = match proposals {
Ok(p) => p,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Getting fork proposals failed: {}",
e
);
continue
}
};
let response = ForkSyncResponse { proposals };
if let Err(e) = self.channel.send(&response).await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"channel send fail: {}",
e
)
};
}
}
}
#[async_trait]
@@ -135,6 +214,10 @@ impl ProtocolBase for ProtocolSync {
debug!(target: "darkfid::proto::protocol_sync::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_fork_request(), executor.clone())
.await;
debug!(target: "darkfid::proto::protocol_sync::start", "END");
Ok(())
}

View File

@@ -22,7 +22,7 @@ use log::{debug, info, warn};
use tinyjson::JsonValue;
use crate::{
proto::{SyncRequest, SyncResponse},
proto::{ForkSyncRequest, ForkSyncResponse, SyncRequest, SyncResponse},
Darkfid,
};
@@ -44,8 +44,11 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
// Communication setup
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<SyncResponse>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let block_response_sub = channel.subscribe_msg::<SyncResponse>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
let notif_sub = node.subscribers.get("blocks").unwrap();
let proposal_notif_sub = node.subscribers.get("proposals").unwrap();
// TODO: make this parallel and use a head selection method,
// for example use a manual known head and only connect to nodes
@@ -86,6 +89,23 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
last = last_received;
}
// Node syncs current best fork
let request = ForkSyncRequest { tip: last.1, fork_tip: None };
channel.send(&request).await?;
// TODO: add a timeout here to retry
// Node waits for response
let response = proposals_response_sub.receive().await?;
// Verify and store retrieved proposals
debug!(target: "darkfid::task::sync_task", "Processing received proposals");
for proposal in &response.proposals {
node.validator.consensus.append_proposal(proposal).await?;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
proposal_notif_sub.notify(vec![enc_prop].into()).await;
}
*node.validator.synced.write().await = true;
info!(target: "darkfid::task::sync_task", "Blockchain synced!");
Ok(())

View File

@@ -149,12 +149,28 @@ impl Harness {
let alice_blockchain_len = alice.blockchain.len();
assert_eq!(alice_blockchain_len, bob.blockchain.len());
// Last block is not finalized yet
assert_eq!(alice_blockchain_len, total_blocks - 1);
assert_eq!(alice_blockchain_len, total_blocks);
Ok(())
}
pub async fn validate_fork_chains(&self, total_forks: usize, fork_sizes: Vec<usize>) {
let alice = &self.alice.validator.consensus.forks.read().await;
let bob = &self.bob.validator.consensus.forks.read().await;
let alice_forks_len = alice.len();
assert_eq!(alice_forks_len, bob.len());
assert_eq!(alice_forks_len, total_forks);
for (index, fork) in alice.iter().enumerate() {
assert_eq!(fork.proposals.len(), fork_sizes[index]);
}
for (index, fork) in bob.iter().enumerate() {
assert_eq!(fork.proposals.len(), fork_sizes[index]);
}
}
pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> {
// We append the block as a proposal to Alice,
// and then we broadcast it to rest nodes
@@ -163,11 +179,12 @@ impl Harness {
self.alice.validator.consensus.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await;
self.alice.sync_p2p.as_ref().broadcast(&message).await;
}
// Sleep a bit so blocks can be propagated and then
// trigger finalization check to Alice and Bob
sleep(1).await;
sleep(5).await;
self.alice.validator.finalization().await?;
self.bob.validator.finalization().await?;

View File

@@ -54,10 +54,20 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let block4 = th.generate_next_block(&block3).await?;
// Add them to nodes
th.add_blocks(&vec![block1, block2, block3, block4]).await?;
th.add_blocks(&vec![block1, block2, block3.clone(), block4.clone()]).await?;
// Extend current fork sequence
let block5 = th.generate_next_block(&block4).await?;
// Create a new fork extending canonical
let block6 = th.generate_next_block(&block3).await?;
// Add them to nodes
th.add_blocks(&vec![block5, block6.clone()]).await?;
// Validate chains
th.validate_chains(5).await?;
// Last blocks are not finalized yet
th.validate_chains(4).await?;
// Nodes must have one fork with 2 blocks and one with 1 block
th.validate_fork_chains(2, vec![2, 1]).await;
// We are going to create a third node and try to sync from the previous two
let mut sync_settings =
@@ -73,8 +83,53 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
// Verify node synced
let alice = &th.alice.validator;
let charlie = &charlie.validator;
charlie.validate_blockchain(pow_target, pow_fixed_difficulty.clone()).await?;
assert_eq!(alice.blockchain.len(), charlie.blockchain.len());
// Node must have one fork with 2 blocks
let charlie_forks = charlie.consensus.forks.read().await;
assert_eq!(charlie_forks.len(), 1);
assert_eq!(charlie_forks[0].proposals.len(), 2);
drop(charlie_forks);
// Extend the small fork sequence and add it to nodes
let block7 = th.generate_next_block(&block6).await?;
th.add_blocks(&vec![block7.clone()]).await?;
// Nodes must have two forks with 2 blocks each
th.validate_fork_chains(2, vec![2, 2]).await;
// Charlie didn't originaly have the fork, but it
// should be synced when its proposal was received
let charlie_forks = charlie.consensus.forks.read().await;
assert_eq!(charlie_forks.len(), 2);
assert_eq!(charlie_forks[0].proposals.len(), 2);
assert_eq!(charlie_forks[1].proposals.len(), 2);
drop(charlie_forks);
// Extend the second fork and add it to nodes
let block8 = th.generate_next_block(&block7).await?;
th.add_blocks(&vec![block8.clone()]).await?;
// Nodes must have executed finalization, so we validate their chains
th.validate_chains(6).await?;
let bob = &th.bob.validator;
let last = alice.blockchain.last()?.1;
assert_eq!(last, block7.hash()?);
assert_eq!(last, bob.blockchain.last()?.1);
// Nodes must have one fork with 1 block
th.validate_fork_chains(1, vec![1]).await;
let last_proposal = *alice.consensus.forks.read().await[0].proposals.last().unwrap();
assert_eq!(last_proposal, block8.hash()?);
assert_eq!(&last_proposal, bob.consensus.forks.read().await[0].proposals.last().unwrap());
// Same for Charlie
charlie.finalization().await?;
charlie.validate_blockchain(pow_target, pow_fixed_difficulty).await?;
assert_eq!(alice.blockchain.len(), charlie.blockchain.len());
assert_eq!(last, charlie.blockchain.last()?.1);
let charlie_forks = charlie.consensus.forks.read().await;
assert_eq!(charlie_forks.len(), 1);
assert_eq!(charlie_forks[0].proposals.len(), 1);
assert_eq!(last_proposal, charlie_forks[0].proposals[0]);
// Thanks for reading
Ok(())

View File

@@ -146,6 +146,13 @@ impl Consensus {
lock[i] = fork;
}
None => {
// Check if fork already exists
for f in lock.iter() {
if f.proposals == fork.proposals {
drop(lock);
return Err(Error::ProposalAlreadyExists)
}
}
lock.push(fork);
}
}
@@ -267,6 +274,79 @@ impl Consensus {
Ok(finalized)
}
/// Auxilliary function to retrieve a fork proposals.
/// If provided tip is not the canonical(finalized), or fork doesn't exists,
/// an empty vector is returned.
pub async fn get_fork_proposals(
&self,
tip: blake3::Hash,
fork_tip: blake3::Hash,
) -> Result<Vec<Proposal>> {
// Tip must be canonical(finalized) blockchain last
if self.blockchain.last()?.1 != tip {
return Ok(vec![])
}
// Grab a lock over current forks
let forks = self.forks.read().await;
// Check if node has any forks
if forks.is_empty() {
drop(forks);
return Ok(vec![])
}
// Find fork by its tip
for fork in forks.iter() {
if fork.proposals.last() == Some(&fork_tip) {
// Grab its proposals
let blocks = fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?;
let mut ret = Vec::with_capacity(blocks.len());
for block in blocks {
ret.push(Proposal::new(block)?);
}
drop(forks);
return Ok(ret)
}
}
// Fork was not found
Ok(vec![])
}
/// Auxilliary function to retrieve current best fork proposals.
/// If multiple best forks exist, grab the proposals of the first one
/// If provided tip is not the canonical(finalized), or no forks exist,
/// an empty vector is returned.
pub async fn get_best_fork_proposals(&self, tip: blake3::Hash) -> Result<Vec<Proposal>> {
// Tip must be canonical(finalized) blockchain last
if self.blockchain.last()?.1 != tip {
return Ok(vec![])
}
// Grab a lock over current forks
let forks = self.forks.read().await;
// Check if node has any forks
if forks.is_empty() {
drop(forks);
return Ok(vec![])
}
// Grab best fork
let forks_indexes = best_forks_indexes(&forks)?;
let fork = &forks[forks_indexes[0]];
// Grab its proposals
let blocks = fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?;
let mut ret = Vec::with_capacity(blocks.len());
for block in blocks {
ret.push(Proposal::new(block)?);
}
Ok(ret)
}
}
/// This struct represents a block proposal, used for consensus.