darkfid: minor communications fixes

This commit is contained in:
skoupidi
2024-02-26 21:42:52 +02:00
parent 490615a94b
commit 57bb4f8477
7 changed files with 36 additions and 20 deletions

View File

@@ -260,6 +260,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
&validator,
&subscribers,
ex.clone(),
sync_p2p.clone(),
)
.await,
),

View File

@@ -30,3 +30,6 @@ pub use protocol_sync::{
/// Transaction broadcast protocol
mod protocol_tx;
pub use protocol_tx::ProtocolTx;
/// Communications timeout
pub const COMMS_TIMEOUT: u64 = 15;

View File

@@ -36,7 +36,7 @@ use darkfi::{
};
use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
use crate::proto::{ForkSyncRequest, ForkSyncResponse};
use crate::proto::{ForkSyncRequest, ForkSyncResponse, COMMS_TIMEOUT};
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
@@ -46,13 +46,13 @@ impl_p2p_message!(ProposalMessage, "proposal");
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<ProposalMessage>,
proposals_response_sub: MessageSubscription<ForkSyncResponse>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel: ChannelPtr,
subscriber: JsonSubscriber,
miner: bool,
sync_p2p: Option<P2pPtr>,
}
impl ProtocolProposal {
@@ -62,6 +62,7 @@ impl ProtocolProposal {
p2p: P2pPtr,
subscriber: JsonSubscriber,
miner: bool,
sync_p2p: Option<P2pPtr>,
) -> Result<ProtocolBasePtr> {
debug!(
target: "darkfid::proto::protocol_proposal::init",
@@ -73,17 +74,16 @@ impl ProtocolProposal {
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let proposal_sub = channel.subscribe_msg::<ProposalMessage>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
Ok(Arc::new(Self {
proposal_sub,
proposals_response_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()),
validator,
p2p,
channel,
subscriber,
miner,
sync_p2p,
}))
}
@@ -112,7 +112,7 @@ impl ProtocolProposal {
continue
}
// Check if node is connected to the miners network {
// Check if node is connected to the miners network
if self.miner {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
@@ -126,6 +126,9 @@ impl ProtocolProposal {
match self.validator.consensus.append_proposal(&proposal_copy.0).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
if let Some(sync_p2p) = self.sync_p2p.as_ref() {
sync_p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
}
let enc_prop =
JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await));
self.subscriber.notify(vec![enc_prop].into()).await;
@@ -149,11 +152,14 @@ impl ProtocolProposal {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence");
let last = self.validator.blockchain.last()?;
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) };
let proposals_response_sub = self.channel.subscribe_msg::<ForkSyncResponse>().await?;
self.channel.send(&request).await?;
// TODO: add a timeout here to retry
// Node waits for response
let response = self.proposals_response_sub.receive().await?;
let Ok(response) = proposals_response_sub.receive_with_timeout(COMMS_TIMEOUT).await
else {
continue
};
// Verify and store retrieved proposals
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals");

View File

@@ -24,7 +24,7 @@ use tinyjson::JsonValue;
use crate::{
proto::{
ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, SyncRequest,
SyncResponse,
SyncResponse, COMMS_TIMEOUT,
},
Darkfid,
};
@@ -52,7 +52,9 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
channel.send(&request).await?;
// Node waits for response
let Ok(response) = response_sub.receive_with_timeout(15).await else { continue };
let Ok(response) = response_sub.receive_with_timeout(COMMS_TIMEOUT).await else {
continue
};
// Parse response
if response.synced {

View File

@@ -179,10 +179,6 @@ impl Harness {
self.alice.validator.consensus.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await;
// FIXME: some miners might not be connected our peers in the sync p2p,
// so we must broadcast to both networks when receiving a proposal
self.alice.sync_p2p.as_ref().broadcast(&message).await;
self.bob.sync_p2p.as_ref().broadcast(&message).await;
}
// Sleep a bit so blocks can be propagated and then
@@ -280,8 +276,10 @@ pub async fn generate_node(
let (sync_p2p, miners_p2p) = if let Some(settings) = miners_settings {
let sync_p2p =
spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), true).await;
let miners_p2p =
Some(spawn_miners_p2p(settings, &validator, &subscribers, ex.clone()).await);
let miners_p2p = Some(
spawn_miners_p2p(settings, &validator, &subscribers, ex.clone(), sync_p2p.clone())
.await,
);
(sync_p2p, miners_p2p)
} else {
let sync_p2p =

View File

@@ -69,15 +69,14 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
// Nodes must have one fork with 2 blocks and one with 1 block
th.validate_fork_chains(2, vec![2, 1]).await;
// We are going to create a third node and try to sync from the previous two
// We are going to create a third node and try to sync from Bob
let mut sync_settings =
Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?;
sync_settings.inbound_addrs = vec![charlie_url];
let alice_url = th.alice.sync_p2p.settings().inbound_addrs[0].clone();
let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone();
sync_settings.peers = vec![alice_url, bob_url];
sync_settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?;
// Verify node synced

View File

@@ -54,7 +54,9 @@ pub async fn spawn_sync_p2p(
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, miner).await.unwrap()
ProtocolProposal::init(channel, validator, p2p, subscriber, miner, None)
.await
.unwrap()
}
})
.await;
@@ -86,6 +88,7 @@ pub async fn spawn_miners_p2p(
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
executor: Arc<Executor<'static>>,
sync_p2p: P2pPtr,
) -> P2pPtr {
info!(target: "darkfid", "Registering miners network P2P protocols...");
let p2p = P2p::new(settings.clone(), executor.clone()).await;
@@ -93,12 +96,16 @@ pub async fn spawn_miners_p2p(
let _validator = validator.clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
let _sync_p2p = Some(sync_p2p);
registry
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
let sync_p2p = _sync_p2p.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, false).await.unwrap()
ProtocolProposal::init(channel, validator, p2p, subscriber, false, sync_p2p)
.await
.unwrap()
}
})
.await;