validatord: p2p impl for all consensus messages implemented, consensus structures added at src/consensus, WIP: consensus network

This commit is contained in:
aggstam
2022-03-26 17:02:08 +02:00
committed by parazyd
parent bd580fbf35
commit 5a00ea2c51
18 changed files with 1255 additions and 215 deletions

View File

@@ -68,6 +68,7 @@ indexmap = {version = "1.8.0", optional = true}
itertools = {version = "0.10.3", optional = true}
darkfi-derive = {path = "src/util/derive", optional = true}
darkfi-derive-internal = {path = "src/util/derive-internal", optional = true}
chrono = {version = "0.4.19", optional = true}
# Misc
termion = {version = "1.5.6", optional = true}
@@ -183,6 +184,7 @@ blockchain = [
"async-runtime",
"util",
"chrono",
]
system = [

View File

@@ -5,7 +5,7 @@ edition = "2021"
[dependencies.darkfi]
path = "../../../"
features = ["net", "rpc"]
features = ["blockchain", "crypto", "net", "rpc"]
[dependencies]
@@ -21,17 +21,14 @@ easy-parallel = "3.2.0"
rand = "0.8.5"
# Structopt dependencies for arguments parsing
serde = "1.0.136"
serde = "1.0.104"
serde_json = "1.0.79"
serde_derive = "1.0.136"
structopt = "0.3.26"
structopt = "0.3.11"
structopt-toml = "0.5.0"
toml = "0.5.8"
# Misc
log = "0.4.16"
num_cpus = "1.13.1"
simplelog = "0.12.0-alpha1"
fxhash = "0.2.1"
simplelog = "0.11.2"
[workspace]

View File

@@ -1,23 +1,18 @@
// In a terminal start a seed node
$ cargo +nightly run -- -v
$ cargo +nightly run --
// In a new terminal start node 1
$ cargo +nightly run -- -v --accept 0.0.0.0:11001 --slots 5 --seeds 127.0.0.1:9999 --rpc 127.0.0.1:6661 --external 127.0.0.1:11001
$ cargo +nightly run -- --accept 0.0.0.0:11001 --seeds 127.0.0.1:11000 --rpc 127.0.0.1:6661 --external 127.0.0.1:11001 --id 1 --state ~/.config/darkfi/validator_state_1
// In a new terminal start node 2
$ cargo +nightly run -- -v --accept 0.0.0.0:11002 --slots 5 --seeds 127.0.0.1:9999 --rpc 127.0.0.1:6662 --external 127.0.0.1:11002
$ cargo +nightly run -- --accept 0.0.0.0:11002 --seeds 127.0.0.1:11000 --rpc 127.0.0.1:6662 --external 127.0.0.1:11002 --id 2 --state ~/.config/darkfi/validator_state_2
// In a new terminal start node 3
$ cargo +nightly run -- -v --accept 0.0.0.0:11003 --slots 5 --seeds 127.0.0.1:9999 --rpc 127.0.0.1:6663 --external 127.0.0.1:11003
$ cargo +nightly run -- --accept 0.0.0.0:11003 --seeds 127.0.0.1:11000 --rpc 127.0.0.1:6663 --external 127.0.0.1:11003 --id 3 --state ~/.config/darkfi/validator_state_3
// In a new terminal, telnet to one of the nodes and submit a tx json command
$ telnet 127.0.0.1 6661
json: {"jsonrpc": "2.0", "method": "receive_tx", "params": ["tx"], "id": 42}
// In new terminal windows(one for each node), telnet rest nodes
$ telnet 127.0.0.1 6661
$ telnet 127.0.0.1 6662
$ telnet 127.0.0.1 6663
// verify they all hold the same tx in the state file of each node
// verify they all hold the same tx by submiting the following json in each terminal
json: {"jsonrpc": "2.0", "method": "get_tx_pool", "params": [], "id": 42}

View File

@@ -12,6 +12,10 @@ use structopt::StructOpt;
use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
state::{State, StatePtr},
tx::Tx,
},
net,
rpc::{
jsonrpc,
@@ -24,14 +28,14 @@ use darkfi::{
},
util::{
cli::{log_config, spawn_config},
expand_path,
path::get_config_path,
},
Result,
};
use validatord::protocols::{
protocol_tx_pool::ProtocolTxPool,
tx_pool::{Tx, TxPool, TxPoolPtr},
protocol_proposal::ProtocolProposal, protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
};
const CONFIG_FILE: &str = r"validatord_config.toml";
@@ -43,36 +47,42 @@ struct Opt {
#[structopt(short, long, default_value = CONFIG_FILE)]
/// Configuration file to use
config: String,
#[structopt(long)]
#[structopt(long, default_value = "0.0.0.0:11000")]
/// Accept address
accept: Option<SocketAddr>,
accept: SocketAddr,
#[structopt(long)]
/// Seed nodes
seeds: Vec<SocketAddr>,
#[structopt(long)]
/// Manual connections
connect: Vec<SocketAddr>,
#[structopt(long, default_value = "0")]
#[structopt(long, default_value = "5")]
/// Connection slots
slots: u32,
#[structopt(long)]
#[structopt(long, default_value = "127.0.0.1:11000")]
/// External address
external: Option<SocketAddr>,
external: SocketAddr,
#[structopt(long, default_value = "/tmp/darkfid.log")]
/// Logfile path
log: String,
#[structopt(long, default_value = "127.0.0.1:9000")]
#[structopt(long, default_value = "127.0.0.1:6660")]
/// The endpoint where validatord will bind its RPC socket
rpc: SocketAddr,
#[structopt(long)]
/// Whether to listen with TLS or plain TCP
serve_tls: bool,
tls: bool,
#[structopt(long, default_value = "~/.config/darkfi/validatord_identity.pfx")]
/// TLS certificate to use
tls_identity_path: PathBuf,
identity: PathBuf,
#[structopt(long, default_value = "FOOBAR")]
/// Password for the created TLS identity
tls_identity_password: String,
password: String,
#[structopt(long, default_value = "~/.config/darkfi/validatord_state_0")]
/// Path to the state file
state: String,
#[structopt(long, default_value = "0")]
/// How many threads to utilize
id: u64,
#[structopt(short, long, default_value = "0")]
/// How many threads to utilize
threads: usize,
@@ -81,49 +91,130 @@ struct Opt {
verbose: u8,
}
fn proposal_task() {
// TODO:
// 1. Nodes count not hardcoded.
// 2. Remove dummy delay.
async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf) {
let nodes_count = 4;
// After initialization node should wait for next epoch
let seconds_until_next_epoch = state.read().unwrap().get_seconds_until_next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
loop {
info!("Waiting for next epoch({:?} sec)...", 20);
thread::sleep(time::Duration::from_secs(20));
let result = if state.read().unwrap().check_if_epoch_leader(nodes_count) {
state.read().unwrap().propose_block()
} else {
Ok(None)
};
match result {
Ok(proposal) => {
if proposal.is_none() {
info!("Node is not the epoch leader. Sleeping till next epoch...");
} else {
let unwrapped = proposal.unwrap();
info!("Node is the epoch leader. Proposed block: {:?}", unwrapped);
let vote = state.write().unwrap().receive_proposed_block(
&unwrapped,
nodes_count,
true,
);
match vote {
Ok(x) => {
if x.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
state.write().unwrap().receive_vote(&vote, nodes_count as usize);
// Broadcasting block
let result = p2p.broadcast(unwrapped).await;
match result {
Ok(()) => info!("Proposal broadcasted successfuly."),
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
// Broadcasting leader vote
thread::sleep(time::Duration::from_secs(10)); // communication delay simulation
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => info!("Leader vote broadcasted successfuly."),
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
}
}
Err(e) => {
debug!(target: "ircd", "ProtocolBlock::handle_receive_proposal() error prosessing proposal: {:?}", e)
}
}
}
}
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
let seconds_until_next_epoch = state.read().unwrap().get_seconds_until_next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
let result = state.read().unwrap().save(state_path);
match result {
Ok(()) => (),
Err(e) => {
debug!(target: "ircd", "ProtocolVote::handle_receive_proposal() error saving state: {:?}", e)
}
};
}
}
async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
let rpc_server_config = RpcServerConfig {
socket_addr: opts.rpc,
use_tls: opts.serve_tls,
identity_path: opts.tls_identity_path.clone(),
identity_pass: opts.tls_identity_password.clone(),
use_tls: opts.tls,
identity_path: opts.identity.clone(),
identity_pass: opts.password.clone(),
};
let network_settings = net::Settings {
inbound: opts.accept,
inbound: Some(opts.accept),
outbound_connections: opts.slots,
external_addr: opts.external,
external_addr: Some(opts.external),
peers: opts.connect.clone(),
seeds: opts.seeds.clone(),
..Default::default()
};
let tx_pool = TxPool::new();
// State setup
let state_path = expand_path(&opts.state).unwrap();
let id = opts.id.clone();
let state = State::load_current_state(id, &state_path).unwrap();
// P2P registry setup
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
let (sender, _) = async_channel::unbounded();
let tx_pool2 = tx_pool.clone();
let sender2 = sender.clone();
// Adding ProtocolTxPool to the registry
// Adding ProtocolTx to the registry
let state2 = state.clone();
registry
.register(!net::SESSION_SEED, move |channel, p2p| {
let sender = sender2.clone();
let tx_pool = tx_pool2.clone();
async move { ProtocolTxPool::init(channel, sender, tx_pool, p2p).await }
.register(net::SESSION_ALL, move |channel, _p2p| {
let state = state2.clone();
async move { ProtocolTx::init(channel, state).await }
})
.await;
// Adding PropotolVote to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, _p2p| {
let state = state2.clone();
async move { ProtocolVote::init(channel, state).await }
})
.await;
// Adding ProtocolProposal to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = state2.clone();
async move { ProtocolProposal::init(channel, state, p2p).await }
})
.await;
// TODO: Add protocols for rest message types (block, vote)
// Performs seed session
p2p.clone().start(executor.clone()).await?;
@@ -142,7 +233,7 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
let ex2 = executor.clone();
let ex3 = ex2.clone();
let rpc_interface = Arc::new(JsonRpcInterface {
tx_pool: tx_pool.clone(),
state: state.clone(),
p2p: p2p.clone(),
_rpc_listen_addr: opts.rpc,
});
@@ -150,20 +241,13 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
proposal_task();
// TODO:
// - Add protocols for tx message type - DONE
// - Add p2p impl - DONE
// - Add prc impl (to receive network staff) - DONE
// - Add block proposal task impl
// - Add tx receival like irc - DONE
proposal_task(p2p, state, &state_path).await;
Ok(())
}
struct JsonRpcInterface {
tx_pool: TxPoolPtr,
state: StatePtr,
p2p: net::P2pPtr,
_rpc_listen_addr: SocketAddr,
}
@@ -180,7 +264,6 @@ impl RequestHandler for JsonRpcInterface {
return match req.method.as_str() {
Some("ping") => self.pong(req.id, req.params).await,
Some("get_info") => self.get_info(req.id, req.params).await,
Some("get_tx_pool") => self.get_tx_pool(req.id, req.params).await,
Some("receive_tx") => self.receive_tx(req.id, req.params).await,
Some(_) | None => jsonrpc::error(MethodNotFound, None, req.id).into(),
}
@@ -201,13 +284,6 @@ impl JsonRpcInterface {
JsonResult::Resp(jsonresp(resp, id))
}
// --> {"jsonrpc": "2.0", "method": "get_tx_pool", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": {"tx_pool", "id": 42}
async fn get_tx_pool(&self, id: Value, _params: Value) -> JsonResult {
let pool = format!("{:?}", self.tx_pool);
JsonResult::Resp(jsonresp(json!(pool), id))
}
// --> {"jsonrpc": "2.0", "method": "receive_tx", "params": ["tx"], "id": 42}
// <-- {"jsonrpc": "2.0", "result": true, "id": 0}
async fn receive_tx(&self, id: Value, params: Value) -> JsonResult {
@@ -217,15 +293,14 @@ impl JsonRpcInterface {
return jsonrpc::error(InvalidParams, None, id).into()
}
// TODO: add proper tx hash here and check if its already in the pool
// TODO: add proper tx hash here
let random_id = OsRng.next_u32();
let payload = String::from(args[0].as_str().unwrap());
let tx = Tx { hash: random_id, payload };
self.tx_pool.add_tx(tx).await;
let protocol_tx = Tx { hash: random_id, payload: args[0].to_string() };
let result = self.p2p.broadcast(protocol_tx).await;
self.state.write().unwrap().append_tx(tx.clone());
let result = self.p2p.broadcast(tx).await;
match result {
Ok(()) => JsonResult::Resp(jsonresp(json!(true), id)),
Err(e) => jsonrpc::error(ServerError(-32603), Some(e.to_string()), id).into(),

View File

@@ -1,5 +1,7 @@
pub mod protocol_tx_pool;
pub mod tx_pool;
pub mod protocol_proposal;
pub mod protocol_tx;
pub mod protocol_vote;
pub use protocol_tx_pool::ProtocolTxPool;
pub use tx_pool::{Tx, TxPool, TxPoolPtr};
pub use protocol_proposal::ProtocolProposal;
pub use protocol_tx::ProtocolTx;
pub use protocol_vote::ProtocolVote;

View File

@@ -0,0 +1,95 @@
use std::{thread, time};
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{block::BlockProposal, state::StatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<BlockProposal>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
}
impl ProtocolProposal {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockProposal>().await;
let proposal_sub =
channel.subscribe_msg::<BlockProposal>().await.expect("Missing Proposal dispatcher!");
Arc::new(Self {
proposal_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel),
state,
p2p,
})
}
// TODO:
// 1. Nodes count not hardcoded.
// 2. Remove dummy delay.
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolBlock::handle_receive_proposal() [START]");
loop {
let proposal = self.proposal_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolProposal::handle_receive_proposal() received {:?}",
proposal
);
let proposal_copy = (*proposal).clone();
let nodes_count = 4;
let vote = self.state.write().unwrap().receive_proposed_block(
&proposal_copy,
nodes_count,
false,
);
match vote {
Ok(x) => {
if x.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
self.state.write().unwrap().receive_vote(&vote, nodes_count as usize);
thread::sleep(time::Duration::from_secs(10)); // communication delay simulation
self.p2p.broadcast(vote).await?;
}
}
Err(e) => {
debug!(target: "ircd", "ProtocolBlock::handle_receive_proposal() error prosessing proposal: {:?}", e)
}
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolProposal {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolProposal::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await;
debug!(target: "ircd", "ProtocolProposal::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolProposal"
}
}

View File

@@ -0,0 +1,63 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{state::StatePtr, tx::Tx},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolTx {
tx_sub: MessageSubscription<Tx>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
}
impl ProtocolTx {
pub async fn init(channel: ChannelPtr, state: StatePtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Tx>().await;
let tx_sub = channel.subscribe_msg::<Tx>().await.expect("Missing Tx dispatcher!");
Arc::new(Self { tx_sub, jobsman: ProtocolJobsManager::new("TxProtocol", channel), state })
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolTx::handle_receive_tx() [START]");
loop {
let tx = self.tx_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolTx::handle_receive_tx() received {:?}",
tx
);
let tx_copy = (*tx).clone();
self.state.write().unwrap().append_tx(tx_copy.clone());
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolTx {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolTx::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await;
debug!(target: "ircd", "ProtocolTx::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolTx"
}
}

View File

@@ -1,82 +0,0 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{net, Result};
use log::debug;
use std::sync::Arc;
use super::tx_pool::{Tx, TxPoolPtr};
pub struct ProtocolTxPool {
notify_queue_sender: async_channel::Sender<Arc<Tx>>,
tx_pool_sub: net::MessageSubscription<Tx>,
jobsman: net::ProtocolJobsManagerPtr,
tx_pool: TxPoolPtr,
p2p: net::P2pPtr,
}
impl ProtocolTxPool {
pub async fn init(
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<Arc<Tx>>,
tx_pool: TxPoolPtr,
p2p: net::P2pPtr,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Tx>().await;
let tx_sub = channel.subscribe_msg::<Tx>().await.expect("Missing Tx dispatcher!");
Arc::new(Self {
notify_queue_sender,
tx_pool_sub: tx_sub,
jobsman: net::ProtocolJobsManager::new("TxPoolProtocol", channel),
tx_pool,
p2p,
})
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolTxPool::handle_receive_tx() [START]");
loop {
let tx = self.tx_pool_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolTxPool::handle_receive_tx() received {:?}",
tx
);
let tx_copy = (*tx).clone();
// Do we already have this tx?
if self.tx_pool.tx_exists(&tx_copy).await {
continue
}
self.tx_pool.add_tx(tx_copy.clone()).await;
// If not then broadcast to everybody else
self.p2p.broadcast(tx_copy).await?;
self.notify_queue_sender.send(tx).await.expect("notify_queue_sender send failed!");
}
}
}
#[async_trait]
impl net::ProtocolBase for ProtocolTxPool {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolTxPool::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await;
debug!(target: "ircd", "ProtocolTxPool::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolTxPool"
}
}

View File

@@ -0,0 +1,69 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{state::StatePtr, vote::Vote},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolVote {
vote_sub: MessageSubscription<Vote>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
}
impl ProtocolVote {
pub async fn init(channel: ChannelPtr, state: StatePtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Vote>().await;
let vote_sub = channel.subscribe_msg::<Vote>().await.expect("Missing Vote dispatcher!");
Arc::new(Self {
vote_sub,
jobsman: ProtocolJobsManager::new("VoteProtocol", channel),
state,
})
}
// TODO: 1. Nodes count not hardcoded.
async fn handle_receive_vote(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolVote::handle_receive_vote() [START]");
loop {
let vote = self.vote_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolVote::handle_receive_vote() received {:?}",
vote
);
let vote_copy = (*vote).clone();
let nodes_count = 4;
self.state.write().unwrap().receive_vote(&vote_copy, nodes_count);
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolVote {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolVote::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_vote(), executor.clone()).await;
debug!(target: "ircd", "ProtocolVote::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolVote"
}
}

View File

@@ -1,60 +0,0 @@
use async_std::sync::Mutex;
use std::{io, sync::Arc};
use fxhash::FxHashSet;
use darkfi::{
net,
util::serial::{Decodable, Encodable},
Result,
};
pub type TxHash = u32; // Change this to a proper hash type
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct Tx {
pub hash: TxHash,
pub payload: String,
}
impl net::Message for Tx {
fn name() -> &'static str {
"tx"
}
}
impl Encodable for Tx {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.hash.encode(&mut s)?;
len += self.payload.encode(&mut s)?;
Ok(len)
}
}
impl Decodable for Tx {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
Ok(Self { hash: Decodable::decode(&mut d)?, payload: Decodable::decode(&mut d)? })
}
}
#[derive(Debug)]
pub struct TxPool {
tx_pool: Mutex<FxHashSet<Tx>>,
}
pub type TxPoolPtr = Arc<TxPool>;
impl TxPool {
pub fn new() -> Arc<Self> {
Arc::new(Self { tx_pool: Mutex::new(FxHashSet::default()) })
}
pub async fn add_tx(&self, tx: Tx) {
self.tx_pool.lock().await.insert(tx);
}
pub async fn tx_exists(&self, tx: &Tx) -> bool {
self.tx_pool.lock().await.contains(tx)
}
}

131
src/consensus/block.rs Normal file
View File

@@ -0,0 +1,131 @@
use serde::{Deserialize, Serialize};
use std::{
hash::{Hash, Hasher},
io,
};
use super::{metadata::Metadata, tx::Tx};
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
net,
util::serial::{Decodable, Encodable},
Result,
};
/// This struct represents a tuple of the form (st, sl, txs, metadata).
/// Each blocks parent hash h may be computed simply as a hash of the parent block.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Block {
/// Previous block hash
pub st: String,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional block information
pub metadata: Metadata,
}
impl Block {
pub fn new(st: String, sl: u64, txs: Vec<Tx>, proof: String, r: String, s: String) -> Block {
Block { st, sl, txs, metadata: Metadata::new(proof, r, s) }
}
}
impl PartialEq for Block {
fn eq(&self, other: &Self) -> bool {
self.st == other.st && self.sl == other.sl && self.txs == other.txs
}
}
impl Hash for Block {
fn hash<H: Hasher>(&self, hasher: &mut H) {
format!("{:?}{:?}{:?}", self.st, self.sl, self.txs).hash(hasher);
}
}
impl Encodable for Block {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.st.encode(&mut s).unwrap();
len += self.sl.encode(&mut s).unwrap();
len += self.txs.encode(&mut s).unwrap();
len += self.metadata.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for Block {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let st = Decodable::decode(&mut d)?;
let sl = Decodable::decode(&mut d)?;
let txs = Decodable::decode(&mut d)?;
let metadata = Decodable::decode(&mut d)?;
Ok(Self { st, sl, txs, metadata })
}
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct BlockProposal {
/// leader public key
pub public_key: PublicKey,
/// signed block
pub signature: Signature,
/// leader id
pub id: u64,
/// Previous block hash
pub st: String,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
}
impl BlockProposal {
pub fn new(
public_key: PublicKey,
signature: Signature,
id: u64,
st: String,
sl: u64,
txs: Vec<Tx>,
) -> BlockProposal {
BlockProposal { public_key, signature, id, st, sl, txs }
}
}
impl net::Message for BlockProposal {
fn name() -> &'static str {
"proposal"
}
}
impl Encodable for BlockProposal {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.public_key.encode(&mut s).unwrap();
len += self.signature.encode(&mut s).unwrap();
len += self.id.encode(&mut s).unwrap();
len += self.st.encode(&mut s).unwrap();
len += self.sl.encode(&mut s).unwrap();
len += self.txs.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for BlockProposal {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let public_key = Decodable::decode(&mut d)?;
let signature = Decodable::decode(&mut d)?;
let id = Decodable::decode(&mut d)?;
let st = Decodable::decode(&mut d)?;
let sl = Decodable::decode(&mut d)?;
let txs = Decodable::decode(&mut d)?;
Ok(Self { public_key, signature, id, st, sl, txs })
}
}
pub fn proposal_eq_block(proposal: &BlockProposal, block: &Block) -> bool {
proposal.st == block.st && proposal.sl == block.sl && proposal.txs == block.txs
}

