diff --git a/src/service/eth.rs b/src/service/eth.rs index 949106525..5b26930c5 100644 --- a/src/service/eth.rs +++ b/src/service/eth.rs @@ -1,18 +1,27 @@ use async_std::sync::{Arc, Mutex}; use std::convert::TryInto; +use std::time::Duration; use async_executor::Executor; use async_trait::async_trait; use hash_db::Hasher; use keccak_hasher::KeccakHasher; use lazy_static::lazy_static; -use log::debug; +use log::{debug, error}; use num_bigint::{BigUint, RandBigInt}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion}; -use crate::{rpc::jsonrpc, rpc::jsonrpc::JsonResult, Error, Result}; +use crate::{ + rpc::jsonrpc, + rpc::jsonrpc::JsonResult, + serial::{deserialize, serialize}, + util::{generate_id, NetworkName}, + Error, Result, +}; + +pub const ETH_NATIVE_TOKEN_ID: &str = "0x0000000000000000000000000000000000000000"; // An ERC-20 token transfer transaction's data is as follows: // @@ -165,7 +174,7 @@ impl EthTx { // pub struct EthClient { socket_path: String, - _subscriptions: Arc>>>, + subscriptions: Arc>>, notify_channel: ( async_channel::Sender, async_channel::Receiver, @@ -173,13 +182,85 @@ pub struct EthClient { } impl EthClient { - pub fn new(socket_path: String) -> Self { + pub fn new(socket_path: String) -> Arc { let notify_channel = async_channel::unbounded(); - let _subscriptions = Arc::new(Mutex::new(Vec::new())); - Self { + let subscriptions = Arc::new(Mutex::new(Vec::new())); + Arc::new(Self { socket_path, - _subscriptions, + subscriptions, notify_channel, + }) + } + + async fn handle_subscribe_request( + self: Arc, + private: String, + addr: String, + drk_pub_key: jubjub::SubgroupPoint, + ) -> Result<()> { + if self.subscriptions.lock().await.contains(&addr) { + return Ok(()); + } + + let decimals = 18; + let prev_balance = self.get_current_balance(&addr, None).await?; + + let mut current_balance; + + let iter_interval = 1; + let mut sub_iter = 0; + + loop { + if sub_iter > 60 * 10 { + // 10 minutes + self.unsubscribe(&addr).await; + return Err(crate::Error::ClientFailed("Deposit for expired".into())); + } + + sub_iter += iter_interval; + async_std::task::sleep(Duration::from_secs(iter_interval)).await; + + current_balance = self.get_current_balance(&addr, None).await?; + + if current_balance != prev_balance { + break; + } + } + + let send_notification = self.notify_channel.0.clone(); + + self.unsubscribe(&addr).await; + + if current_balance < prev_balance { + return Err(crate::Error::ClientFailed( + "New balance is less than previous balance".into(), + )); + } + + let amnt = current_balance - prev_balance; + + + send_notification + .send(TokenNotification { + network: NetworkName::Solana, + token_id: generate_id(ETH_NATIVE_TOKEN_ID, &NetworkName::Solana)?, + drk_pub_key, + // TODO FIX + received_balance: amnt.to_u64_digits()[0], + decimals: decimals as u16, + }) + .await + .map_err(Error::from)?; + + Ok(()) + } + + async fn unsubscribe(self: Arc, pubkey: &String) { + let mut subscriptions = self.subscriptions.lock().await; + let index = subscriptions.iter().position(|p| p == pubkey); + if let Some(ind) = index { + debug!(target: "ETH BRIDGE", "Removing subscription from list"); + subscriptions.remove(ind); } } @@ -216,8 +297,8 @@ impl EthClient { /* pub async fn estimate_gas(&self, tx: &EthTx) -> Result { - let req = jsonrpc::request(json!("eth_estimateGas"), json!([tx])); - Ok(self.request(req).await?) + let req = jsonrpc::request(json!("eth_estimateGas"), json!([tx])); + Ok(self.request(req).await?) } */ @@ -245,6 +326,19 @@ impl EthClient { Ok(self.request(req).await?) } + pub async fn get_current_balance(&self, acc: &str, _mint: Option<&str>) -> Result { + // Latest known block, used to calculate present balance. + let block = self.block_number().await?; + let block = block.as_str().unwrap(); + + // Native ETH balance + let hexbalance = self.get_eth_balance(&acc, block).await?; + let hexbalance = hexbalance.as_str().unwrap().trim_start_matches("0x"); + let balance = BigUint::parse_bytes(hexbalance.as_bytes(), 16).unwrap(); + + Ok(balance) + } + pub async fn send_transaction(&self, tx: &EthTx, passphrase: &str) -> Result { let req = jsonrpc::request(json!("personal_sendTransaction"), json!([tx, passphrase])); Ok(self.request(req).await?) @@ -255,27 +349,50 @@ impl EthClient { impl NetworkClient for EthClient { async fn subscribe( self: Arc, - _drk_pub_key: jubjub::SubgroupPoint, + drk_pub_key: jubjub::SubgroupPoint, _mint_address: Option, - _executor: Arc>, + executor: Arc>, ) -> Result { - let private_key: Vec = vec![]; - let public_key = String::from("addr"); + let private_key = generate_privkey(); + + // TODO fix + let addr: String = self + .import_privkey(&private_key, "testpass") + .await? + .as_str() + .unwrap() + .to_string(); + + let private = private_key.clone(); + let addr_cloned = addr.clone(); + executor + .spawn(async move { + let result = self + .handle_subscribe_request(private, addr_cloned, drk_pub_key) + .await; + if let Err(e) = result { + error!(target: "SOL BRIDGE SUBSCRIPTION","{}", e.to_string()); + } + }) + .detach(); + + let private_key: Vec = serialize(&private_key); + Ok(TokenSubscribtion { private_key, - public_key, + public_key: addr, }) } async fn subscribe_with_keypair( self: Arc, _private_key: Vec, - _public_key: Vec, + public_key: Vec, _drk_pub_key: jubjub::SubgroupPoint, _mint_address: Option, _executor: Arc>, ) -> Result { - let public_key = String::from("addr"); + let public_key: String = deserialize(&public_key)?; Ok(public_key) } diff --git a/src/service/sol.rs b/src/service/sol.rs index 12bf72dda..fde4336d7 100644 --- a/src/service/sol.rs +++ b/src/service/sol.rs @@ -290,7 +290,7 @@ impl SolClient { let mut subscriptions = self.subscriptions.lock().await; let index = subscriptions.iter().position(|p| p == pubkey); if let Some(ind) = index { - debug!("Removing subscription from list"); + debug!(target: "SOL BRIDGE", "Removing subscription from list"); subscriptions.remove(ind); } }