diff --git a/src/bin/cashierd.rs b/src/bin/cashierd.rs index ca6656d90..52c3c14ee 100644 --- a/src/bin/cashierd.rs +++ b/src/bin/cashierd.rs @@ -5,6 +5,7 @@ use drk::{ rpc::{ jsonrpc::{error as jsonerr, response as jsonresp}, jsonrpc::{ErrorCode::*, JsonRequest, JsonResult}, + rpcserver::{listen_and_serve, RequestHandler, RpcServerConfig}, }, serial::{deserialize, serialize}, service::{bridge, bridge::Bridge}, @@ -15,14 +16,11 @@ use drk::{ use clap::clap_app; use log::*; -use serde::Serialize; use serde_json::{json, Value}; use simplelog::{ CombinedLogger, Config as SimLogConfig, ConfigBuilder, LevelFilter, TermLogger, TerminalMode, WriteLogger, }; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpListener; use async_executor::Executor; use easy_parallel::Parallel; @@ -30,7 +28,7 @@ use ff::Field; use rand::rngs::OsRng; use async_std::sync::{Arc, Mutex}; -use sha2::{Digest, Sha256}; +use async_trait::async_trait; use std::collections::HashMap; use std::path::PathBuf; @@ -42,10 +40,33 @@ struct Cashierd { cashier_wallet: Arc, features: HashMap, client: Arc>, + executor: Arc>, +} + +#[async_trait] +impl RequestHandler for Cashierd { + // TODO: ServerError codes should be part of the lib. + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + if req.params.as_array().is_none() { + return JsonResult::Err(jsonerr(InvalidParams, None, req.id)); + } + + debug!(target: "RPC", "--> {:#?}", serde_json::to_string(&req).unwrap()); + + match req.method.as_str() { + Some("deposit") => return self.deposit(req.id, req.params).await, + Some("withdraw") => return self.withdraw(req.id, req.params).await, + Some("features") => return self.features(req.id, req.params).await, + Some(_) => {} + None => {} + }; + + return JsonResult::Err(jsonerr(MethodNotFound, None, req.id)); + } } impl Cashierd { - fn new(verbose: bool, config_path: PathBuf) -> Result { + fn new(verbose: bool, executor: Arc>, config_path: PathBuf) -> Result { let mint_params_path = join_config_path(&PathBuf::from("cashier_mint.params"))?; let spend_params_path = join_config_path(&PathBuf::from("cashier_spend.params"))?; @@ -89,10 +110,11 @@ impl Cashierd { cashier_wallet, features, client: client.clone(), + executor: executor.clone(), }) } - async fn start(&self, executor: Arc>) -> Result<()> { + async fn start(&self) -> Result<()> { self.cashier_wallet.init_db()?; let bridge = Bridge::new(); @@ -121,26 +143,27 @@ impl Cashierd { let _btc_client = BtcClient::new(btc_endpoint); //bridge.add_clients(sol_client).await?; } - _ => return Err(Error::NotSupportedNetwork), + _ => { + warn!("No feature enabled for {} network", feature_name); + } } } self.client.lock().await.start().await?; let (notify, recv_coin) = async_channel::unbounded::<(jubjub::SubgroupPoint, u64)>(); - let cashier_client_subscriber_task = - executor.spawn(Client::connect_to_subscriber_from_cashier( - self.client.clone(), - executor.clone(), - self.cashier_wallet.clone(), - notify.clone(), - )); + self.executor + .spawn(Client::connect_to_subscriber_from_cashier( + self.client.clone(), + self.executor.clone(), + self.cashier_wallet.clone(), + notify.clone(), + )); let cashier_wallet = self.cashier_wallet.clone(); - - let ex = executor.clone(); - let listen_for_receiving_coins_task = executor.spawn(async move { + let ex = self.executor.clone(); + let listen_for_receiving_coins_task = self.executor.spawn(async move { loop { Self::listen_for_receiving_coins( ex.clone(), @@ -153,8 +176,14 @@ impl Cashierd { } }); - let rpc_url = self.config.rpc_url.clone(); - run_rpc_server(executor.clone(), self.clone(), rpc_url).await?; + let cfg = RpcServerConfig { + socket_addr: self.config.clone().rpc_url, + use_tls: self.config.use_tls, + identity_path: self.config.clone().tls_identity_path, + identity_pass: self.config.clone().tls_identity_password, + }; + + listen_and_serve(cfg, self.clone()).await?; listen_for_receiving_coins_task.cancel().await; cashier_client_subscriber_task.cancel().await; @@ -206,25 +235,7 @@ impl Cashierd { Ok(()) } - async fn handle_request(self, executor: Arc>, req: JsonRequest) -> JsonResult { - if req.params.as_array().is_none() { - return JsonResult::Err(jsonerr(InvalidParams, None, req.id)); - } - - debug!(target: "RPC", "--> {:#?}", serde_json::to_string(&req).unwrap()); - - match req.method.as_str() { - Some("deposit") => return self.deposit(executor.clone(), req.id, req.params).await, - Some("withdraw") => return self.withdraw(req.id, req.params).await, - Some("features") => return self.features(req.id, req.params).await, - Some(_) => {} - None => {} - }; - - return JsonResult::Err(jsonerr(MethodNotFound, None, req.id)); - } - - async fn deposit(self, executor: Arc>, id: Value, params: Value) -> JsonResult { + async fn deposit(&self, id: Value, params: Value) -> JsonResult { debug!(target: "CASHIER DAEMON", "RECEIVED DEPOSIT REQUEST"); let args: &Vec; @@ -248,7 +259,7 @@ impl Cashierd { } let result: Result = async { - let asset_id = Self::parse_id(token_id)?; + let asset_id = drk::util::parse_id(token_id)?; let drk_pub_key = bs58::decode(&drk_pub_key.to_string()).into_vec()?; let drk_pub_key: jubjub::SubgroupPoint = deserialize(&drk_pub_key)?; @@ -258,7 +269,8 @@ impl Cashierd { .cashier_wallet .get_deposit_token_keys_by_dkey_public(&drk_pub_key, &asset_id)?; - let bridge_subscribtion = self.bridge.subscribe(executor.clone()).await; + let bridge = self.bridge.clone(); + let bridge_subscribtion = bridge.subscribe(self.executor.clone()).await; bridge_subscribtion .sender @@ -295,44 +307,9 @@ impl Cashierd { } } - // here we hash the alphanumeric token ID. if it fails, we change the last 4 bytes and hash it - // again, and keep repeating until it works. - fn parse_id(token: &Value) -> Result { - let tkn_str = token.as_str().unwrap(); - if bs58::decode(tkn_str).into_vec().is_err() { - // TODO: make this an error - debug!(target: "CASHIER", "COULD NOT DECODE STR"); - } - let mut data = bs58::decode(tkn_str).into_vec().unwrap(); - let token_id = deserialize::(&data); - if token_id.is_err() { - let mut counter = 0; - loop { - data.truncate(28); - let serialized_counter = serialize(&counter); - data.extend(serialized_counter.iter()); - let mut hasher = Sha256::new(); - hasher.update(&data); - let hash = hasher.finalize(); - let token_id = deserialize::(&hash); - if token_id.is_err() { - counter += 1; - continue; - } - debug!(target: "CASHIER", "DESERIALIZATION SUCCESSFUL"); - let tkn = token_id.unwrap(); - return Ok(tkn); - } - } - unreachable!(); - } - - async fn withdraw(self, id: Value, params: Value) -> JsonResult { + async fn withdraw(&self, id: Value, params: Value) -> JsonResult { debug!(target: "CASHIER DAEMON", "RECEIVED DEPOSIT REQUEST"); - // TODO Cashier checks if they support the network, and if so, - // return adeposit address. - let args: &Vec; if let Some(ar) = params.as_array() { @@ -355,7 +332,7 @@ impl Cashierd { } let result: Result = async { - let asset_id = Self::parse_id(&token)?; + let asset_id = drk::util::parse_id(&token)?; let address = serialize(&address.to_string()); let cashier_public: jubjub::SubgroupPoint; @@ -389,66 +366,11 @@ impl Cashierd { } } - async fn features(self, id: Value, _params: Value) -> JsonResult { + async fn features(&self, id: Value, _params: Value) -> JsonResult { JsonResult::Resp(jsonresp(json!(self.features), id)) } } -async fn run_rpc_server( - executor: Arc>, - cashierd: Cashierd, - rpc_url: String, -) -> Result<()> { - let listener = TcpListener::bind(rpc_url.clone()).await?; - debug!(target: "RPC SERVER", "Listening on {}", rpc_url); - loop { - debug!(target: "RPC SERVER", "waiting for client"); - - let (mut socket, _) = listener.accept().await?; - - debug!(target: "RPC SERVER", "accepted client"); - - let cashierd = cashierd.clone(); - let ex = executor.clone(); - executor.spawn(async move { - let mut buf = [0; 2048]; - - loop { - let n = match socket.read(&mut buf).await { - Ok(n) if n == 0 => { - debug!(target: "RPC SERVER", "closed connection"); - return; - } - Ok(n) => n, - Err(e) => { - debug!(target: "RPC SERVER", "failed to read from socket; err = {:?}", e); - return; - } - }; - - let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) { - Ok(r) => r, - Err(e) => { - debug!(target: "RPC SERVER", "received invalid json; err = {:?}", e); - return; - } - }; - - let reply = cashierd.clone().handle_request(ex.clone(), r).await; - let j = serde_json::to_string(&reply).unwrap(); - - debug!(target: "RPC", "<-- {:#?}", j); - - // Write the data back - if let Err(e) = socket.write_all(j.as_bytes()).await { - debug!(target: "RPC SERVER", "failed to write to socket; err = {:?}", e); - return; - } - } - }).await; - } -} - fn main() -> Result<()> { let args = clap_app!(cashierd => (@arg CONFIG: -c --config +takes_value "Sets a custom config file") @@ -464,7 +386,9 @@ fn main() -> Result<()> { config_path = join_config_path(&PathBuf::from("cashierd.toml"))?; } - let cashierd = Cashierd::new(args.clone().is_present("verbose"), config_path)?; + let ex = Arc::new(Executor::new()); + + let cashierd = Cashierd::new(args.clone().is_present("verbose"), ex.clone(), config_path)?; let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); let debug_level = if args.is_present("verbose") { @@ -484,8 +408,6 @@ fn main() -> Result<()> { ]) .unwrap(); - let ex = Arc::new(Executor::new()); - let ex2 = ex.clone(); let (signal, shutdown) = async_channel::unbounded::<()>(); let cashierd2 = cashierd.clone(); @@ -495,7 +417,7 @@ fn main() -> Result<()> { // Run the main future on the current thread. .finish(|| { smol::future::block_on(async move { - cashierd2.start(ex2).await?; + cashierd2.start().await?; drop(signal); Ok::<(), Error>(()) }) @@ -503,60 +425,3 @@ fn main() -> Result<()> { Ok(()) } - -#[cfg(test)] -mod tests { - use drk::serial::{deserialize, serialize}; - use sha2::{Digest, Sha256}; - - #[test] - fn test_jubjub_parsing() { - // 1. counter = 0 - // 2. serialized_counter = serialize(counter) - // 3. asset_id_data = hash(data + serialized_counter) - // 4. asset_id = deserialize(asset_id_data) - // 5. test parse - // 6. loop - let tkn_str = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"; - println!("{}", tkn_str); - if bs58::decode(tkn_str).into_vec().is_err() { - println!("Could not decode str into vec"); - } - let mut data = bs58::decode(tkn_str).into_vec().unwrap(); - println!("{:?}", data); - let mut hasher = Sha256::new(); - hasher.update(&data); - let hash = hasher.finalize(); - let token_id = deserialize::(&hash); - println!("{:?}", token_id); - let mut counter = 0; - if token_id.is_err() { - println!("could not deserialize tkn 58"); - loop { - println!("TOKEN IS NONE. COMMENCING LOOP"); - counter += 1; - println!("LOOP NUMBER {}", counter); - println!("{:?}", data.len()); - data.truncate(28); - let serialized_counter = serialize(&counter); - println!("{:?}", serialized_counter); - data.extend(serialized_counter.iter()); - println!("{:?}", data.len()); - let mut hasher = Sha256::new(); - hasher.update(&data); - let hash = hasher.finalize(); - let token_id = deserialize::(&hash); - println!("{:?}", token_id); - if token_id.is_err() { - continue; - } - if counter > 10 { - break; - } - println!("deserialization successful"); - token_id.unwrap(); - break; - } - }; - } -} diff --git a/src/cli/cli_config.rs b/src/cli/cli_config.rs index 066182dc9..7d525700a 100644 --- a/src/cli/cli_config.rs +++ b/src/cli/cli_config.rs @@ -104,6 +104,16 @@ pub struct CashierdConfig { #[serde(rename = "password")] pub password: String, + + #[serde(rename = "use_tls")] + pub use_tls: bool, + + #[serde(rename = "tls_identity_path")] + pub tls_identity_path: String, + + #[serde(rename = "tls_identity_password")] + pub tls_identity_password: String, + #[serde(rename = "client_password")] pub client_password: String,