View File

@@ -0,0 +1,56 @@
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
use serde::{Deserialize, Serialize};
use super::block::Block;
/// This struct represents a sequence of blocks starting with the genesis block.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Blockchain {
pub blocks: Vec<Block>,
}
impl Blockchain {
pub fn new(intial_block: Block) -> Blockchain {
Blockchain { blocks: vec![intial_block] }
}
/// A block is considered valid when its parent hash is equal to the hash of the
/// previous block and their epochs are incremental, exluding genesis.
/// Additional validity rules can be applied.
pub fn check_block_validity(&self, block: &Block, previous_block: &Block) {
assert!(block.st != "", "Genesis block provided.");
let mut hasher = DefaultHasher::new();
previous_block.hash(&mut hasher);
assert!(
block.st == hasher.finish().to_string() && block.sl > previous_block.sl,
"Provided block is invalid."
);
}
/// A blockchain is considered valid, when every block is valid, based on check_block_validity method.
pub fn check_chain_validity(&self) {
for (index, block) in self.blocks[1..].iter().enumerate() {
self.check_block_validity(&block, &self.blocks[index])
}
}
/// Insertion of a valid block.
pub fn add_block(&mut self, block: &Block) {
self.check_block_validity(&block, &self.blocks.last().unwrap());
self.blocks.push(block.clone());
}
/// Blockchain notarization check.
pub fn is_notarized(&self) -> bool {
for block in &self.blocks {
if !block.metadata.sm.notarized {
return false
}
}
true
}
}

