diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index af2a2c315..672b00fbc 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -260,6 +260,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { &validator, &subscribers, ex.clone(), + sync_p2p.clone(), ) .await, ), diff --git a/bin/darkfid/src/proto/mod.rs b/bin/darkfid/src/proto/mod.rs index bcd0c8904..0ec6c0e95 100644 --- a/bin/darkfid/src/proto/mod.rs +++ b/bin/darkfid/src/proto/mod.rs @@ -30,3 +30,6 @@ pub use protocol_sync::{ /// Transaction broadcast protocol mod protocol_tx; pub use protocol_tx::ProtocolTx; + +/// Communications timeout +pub const COMMS_TIMEOUT: u64 = 15; diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index bf27c31f0..456917a2e 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -36,7 +36,7 @@ use darkfi::{ }; use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable}; -use crate::proto::{ForkSyncRequest, ForkSyncResponse}; +use crate::proto::{ForkSyncRequest, ForkSyncResponse, COMMS_TIMEOUT}; /// Auxiliary [`Proposal`] wrapper structure used for messaging. #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] @@ -46,13 +46,13 @@ impl_p2p_message!(ProposalMessage, "proposal"); pub struct ProtocolProposal { proposal_sub: MessageSubscription, - proposals_response_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, validator: ValidatorPtr, p2p: P2pPtr, channel: ChannelPtr, subscriber: JsonSubscriber, miner: bool, + sync_p2p: Option, } impl ProtocolProposal { @@ -62,6 +62,7 @@ impl ProtocolProposal { p2p: P2pPtr, subscriber: JsonSubscriber, miner: bool, + sync_p2p: Option, ) -> Result { debug!( target: "darkfid::proto::protocol_proposal::init", @@ -73,17 +74,16 @@ impl ProtocolProposal { msg_subsystem.add_dispatch::().await; let proposal_sub = channel.subscribe_msg::().await?; - let proposals_response_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { proposal_sub, - proposals_response_sub, jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()), validator, p2p, channel, subscriber, miner, + sync_p2p, })) } @@ -112,7 +112,7 @@ impl ProtocolProposal { continue } - // Check if node is connected to the miners network { + // Check if node is connected to the miners network if self.miner { debug!( target: "darkfid::proto::protocol_proposal::handle_receive_proposal", @@ -126,6 +126,9 @@ impl ProtocolProposal { match self.validator.consensus.append_proposal(&proposal_copy.0).await { Ok(()) => { self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await; + if let Some(sync_p2p) = self.sync_p2p.as_ref() { + sync_p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await; + } let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await)); self.subscriber.notify(vec![enc_prop].into()).await; @@ -149,11 +152,14 @@ impl ProtocolProposal { debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence"); let last = self.validator.blockchain.last()?; let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) }; + let proposals_response_sub = self.channel.subscribe_msg::().await?; self.channel.send(&request).await?; - // TODO: add a timeout here to retry // Node waits for response - let response = self.proposals_response_sub.receive().await?; + let Ok(response) = proposals_response_sub.receive_with_timeout(COMMS_TIMEOUT).await + else { + continue + }; // Verify and store retrieved proposals debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals"); diff --git a/bin/darkfid/src/task/sync.rs b/bin/darkfid/src/task/sync.rs index f9a8d0936..1d860448a 100644 --- a/bin/darkfid/src/task/sync.rs +++ b/bin/darkfid/src/task/sync.rs @@ -24,7 +24,7 @@ use tinyjson::JsonValue; use crate::{ proto::{ ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, SyncRequest, - SyncResponse, + SyncResponse, COMMS_TIMEOUT, }, Darkfid, }; @@ -52,7 +52,9 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { channel.send(&request).await?; // Node waits for response - let Ok(response) = response_sub.receive_with_timeout(15).await else { continue }; + let Ok(response) = response_sub.receive_with_timeout(COMMS_TIMEOUT).await else { + continue + }; // Parse response if response.synced { diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index 8993b69bb..60cc38116 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -179,10 +179,6 @@ impl Harness { self.alice.validator.consensus.append_proposal(&proposal).await?; let message = ProposalMessage(proposal); self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await; - // FIXME: some miners might not be connected our peers in the sync p2p, - // so we must broadcast to both networks when receiving a proposal - self.alice.sync_p2p.as_ref().broadcast(&message).await; - self.bob.sync_p2p.as_ref().broadcast(&message).await; } // Sleep a bit so blocks can be propagated and then @@ -280,8 +276,10 @@ pub async fn generate_node( let (sync_p2p, miners_p2p) = if let Some(settings) = miners_settings { let sync_p2p = spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), true).await; - let miners_p2p = - Some(spawn_miners_p2p(settings, &validator, &subscribers, ex.clone()).await); + let miners_p2p = Some( + spawn_miners_p2p(settings, &validator, &subscribers, ex.clone(), sync_p2p.clone()) + .await, + ); (sync_p2p, miners_p2p) } else { let sync_p2p = diff --git a/bin/darkfid/src/tests/mod.rs b/bin/darkfid/src/tests/mod.rs index a9489c59f..ed3704031 100644 --- a/bin/darkfid/src/tests/mod.rs +++ b/bin/darkfid/src/tests/mod.rs @@ -69,15 +69,14 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { // Nodes must have one fork with 2 blocks and one with 1 block th.validate_fork_chains(2, vec![2, 1]).await; - // We are going to create a third node and try to sync from the previous two + // We are going to create a third node and try to sync from Bob let mut sync_settings = Settings { localnet: true, inbound_connections: 3, ..Default::default() }; let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?; sync_settings.inbound_addrs = vec![charlie_url]; - let alice_url = th.alice.sync_p2p.settings().inbound_addrs[0].clone(); let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone(); - sync_settings.peers = vec![alice_url, bob_url]; + sync_settings.peers = vec![bob_url]; let charlie = generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?; // Verify node synced diff --git a/bin/darkfid/src/utils.rs b/bin/darkfid/src/utils.rs index cca0555c5..cb14f0538 100644 --- a/bin/darkfid/src/utils.rs +++ b/bin/darkfid/src/utils.rs @@ -54,7 +54,9 @@ pub async fn spawn_sync_p2p( let validator = _validator.clone(); let subscriber = _subscriber.clone(); async move { - ProtocolProposal::init(channel, validator, p2p, subscriber, miner).await.unwrap() + ProtocolProposal::init(channel, validator, p2p, subscriber, miner, None) + .await + .unwrap() } }) .await; @@ -86,6 +88,7 @@ pub async fn spawn_miners_p2p( validator: &ValidatorPtr, subscribers: &HashMap<&'static str, JsonSubscriber>, executor: Arc>, + sync_p2p: P2pPtr, ) -> P2pPtr { info!(target: "darkfid", "Registering miners network P2P protocols..."); let p2p = P2p::new(settings.clone(), executor.clone()).await; @@ -93,12 +96,16 @@ pub async fn spawn_miners_p2p( let _validator = validator.clone(); let _subscriber = subscribers.get("proposals").unwrap().clone(); + let _sync_p2p = Some(sync_p2p); registry .register(SESSION_ALL, move |channel, p2p| { let validator = _validator.clone(); let subscriber = _subscriber.clone(); + let sync_p2p = _sync_p2p.clone(); async move { - ProtocolProposal::init(channel, validator, p2p, subscriber, false).await.unwrap() + ProtocolProposal::init(channel, validator, p2p, subscriber, false, sync_p2p) + .await + .unwrap() } }) .await;