From a02e3475f61ca57e0c4e36c2078a709bc7fde320 Mon Sep 17 00:00:00 2001 From: parazyd Date: Tue, 22 Aug 2023 18:28:38 +0200 Subject: [PATCH] lib: Remove async-std dependency. --- src/consensus/proto/protocol_proposal.rs | 3 +- src/consensus/proto/protocol_sync.rs | 3 +- .../proto/protocol_sync_consensus.rs | 3 +- src/consensus/proto/protocol_tx.rs | 3 +- src/consensus/state.rs | 99 +++--- src/consensus/task/proposal.rs | 3 +- src/consensus/validator.rs | 4 +- src/event_graph/events_queue.rs | 3 +- src/event_graph/mod.rs | 80 ++--- src/event_graph/model.rs | 303 +++++++++--------- src/event_graph/protocol_event.rs | 4 +- src/event_graph/view.rs | 5 +- src/geode/mod.rs | 22 +- src/net/dnet.rs | 1 - src/net/transport.rs | 2 +- src/rpc/client.rs | 18 +- src/rpc/common.rs | 6 +- src/rpc/server.rs | 3 +- src/sdk/python/src/zkas.rs | 3 +- src/util/cli.rs | 16 +- src/util/encoding/base64.rs | 4 +- src/validator/mod.rs | 4 +- src/wallet/walletdb.rs | 77 ++--- src/zk/proof.rs | 2 +- tests/blockchain.rs | 59 ++-- tests/jsonrpc.rs | 80 +++-- tests/network_transports.rs | 116 ++++--- 27 files changed, 483 insertions(+), 443 deletions(-) diff --git a/src/consensus/proto/protocol_proposal.rs b/src/consensus/proto/protocol_proposal.rs index bb51d3407..bfddd0b83 100644 --- a/src/consensus/proto/protocol_proposal.rs +++ b/src/consensus/proto/protocol_proposal.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use async_trait::async_trait; use log::{debug, error, trace}; use smol::Executor; diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 3a30a52aa..4fd2c3ff1 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use async_trait::async_trait; use log::{debug, error, info}; use smol::Executor; diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index 9059e29f4..83690cc6a 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use async_trait::async_trait; use log::{debug, error}; use smol::Executor; diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index 81a98cdd3..3c66f4220 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use async_trait::async_trait; use log::debug; use smol::Executor; diff --git a/src/consensus/state.rs b/src/consensus/state.rs index 9139ec982..0fc91db57 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -842,58 +842,57 @@ mod tests { TESTNET_GENESIS_TIMESTAMP, TESTNET_INITIAL_DISTRIBUTION, }, wallet::WalletDb, - Result, }; - #[async_std::test] - async fn calc_sigmas_test() -> Result<()> { - // Generate dummy state - let wallet = WalletDb::new(None, None)?; - let sled_db = sled::Config::new().temporary(true).open()?; - let blockchain = Blockchain::new(&sled_db)?; - let state = ConsensusState::new( - wallet, - blockchain, - *TESTNET_BOOTSTRAP_TIMESTAMP, - *TESTNET_GENESIS_TIMESTAMP, - *TESTNET_GENESIS_HASH_BYTES, - *TESTNET_INITIAL_DISTRIBUTION, - true, - ); + #[test] + fn calc_sigmas_test() { + smol::block_on(async { + // Generate dummy state + let wallet = WalletDb::new(None, None).unwrap(); + let sled_db = sled::Config::new().temporary(true).open().unwrap(); + let blockchain = Blockchain::new(&sled_db).unwrap(); + let state = ConsensusState::new( + wallet, + blockchain, + *TESTNET_BOOTSTRAP_TIMESTAMP, + *TESTNET_GENESIS_TIMESTAMP, + *TESTNET_GENESIS_HASH_BYTES, + *TESTNET_INITIAL_DISTRIBUTION, + true, + ); - let precision_diff = Float10::try_from( - "10000000000000000000000000000000000000000000000000000000000000000000000000", - ) - .unwrap(); - let precision_diff_base = fbig2base(precision_diff); - let f = Float10::try_from("0.01").unwrap(); - let total_stake = Float10::try_from("100").unwrap(); - let (sigma1, sigma2) = state.calc_sigmas(f, total_stake); - let sigma1_rhs = Float10::try_from( - "2909373465034095801035568917399197865646520818579502832252119592405565440", - ) - .unwrap(); - let sigma1_rhs_base = fbig2base(sigma1_rhs); - let sigma2_rhs = Float10::try_from( - "9137556389643100714432609642916129738741963230846798778430644027392", - ) - .unwrap(); - let sigma2_rhs_base = fbig2base(sigma2_rhs); - let sigma1_delta = if sigma1_rhs_base > sigma1 { - sigma1_rhs_base - sigma1 - } else { - sigma1 - sigma1_rhs_base - }; - let sigma2_delta = if sigma2_rhs_base > sigma2 { - sigma2_rhs_base - sigma2 - } else { - sigma2 - sigma2_rhs_base - }; - //note! test cases were generated by low precision python scripts. - //https://github.com/ertosns/lotterysim/blob/master/pallas_unittests.csv - assert!(sigma1_delta < precision_diff_base); - assert!(sigma2_delta < precision_diff_base); - - Ok(()) + let precision_diff = Float10::try_from( + "10000000000000000000000000000000000000000000000000000000000000000000000000", + ) + .unwrap(); + let precision_diff_base = fbig2base(precision_diff); + let f = Float10::try_from("0.01").unwrap(); + let total_stake = Float10::try_from("100").unwrap(); + let (sigma1, sigma2) = state.calc_sigmas(f, total_stake); + let sigma1_rhs = Float10::try_from( + "2909373465034095801035568917399197865646520818579502832252119592405565440", + ) + .unwrap(); + let sigma1_rhs_base = fbig2base(sigma1_rhs); + let sigma2_rhs = Float10::try_from( + "9137556389643100714432609642916129738741963230846798778430644027392", + ) + .unwrap(); + let sigma2_rhs_base = fbig2base(sigma2_rhs); + let sigma1_delta = if sigma1_rhs_base > sigma1 { + sigma1_rhs_base - sigma1 + } else { + sigma1 - sigma1_rhs_base + }; + let sigma2_delta = if sigma2_rhs_base > sigma2 { + sigma2_rhs_base - sigma2 + } else { + sigma2 - sigma2_rhs_base + }; + //note! test cases were generated by low precision python scripts. + //https://github.com/ertosns/lotterysim/blob/master/pallas_unittests.csv + assert!(sigma1_delta < precision_diff_base); + assert!(sigma2_delta < precision_diff_base); + }); } } diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs index 79d61c5c7..ad865abb1 100644 --- a/src/consensus/task/proposal.rs +++ b/src/consensus/task/proposal.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use log::{debug, error, info, warn}; use super::consensus_sync_task; diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index 0c40cc782..c2448e4c1 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -16,9 +16,8 @@ * along with this program. If not, see . */ -use std::{collections::HashMap, io::Cursor}; +use std::{collections::HashMap, io::Cursor, sync::Arc}; -use async_std::sync::{Arc, RwLock}; use darkfi_sdk::{ blockchain::Slot, crypto::{ @@ -32,6 +31,7 @@ use darkfi_serial::{serialize, Decodable, Encodable, WriteExt}; use halo2_proofs::arithmetic::Field; use log::{debug, error, info, warn}; use rand::rngs::OsRng; +use smol::lock::RwLock; use crate::{ blockchain::{BlockInfo, Blockchain, BlockchainOverlay, BlockchainOverlayPtr}, diff --git a/src/event_graph/events_queue.rs b/src/event_graph/events_queue.rs index 9efd1bf51..0dbfda7c1 100644 --- a/src/event_graph/events_queue.rs +++ b/src/event_graph/events_queue.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use darkfi_serial::{Decodable, Encodable}; use crate::{event_graph::model::Event, Error, Result}; diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index d4f1c4ef8..2f14ba583 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -55,54 +55,56 @@ mod tests { } } - #[async_std::test] - async fn event_graph_integration() { - // Base structures - let events_queue = EventsQueue::::new(); - let mut model = Model::new(events_queue.clone()); - let _view = View::new(events_queue); + #[test] + fn event_graph_integration() { + smol::block_on(async { + // Base structures + let events_queue = EventsQueue::::new(); + let mut model = Model::new(events_queue.clone()); + let _view = View::new(events_queue); - // Buffers - let _seen_event: SeenPtr = Seen::new(); - let seen_inv: SeenPtr = Seen::new(); + // Buffers + let _seen_event: SeenPtr = Seen::new(); + let seen_inv: SeenPtr = Seen::new(); - let seen_ids = Seen::new(); - // Keeps track of the events we received, but haven't read yet - let mut unread_msgs = vec![]; + let seen_ids = Seen::new(); + // Keeps track of the events we received, but haven't read yet + let mut unread_msgs = vec![]; - let test_event0 = - TestEvent { nick: "brawndo".to_string(), msg: "Electrolytes".to_string() }; - let _test_event1 = - TestEvent { nick: "camacho".to_string(), msg: "Shieeeeeeeet".to_string() }; + let test_event0 = + TestEvent { nick: "brawndo".to_string(), msg: "Electrolytes".to_string() }; + let _test_event1 = + TestEvent { nick: "camacho".to_string(), msg: "Shieeeeeeeet".to_string() }; - // We create an event and broadcast it - let head_hash = model.get_head_hash(); - let event0 = Event { - previous_event_hash: head_hash, - action: test_event0, - timestamp: Timestamp::current_time(), - }; + // We create an event and broadcast it + let head_hash = model.get_head_hash(); + let event0 = Event { + previous_event_hash: head_hash, + action: test_event0, + timestamp: Timestamp::current_time(), + }; - // Simulate receiving the event - assert!(seen_ids.push(&event0.hash()).await); - // Simulate receiving the event again - assert!(!seen_ids.push(&event0.hash()).await); + // Simulate receiving the event + assert!(seen_ids.push(&event0.hash()).await); + // Simulate receiving the event again + assert!(!seen_ids.push(&event0.hash()).await); - // Add the event into the model - model.add(event0.clone()).await; + // Add the event into the model + model.add(event0.clone()).await; - // Send inventory - let inv0 = Inv { invs: vec![InvItem { hash: event0.hash() }] }; - // Simulate recieving the inventory - assert!(seen_inv.push(&inv0.invs[0].hash).await); - // Simulate recieving the inventory again - assert!(!seen_inv.push(&inv0.invs[0].hash).await); + // Send inventory + let inv0 = Inv { invs: vec![InvItem { hash: event0.hash() }] }; + // Simulate recieving the inventory + assert!(seen_inv.push(&inv0.invs[0].hash).await); + // Simulate recieving the inventory again + assert!(!seen_inv.push(&inv0.invs[0].hash).await); - // TODO: getdata (self.send_getdata(vec![inv_item.hash]).await?) + // TODO: getdata (self.send_getdata(vec![inv_item.hash]).await?) - // Add the event to the unread msgs vec - unread_msgs.push(event0); + // Add the event to the unread msgs vec + unread_msgs.push(event0); - // TODO: Simulate network behaviour, etc. + // TODO: Simulate network behaviour, etc. + }); } } diff --git a/src/event_graph/model.rs b/src/event_graph/model.rs index 7daff165a..63a089d96 100644 --- a/src/event_graph/model.rs +++ b/src/event_graph/model.rs @@ -16,14 +16,13 @@ * along with this program. If not, see . */ -use std::{cmp::Ordering, collections::HashMap, fmt::Debug, path::Path}; +use std::{cmp::Ordering, collections::HashMap, fmt::Debug, path::Path, sync::Arc}; -use async_std::sync::{Arc, Mutex}; -use blake3; use darkfi_serial::{ deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, }; use log::{error, info}; +use smol::lock::Mutex; use tinyjson::JsonValue; use crate::{ @@ -457,188 +456,196 @@ mod tests { Event { previous_event_hash, action: PrivMsgEvent::new(), timestamp } } - #[async_std::test] - async fn test_remove_old_events() { - let events_queue = EventsQueue::new(); - let mut model = Model::new(events_queue); - let root_id = model.current_root; + #[test] + fn test_remove_old_events() { + smol::block_on(async { + let events_queue = EventsQueue::new(); + let mut model = Model::new(events_queue); + let root_id = model.current_root; - // event_node 1 - // Fill this node with 10 events - // These are considered old events from 10 days ago - let mut event_node_1_ids = vec![]; - let mut id1 = root_id; - let timestamp = Timestamp::current_time().0 - 864000; // 864000 is 10 days in seconds - for i in 0..10 { - let node = create_message(id1, Timestamp(timestamp + i)); - id1 = node.hash(); - model.add(node).await; - event_node_1_ids.push(id1); - } - sleep(1).await; + // event_node 1 + // Fill this node with 10 events + // These are considered old events from 10 days ago + let mut event_node_1_ids = vec![]; + let mut id1 = root_id; + let timestamp = Timestamp::current_time().0 - 864000; // 864000 is 10 days in seconds + for i in 0..10 { + let node = create_message(id1, Timestamp(timestamp + i)); + id1 = node.hash(); + model.add(node).await; + event_node_1_ids.push(id1); + } + sleep(1).await; - // event_node 2 - // Fill this node with 10 events - // These are considered new events at current time - let timestamp = Timestamp::current_time().0; - for i in 0..150 { - let node = create_message(id1, Timestamp(timestamp + i)); - id1 = node.hash(); - model.add(node).await; - } - sleep(1).await; + // event_node 2 + // Fill this node with 10 events + // These are considered new events at current time + let timestamp = Timestamp::current_time().0; + for i in 0..150 { + let node = create_message(id1, Timestamp(timestamp + i)); + id1 = node.hash(); + model.add(node).await; + } + sleep(1).await; - // every event older than one week gets removed - let ts = Timestamp::current_time().0 - 604800; // one week in seconds - let _ = model.remove_old_events(Timestamp(ts)); + // every event older than one week gets removed + let ts = Timestamp::current_time().0 - 604800; // one week in seconds + let _ = model.remove_old_events(Timestamp(ts)); - // ensure the 10 events from event_node 1 are not in the tree anymore - for event in event_node_1_ids { - assert!(!model.event_map.contains_key(&event)); - } + // ensure the 10 events from event_node 1 are not in the tree anymore + for event in event_node_1_ids { + assert!(!model.event_map.contains_key(&event)); + } - // event_node 2 events (150) + root event = 151 events - assert_eq!(model.event_map.len(), 151_usize); + // event_node 2 events (150) + root event = 151 events + assert_eq!(model.event_map.len(), 151_usize); + }); } - #[async_std::test] - async fn test_prune_chains() { - let events_queue = EventsQueue::new(); - let mut model = Model::new(events_queue); - let root_id = model.current_root; + #[test] + fn test_prune_chains() { + smol::block_on(async { + let events_queue = EventsQueue::new(); + let mut model = Model::new(events_queue); + let root_id = model.current_root; - // event_node 1 - // Fill this node with 10 events - let mut event_node_1_ids = vec![]; - let mut id1 = root_id; - for _ in 0..10 { - let node = create_message(id1, Timestamp::current_time()); - id1 = node.hash(); - model.add(node).await; - event_node_1_ids.push(id1); - } + // event_node 1 + // Fill this node with 10 events + let mut event_node_1_ids = vec![]; + let mut id1 = root_id; + for _ in 0..10 { + let node = create_message(id1, Timestamp::current_time()); + id1 = node.hash(); + model.add(node).await; + event_node_1_ids.push(id1); + } - sleep(1).await; + sleep(1).await; - // event_node 2 - // Start from the root_id and fill the node with (MAX_DEPTH + 10) events. - // All the events from event_node_1 should get removed from the tree - let mut id2 = root_id; - for _ in 0..(MAX_DEPTH + 10) { - let node = create_message(id2, Timestamp::current_time()); - id2 = node.hash(); - model.add(node).await; - } + // event_node 2 + // Start from the root_id and fill the node with (MAX_DEPTH + 10) events. + // All the events from event_node_1 should get removed from the tree + let mut id2 = root_id; + for _ in 0..(MAX_DEPTH + 10) { + let node = create_message(id2, Timestamp::current_time()); + id2 = node.hash(); + model.add(node).await; + } - assert_eq!(model.find_head(), id2); + assert_eq!(model.find_head(), id2); - // Ensure events from node 1 are removed in favor of node 2's longer chain - for id in event_node_1_ids { - assert!(!model.event_map.contains_key(&id)); - } + // Ensure events from node 1 are removed in favor of node 2's longer chain + for id in event_node_1_ids { + assert!(!model.event_map.contains_key(&id)); + } - // node1: (10 leaves) + node2: (MAX_DEPTH + 10) events + root event = (MAX_DEPTH + 11) - // these ^^^^^^^^^^^ are pruned - assert_eq!(model.event_map.len(), (MAX_DEPTH + 11) as usize); + // node1: (10 leaves) + node2: (MAX_DEPTH + 10) events + root event = (MAX_DEPTH + 11) + // these ^^^^^^^^^^^ are pruned + assert_eq!(model.event_map.len(), (MAX_DEPTH + 11) as usize); + }); } - #[async_std::test] - async fn test_diff_depth() { - let events_queue = EventsQueue::new(); - let mut model = Model::new(events_queue); - let root_id = model.current_root; + #[test] + fn test_diff_depth() { + smol::block_on(async { + let events_queue = EventsQueue::new(); + let mut model = Model::new(events_queue); + let root_id = model.current_root; - // event_node 1 - // Fill this node with (MAX_DEPTH / 2) events - let mut id1 = root_id; - for _ in 0..(MAX_DEPTH / 2) { - let node = create_message(id1, Timestamp::current_time()); - id1 = node.hash(); - model.add(node).await; - } + // event_node 1 + // Fill this node with (MAX_DEPTH / 2) events + let mut id1 = root_id; + for _ in 0..(MAX_DEPTH / 2) { + let node = create_message(id1, Timestamp::current_time()); + id1 = node.hash(); + model.add(node).await; + } - sleep(1).await; + sleep(1).await; - // event_node 2 - // Start from the root_id and fill the node with (MAX_DEPTH + 10) events - // all the events must be added since the depth between id1 - // and the last head is less than MAX_DEPTH - let mut id2 = root_id; - for _ in 0..(MAX_DEPTH + 10) { - let node = create_message(id2, Timestamp::current_time()); - id2 = node.hash(); - model.add(node).await; - } + // event_node 2 + // Start from the root_id and fill the node with (MAX_DEPTH + 10) events + // all the events must be added since the depth between id1 + // and the last head is less than MAX_DEPTH + let mut id2 = root_id; + for _ in 0..(MAX_DEPTH + 10) { + let node = create_message(id2, Timestamp::current_time()); + id2 = node.hash(); + model.add(node).await; + } - assert_eq!(model.find_head(), id2); + assert_eq!(model.find_head(), id2); - sleep(1).await; + sleep(1).await; - // event_node 3 - // This will start as new chain, but no events will be added - // since the last event's depth is MAX_DEPTH + 10 - let mut id3 = root_id; - for _ in 0..30 { - let node = create_message(id3, Timestamp::current_time()); - id3 = node.hash(); - model.add(node).await; + // event_node 3 + // This will start as new chain, but no events will be added + // since the last event's depth is MAX_DEPTH + 10 + let mut id3 = root_id; + for _ in 0..30 { + let node = create_message(id3, Timestamp::current_time()); + id3 = node.hash(); + model.add(node).await; - // ensure events are not added - assert!(!model.event_map.contains_key(&id3)); - } + // ensure events are not added + assert!(!model.event_map.contains_key(&id3)); + } - sleep(1).await; + sleep(1).await; - assert_eq!(model.find_head(), id2); + assert_eq!(model.find_head(), id2); - // Add more events to the event_node 1 - // At the end this chain must overtake the event_node 2 - for _ in (MAX_DEPTH / 2)..(MAX_DEPTH + 15) { - let node = create_message(id1, Timestamp::current_time()); - id1 = node.hash(); - model.add(node).await; - } + // Add more events to the event_node 1 + // At the end this chain must overtake the event_node 2 + for _ in (MAX_DEPTH / 2)..(MAX_DEPTH + 15) { + let node = create_message(id1, Timestamp::current_time()); + id1 = node.hash(); + model.add(node).await; + } - assert_eq!(model.find_head(), id1); + assert_eq!(model.find_head(), id1); + }); } - #[async_std::test] - async fn save_load_model() -> Result<()> { - // Setup directories - let path = "/tmp/test_model"; - remove_dir_all(path).ok(); - let path = PathBuf::from(path); - create_dir_all(&path)?; + #[test] + fn save_load_model() -> Result<()> { + smol::block_on(async { + // Setup directories + let path = "/tmp/test_model"; + remove_dir_all(path).ok(); + let path = PathBuf::from(path); + create_dir_all(&path)?; - // First model - let events_queue = EventsQueue::::new(); - let mut model1 = Model::new(events_queue); - let root_id = model1.current_root; + // First model + let events_queue = EventsQueue::::new(); + let mut model1 = Model::new(events_queue); + let root_id = model1.current_root; - // Create an event - let event = create_message(root_id, Timestamp::current_time()); - // Add event to first model - model1.add(event).await; + // Create an event + let event = create_message(root_id, Timestamp::current_time()); + // Add event to first model + model1.add(event).await; - // Save first model - model1.save_tree(&path)?; + // Save first model + model1.save_tree(&path)?; - // Second model - let events_queue = EventsQueue::::new(); - let mut model2 = Model::new(events_queue); + // Second model + let events_queue = EventsQueue::::new(); + let mut model2 = Model::new(events_queue); - // Load into second model - model2.load_tree(&path)?; + // Load into second model + model2.load_tree(&path)?; - // Test equality - let res = model1.event_map.len() == model2.event_map.len() && - model1.event_map.keys().all(|k| model2.event_map.contains_key(k)); + // Test equality + let res = model1.event_map.len() == model2.event_map.len() && + model1.event_map.keys().all(|k| model2.event_map.contains_key(k)); - assert!(res); + assert!(res); - remove_dir_all(path).ok(); + remove_dir_all(path).ok(); - Ok(()) + Ok(()) + }) } #[test] diff --git a/src/event_graph/protocol_event.rs b/src/event_graph/protocol_event.rs index a4cf53428..6e78aabd3 100644 --- a/src/event_graph/protocol_event.rs +++ b/src/event_graph/protocol_event.rs @@ -16,12 +16,12 @@ * along with this program. If not, see . */ -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; -use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use darkfi_serial::{Decodable, Encodable, SerialDecodable, SerialEncodable}; use log::debug; +use smol::lock::Mutex; use super::EventMsg; use crate::{ diff --git a/src/event_graph/view.rs b/src/event_graph/view.rs index f2375dab5..2775df8bf 100644 --- a/src/event_graph/view.rs +++ b/src/event_graph/view.rs @@ -16,9 +16,10 @@ * along with this program. If not, see . */ -use async_std::sync::{Arc, Mutex}; +use std::{collections::HashMap, sync::Arc}; + use darkfi_serial::{Decodable, Encodable}; -use std::collections::HashMap; +use smol::lock::Mutex; use crate::{ event_graph::{ diff --git a/src/geode/mod.rs b/src/geode/mod.rs index d981020f7..76a92d098 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -56,16 +56,16 @@ //! chunks to be specific to a single file and therefore when we do garbage //! collection, we keep chunks and files independent of each other. -use std::collections::HashSet; +use std::{collections::HashSet, path::PathBuf}; -use async_std::{ - fs::{self, File, OpenOptions}, - io::{prelude::*, BufReader, Cursor, SeekFrom}, - path::PathBuf, - stream::StreamExt, -}; use futures::AsyncRead; use log::{debug, info, warn}; +use smol::{ + fs, + fs::{File, OpenOptions}, + io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, SeekFrom}, + stream::StreamExt, +}; use crate::{Error, Result}; @@ -159,7 +159,7 @@ impl Geode { let chunk_path = entry.path(); // Skip if we're not a plain file - if !chunk_path.is_file().await { + if !chunk_path.is_file() { continue } @@ -220,7 +220,7 @@ impl Geode { let path = entry.path(); // Skip if we're not a plain file - if !path.is_file().await { + if !path.is_file() { continue } @@ -394,7 +394,7 @@ impl Geode { let mut c_path = self.chunks_path.clone(); c_path.push(chunk_hash.to_hex().as_str()); - if !c_path.exists().await || !c_path.is_file().await { + if !c_path.exists() || !c_path.is_file() { // TODO: We should be aggressive here and remove the non-file. continue } @@ -424,7 +424,7 @@ impl Geode { let mut chunk_path = self.chunks_path.clone(); chunk_path.push(chunk_hash.to_hex().as_str()); - if !chunk_path.exists().await || !chunk_path.is_file().await { + if !chunk_path.exists() || !chunk_path.is_file() { // TODO: We should be aggressive here and remove the non-file. return Err(Error::GeodeChunkNotFound) } diff --git a/src/net/dnet.rs b/src/net/dnet.rs index 1bc6bcd7c..3aa1a546b 100644 --- a/src/net/dnet.rs +++ b/src/net/dnet.rs @@ -16,7 +16,6 @@ * along with this program. If not, see . */ -use darkfi_serial::{SerialDecodable, SerialEncodable}; use url::Url; use super::channel::ChannelInfo; diff --git a/src/net/transport.rs b/src/net/transport.rs index 8d318c89b..15ccc2f59 100644 --- a/src/net/transport.rs +++ b/src/net/transport.rs @@ -319,7 +319,7 @@ impl Listener { #[cfg(feature = "p2p-transport-unix")] ListenerVariant::Unix(listener) => { let path = self.endpoint.to_file_path()?; - let l = listener.do_listen(&path.into()).await?; + let l = listener.do_listen(&path).await?; Ok(Box::new(l)) } } diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 29e63b7f1..8b5991e59 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use futures::{select, FutureExt}; use log::{debug, error}; use smol::channel::{Receiver, Sender}; @@ -43,7 +44,7 @@ pub struct RpcClient { impl RpcClient { /// Instantiate a new JSON-RPC client that will connect to the given endpoint - pub async fn new(endpoint: Url, ex: Option>>) -> Result { + pub async fn new(endpoint: Url, ex: Arc>) -> Result { let (sender, receiver, stop_signal) = Self::open_channels(endpoint.clone(), ex).await?; Ok(Self { sender, receiver, stop_signal, endpoint }) } @@ -51,7 +52,7 @@ impl RpcClient { /// Instantiate async channels for a new [`RpcClient`] async fn open_channels( endpoint: Url, - ex: Option>>, + ex: Arc>, ) -> Result<(Sender<(JsonRequest, bool)>, Receiver, Sender<()>)> { let (data_send, data_recv) = smol::channel::unbounded(); let (result_send, result_recv) = smol::channel::unbounded(); @@ -61,14 +62,9 @@ impl RpcClient { // TODO: Could add a timeout here: let stream = dialer.dial(None).await?; - // By passing in an executor we can avoid the global executor provided - // by these crates. Production usage should actually give an exexutor - // to `RpcClient::new()`. - if let Some(ex) = ex { - ex.spawn(Self::reqrep_loop(stream, result_send, data_recv, stop_recv)).detach(); - } else { - smol::spawn(Self::reqrep_loop(stream, result_send, data_recv, stop_recv)).detach(); - } + // TODO: StoppableTask, see also above there's the stop_{send,recv}. Remove them + // and replace with using StoppableTask. + ex.spawn(Self::reqrep_loop(stream, result_send, data_recv, stop_recv)).detach(); Ok((data_send, result_recv, stop_send)) } diff --git a/src/rpc/common.rs b/src/rpc/common.rs index ceb027ffe..3ee70d940 100644 --- a/src/rpc/common.rs +++ b/src/rpc/common.rs @@ -18,10 +18,10 @@ use std::time::Duration; -use async_std::io::{timeout, ReadExt, WriteExt}; +use smol::io::{AsyncReadExt, AsyncWriteExt}; use super::jsonrpc::*; -use crate::{error::RpcError, net::transport::PtStream, Result}; +use crate::{error::RpcError, net::transport::PtStream, system::io_timeout, Result}; pub(super) const INIT_BUF_SIZE: usize = 4096; // 4K pub(super) const MAX_BUF_SIZE: usize = 1024 * 8192; // 8M @@ -40,7 +40,7 @@ pub(super) async fn read_from_stream( // Lame we have to duplicate this code, but it is what it is. if with_timeout { - match timeout(READ_TIMEOUT, stream.read(&mut buf[total_read..])).await { + match io_timeout(READ_TIMEOUT, stream.read(&mut buf[total_read..])).await { Ok(0) if total_read == 0 => { return Err( RpcError::ConnectionClosed("Connection closed cleanly".to_string()).into() diff --git a/src/rpc/server.rs b/src/rpc/server.rs index eea059269..5c919426f 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -16,7 +16,8 @@ * along with this program. If not, see . */ -use async_std::sync::Arc; +use std::sync::Arc; + use async_trait::async_trait; use log::{debug, error, info}; use tinyjson::JsonValue; diff --git a/src/sdk/python/src/zkas.rs b/src/sdk/python/src/zkas.rs index 94e8b3ed0..f84ec1af2 100644 --- a/src/sdk/python/src/zkas.rs +++ b/src/sdk/python/src/zkas.rs @@ -273,8 +273,7 @@ impl Proof { .unwrap(); // Now replace the "stuff" back again - for (old_circ, (circ, stuff)) in - circuits.iter().zip(ucircuits.into_iter().zip(other_stuff.into_iter())) + for (old_circ, (circ, stuff)) in circuits.iter().zip(ucircuits.into_iter().zip(other_stuff)) { old_circ.replace(ZkCircuit(circ, stuff.0, stuff.1)); } diff --git a/src/util/cli.rs b/src/util/cli.rs index c21b2a1bb..969c47e73 100644 --- a/src/util/cli.rs +++ b/src/util/cli.rs @@ -133,8 +133,8 @@ pub fn get_log_config(verbosity_level: u8) -> simplelog::Config { /// /// Example usage: /// ``` -/// use async_std::{stream::StreamExt, sync::Arc}; -// use darkfi::{async_daemonize, cli_desc, Result}; +/// use darkfi::{async_daemonize, cli_desc, Result}; +/// use smol::stream::StreamExt; /// use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; /// /// const CONFIG_FILE: &str = "daemond_config.toml"; @@ -228,7 +228,9 @@ macro_rules! async_daemonize { } impl SignalHandler { - fn new() -> Result<(Self, async_std::task::JoinHandle>)> { + fn new( + ex: std::sync::Arc>, + ) -> Result<(Self, smol::Task>)> { let (term_tx, term_rx) = smol::channel::bounded::<()>(1); let signals = signal_hook_async_std::Signals::new([ signal_hook::consts::SIGHUP, @@ -238,17 +240,13 @@ macro_rules! async_daemonize { ])?; let handle = signals.handle(); let sighup_sub = darkfi::system::Subscriber::new(); - let signals_task = - async_std::task::spawn(handle_signals(signals, term_tx, sighup_sub.clone())); + let signals_task = ex.spawn(handle_signals(signals, term_tx, sighup_sub.clone())); Ok((Self { term_rx, handle, sighup_sub }, signals_task)) } /// Handler waits for termination signal - async fn wait_termination( - &self, - signals_task: async_std::task::JoinHandle>, - ) -> Result<()> { + async fn wait_termination(&self, signals_task: smol::Task>) -> Result<()> { self.term_rx.recv().await?; print!("\r"); self.handle.close(); diff --git a/src/util/encoding/base64.rs b/src/util/encoding/base64.rs index e2c411c43..ad8b35e0c 100644 --- a/src/util/encoding/base64.rs +++ b/src/util/encoding/base64.rs @@ -257,10 +257,10 @@ mod tests { ]; for &(input, answer) in EXAMPLES.iter() { - let res = encode(&input); + let res = encode(input); assert_eq!(answer, res); - let res = decode(&answer).unwrap(); + let res = decode(answer).unwrap(); assert_eq!(input, res); } } diff --git a/src/validator/mod.rs b/src/validator/mod.rs index e9d218cd6..bb798a900 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -16,10 +16,12 @@ * along with this program. If not, see . */ -use async_std::sync::{Arc, RwLock}; +use std::sync::Arc; + use darkfi_sdk::{blockchain::Slot, crypto::PublicKey}; use darkfi_serial::serialize; use log::{debug, error, info, warn}; +use smol::lock::RwLock; use crate::{ blockchain::{BlockInfo, Blockchain, BlockchainOverlay}, diff --git a/src/wallet/walletdb.rs b/src/wallet/walletdb.rs index 8d13b64a2..d726bddb9 100644 --- a/src/wallet/walletdb.rs +++ b/src/wallet/walletdb.rs @@ -16,11 +16,11 @@ * along with this program. If not, see . */ -use std::{any::Any, path::PathBuf}; +use std::{any::Any, path::PathBuf, sync::Arc}; -use async_std::sync::{Arc, Mutex}; use log::{debug, info}; use rusqlite::Connection; +use smol::lock::Mutex; use crate::Result; @@ -162,48 +162,51 @@ impl WalletDb { mod tests { use super::*; - #[async_std::test] - async fn test_mem_wallet() { - let wallet = WalletDb::new(None, Some("foobar")).unwrap(); - wallet.exec_sql("CREATE TABLE mista ( numba INTEGER );").await.unwrap(); - wallet.exec_sql("INSERT INTO mista ( numba ) VALUES ( 42 );").await.unwrap(); + #[test] + fn test_mem_wallet() { + smol::block_on(async { + let wallet = WalletDb::new(None, Some("foobar")).unwrap(); + wallet.exec_sql("CREATE TABLE mista ( numba INTEGER );").await.unwrap(); + wallet.exec_sql("INSERT INTO mista ( numba ) VALUES ( 42 );").await.unwrap(); - let conn = wallet.conn.lock().await; - let mut stmt = conn.prepare("SELECT numba FROM mista").unwrap(); - let numba: u64 = stmt.query_row((), |row| Ok(row.get("numba").unwrap())).unwrap(); - stmt.finalize().unwrap(); - assert!(numba == 42); + let conn = wallet.conn.lock().await; + let mut stmt = conn.prepare("SELECT numba FROM mista").unwrap(); + let numba: u64 = stmt.query_row((), |row| Ok(row.get("numba").unwrap())).unwrap(); + stmt.finalize().unwrap(); + assert!(numba == 42); + }); } - #[async_std::test] - async fn test_query_single() { - let wallet = WalletDb::new(None, None).unwrap(); - wallet - .exec_sql("CREATE TABLE mista ( why INTEGER, are TEXT, you INTEGER, gae BLOB );") - .await - .unwrap(); + #[test] + fn test_query_single() { + smol::block_on(async { + let wallet = WalletDb::new(None, None).unwrap(); + wallet + .exec_sql("CREATE TABLE mista ( why INTEGER, are TEXT, you INTEGER, gae BLOB );") + .await + .unwrap(); - let why = 42; - let are = "are".to_string(); - let you = 69; - let gae = vec![42u8; 32]; + let why = 42; + let are = "are".to_string(); + let you = 69; + let gae = vec![42u8; 32]; - let query_str = - format!("INSERT INTO mista ( why, are, you, gae ) VALUES (?1, ?2, ?3, ?4);"); + let query_str = "INSERT INTO mista ( why, are, you, gae ) VALUES (?1, ?2, ?3, ?4);"; - let wallet_conn = wallet.conn.lock().await; - let mut stmt = wallet_conn.prepare(&query_str).unwrap(); - stmt.execute(rusqlite::params![why, are, you, gae]).unwrap(); - stmt.finalize().unwrap(); - drop(wallet_conn); + let wallet_conn = wallet.conn.lock().await; + let mut stmt = wallet_conn.prepare(query_str).unwrap(); + stmt.execute(rusqlite::params![why, are, you, gae]).unwrap(); + stmt.finalize().unwrap(); + drop(wallet_conn); - let ret = - wallet.query_single("mista", vec!["why", "are", "you", "gae"], None).await.unwrap(); - assert!(ret.len() == 4); + let ret = + wallet.query_single("mista", vec!["why", "are", "you", "gae"], None).await.unwrap(); + assert!(ret.len() == 4); - assert!(ret[0].inner::().unwrap() == &why); - assert!(ret[1].inner::().unwrap() == &are); - assert!(ret[2].inner::().unwrap() == &you); - assert!(ret[3].inner::>().unwrap() == &gae); + assert!(ret[0].inner::().unwrap() == &why); + assert!(ret[1].inner::().unwrap() == &are); + assert!(ret[2].inner::().unwrap() == &you); + assert!(ret[3].inner::>().unwrap() == &gae); + }); } } diff --git a/src/zk/proof.rs b/src/zk/proof.rs index 46aa7081a..9539cbd90 100644 --- a/src/zk/proof.rs +++ b/src/zk/proof.rs @@ -17,10 +17,10 @@ */ use std::{io, io::Cursor}; +use darkfi_sdk::pasta::{pallas, vesta}; use darkfi_serial::{SerialDecodable, SerialEncodable}; use halo2_proofs::{ helpers::SerdeFormat, - pasta::{pallas, vesta}, plonk, plonk::{Circuit, SingleVerifier}, poly::commitment::Params, diff --git a/tests/blockchain.rs b/tests/blockchain.rs index 5a357433d..337206084 100644 --- a/tests/blockchain.rs +++ b/tests/blockchain.rs @@ -62,10 +62,10 @@ impl Harness { let previous_slot_info = PreviousSlot::new( producers, vec![previous_hash], - vec![previous.header.previous.clone()], + vec![previous.header.previous], previous_slot.pid.error, ); - let (f, error, sigma1, sigma2) = slot_pid_output(&previous_slot, producers); + let (f, error, sigma1, sigma2) = slot_pid_output(previous_slot, producers); let pid = PidOutput::new(f, error, sigma1, sigma2); let total_tokens = previous_slot.total_tokens + previous_slot.reward; let reward = next_block_reward(); @@ -76,13 +76,8 @@ impl Harness { timestamp.add(1); // Generate header - let header = Header::new( - previous_hash, - previous.header.epoch, - id, - timestamp, - previous.header.root.clone(), - ); + let header = + Header::new(previous_hash, previous.header.epoch, id, timestamp, previous.header.root); BlockInfo::new(header, vec![], previous.producer.clone(), vec![slot]) } @@ -97,7 +92,7 @@ impl Harness { // This is what the validator will execute when it receives a block. fn add_blocks_to_chain(&self, blockchain: &Blockchain, blocks: &[BlockInfo]) -> Result<()> { // Create overlay - let blockchain_overlay = BlockchainOverlay::new(&blockchain)?; + let blockchain_overlay = BlockchainOverlay::new(blockchain)?; let lock = blockchain_overlay.lock().unwrap(); // When we insert genesis, chain is empty @@ -133,34 +128,36 @@ impl Harness { } } -#[async_std::test] -async fn blockchain_add_blocks() -> Result<()> { - // Initialize harness - let th = Harness::new()?; +#[test] +fn blockchain_add_blocks() -> Result<()> { + smol::block_on(async { + // Initialize harness + let th = Harness::new()?; - // Check that nothing exists - th.is_empty(); + // Check that nothing exists + th.is_empty(); - // We generate some blocks - let mut blocks = vec![]; + // We generate some blocks + let mut blocks = vec![]; - let genesis_block = BlockInfo::default(); - blocks.push(genesis_block.clone()); + let genesis_block = BlockInfo::default(); + blocks.push(genesis_block.clone()); - let block = th.generate_next_block(&genesis_block); - blocks.push(block.clone()); + let block = th.generate_next_block(&genesis_block); + blocks.push(block.clone()); - let block = th.generate_next_block(&block); - blocks.push(block.clone()); + let block = th.generate_next_block(&block); + blocks.push(block.clone()); - let block = th.generate_next_block(&block); - blocks.push(block.clone()); + let block = th.generate_next_block(&block); + blocks.push(block.clone()); - th.add_blocks(&blocks)?; + th.add_blocks(&blocks)?; - // Validate chains - th.validate_chains()?; + // Validate chains + th.validate_chains()?; - // Thanks for reading - Ok(()) + // Thanks for reading + Ok(()) + }) } diff --git a/tests/jsonrpc.rs b/tests/jsonrpc.rs index 63390b041..39c401f52 100644 --- a/tests/jsonrpc.rs +++ b/tests/jsonrpc.rs @@ -16,9 +16,14 @@ * along with this program. If not, see . */ -use async_std::{net::TcpListener, sync::Arc, task}; +use std::sync::Arc; + use async_trait::async_trait; -use smol::channel::{Receiver, Sender}; +use smol::{ + channel::{Receiver, Sender}, + net::TcpListener, + Executor, +}; use tinyjson::JsonValue; use url::Url; @@ -51,49 +56,56 @@ impl RpcSrv { impl RequestHandler for RpcSrv { async fn handle_request(&self, req: JsonRequest) -> JsonResult { assert!(req.params.is_array()); - let method = String::try_from(req.method).unwrap(); - let params = req.params; - match method.as_str() { - "ping" => return self.pong(req.id, params).await, - "kill" => return self.kill(req.id, params).await, + match req.method.as_str() { + "ping" => return self.pong(req.id, req.params).await, + "kill" => return self.kill(req.id, req.params).await, _ => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } } } -#[async_std::test] -async fn jsonrpc_reqrep() -> Result<()> { - // Find an available port - let listener = TcpListener::bind("127.0.0.1:0").await?; - let sockaddr = listener.local_addr()?; - let endpoint = Url::parse(&format!("tcp://127.0.0.1:{}", sockaddr.port()))?; - drop(listener); +#[test] +fn jsonrpc_reqrep() -> Result<()> { + let executor = Arc::new(Executor::new()); + let executor_ = executor.clone(); - let rpcsrv = Arc::new(RpcSrv { stop_sub: smol::channel::unbounded() }); - let listener = Listener::new(endpoint.clone()).await?.listen().await?; + smol::block_on(executor.run(async { + // Find an available port + let listener = TcpListener::bind("127.0.0.1:0").await?; + let sockaddr = listener.local_addr()?; + let endpoint = Url::parse(&format!("tcp://127.0.0.1:{}", sockaddr.port()))?; + drop(listener); - task::spawn(async move { - while let Ok((stream, peer_addr)) = listener.next().await { - let _rh = rpcsrv.clone(); - task::spawn(async move { - let _ = accept(stream, peer_addr.clone(), _rh).await; - }); - } - }); + let rpcsrv = Arc::new(RpcSrv { stop_sub: smol::channel::unbounded() }); + let listener = Listener::new(endpoint.clone()).await?.listen().await?; - let client = RpcClient::new(endpoint, None).await?; - let req = JsonRequest::new("ping", vec![]); - let rep = client.request(req).await?; + executor + .spawn(async move { + while let Ok((stream, peer_addr)) = listener.next().await { + let _rh = rpcsrv.clone(); + executor_ + .spawn(async move { + let _ = accept(stream, peer_addr.clone(), _rh).await; + }) + .detach(); + } + }) + .detach(); - let rep = String::try_from(rep).unwrap(); - assert_eq!(&rep, "pong"); + let client = RpcClient::new(endpoint, executor.clone()).await?; + let req = JsonRequest::new("ping", vec![]); + let rep = client.request(req).await?; - let req = JsonRequest::new("kill", vec![]); - let rep = client.request(req).await?; + let rep = String::try_from(rep).unwrap(); + assert_eq!(&rep, "pong"); - let rep = String::try_from(rep).unwrap(); - assert_eq!(&rep, "bye"); + let req = JsonRequest::new("kill", vec![]); + let rep = client.request(req).await?; - Ok(()) + let rep = String::try_from(rep).unwrap(); + assert_eq!(&rep, "bye"); + + Ok(()) + })) } diff --git a/tests/network_transports.rs b/tests/network_transports.rs index e2ec2538d..76e49f284 100644 --- a/tests/network_transports.rs +++ b/tests/network_transports.rs @@ -16,79 +16,97 @@ * along with this program. If not, see . */ -use async_std::{ - io, - io::{ReadExt, WriteExt}, - task, +use smol::{ + io::{self, AsyncReadExt, AsyncWriteExt}, + LocalExecutor, }; use url::Url; use darkfi::net::transport::{Dialer, Listener}; -#[async_std::test] -async fn tcp_transport() { +#[test] +fn tcp_transport() { + let executor = LocalExecutor::new(); let url = Url::parse("tcp://127.0.0.1:5432").unwrap(); - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); - task::spawn(async move { - let (stream, _) = listener.next().await.unwrap(); - let (mut reader, mut writer) = smol::io::split(stream); - io::copy(&mut reader, &mut writer).await.unwrap(); - }); - let payload = b"ohai tcp"; + smol::block_on(executor.run(async { + let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + executor + .spawn(async move { + let (stream, _) = listener.next().await.unwrap(); + let (mut reader, mut writer) = smol::io::split(stream); + io::copy(&mut reader, &mut writer).await.unwrap(); + }) + .detach(); - let dialer = Dialer::new(url).await.unwrap(); - let mut client = dialer.dial(None).await.unwrap(); - client.write_all(payload).await.unwrap(); - let mut buf = vec![0u8; 8]; - client.read_exact(&mut buf).await.unwrap(); + let payload = b"ohai tcp"; - assert_eq!(buf, payload); + let dialer = Dialer::new(url).await.unwrap(); + let mut client = dialer.dial(None).await.unwrap(); + client.write_all(payload).await.unwrap(); + let mut buf = vec![0u8; 8]; + client.read_exact(&mut buf).await.unwrap(); + + assert_eq!(buf, payload); + })); } -#[async_std::test] -async fn tcp_tls_transport() { +#[test] +fn tcp_tls_transport() { + let executor = LocalExecutor::new(); let url = Url::parse("tcp+tls://127.0.0.1:5433").unwrap(); - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); - task::spawn(async move { - let (stream, _) = listener.next().await.unwrap(); - let (mut reader, mut writer) = smol::io::split(stream); - io::copy(&mut reader, &mut writer).await.unwrap(); - }); - let payload = b"ohai tls"; + smol::block_on(executor.run(async { + let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + executor + .spawn(async move { + let (stream, _) = listener.next().await.unwrap(); + let (mut reader, mut writer) = smol::io::split(stream); + io::copy(&mut reader, &mut writer).await.unwrap(); + }) + .detach(); - let dialer = Dialer::new(url).await.unwrap(); - let mut client = dialer.dial(None).await.unwrap(); - client.write_all(payload).await.unwrap(); - let mut buf = vec![0u8; 8]; - client.read_exact(&mut buf).await.unwrap(); + let payload = b"ohai tls"; - assert_eq!(buf, payload); + let dialer = Dialer::new(url).await.unwrap(); + let mut client = dialer.dial(None).await.unwrap(); + client.write_all(payload).await.unwrap(); + let mut buf = vec![0u8; 8]; + client.read_exact(&mut buf).await.unwrap(); + + assert_eq!(buf, payload); + })); } -#[async_std::test] -async fn unix_transport() { +#[test] +fn unix_transport() { + let executor = LocalExecutor::new(); + let tmpdir = std::env::temp_dir(); let url = Url::parse(&format!( "unix://{}/darkfi_unix_plain.sock", tmpdir.as_os_str().to_str().unwrap() )) .unwrap(); - let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); - task::spawn(async move { - let (stream, _) = listener.next().await.unwrap(); - let (mut reader, mut writer) = smol::io::split(stream); - io::copy(&mut reader, &mut writer).await.unwrap(); - }); - let payload = b"ohai unix"; + smol::block_on(executor.run(async { + let listener = Listener::new(url.clone()).await.unwrap().listen().await.unwrap(); + executor + .spawn(async move { + let (stream, _) = listener.next().await.unwrap(); + let (mut reader, mut writer) = smol::io::split(stream); + io::copy(&mut reader, &mut writer).await.unwrap(); + }) + .detach(); - let dialer = Dialer::new(url).await.unwrap(); - let mut client = dialer.dial(None).await.unwrap(); - client.write_all(payload).await.unwrap(); - let mut buf = vec![0u8; 9]; - client.read_exact(&mut buf).await.unwrap(); + let payload = b"ohai unix"; - assert_eq!(buf, payload); + let dialer = Dialer::new(url).await.unwrap(); + let mut client = dialer.dial(None).await.unwrap(); + client.write_all(payload).await.unwrap(); + let mut buf = vec![0u8; 9]; + client.read_exact(&mut buf).await.unwrap(); + + assert_eq!(buf, payload); + })); }