124
src/consensus/metadata.rs Normal file
View File

@@ -0,0 +1,124 @@
use serde::{Deserialize, Serialize};
use std::io;
use super::{
util::{get_current_time, Timestamp},
vote::Vote,
};
use crate::{
util::serial::{Decodable, Encodable},
Result,
};
/// This struct represents additional Block information used by the consensus protocol.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Metadata {
/// Block information used by Ouroboros consensus
pub om: OuroborosMetadata,
/// Block information used by Streamlet consensus
pub sm: StreamletMetadata,
/// Block recieval timestamp
pub timestamp: Timestamp,
}
impl Metadata {
pub fn new(proof: String, r: String, s: String) -> Metadata {
Metadata {
om: OuroborosMetadata::new(proof, r, s),
sm: StreamletMetadata::new(),
timestamp: get_current_time(),
}
}
}
impl Encodable for Metadata {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.om.encode(&mut s).unwrap();
len += self.sm.encode(&mut s).unwrap();
len += self.timestamp.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for Metadata {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let om = Decodable::decode(&mut d)?;
let sm = Decodable::decode(&mut d)?;
let timestamp = Decodable::decode(&mut d)?;
Ok(Self { om, sm, timestamp })
}
}
/// This struct represents Block information used by Ouroboros consensus protocol.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OuroborosMetadata {
/// Proof the stakeholder is the block owner
pub proof: String,
/// Random seed for VRF
pub r: String,
/// Block owner signature
pub s: String,
}
impl OuroborosMetadata {
pub fn new(proof: String, r: String, s: String) -> OuroborosMetadata {
OuroborosMetadata { proof, r, s }
}
}
impl Encodable for OuroborosMetadata {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.proof.encode(&mut s).unwrap();
len += self.r.encode(&mut s).unwrap();
len += self.s.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for OuroborosMetadata {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let proof = Decodable::decode(&mut d)?;
let r = Decodable::decode(&mut d)?;
let s = Decodable::decode(&mut d)?;
Ok(Self { proof, r, s })
}
}
/// This struct represents Block information used by Streamlet consensus protocol.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamletMetadata {
/// Epoch votes
pub votes: Vec<Vote>,
/// Block notarization flag
pub notarized: bool,
/// Block finalization flag
pub finalized: bool,
}
impl StreamletMetadata {
pub fn new() -> StreamletMetadata {
StreamletMetadata { votes: Vec::new(), notarized: false, finalized: false }
}
}
impl Encodable for StreamletMetadata {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.votes.encode(&mut s).unwrap();
len += self.notarized.encode(&mut s).unwrap();
len += self.finalized.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for StreamletMetadata {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let votes = Decodable::decode(&mut d)?;
let notarized = Decodable::decode(&mut d)?;
let finalized = Decodable::decode(&mut d)?;
Ok(Self { votes, notarized, finalized })
}
}

