diff --git a/src/bin/cashierd.rs b/src/bin/cashierd.rs index 9d5153f3c..a6f5a7ce6 100644 --- a/src/bin/cashierd.rs +++ b/src/bin/cashierd.rs @@ -1,4 +1,3 @@ -use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; use clap::clap_app; @@ -408,7 +407,6 @@ impl Cashierd { async fn start( &mut self, mut client: Client, - executor: Arc>, ) -> Result<( smol::Task>, smol::Task>, @@ -479,7 +477,7 @@ impl Cashierd { } } - let resume_watch_deposit_keys_task = executor.spawn(Self::resume_watch_deposit_keys( + let resume_watch_deposit_keys_task = smol::spawn(Self::resume_watch_deposit_keys( self.bridge.clone(), self.cashier_wallet.clone(), self.features.clone(), @@ -493,7 +491,6 @@ impl Cashierd { .connect_to_subscriber_from_cashier( self.cashier_wallet.clone(), notify.clone(), - executor.clone(), ) .await?; @@ -561,7 +558,6 @@ async fn main() -> Result<()> { }; simple_logger::init_with_level(loglevel)?; - let ex = Arc::new(Executor::new()); let mut cashierd = Cashierd::new(config_path).await?; let client_wallet = WalletDb::new( @@ -592,7 +588,7 @@ async fn main() -> Result<()> { identity_pass: cashierd.config.tls_identity_password.clone(), }; - let (t1, t2, t3) = cashierd.start(client, ex.clone()).await?; + let (t1, t2, t3) = cashierd.start(client).await?; listen_and_serve(cfg, Arc::new(cashierd)).await?; t1.cancel().await; diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 4d33ae8a2..54cdb3cc7 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -1,12 +1,11 @@ -use async_executor::Executor; use async_trait::async_trait; use clap::clap_app; use log::debug; use serde_json::{json, Value}; -use async_std::sync::Arc; +use async_std::sync::{Arc, Mutex}; use std::path::PathBuf; -//use std::sync::Arc; +use std::str::FromStr; use drk::{ blockchain::Rocks, @@ -17,15 +16,18 @@ use drk::{ jsonrpc::{ErrorCode::*, JsonRequest, JsonResult}, rpcserver::{listen_and_serve, RequestHandler, RpcServerConfig}, }, - serial::serialize, - util::{assign_id, decimals, decode_base10, expand_path, join_config_path, TokenList}, + serial::{deserialize, serialize}, + util::{ + assign_id, decimals, decode_base10, expand_path, generate_id, join_config_path, + NetworkName, TokenList, + }, wallet::WalletDb, Result, }; struct Darkfid { config: DarkfidConfig, - client: Client, + client: Arc>, tokenlist: TokenList, } @@ -78,6 +80,8 @@ impl Darkfid { ) .await?; + let client = Arc::new(Mutex::new(client)); + let tokenlist = TokenList::new()?; Ok(Self { @@ -87,9 +91,13 @@ impl Darkfid { }) } - async fn start(&mut self, executor: Arc>) -> Result<()> { - self.client.start().await?; - self.client.connect_to_subscriber(executor).await?; + async fn start(&mut self) -> Result<()> { + self.client.lock().await.start().await?; + self.client + .lock() + .await + .connect_to_subscriber() + .await?; Ok(()) } @@ -103,7 +111,7 @@ impl Darkfid { // --> {"method": "create_wallet", "params": []} // <-- {"result": true} async fn create_wallet(&self, id: Value, _params: Value) -> JsonResult { - match self.client.init_db().await { + match self.client.lock().await.init_db().await { Ok(()) => return JsonResult::Resp(jsonresp(json!(true), id)), Err(e) => { return JsonResult::Err(jsonerr(ServerError(-32001), Some(e.to_string()), id)) @@ -114,7 +122,7 @@ impl Darkfid { // --> {"method": "key_gen", "params": []} // <-- {"result": true} async fn key_gen(&self, id: Value, _params: Value) -> JsonResult { - match self.client.key_gen().await { + match self.client.lock().await.key_gen().await { Ok(()) => return JsonResult::Resp(jsonresp(json!(true), id)), Err(e) => { return JsonResult::Err(jsonerr(ServerError(-32002), Some(e.to_string()), id)) @@ -125,7 +133,7 @@ impl Darkfid { // --> {"method": "get_key", "params": []} // <-- {"result": "vdNS7oBj7KvsMWWmo9r96SV4SqATLrGsH2a3PGpCfJC"} async fn get_key(&self, id: Value, _params: Value) -> JsonResult { - let pk = self.client.main_keypair.public; + let pk = self.client.lock().await.main_keypair.public; let b58 = bs58::encode(serialize(&pk)).into_string(); return JsonResult::Resp(jsonresp(json!(b58), id)); } @@ -219,7 +227,7 @@ impl Darkfid { // TODO: Optional sanity checking here, but cashier *must* do so too. - let pk = self.client.main_keypair.public; + let pk = self.client.lock().await.main_keypair.public; let pubkey = bs58::encode(serialize(&pk)).into_string(); // Send request to cashier. If the cashier supports the requested network @@ -311,7 +319,7 @@ impl Darkfid { json!("withdraw"), json!([network, token_id, address, amount_in_apo]), ); - let rep: JsonResult; + let mut rep: JsonResult; match send_request(&self.config.cashier_rpc_url, json!(req)).await { Ok(v) => rep = v, Err(e) => { @@ -320,6 +328,45 @@ impl Darkfid { } } + // send drk to cashier_public + if let JsonResult::Resp(cashier_public) = &rep { + let result: Result<()> = async { + let cashier_public = cashier_public.result.as_str().unwrap(); + + let cashier_public: jubjub::SubgroupPoint = + deserialize(&bs58::decode(cashier_public).into_vec()?)?; + + self.client + .lock() + .await + .send( + cashier_public, + amount_in_apo, + generate_id(&token_id, &NetworkName::from_str(network)?)?, + true, + ) + .await?; + + Ok(()) + } + .await; + + match result { + Err(e) => { + rep = JsonResult::Err(jsonerr(InternalError, Some(e.to_string()), id.clone())) + } + Ok(_) => { + rep = JsonResult::Resp(jsonresp( + json!(format!( + "Sent request to withdraw {} amount of {}", + amount, token_id + )), + json!(id.clone()), + )) + } + } + }; + match rep { JsonResult::Resp(r) => return JsonResult::Resp(r), JsonResult::Err(e) => return JsonResult::Err(e), @@ -414,8 +461,6 @@ async fn main() -> Result<()> { simple_logger::init_with_level(loglevel)?; - let ex = Arc::new(Executor::new()); - let mut darkfid = Darkfid::new(config_path).await?; let server_config = RpcServerConfig { @@ -425,6 +470,6 @@ async fn main() -> Result<()> { identity_pass: darkfid.config.tls_identity_password.clone(), }; - darkfid.start(ex.clone()).await?; + darkfid.start().await?; listen_and_serve(server_config, Arc::new(darkfid)).await } diff --git a/src/bin/drk.rs b/src/bin/drk.rs index 8e009a2fd..aaaa7a43e 100644 --- a/src/bin/drk.rs +++ b/src/bin/drk.rs @@ -188,7 +188,7 @@ async fn start(config: &DrkConfig, options: ArgMatches<'_>) -> Result<()> { let reply = client.withdraw(&network, &token, &address, amount).await?; - println!("Transaction ID: {}", &reply.to_string()); + println!("{}", &reply.to_string()); return Ok(()); } diff --git a/src/client.rs b/src/client.rs index db0ea9c05..126382e16 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,3 @@ -use async_executor::Executor; use async_std::sync::{Arc, Mutex}; use bellman::groth16; use bls12_381::Bls12; @@ -238,12 +237,11 @@ impl Client { &self, cashier_wallet: CashierDbPtr, notify: async_channel::Sender<(jubjub::SubgroupPoint, u64)>, - executor: Arc>, ) -> Result<()> { // start subscribing - debug!(target: "CLIENT", "Start subscriber"); + debug!(target: "CLIENT", "Start subscriber for cashier"); let gateway_slabs_sub: GatewaySlabsSubscriber = - self.gateway.start_subscriber(executor.clone()).await?; + self.gateway.start_subscriber().await?; let secret_key = self.main_keypair.private; let state = self.state.clone(); @@ -269,14 +267,16 @@ impl Client { task.detach(); + debug!(target: "CLIENT", "End subscriber for cashier"); + Ok(()) } - pub async fn connect_to_subscriber(&self, executor: Arc>) -> Result<()> { + pub async fn connect_to_subscriber(&self) -> Result<()> { // start subscribing debug!(target: "CLIENT", "Start subscriber"); let gateway_slabs_sub: GatewaySlabsSubscriber = - self.gateway.start_subscriber(executor.clone()).await?; + self.gateway.start_subscriber().await?; let (notify, _) = async_channel::unbounded::<(jubjub::SubgroupPoint, u64)>(); diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 13e904b0e..ad21d284d 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -282,19 +282,15 @@ impl GatewayClient { self.slabstore.clone() } - pub async fn start_subscriber( - &self, - executor: Arc>, - ) -> Result { + pub async fn start_subscriber(&self) -> Result { let mut subscriber = Subscriber::new(self.sub_addr, String::from("GATEWAY CLIENT")); subscriber.start().await?; - executor - .spawn(Self::subscribe_loop( - subscriber, - self.slabstore.clone(), - self.gateway_slabs_sub_s.clone(), - )) - .detach(); + smol::spawn(Self::subscribe_loop( + subscriber, + self.slabstore.clone(), + self.gateway_slabs_sub_s.clone(), + )) + .detach(); Ok(self.gateway_slabs_sub_rv.clone()) }