diff --git a/doc/src/architecture/blockchain.md b/doc/src/architecture/blockchain.md index dd3ab3749..b595ce45e 100644 --- a/doc/src/architecture/blockchain.md +++ b/doc/src/architecture/blockchain.md @@ -68,25 +68,29 @@ the probability that a party holding all the stake will be selected to be a leader. Stakeholder is selected as leader for slot j with probability $\phi_f(\alpha_i)$, $\alpha_i$ is $U_i$ stake. +### absolute stake aggregation dependent leader selection functions + +#### linear functions -### linear aggregation dependent leader selection in the previous leader selection function, it has the unique property of independent aggregation of the stakes, meaning the property of a leader winning leadership with stakes $\sigma$ is independent of whether the stakeholder would act as a pool of stakes, or distributed stakes on competing coins. "one minus the probability" of winning leadership with aggregated stakes is $1-\phi(\sum_{i}\sigma_i)=1-(1+(1-f)^{\sigma_i})=-(1-f)^{\sum_{i}\sigma_i}$, the joint "one minus probability" of all the stakes (each with probability $\phi(\sigma_i))$ winning aggregated winning the leadership -$\prod_{i}^{n}(1-\phi(\sigma_i))=-(1-f)^{\sum_{\sigma_i}}$ +$\prod_{i}^{n}(1-\phi(\sigma_i))=-(1-f)^{\sum_i(\sigma_i)}$ thus: $$ 1-\phi(\sum_{i}\sigma_i) =\prod_{i}^{n}(1-\phi(\sigma_i)) $$ -#### linear leader selection +##### linear leader selection + $$y < T $$ $$y = 2^lk \mid 0 \le k \le 1$$ $$T = 2^l\phi(v)$$ $$ \phi(v)=\frac{1}{v_{max}}v $$ -#### dependent aggregation +##### dependent aggregation + linear leader selection has the dependent aggregation property, meaning it's favorable to compete in pools with sum of the stakes over aggregated stakes of distributed stakes: $$\phi(\sum_{i}{\sigma_i})>\prod_{i}^{n}{\sigma_i}$$ @@ -95,15 +99,48 @@ let's assume the stakes are divided to stakes of value $\sigma_i=1$ for $\Sigma> $$V>(\frac{1}{v_{max}})^{n-1}$$ note that $(\frac{1}{v_{max}})^{n-1} < 1, V>1$, thus competing with single coin of the sum of stakes held by the stakeholder is favourable. -### scalar linear aggregation dependent leader selection +##### scalar linear aggregation dependent leader selection + a target function T with scalar coefficients can be formalized as $$T=2^lk\phi(\Sigma)=2^l(\frac{1}{v_{max}})\Sigma$$ let's assume $v_{max}=2^v$, then: - $$T=2^lk\phi(\Sigma)=2^{\frac{l}{v}}\Sigma$$ + $$T=2^lk\phi(\Sigma)=2^{l-v}\Sigma$$ then the lead statement is - $$y<2^{\frac{l}{v}}\Sigma$$ for example for a group order or l=24 bits, and maximum value of $v_{max}=2^{10}$, then lead statement: + $$y<2^{l-v}\Sigma$$ for example for a group order or l= 24 bits, and maximum value of $v_{max}=2^{10}$, then lead statement: $$y<2^{14}\Sigma$$ +##### competing max value coins + +for a stakeholder with $nv_{max}$ absolute stake, $\mid n \in \mathbb{Z}$ it's advantageous for the stakeholder to +distribute stakes on $n$ competing coins. + + +#### inverse functions + +inverse lead selection functions doesn't require maximum stake, most suitable for absolute stake, +it has the disadvantage that it's inflating with increasing rate as time goes on , but it can be function of the inverse of the slot +to control the increasing frequency of winning leadership. + +##### leader selection without maximum stake upper limit + +the inverse leader selection without maximum stake value can be $\phi(v)=\frac{v}{v+c}$ where c is $ > 1$ +and inversely proportional with probability of winning leadership, let it be called leadership coefficient. + + +##### decaying linear leader selection + +as the time goes one, and stakes increase, this means the combined stakes of all stakeholders increases the probability +of winning leadership in next slots leading to more leaders at a single slot, to maintain, or to be more general to control this frequency of leaders per slot, +c (the leadership coefficient) need to be function of the slot $sl$, i.e $c(sl) = \frac{sl}{R}$ where $R$ is epoch size (number of slots in epoch). + +##### pairing leader selection independent aggregation function + +the only family of functions that are isomorphic to summation on multiplication (having the independent aggregation property) is the exponential function, +and since it's impossible to implement in plonk, + +###### TODO (proof) + +a re-formalization of the lead statement using pairing that is isomorphic to summation on multiplication is also an options. ## Leaky non-resettable beacon diff --git a/doc/src/architecture/blockchain.pdf b/doc/src/architecture/blockchain.pdf index 92f07900c..639bd6efc 100644 Binary files a/doc/src/architecture/blockchain.pdf and b/doc/src/architecture/blockchain.pdf differ diff --git a/script/research/nodes-tool/src/main.rs b/script/research/nodes-tool/src/main.rs index a9a33bb04..5e7dbba0a 100644 --- a/script/research/nodes-tool/src/main.rs +++ b/script/research/nodes-tool/src/main.rs @@ -333,7 +333,6 @@ struct StateInfo { _id: u64, _consensus: ConsensusInfo, _blockchain: BlockchainInfo, - _unconfirmed_txs: Vec, } impl StateInfo { @@ -341,8 +340,7 @@ impl StateInfo { let _id = state.id; let _consensus = ConsensusInfo::new(&state.consensus); let _blockchain = BlockchainInfo::new(&state.blockchain); - let _unconfirmed_txs = state.unconfirmed_txs.clone(); - StateInfo { _id, _consensus, _blockchain, _unconfirmed_txs } + StateInfo { _id, _consensus, _blockchain } } } diff --git a/script/research/validatord/src/main.rs b/script/research/validatord/src/main.rs index 2b024ba51..a13d4c84f 100644 --- a/script/research/validatord/src/main.rs +++ b/script/research/validatord/src/main.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, path::PathBuf, sync::Arc, thread}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration}; use async_executor::Executor; use async_trait::async_trait; @@ -13,9 +13,8 @@ use structopt_toml::StructOptToml; use darkfi::{ consensus::{ block::{BlockOrder, BlockResponse}, - blockchain::{ForkOrder, ForkResponse}, participant::Participant, - state::{ValidatorState, ValidatorStatePtr}, + state::{ConsensusRequest, ConsensusResponse, ValidatorState, ValidatorStatePtr}, tx::Tx, }, net, @@ -38,8 +37,8 @@ use darkfi::{ use validatord::protocols::{ protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal, - protocol_sync::ProtocolSync, protocol_sync_forks::ProtocolSyncForks, protocol_tx::ProtocolTx, - protocol_vote::ProtocolVote, + protocol_sync::ProtocolSync, protocol_sync_consensus::ProtocolSyncConsensus, + protocol_tx::ProtocolTx, protocol_vote::ProtocolVote, }; const CONFIG_FILE: &str = r"validatord_config.toml"; @@ -152,28 +151,28 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> Ok(()) } -async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { - info!("Node starts syncing forks..."); +async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { + info!("Node starts syncing consensus state..."); // Using len here because is_empty() uses unstable library feature 'exact_size_is_empty' if p2p.channels().lock().await.values().len() != 0 { - // Nodes ask for the fork chains of the last channel peer + // Nodes ask for the consensus state of the last channel peer let channel = p2p.channels().lock().await.values().last().unwrap().clone(); // Communication setup let message_subsytem = channel.get_message_subsystem(); - message_subsytem.add_dispatch::().await; + message_subsytem.add_dispatch::().await; let response_sub = channel - .subscribe_msg::() + .subscribe_msg::() .await - .expect("Missing ForkResponse dispatcher!"); + .expect("Missing ConsensusResponse dispatcher!"); - // Node creates a BlockOrder and sends it - let order = ForkOrder { id: state.read().unwrap().id }; - channel.send(order).await?; + // Node creates a ConsensusRequest and sends it + let request = ConsensusRequest { id: state.read().unwrap().id }; + channel.send(request).await?; // Node stores responce data. Extra validations can be added here. let response = response_sub.receive().await?; - state.write().unwrap().consensus.proposals = response.proposals.clone(); + state.write().unwrap().consensus = response.consensus.clone(); } else { info!("Node is not connected to other nodes, resetting consensus state."); state.write().unwrap().reset_consensus_state()?; @@ -184,28 +183,47 @@ async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Resul } async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { - // Node syncs its fork chains - let result = syncing_forks_task(p2p.clone(), state.clone()).await; + // Node waits just before the current or next epoch end, + // so it can start syncing latest state. + let mut seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + let one_sec = Duration::new(1, 0); + loop { + if seconds_until_next_epoch > one_sec { + seconds_until_next_epoch = seconds_until_next_epoch - one_sec; + break + } + info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); + thread::sleep(seconds_until_next_epoch); + seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + } + info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); + thread::sleep(seconds_until_next_epoch); + + // Node syncs its consensus state + let result = syncing_consensus_task(p2p.clone(), state.clone()).await; match result { Ok(()) => (), - Err(e) => error!("Sync forks failed. Error: {:?}", e), + Err(e) => error!("Sync consensus state failed. Error: {:?}", e), } - // Node signals the network that it starts participating + // Node signals the network that it will start participating let participant = Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch()); state.write().unwrap().append_participant(participant.clone()); - let result = p2p.broadcast(participant).await; + let result = p2p.broadcast(participant.clone()).await; match result { Ok(()) => info!("Participation message broadcasted successfuly."), Err(e) => error!("Broadcast failed. Error: {:?}", e), } - // After initialization node should wait for next epoch + // After initialization node waits for next epoch to start participating let seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); + // Node modifies its participating flag to true + state.write().unwrap().participating = true; + loop { // Node refreshes participants records state.write().unwrap().refresh_participants(); @@ -268,7 +286,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { } }; - // Node waits untile next epoch + // Node waits until next epoch let seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); @@ -315,10 +333,20 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { // Adding ProtocolSync to the registry let state2 = state.clone(); + let consensus_mode = true; // This flag should be based on staking registry .register(net::SESSION_ALL, move |channel, main_p2p| { let state = state2.clone(); - async move { ProtocolSync::init(channel, state, main_p2p).await } + async move { ProtocolSync::init(channel, state, main_p2p, consensus_mode).await } + }) + .await; + + // Adding ProtocolTx to the registry + let state2 = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, main_p2p| { + let state = state2.clone(); + async move { ProtocolTx::init(channel, state, main_p2p).await } }) .await; @@ -335,6 +363,18 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { }) .detach(); + // RPC interface + let ex2 = executor.clone(); + let ex3 = ex2.clone(); + let rpc_interface = Arc::new(JsonRpcInterface { + state: state.clone(), + p2p: main_p2p.clone(), + _rpc_listen_addr: opts.rpc, + }); + executor + .spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await }) + .detach(); + // Node starts syncing let state2 = state.clone(); syncing_task(main_p2p.clone(), state2).await?; @@ -343,15 +383,6 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { let consensus_p2p = net::P2p::new(consensus_subnet_settings).await; let registry = consensus_p2p.protocol_registry(); - // Adding ProtocolTx to the registry - let state2 = state.clone(); - registry - .register(net::SESSION_ALL, move |channel, consensus_p2p| { - let state = state2.clone(); - async move { ProtocolTx::init(channel, state, consensus_p2p).await } - }) - .await; - // Adding PropotolVote to the registry let p2p = main_p2p.clone(); let state2 = state.clone(); @@ -386,7 +417,7 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { registry .register(net::SESSION_ALL, move |channel, _consensus_p2p| { let state = state2.clone(); - async move { ProtocolSyncForks::init(channel, state).await } + async move { ProtocolSyncConsensus::init(channel, state).await } }) .await; @@ -403,18 +434,6 @@ async fn start(executor: Arc>, opts: &Opt) -> Result<()> { }) .detach(); - // RPC interface - let ex2 = executor.clone(); - let ex3 = ex2.clone(); - let rpc_interface = Arc::new(JsonRpcInterface { - state: state.clone(), - p2p: consensus_p2p.clone(), - _rpc_listen_addr: opts.rpc, - }); - executor - .spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await }) - .detach(); - proposal_task(consensus_p2p, state).await; Ok(()) diff --git a/script/research/validatord/src/protocols/mod.rs b/script/research/validatord/src/protocols/mod.rs index 64243e37e..b71c14ced 100644 --- a/script/research/validatord/src/protocols/mod.rs +++ b/script/research/validatord/src/protocols/mod.rs @@ -1,13 +1,13 @@ pub mod protocol_participant; pub mod protocol_proposal; pub mod protocol_sync; -pub mod protocol_sync_forks; +pub mod protocol_sync_consensus; pub mod protocol_tx; pub mod protocol_vote; pub use protocol_participant::ProtocolParticipant; pub use protocol_proposal::ProtocolProposal; pub use protocol_sync::ProtocolSync; -pub use protocol_sync_forks::ProtocolSyncForks; +pub use protocol_sync_consensus::ProtocolSyncConsensus; pub use protocol_tx::ProtocolTx; pub use protocol_vote::ProtocolVote; diff --git a/script/research/validatord/src/protocols/protocol_participant.rs b/script/research/validatord/src/protocols/protocol_participant.rs index a31eea238..1d7adfd1b 100644 --- a/script/research/validatord/src/protocols/protocol_participant.rs +++ b/script/research/validatord/src/protocols/protocol_participant.rs @@ -49,12 +49,10 @@ impl ProtocolParticipant { "ProtocolParticipant::handle_receive_participant() received {:?}", participant ); - if self.state.write().unwrap().append_participant((*participant).clone()) { - let pending_participants = - self.state.read().unwrap().consensus.pending_participants.clone(); - for pending_participant in pending_participants { - self.p2p.broadcast(pending_participant.clone()).await?; - } + + let participant_copy = (*participant).clone(); + if self.state.write().unwrap().append_participant(participant_copy.clone()) { + self.p2p.broadcast(participant_copy).await?; } } } diff --git a/script/research/validatord/src/protocols/protocol_sync.rs b/script/research/validatord/src/protocols/protocol_sync.rs index 217b4f953..137a341ac 100644 --- a/script/research/validatord/src/protocols/protocol_sync.rs +++ b/script/research/validatord/src/protocols/protocol_sync.rs @@ -20,46 +20,49 @@ const BATCH: u64 = 10; pub struct ProtocolSync { channel: ChannelPtr, - order_sub: MessageSubscription, + request_sub: MessageSubscription, block_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, - _p2p: P2pPtr, + p2p: P2pPtr, + consensus_mode: bool, } impl ProtocolSync { pub async fn init( channel: ChannelPtr, state: ValidatorStatePtr, - _p2p: P2pPtr, + p2p: P2pPtr, + consensus_mode: bool, ) -> ProtocolBasePtr { let message_subsytem = channel.get_message_subsystem(); message_subsytem.add_dispatch::().await; message_subsytem.add_dispatch::().await; - let order_sub = + let request_sub = channel.subscribe_msg::().await.expect("Missing BlockOrder dispatcher!"); let block_sub = channel.subscribe_msg::().await.expect("Missing BlockInfo dispatcher!"); Arc::new(Self { channel: channel.clone(), - order_sub, + request_sub, block_sub, jobsman: ProtocolJobsManager::new("SyncProtocol", channel), state, - _p2p, + p2p, + consensus_mode, }) } - async fn handle_receive_order(self: Arc) -> Result<()> { - debug!(target: "ircd", "ProtocolSync::handle_receive_tx() [START]"); + async fn handle_receive_request(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolSync::handle_receive_request() [START]"); loop { - let order = self.order_sub.receive().await?; + let order = self.request_sub.receive().await?; debug!( target: "ircd", - "ProtocolSync::handle_receive_order() received {:?}", + "ProtocolSync::handle_receive_request() received {:?}", order ); @@ -82,18 +85,19 @@ impl ProtocolSync { info ); - // TODO: Following code should be executed only by replicators, not consensus nodes. - // Commented for now, as to not mess consensus testing. - // (Don't forget to remove _ from _p2p) - /* - // Node stores finalized block, if it doesn't exists (checking by slot). + // Node stores finalized block, if it doesn't exists (checking by slot), + // and removes its transactions from the unconfirmed_txs vector. + // Consensus mode enabled nodes have already performed this steps, + // during proposal finalization. // Extra validations can be added here. - let info_copy = (*info).clone(); - if !self.state.read().unwrap().blockchain.has_block(&info_copy)? { - self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?; - self.p2p.broadcast(info_copy).await?; + if !self.consensus_mode { + let info_copy = (*info).clone(); + if !self.state.read().unwrap().blockchain.has_block(&info_copy)? { + self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?; + self.state.write().unwrap().remove_txs(info_copy.txs.clone())?; + self.p2p.broadcast(info_copy).await?; + } } - */ } } } @@ -103,7 +107,7 @@ impl ProtocolBase for ProtocolSync { async fn start(self: Arc, executor: Arc>) -> Result<()> { debug!(target: "ircd", "ProtocolSync::start() [START]"); self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; debug!(target: "ircd", "ProtocolSync::start() [END]"); Ok(()) diff --git a/script/research/validatord/src/protocols/protocol_sync_consensus.rs b/script/research/validatord/src/protocols/protocol_sync_consensus.rs new file mode 100644 index 000000000..d062c6e55 --- /dev/null +++ b/script/research/validatord/src/protocols/protocol_sync_consensus.rs @@ -0,0 +1,72 @@ +use async_executor::Executor; +use async_trait::async_trait; + +use darkfi::{ + consensus::state::{ConsensusRequest, ConsensusResponse, ValidatorStatePtr}, + net::{ + ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, + ProtocolJobsManagerPtr, + }, + Result, +}; +use log::debug; +use std::sync::Arc; + +pub struct ProtocolSyncConsensus { + channel: ChannelPtr, + request_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, +} + +impl ProtocolSyncConsensus { + pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr { + let message_subsytem = channel.get_message_subsystem(); + message_subsytem.add_dispatch::().await; + + let request_sub = channel + .subscribe_msg::() + .await + .expect("Missing ConsensusRequest dispatcher!"); + + Arc::new(Self { + channel: channel.clone(), + request_sub, + jobsman: ProtocolJobsManager::new("SyncConsensusProtocol", channel), + state, + }) + } + + async fn handle_receive_request(self: Arc) -> Result<()> { + debug!(target: "ircd", "ProtocolSyncConsensus::handle_receive_request() [START]"); + loop { + let order = self.request_sub.receive().await?; + + debug!( + target: "ircd", + "ProtocolSyncConsensus::handle_receive_request() received {:?}", + order + ); + + // Extra validations can be added here. + let consensus = self.state.read().unwrap().consensus.clone(); + let response = ConsensusResponse { consensus }; + self.channel.send(response).await?; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSyncConsensus { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "ircd", "ProtocolSyncConsensus::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; + debug!(target: "ircd", "ProtocolSyncConsensus::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolSyncConsensus" + } +} diff --git a/script/research/validatord/src/protocols/protocol_sync_forks.rs b/script/research/validatord/src/protocols/protocol_sync_forks.rs deleted file mode 100644 index 0529e6e17..000000000 --- a/script/research/validatord/src/protocols/protocol_sync_forks.rs +++ /dev/null @@ -1,73 +0,0 @@ -use async_executor::Executor; -use async_trait::async_trait; - -use darkfi::{ - consensus::{ - blockchain::{ForkOrder, ForkResponse}, - state::ValidatorStatePtr, - }, - net::{ - ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, - ProtocolJobsManagerPtr, - }, - Result, -}; -use log::debug; -use std::sync::Arc; - -pub struct ProtocolSyncForks { - channel: ChannelPtr, - order_sub: MessageSubscription, - jobsman: ProtocolJobsManagerPtr, - state: ValidatorStatePtr, -} - -impl ProtocolSyncForks { - pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr { - let message_subsytem = channel.get_message_subsystem(); - message_subsytem.add_dispatch::().await; - - let order_sub = - channel.subscribe_msg::().await.expect("Missing ForkOrder dispatcher!"); - - Arc::new(Self { - channel: channel.clone(), - order_sub, - jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel), - state, - }) - } - - async fn handle_receive_order(self: Arc) -> Result<()> { - debug!(target: "ircd", "ProtocolSyncForks::handle_receive_tx() [START]"); - loop { - let order = self.order_sub.receive().await?; - - debug!( - target: "ircd", - "ProtocolSyncForks::handle_receive_order() received {:?}", - order - ); - - // Extra validations can be added here. - let proposals = self.state.read().unwrap().consensus.proposals.clone(); - let response = ForkResponse { proposals }; - self.channel.send(response).await?; - } - } -} - -#[async_trait] -impl ProtocolBase for ProtocolSyncForks { - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "ircd", "ProtocolSyncForks::start() [START]"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; - debug!(target: "ircd", "ProtocolSyncForks::start() [END]"); - Ok(()) - } - - fn name(&self) -> &'static str { - "ProtocolSyncForks" - } -} diff --git a/script/research/validatord/src/protocols/protocol_tx.rs b/script/research/validatord/src/protocols/protocol_tx.rs index ba4a12815..6f8f881bc 100644 --- a/script/research/validatord/src/protocols/protocol_tx.rs +++ b/script/research/validatord/src/protocols/protocol_tx.rs @@ -49,6 +49,8 @@ impl ProtocolTx { tx ); let tx_copy = (*tx).clone(); + + // Nodes use unconfirmed_txs vector as seen_txs pool. if self.state.write().unwrap().append_tx(tx_copy.clone()) { self.p2p.broadcast(tx_copy).await?; } diff --git a/src/consensus/blockchain.rs b/src/consensus/blockchain.rs index 295e8b2bc..06c1a9a1e 100644 --- a/src/consensus/blockchain.rs +++ b/src/consensus/blockchain.rs @@ -3,7 +3,7 @@ use std::io; use log::debug; use crate::{ - impl_vec, net, + impl_vec, util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Result, }; @@ -201,29 +201,3 @@ impl ProposalsChain { } impl_vec!(ProposalsChain); - -/// Auxilary structure used for forks syncing. -#[derive(Debug, SerialEncodable, SerialDecodable)] -pub struct ForkOrder { - /// Validator id - pub id: u64, -} - -impl net::Message for ForkOrder { - fn name() -> &'static str { - "forkorder" - } -} - -/// Auxilary structure used for forks syncing. -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct ForkResponse { - /// Fork chains containing block proposals - pub proposals: Vec, -} - -impl net::Message for ForkResponse { - fn name() -> &'static str { - "forkresponse" - } -} diff --git a/src/consensus/state.rs b/src/consensus/state.rs index d3356e26c..d4d363546 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -14,6 +14,7 @@ use crate::{ keypair::{PublicKey, SecretKey}, schnorr::{SchnorrPublic, SchnorrSecret}, }, + net, util::serial::{deserialize, serialize, Encodable, SerialDecodable, SerialEncodable}, Error, Result, }; @@ -28,11 +29,11 @@ use super::{ vote::Vote, }; -const DELTA: u64 = 60; +pub const DELTA: u64 = 10; const SLED_CONSESUS_STATE_TREE: &[u8] = b"_consensus_state"; /// This struct represents the information required by the consensus algorithm. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct ConsensusState { /// Genesis block creation timestamp pub genesis: Timestamp, @@ -67,6 +68,32 @@ impl ConsensusState { } } +/// Auxilary structure used for consensus syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct ConsensusRequest { + /// Validator id + pub id: u64, +} + +impl net::Message for ConsensusRequest { + fn name() -> &'static str { + "consensusrequest" + } +} + +/// Auxilary structure used for consensus syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct ConsensusResponse { + /// Hot/live data used by the consensus algorithm + pub consensus: ConsensusState, +} + +impl net::Message for ConsensusResponse { + fn name() -> &'static str { + "consensusresponse" + } +} + /// Atomic pointer to validator state. pub type ValidatorStatePtr = Arc>; @@ -88,6 +115,8 @@ pub struct ValidatorState { pub unconfirmed_txs: Vec, /// Genesis block hash, used for validations pub genesis_block: blake3::Hash, + /// Participation flag + pub participating: bool, } impl ValidatorState { @@ -100,6 +129,7 @@ impl ValidatorState { let blockchain = Blockchain::new(&db, genesis)?; let unconfirmed_txs = Vec::new(); let genesis_block = blake3::hash(&serialize(&Block::genesis_block(genesis))); + let participating = false; Ok(Arc::new(RwLock::new(ValidatorState { id, secret, @@ -109,6 +139,7 @@ impl ValidatorState { blockchain, unconfirmed_txs, genesis_block, + participating, }))) } @@ -148,7 +179,6 @@ impl ValidatorState { let epoch = self.current_epoch(); let mut hasher = DefaultHasher::new(); epoch.hash(&mut hasher); - self.zero_participants_check(); let pos = hasher.finish() % (self.consensus.participants.len() as u64); self.consensus.participants.iter().nth(pos as usize).unwrap().1.id } @@ -164,8 +194,8 @@ impl ValidatorState { /// Proposal extends the longest notarized fork chain the node holds. pub fn propose(&self) -> Result> { let epoch = self.current_epoch(); - let previous_hash = self.longest_notarized_chain_last_hash().unwrap(); - let unproposed_txs = self.unproposed_txs(); + let (previous_hash, index) = self.longest_notarized_chain_last_hash().unwrap(); + let unproposed_txs = self.unproposed_txs(index); let metadata = Metadata::new( get_current_time(), String::from("proof"), @@ -189,30 +219,43 @@ impl ValidatorState { ))) } - /// Node retrieves all unconfiremd transactions not proposed in previous blocks. - pub fn unproposed_txs(&self) -> Vec { + /// Node retrieves all unconfirmed transactions not proposed + /// in previous blocks of provided index chain. + pub fn unproposed_txs(&self, index: i64) -> Vec { let mut unproposed_txs = self.unconfirmed_txs.clone(); - for chain in &self.consensus.proposals { - for proposal in &chain.proposals { - for tx in &proposal.txs { - if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) { - unproposed_txs.remove(pos); - } + + // If index is -1(canonical blockchain) a new fork chain will be generated, + // therefore all unproposed transactions can be included in the proposal. + if index == -1 { + return unproposed_txs + } + + // We iterate the fork chain proposals to find already proposed transactions + // and remove them from the local unproposed_txs vector. + let chain = &self.consensus.proposals[index as usize]; + for proposal in &chain.proposals { + for tx in &proposal.txs { + if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) { + unproposed_txs.remove(pos); } } } + unproposed_txs } - /// Finds the longest fully notarized blockchain the node holds and returns the last block hash. - pub fn longest_notarized_chain_last_hash(&self) -> Result { + /// Finds the longest fully notarized blockchain the node holds and returns the last block hash + /// and the chain index. + pub fn longest_notarized_chain_last_hash(&self) -> Result<(blake3::Hash, i64)> { let mut longest_notarized_chain: Option = None; let mut length = 0; + let mut index = -1; if !self.consensus.proposals.is_empty() { - for chain in &self.consensus.proposals[1..] { + for (i, chain) in self.consensus.proposals.iter().enumerate() { if chain.notarized() && chain.proposals.len() > length { longest_notarized_chain = Some(chain.clone()); length = chain.proposals.len(); + index = i as i64; } } } @@ -222,12 +265,17 @@ impl ValidatorState { None => self.blockchain.last()?.unwrap().1, }; - Ok(hash) + Ok((hash, index)) } /// Node receives the proposed block, verifies its sender(epoch leader), /// and proceeds with voting on it. pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result> { + // Node hasn't started participating + if !self.participating { + return Ok(None) + } + let leader = self.epoch_leader(); if leader != proposal.id { debug!( @@ -256,7 +304,6 @@ impl ValidatorState { /// If proposal extends the canonical blockchain, a new fork chain is created. /// Node votes on the proposal, only if it extends the longest notarized fork chain it has seen. pub fn vote(&mut self, proposal: &BlockProposal) -> Result> { - self.zero_participants_check(); let mut proposal = proposal.clone(); // Generate proposal hash @@ -318,7 +365,7 @@ impl ValidatorState { } /// Given a proposal, node finds the index of the chain it extends. - pub fn find_extended_chain_index(&self, proposal: &BlockProposal) -> Result { + pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result { for (index, chain) in self.consensus.proposals.iter().enumerate() { let last = chain.proposals.last().unwrap(); let hash = last.hash(); @@ -350,6 +397,11 @@ impl ValidatorState { /// Finally, we check if the notarization of the proposal can finalize parent proposals /// in its chain. pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option>)> { + // Node hasn't started participating + if !self.participating { + return Ok((false, None)) + } + let mut encoded_proposal = vec![]; let result = vote.proposal.encode(&mut encoded_proposal); match result { @@ -366,8 +418,6 @@ impl ValidatorState { } let nodes_count = self.consensus.participants.len(); - self.zero_participants_check(); - // Checking that the voter can actually vote. match self.consensus.participants.get(&vote.id) { Some(participant) => { @@ -440,6 +490,17 @@ impl ValidatorState { Ok(None) } + /// Note removes provided transactions vector, from unconfirmed_txs, if they exist. + pub fn remove_txs(&mut self, transactions: Vec) -> Result<()> { + for tx in transactions { + if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { + self.unconfirmed_txs.remove(pos); + } + } + + Ok(()) + } + /// Provided an index, node checks if chain can be finalized. /// Consensus finalization logic: If node has observed the notarization of 3 consecutive /// proposals in a fork chain, it finalizes (appends to canonical blockchain) all proposals up to the middle block. @@ -463,15 +524,11 @@ impl ValidatorState { for proposal in &mut chain.proposals[..(consecutive - 1)] { proposal.sm.finalized = true; finalized.push(proposal.clone()); - for tx in proposal.txs.clone() { - if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { - self.unconfirmed_txs.remove(pos); - } - } } chain.proposals.drain(0..(consecutive - 1)); for proposal in &finalized { self.blockchain.add_by_proposal(proposal.clone())?; + self.remove_txs(proposal.txs.clone())?; to_broadcast.push(BlockInfo::new( proposal.st, proposal.sl, @@ -518,17 +575,6 @@ impl ValidatorState { true } - /// This prevent the extreme case scenario where network is initialized, but some nodes - /// have not pushed the initial participants in the map. - pub fn zero_participants_check(&mut self) { - if self.consensus.participants.len() == 0 { - for participant in &self.consensus.pending_participants { - self.consensus.participants.insert(participant.id, participant.clone()); - } - self.consensus.pending_participants = Vec::new(); - } - } - /// Node refreshes participants map, to retain only the active ones. /// Active nodes are considered those who joined or voted on previous epoch. pub fn refresh_participants(&mut self) { @@ -557,6 +603,12 @@ impl ValidatorState { for index in inactive { self.consensus.participants.remove(&index); } + + if self.consensus.participants.is_empty() { + // If no nodes are active, node becomes a single node network. + let participant = Participant::new(self.id, self.current_epoch()); + self.consensus.pending_participants.push(participant); + } } /// Util function to save the current consensus state to provided file path. diff --git a/src/error.rs b/src/error.rs index e0d9b7de7..b1f6f84f6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -229,6 +229,10 @@ pub enum Error { #[error("Unsupported network transport")] UnsupportedTransport, + + #[cfg(feature = "net2")] + #[error("TransportError: {0}")] + TransportError(String), } #[cfg(feature = "node")] @@ -353,6 +357,13 @@ impl From for Error { } } +#[cfg(feature = "net2")] +impl From> for Error { + fn from(err: crate::net2::transport::TransportError) -> Error { + Error::TransportError(err.to_string()) + } +} + #[cfg(feature = "wasm-runtime")] impl From for Error { fn from(err: wasmer::RuntimeError) -> Error { diff --git a/src/net2/acceptor.rs b/src/net2/acceptor.rs index 0992ce6d2..0794e01e0 100644 --- a/src/net2/acceptor.rs +++ b/src/net2/acceptor.rs @@ -61,9 +61,9 @@ impl Acceptor { /// Run the accept loop. async fn run_accept_loop(self: Arc, url: url::Url) -> Result<()> { let transport = T::new(None, 1024); - let listener = Arc::new(transport.listen_on(url.clone()).unwrap().await.unwrap()); + let listener = Arc::new(transport.listen_on(url.clone())?.await?); loop { - let stream = T::accept(listener.clone()).await; + let stream = T::accept(listener.clone()).await?; let channel = Channel::::new(stream, url.clone()).await; self.channel_subscriber.notify(Ok(channel)).await; } diff --git a/src/net2/connector.rs b/src/net2/connector.rs index 6fad7dc71..43a326712 100644 --- a/src/net2/connector.rs +++ b/src/net2/connector.rs @@ -23,7 +23,7 @@ impl Connector { let stream_result = timeout(Duration::from_secs(self.settings.connect_timeout_seconds.into()), async { let transport = T::new(None, 1024); - let connect_stream = transport.dial(hostaddr.clone()).unwrap().await.unwrap(); + let connect_stream = transport.dial(hostaddr.clone())?.await?; let channel = Channel::::new(connect_stream, hostaddr).await; Ok(channel) }) diff --git a/src/net2/transport.rs b/src/net2/transport.rs index ab4fbf555..08412ea53 100644 --- a/src/net2/transport.rs +++ b/src/net2/transport.rs @@ -20,8 +20,10 @@ pub trait Transport: Sync + Send + 'static + Clone { type Error: Error; - type Listener: Future> + Sync + Send; - type Dial: Future> + Sync + Send; + type Listener: Future>> + + Sync + + Send; + type Dial: Future>> + Sync + Send; fn listen_on(self, url: Url) -> Result> where @@ -33,7 +35,9 @@ pub trait Transport: Sync + Send + 'static + Clone { fn new(ttl: Option, backlog: i32) -> Self; - async fn accept(listener: Arc) -> Self::Connector; + async fn accept( + listener: Arc, + ) -> Result>; } #[derive(Debug, thiserror::Error)] diff --git a/src/net2/transport/tcp.rs b/src/net2/transport/tcp.rs index dda7ca0c3..7689e7976 100644 --- a/src/net2/transport/tcp.rs +++ b/src/net2/transport/tcp.rs @@ -27,9 +27,14 @@ impl Transport for TcpTransport { type Error = io::Error; - type Listener = - Pin> + Send + Sync>>; - type Dial = Pin> + Send + Sync>>; + type Listener = Pin< + Box>> + Send + Sync>, + >; + type Dial = Pin< + Box< + dyn Future>> + Send + Sync, + >, + >; fn listen_on(self, url: Url) -> Result> { if url.scheme() != "tcp" { @@ -55,8 +60,10 @@ impl Transport for TcpTransport { Self { ttl, backlog } } - async fn accept(listener: Arc) -> Self::Connector { - listener.accept().await.unwrap().0 + async fn accept( + listener: Arc, + ) -> Result> { + Ok(listener.accept().await?.0) } } @@ -76,7 +83,10 @@ impl TcpTransport { Ok(socket) } - async fn do_listen(self, socket_addr: SocketAddr) -> Result { + async fn do_listen( + self, + socket_addr: SocketAddr, + ) -> Result> { let socket = self.create_socket(socket_addr)?; socket.bind(&socket_addr.into())?; socket.listen(self.backlog)?; @@ -84,7 +94,10 @@ impl TcpTransport { Ok(TcpListener::from(std::net::TcpListener::from(socket))) } - async fn do_dial(self, socket_addr: SocketAddr) -> Result { + async fn do_dial( + self, + socket_addr: SocketAddr, + ) -> Result> { let socket = self.create_socket(socket_addr)?; socket.set_nonblocking(true)?; @@ -92,7 +105,7 @@ impl TcpTransport { Ok(()) => {} Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - Err(err) => return Err(err), + Err(err) => return Err(TransportError::Other(err)), }; let stream = TcpStream::from(std::net::TcpStream::from(socket)); diff --git a/src/net2/transport/tls.rs b/src/net2/transport/tls.rs index 0e1e07124..54db7c8c2 100644 --- a/src/net2/transport/tls.rs +++ b/src/net2/transport/tls.rs @@ -87,9 +87,14 @@ impl Transport for TlsTransport { type Error = io::Error; - type Listener = - Pin> + Send + Sync>>; - type Dial = Pin> + Send + Sync>>; + type Listener = Pin< + Box>> + Send + Sync>, + >; + type Dial = Pin< + Box< + dyn Future>> + Send + Sync, + >, + >; fn listen_on(self, url: Url) -> Result> { if url.scheme() != "tls" { @@ -151,9 +156,11 @@ impl Transport for TlsTransport { Self { ttl, backlog, server_config, client_config } } - async fn accept(listener: Arc) -> Self::Connector { - let stream = listener.1.accept().await.unwrap().0; - listener.0.accept(stream).await.unwrap().into() + async fn accept( + listener: Arc, + ) -> Result> { + let stream = listener.1.accept().await?.0; + Ok(listener.0.accept(stream).await?.into()) } } @@ -173,7 +180,10 @@ impl TlsTransport { Ok(socket) } - async fn do_listen(self, url: Url) -> Result<(TlsAcceptor, TcpListener), io::Error> { + async fn do_listen( + self, + url: Url, + ) -> Result<(TlsAcceptor, TcpListener), TransportError> { let socket_addr = url.socket_addrs(|| None)?[0]; let socket = self.create_socket(socket_addr)?; socket.bind(&socket_addr.into())?; @@ -185,7 +195,7 @@ impl TlsTransport { Ok((acceptor, listener)) } - async fn do_dial(self, url: Url) -> Result, io::Error> { + async fn do_dial(self, url: Url) -> Result, TransportError> { let socket_addr = url.socket_addrs(|| None)?[0]; let server_name = ServerName::try_from("dark.fi").unwrap(); let socket = self.create_socket(socket_addr)?; @@ -197,7 +207,7 @@ impl TlsTransport { Ok(()) => {} Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - Err(err) => return Err(err), + Err(err) => return Err(TransportError::Other(err)), }; let stream = TcpStream::from(std::net::TcpStream::from(socket)); diff --git a/src/net2/transport/tor.rs b/src/net2/transport/tor.rs index 2ee2b9545..e0f9d36e4 100644 --- a/src/net2/transport/tor.rs +++ b/src/net2/transport/tor.rs @@ -194,7 +194,10 @@ impl TorTransport { .create_ehs(url) } - pub async fn do_dial(self, url: Url) -> Result, TorError> { + pub async fn do_dial( + self, + url: Url, + ) -> Result, TransportError> { let socks_url_str = self.socks_url.socket_addrs(|| None)?[0].to_string(); let host = url.host().unwrap().to_string(); let port = url.port().unwrap_or(80); @@ -209,11 +212,12 @@ impl TorTransport { self.socks_url.password().unwrap().to_string(), config, ) - .await? + .await } else { - Socks5Stream::connect(socks_url_str, host, port, config).await? + Socks5Stream::connect(socks_url_str, host, port, config).await }; - Ok(stream) + // FIXME + Ok(stream.unwrap()) } fn create_socket(&self, socket_addr: SocketAddr) -> io::Result { @@ -226,7 +230,7 @@ impl TorTransport { Ok(socket) } - pub async fn do_listen(self, url: Url) -> Result { + pub async fn do_listen(self, url: Url) -> Result> { let socket_addr = url.socket_addrs(|| None)?[0]; let socket = self.create_socket(socket_addr)?; socket.bind(&socket_addr.into())?; @@ -243,9 +247,14 @@ impl Transport for TorTransport { type Error = TorError; - type Listener = - Pin> + Send + Sync>>; - type Dial = Pin> + Send + Sync>>; + type Listener = Pin< + Box>> + Send + Sync>, + >; + type Dial = Pin< + Box< + dyn Future>> + Send + Sync, + >, + >; fn listen_on(self, url: Url) -> Result> { if url.scheme() != "tcp" { @@ -261,7 +270,9 @@ impl Transport for TorTransport { unimplemented!() } - async fn accept(_listener: Arc) -> Self::Connector { + async fn accept( + _listener: Arc, + ) -> Result> { unimplemented!() } }