14
src/consensus/mod.rs Normal file
View File

@@ -0,0 +1,14 @@
pub mod block;
pub mod blockchain;
pub mod metadata;
pub mod state;
pub mod tx;
pub mod util;
pub mod vote;
pub use block::{proposal_eq_block, Block, BlockProposal};
pub use blockchain::Blockchain;
pub use metadata::Metadata;
pub use state::State;
pub use tx::Tx;
pub use vote::Vote;

404
src/consensus/state.rs Normal file
View File

@@ -0,0 +1,404 @@
use chrono::{NaiveDateTime, Utc};
use log::error;
use serde::{Deserialize, Serialize};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
path::PathBuf,
sync::{Arc, RwLock},
time::Duration,
};
use crate::{
crypto::{
keypair::{PublicKey, SecretKey},
schnorr::{SchnorrPublic, SchnorrSecret},
},
util::serial::Encodable,
Result,
};
use rand::rngs::OsRng;
use super::{
block::{proposal_eq_block, Block, BlockProposal},
blockchain::Blockchain,
tx::Tx,
util::{get_current_time, load, save, Timestamp},
vote::Vote,
};
const DELTA: u64 = 60;
/// Atomic pointer to state.
pub type StatePtr = Arc<RwLock<State>>;
/// This struct represents the state of a consensus node.
/// Each node is numbered and has a secret-public keys pair, to sign messages.
/// Nodes hold a set of Blockchains(some of which are not notarized)
/// and a set of unconfirmed pending transactions.
#[derive(Deserialize, Serialize)]
pub struct State {
pub id: u64,
pub genesis_time: Timestamp,
pub secret_key: SecretKey,
pub public_key: PublicKey,
pub canonical_blockchain: Blockchain,
pub node_blockchains: Vec<Blockchain>,
pub unconfirmed_txs: Vec<Tx>,
}
impl State {
pub fn new(id: u64, genesis_time: Timestamp, init_block: Block) -> State {
// TODO: clock sync
let secret = SecretKey::random(&mut OsRng);
State {
id,
genesis_time,
secret_key: secret,
public_key: PublicKey::from_secret(secret),
canonical_blockchain: Blockchain::new(init_block),
node_blockchains: Vec::new(),
unconfirmed_txs: Vec::new(),
}
}
/// Node retreives a transaction and append it to the unconfirmed transactions list.
/// Additional validity rules must be defined by the protocol for transactions.
pub fn append_tx(&mut self, tx: Tx) {
self.unconfirmed_txs.push(tx);
}
/// Node calculates seconds until next epoch starting time.
/// Epochs duration is configured using the delta value.
pub fn get_seconds_until_next_epoch_start(&self) -> Duration {
let start_time = NaiveDateTime::from_timestamp(self.genesis_time.0, 0);
let current_epoch = self.get_current_epoch() + 1;
let next_epoch_start_timestamp =
(current_epoch * (2 * DELTA)) + (start_time.timestamp() as u64);
let next_epoch_start =
NaiveDateTime::from_timestamp(next_epoch_start_timestamp.try_into().unwrap(), 0);
let current_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = next_epoch_start - current_time;
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Node calculates current epoch, based on elapsed time from the genesis block.
/// Epochs duration is configured using the delta value.
pub fn get_current_epoch(&self) -> u64 {
self.genesis_time.clone().elapsed() / (2 * DELTA)
}
/// Node finds epochs leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating in the network.
pub fn get_epoch_leader(&self, nodes_count: u64) -> u64 {
let epoch = self.get_current_epoch();
let mut hasher = DefaultHasher::new();
epoch.hash(&mut hasher);
hasher.finish() % nodes_count
}
/// Node checks if they are the current epoch leader.
pub fn check_if_epoch_leader(&self, nodes_count: u64) -> bool {
let leader = self.get_epoch_leader(nodes_count);
self.id == leader
}
/// Node generates a block proposal for the current epoch,
/// containing all uncorfirmed transactions.
/// Block extends the longest notarized blockchain the node holds.
pub fn propose_block(&self) -> Result<Option<BlockProposal>> {
let epoch = self.get_current_epoch();
let longest_notarized_chain = self.find_longest_notarized_chain();
let mut hasher = DefaultHasher::new();
longest_notarized_chain.blocks.last().unwrap().hash(&mut hasher);
let hash = hasher.finish().to_string();
let unproposed_txs = self.get_unproposed_txs();
let mut encoded_block = vec![];
hash.encode(&mut encoded_block)?;
epoch.encode(&mut encoded_block)?;
unproposed_txs.encode(&mut encoded_block)?;
let signed_block = self.secret_key.sign(&encoded_block[..]);
Ok(Some(BlockProposal::new(
self.public_key,
signed_block,
self.id,
hash,
epoch,
unproposed_txs,
)))
}
/// Node retrieves all unconfiremd transactions not proposed in previous blocks.
pub fn get_unproposed_txs(&self) -> Vec<Tx> {
let mut unproposed_txs = self.unconfirmed_txs.clone();
for blockchain in &self.node_blockchains {
for block in &blockchain.blocks {
for tx in &block.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
}
}
}
unproposed_txs
}
/// Finds the longest fully notarized blockchain the node holds.
pub fn find_longest_notarized_chain(&self) -> &Blockchain {
let mut longest_notarized_chain = &self.canonical_blockchain;
let mut length = 0;
for blockchain in &self.node_blockchains {
if blockchain.is_notarized() && blockchain.blocks.len() > length {
length = blockchain.blocks.len();
longest_notarized_chain = &blockchain;
}
}
&longest_notarized_chain
}
/// Node receives the proposed block, verifies its sender(epoch leader),
/// and proceeds with voting on it.
pub fn receive_proposed_block(
&mut self,
proposed_block: &BlockProposal,
nodes_count: u64,
leader: bool,
) -> Result<Option<Vote>> {
assert!(self.get_epoch_leader(nodes_count) == proposed_block.id);
let mut encoded_block = vec![];
proposed_block.st.encode(&mut encoded_block)?;
proposed_block.sl.encode(&mut encoded_block)?;
proposed_block.txs.encode(&mut encoded_block)?;
assert!(proposed_block.public_key.verify(&encoded_block[..], &proposed_block.signature));
self.vote_block(&proposed_block, leader)
}
/// Given a block, node finds which blockchain it extends.
/// If block extends the canonical blockchain, a new fork blockchain is created.
/// Node votes on the block, only if it extends the longest notarized chain it has seen.
pub fn vote_block(&mut self, proposal: &BlockProposal, leader: bool) -> Result<Option<Vote>> {
let block = Block::new(
proposal.st.clone(),
proposal.sl,
proposal.txs.clone(),
String::from("proof"),
String::from("r"),
String::from("s"),
);
let index = self.find_extended_blockchain_index(&block, leader);
if index == -2 {
return Ok(None)
}
let blockchain = match index {
-1 => {
let blockchain = Blockchain::new(block.clone());
self.node_blockchains.push(blockchain);
self.node_blockchains.last().unwrap()
}
_ => {
self.node_blockchains[index as usize].add_block(&block);
&self.node_blockchains[index as usize]
}
};
if self.extends_notarized_blockchain(blockchain) {
let mut encoded_proposal = vec![];
proposal.encode(&mut encoded_proposal)?;
let signed_proposal = self.secret_key.sign(&encoded_proposal[..]);
return Ok(Some(Vote::new(self.public_key, signed_proposal, proposal.clone(), self.id)))
}
Ok(None)
}
/// Node verifies if provided blockchain is notarized excluding the last block.
pub fn extends_notarized_blockchain(&self, blockchain: &Blockchain) -> bool {
for block in &blockchain.blocks[..(blockchain.blocks.len() - 1)] {
if !block.metadata.sm.notarized {
return false
}
}
true
}
/// Given a block, node finds the index of the blockchain it extends.
pub fn find_extended_blockchain_index(&self, block: &Block, leader: bool) -> i64 {
let mut hasher = DefaultHasher::new();
for (index, blockchain) in self.node_blockchains.iter().enumerate() {
let last_block = blockchain.blocks.last().unwrap();
last_block.hash(&mut hasher);
if (leader && block.st == hasher.finish().to_string() && block.sl >= last_block.sl) ||
(!leader && block.st == hasher.finish().to_string() && block.sl > last_block.sl)
{
return index as i64
}
}
let last_block = self.canonical_blockchain.blocks.last().unwrap();
last_block.hash(&mut hasher);
if (leader && block.st != hasher.finish().to_string() || block.sl < last_block.sl) ||
(!leader && block.st != hasher.finish().to_string() || block.sl <= last_block.sl)
{
error!("Proposed block doesn't extend any known chains.");
return -2
}
-1
}
/// Node receives a vote for a block.
/// First, sender is verified using their public key.
/// Block is searched in nodes blockchains.
/// If the vote wasn't received before, it is appended to block votes list.
/// When a node sees 2n/3 votes for a block it notarizes it.
/// When a block gets notarized, the transactions it contains are removed from
/// nodes unconfirmed transactions list.
/// Finally, we check if the notarization of the block can finalize parent blocks
/// in its blockchain.
pub fn receive_vote(&mut self, vote: &Vote, nodes_count: usize) {
let mut encoded_block = vec![];
let result = vote.block.encode(&mut encoded_block);
match result {
Ok(_) => (),
Err(e) => {
error!("Block encoding failed. Error: {:?}", e);
return
}
};
assert!(&vote.node_public_key.verify(&encoded_block[..], &vote.vote));
let vote_block = self.find_block(&vote.block);
if vote_block == None {
error!("Received vote for unknown block.");
return
}
let (unwrapped_vote_block, blockchain_index) = vote_block.unwrap();
if !unwrapped_vote_block.metadata.sm.votes.contains(vote) {
unwrapped_vote_block.metadata.sm.votes.push(vote.clone());
}
if !unwrapped_vote_block.metadata.sm.notarized &&
unwrapped_vote_block.metadata.sm.votes.len() > (2 * nodes_count / 3)
{
unwrapped_vote_block.metadata.sm.notarized = true;
self.check_blockchain_finalization(blockchain_index);
}
}
/// Node searches it the blockchains it holds for provided block.
pub fn find_block(&mut self, vote_block: &BlockProposal) -> Option<(&mut Block, i64)> {
for (index, blockchain) in &mut self.node_blockchains.iter_mut().enumerate() {
for block in blockchain.blocks.iter_mut().rev() {
if proposal_eq_block(vote_block, block) {
return Some((block, index as i64))
}
}
}
for block in &mut self.canonical_blockchain.blocks.iter_mut().rev() {
if proposal_eq_block(vote_block, block) {
return Some((block, -1))
}
}
None
}
/// Node checks if the index blockchain can be finalized.
/// Consensus finalization logic: If node has observed the notarization of 3 consecutive
/// blocks in a fork chain, it finalizes (appends to canonical blockchain) all blocks up to the middle block.
/// When fork chain blocks are finalized, rest fork chains not starting by those blocks are removed.
pub fn check_blockchain_finalization(&mut self, blockchain_index: i64) {
let blockchain = if blockchain_index == -1 {
&mut self.canonical_blockchain
} else {
&mut self.node_blockchains[blockchain_index as usize]
};
let blockchain_len = blockchain.blocks.len();
if blockchain_len > 2 {
let mut consecutive_notarized = 0;
for block in &blockchain.blocks {
if block.metadata.sm.notarized {
consecutive_notarized = consecutive_notarized + 1;
} else {
break
}
}
if consecutive_notarized > 2 {
let mut finalized_blocks = Vec::new();
for block in &mut blockchain.blocks[..(consecutive_notarized - 1)] {
block.metadata.sm.finalized = true;
finalized_blocks.push(block.clone());
for tx in block.txs.clone() {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
}
blockchain.blocks.drain(0..(consecutive_notarized - 1));
for block in &finalized_blocks {
self.canonical_blockchain.blocks.push(block.clone());
}
let mut hasher = DefaultHasher::new();
let last_finalized_block = self.canonical_blockchain.blocks.last().unwrap();
last_finalized_block.hash(&mut hasher);
let last_finalized_block_hash = hasher.finish().to_string();
let mut dropped_blockchains = Vec::new();
for (index, blockchain) in self.node_blockchains.iter().enumerate() {
let first_block = blockchain.blocks.first().unwrap();
if first_block.st != last_finalized_block_hash ||
first_block.sl <= last_finalized_block.sl
{
dropped_blockchains.push(index);
}
}
for index in dropped_blockchains {
self.node_blockchains.remove(index);
}
}
}
}
/// Util function to save the current node state to provided file path.
pub fn save(&self, path: &PathBuf) -> Result<()> {
save::<Self>(path, self)
}
/// Util function to load current node state by the provided file path.
// If file is not found, node state is reset.
pub fn load_or_create(id: u64, path: &PathBuf) -> Result<Self> {
match load::<Self>(path) {
Ok(state) => Ok(state),
Err(_) => return Self::reset(id, path),
}
}
/// Util function to load the current node state by the provided file path.
pub fn load_current_state(id: u64, path: &PathBuf) -> Result<StatePtr> {
let state = Self::load_or_create(id, path)?;
Ok(Arc::new(RwLock::new(state)))
}
/// Util function to reset node state.
pub fn reset(id: u64, path: &PathBuf) -> Result<State> {
// Genesis block is generated.
let mut genesis_block = Block::new(
String::from(""),
0,
vec![],
String::from("proof"),
String::from("r"),
String::from("s"),
);
genesis_block.metadata.sm.notarized = true;
genesis_block.metadata.sm.finalized = true;
let genesis_time = get_current_time();
let state = Self::new(id, genesis_time, genesis_block.clone());
state.save(path)?;
return Ok(state)
}
}

37
src/consensus/tx.rs Normal file
View File

@@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};
use std::io;
use crate::{
impl_vec, net,
util::serial::{Decodable, Encodable, VarInt},
Result,
};
#[derive(Debug, Clone, Deserialize, Serialize, Eq, Hash, PartialEq)]
pub struct Tx {
pub hash: u32, // Change this to a proper hash type
pub payload: String,
}
impl net::Message for Tx {
fn name() -> &'static str {
"tx"
}
}
impl Encodable for Tx {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.hash.encode(&mut s)?;
len += self.payload.encode(&mut s)?;
Ok(len)
}
}
impl Decodable for Tx {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
Ok(Self { hash: Decodable::decode(&mut d)?, payload: Decodable::decode(&mut d)? })
}
}
impl_vec!(Tx);

