cashierd: switch to new rpcserver (tokio -> async_std)

This commit is contained in:
ghassmo
2021-09-23 13:34:07 +03:00
parent eebf9ddc5a
commit 37f7c2c550
2 changed files with 69 additions and 194 deletions

View File

@@ -5,6 +5,7 @@ use drk::{
rpc::{
jsonrpc::{error as jsonerr, response as jsonresp},
jsonrpc::{ErrorCode::*, JsonRequest, JsonResult},
rpcserver::{listen_and_serve, RequestHandler, RpcServerConfig},
},
serial::{deserialize, serialize},
service::{bridge, bridge::Bridge},
@@ -15,14 +16,11 @@ use drk::{
use clap::clap_app;
use log::*;
use serde::Serialize;
use serde_json::{json, Value};
use simplelog::{
CombinedLogger, Config as SimLogConfig, ConfigBuilder, LevelFilter, TermLogger, TerminalMode,
WriteLogger,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use async_executor::Executor;
use easy_parallel::Parallel;
@@ -30,7 +28,7 @@ use ff::Field;
use rand::rngs::OsRng;
use async_std::sync::{Arc, Mutex};
use sha2::{Digest, Sha256};
use async_trait::async_trait;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -42,10 +40,33 @@ struct Cashierd {
cashier_wallet: Arc<CashierDb>,
features: HashMap<String, String>,
client: Arc<Mutex<Client>>,
executor: Arc<Executor<'static>>,
}
#[async_trait]
impl RequestHandler for Cashierd {
// TODO: ServerError codes should be part of the lib.
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if req.params.as_array().is_none() {
return JsonResult::Err(jsonerr(InvalidParams, None, req.id));
}
debug!(target: "RPC", "--> {:#?}", serde_json::to_string(&req).unwrap());
match req.method.as_str() {
Some("deposit") => return self.deposit(req.id, req.params).await,
Some("withdraw") => return self.withdraw(req.id, req.params).await,
Some("features") => return self.features(req.id, req.params).await,
Some(_) => {}
None => {}
};
return JsonResult::Err(jsonerr(MethodNotFound, None, req.id));
}
}
impl Cashierd {
fn new(verbose: bool, config_path: PathBuf) -> Result<Self> {
fn new(verbose: bool, executor: Arc<Executor<'static>>, config_path: PathBuf) -> Result<Self> {
let mint_params_path = join_config_path(&PathBuf::from("cashier_mint.params"))?;
let spend_params_path = join_config_path(&PathBuf::from("cashier_spend.params"))?;
@@ -89,10 +110,11 @@ impl Cashierd {
cashier_wallet,
features,
client: client.clone(),
executor: executor.clone(),
})
}
async fn start(&self, executor: Arc<Executor<'_>>) -> Result<()> {
async fn start(&self) -> Result<()> {
self.cashier_wallet.init_db()?;
let bridge = Bridge::new();
@@ -121,26 +143,27 @@ impl Cashierd {
let _btc_client = BtcClient::new(btc_endpoint);
//bridge.add_clients(sol_client).await?;
}
_ => return Err(Error::NotSupportedNetwork),
_ => {
warn!("No feature enabled for {} network", feature_name);
}
}
}
self.client.lock().await.start().await?;
let (notify, recv_coin) = async_channel::unbounded::<(jubjub::SubgroupPoint, u64)>();
let cashier_client_subscriber_task =
executor.spawn(Client::connect_to_subscriber_from_cashier(
self.client.clone(),
executor.clone(),
self.cashier_wallet.clone(),
notify.clone(),
));
self.executor
.spawn(Client::connect_to_subscriber_from_cashier(
self.client.clone(),
self.executor.clone(),
self.cashier_wallet.clone(),
notify.clone(),
));
let cashier_wallet = self.cashier_wallet.clone();
let ex = executor.clone();
let listen_for_receiving_coins_task = executor.spawn(async move {
let ex = self.executor.clone();
let listen_for_receiving_coins_task = self.executor.spawn(async move {
loop {
Self::listen_for_receiving_coins(
ex.clone(),
@@ -153,8 +176,14 @@ impl Cashierd {
}
});
let rpc_url = self.config.rpc_url.clone();
run_rpc_server(executor.clone(), self.clone(), rpc_url).await?;
let cfg = RpcServerConfig {
socket_addr: self.config.clone().rpc_url,
use_tls: self.config.use_tls,
identity_path: self.config.clone().tls_identity_path,
identity_pass: self.config.clone().tls_identity_password,
};
listen_and_serve(cfg, self.clone()).await?;
listen_for_receiving_coins_task.cancel().await;
cashier_client_subscriber_task.cancel().await;
@@ -206,25 +235,7 @@ impl Cashierd {
Ok(())
}
async fn handle_request(self, executor: Arc<Executor<'_>>, req: JsonRequest) -> JsonResult {
if req.params.as_array().is_none() {
return JsonResult::Err(jsonerr(InvalidParams, None, req.id));
}
debug!(target: "RPC", "--> {:#?}", serde_json::to_string(&req).unwrap());
match req.method.as_str() {
Some("deposit") => return self.deposit(executor.clone(), req.id, req.params).await,
Some("withdraw") => return self.withdraw(req.id, req.params).await,
Some("features") => return self.features(req.id, req.params).await,
Some(_) => {}
None => {}
};
return JsonResult::Err(jsonerr(MethodNotFound, None, req.id));
}
async fn deposit(self, executor: Arc<Executor<'_>>, id: Value, params: Value) -> JsonResult {
async fn deposit(&self, id: Value, params: Value) -> JsonResult {
debug!(target: "CASHIER DAEMON", "RECEIVED DEPOSIT REQUEST");
let args: &Vec<serde_json::Value>;
@@ -248,7 +259,7 @@ impl Cashierd {
}
let result: Result<String> = async {
let asset_id = Self::parse_id(token_id)?;
let asset_id = drk::util::parse_id(token_id)?;
let drk_pub_key = bs58::decode(&drk_pub_key.to_string()).into_vec()?;
let drk_pub_key: jubjub::SubgroupPoint = deserialize(&drk_pub_key)?;
@@ -258,7 +269,8 @@ impl Cashierd {
.cashier_wallet
.get_deposit_token_keys_by_dkey_public(&drk_pub_key, &asset_id)?;
let bridge_subscribtion = self.bridge.subscribe(executor.clone()).await;
let bridge = self.bridge.clone();
let bridge_subscribtion = bridge.subscribe(self.executor.clone()).await;
bridge_subscribtion
.sender
@@ -295,44 +307,9 @@ impl Cashierd {
}
}
// here we hash the alphanumeric token ID. if it fails, we change the last 4 bytes and hash it
// again, and keep repeating until it works.
fn parse_id(token: &Value) -> Result<jubjub::Fr> {
let tkn_str = token.as_str().unwrap();
if bs58::decode(tkn_str).into_vec().is_err() {
// TODO: make this an error
debug!(target: "CASHIER", "COULD NOT DECODE STR");
}
let mut data = bs58::decode(tkn_str).into_vec().unwrap();
let token_id = deserialize::<jubjub::Fr>(&data);
if token_id.is_err() {
let mut counter = 0;
loop {
data.truncate(28);
let serialized_counter = serialize(&counter);
data.extend(serialized_counter.iter());
let mut hasher = Sha256::new();
hasher.update(&data);
let hash = hasher.finalize();
let token_id = deserialize::<jubjub::Fr>(&hash);
if token_id.is_err() {
counter += 1;
continue;
}
debug!(target: "CASHIER", "DESERIALIZATION SUCCESSFUL");
let tkn = token_id.unwrap();
return Ok(tkn);
}
}
unreachable!();
}
async fn withdraw(self, id: Value, params: Value) -> JsonResult {
async fn withdraw(&self, id: Value, params: Value) -> JsonResult {
debug!(target: "CASHIER DAEMON", "RECEIVED DEPOSIT REQUEST");
// TODO Cashier checks if they support the network, and if so,
// return adeposit address.
let args: &Vec<serde_json::Value>;
if let Some(ar) = params.as_array() {
@@ -355,7 +332,7 @@ impl Cashierd {
}
let result: Result<String> = async {
let asset_id = Self::parse_id(&token)?;
let asset_id = drk::util::parse_id(&token)?;
let address = serialize(&address.to_string());
let cashier_public: jubjub::SubgroupPoint;
@@ -389,66 +366,11 @@ impl Cashierd {
}
}
async fn features(self, id: Value, _params: Value) -> JsonResult {
async fn features(&self, id: Value, _params: Value) -> JsonResult {
JsonResult::Resp(jsonresp(json!(self.features), id))
}
}
async fn run_rpc_server(
executor: Arc<Executor<'_>>,
cashierd: Cashierd,
rpc_url: String,
) -> Result<()> {
let listener = TcpListener::bind(rpc_url.clone()).await?;
debug!(target: "RPC SERVER", "Listening on {}", rpc_url);
loop {
debug!(target: "RPC SERVER", "waiting for client");
let (mut socket, _) = listener.accept().await?;
debug!(target: "RPC SERVER", "accepted client");
let cashierd = cashierd.clone();
let ex = executor.clone();
executor.spawn(async move {
let mut buf = [0; 2048];
loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => {
debug!(target: "RPC SERVER", "closed connection");
return;
}
Ok(n) => n,
Err(e) => {
debug!(target: "RPC SERVER", "failed to read from socket; err = {:?}", e);
return;
}
};
let r: JsonRequest = match serde_json::from_slice(&buf[0..n]) {
Ok(r) => r,
Err(e) => {
debug!(target: "RPC SERVER", "received invalid json; err = {:?}", e);
return;
}
};
let reply = cashierd.clone().handle_request(ex.clone(), r).await;
let j = serde_json::to_string(&reply).unwrap();
debug!(target: "RPC", "<-- {:#?}", j);
// Write the data back
if let Err(e) = socket.write_all(j.as_bytes()).await {
debug!(target: "RPC SERVER", "failed to write to socket; err = {:?}", e);
return;
}
}
}).await;
}
}
fn main() -> Result<()> {
let args = clap_app!(cashierd =>
(@arg CONFIG: -c --config +takes_value "Sets a custom config file")
@@ -464,7 +386,9 @@ fn main() -> Result<()> {
config_path = join_config_path(&PathBuf::from("cashierd.toml"))?;
}
let cashierd = Cashierd::new(args.clone().is_present("verbose"), config_path)?;
let ex = Arc::new(Executor::new());
let cashierd = Cashierd::new(args.clone().is_present("verbose"), ex.clone(), config_path)?;
let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build();
let debug_level = if args.is_present("verbose") {
@@ -484,8 +408,6 @@ fn main() -> Result<()> {
])
.unwrap();
let ex = Arc::new(Executor::new());
let ex2 = ex.clone();
let (signal, shutdown) = async_channel::unbounded::<()>();
let cashierd2 = cashierd.clone();
@@ -495,7 +417,7 @@ fn main() -> Result<()> {
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
cashierd2.start(ex2).await?;
cashierd2.start().await?;
drop(signal);
Ok::<(), Error>(())
})
@@ -503,60 +425,3 @@ fn main() -> Result<()> {
Ok(())
}
#[cfg(test)]
mod tests {
use drk::serial::{deserialize, serialize};
use sha2::{Digest, Sha256};
#[test]
fn test_jubjub_parsing() {
// 1. counter = 0
// 2. serialized_counter = serialize(counter)
// 3. asset_id_data = hash(data + serialized_counter)
// 4. asset_id = deserialize(asset_id_data)
// 5. test parse
// 6. loop
let tkn_str = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";
println!("{}", tkn_str);
if bs58::decode(tkn_str).into_vec().is_err() {
println!("Could not decode str into vec");
}
let mut data = bs58::decode(tkn_str).into_vec().unwrap();
println!("{:?}", data);
let mut hasher = Sha256::new();
hasher.update(&data);
let hash = hasher.finalize();
let token_id = deserialize::<jubjub::Fr>(&hash);
println!("{:?}", token_id);
let mut counter = 0;
if token_id.is_err() {
println!("could not deserialize tkn 58");
loop {
println!("TOKEN IS NONE. COMMENCING LOOP");
counter += 1;
println!("LOOP NUMBER {}", counter);
println!("{:?}", data.len());
data.truncate(28);
let serialized_counter = serialize(&counter);
println!("{:?}", serialized_counter);
data.extend(serialized_counter.iter());
println!("{:?}", data.len());
let mut hasher = Sha256::new();
hasher.update(&data);
let hash = hasher.finalize();
let token_id = deserialize::<jubjub::Fr>(&hash);
println!("{:?}", token_id);
if token_id.is_err() {
continue;
}
if counter > 10 {
break;
}
println!("deserialization successful");
token_id.unwrap();
break;
}
};
}
}

View File

@@ -104,6 +104,16 @@ pub struct CashierdConfig {
#[serde(rename = "password")]
pub password: String,
#[serde(rename = "use_tls")]
pub use_tls: bool,
#[serde(rename = "tls_identity_path")]
pub tls_identity_path: String,
#[serde(rename = "tls_identity_password")]
pub tls_identity_password: String,
#[serde(rename = "client_password")]
pub client_password: String,