rpcclient: don't use Executor for spawn a task

This commit is contained in:
ghassmo
2022-05-13 11:33:02 +03:00
parent 94f37b764b
commit 5cec2b98ff
5 changed files with 16 additions and 48 deletions

View File

@@ -1,9 +1,5 @@
use async_std::sync::Arc;
use async_executor::Executor;
use clap::{IntoApp, Parser, Subcommand};
use serde_json::{json, Value};
use smol::future;
use url::Url;
use darkfi::{
@@ -41,9 +37,9 @@ impl Rpc {
}
}
async fn start(options: CliDao, executor: Arc<Executor<'_>>) -> Result<()> {
async fn start(options: CliDao) -> Result<()> {
let rpc_addr = "tcp://127.0.0.1:7777";
let client = Rpc { client: RpcClient::new(Url::parse(rpc_addr)?, executor).await? };
let client = Rpc { client: RpcClient::new(Url::parse(rpc_addr)?).await? };
match options.command {
Some(CliDaoSubCommands::Hello {}) => {
let reply = client.say_hello().await?;
@@ -75,11 +71,5 @@ async fn main() -> Result<()> {
//let config = Config::<DrkConfig>::load(config_path)?;
let executor = Arc::new(Executor::new());
let task = executor.spawn(start(args, executor.clone()));
// Run the executor until the task completes.
future::block_on(executor.run(task))?;
Ok(())
start(args).await
}

View File

@@ -40,8 +40,8 @@ struct DnetView {
}
impl DnetView {
async fn new(url: Url, name: String, executor: Arc<Executor<'_>>) -> Result<Self> {
let rpc_client = RpcClient::new(url, executor).await?;
async fn new(url: Url, name: String) -> Result<Self> {
let rpc_client = RpcClient::new(url).await?;
Ok(Self { name, rpc_client })
}
@@ -118,8 +118,7 @@ async fn poll_and_update_model(
model: Arc<Model>,
) -> DnetViewResult<()> {
for node in &config.nodes {
let client =
DnetView::new(Url::parse(&node.rpc_url)?, node.name.clone(), ex.clone()).await?;
let client = DnetView::new(Url::parse(&node.rpc_url)?, node.name.clone()).await?;
ex.spawn(poll(client, model.clone())).detach();
}
Ok(())

View File

@@ -1,10 +1,6 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::error;
use serde_json::json;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use url::Url;
@@ -30,9 +26,8 @@ use primitives::{TaskEvent, TaskInfo};
use util::{desc_in_editor, CONFIG_FILE, CONFIG_FILE_CONTENTS};
use view::{comments_as_string, print_list_of_task, print_task_info};
async fn start(mut options: cli::CliTau, executor: Arc<Executor<'_>>) -> Result<()> {
let rpc_client =
Rpc { client: RpcClient::new(Url::parse(&options.rpc_listen)?, executor).await? };
async fn start(mut options: cli::CliTau) -> Result<()> {
let rpc_client = Rpc { client: RpcClient::new(Url::parse(&options.rpc_listen)?).await? };
let states: Vec<String> = vec!["stop".into(), "open".into(), "pause".into()];
@@ -117,7 +112,8 @@ async fn start(mut options: cli::CliTau, executor: Arc<Executor<'_>>) -> Result<
Ok(())
}
fn main() -> Result<()> {
#[async_std::main]
async fn main() -> Result<()> {
let args = cli::CliTau::from_args_with_toml("").unwrap();
let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
spawn_config(&cfg_path, CONFIG_FILE_CONTENTS.as_bytes())?;
@@ -126,12 +122,5 @@ fn main() -> Result<()> {
let (lvl, conf) = log_config(args.verbose.into())?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;
let executor = Arc::new(Executor::new());
let task = executor.spawn(start(args, executor.clone()));
// Run the executor until the task completes.
future::block_on(executor.run(task))?;
Ok(())
start(args).await
}

View File

@@ -1,7 +1,5 @@
use async_std::sync::Arc;
use std::{env, str, time::Duration};
use async_executor::Executor;
use async_std::io::timeout;
use futures::{select, AsyncReadExt, AsyncWriteExt, FutureExt};
use log::error;
@@ -198,7 +196,6 @@ async fn reqrep_loop<T: TransportStream>(
pub async fn open_channels(
uri: &Url,
executor: Arc<Executor<'_>>,
) -> Result<(
async_channel::Sender<Value>,
async_channel::Receiver<JsonResult>,
@@ -228,14 +225,12 @@ pub async fn open_channels(
match $upgrade {
None => {
executor
.spawn(reqrep_loop(stream, result_sender, data_receiver, stop_receiver))
smol::spawn(reqrep_loop(stream, result_sender, data_receiver, stop_receiver))
.detach();
}
Some(u) if u == "tls" => {
let stream = $transport.upgrade_dialer(stream)?.await?;
executor
.spawn(reqrep_loop(stream, result_sender, data_receiver, stop_receiver))
smol::spawn(reqrep_loop(stream, result_sender, data_receiver, stop_receiver))
.detach();
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
@@ -272,9 +267,7 @@ pub async fn open_channels(
return Err(Error::ConnectFailed)
}
executor
.spawn(reqrep_loop(stream?, result_sender, data_receiver, stop_receiver))
.detach();
smol::spawn(reqrep_loop(stream?, result_sender, data_receiver, stop_receiver)).detach();
}
_ => unimplemented!(),
}

View File

@@ -1,6 +1,3 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::{debug, error};
use serde_json::{json, Value};
use url::Url;
@@ -16,8 +13,8 @@ pub struct RpcClient {
}
impl RpcClient {
pub async fn new(url: Url, executor: Arc<Executor<'_>>) -> Result<Self> {
let (sender, receiver, stop_signal) = jsonrpc::open_channels(&url, executor).await?;
pub async fn new(url: Url) -> Result<Self> {
let (sender, receiver, stop_signal) = jsonrpc::open_channels(&url).await?;
Ok(Self { sender, receiver, stop_signal })
}