58
src/consensus/util.rs Normal file
View File

@@ -0,0 +1,58 @@
use chrono::{NaiveDateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs::File, io, io::BufReader, path::PathBuf};
use crate::{
util::serial::{Decodable, Encodable},
Result,
};
/// Util function to load a structure saved as a JSON in the provided path file, using serde crate.
pub fn load<T: DeserializeOwned>(path: &PathBuf) -> Result<T> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let value: T = serde_json::from_reader(reader)?;
Ok(value)
}
/// Util function to save a structure as a JSON in the provided path file, using serde crate.
pub fn save<T: Serialize>(path: &PathBuf, value: &T) -> Result<()> {
let file = File::create(path)?;
serde_json::to_writer_pretty(file, value)?;
Ok(())
}
/// Util structure to represend chrono UTC timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Timestamp(pub i64);
impl Timestamp {
/// Calculates elapsed time of a Timestamp.
pub fn elapsed(self) -> u64 {
let start_time = NaiveDateTime::from_timestamp(self.0, 0);
let end_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = end_time - start_time;
diff.num_seconds().try_into().unwrap()
}
}
impl Encodable for Timestamp {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.0.encode(&mut s).unwrap();
Ok(len)
}
}
impl Decodable for Timestamp {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let timestamp = Decodable::decode(&mut d)?;
Ok(Timestamp(timestamp))
}
}
/// Util function to generate a Timestamp of current time.
pub fn get_current_time() -> Timestamp {
Timestamp(Utc::now().timestamp())
}

