service/sol: Fix and clean up account subscription handling.

This commit is contained in:
parazyd
2021-09-28 14:08:57 +02:00
parent a8941a7045
commit 3ac9d7edbe

View File

@@ -1,29 +1,27 @@
use crate::rpc::{
jsonrpc, jsonrpc::JsonError, jsonrpc::JsonNotification, jsonrpc::JsonRequest,
jsonrpc::JsonResponse, jsonrpc::JsonResult, websockets::connect,
};
use crate::serial::{deserialize, serialize, Decodable, Encodable};
use crate::{Error, Result};
use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion};
use std::convert::TryFrom;
use std::str::FromStr;
use async_native_tls::TlsConnector;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use log::*;
use log::{debug, error, warn};
use rand::rngs::OsRng;
use serde::Serialize;
use serde_json::{json, Value};
use solana_client::{blockhash_query::BlockhashQuery, rpc_client::RpcClient};
use solana_sdk::{
pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, system_instruction,
transaction::Transaction,
native_token::lamports_to_sol, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair,
system_instruction, transaction::Transaction,
};
use std::convert::TryFrom;
use std::str::FromStr;
use tungstenite::Message;
use crate::rpc::{jsonrpc, jsonrpc::JsonResult, websockets};
use crate::serial::{deserialize, serialize, Decodable, Encodable};
use crate::{Error, Result};
use super::bridge::{NetworkClient, TokenNotification, TokenSubscribtion};
#[derive(Serialize)]
struct SubscribeParams {
encoding: Value,
@@ -73,12 +71,152 @@ impl SolClient {
}))
}
pub fn send_to_main_account(&self, keypair: &Keypair, mut amount: u64) -> SolResult<()> {
debug!(
target: "SOL BRIDGE",
"sending received token to main account"
// TODO: Make this function more robust. Currently we just call it
// and put it in the background. This means no errors are actually
// handled, and it just fails silently.
async fn handle_subscribe_request(
self: Arc<Self>,
keypair: Keypair,
is_token: bool,
) -> Result<()> {
debug!(target: "SOL BRIDGE", "handle_subscribe_request()");
// Check if we're already subscribed
if self.subscriptions.lock().await.contains(&keypair.pubkey()) {
return Ok(());
}
let rpc = RpcClient::new(self.rpc_server.to_string());
// Fetch the current balance.
let prev_balance = if !is_token {
rpc.get_balance(&keypair.pubkey())
.map_err(|err| SolFailed::from(err))?
} else {
// TODO: SPL Token balance
0
};
let mut cur_balance = prev_balance;
let mut decimals: Option<u64> = None;
let mut mint: Option<&str> = None;
// WebSocket connection
let builder = native_tls::TlsConnector::builder();
let tls = TlsConnector::from(builder);
let (mut stream, _) = websockets::connect(self.wss_server, tls).await?;
// Subscription request build
let sub_params = SubscribeParams {
encoding: json!("jsonParsed"),
commitment: json!("finalized"),
};
let subscription = jsonrpc::request(
json!("accountSubscribe"),
json!([json!(keypair.pubkey().to_string()), json!(sub_params)]),
);
debug!(target: "SOLANA RPC", "--> {}", serde_json::to_string(&subscription)?);
stream
.send(Message::text(serde_json::to_string(&subscription)?))
.await?;
// Declare params here for longer variable lifetime.
let params: Value;
// Subscription ID used for unsubscribing later.
let mut sub_id: i64 = 0;
loop {
let message = stream.next().await.ok_or_else(|| Error::TungsteniteError)?;
let message = message.unwrap();
debug!(target: "SOLANA SUBSCRIPTION", "<-- {}", message.clone().into_text()?);
match serde_json::from_slice(&message.into_data())? {
JsonResult::Resp(r) => {
// ACK
debug!(target: "SOLANA RPC", "<-- {}", serde_json::to_string(&r)?);
self.subscriptions.lock().await.push(keypair.pubkey());
sub_id = r.result.as_i64().unwrap();
}
JsonResult::Err(e) => {
debug!(target: "SOLANA RPC", "<-- {}", serde_json::to_string(&e)?);
// TODO: Try removing pubkey from subscriptions here?
return Err(Error::JsonRpcError(e.error.message.to_string()));
}
JsonResult::Notif(n) => {
// Account updated
debug!(target: "SOLANA RPC", "Got WebSocket notification");
params = n.params["result"]["value"].clone();
if is_token {
cur_balance = params["data"]["info"]["tokenAmount"]["amount"]
.as_u64()
.unwrap();
decimals = Some(
params["data"]["info"]["tokenAmount"]["decimals"]
.as_u64()
.unwrap(),
);
mint = Some(params["data"]["info"]["mint"].as_str().unwrap());
} else {
cur_balance = params["lamports"].as_u64().unwrap();
decimals = None;
mint = None;
}
break;
}
}
}
// I miss goto/defer.
let index = self
.subscriptions
.lock()
.await
.iter()
.position(|p| p == &keypair.pubkey());
if let Some(ind) = index {
debug!("Removing subscription from list");
self.subscriptions.lock().await.remove(ind);
}
let unsubscription = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id]));
stream
.send(Message::text(serde_json::to_string(&unsubscription)?))
.await?;
if cur_balance - prev_balance <= 0 {
error!("Current balance is not positive");
return Err(Error::ServicesError("Current balance is not positive"));
}
if is_token {
debug!(target: "SOL BRIDGE", "Received {} {:?} tokens",
(cur_balance - prev_balance) * decimals.unwrap(), mint.unwrap());
self.send_tok_to_main_wallet(mint.unwrap(), cur_balance, keypair)
} else {
debug!(target: "SOL BRIDGE", "Received {} SOL", lamports_to_sol(cur_balance - prev_balance));
self.send_sol_to_main_wallet(cur_balance, &keypair)
}
}
// TODO
fn send_tok_to_main_wallet(
self: Arc<Self>,
mint: &str,
amount: u64,
keypair: Keypair,
) -> Result<()> {
debug!(target: "SOL BRIDGE", "Sending tokens to main wallet");
Ok(())
}
fn send_sol_to_main_wallet(self: Arc<Self>, amount: u64, keypair: &Keypair) -> Result<()> {
debug!(target: "SOL BRIDGE", "Sending {} SOL to main wallet", lamports_to_sol(amount));
let rpc = RpcClient::new(self.rpc_server.to_string());
let fee = rpc
@@ -88,250 +226,27 @@ impl SolClient {
.lamports_per_signature;
if fee >= amount {
warn!(
target: "SOL BRIDGE",
"Received insufficient {} lamports, couldn't send it to
the main_keypair",
amount,
);
warn!(target: "SOL BRIDGE", "Insufficient funds on {:?} to send tx", &keypair.pubkey());
return Ok(());
}
// subtract fee from the new_bal
amount -= fee;
let amnt_to_transfer = amount - fee;
let instruction =
system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount);
let ix = system_instruction::transfer(
&keypair.pubkey(),
&self.keypair.pubkey(),
amnt_to_transfer,
);
let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey()));
let mut tx = Transaction::new_with_payer(&[ix], Some(&keypair.pubkey()));
let bhq = BlockhashQuery::default();
match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) {
Err(_) => panic!("Couldn't connect to RPC"),
Ok(v) => tx.sign(&[keypair], v.0),
}
let _signature = rpc.send_and_confirm_transaction(&tx)?;
Ok(())
}
async fn handle_subscribe_request(self: Arc<Self>, keypair: Keypair) -> Result<()> {
debug!(
target: "SOL BRIDGE",
"Handle subscribe request"
);
// check first if it's not already subscribed
if self.subscriptions.lock().await.contains(&keypair.pubkey()) {
return Ok(());
}
// Parameters for subscription to events related to `pubkey`.
let sub_params = SubscribeParams {
encoding: json!("jsonParsed"),
commitment: json!("finalized"),
};
let sub_msg = jsonrpc::request(
json!("accountSubscribe"),
json!([json!(keypair.pubkey().to_string()), json!(sub_params)]),
);
let rpc = RpcClient::new(self.rpc_server.to_string());
let old_balance = rpc
.get_balance(&keypair.pubkey())
.map_err(|err| SolFailed::from(err))?;
// WebSocket handshake/connect
let builder = native_tls::TlsConnector::builder();
let tls = TlsConnector::from(builder);
let (stream, _) = connect(self.wss_server, tls).await?;
let (mut write, mut read) = stream.split();
let (unsubscribe_channel_sx, unsubscribe_channel_rv) = async_channel::unbounded();
let unsubscribe_channel_rv2 = unsubscribe_channel_rv.clone();
let ws_write_task: smol::Task<Result<()>> = smol::spawn(async move {
write
.send(Message::text(serde_json::to_string(&sub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
let unsub_msg = unsubscribe_channel_rv2.recv().await?;
write
.send(Message::text(serde_json::to_string(&unsub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
Ok(())
});
let keypair = serialize(&keypair);
loop {
let message = read.next().await;
info!("msg: {:?}", message);
if let Some(msg) = message {
self.clone()
.read_ws_subscribe_msg(
msg,
&keypair,
old_balance,
unsubscribe_channel_sx.clone(),
)
.await?;
} else {
break;
}
}
ws_write_task.cancel().await;
Ok(())
}
async fn read_ws_subscribe_msg(
&self,
message: std::result::Result<Message, tungstenite::Error>,
keypair: &Vec<u8>,
old_balance: u64,
unsubscribe_channel_sx: async_channel::Sender<JsonRequest>,
) -> SolResult<()> {
let data = message?.into_text()?;
let json_res: JsonResult;
let v: std::collections::HashMap<String, Value> =
serde_json::from_str(&data).map_err(|err| Error::from(err))?;
// XXX this for testing
if v.contains_key(&String::from("result")) {
json_res = JsonResult::Resp(JsonResponse {
jsonrpc: v["jsonrpc"].clone(),
result: v["result"].clone(),
id: v["id"].clone(),
});
} else if v.contains_key(&String::from("error")) {
json_res = JsonResult::Err(JsonError {
jsonrpc: v["jsonrpc"].clone(),
error: serde_json::from_value(v["error"].clone()).unwrap(),
id: v["id"].clone(),
});
} else {
json_res = JsonResult::Notif(JsonNotification {
jsonrpc: v["jsonrpc"].clone(),
method: v["method"].clone(),
params: v["params"].clone(),
});
}
match json_res {
JsonResult::Resp(r) => {
// receive a response with subscription id
let keypair: Keypair = deserialize(&keypair)?;
match r.result.as_bool() {
Some(v) => {
if v {
debug!(
target: "SOL BRIDGE",
"Successfully unsubscribe from address {}",
keypair.pubkey(),
);
} else {
debug!(
target: "SOL BRIDGE",
"Unsuccessfully unsubscribe from address {}",
keypair.pubkey(),
);
}
}
None => {
self.subscriptions.lock().await.push(keypair.pubkey());
debug!(
target: "SOL BRIDGE",
"Successfully get response and subscribed to address {}",
keypair.pubkey(),
);
}
}
}
JsonResult::Err(e) => {
// receive an error
debug!(
target: "SOL BRIDGE",
"Error on subscription: {:?}", e.error.message.to_string());
}
JsonResult::Notif(n) => {
// receive notification once an account get updated
debug!(
target: "SOL BRIDGE",
"receive new notification"
);
// get values from the notification
let new_bal = n.params["result"]["value"]["lamports"]
.as_u64()
.ok_or(Error::ParseIntError)?;
let sub_id = n.params["subscription"]
.as_u64()
.ok_or(Error::ParseIntError)?;
let keypair: Keypair = deserialize(&keypair)?;
match new_bal > old_balance {
true => {
let received_balance = new_bal - old_balance;
self.notify_channel
.0
.send(TokenNotification {
secret_key: serialize(&keypair),
received_balance,
})
.await
.map_err(|err| Error::from(err))?;
self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone())
.await?;
debug!(
target: "SOL BRIDGE",
"Received {} lamports, to the pubkey: {} ",
received_balance, keypair.pubkey().to_string(),
);
self.send_to_main_account(&keypair, new_bal)?;
}
false => {
self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone())
.await?;
}
}
}
}
Ok(())
}
async fn unsubscribe(
&self,
sub_id: u64,
pubkey: &Pubkey,
unsubscribe_channel_sx: async_channel::Sender<JsonRequest>,
) -> Result<()> {
let sub_msg = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id]));
unsubscribe_channel_sx.send(sub_msg).await?;
let index = self
.subscriptions
.lock()
.await
.iter()
.position(|p| p == pubkey);
if let Some(ind) = index {
self.subscriptions.lock().await.remove(ind);
}
let signature = rpc.send_and_confirm_transaction(&tx);
debug!(target: "SOL BRIDGE", "Sent to main wallet: {}", signature.unwrap());
Ok(())
}
@@ -346,7 +261,8 @@ impl NetworkClient for SolClient {
let secret_key = serialize(&keypair);
let self2 = self.clone();
smol::spawn(self2.handle_subscribe_request(keypair)).detach();
// TODO: true/false depending on is_token
smol::spawn(self2.handle_subscribe_request(keypair, false)).detach();
Ok(TokenSubscribtion {
secret_key,
@@ -365,7 +281,8 @@ impl NetworkClient for SolClient {
let public_key = keypair.pubkey().to_string();
let self2 = self.clone();
smol::spawn(self2.handle_subscribe_request(keypair)).detach();
// TODO: true/false depending on is_token
smol::spawn(self2.handle_subscribe_request(keypair, false)).detach();
Ok(public_key)
}