validatord: sync tasks and protocols implemented

This commit is contained in:
aggstam
2022-04-21 04:43:30 +03:00
parent d36dd1847e
commit c217030367
12 changed files with 696 additions and 66 deletions

View File

@@ -4,12 +4,12 @@
nodes=4
# Copying the node state files with a blockchain containing only the genesis block.
bound=$(($nodes - 1))
for i in $(eval echo "{0..$bound}")
do
rm -rf ~/.config/darkfi/validatord_db_$i
done
# Copying the node state files with a blockchain containing only the genesis block. Uncomment for fresh runs.
#bound=$(($nodes - 1))
#for i in $(eval echo "{0..$bound}")
#do
# rm -rf ~/.config/darkfi/validatord_db_$i
#done
# PIDs array
pids=()
@@ -28,6 +28,7 @@ do
cargo run -- \
--accept 0.0.0.0:1100$i \
--caccept 0.0.0.0:1200$i \
--seeds 127.0.0.1:11000 \
--cseeds 127.0.0.1:12000 \
--rpc 127.0.0.1:666$i \
--external 127.0.0.1:1100$i \
@@ -55,6 +56,7 @@ bound=$(($nodes-1))
cargo run -- \
--accept 0.0.0.0:1100$bound \
--caccept 0.0.0.0:1200$bound \
--seeds 127.0.0.1:11000 \
--cseeds 127.0.0.1:12000 \
--rpc 127.0.0.1:666$bound \
--external 127.0.0.1:1100$bound \

View File

