Sol Bridge: clean up & avoid calling unwrap()

This commit is contained in:
ghassmo
2021-09-19 04:03:06 +03:00
parent c1e0127a86
commit d4d941cb15

View File

@@ -1,6 +1,6 @@
use crate::rpc::{jsonrpc, jsonrpc::JsonResult};
use crate::serial::{deserialize, serialize, Decodable, Encodable};
use crate::Result;
use crate::{Error, Result};
use super::bridge::CoinClient;
@@ -17,7 +17,7 @@ use solana_sdk::{
pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction,
transaction::Transaction,
};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio_tungstenite::{connect_async, tungstenite, tungstenite::protocol::Message};
use async_std::sync::{Arc, Mutex};
use std::collections::HashMap;
@@ -77,104 +77,110 @@ impl SolClient {
Ok(self.notify_channel.1.clone())
}
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> SolResult<()> {
// WebSocket handshake/connect
let (ws_stream, _) = connect_async(WSS_SERVER)
.await
.expect("Failed to connect to WebSocket server");
let (ws_stream, _) = connect_async(WSS_SERVER).await?;
let (mut write, read) = ws_stream.split();
let self2 = self.clone();
executor
.spawn(async move {
loop {
// Send the subscription request
let sub_msg = self2
.watch_channel
.1
.recv()
.await
.expect("receiving subscription msg");
let _: async_executor::Task<Result<()>> = executor.spawn(async move {
loop {
let sub_msg = self2.watch_channel.1.recv().await?;
write
.send(Message::Text(serde_json::to_string(&sub_msg).unwrap()))
.await
.unwrap();
}
})
.detach();
write
.send(Message::Text(serde_json::to_string(&sub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
}
});
read.for_each(|message| async {
let data = message.unwrap().into_text().unwrap();
let v: JsonResult = serde_json::from_str(&data).unwrap();
match v {
JsonResult::Resp(r) => {
debug!(
target: "SOL BRIDGE",
"Successfully get response : {:?}",
r.result.as_i64().unwrap()
);
// let sub_id = r.result.as_i64().unwrap();
}
JsonResult::Err(e) => {
debug!(
target: "SOL BRIDGE",
"Error on subscription: {:?}", e.error.message.to_string());
}
JsonResult::Notif(n) => {
// TODO remove unwrap
let new_bal = n.params["result"]["value"]["lamports"].as_u64().unwrap();
let owner_pubkey = n.params["result"]["value"]["owner"].as_str().unwrap();
let (keypair, old_balance) =
self.subscriptions.lock().await[owner_pubkey].clone();
if new_bal > old_balance {
let sub_id = n.params["subscription"].as_u64().unwrap();
let received_balance = new_bal - old_balance;
let keypair: Keypair = deserialize(&keypair).expect("deserialize keypair");
self.send_to_main_account(keypair)
.expect("Send to main account");
self.notify_channel
.0
.send((
serialize(&Pubkey::from_str(owner_pubkey).unwrap()),
received_balance,
))
.await
.expect("send notify msg");
SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id)
.await
.unwrap();
debug!(
target: "SOL BRIDGE",
"Received {} lamports, to the pubkey: {} ",
received_balance, owner_pubkey.to_string(),
);
} else if new_bal < old_balance {
let sub_id = n.params["subscription"].as_u64().unwrap();
SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id)
.await
.unwrap();
}
}
}
let self2 = self.clone();
self2
.read_ws_msg(message)
.await
.expect("read from websocket");
})
.await;
Ok(())
}
fn send_to_main_account(&self, keypair: Keypair) -> Result<()> {
async fn read_ws_msg(
self: Arc<Self>,
message: std::result::Result<Message, tungstenite::Error>,
) -> SolResult<()> {
let data = message?.into_text()?;
let v: JsonResult = serde_json::from_str(&data).map_err(|err| Error::from(err))?;
match v {
JsonResult::Resp(r) => {
if let Some(sub_id) = r.result.as_i64() {
debug!(
target: "SOL BRIDGE",
"Successfully get response : {:?}",
sub_id
);
}
}
JsonResult::Err(e) => {
debug!(
target: "SOL BRIDGE",
"Error on subscription: {:?}", e.error.message.to_string());
}
JsonResult::Notif(n) => {
let new_bal = n.params["result"]["value"]["lamports"]
.as_u64()
.ok_or(Error::ParseIntError)?;
let owner_pubkey = n.params["result"]["value"]["owner"]
.as_str()
.ok_or(Error::ParseFailed("Error Parse serde_json Value to &str"))?;
let (keypair, old_balance) = self.subscriptions.lock().await[owner_pubkey].clone();
let sub_id = n.params["subscription"]
.as_u64()
.ok_or(Error::ParseIntError)?;
if new_bal > old_balance {
let received_balance = new_bal - old_balance;
let keypair: Keypair = deserialize(&keypair)?;
self.send_to_main_account(keypair)?;
self.notify_channel
.0
.send((
serialize(&Pubkey::from_str(owner_pubkey)?),
received_balance,
))
.await
.map_err(|err| Error::from(err))?;
SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id).await?;
debug!(
target: "SOL BRIDGE",
"Received {} lamports, to the pubkey: {} ",
received_balance, owner_pubkey.to_string(),
);
} else if new_bal < old_balance {
SolClient::unsubscribe(self.watch_channel.0.clone(), sub_id).await?;
}
}
}
Ok(())
}
fn send_to_main_account(&self, keypair: Keypair) -> SolResult<()> {
let rpc = RpcClient::new(RPC_SERVER.to_string());
let amount = rpc.get_balance(&keypair.pubkey()).unwrap();
let amount = rpc.get_balance(&keypair.pubkey())?;
let instruction =
system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount);
@@ -185,9 +191,7 @@ impl SolClient {
Err(_) => panic!("Couldn't connect to RPC"),
Ok(v) => tx.sign(&[&keypair], v.0),
}
let _signature = rpc
.send_and_confirm_transaction(&tx)
.expect("send transaction");
let _signature = rpc.send_and_confirm_transaction(&tx)?;
Ok(())
}
@@ -219,12 +223,14 @@ impl CoinClient for SolClient {
);
let rpc = RpcClient::new(RPC_SERVER.to_string());
let balance = rpc.get_balance(&keypair.pubkey()).unwrap();
let balance = rpc
.get_balance(&keypair.pubkey())
.map_err(|err| SolFailed::from(err))?;
self.subscriptions.lock().await.insert(
keypair.pubkey().to_string(),
(serialize(&keypair), balance),
);
self.subscriptions
.lock()
.await
.insert(keypair.pubkey().to_string(), (serialize(&keypair), balance));
self.watch_channel.0.send(sub_msg).await?;
@@ -247,7 +253,8 @@ impl CoinClient for SolClient {
let _signature = rpc
.send_and_confirm_transaction(&tx)
.expect("send transaction");
.map_err(|err| SolFailed::from(err))?;
Ok(())
}
}
@@ -296,8 +303,11 @@ impl Decodable for Pubkey {
pub enum SolFailed {
NotEnoughValue(u64),
BadSolAddress(String),
SolError(String),
DecodeAndEncodeError(String),
WebSocketError(String),
SolClientError(String),
ParseError(String),
SolError(String),
}
impl std::error::Error for SolFailed {}
@@ -314,6 +324,15 @@ impl std::fmt::Display for SolFailed {
SolFailed::DecodeAndEncodeError(ref err) => {
write!(f, "Decode and decode keys error: {}", err)
}
SolFailed::WebSocketError(i) => {
write!(f, "WebSocket Error: {}", i)
}
SolFailed::ParseError(i) => {
write!(f, "Parse Error: {}", i)
}
SolFailed::SolClientError(i) => {
write!(f, "Solana Client Error: {}", i)
}
SolFailed::SolError(i) => {
write!(f, "SolFailed: {}", i)
}
@@ -321,6 +340,24 @@ impl std::fmt::Display for SolFailed {
}
}
impl From<solana_sdk::pubkey::ParsePubkeyError> for SolFailed {
fn from(err: solana_sdk::pubkey::ParsePubkeyError) -> SolFailed {
SolFailed::ParseError(err.to_string())
}
}
impl From<tungstenite::Error> for SolFailed {
fn from(err: tungstenite::Error) -> SolFailed {
SolFailed::WebSocketError(err.to_string())
}
}
impl From<solana_client::client_error::ClientError> for SolFailed {
fn from(err: solana_client::client_error::ClientError) -> SolFailed {
SolFailed::SolError(err.to_string())
}
}
impl From<crate::error::Error> for SolFailed {
fn from(err: crate::error::Error) -> SolFailed {
SolFailed::SolError(err.to_string())