darkfid: ask peers if they are synced on sync task

This commit is contained in:
skoupidi
2024-02-23 15:28:33 +02:00
parent b468b9c399
commit ed96c06adb
8 changed files with 130 additions and 16 deletions

View File

@@ -237,9 +237,14 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
// Initialize syncing P2P network
let sync_p2p =
spawn_sync_p2p(&blockchain_config.sync_net.into(), &validator, &subscribers, ex.clone())
.await;
let sync_p2p = spawn_sync_p2p(
&blockchain_config.sync_net.into(),
&validator,
&subscribers,
ex.clone(),
blockchain_config.miner,
)
.await;
// Initialize miners P2P network
let (miners_p2p, rpc_client) = if blockchain_config.miner {

View File

@@ -23,7 +23,8 @@ pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
/// Validator blockchain sync protocol
mod protocol_sync;
pub use protocol_sync::{
ForkSyncRequest, ForkSyncResponse, ProtocolSync, SyncRequest, SyncResponse,
ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, ProtocolSync,
SyncRequest, SyncResponse,
};
/// Transaction broadcast protocol

View File

@@ -52,6 +52,7 @@ pub struct ProtocolProposal {
p2p: P2pPtr,
channel: ChannelPtr,
subscriber: JsonSubscriber,
miner: bool,
}
impl ProtocolProposal {
@@ -60,6 +61,7 @@ impl ProtocolProposal {
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
miner: bool,
) -> Result<ProtocolBasePtr> {
debug!(
target: "darkfid::proto::protocol_proposal::init",
@@ -81,6 +83,7 @@ impl ProtocolProposal {
p2p,
channel,
subscriber,
miner,
}))
}
@@ -109,6 +112,15 @@ impl ProtocolProposal {
continue
}
// Check if node is connected to the miners network {
if self.miner {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Node is connected to the miners network, skipping..."
);
continue
}
let proposal_copy = (*proposal).clone();
match self.validator.consensus.append_proposal(&proposal_copy.0).await {

View File

@@ -37,6 +37,21 @@ use darkfi_serial::{SerialDecodable, SerialEncodable};
// Constant defining how many blocks we send during syncing.
const BATCH: u64 = 10;
/// Auxiliary structure used for blockchain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct IsSyncedRequest {}
impl_p2p_message!(IsSyncedRequest, "issyncedrequest");
/// Auxiliary structure used for blockchain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct IsSyncedResponse {
/// Flag indicating the node is synced
pub synced: bool,
}
impl_p2p_message!(IsSyncedResponse, "issyncedresponse");
/// Auxiliary structure used for blockchain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
@@ -76,6 +91,7 @@ pub struct ForkSyncResponse {
impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
pub struct ProtocolSync {
is_synced_sub: MessageSubscription<IsSyncedRequest>,
request_sub: MessageSubscription<SyncRequest>,
fork_request_sub: MessageSubscription<ForkSyncRequest>,
jobsman: ProtocolJobsManagerPtr,
@@ -90,13 +106,16 @@ impl ProtocolSync {
"Adding ProtocolSync to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<IsSyncedRequest>().await;
msg_subsystem.add_dispatch::<SyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
let is_synced_sub = channel.subscribe_msg::<IsSyncedRequest>().await?;
let request_sub = channel.subscribe_msg::<SyncRequest>().await?;
let fork_request_sub = channel.subscribe_msg::<ForkSyncRequest>().await?;
Ok(Arc::new(Self {
is_synced_sub,
request_sub,
fork_request_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()),
@@ -105,6 +124,31 @@ impl ProtocolSync {
}))
}
async fn handle_receive_is_synced_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_is_synced_request", "START");
loop {
if let Err(e) = self.is_synced_sub.receive().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_is_synced_request",
"recv fail: {}",
e
);
continue
};
// TODO: This needs to be protected so peer can't spam us
// Check if node has finished syncing its blockchain and respond
let response = IsSyncedResponse { synced: *self.validator.synced.read().await };
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_is_synced_request",
"channel send fail: {}",
e
)
};
}
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
loop {
@@ -213,6 +257,10 @@ impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman
.clone()
.spawn(self.clone().handle_receive_is_synced_request(), executor.clone())
.await;
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()

View File

@@ -22,24 +22,56 @@ use log::{debug, info, warn};
use tinyjson::JsonValue;
use crate::{
proto::{ForkSyncRequest, ForkSyncResponse, SyncRequest, SyncResponse},
proto::{
ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, SyncRequest,
SyncResponse,
},
Darkfid,
};
/// async task used for block syncing
pub async fn sync_task(node: &Darkfid) -> Result<()> {
info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");
// Block until at least node is connected to at least one peer
// Block until at least node is connected to at least one synced peer
let mut peers = vec![];
loop {
if !node.sync_p2p.channels().await.is_empty() {
// Grab channels
let channels = node.sync_p2p.channels().await;
// Check anyone is connected
if !channels.is_empty() {
// Ask each peer if they are synced
for channel in channels {
// Communication setup
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<IsSyncedResponse>().await;
let response_sub = channel.subscribe_msg::<IsSyncedResponse>().await?;
// Node creates a `IsSyncedRequest` and sends it
let request = IsSyncedRequest {};
channel.send(&request).await?;
// Node waits for response
let Ok(response) = response_sub.receive_with_timeout(15).await else { continue };
// Parse response
if response.synced {
peers.push(channel)
}
}
}
// Check if we got peers to sync from
if !peers.is_empty() {
break
}
warn!(target: "darkfid::task::sync_task", "Node is not connected to other nodes, waiting to retry...");
sleep(10).await;
}
// Getting a random connected channel to ask from peers
let channel = node.sync_p2p.random_channel().await.unwrap();
// Getting a peer to ask for blocks
let channel = &peers[0];
// Communication setup
let msg_subsystem = channel.message_subsystem();

View File

@@ -274,11 +274,16 @@ pub async fn generate_node(
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
let sync_p2p = spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone()).await;
let miners_p2p = if let Some(settings) = miners_settings {
Some(spawn_miners_p2p(settings, &validator, &subscribers, ex.clone()).await)
let (sync_p2p, miners_p2p) = if let Some(settings) = miners_settings {
let sync_p2p =
spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), true).await;
let miners_p2p =
Some(spawn_miners_p2p(settings, &validator, &subscribers, ex.clone()).await);
(sync_p2p, miners_p2p)
} else {
None
let sync_p2p =
spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), false).await;
(sync_p2p, None)
};
let node =
Darkfid::new(sync_p2p.clone(), miners_p2p.clone(), validator, subscribers, None).await;