@@ -12,6 +12,8 @@ use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
block::{BlockOrder, BlockResponse},
blockchain::{ForkOrder, ForkResponse},
participant::Participant,
state::{ValidatorState, ValidatorStatePtr},
tx::Tx,
@@ -36,7 +38,8 @@ use darkfi::{
use validatord::protocols::{
protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal,
protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
protocol_sync::ProtocolSync, protocol_sync_forks::ProtocolSyncForks, protocol_tx::ProtocolTx,
protocol_vote::ProtocolVote,
};
const CONFIG_FILE: &str = r"validatord_config.toml";
@@ -104,7 +107,90 @@ struct Opt {
verbose: u8,
}
async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Node starts syncing blockchain...");
// We retrieve p2p network connected channels, so we can use it to parallelize downloads
// Using len here because is_empty() uses unstable library feature 'exact_size_is_empty'
if p2p.channels().lock().await.values().len() != 0 {
// Currently we will use just the last channel
let channel = p2p.channels().lock().await.values().last().unwrap().clone();
// Communication setup
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockResponse>().await;
let response_sub = channel
.subscribe_msg::<BlockResponse>()
.await
.expect("Missing BlockResponse dispatcher!");
// Nodes sends the last known block hash of the canonical blockchain
// and loops until the respond is the same block (used to utilize batch requests)
let mut last = state.read().unwrap().blockchain.last()?.unwrap();
info!("Last known block: {:?} - {:?}", last.0, last.1);
loop {
// Node creates a BlockOrder and sends it
let order = BlockOrder { sl: last.0, block: last.1 };
channel.send(order).await?;
// Node stores responce data. Extra validations can be added here.
let response = response_sub.receive().await?;
for info in &response.blocks {
state.write().unwrap().blockchain.add_by_info(info.clone())?;
}
let last_received = state.read().unwrap().blockchain.last()?.unwrap();
info!("Last received block: {:?} - {:?}", last_received.0, last_received.1);
if last == last_received {
break
}
last = last_received;
}
} else {
info!("Node is not connected to other nodes.");
}
info!("Node synced!");
Ok(())
}
async fn syncing_forks_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Node starts syncing forks...");
// Using len here because is_empty() uses unstable library feature 'exact_size_is_empty'
if p2p.channels().lock().await.values().len() != 0 {
// Nodes ask for the fork chains of the last channel peer
let channel = p2p.channels().lock().await.values().last().unwrap().clone();
// Communication setup
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<ForkResponse>().await;
let response_sub = channel
.subscribe_msg::<ForkResponse>()
.await
.expect("Missing ForkResponse dispatcher!");
// Node creates a BlockOrder and sends it
let order = ForkOrder { id: state.read().unwrap().id };
channel.send(order).await?;
// Node stores responce data. Extra validations can be added here.
let response = response_sub.receive().await?;
state.write().unwrap().consensus.proposals = response.proposals.clone();
} else {
info!("Node is not connected to other nodes, resetting consensus state.");
state.write().unwrap().reset_consensus_state()?;
}
info!("Node synced!");
Ok(())
}
async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Node syncs its fork chains
let result = syncing_forks_task(p2p.clone(), state.clone()).await;
match result {
Ok(()) => (),
Err(e) => error!("Sync forks failed. Error: {:?}", e),
}
// Node signals the network that it starts participating
let participant =
Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch());
@@ -142,7 +228,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
match vote {
Ok(x) => {
if x.is_none() {
debug!("Node did not vote for the proposed block.");
error!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
let result = state.write().unwrap().receive_vote(&vote);
@@ -224,57 +310,94 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
let state = ValidatorState::new(database_path, id, genesis).unwrap();
// Main P2P registry setup
let p2p = net::P2p::new(subnet_settings).await;
let _registry = p2p.protocol_registry();
let main_p2p = net::P2p::new(subnet_settings).await;
let registry = main_p2p.protocol_registry();
// Adding ProtocolSync to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, main_p2p| {
let state = state2.clone();
async move { ProtocolSync::init(channel, state, main_p2p).await }
})
.await;
// Performs seed session
main_p2p.clone().start(executor.clone()).await?;
// Actual main p2p session
let ex2 = executor.clone();
let p2p = main_p2p.clone();
executor
.spawn(async move {
if let Err(err) = p2p.run(ex2).await {
error!("Error: p2p run failed {}", err);
}
})
.detach();
// Node starts syncing
let state2 = state.clone();
syncing_task(main_p2p.clone(), state2).await?;
// Consensus P2P registry setup
let p2p = net::P2p::new(consensus_subnet_settings).await;
let registry = p2p.protocol_registry();
let consensus_p2p = net::P2p::new(consensus_subnet_settings).await;
let registry = consensus_p2p.protocol_registry();
// Adding ProtocolTx to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(net::SESSION_ALL, move |channel, consensus_p2p| {
let state = state2.clone();
async move { ProtocolTx::init(channel, state, p2p).await }
async move { ProtocolTx::init(channel, state, consensus_p2p).await }
})
.await;
// Adding PropotolVote to the registry
let p2p = main_p2p.clone();
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(net::SESSION_ALL, move |channel, consensus_p2p| {
let state = state2.clone();
async move { ProtocolVote::init(channel, state, p2p).await }
let main_p2p = p2p.clone();
async move { ProtocolVote::init(channel, state, main_p2p, consensus_p2p).await }
})
.await;
// Adding ProtocolProposal to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(net::SESSION_ALL, move |channel, consensus_p2p| {
let state = state2.clone();
async move { ProtocolProposal::init(channel, state, p2p).await }
async move { ProtocolProposal::init(channel, state, consensus_p2p).await }
})
.await;
// Adding ProtocolParticipant to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(net::SESSION_ALL, move |channel, consensus_p2p| {
let state = state2.clone();
async move { ProtocolParticipant::init(channel, state, p2p).await }
async move { ProtocolParticipant::init(channel, state, consensus_p2p).await }
})
.await;
// Adding ProtocolSyncForks to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, _consensus_p2p| {
let state = state2.clone();
async move { ProtocolSyncForks::init(channel, state).await }
})
.await;
// Performs seed session
p2p.clone().start(executor.clone()).await?;
consensus_p2p.clone().start(executor.clone()).await?;
// Actual consensus p2p session
let ex2 = executor.clone();
let p2p2 = p2p.clone();
let p2p = consensus_p2p.clone();
executor
.spawn(async move {
if let Err(err) = p2p2.run(ex2).await {
if let Err(err) = p2p.run(ex2).await {
error!("Error: p2p run failed {}", err);
}
})
@@ -285,14 +408,14 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
let ex3 = ex2.clone();
let rpc_interface = Arc::new(JsonRpcInterface {
state: state.clone(),
p2p: p2p.clone(),
p2p: consensus_p2p.clone(),
_rpc_listen_addr: opts.rpc,
});
executor
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
proposal_task(p2p, state).await;
proposal_task(consensus_p2p, state).await;
Ok(())
}

View File

