diff --git a/src/service/sol.rs b/src/service/sol.rs index 10908fba4..fc3b1fefb 100644 --- a/src/service/sol.rs +++ b/src/service/sol.rs @@ -1,29 +1,27 @@ -use crate::rpc::{ - jsonrpc, jsonrpc::JsonError, jsonrpc::JsonNotification, jsonrpc::JsonRequest, - jsonrpc::JsonResponse, jsonrpc::JsonResult, websockets::connect, -}; -use crate::serial::{deserialize, serialize, Decodable, Encodable}; -use crate::{Error, Result}; - -use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion}; +use std::convert::TryFrom; +use std::str::FromStr; use async_native_tls::TlsConnector; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use futures::{SinkExt, StreamExt}; -use log::*; +use log::{debug, error, warn}; use rand::rngs::OsRng; use serde::Serialize; use serde_json::{json, Value}; use solana_client::{blockhash_query::BlockhashQuery, rpc_client::RpcClient}; use solana_sdk::{ - pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction, - transaction::Transaction, + native_token::lamports_to_sol, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, + system_instruction, transaction::Transaction, }; -use std::convert::TryFrom; -use std::str::FromStr; use tungstenite::Message; +use crate::rpc::{jsonrpc, jsonrpc::JsonResult, websockets}; +use crate::serial::{deserialize, serialize, Decodable, Encodable}; +use crate::{Error, Result}; + +use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion}; + #[derive(Serialize)] struct SubscribeParams { encoding: Value, @@ -73,12 +71,152 @@ impl SolClient { })) } - pub fn send_to_main_account(&self, keypair: &Keypair, mut amount: u64) -> SolResult<()> { - debug!( - target: "SOL BRIDGE", - "sending received token to main account" + // TODO: Make this function more robust. Currently we just call it + // and put it in the background. This means no errors are actually + // handled, and it just fails silently. + async fn handle_subscribe_request( + self: Arc, + keypair: Keypair, + is_token: bool, + ) -> Result<()> { + debug!(target: "SOL BRIDGE", "handle_subscribe_request()"); + + // Check if we're already subscribed + if self.subscriptions.lock().await.contains(&keypair.pubkey()) { + return Ok(()); + } + + let rpc = RpcClient::new(self.rpc_server.to_string()); + + // Fetch the current balance. + let prev_balance = if !is_token { + rpc.get_balance(&keypair.pubkey()) + .map_err(|err| SolFailed::from(err))? + } else { + // TODO: SPL Token balance + 0 + }; + let mut cur_balance = prev_balance; + let mut decimals: Option = None; + let mut mint: Option<&str> = None; + + // WebSocket connection + let builder = native_tls::TlsConnector::builder(); + let tls = TlsConnector::from(builder); + let (mut stream, _) = websockets::connect(self.wss_server, tls).await?; + + // Subscription request build + let sub_params = SubscribeParams { + encoding: json!("jsonParsed"), + commitment: json!("finalized"), + }; + + let subscription = jsonrpc::request( + json!("accountSubscribe"), + json!([json!(keypair.pubkey().to_string()), json!(sub_params)]), ); + debug!(target: "SOLANA RPC", "--> {}", serde_json::to_string(&subscription)?); + stream + .send(Message::text(serde_json::to_string(&subscription)?)) + .await?; + + // Declare params here for longer variable lifetime. + let params: Value; + + // Subscription ID used for unsubscribing later. + let mut sub_id: i64 = 0; + + loop { + let message = stream.next().await.ok_or_else(|| Error::TungsteniteError)?; + let message = message.unwrap(); + debug!(target: "SOLANA SUBSCRIPTION", "<-- {}", message.clone().into_text()?); + + match serde_json::from_slice(&message.into_data())? { + JsonResult::Resp(r) => { + // ACK + debug!(target: "SOLANA RPC", "<-- {}", serde_json::to_string(&r)?); + self.subscriptions.lock().await.push(keypair.pubkey()); + sub_id = r.result.as_i64().unwrap(); + } + JsonResult::Err(e) => { + debug!(target: "SOLANA RPC", "<-- {}", serde_json::to_string(&e)?); + // TODO: Try removing pubkey from subscriptions here? + return Err(Error::JsonRpcError(e.error.message.to_string())); + } + JsonResult::Notif(n) => { + // Account updated + debug!(target: "SOLANA RPC", "Got WebSocket notification"); + params = n.params["result"]["value"].clone(); + + if is_token { + cur_balance = params["data"]["info"]["tokenAmount"]["amount"] + .as_u64() + .unwrap(); + + decimals = Some( + params["data"]["info"]["tokenAmount"]["decimals"] + .as_u64() + .unwrap(), + ); + + mint = Some(params["data"]["info"]["mint"].as_str().unwrap()); + } else { + cur_balance = params["lamports"].as_u64().unwrap(); + decimals = None; + mint = None; + } + break; + } + } + } + + // I miss goto/defer. + let index = self + .subscriptions + .lock() + .await + .iter() + .position(|p| p == &keypair.pubkey()); + if let Some(ind) = index { + debug!("Removing subscription from list"); + self.subscriptions.lock().await.remove(ind); + } + + let unsubscription = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id])); + stream + .send(Message::text(serde_json::to_string(&unsubscription)?)) + .await?; + + if cur_balance - prev_balance <= 0 { + error!("Current balance is not positive"); + return Err(Error::ServicesError("Current balance is not positive")); + } + + if is_token { + debug!(target: "SOL BRIDGE", "Received {} {:?} tokens", + (cur_balance - prev_balance) * decimals.unwrap(), mint.unwrap()); + self.send_tok_to_main_wallet(mint.unwrap(), cur_balance, keypair) + } else { + debug!(target: "SOL BRIDGE", "Received {} SOL", lamports_to_sol(cur_balance - prev_balance)); + self.send_sol_to_main_wallet(cur_balance, &keypair) + } + } + + // TODO + fn send_tok_to_main_wallet( + self: Arc, + mint: &str, + amount: u64, + keypair: Keypair, + ) -> Result<()> { + debug!(target: "SOL BRIDGE", "Sending tokens to main wallet"); + Ok(()) + } + + fn send_sol_to_main_wallet(self: Arc, amount: u64, keypair: &Keypair) -> Result<()> { + debug!(target: "SOL BRIDGE", "Sending {} SOL to main wallet", lamports_to_sol(amount)); + let rpc = RpcClient::new(self.rpc_server.to_string()); let fee = rpc @@ -88,250 +226,27 @@ impl SolClient { .lamports_per_signature; if fee >= amount { - warn!( - target: "SOL BRIDGE", - "Received insufficient {} lamports, couldn't send it to - the main_keypair", - amount, - ); + warn!(target: "SOL BRIDGE", "Insufficient funds on {:?} to send tx", &keypair.pubkey()); return Ok(()); } - // subtract fee from the new_bal - amount -= fee; + let amnt_to_transfer = amount - fee; - let instruction = - system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount); + let ix = system_instruction::transfer( + &keypair.pubkey(), + &self.keypair.pubkey(), + amnt_to_transfer, + ); - let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey())); + let mut tx = Transaction::new_with_payer(&[ix], Some(&keypair.pubkey())); let bhq = BlockhashQuery::default(); match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) { Err(_) => panic!("Couldn't connect to RPC"), Ok(v) => tx.sign(&[keypair], v.0), } - let _signature = rpc.send_and_confirm_transaction(&tx)?; - Ok(()) - } - async fn handle_subscribe_request(self: Arc, keypair: Keypair) -> Result<()> { - debug!( - target: "SOL BRIDGE", - "Handle subscribe request" - ); - - // check first if it's not already subscribed - if self.subscriptions.lock().await.contains(&keypair.pubkey()) { - return Ok(()); - } - - // Parameters for subscription to events related to `pubkey`. - let sub_params = SubscribeParams { - encoding: json!("jsonParsed"), - commitment: json!("finalized"), - }; - - let sub_msg = jsonrpc::request( - json!("accountSubscribe"), - json!([json!(keypair.pubkey().to_string()), json!(sub_params)]), - ); - - let rpc = RpcClient::new(self.rpc_server.to_string()); - let old_balance = rpc - .get_balance(&keypair.pubkey()) - .map_err(|err| SolFailed::from(err))?; - - // WebSocket handshake/connect - let builder = native_tls::TlsConnector::builder(); - let tls = TlsConnector::from(builder); - let (stream, _) = connect(self.wss_server, tls).await?; - - let (mut write, mut read) = stream.split(); - - let (unsubscribe_channel_sx, unsubscribe_channel_rv) = async_channel::unbounded(); - - let unsubscribe_channel_rv2 = unsubscribe_channel_rv.clone(); - let ws_write_task: smol::Task> = smol::spawn(async move { - write - .send(Message::text(serde_json::to_string(&sub_msg)?)) - .await - .map_err(|err| SolFailed::from(err))?; - - let unsub_msg = unsubscribe_channel_rv2.recv().await?; - write - .send(Message::text(serde_json::to_string(&unsub_msg)?)) - .await - .map_err(|err| SolFailed::from(err))?; - - Ok(()) - }); - - let keypair = serialize(&keypair); - - loop { - let message = read.next().await; - info!("msg: {:?}", message); - if let Some(msg) = message { - self.clone() - .read_ws_subscribe_msg( - msg, - &keypair, - old_balance, - unsubscribe_channel_sx.clone(), - ) - .await?; - } else { - break; - } - } - - ws_write_task.cancel().await; - Ok(()) - } - - async fn read_ws_subscribe_msg( - &self, - message: std::result::Result, - keypair: &Vec, - old_balance: u64, - unsubscribe_channel_sx: async_channel::Sender, - ) -> SolResult<()> { - let data = message?.into_text()?; - - let json_res: JsonResult; - - let v: std::collections::HashMap = - serde_json::from_str(&data).map_err(|err| Error::from(err))?; - - // XXX this for testing - if v.contains_key(&String::from("result")) { - json_res = JsonResult::Resp(JsonResponse { - jsonrpc: v["jsonrpc"].clone(), - result: v["result"].clone(), - id: v["id"].clone(), - }); - } else if v.contains_key(&String::from("error")) { - json_res = JsonResult::Err(JsonError { - jsonrpc: v["jsonrpc"].clone(), - error: serde_json::from_value(v["error"].clone()).unwrap(), - id: v["id"].clone(), - }); - } else { - json_res = JsonResult::Notif(JsonNotification { - jsonrpc: v["jsonrpc"].clone(), - method: v["method"].clone(), - params: v["params"].clone(), - }); - } - - match json_res { - JsonResult::Resp(r) => { - // receive a response with subscription id - let keypair: Keypair = deserialize(&keypair)?; - match r.result.as_bool() { - Some(v) => { - if v { - debug!( - target: "SOL BRIDGE", - "Successfully unsubscribe from address {}", - keypair.pubkey(), - ); - } else { - debug!( - target: "SOL BRIDGE", - "Unsuccessfully unsubscribe from address {}", - keypair.pubkey(), - ); - } - } - None => { - self.subscriptions.lock().await.push(keypair.pubkey()); - debug!( - target: "SOL BRIDGE", - "Successfully get response and subscribed to address {}", - keypair.pubkey(), - ); - } - } - } - - JsonResult::Err(e) => { - // receive an error - debug!( - target: "SOL BRIDGE", - "Error on subscription: {:?}", e.error.message.to_string()); - } - - JsonResult::Notif(n) => { - // receive notification once an account get updated - debug!( - target: "SOL BRIDGE", - "receive new notification" - ); - // get values from the notification - let new_bal = n.params["result"]["value"]["lamports"] - .as_u64() - .ok_or(Error::ParseIntError)?; - - let sub_id = n.params["subscription"] - .as_u64() - .ok_or(Error::ParseIntError)?; - - let keypair: Keypair = deserialize(&keypair)?; - - match new_bal > old_balance { - true => { - let received_balance = new_bal - old_balance; - - self.notify_channel - .0 - .send(TokenNotification { - secret_key: serialize(&keypair), - received_balance, - }) - .await - .map_err(|err| Error::from(err))?; - - self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone()) - .await?; - - debug!( - target: "SOL BRIDGE", - "Received {} lamports, to the pubkey: {} ", - received_balance, keypair.pubkey().to_string(), - ); - - self.send_to_main_account(&keypair, new_bal)?; - } - false => { - self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone()) - .await?; - } - } - } - } - Ok(()) - } - - async fn unsubscribe( - &self, - sub_id: u64, - pubkey: &Pubkey, - unsubscribe_channel_sx: async_channel::Sender, - ) -> Result<()> { - let sub_msg = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id])); - - unsubscribe_channel_sx.send(sub_msg).await?; - - let index = self - .subscriptions - .lock() - .await - .iter() - .position(|p| p == pubkey); - - if let Some(ind) = index { - self.subscriptions.lock().await.remove(ind); - } + let signature = rpc.send_and_confirm_transaction(&tx); + debug!(target: "SOL BRIDGE", "Sent to main wallet: {}", signature.unwrap()); Ok(()) } @@ -346,7 +261,8 @@ impl NetworkClient for SolClient { let secret_key = serialize(&keypair); let self2 = self.clone(); - smol::spawn(self2.handle_subscribe_request(keypair)).detach(); + // TODO: true/false depending on is_token + smol::spawn(self2.handle_subscribe_request(keypair, false)).detach(); Ok(TokenSubscribtion { secret_key, @@ -365,7 +281,8 @@ impl NetworkClient for SolClient { let public_key = keypair.pubkey().to_string(); let self2 = self.clone(); - smol::spawn(self2.handle_subscribe_request(keypair)).detach(); + // TODO: true/false depending on is_token + smol::spawn(self2.handle_subscribe_request(keypair, false)).detach(); Ok(public_key) }