60
src/consensus/vote.rs Normal file
View File

@@ -0,0 +1,60 @@
use serde::{Deserialize, Serialize};
use std::io;
use super::block::BlockProposal;
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{Decodable, Encodable, VarInt},
Result,
};
/// This struct represents a tuple of the form (vote, B, id).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Vote {
/// Node public key
pub node_public_key: PublicKey,
/// signed block
pub vote: Signature,
/// block hash to vote on
pub block: BlockProposal,
/// node id
pub id: u64,
}
impl Vote {
pub fn new(node_public_key: PublicKey, vote: Signature, block: BlockProposal, id: u64) -> Vote {
Vote { node_public_key, vote, block, id }
}
}
impl net::Message for Vote {
fn name() -> &'static str {
"vote"
}
}
impl Encodable for Vote {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.node_public_key.encode(&mut s)?;
len += self.vote.encode(&mut s)?;
len += self.block.encode(&mut s)?;
len += self.id.encode(&mut s)?;
Ok(len)
}
}
impl Decodable for Vote {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
Ok(Self {
node_public_key: Decodable::decode(&mut d)?,
vote: Decodable::decode(&mut d)?,
block: Decodable::decode(&mut d)?,
id: Decodable::decode(&mut d)?,
})
}
}
impl_vec!(Vote);