View File

@@ -41,6 +41,7 @@ pub async fn spawn_sync_p2p(
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
executor: Arc<Executor<'static>>,
miner: bool,
) -> P2pPtr {
info!(target: "darkfid", "Registering sync network P2P protocols...");
let p2p = P2p::new(settings.clone(), executor.clone()).await;
@@ -52,7 +53,9 @@ pub async fn spawn_sync_p2p(
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move { ProtocolProposal::init(channel, validator, p2p, subscriber).await.unwrap() }
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, miner).await.unwrap()
}
})
.await;
@@ -94,7 +97,9 @@ pub async fn spawn_miners_p2p(
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move { ProtocolProposal::init(channel, validator, p2p, subscriber).await.unwrap() }
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, false).await.unwrap()
}
})
.await;

View File

@@ -325,7 +325,7 @@ impl Validator {
// Append finalized blocks
info!(target: "validator::finalization", "Finalizing {} proposals:", finalized.len());
for block in &finalized {
info!(target: "validator::finalization", "\t{}", block.hash()?);
info!(target: "validator::finalization", "\t{} - {}", block.hash()?, block.header.height);
}
self.add_blocks(&finalized).await?;
@@ -365,6 +365,12 @@ impl Validator {
// Validate and insert each block
for block in blocks {
// Skip already existing block
if overlay.lock().unwrap().has_block(block)? {
previous = block;
continue;
}
// Verify block
if verify_block(&overlay, &module, block, previous).await.is_err() {
error!(target: "validator::add_blocks", "Erroneous block found in set");