diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 2a9b65e0c..1fb5faece 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -12,7 +12,7 @@ use drk::crypto::{ use drk::rpc::adapter::RpcAdapter; use drk::rpc::jsonserver; use drk::serial::{deserialize, Decodable, Encodable}; -use drk::service::{GatewayClient, GatewaySlabsSubscriber}; +use drk::service::{CashierClient, GatewayClient, GatewaySlabsSubscriber}; use drk::state::{state_transition, ProgramState, StateUpdate}; use drk::util::join_config_path; use drk::wallet::{WalletDb, WalletPtr}; @@ -199,16 +199,27 @@ async fn start(executor: Arc>, config: Arc<&DarkfidConfig>) -> Resu debug!(target: "Client", "Creating client"); let mut client = GatewayClient::new(connect_addr, slabstore)?; + // create gateway client + debug!(target: "Cashier Client", "Creating cashier client"); + let mut cashier_client = CashierClient::new("127.0.0.1:7777".parse()?)?; + debug!(target: "Gateway", "Start subscriber"); // start subscribing let gateway_slabs_sub: GatewaySlabsSubscriber = client.start_subscriber(sub_addr, executor.clone()).await?; + // Channels to handle transfer request from adapter let (publish_tx_send, publish_tx_recv) = async_channel::unbounded::(); + // Channels to handle deposit request from adapter and send cashier public key + let (deposit_send, deposit_recv) = async_channel::unbounded::(); + let (cashier_deposit_addr_send, cashier_deposit_addr_recv) = + async_channel::unbounded::>(); + // start gateway client debug!(target: "fn::start client", "start() Client started"); client.start().await?; + cashier_client.start().await?; executor .spawn(async move { @@ -221,6 +232,10 @@ async fn start(executor: Arc>, config: Arc<&DarkfidConfig>) -> Resu let update = state_transition(&state, tx).unwrap(); state.apply(update).await.unwrap(); } + deposit_addr = deposit_recv.recv().fuse() => { + let cashier_public = cashier_client.get_address(deposit_addr.unwrap()).await.unwrap(); + cashier_deposit_addr_send.send(cashier_public).await.unwrap(); + } transfer_params = publish_tx_recv.recv().fuse() => { let transfer_params = transfer_params.unwrap(); @@ -270,7 +285,11 @@ async fn start(executor: Arc>, config: Arc<&DarkfidConfig>) -> Resu }) .detach(); - let adapter = RpcAdapter::new(wallet.clone(), config.connect_url.clone(), publish_tx_send)?; + let adapter = RpcAdapter::new( + wallet.clone(), + publish_tx_send, + (deposit_send, cashier_deposit_addr_recv), + )?; // start the rpc server jsonserver::start(ex.clone(), config.clone(), adapter).await?; diff --git a/src/rpc/adapter.rs b/src/rpc/adapter.rs index 4f78be125..34ca38cd4 100644 --- a/src/rpc/adapter.rs +++ b/src/rpc/adapter.rs @@ -1,39 +1,36 @@ use crate::cli::TransferParams; -use crate::cli::WithdrawParams; use crate::serial::serialize; use crate::service::btc::PubAddress; -use crate::service::cashier::CashierClient; use crate::wallet::WalletDb; use crate::{Error, Result}; use log::*; use async_std::sync::Arc; -use std::net::SocketAddr; pub type AdapterPtr = Arc; +pub type DepositChannel = ( + async_channel::Sender, + async_channel::Receiver>, +); pub struct RpcAdapter { pub wallet: Arc, - pub cashier_client: CashierClient, - pub connect_url: String, publish_tx_send: async_channel::Sender, + deposit_channel: DepositChannel, } impl RpcAdapter { pub fn new( wallet: Arc, - connect_url: String, publish_tx_send: async_channel::Sender, + deposit_channel: DepositChannel ) -> Result { debug!(target: "ADAPTER", "new() [CREATING NEW WALLET]"); - let connect_addr: SocketAddr = connect_url.parse()?; - let cashier_client = CashierClient::new(connect_addr)?; Ok(Self { wallet, - cashier_client, - connect_url, publish_tx_send, + deposit_channel, }) } @@ -90,7 +87,8 @@ impl RpcAdapter { let (public, private) = self.wallet.key_gen(); self.wallet.put_keypair(public, private)?; let dkey = self.wallet.get_public()?; - match self.cashier_client.get_address(dkey).await? { + self.deposit_channel.0.send(dkey).await?; + match self.deposit_channel.1.recv().await? { Some(key) => Ok(key), None => Err(Error::CashierNoReply), } diff --git a/src/rpc/jsonserver.rs b/src/rpc/jsonserver.rs index 627ade461..b8b35756c 100644 --- a/src/rpc/jsonserver.rs +++ b/src/rpc/jsonserver.rs @@ -243,8 +243,11 @@ impl RpcInterface { let self2 = self1.clone(); async move { println!("Deposit initiated"); - let btckey = self2.adapter.deposit().await?; - Ok(jsonrpc_core::Value::String(format!("Send testnet BTC to: {}", btckey))) + let btckey = self2.adapter.deposit().await?; + Ok(jsonrpc_core::Value::String(format!( + "Send testnet BTC to: {}", + btckey + ))) } }); diff --git a/src/service/cashier.rs b/src/service/cashier.rs index cf4136bd1..07aff4cb4 100644 --- a/src/service/cashier.rs +++ b/src/service/cashier.rs @@ -15,7 +15,6 @@ use async_executor::Executor; use async_std::sync::Arc; use log::*; use std::net::SocketAddr; -use async_std::sync::Mutex; #[repr(u8)] enum CashierError { @@ -172,29 +171,27 @@ impl CashierService { } pub struct CashierClient { - protocol: Mutex, + protocol: ReqProtocol, } impl CashierClient { pub fn new(addr: SocketAddr) -> Result { - let protocol = Mutex::new(ReqProtocol::new(addr, String::from("CASHIER CLIENT"))); + let protocol = ReqProtocol::new(addr, String::from("CASHIER CLIENT")); Ok(CashierClient { protocol }) } - pub async fn start(&self) -> Result<()> { + pub async fn start(&mut self) -> Result<()> { debug!(target: "Cashier", "Start CashierClient"); - self.protocol.lock().await.start().await?; + self.protocol.start().await?; Ok(()) } - pub async fn get_address(&self, index: jubjub::SubgroupPoint) -> Result> { + pub async fn get_address(&mut self, index: jubjub::SubgroupPoint) -> Result> { let handle_error = Arc::new(handle_error); let rep = self .protocol - .lock() - .await .request( CashierCommand::GetDBTC as u8, serialize(&index),