Merge branch 'master' of github.com:darkrenaissance/darkfi

This commit is contained in:
lunar-mining
2022-04-22 08:03:54 +02:00
19 changed files with 385 additions and 253 deletions

View File

@@ -68,25 +68,29 @@ the probability that a party holding all the stake will be selected to
be a leader. A stakeholder $U_i$ is selected as leader for slot $j$ with
probability $\phi_f(\alpha_i)$, where $\alpha_i$ is $U_i$'s stake.
### absolute stake aggregation dependent leader selection functions
#### linear functions
### linear aggregation dependent leader selection
the previous leader selection function has the unique property of
independent aggregation of stakes: the probability of winning leadership with stakes $\sigma$ is independent of whether the stakeholder acts as a single pool of stakes or distributes them over competing coins.
"one minus the probability" of winning leadership with aggregated stakes is
$1-\phi(\sum_{i}\sigma_i)=1-(1+(1-f)^{\sigma_i})=-(1-f)^{\sum_{i}\sigma_i}$,
and the joint "one minus probability" of all the stakes (each winning leadership with probability $\phi(\sigma_i)$) is
$\prod_{i}^{n}(1-\phi(\sigma_i))=(1-f)^{\sum_{\sigma_i}}$
$\prod_{i}^{n}(1-\phi(\sigma_i))=(1-f)^{\sum_i(\sigma_i)}$
thus:
$$ 1-\phi(\sum_{i}\sigma_i) =\prod_{i}^{n}(1-\phi(\sigma_i)) $$
#### linear leader selection
##### linear leader selection
$$y < T $$
$$y = 2^lk \mid 0 \le k \le 1$$
$$T = 2^l\phi(v)$$
$$ \phi(v)=\frac{1}{v_{max}}v $$
#### dependent aggregation
##### dependent aggregation
linear leader selection has the dependent aggregation property, meaning it is favorable to compete with a single pool holding the sum of the stakes rather than with the same stake distributed over competing coins:
$$\phi(\sum_{i}{\sigma_i})>\prod_{i}^{n}{\phi(\sigma_i)}$$
@@ -95,15 +99,48 @@ let's assume the stakes are divided to stakes of value $\sigma_i=1$ for $\Sigma>
$$V>(\frac{1}{v_{max}})^{n-1}$$
note that $(\frac{1}{v_{max}})^{n-1} < 1$ while $V>1$, thus competing with a single coin holding the stakeholder's total stake is favourable.
### scalar linear aggregation dependent leader selection
##### scalar linear aggregation dependent leader selection
a target function T with scalar coefficients can be formalized as
$$T=2^lk\phi(\Sigma)=2^l(\frac{1}{v_{max}})\Sigma$$
let's assume $v_{max}=2^v$, then:
$$T=2^lk\phi(\Sigma)=2^{\frac{l}{v}}\Sigma$$
$$T=2^lk\phi(\Sigma)=2^{l-v}\Sigma$$
then the lead statement is
$$y<2^{\frac{l}{v}}\Sigma$$ for example for a group order or l=24 bits, and maximum value of $v_{max}=2^{10}$, then lead statement:
$$y<2^{l-v}\Sigma$$ for example, for a group order of $l=24$ bits and a maximum value of $v_{max}=2^{10}$, the lead statement is:
$$y<2^{14}\Sigma$$
##### competing max value coins
for a stakeholder with absolute stake $nv_{max}$, $n \in \mathbb{Z}$, it's advantageous to
distribute the stake over $n$ competing coins.
#### inverse functions
inverse leader selection functions don't require a maximum stake value, which makes them most suitable for absolute stake.
they have the disadvantage of inflating at an increasing rate as time goes on, but the function can depend on the inverse of the slot
to control the increasing frequency of winning leadership.
##### leader selection without maximum stake upper limit
the inverse leader selection without a maximum stake value can be $\phi(v)=\frac{v}{v+c}$, where $c > 1$
is inversely proportional to the probability of winning leadership; call it the leadership coefficient.
##### decaying linear leader selection
as time goes on and stakes increase, the combined stakes of all stakeholders increase the probability
of winning leadership in upcoming slots, leading to more leaders in a single slot. to maintain, or more generally to control, this frequency of leaders per slot,
$c$ (the leadership coefficient) needs to be a function of the slot $sl$, i.e. $c(sl) = \frac{sl}{R}$ where $R$ is the epoch size (number of slots in an epoch).
##### pairing leader selection independent aggregation function
the only family of functions that is isomorphic between summation and multiplication (i.e. having the independent aggregation property) is the exponential function,
which is impossible to implement in plonk.
###### TODO (proof)
a re-formalization of the lead statement using a pairing that is isomorphic between summation and multiplication is also an option.
## Leaky non-resettable beacon

Binary file not shown.

View File

@@ -333,7 +333,6 @@ struct StateInfo {
_id: u64,
_consensus: ConsensusInfo,
_blockchain: BlockchainInfo,
_unconfirmed_txs: Vec<Tx>,
}
impl StateInfo {
@@ -341,8 +340,7 @@ impl StateInfo {
let _id = state.id;
let _consensus = ConsensusInfo::new(&state.consensus);
let _blockchain = BlockchainInfo::new(&state.blockchain);
let _unconfirmed_txs = state.unconfirmed_txs.clone();
StateInfo { _id, _consensus, _blockchain, _unconfirmed_txs }
StateInfo { _id, _consensus, _blockchain }
}
}

View File

@@ -1,4 +1,4 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc, thread};
use std::{net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration};
use async_executor::Executor;
use async_trait::async_trait;
@@ -13,9 +13,8 @@ use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
block::{BlockOrder, BlockResponse},
blockchain::{ForkOrder, ForkResponse},
participant::Participant,
state::{ValidatorState, ValidatorStatePtr},
state::{ConsensusRequest, ConsensusResponse, ValidatorState, ValidatorStatePtr},
tx::Tx,
},
net,
@@ -38,8 +37,8 @@ use darkfi::{
use validatord::protocols::{
protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal,
protocol_sync::ProtocolSync, protocol_sync_forks::ProtocolSyncForks, protocol_tx::ProtocolTx,
protocol_vote::ProtocolVote,
protocol_sync::ProtocolSync, protocol_sync_consensus::ProtocolSyncConsensus,
protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
};
const CONFIG_FILE: &str = r"validatord_config.toml";
@@ -152,28 +151,28 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()>
Ok(())
}
async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Node starts syncing forks...");
async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Node starts syncing consensus state...");
// Using len here because is_empty() uses unstable library feature 'exact_size_is_empty'
if p2p.channels().lock().await.values().len() != 0 {
// Nodes ask for the fork chains of the last channel peer
// Nodes ask for the consensus state of the last channel peer
let channel = p2p.channels().lock().await.values().last().unwrap().clone();
// Communication setup
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<ForkResponse>().await;
message_subsytem.add_dispatch::<ConsensusResponse>().await;
let response_sub = channel
.subscribe_msg::<ForkResponse>()
.subscribe_msg::<ConsensusResponse>()
.await
.expect("Missing ForkResponse dispatcher!");
.expect("Missing ConsensusResponse dispatcher!");
// Node creates a BlockOrder and sends it
let order = ForkOrder { id: state.read().unwrap().id };
channel.send(order).await?;
// Node creates a ConsensusRequest and sends it
let request = ConsensusRequest { id: state.read().unwrap().id };
channel.send(request).await?;
// Node stores response data. Extra validations can be added here.
let response = response_sub.receive().await?;
state.write().unwrap().consensus.proposals = response.proposals.clone();
state.write().unwrap().consensus = response.consensus.clone();
} else {
info!("Node is not connected to other nodes, resetting consensus state.");
state.write().unwrap().reset_consensus_state()?;
@@ -184,28 +183,47 @@ async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Resul
}
async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Node syncs its fork chains
let result = syncing_forks_task(p2p.clone(), state.clone()).await;
// Node waits just before the current or next epoch end,
// so it can start syncing latest state.
let mut seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
let one_sec = Duration::new(1, 0);
loop {
if seconds_until_next_epoch > one_sec {
seconds_until_next_epoch = seconds_until_next_epoch - one_sec;
break
}
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
}
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
// Node syncs its consensus state
let result = syncing_consensus_task(p2p.clone(), state.clone()).await;
match result {
Ok(()) => (),
Err(e) => error!("Sync forks failed. Error: {:?}", e),
Err(e) => error!("Sync consensus state failed. Error: {:?}", e),
}
// Node signals the network that it starts participating
// Node signals the network that it will start participating
let participant =
Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch());
state.write().unwrap().append_participant(participant.clone());
let result = p2p.broadcast(participant).await;
let result = p2p.broadcast(participant.clone()).await;
match result {
Ok(()) => info!("Participation message broadcasted successfully."),
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
// After initialization node should wait for next epoch
// After initialization node waits for next epoch to start participating
let seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
// Node modifies its participating flag to true
state.write().unwrap().participating = true;
loop {
// Node refreshes participants records
state.write().unwrap().refresh_participants();
@@ -268,7 +286,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
}
};
// Node waits untile next epoch
// Node waits until next epoch
let seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
@@ -315,10 +333,20 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
// Adding ProtocolSync to the registry
let state2 = state.clone();
let consensus_mode = true; // This flag should be based on staking
registry
.register(net::SESSION_ALL, move |channel, main_p2p| {
let state = state2.clone();
async move { ProtocolSync::init(channel, state, main_p2p).await }
async move { ProtocolSync::init(channel, state, main_p2p, consensus_mode).await }
})
.await;
// Adding ProtocolTx to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, main_p2p| {
let state = state2.clone();
async move { ProtocolTx::init(channel, state, main_p2p).await }
})
.await;
@@ -335,6 +363,18 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
})
.detach();
// RPC interface
let ex2 = executor.clone();
let ex3 = ex2.clone();
let rpc_interface = Arc::new(JsonRpcInterface {
state: state.clone(),
p2p: main_p2p.clone(),
_rpc_listen_addr: opts.rpc,
});
executor
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
// Node starts syncing
let state2 = state.clone();
syncing_task(main_p2p.clone(), state2).await?;
@@ -343,15 +383,6 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
let consensus_p2p = net::P2p::new(consensus_subnet_settings).await;
let registry = consensus_p2p.protocol_registry();
// Adding ProtocolTx to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, consensus_p2p| {
let state = state2.clone();
async move { ProtocolTx::init(channel, state, consensus_p2p).await }
})
.await;
// Adding PropotolVote to the registry
let p2p = main_p2p.clone();
let state2 = state.clone();
@@ -386,7 +417,7 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
registry
.register(net::SESSION_ALL, move |channel, _consensus_p2p| {
let state = state2.clone();
async move { ProtocolSyncForks::init(channel, state).await }
async move { ProtocolSyncConsensus::init(channel, state).await }
})
.await;
@@ -403,18 +434,6 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
})
.detach();
// RPC interface
let ex2 = executor.clone();
let ex3 = ex2.clone();
let rpc_interface = Arc::new(JsonRpcInterface {
state: state.clone(),
p2p: consensus_p2p.clone(),
_rpc_listen_addr: opts.rpc,
});
executor
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
proposal_task(consensus_p2p, state).await;
Ok(())

View File

@@ -1,13 +1,13 @@
pub mod protocol_participant;
pub mod protocol_proposal;
pub mod protocol_sync;
pub mod protocol_sync_forks;
pub mod protocol_sync_consensus;
pub mod protocol_tx;
pub mod protocol_vote;
pub use protocol_participant::ProtocolParticipant;
pub use protocol_proposal::ProtocolProposal;
pub use protocol_sync::ProtocolSync;
pub use protocol_sync_forks::ProtocolSyncForks;
pub use protocol_sync_consensus::ProtocolSyncConsensus;
pub use protocol_tx::ProtocolTx;
pub use protocol_vote::ProtocolVote;

View File

@@ -49,12 +49,10 @@ impl ProtocolParticipant {
"ProtocolParticipant::handle_receive_participant() received {:?}",
participant
);
if self.state.write().unwrap().append_participant((*participant).clone()) {
let pending_participants =
self.state.read().unwrap().consensus.pending_participants.clone();
for pending_participant in pending_participants {
self.p2p.broadcast(pending_participant.clone()).await?;
}
let participant_copy = (*participant).clone();
if self.state.write().unwrap().append_participant(participant_copy.clone()) {
self.p2p.broadcast(participant_copy).await?;
}
}
}

View File

@@ -20,46 +20,49 @@ const BATCH: u64 = 10;
pub struct ProtocolSync {
channel: ChannelPtr,
order_sub: MessageSubscription<BlockOrder>,
request_sub: MessageSubscription<BlockOrder>,
block_sub: MessageSubscription<BlockInfo>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
_p2p: P2pPtr,
p2p: P2pPtr,
consensus_mode: bool,
}
impl ProtocolSync {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
_p2p: P2pPtr,
p2p: P2pPtr,
consensus_mode: bool,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockOrder>().await;
message_subsytem.add_dispatch::<BlockInfo>().await;
let order_sub =
let request_sub =
channel.subscribe_msg::<BlockOrder>().await.expect("Missing BlockOrder dispatcher!");
let block_sub =
channel.subscribe_msg::<BlockInfo>().await.expect("Missing BlockInfo dispatcher!");
Arc::new(Self {
channel: channel.clone(),
order_sub,
request_sub,
block_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel),
state,
_p2p,
p2p,
consensus_mode,
})
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::handle_receive_tx() [START]");
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::handle_receive_request() [START]");
loop {
let order = self.order_sub.receive().await?;
let order = self.request_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSync::handle_receive_order() received {:?}",
"ProtocolSync::handle_receive_request() received {:?}",
order
);
@@ -82,18 +85,19 @@ impl ProtocolSync {
info
);
// TODO: Following code should be executed only by replicators, not consensus nodes.
// Commented for now, as to not mess consensus testing.
// (Don't forget to remove _ from _p2p)
/*
// Node stores finalized block, if it doesn't exists (checking by slot).
// Node stores finalized block, if it doesn't exist (checking by slot),
// and removes its transactions from the unconfirmed_txs vector.
// Consensus mode enabled nodes have already performed these steps,
// during proposal finalization.
// Extra validations can be added here.
let info_copy = (*info).clone();
if !self.state.read().unwrap().blockchain.has_block(&info_copy)? {
self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?;
self.p2p.broadcast(info_copy).await?;
if !self.consensus_mode {
let info_copy = (*info).clone();
if !self.state.read().unwrap().blockchain.has_block(&info_copy)? {
self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?;
self.state.write().unwrap().remove_txs(info_copy.txs.clone())?;
self.p2p.broadcast(info_copy).await?;
}
}
*/
}
}
}
@@ -103,7 +107,7 @@ impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await;
debug!(target: "ircd", "ProtocolSync::start() [END]");
Ok(())

View File

@@ -0,0 +1,72 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::state::{ConsensusRequest, ConsensusResponse, ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolSyncConsensus {
channel: ChannelPtr,
request_sub: MessageSubscription<ConsensusRequest>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
}
impl ProtocolSyncConsensus {
pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<ConsensusRequest>().await;
let request_sub = channel
.subscribe_msg::<ConsensusRequest>()
.await
.expect("Missing ConsensusRequest dispatcher!");
Arc::new(Self {
channel: channel.clone(),
request_sub,
jobsman: ProtocolJobsManager::new("SyncConsensusProtocol", channel),
state,
})
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncConsensus::handle_receive_request() [START]");
loop {
let order = self.request_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSyncConsensus::handle_receive_request() received {:?}",
order
);
// Extra validations can be added here.
let consensus = self.state.read().unwrap().consensus.clone();
let response = ConsensusResponse { consensus };
self.channel.send(response).await?;
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSyncConsensus {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncConsensus::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
debug!(target: "ircd", "ProtocolSyncConsensus::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSyncConsensus"
}
}

View File

@@ -1,73 +0,0 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{
blockchain::{ForkOrder, ForkResponse},
state::ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolSyncForks {
channel: ChannelPtr,
order_sub: MessageSubscription<ForkOrder>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
}
impl ProtocolSyncForks {
pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<ForkOrder>().await;
let order_sub =
channel.subscribe_msg::<ForkOrder>().await.expect("Missing ForkOrder dispatcher!");
Arc::new(Self {
channel: channel.clone(),
order_sub,
jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel),
state,
})
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncForks::handle_receive_tx() [START]");
loop {
let order = self.order_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSyncForks::handle_receive_order() received {:?}",
order
);
// Extra validations can be added here.
let proposals = self.state.read().unwrap().consensus.proposals.clone();
let response = ForkResponse { proposals };
self.channel.send(response).await?;
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSyncForks {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncForks::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
debug!(target: "ircd", "ProtocolSyncForks::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSyncForks"
}
}

View File

@@ -49,6 +49,8 @@ impl ProtocolTx {
tx
);
let tx_copy = (*tx).clone();
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().unwrap().append_tx(tx_copy.clone()) {
self.p2p.broadcast(tx_copy).await?;
}

View File

@@ -3,7 +3,7 @@ use std::io;
use log::debug;
use crate::{
impl_vec, net,
impl_vec,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
@@ -201,29 +201,3 @@ impl ProposalsChain {
}
impl_vec!(ProposalsChain);
/// Auxilary structure used for forks syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ForkOrder {
/// Validator id
pub id: u64,
}
impl net::Message for ForkOrder {
fn name() -> &'static str {
"forkorder"
}
}
/// Auxilary structure used for forks syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ForkResponse {
/// Fork chains containing block proposals
pub proposals: Vec<ProposalsChain>,
}
impl net::Message for ForkResponse {
fn name() -> &'static str {
"forkresponse"
}
}

View File

@@ -14,6 +14,7 @@ use crate::{
keypair::{PublicKey, SecretKey},
schnorr::{SchnorrPublic, SchnorrSecret},
},
net,
util::serial::{deserialize, serialize, Encodable, SerialDecodable, SerialEncodable},
Error, Result,
};
@@ -28,11 +29,11 @@ use super::{
vote::Vote,
};
const DELTA: u64 = 60;
pub const DELTA: u64 = 10;
const SLED_CONSESUS_STATE_TREE: &[u8] = b"_consensus_state";
/// This struct represents the information required by the consensus algorithm.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ConsensusState {
/// Genesis block creation timestamp
pub genesis: Timestamp,
@@ -67,6 +68,32 @@ impl ConsensusState {
}
}
/// Auxiliary structure used for consensus syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ConsensusRequest {
/// Validator id
pub id: u64,
}
impl net::Message for ConsensusRequest {
fn name() -> &'static str {
"consensusrequest"
}
}
/// Auxiliary structure used for consensus syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ConsensusResponse {
/// Hot/live data used by the consensus algorithm
pub consensus: ConsensusState,
}
impl net::Message for ConsensusResponse {
fn name() -> &'static str {
"consensusresponse"
}
}
/// Atomic pointer to validator state.
pub type ValidatorStatePtr = Arc<RwLock<ValidatorState>>;
@@ -88,6 +115,8 @@ pub struct ValidatorState {
pub unconfirmed_txs: Vec<Tx>,
/// Genesis block hash, used for validations
pub genesis_block: blake3::Hash,
/// Participation flag
pub participating: bool,
}
impl ValidatorState {
@@ -100,6 +129,7 @@ impl ValidatorState {
let blockchain = Blockchain::new(&db, genesis)?;
let unconfirmed_txs = Vec::new();
let genesis_block = blake3::hash(&serialize(&Block::genesis_block(genesis)));
let participating = false;
Ok(Arc::new(RwLock::new(ValidatorState {
id,
secret,
@@ -109,6 +139,7 @@ impl ValidatorState {
blockchain,
unconfirmed_txs,
genesis_block,
participating,
})))
}
@@ -148,7 +179,6 @@ impl ValidatorState {
let epoch = self.current_epoch();
let mut hasher = DefaultHasher::new();
epoch.hash(&mut hasher);
self.zero_participants_check();
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
}
@@ -164,8 +194,8 @@ impl ValidatorState {
/// Proposal extends the longest notarized fork chain the node holds.
pub fn propose(&self) -> Result<Option<BlockProposal>> {
let epoch = self.current_epoch();
let previous_hash = self.longest_notarized_chain_last_hash().unwrap();
let unproposed_txs = self.unproposed_txs();
let (previous_hash, index) = self.longest_notarized_chain_last_hash().unwrap();
let unproposed_txs = self.unproposed_txs(index);
let metadata = Metadata::new(
get_current_time(),
String::from("proof"),
@@ -189,30 +219,43 @@ impl ValidatorState {
)))
}
/// Node retrieves all unconfiremd transactions not proposed in previous blocks.
pub fn unproposed_txs(&self) -> Vec<Tx> {
/// Node retrieves all unconfirmed transactions not proposed
/// in previous blocks of provided index chain.
pub fn unproposed_txs(&self, index: i64) -> Vec<Tx> {
let mut unproposed_txs = self.unconfirmed_txs.clone();
for chain in &self.consensus.proposals {
for proposal in &chain.proposals {
for tx in &proposal.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
// If index is -1 (canonical blockchain), a new fork chain will be generated,
// therefore all unproposed transactions can be included in the proposal.
if index == -1 {
return unproposed_txs
}
// We iterate the fork chain proposals to find already proposed transactions
// and remove them from the local unproposed_txs vector.
let chain = &self.consensus.proposals[index as usize];
for proposal in &chain.proposals {
for tx in &proposal.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
}
}
unproposed_txs
}
/// Finds the longest fully notarized blockchain the node holds and returns the last block hash.
pub fn longest_notarized_chain_last_hash(&self) -> Result<blake3::Hash> {
/// Finds the longest fully notarized blockchain the node holds and returns the last block hash
/// and the chain index.
pub fn longest_notarized_chain_last_hash(&self) -> Result<(blake3::Hash, i64)> {
let mut longest_notarized_chain: Option<ProposalsChain> = None;
let mut length = 0;
let mut index = -1;
if !self.consensus.proposals.is_empty() {
for chain in &self.consensus.proposals[1..] {
for (i, chain) in self.consensus.proposals.iter().enumerate() {
if chain.notarized() && chain.proposals.len() > length {
longest_notarized_chain = Some(chain.clone());
length = chain.proposals.len();
index = i as i64;
}
}
}
@@ -222,12 +265,17 @@ impl ValidatorState {
None => self.blockchain.last()?.unwrap().1,
};
Ok(hash)
Ok((hash, index))
}
/// Node receives the proposed block, verifies its sender(epoch leader),
/// and proceeds with voting on it.
pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
// Node hasn't started participating
if !self.participating {
return Ok(None)
}
let leader = self.epoch_leader();
if leader != proposal.id {
debug!(
@@ -256,7 +304,6 @@ impl ValidatorState {
/// If proposal extends the canonical blockchain, a new fork chain is created.
/// Node votes on the proposal, only if it extends the longest notarized fork chain it has seen.
pub fn vote(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
self.zero_participants_check();
let mut proposal = proposal.clone();
// Generate proposal hash
@@ -318,7 +365,7 @@ impl ValidatorState {
}
/// Given a proposal, node finds the index of the chain it extends.
pub fn find_extended_chain_index(&self, proposal: &BlockProposal) -> Result<i64> {
pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result<i64> {
for (index, chain) in self.consensus.proposals.iter().enumerate() {
let last = chain.proposals.last().unwrap();
let hash = last.hash();
@@ -350,6 +397,11 @@ impl ValidatorState {
/// Finally, we check if the notarization of the proposal can finalize parent proposals
/// in its chain.
pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option<Vec<BlockInfo>>)> {
// Node hasn't started participating
if !self.participating {
return Ok((false, None))
}
let mut encoded_proposal = vec![];
let result = vote.proposal.encode(&mut encoded_proposal);
match result {
@@ -366,8 +418,6 @@ impl ValidatorState {
}
let nodes_count = self.consensus.participants.len();
self.zero_participants_check();
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.id) {
Some(participant) => {
@@ -440,6 +490,17 @@ impl ValidatorState {
Ok(None)
}
/// Node removes the provided transactions vector from unconfirmed_txs, if they exist.
pub fn remove_txs(&mut self, transactions: Vec<Tx>) -> Result<()> {
for tx in transactions {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
Ok(())
}
/// Provided an index, node checks if chain can be finalized.
/// Consensus finalization logic: If node has observed the notarization of 3 consecutive
/// proposals in a fork chain, it finalizes (appends to canonical blockchain) all proposals up to the middle block.
@@ -463,15 +524,11 @@ impl ValidatorState {
for proposal in &mut chain.proposals[..(consecutive - 1)] {
proposal.sm.finalized = true;
finalized.push(proposal.clone());
for tx in proposal.txs.clone() {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
}
chain.proposals.drain(0..(consecutive - 1));
for proposal in &finalized {
self.blockchain.add_by_proposal(proposal.clone())?;
self.remove_txs(proposal.txs.clone())?;
to_broadcast.push(BlockInfo::new(
proposal.st,
proposal.sl,
@@ -518,17 +575,6 @@ impl ValidatorState {
true
}
/// This prevent the extreme case scenario where network is initialized, but some nodes
/// have not pushed the initial participants in the map.
pub fn zero_participants_check(&mut self) {
if self.consensus.participants.len() == 0 {
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
self.consensus.pending_participants = Vec::new();
}
}
/// Node refreshes participants map, to retain only the active ones.
/// Active nodes are considered those who joined or voted on previous epoch.
pub fn refresh_participants(&mut self) {
@@ -557,6 +603,12 @@ impl ValidatorState {
for index in inactive {
self.consensus.participants.remove(&index);
}
if self.consensus.participants.is_empty() {
// If no nodes are active, node becomes a single node network.
let participant = Participant::new(self.id, self.current_epoch());
self.consensus.pending_participants.push(participant);
}
}
/// Util function to save the current consensus state to provided file path.

View File

@@ -229,6 +229,10 @@ pub enum Error {
#[error("Unsupported network transport")]
UnsupportedTransport,
#[cfg(feature = "net2")]
#[error("TransportError: {0}")]
TransportError(String),
}
#[cfg(feature = "node")]
@@ -353,6 +357,13 @@ impl From<wasmer::ExportError> for Error {
}
}
#[cfg(feature = "net2")]
impl<T: std::fmt::Display> From<crate::net2::transport::TransportError<T>> for Error {
fn from(err: crate::net2::transport::TransportError<T>) -> Error {
Error::TransportError(err.to_string())
}
}
#[cfg(feature = "wasm-runtime")]
impl From<wasmer::RuntimeError> for Error {
fn from(err: wasmer::RuntimeError) -> Error {

View File

@@ -61,9 +61,9 @@ impl<T: Transport> Acceptor<T> {
/// Run the accept loop.
async fn run_accept_loop(self: Arc<Self>, url: url::Url) -> Result<()> {
let transport = T::new(None, 1024);
let listener = Arc::new(transport.listen_on(url.clone()).unwrap().await.unwrap());
let listener = Arc::new(transport.listen_on(url.clone())?.await?);
loop {
let stream = T::accept(listener.clone()).await;
let stream = T::accept(listener.clone()).await?;
let channel = Channel::<T>::new(stream, url.clone()).await;
self.channel_subscriber.notify(Ok(channel)).await;
}

View File

@@ -23,7 +23,7 @@ impl Connector {
let stream_result =
timeout(Duration::from_secs(self.settings.connect_timeout_seconds.into()), async {
let transport = T::new(None, 1024);
-let connect_stream = transport.dial(hostaddr.clone()).unwrap().await.unwrap();
+let connect_stream = transport.dial(hostaddr.clone())?.await?;
let channel = Channel::<T>::new(connect_stream, hostaddr).await;
Ok(channel)
})

View File

@@ -20,8 +20,10 @@ pub trait Transport: Sync + Send + 'static + Clone {
type Error: Error;
-type Listener: Future<Output = Result<Self::Acceptor, Self::Error>> + Sync + Send;
-type Dial: Future<Output = Result<Self::Connector, Self::Error>> + Sync + Send;
+type Listener: Future<Output = Result<Self::Acceptor, TransportError<Self::Error>>>
+ + Sync
+ + Send;
+type Dial: Future<Output = Result<Self::Connector, TransportError<Self::Error>>> + Sync + Send;
fn listen_on(self, url: Url) -> Result<Self::Listener, TransportError<Self::Error>>
where
@@ -33,7 +35,9 @@ pub trait Transport: Sync + Send + 'static + Clone {
fn new(ttl: Option<u32>, backlog: i32) -> Self;
-async fn accept(listener: Arc<Self::Acceptor>) -> Self::Connector;
+async fn accept(
+listener: Arc<Self::Acceptor>,
+) -> Result<Self::Connector, TransportError<Self::Error>>;
}
#[derive(Debug, thiserror::Error)]

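The core of the trait change is that `accept` now returns a `Result` instead of unwrapping internally, so callers decide how to handle a failed handshake. A synchronous analogue of that pattern, as a minimal sketch (the simplified `Transport` trait and the `MockTransport` type are illustrative, not the real async darkfi trait):

```rust
use std::fmt;

// Mirrors the generic transport error from the diff.
#[derive(Debug)]
enum TransportError<T: fmt::Display> {
    Other(T),
}

// Synchronous analogue of the trait change: accept() is fallible
// rather than panicking inside the transport.
trait Transport {
    type Error: fmt::Display;
    type Connector;
    fn accept(&self) -> Result<Self::Connector, TransportError<Self::Error>>;
}

// Hypothetical mock transport used only for illustration.
struct MockTransport {
    fail: bool,
}

impl Transport for MockTransport {
    type Error = &'static str;
    type Connector = u32;
    fn accept(&self) -> Result<u32, TransportError<&'static str>> {
        if self.fail {
            Err(TransportError::Other("accept failed"))
        } else {
            Ok(7)
        }
    }
}

fn main() {
    // The caller now sees the error instead of an unwrap panic.
    assert_eq!((MockTransport { fail: false }).accept().unwrap(), 7);
    assert!((MockTransport { fail: true }).accept().is_err());
}
```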
View File

@@ -27,9 +27,14 @@ impl Transport for TcpTransport {
type Error = io::Error;
-type Listener =
-Pin<Box<dyn Future<Output = Result<Self::Acceptor, Self::Error>> + Send + Sync>>;
-type Dial = Pin<Box<dyn Future<Output = Result<Self::Connector, Self::Error>> + Send + Sync>>;
+type Listener = Pin<
+Box<dyn Future<Output = Result<Self::Acceptor, TransportError<Self::Error>>> + Send + Sync>,
+>;
+type Dial = Pin<
+Box<
+dyn Future<Output = Result<Self::Connector, TransportError<Self::Error>>> + Send + Sync,
+>,
+>;
fn listen_on(self, url: Url) -> Result<Self::Listener, TransportError<Self::Error>> {
if url.scheme() != "tcp" {
@@ -55,8 +60,10 @@ impl Transport for TcpTransport {
Self { ttl, backlog }
}
-async fn accept(listener: Arc<Self::Acceptor>) -> Self::Connector {
-listener.accept().await.unwrap().0
+async fn accept(
+listener: Arc<Self::Acceptor>,
+) -> Result<Self::Connector, TransportError<Self::Error>> {
+Ok(listener.accept().await?.0)
}
}
@@ -76,7 +83,10 @@ impl TcpTransport {
Ok(socket)
}
-async fn do_listen(self, socket_addr: SocketAddr) -> Result<TcpListener, io::Error> {
+async fn do_listen(
+self,
+socket_addr: SocketAddr,
+) -> Result<TcpListener, TransportError<io::Error>> {
let socket = self.create_socket(socket_addr)?;
socket.bind(&socket_addr.into())?;
socket.listen(self.backlog)?;
@@ -84,7 +94,10 @@ impl TcpTransport {
Ok(TcpListener::from(std::net::TcpListener::from(socket)))
}
-async fn do_dial(self, socket_addr: SocketAddr) -> Result<TcpStream, io::Error> {
+async fn do_dial(
+self,
+socket_addr: SocketAddr,
+) -> Result<TcpStream, TransportError<io::Error>> {
let socket = self.create_socket(socket_addr)?;
socket.set_nonblocking(true)?;
@@ -92,7 +105,7 @@ impl TcpTransport {
Ok(()) => {}
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
-Err(err) => return Err(err),
+Err(err) => return Err(TransportError::Other(err)),
};
let stream = TcpStream::from(std::net::TcpStream::from(socket));

View File

@@ -87,9 +87,14 @@ impl Transport for TlsTransport {
type Error = io::Error;
-type Listener =
-Pin<Box<dyn Future<Output = Result<Self::Acceptor, Self::Error>> + Send + Sync>>;
-type Dial = Pin<Box<dyn Future<Output = Result<Self::Connector, Self::Error>> + Send + Sync>>;
+type Listener = Pin<
+Box<dyn Future<Output = Result<Self::Acceptor, TransportError<Self::Error>>> + Send + Sync>,
+>;
+type Dial = Pin<
+Box<
+dyn Future<Output = Result<Self::Connector, TransportError<Self::Error>>> + Send + Sync,
+>,
+>;
fn listen_on(self, url: Url) -> Result<Self::Listener, TransportError<Self::Error>> {
if url.scheme() != "tls" {
@@ -151,9 +156,11 @@ impl Transport for TlsTransport {
Self { ttl, backlog, server_config, client_config }
}
-async fn accept(listener: Arc<Self::Acceptor>) -> Self::Connector {
-let stream = listener.1.accept().await.unwrap().0;
-listener.0.accept(stream).await.unwrap().into()
+async fn accept(
+listener: Arc<Self::Acceptor>,
+) -> Result<Self::Connector, TransportError<Self::Error>> {
+let stream = listener.1.accept().await?.0;
+Ok(listener.0.accept(stream).await?.into())
}
}
@@ -173,7 +180,10 @@ impl TlsTransport {
Ok(socket)
}
-async fn do_listen(self, url: Url) -> Result<(TlsAcceptor, TcpListener), io::Error> {
+async fn do_listen(
+self,
+url: Url,
+) -> Result<(TlsAcceptor, TcpListener), TransportError<io::Error>> {
let socket_addr = url.socket_addrs(|| None)?[0];
let socket = self.create_socket(socket_addr)?;
socket.bind(&socket_addr.into())?;
@@ -185,7 +195,7 @@ impl TlsTransport {
Ok((acceptor, listener))
}
-async fn do_dial(self, url: Url) -> Result<TlsStream<TcpStream>, io::Error> {
+async fn do_dial(self, url: Url) -> Result<TlsStream<TcpStream>, TransportError<io::Error>> {
let socket_addr = url.socket_addrs(|| None)?[0];
let server_name = ServerName::try_from("dark.fi").unwrap();
let socket = self.create_socket(socket_addr)?;
@@ -197,7 +207,7 @@ impl TlsTransport {
Ok(()) => {}
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
-Err(err) => return Err(err),
+Err(err) => return Err(TransportError::Other(err)),
};
let stream = TcpStream::from(std::net::TcpStream::from(socket));

View File

@@ -194,7 +194,10 @@ impl TorTransport {
.create_ehs(url)
}
-pub async fn do_dial(self, url: Url) -> Result<Socks5Stream<TcpStream>, TorError> {
+pub async fn do_dial(
+self,
+url: Url,
+) -> Result<Socks5Stream<TcpStream>, TransportError<TorError>> {
let socks_url_str = self.socks_url.socket_addrs(|| None)?[0].to_string();
let host = url.host().unwrap().to_string();
let port = url.port().unwrap_or(80);
@@ -209,11 +212,12 @@ impl TorTransport {
self.socks_url.password().unwrap().to_string(),
config,
)
-.await?
+.await
} else {
-Socks5Stream::connect(socks_url_str, host, port, config).await?
+Socks5Stream::connect(socks_url_str, host, port, config).await
};
-Ok(stream)
+// FIXME
+Ok(stream.unwrap())
}
fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
@@ -226,7 +230,7 @@ impl TorTransport {
Ok(socket)
}
-pub async fn do_listen(self, url: Url) -> Result<TcpListener, TorError> {
+pub async fn do_listen(self, url: Url) -> Result<TcpListener, TransportError<TorError>> {
let socket_addr = url.socket_addrs(|| None)?[0];
let socket = self.create_socket(socket_addr)?;
socket.bind(&socket_addr.into())?;
@@ -243,9 +247,14 @@ impl Transport for TorTransport {
type Error = TorError;
-type Listener =
-Pin<Box<dyn Future<Output = Result<Self::Acceptor, Self::Error>> + Send + Sync>>;
-type Dial = Pin<Box<dyn Future<Output = Result<Self::Connector, Self::Error>> + Send + Sync>>;
+type Listener = Pin<
+Box<dyn Future<Output = Result<Self::Acceptor, TransportError<Self::Error>>> + Send + Sync>,
+>;
+type Dial = Pin<
+Box<
+dyn Future<Output = Result<Self::Connector, TransportError<Self::Error>>> + Send + Sync,
+>,
+>;
fn listen_on(self, url: Url) -> Result<Self::Listener, TransportError<Self::Error>> {
if url.scheme() != "tcp" {
@@ -261,7 +270,9 @@ impl Transport for TorTransport {
unimplemented!()
}
-async fn accept(_listener: Arc<Self::Acceptor>) -> Self::Connector {
+async fn accept(
+_listener: Arc<Self::Acceptor>,
+) -> Result<Self::Connector, TransportError<Self::Error>> {
unimplemented!()
}
}