diff --git a/src/service/sol.rs b/src/service/sol.rs index 59ecec369..43e1aefeb 100644 --- a/src/service/sol.rs +++ b/src/service/sol.rs @@ -1,6 +1,6 @@ use crate::rpc::{jsonrpc, jsonrpc::JsonResult}; use crate::serial::{deserialize, serialize, Decodable, Encodable}; -use crate::Result; +use crate::{Error, Result}; use super::bridge::CoinClient; @@ -17,7 +17,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction, transaction::Transaction, }; -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use tokio_tungstenite::{connect_async, tungstenite, tungstenite::protocol::Message}; use async_std::sync::{Arc, Mutex}; use std::collections::HashMap; @@ -77,104 +77,110 @@ impl SolClient { Ok(self.notify_channel.1.clone()) } - pub async fn run(self: Arc, executor: Arc>) -> Result<()> { + pub async fn run(self: Arc, executor: Arc>) -> SolResult<()> { // WebSocket handshake/connect - let (ws_stream, _) = connect_async(WSS_SERVER) - .await - .expect("Failed to connect to WebSocket server"); + let (ws_stream, _) = connect_async(WSS_SERVER).await?; let (mut write, read) = ws_stream.split(); let self2 = self.clone(); - executor - .spawn(async move { - loop { - // Send the subscription request - let sub_msg = self2 - .watch_channel - .1 - .recv() - .await - .expect("receiving subscription msg"); + let _: async_executor::Task> = executor.spawn(async move { + loop { + let sub_msg = self2.watch_channel.1.recv().await?; - write - .send(Message::Text(serde_json::to_string(&sub_msg).unwrap())) - .await - .unwrap(); - } - }) - .detach(); + write + .send(Message::Text(serde_json::to_string(&sub_msg)?)) + .await + .map_err(|err| SolFailed::from(err))?; + } + }); read.for_each(|message| async { - let data = message.unwrap().into_text().unwrap(); - let v: JsonResult = serde_json::from_str(&data).unwrap(); - match v { - JsonResult::Resp(r) => { - debug!( - target: "SOL BRIDGE", - "Successfully get response : {:?}", - r.result.as_i64().unwrap() - ); - // let sub_id = r.result.as_i64().unwrap(); - } - - JsonResult::Err(e) => { - debug!( - target: "SOL BRIDGE", - "Error on subscription: {:?}", e.error.message.to_string()); - } - - JsonResult::Notif(n) => { - // TODO remove unwrap - let new_bal = n.params["result"]["value"]["lamports"].as_u64().unwrap(); - let owner_pubkey = n.params["result"]["value"]["owner"].as_str().unwrap(); - let (keypair, old_balance) = - self.subscriptions.lock().await[owner_pubkey].clone(); - - if new_bal > old_balance { - let sub_id = n.params["subscription"].as_u64().unwrap(); - let received_balance = new_bal - old_balance; - - let keypair: Keypair = deserialize(&keypair).expect("deserialize keypair"); - - self.send_to_main_account(keypair) - .expect("Send to main account"); - - self.notify_channel - .0 - .send(( - serialize(&Pubkey::from_str(owner_pubkey).unwrap()), - received_balance, - )) - .await - .expect("send notify msg"); - - SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id) - .await - .unwrap(); - - debug!( - target: "SOL BRIDGE", - "Received {} lamports, to the pubkey: {} ", - received_balance, owner_pubkey.to_string(), - ); - } else if new_bal < old_balance { - let sub_id = n.params["subscription"].as_u64().unwrap(); - SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id) - .await - .unwrap(); - } - } - } + let self2 = self.clone(); + self2 + .read_ws_msg(message) + .await + .expect("read from websocket"); }) .await; Ok(()) } - fn send_to_main_account(&self, keypair: Keypair) -> Result<()> { + async fn read_ws_msg( + self: Arc, + message: std::result::Result, + ) -> SolResult<()> { + let data = message?.into_text()?; + + let v: JsonResult = serde_json::from_str(&data).map_err(|err| Error::from(err))?; + + match v { + JsonResult::Resp(r) => { + if let Some(sub_id) = r.result.as_i64() { + debug!( + target: "SOL BRIDGE", + "Successfully get response : {:?}", + sub_id + ); + } + } + + JsonResult::Err(e) => { + debug!( + target: "SOL BRIDGE", + "Error on subscription: {:?}", e.error.message.to_string()); + } + + JsonResult::Notif(n) => { + let new_bal = n.params["result"]["value"]["lamports"] + .as_u64() + .ok_or(Error::ParseIntError)?; + + let owner_pubkey = n.params["result"]["value"]["owner"] + .as_str() + .ok_or(Error::ParseFailed("Error Parse serde_json Value to &str"))?; + + let (keypair, old_balance) = self.subscriptions.lock().await[owner_pubkey].clone(); + + let sub_id = n.params["subscription"] + .as_u64() + .ok_or(Error::ParseIntError)?; + + if new_bal > old_balance { + let received_balance = new_bal - old_balance; + + let keypair: Keypair = deserialize(&keypair)?; + + self.send_to_main_account(keypair)?; + + self.notify_channel + .0 + .send(( + serialize(&Pubkey::from_str(owner_pubkey)?), + received_balance, + )) + .await + .map_err(|err| Error::from(err))?; + + SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id).await?; + + debug!( + target: "SOL BRIDGE", + "Received {} lamports, to the pubkey: {} ", + received_balance, owner_pubkey.to_string(), + ); + } else if new_bal < old_balance { + SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id).await?; + } + } + } + Ok(()) + } + + fn send_to_main_account(&self, keypair: Keypair) -> SolResult<()> { let rpc = RpcClient::new(RPC_SERVER.to_string()); - let amount = rpc.get_balance(&keypair.pubkey()).unwrap(); + let amount = rpc.get_balance(&keypair.pubkey())?; let instruction = system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount); @@ -185,9 +191,7 @@ impl SolClient { Err(_) => panic!("Couldn't connect to RPC"), Ok(v) => tx.sign(&[&keypair], v.0), } - let _signature = rpc - .send_and_confirm_transaction(&tx) - .expect("send transaction"); + let _signature = rpc.send_and_confirm_transaction(&tx)?; Ok(()) } @@ -219,12 +223,14 @@ impl CoinClient for SolClient { ); let rpc = RpcClient::new(RPC_SERVER.to_string()); - let balance = rpc.get_balance(&keypair.pubkey()).unwrap(); + let balance = rpc + .get_balance(&keypair.pubkey()) + .map_err(|err| SolFailed::from(err))?; - self.subscriptions.lock().await.insert( - keypair.pubkey().to_string(), - (serialize(&keypair), balance), - ); + self.subscriptions + .lock() + .await + .insert(keypair.pubkey().to_string(), (serialize(&keypair), balance)); self.watch_channel.0.send(sub_msg).await?; @@ -247,7 +253,8 @@ impl CoinClient for SolClient { let _signature = rpc .send_and_confirm_transaction(&tx) - .expect("send transaction"); + .map_err(|err| SolFailed::from(err))?; + Ok(()) } } @@ -296,8 +303,11 @@ impl Decodable for Pubkey { pub enum SolFailed { NotEnoughValue(u64), BadSolAddress(String), - SolError(String), DecodeAndEncodeError(String), + WebSocketError(String), + SolClientError(String), + ParseError(String), + SolError(String), } impl std::error::Error for SolFailed {} @@ -314,6 +324,15 @@ impl std::fmt::Display for SolFailed { SolFailed::DecodeAndEncodeError(ref err) => { write!(f, "Decode and decode keys error: {}", err) } + SolFailed::WebSocketError(i) => { + write!(f, "WebSocket Error: {}", i) + } + SolFailed::ParseError(i) => { + write!(f, "Parse Error: {}", i) + } + SolFailed::SolClientError(i) => { + write!(f, "Solana Client Error: {}", i) + } SolFailed::SolError(i) => { write!(f, "SolFailed: {}", i) } @@ -321,6 +340,24 @@ impl std::fmt::Display for SolFailed { } } +impl From for SolFailed { + fn from(err: solana_sdk::pubkey::ParsePubkeyError) -> SolFailed { + SolFailed::ParseError(err.to_string()) + } +} + +impl From for SolFailed { + fn from(err: tungstenite::Error) -> SolFailed { + SolFailed::WebSocketError(err.to_string()) + } +} + +impl From for SolFailed { + fn from(err: solana_client::client_error::ClientError) -> SolFailed { + SolFailed::SolError(err.to_string()) + } +} + impl From for SolFailed { fn from(err: crate::error::Error) -> SolFailed { SolFailed::SolError(err.to_string())