diff --git a/script/research/validatord/simulation.sh b/script/research/validatord/simulation.sh index 73207703c..e6ba89daf 100755 --- a/script/research/validatord/simulation.sh +++ b/script/research/validatord/simulation.sh @@ -4,12 +4,12 @@ nodes=4 -# Copying the node state files with a blockchain containing only the genesis block. -bound=$(($nodes - 1)) -for i in $(eval echo "{0..$bound}") -do - rm -rf ~/.config/darkfi/validatord_db_$i -done +# Copying the node state files with a blockchain containing only the genesis block. Uncomment for fresh runs. +#bound=$(($nodes - 1)) +#for i in $(eval echo "{0..$bound}") +#do +# rm -rf ~/.config/darkfi/validatord_db_$i +#done # PIDs array pids=() @@ -28,6 +28,7 @@ do cargo run -- \ --accept 0.0.0.0:1100$i \ --caccept 0.0.0.0:1200$i \ + --seeds 127.0.0.1:11000 \ --cseeds 127.0.0.1:12000 \ --rpc 127.0.0.1:666$i \ --external 127.0.0.1:1100$i \ @@ -55,6 +56,7 @@ bound=$(($nodes-1)) cargo run -- \ --accept 0.0.0.0:1100$bound \ --caccept 0.0.0.0:1200$bound \ + --seeds 127.0.0.1:11000 \ --cseeds 127.0.0.1:12000 \ --rpc 127.0.0.1:666$bound \ --external 127.0.0.1:1100$bound \ diff --git a/script/research/validatord/src/main.rs b/script/research/validatord/src/main.rs index a9e1d023d..2b024ba51 100644 --- a/script/research/validatord/src/main.rs +++ b/script/research/validatord/src/main.rs @@ -12,6 +12,8 @@ use structopt_toml::StructOptToml; use darkfi::{ consensus::{ + block::{BlockOrder, BlockResponse}, + blockchain::{ForkOrder, ForkResponse}, participant::Participant, state::{ValidatorState, ValidatorStatePtr}, tx::Tx, @@ -36,7 +38,8 @@ use darkfi::{ use validatord::protocols::{ protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal, - protocol_tx::ProtocolTx, protocol_vote::ProtocolVote, + protocol_sync::ProtocolSync, protocol_sync_forks::ProtocolSyncForks, protocol_tx::ProtocolTx, + protocol_vote::ProtocolVote, }; const CONFIG_FILE: &str = r"validatord_config.toml"; @@ -104,7 +107,90 @@ struct Opt { verbose: u8, } +async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { + info!("Node starts syncing blockchain..."); + // We retrieve p2p network connected channels, so we can use it to parallelize downloads + // Using len here because is_empty() uses unstable library feature 'exact_size_is_empty' + if p2p.channels().lock().await.values().len() != 0 { + // Currently we will use just the last channel + let channel = p2p.channels().lock().await.values().last().unwrap().clone(); + + // Communication setup + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + let response_sub = channel + .subscribe_msg::() + .await + .expect("Missing BlockResponse dispatcher!"); + + // Nodes sends the last known block hash of the canonical blockchain + // and loops until the respond is the same block (used to utilize batch requests) + let mut last = state.read().unwrap().blockchain.last()?.unwrap(); + info!("Last known block: {:?} - {:?}", last.0, last.1); + loop { + // Node creates a BlockOrder and sends it + let order = BlockOrder { sl: last.0, block: last.1 }; + channel.send(order).await?; + + // Node stores responce data. Extra validations can be added here. + let response = response_sub.receive().await?; + for info in &response.blocks { + state.write().unwrap().blockchain.add_by_info(info.clone())?; + } + let last_received = state.read().unwrap().blockchain.last()?.unwrap(); + info!("Last received block: {:?} - {:?}", last_received.0, last_received.1); + if last == last_received { + break + } + last = last_received; + } + } else { + info!("Node is not connected to other nodes."); + } + + info!("Node synced!"); + Ok(()) +} + +async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { + info!("Node starts syncing forks..."); + // Using len here because is_empty() uses unstable library feature 'exact_size_is_empty' + if p2p.channels().lock().await.values().len() != 0 { + // Nodes ask for the fork chains of the last channel peer + let channel = p2p.channels().lock().await.values().last().unwrap().clone(); + + // Communication setup + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + let response_sub = channel + .subscribe_msg::() + .await + .expect("Missing ForkResponse dispatcher!"); + + // Node creates a BlockOrder and sends it + let order = ForkOrder { id: state.read().unwrap().id }; + channel.send(order).await?; + + // Node stores responce data. Extra validations can be added here. + let response = response_sub.receive().await?; + state.write().unwrap().consensus.proposals = response.proposals.clone(); + } else { + info!("Node is not connected to other nodes, resetting consensus state."); + state.write().unwrap().reset_consensus_state()?; + } + + info!("Node synced!"); + Ok(()) +} + async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { + // Node syncs its fork chains + let result = syncing_forks_task(p2p.clone(), state.clone()).await; + match result { + Ok(()) => (), + Err(e) => error!("Sync forks failed. Error: {:?}", e), + } + // Node signals the network that it starts participating let participant = Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch()); @@ -142,7 +228,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { match vote { Ok(x) => { if x.is_none() { - debug!("Node did not vote for the proposed block."); + error!("Node did not vote for the proposed block."); } else { let vote = x.unwrap(); let result = state.write().unwrap().receive_vote(&vote); @@ -224,57 +310,94 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { let state = ValidatorState::new(database_path, id, genesis).unwrap(); // Main P2P registry setup - let p2p = net::P2p::new(subnet_settings).await; - let _registry = p2p.protocol_registry(); + let main_p2p = net::P2p::new(subnet_settings).await; + let registry = main_p2p.protocol_registry(); + + // Adding ProtocolSync to the registry + let state2 = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, main_p2p| { + let state = state2.clone(); + async move { ProtocolSync::init(channel, state, main_p2p).await } + }) + .await; + + // Performs seed session + main_p2p.clone().start(executor.clone()).await?; + // Actual main p2p session + let ex2 = executor.clone(); + let p2p = main_p2p.clone(); + executor + .spawn(async move { + if let Err(err) = p2p.run(ex2).await { + error!("Error: p2p run failed {}", err); + } + }) + .detach(); + + // Node starts syncing + let state2 = state.clone(); + syncing_task(main_p2p.clone(), state2).await?; // Consensus P2P registry setup - let p2p = net::P2p::new(consensus_subnet_settings).await; - let registry = p2p.protocol_registry(); + let consensus_p2p = net::P2p::new(consensus_subnet_settings).await; + let registry = consensus_p2p.protocol_registry(); // Adding ProtocolTx to the registry let state2 = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(net::SESSION_ALL, move |channel, consensus_p2p| { let state = state2.clone(); - async move { ProtocolTx::init(channel, state, p2p).await } + async move { ProtocolTx::init(channel, state, consensus_p2p).await } }) .await; // Adding PropotolVote to the registry + let p2p = main_p2p.clone(); let state2 = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(net::SESSION_ALL, move |channel, consensus_p2p| { let state = state2.clone(); - async move { ProtocolVote::init(channel, state, p2p).await } + let main_p2p = p2p.clone(); + async move { ProtocolVote::init(channel, state, main_p2p, consensus_p2p).await } }) .await; // Adding ProtocolProposal to the registry let state2 = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(net::SESSION_ALL, move |channel, consensus_p2p| { let state = state2.clone(); - async move { ProtocolProposal::init(channel, state, p2p).await } + async move { ProtocolProposal::init(channel, state, consensus_p2p).await } }) .await; // Adding ProtocolParticipant to the registry let state2 = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(net::SESSION_ALL, move |channel, consensus_p2p| { let state = state2.clone(); - async move { ProtocolParticipant::init(channel, state, p2p).await } + async move { ProtocolParticipant::init(channel, state, consensus_p2p).await } + }) + .await; + + // Adding ProtocolSyncForks to the registry + let state2 = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, _consensus_p2p| { + let state = state2.clone(); + async move { ProtocolSyncForks::init(channel, state).await } }) .await; // Performs seed session - p2p.clone().start(executor.clone()).await?; + consensus_p2p.clone().start(executor.clone()).await?; // Actual consensus p2p session let ex2 = executor.clone(); - let p2p2 = p2p.clone(); + let p2p = consensus_p2p.clone(); executor .spawn(async move { - if let Err(err) = p2p2.run(ex2).await { + if let Err(err) = p2p.run(ex2).await { error!("Error: p2p run failed {}", err); } }) @@ -285,14 +408,14 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { let ex3 = ex2.clone(); let rpc_interface = Arc::new(JsonRpcInterface { state: state.clone(), - p2p: p2p.clone(), + p2p: consensus_p2p.clone(), _rpc_listen_addr: opts.rpc, }); executor .spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await }) .detach(); - proposal_task(p2p, state).await; + proposal_task(consensus_p2p, state).await; Ok(()) } diff --git a/script/research/validatord/src/protocols/mod.rs b/script/research/validatord/src/protocols/mod.rs index c84bbfaa9..64243e37e 100644 --- a/script/research/validatord/src/protocols/mod.rs +++ b/script/research/validatord/src/protocols/mod.rs @@ -1,9 +1,13 @@ pub mod protocol_participant; pub mod protocol_proposal; +pub mod protocol_sync; +pub mod protocol_sync_forks; pub mod protocol_tx; pub mod protocol_vote; pub use protocol_participant::ProtocolParticipant; pub use protocol_proposal::ProtocolProposal; +pub use protocol_sync::ProtocolSync; +pub use protocol_sync_forks::ProtocolSyncForks; pub use protocol_tx::ProtocolTx; pub use protocol_vote::ProtocolVote; diff --git a/script/research/validatord/src/protocols/protocol_sync.rs b/script/research/validatord/src/protocols/protocol_sync.rs new file mode 100644 index 000000000..217b4f953 --- /dev/null +++ b/script/research/validatord/src/protocols/protocol_sync.rs @@ -0,0 +1,115 @@ +use async_executor::Executor; +use async_trait::async_trait; + +use darkfi::{ + consensus::{ + block::{BlockInfo, BlockOrder, BlockResponse}, + state::ValidatorStatePtr, + }, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; +use log::debug; +use std::sync::Arc; + +// Constant defining how many blocks we send during syncing. +const BATCH: u64 = 10; + +pub struct ProtocolSync { + channel: ChannelPtr, + order_sub: MessageSubscription, + block_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + _p2p: P2pPtr, +} + +impl ProtocolSync { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + _p2p: P2pPtr, + ) -> ProtocolBasePtr { + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + message_subsytem.add_dispatch::().await; + + let order_sub = + channel.subscribe_msg::().await.expect("Missing BlockOrder dispatcher!"); + let block_sub = + channel.subscribe_msg::().await.expect("Missing BlockInfo dispatcher!"); + + Arc::new(Self { + channel: channel.clone(), + order_sub, + block_sub, + jobsman: ProtocolJobsManager::new("SyncProtocol", channel), + state, + _p2p, + }) + } + + async fn handle_receive_order(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolSync::handle_receive_tx() [START]"); + loop { + let order = self.order_sub.receive().await?; + + debug!( + target: "ircd", + "ProtocolSync::handle_receive_order() received {:?}", + order + ); + + // Extra validations can be added here. + let key = order.sl; + let blocks = self.state.read().unwrap().blockchain.get_with_info(key, BATCH)?; + let response = BlockResponse { blocks }; + self.channel.send(response).await?; + } + } + + async fn handle_receive_block(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolSync::handle_receive_block() [START]"); + loop { + let info = self.block_sub.receive().await?; + + debug!( + target: "ircd", + "ProtocolSync::handle_receive_block() received {:?}", + info + ); + + // TODO: Following code should be executed only by replicators, not consensus nodes. + // Commented for now, as to not mess consensus testing. + // (Don't forget to remove _ from _p2p) + /* + // Node stores finalized block, if it doesn't exists (checking by slot). + // Extra validations can be added here. + let info_copy = (*info).clone(); + if !self.state.read().unwrap().blockchain.has_block(&info_copy)? { + self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?; + self.p2p.broadcast(info_copy).await?; + } + */ + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSync { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "ircd", "ProtocolSync::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; + debug!(target: "ircd", "ProtocolSync::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolSync" + } +} diff --git a/script/research/validatord/src/protocols/protocol_sync_forks.rs b/script/research/validatord/src/protocols/protocol_sync_forks.rs new file mode 100644 index 000000000..0529e6e17 --- /dev/null +++ b/script/research/validatord/src/protocols/protocol_sync_forks.rs @@ -0,0 +1,73 @@ +use async_executor::Executor; +use async_trait::async_trait; + +use darkfi::{ + consensus::{ + blockchain::{ForkOrder, ForkResponse}, + state::ValidatorStatePtr, + }, + net::{ + ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, + ProtocolJobsManagerPtr, + }, + Result, +}; +use log::debug; +use std::sync::Arc; + +pub struct ProtocolSyncForks { + channel: ChannelPtr, + order_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, +} + +impl ProtocolSyncForks { + pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr { + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + + let order_sub = + channel.subscribe_msg::().await.expect("Missing ForkOrder dispatcher!"); + + Arc::new(Self { + channel: channel.clone(), + order_sub, + jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel), + state, + }) + } + + async fn handle_receive_order(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolSyncForks::handle_receive_tx() [START]"); + loop { + let order = self.order_sub.receive().await?; + + debug!( + target: "ircd", + "ProtocolSyncForks::handle_receive_order() received {:?}", + order + ); + + // Extra validations can be added here. + let proposals = self.state.read().unwrap().consensus.proposals.clone(); + let response = ForkResponse { proposals }; + self.channel.send(response).await?; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSyncForks { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "ircd", "ProtocolSyncForks::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; + debug!(target: "ircd", "ProtocolSyncForks::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolSyncForks" + } +} diff --git a/script/research/validatord/src/protocols/protocol_vote.rs b/script/research/validatord/src/protocols/protocol_vote.rs index 917c45575..daa994967 100644 --- a/script/research/validatord/src/protocols/protocol_vote.rs +++ b/script/research/validatord/src/protocols/protocol_vote.rs @@ -16,14 +16,16 @@ pub struct ProtocolVote { vote_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, - p2p: P2pPtr, + main_p2p: P2pPtr, + consensus_p2p: P2pPtr, } impl ProtocolVote { pub async fn init( channel: ChannelPtr, state: ValidatorStatePtr, - p2p: P2pPtr, + main_p2p: P2pPtr, + consensus_p2p: P2pPtr, ) -> ProtocolBasePtr { let message_subsytem = channel.get_message_subsystem(); message_subsytem.add_dispatch::().await; @@ -34,7 +36,8 @@ impl ProtocolVote { vote_sub, jobsman: ProtocolJobsManager::new("VoteProtocol", channel), state, - p2p, + main_p2p, + consensus_p2p, }) } @@ -49,8 +52,18 @@ impl ProtocolVote { vote ); let vote_copy = (*vote).clone(); - if self.state.write().unwrap().receive_vote(&vote_copy)? { - self.p2p.broadcast(vote_copy).await?; + let (voted, to_broadcast) = self.state.write().unwrap().receive_vote(&vote_copy)?; + if voted { + self.consensus_p2p.broadcast(vote_copy).await?; + // Broadcasting finalized blocks info, if any + match to_broadcast { + Some(blocks) => { + for info in blocks { + self.main_p2p.broadcast(info).await?; + } + } + None => continue, + } }; } } diff --git a/src/consensus/block.rs b/src/consensus/block.rs index cfb4f1372..f10a20f13 100644 --- a/src/consensus/block.rs +++ b/src/consensus/block.rs @@ -77,6 +77,24 @@ impl BlockStore { Ok(blockhash) } + /// Fetch given blocks from the blockstore. + /// The resulting vector contains `Option` which is `Some` if the block + /// was found in the blockstore, and `None`, if it has not. + pub fn get(&self, blockhashes: &[blake3::Hash]) -> Result>> { + let mut ret: Vec> = Vec::with_capacity(blockhashes.len()); + + for i in blockhashes { + if let Some(found) = self.0.get(i.as_bytes())? { + let block = deserialize(&found)?; + ret.push(Some((i.clone(), block))); + } else { + ret.push(None); + } + } + + Ok(ret) + } + /// Retrieve all blocks. /// Be carefull as this will try to load everything in memory. pub fn get_all(&self) -> Result>> { @@ -92,6 +110,69 @@ impl BlockStore { } } +/// Auxilary structure used for blockchain syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct BlockOrder { + /// Slot uid + pub sl: u64, + /// Block hash of that slot + pub block: blake3::Hash, +} + +impl net::Message for BlockOrder { + fn name() -> &'static str { + "blockorder" + } +} + +/// Auxilary structure represending a full block data, used for blockchain syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct BlockInfo { + /// Previous block hash + pub st: blake3::Hash, + /// Slot uid, generated by the beacon + pub sl: u64, + /// Transactions payload + pub txs: Vec, + /// Additional proposal information + pub metadata: Metadata, + /// Proposal information used by Streamlet consensus + pub sm: StreamletMetadata, +} + +impl BlockInfo { + pub fn new( + st: blake3::Hash, + sl: u64, + txs: Vec, + metadata: Metadata, + sm: StreamletMetadata, + ) -> BlockInfo { + BlockInfo { st, sl, txs, metadata, sm } + } +} + +impl net::Message for BlockInfo { + fn name() -> &'static str { + "blockinfo" + } +} + +impl_vec!(BlockInfo); + +/// Auxilary structure used for blockchain syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct BlockResponse { + /// Response blocks. + pub blocks: Vec, +} + +impl net::Message for BlockResponse { + fn name() -> &'static str { + "blockresponse" + } +} + #[derive(Debug)] pub struct BlockOrderStore(sled::Tree); @@ -117,6 +198,24 @@ impl BlockOrderStore { Ok(()) } + /// Fetch given slots block hashes from the blockstore. + /// The resulting vector contains `Option` which is `Some` if the block + /// was found in the blockstore, and `None`, if it has not. + pub fn get(&self, slots: &[u64]) -> Result>> { + let mut ret: Vec> = Vec::with_capacity(slots.len()); + + for sl in slots { + if let Some(found) = self.0.get(sl.to_be_bytes())? { + let block = deserialize(&found)?; + ret.push(Some(BlockOrder { sl: sl.clone(), block })); + } else { + ret.push(None); + } + } + + Ok(ret) + } + /// Retrieve the last block hash in the tree, based on the Ord implementation for Vec. pub fn get_last(&self) -> Result> { if let Some(found) = self.0.last()? { @@ -129,6 +228,24 @@ impl BlockOrderStore { Ok(None) } + /// Retrieve n hashes after key. + pub fn get_after(&self, mut key: u64, n: u64) -> Result> { + let mut hashes = Vec::new(); + let mut counter = 0; + while counter <= n { + if let Some(found) = self.0.get_gt(key.to_be_bytes())? { + let key_bytes: [u8; 8] = found.0.as_ref().try_into().unwrap(); + key = u64::from_be_bytes(key_bytes); + let block_hash = deserialize(&found.1)?; + hashes.push(block_hash); + counter = counter + 1; + } else { + break + } + } + Ok(hashes) + } + /// Retrieve all blocks hashes. /// Be carefull as this will try to load everything in memory. pub fn get_all(&self) -> Result>> { diff --git a/src/consensus/blockchain.rs b/src/consensus/blockchain.rs index 194097fa6..295e8b2bc 100644 --- a/src/consensus/blockchain.rs +++ b/src/consensus/blockchain.rs @@ -3,13 +3,13 @@ use std::io; use log::debug; use crate::{ - impl_vec, + impl_vec, net, util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Result, }; use super::{ - block::{Block, BlockOrderStore, BlockProposal, BlockStore}, + block::{Block, BlockInfo, BlockOrderStore, BlockProposal, BlockStore}, metadata::StreamletMetadataStore, tx::TxStore, }; @@ -32,12 +32,12 @@ impl Blockchain { let blocks = BlockStore::new(db, genesis)?; let order = BlockOrderStore::new(db, genesis)?; let transactions = TxStore::new(db)?; - let streamlet_metadata = StreamletMetadataStore::new(db)?; + let streamlet_metadata = StreamletMetadataStore::new(db, genesis)?; Ok(Blockchain { blocks, order, transactions, streamlet_metadata }) } /// Insertion of a block proposal. - pub fn add(&mut self, proposal: BlockProposal) -> Result { + pub fn add_by_proposal(&mut self, proposal: BlockProposal) -> Result { // Storing transactions let mut txs = Vec::new(); for tx in proposal.txs { @@ -58,10 +58,86 @@ impl Blockchain { Ok(hash) } + /// Insertion of a block info. + pub fn add_by_info(&mut self, info: BlockInfo) -> Result { + if self.has_block(&info)? { + let blockhash = + BlockProposal::to_proposal_hash(info.st, info.sl, &info.txs, &info.metadata); + return Ok(blockhash) + } + + // Storing transactions + let mut txs = Vec::new(); + for tx in info.txs { + let hash = self.transactions.insert(&tx)?; + txs.push(hash); + } + + // Storing block + let block = Block { st: info.st, sl: info.sl, txs, metadata: info.metadata }; + let hash = self.blocks.insert(&block)?; + + // Storing block order + self.order.insert(block.sl, hash)?; + + // Storing streamlet metadata + self.streamlet_metadata.insert(hash, &info.sm)?; + + Ok(hash) + } + /// Retrieve the last block slot and hash. pub fn last(&self) -> Result> { self.order.get_last() } + + /// Retrieve the last block slot and hash. + pub fn has_block(&self, info: &BlockInfo) -> Result { + let hashes = self.order.get(&vec![info.sl])?; + if hashes.is_empty() { + return Ok(false) + } + if let Some(found) = &hashes[0] { + // Checking provided info produces same hash + let blockhash = + BlockProposal::to_proposal_hash(info.st, info.sl, &info.txs, &info.metadata); + + return Ok(blockhash == found.block) + } + Ok(false) + } + + /// Retrieve n blocks with all their info, after start key. + pub fn get_with_info(&self, key: u64, n: u64) -> Result> { + let mut blocks_info = Vec::new(); + + // Retrieve requested hashes from order store + let hashes = self.order.get_after(key, n)?; + + // Retrieve blocks for found hashes + let blocks = self.blocks.get(&hashes)?; + + // For each found block, retrieve its txs and metadata and convert to BlockProposal + for option in blocks { + match option { + None => continue, + Some((hash, block)) => { + let mut txs = Vec::new(); + let found = self.transactions.get(&block.txs)?; + for option in found { + match option { + Some(tx) => txs.push(tx), + None => continue, + } + } + let sm = self.streamlet_metadata.get(&vec![hash])?[0].as_ref().unwrap().clone(); + blocks_info.push(BlockInfo::new(block.st, block.sl, txs, block.metadata, sm)); + } + } + } + + Ok(blocks_info) + } } /// This struct represents a sequence of block proposals. @@ -125,3 +201,29 @@ impl ProposalsChain { } impl_vec!(ProposalsChain); + +/// Auxilary structure used for forks syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct ForkOrder { + /// Validator id + pub id: u64, +} + +impl net::Message for ForkOrder { + fn name() -> &'static str { + "forkorder" + } +} + +/// Auxilary structure used for forks syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct ForkResponse { + /// Fork chains containing block proposals + pub proposals: Vec, +} + +impl net::Message for ForkResponse { + fn name() -> &'static str { + "forkresponse" + } +} diff --git a/src/consensus/metadata.rs b/src/consensus/metadata.rs index 9946b3f09..99064c79a 100644 --- a/src/consensus/metadata.rs +++ b/src/consensus/metadata.rs @@ -3,7 +3,7 @@ use crate::{ Result, }; -use super::{participant::Participant, util::Timestamp, vote::Vote}; +use super::{block::Block, participant::Participant, util::Timestamp, vote::Vote}; const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata"; @@ -62,9 +62,22 @@ impl StreamletMetadata { pub struct StreamletMetadataStore(sled::Tree); impl StreamletMetadataStore { - pub fn new(db: &sled::Db) -> Result { + pub fn new(db: &sled::Db, genesis: i64) -> Result { let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?; - Ok(Self(tree)) + let store = Self(tree); + if store.0.is_empty() { + // Genesis block record is generated. + let block = blake3::hash(&serialize(&Block::genesis_block(genesis))); + let metadata = StreamletMetadata { + votes: vec![], + notarized: true, + finalized: true, + participants: vec![], + }; + store.insert(block, &metadata)?; + } + + Ok(store) } /// Insert streamlet metadata into the store. @@ -74,6 +87,24 @@ impl StreamletMetadataStore { Ok(()) } + /// Fetch given streamlet metadata from the store. + /// The resulting vector contains `Option` which is `Some` if the metadata + /// was found in the store, and `None`, if it has not. + pub fn get(&self, hashes: &[blake3::Hash]) -> Result>> { + let mut ret: Vec> = Vec::with_capacity(hashes.len()); + + for i in hashes { + if let Some(found) = self.0.get(i.as_bytes())? { + let metadata = deserialize(&found)?; + ret.push(Some(metadata)); + } else { + ret.push(None); + } + } + + Ok(ret) + } + /// Retrieve all streamlet metadata. /// Be carefull as this will try to load everything in memory. pub fn get_all(&self) -> Result>> { diff --git a/src/consensus/state.rs b/src/consensus/state.rs index 145f98c2a..d3356e26c 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -19,7 +19,7 @@ use crate::{ }; use super::{ - block::{Block, BlockProposal}, + block::{Block, BlockInfo, BlockProposal}, blockchain::{Blockchain, ProposalsChain}, metadata::{Metadata, StreamletMetadata}, participant::Participant, @@ -206,21 +206,22 @@ impl ValidatorState { /// Finds the longest fully notarized blockchain the node holds and returns the last block hash. pub fn longest_notarized_chain_last_hash(&self) -> Result { - let hash = if !self.consensus.proposals.is_empty() { - let mut longest_notarized_chain = &self.consensus.proposals[0]; - let mut length = longest_notarized_chain.proposals.len(); - if self.consensus.proposals.len() > 1 { - for chain in &self.consensus.proposals[1..] { - if chain.notarized() && chain.proposals.len() > length { - length = chain.proposals.len(); - longest_notarized_chain = chain; - } + let mut longest_notarized_chain: Option = None; + let mut length = 0; + if !self.consensus.proposals.is_empty() { + for chain in &self.consensus.proposals[1..] { + if chain.notarized() && chain.proposals.len() > length { + longest_notarized_chain = Some(chain.clone()); + length = chain.proposals.len(); } } - longest_notarized_chain.proposals.last().unwrap().hash() - } else { - self.blockchain.last()?.unwrap().1 + } + + let hash = match longest_notarized_chain { + Some(chain) => chain.proposals.last().unwrap().hash(), + None => self.blockchain.last()?.unwrap().1, }; + Ok(hash) } @@ -305,11 +306,14 @@ impl ValidatorState { /// Node verifies if provided chain is notarized excluding the last block. pub fn extends_notarized_chain(&self, chain: &ProposalsChain) -> bool { - for proposal in &chain.proposals[..(chain.proposals.len() - 1)] { - if !proposal.sm.notarized { - return false + if chain.proposals.len() > 1 { + for proposal in &chain.proposals[..(chain.proposals.len() - 1)] { + if !proposal.sm.notarized { + return false + } } } + true } @@ -329,7 +333,7 @@ impl ValidatorState { let (last_sl, last_block) = self.blockchain.last()?.unwrap(); if proposal.st != last_block || proposal.sl <= last_sl { - debug!("Proposal doesn't extend any known chains."); + error!("Proposal doesn't extend any known chains."); return Ok(-2) } @@ -345,20 +349,20 @@ impl ValidatorState { /// nodes unconfirmed transactions list. /// Finally, we check if the notarization of the proposal can finalize parent proposals /// in its chain. - pub fn receive_vote(&mut self, vote: &Vote) -> Result { + pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option>)> { let mut encoded_proposal = vec![]; let result = vote.proposal.encode(&mut encoded_proposal); match result { Ok(_) => (), Err(e) => { error!("Proposal encoding failed. Error: {:?}", e); - return Ok(false) + return Ok((false, None)) } }; if !vote.public_key.verify(&encoded_proposal[..], &vote.vote) { debug!("Voter signature couldn't be verified. Voter: {:?}", vote.id); - return Ok(false) + return Ok((false, None)) } let nodes_count = self.consensus.participants.len(); @@ -369,12 +373,12 @@ impl ValidatorState { Some(participant) => { if self.current_epoch() <= participant.joined { debug!("Voter joined after current epoch. Voter: {:?}", vote.id); - return Ok(false) + return Ok((false, None)) } } None => { debug!("Voter is not a participant. Voter: {:?}", vote.id); - return Ok(false) + return Ok((false, None)) } } @@ -384,16 +388,17 @@ impl ValidatorState { if !self.consensus.orphan_votes.contains(vote) { self.consensus.orphan_votes.push(vote.clone()); } - return Ok(false) + return Ok((false, None)) } let (unwrapped, chain_index) = proposal.unwrap(); if !unwrapped.sm.votes.contains(vote) { unwrapped.sm.votes.push(vote.clone()); + let mut to_broadcast = Vec::new(); if !unwrapped.sm.notarized && unwrapped.sm.votes.len() > (2 * nodes_count / 3) { unwrapped.sm.notarized = true; - self.chain_finalization(chain_index)?; + to_broadcast = self.chain_finalization(chain_index)?; } // updating participant vote @@ -414,9 +419,9 @@ impl ValidatorState { self.consensus.participants.insert(participant.id, participant); - return Ok(true) + return Ok((true, Some(to_broadcast))) } - Ok(false) + return Ok((false, None)) } /// Node searches it the chains it holds for provided proposal. @@ -439,7 +444,8 @@ impl ValidatorState { /// Consensus finalization logic: If node has observed the notarization of 3 consecutive /// proposals in a fork chain, it finalizes (appends to canonical blockchain) all proposals up to the middle block. /// When fork chain proposals are finalized, rest fork chains not starting by those proposals are removed. - pub fn chain_finalization(&mut self, chain_index: i64) -> Result<()> { + pub fn chain_finalization(&mut self, chain_index: i64) -> Result> { + let mut to_broadcast = Vec::new(); let chain = &mut self.consensus.proposals[chain_index as usize]; let len = chain.proposals.len(); if len > 2 { @@ -465,7 +471,14 @@ impl ValidatorState { } chain.proposals.drain(0..(consecutive - 1)); for proposal in &finalized { - self.blockchain.add(proposal.clone())?; + self.blockchain.add_by_proposal(proposal.clone())?; + to_broadcast.push(BlockInfo::new( + proposal.st, + proposal.sl, + proposal.txs.clone(), + proposal.metadata.clone(), + proposal.sm.clone(), + )); } let (last_sl, last_block) = self.blockchain.last()?.unwrap(); @@ -493,7 +506,7 @@ impl ValidatorState { } } - Ok(()) + Ok(to_broadcast) } /// Node retreives a new participant and appends it to the pending participants list. @@ -555,4 +568,19 @@ impl ValidatorState { _ => Ok(()), } } + + /// Util function to reset the current consensus state. + pub fn reset_consensus_state(&mut self) -> Result<()> { + let genesis = self.consensus.genesis.clone(); + let consensus = ConsensusState { + genesis, + proposals: Vec::new(), + orphan_votes: Vec::new(), + participants: BTreeMap::new(), + pending_participants: Vec::new(), + }; + + self.consensus = consensus; + Ok(()) + } } diff --git a/src/consensus/tx.rs b/src/consensus/tx.rs index c4547fc15..a816c03e4 100644 --- a/src/consensus/tx.rs +++ b/src/consensus/tx.rs @@ -44,6 +44,24 @@ impl TxStore { Ok(txhash) } + /// Fetch given transactions from the txstore. + /// The resulting vector contains `Option` which is `Some` if the tx + /// was found in the txstore, and `None`, if it has not. + pub fn get(&self, txhashes: &[blake3::Hash]) -> Result>> { + let mut ret: Vec> = Vec::with_capacity(txhashes.len()); + + for i in txhashes { + if let Some(found) = self.0.get(i.as_bytes())? { + let tx = deserialize(&found)?; + ret.push(Some(tx)); + } else { + ret.push(None); + } + } + + Ok(ret) + } + /// Retrieve all transactions. /// Be carefull as this will try to load everything in memory. pub fn get_all(&self) -> Result>> { diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 1ac955ea8..4a981a684 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -237,4 +237,8 @@ impl P2p { pub async fn subscribe_stop(&self) -> Subscription { self.stop_subscriber.clone().subscribe().await } + + pub fn channels(&self) -> &ConnectedChannels { + &self.channels + } }