eth: WIP implement subscribe function

This commit is contained in:
ghassmo
2021-10-26 16:22:19 +03:00
parent 8052fcca3a
commit d38b0808a0
2 changed files with 134 additions and 17 deletions

View File

@@ -1,18 +1,27 @@
use async_std::sync::{Arc, Mutex};
use std::convert::TryInto;
use std::time::Duration;
use async_executor::Executor;
use async_trait::async_trait;
use hash_db::Hasher;
use keccak_hasher::KeccakHasher;
use lazy_static::lazy_static;
use log::debug;
use log::{debug, error};
use num_bigint::{BigUint, RandBigInt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion};
use crate::{rpc::jsonrpc, rpc::jsonrpc::JsonResult, Error, Result};
use crate::{
rpc::jsonrpc,
rpc::jsonrpc::JsonResult,
serial::{deserialize, serialize},
util::{generate_id, NetworkName},
Error, Result,
};
pub const ETH_NATIVE_TOKEN_ID: &str = "0x0000000000000000000000000000000000000000";
// An ERC-20 token transfer transaction's data is as follows:
//
@@ -165,7 +174,7 @@ impl EthTx {
//
pub struct EthClient {
socket_path: String,
_subscriptions: Arc<Mutex<Vec<Vec<u8>>>>,
subscriptions: Arc<Mutex<Vec<String>>>,
notify_channel: (
async_channel::Sender<TokenNotification>,
async_channel::Receiver<TokenNotification>,
@@ -173,13 +182,85 @@ pub struct EthClient {
}
impl EthClient {
pub fn new(socket_path: String) -> Self {
pub fn new(socket_path: String) -> Arc<Self> {
let notify_channel = async_channel::unbounded();
let _subscriptions = Arc::new(Mutex::new(Vec::new()));
Self {
let subscriptions = Arc::new(Mutex::new(Vec::new()));
Arc::new(Self {
socket_path,
_subscriptions,
subscriptions,
notify_channel,
})
}
async fn handle_subscribe_request(
self: Arc<Self>,
private: String,
addr: String,
drk_pub_key: jubjub::SubgroupPoint,
) -> Result<()> {
if self.subscriptions.lock().await.contains(&addr) {
return Ok(());
}
let decimals = 18;
let prev_balance = self.get_current_balance(&addr, None).await?;
let mut current_balance;
let iter_interval = 1;
let mut sub_iter = 0;
loop {
if sub_iter > 60 * 10 {
// 10 minutes
self.unsubscribe(&addr).await;
return Err(crate::Error::ClientFailed("Deposit for expired".into()));
}
sub_iter += iter_interval;
async_std::task::sleep(Duration::from_secs(iter_interval)).await;
current_balance = self.get_current_balance(&addr, None).await?;
if current_balance != prev_balance {
break;
}
}
let send_notification = self.notify_channel.0.clone();
self.unsubscribe(&addr).await;
if current_balance < prev_balance {
return Err(crate::Error::ClientFailed(
"New balance is less than previous balance".into(),
));
}
let amnt = current_balance - prev_balance;
send_notification
.send(TokenNotification {
network: NetworkName::Solana,
token_id: generate_id(ETH_NATIVE_TOKEN_ID, &NetworkName::Solana)?,
drk_pub_key,
// TODO FIX
received_balance: amnt.to_u64_digits()[0],
decimals: decimals as u16,
})
.await
.map_err(Error::from)?;
Ok(())
}
async fn unsubscribe(self: Arc<Self>, pubkey: &String) {
let mut subscriptions = self.subscriptions.lock().await;
let index = subscriptions.iter().position(|p| p == pubkey);
if let Some(ind) = index {
debug!(target: "ETH BRIDGE", "Removing subscription from list");
subscriptions.remove(ind);
}
}
@@ -216,8 +297,8 @@ impl EthClient {
/*
pub async fn estimate_gas(&self, tx: &EthTx) -> Result<Value> {
let req = jsonrpc::request(json!("eth_estimateGas"), json!([tx]));
Ok(self.request(req).await?)
let req = jsonrpc::request(json!("eth_estimateGas"), json!([tx]));
Ok(self.request(req).await?)
}
*/
@@ -245,6 +326,19 @@ impl EthClient {
Ok(self.request(req).await?)
}
pub async fn get_current_balance(&self, acc: &str, _mint: Option<&str>) -> Result<BigUint> {
// Latest known block, used to calculate present balance.
let block = self.block_number().await?;
let block = block.as_str().unwrap();
// Native ETH balance
let hexbalance = self.get_eth_balance(&acc, block).await?;
let hexbalance = hexbalance.as_str().unwrap().trim_start_matches("0x");
let balance = BigUint::parse_bytes(hexbalance.as_bytes(), 16).unwrap();
Ok(balance)
}
pub async fn send_transaction(&self, tx: &EthTx, passphrase: &str) -> Result<Value> {
let req = jsonrpc::request(json!("personal_sendTransaction"), json!([tx, passphrase]));
Ok(self.request(req).await?)
@@ -255,27 +349,50 @@ impl EthClient {
impl NetworkClient for EthClient {
async fn subscribe(
self: Arc<Self>,
_drk_pub_key: jubjub::SubgroupPoint,
drk_pub_key: jubjub::SubgroupPoint,
_mint_address: Option<String>,
_executor: Arc<Executor<'_>>,
executor: Arc<Executor<'_>>,
) -> Result<TokenSubscribtion> {
let private_key: Vec<u8> = vec![];
let public_key = String::from("addr");
let private_key = generate_privkey();
// TODO fix
let addr: String = self
.import_privkey(&private_key, "testpass")
.await?
.as_str()
.unwrap()
.to_string();
let private = private_key.clone();
let addr_cloned = addr.clone();
executor
.spawn(async move {
let result = self
.handle_subscribe_request(private, addr_cloned, drk_pub_key)
.await;
if let Err(e) = result {
error!(target: "SOL BRIDGE SUBSCRIPTION","{}", e.to_string());
}
})
.detach();
let private_key: Vec<u8> = serialize(&private_key);
Ok(TokenSubscribtion {
private_key,
public_key,
public_key: addr,
})
}
async fn subscribe_with_keypair(
self: Arc<Self>,
_private_key: Vec<u8>,
_public_key: Vec<u8>,
public_key: Vec<u8>,
_drk_pub_key: jubjub::SubgroupPoint,
_mint_address: Option<String>,
_executor: Arc<Executor<'_>>,
) -> Result<String> {
let public_key = String::from("addr");
let public_key: String = deserialize(&public_key)?;
Ok(public_key)
}

View File

@@ -290,7 +290,7 @@ impl SolClient {
let mut subscriptions = self.subscriptions.lock().await;
let index = subscriptions.iter().position(|p| p == pubkey);
if let Some(ind) = index {
debug!("Removing subscription from list");
debug!(target: "SOL BRIDGE", "Removing subscription from list");
subscriptions.remove(ind);
}
}