@@ -1,9 +1,13 @@
pub mod protocol_participant;
pub mod protocol_proposal;
pub mod protocol_sync;
pub mod protocol_sync_forks;
pub mod protocol_tx;
pub mod protocol_vote;
pub use protocol_participant::ProtocolParticipant;
pub use protocol_proposal::ProtocolProposal;
pub use protocol_sync::ProtocolSync;
pub use protocol_sync_forks::ProtocolSyncForks;
pub use protocol_tx::ProtocolTx;
pub use protocol_vote::ProtocolVote;

View File

@@ -0,0 +1,115 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{
block::{BlockInfo, BlockOrder, BlockResponse},
state::ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
// Constant defining how many blocks we send during syncing.
const BATCH: u64 = 10;
pub struct ProtocolSync {
channel: ChannelPtr,
order_sub: MessageSubscription<BlockOrder>,
block_sub: MessageSubscription<BlockInfo>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
_p2p: P2pPtr,
}
impl ProtocolSync {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
_p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockOrder>().await;
message_subsytem.add_dispatch::<BlockInfo>().await;
let order_sub =
channel.subscribe_msg::<BlockOrder>().await.expect("Missing BlockOrder dispatcher!");
let block_sub =
channel.subscribe_msg::<BlockInfo>().await.expect("Missing BlockInfo dispatcher!");
Arc::new(Self {
channel: channel.clone(),
order_sub,
block_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel),
state,
_p2p,
})
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::handle_receive_tx() [START]");
loop {
let order = self.order_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSync::handle_receive_order() received {:?}",
order
);
// Extra validations can be added here.
let key = order.sl;
let blocks = self.state.read().unwrap().blockchain.get_with_info(key, BATCH)?;
let response = BlockResponse { blocks };
self.channel.send(response).await?;
}
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::handle_receive_block() [START]");
loop {
let info = self.block_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSync::handle_receive_block() received {:?}",
info
);
// TODO: Following code should be executed only by replicators, not consensus nodes.
// Commented for now, as to not mess consensus testing.
// (Don't forget to remove _ from _p2p)
/*
// Node stores finalized block, if it doesn't exists (checking by slot).
// Extra validations can be added here.
let info_copy = (*info).clone();
if !self.state.read().unwrap().blockchain.has_block(&info_copy)? {
self.state.write().unwrap().blockchain.add_by_info(info_copy.clone())?;
self.p2p.broadcast(info_copy).await?;
}
*/
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolSync::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await;
debug!(target: "ircd", "ProtocolSync::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSync"
}
}

View File

@@ -0,0 +1,73 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{
blockchain::{ForkOrder, ForkResponse},
state::ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolSyncForks {
channel: ChannelPtr,
order_sub: MessageSubscription<ForkOrder>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
}
impl ProtocolSyncForks {
pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<ForkOrder>().await;
let order_sub =
channel.subscribe_msg::<ForkOrder>().await.expect("Missing ForkOrder dispatcher!");
Arc::new(Self {
channel: channel.clone(),
order_sub,
jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel),
state,
})
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncForks::handle_receive_tx() [START]");
loop {
let order = self.order_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolSyncForks::handle_receive_order() received {:?}",
order
);
// Extra validations can be added here.
let proposals = self.state.read().unwrap().consensus.proposals.clone();
let response = ForkResponse { proposals };
self.channel.send(response).await?;
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSyncForks {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolSyncForks::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
debug!(target: "ircd", "ProtocolSyncForks::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSyncForks"
}
}

View File

@@ -16,14 +16,16 @@ pub struct ProtocolVote {
vote_sub: MessageSubscription<Vote>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
main_p2p: P2pPtr,
consensus_p2p: P2pPtr,
}
impl ProtocolVote {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
main_p2p: P2pPtr,
consensus_p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Vote>().await;
@@ -34,7 +36,8 @@ impl ProtocolVote {
vote_sub,
jobsman: ProtocolJobsManager::new("VoteProtocol", channel),
state,
p2p,
main_p2p,
consensus_p2p,
})
}
@@ -49,8 +52,18 @@ impl ProtocolVote {
vote
);
let vote_copy = (*vote).clone();
if self.state.write().unwrap().receive_vote(&vote_copy)? {
self.p2p.broadcast(vote_copy).await?;
let (voted, to_broadcast) = self.state.write().unwrap().receive_vote(&vote_copy)?;
if voted {
self.consensus_p2p.broadcast(vote_copy).await?;
// Broadcasting finalized blocks info, if any
match to_broadcast {
Some(blocks) => {
for info in blocks {
self.main_p2p.broadcast(info).await?;
}
}
None => continue,
}
};
}
}