SolClient: WIP clean up & change subscribe commitment to finalized & calculate the fee when send received balance to main keypair

This commit is contained in:
ghassmo
2021-09-26 21:38:09 +03:00
parent 6d40023181
commit 027fa103f2
2 changed files with 159 additions and 113 deletions

View File

@@ -11,14 +11,15 @@ async fn run() -> Result<()> {
let main_keypair: Keypair;
main_keypair = Keypair::from_bytes(&[
80, 20, 135, 18, 220, 115, 132, 178, 122, 1, 195, 250, 182, 241, 4, 82, 88, 12, 232, 249,
154, 105, 222, 221, 229, 138, 22, 232, 197, 151, 155, 28, 173, 182, 189, 174, 66, 196, 63,
98, 201, 68, 203, 60, 72, 93, 179, 244, 39, 158, 223, 249, 102, 160, 217, 245, 24, 153,
152, 52, 41, 248, 226, 32,
60, 233, 151, 189, 143, 1, 111, 173, 241, 1, 171, 31, 123, 156, 160, 235, 32, 108, 157, 75,
100, 150, 255, 154, 36, 254, 230, 97, 83, 248, 213, 223, 183, 146, 221, 49, 146, 156, 140,
27, 196, 234, 193, 229, 174, 93, 126, 232, 9, 85, 58, 45, 95, 105, 168, 167, 153, 123, 246,
110, 193, 70, 192, 186,
])
.unwrap();
let bridge = bridge::Bridge::new();
println!("main keypair {:?}", main_keypair.to_bytes());
println!("main pubkey {}", main_keypair.pubkey().to_string());
let network = String::from("sol");
@@ -30,6 +31,26 @@ async fn run() -> Result<()> {
.add_clients(network.clone(), sol_client)
.await?;
let bridge2 = bridge.clone();
let bridge_subscribtion = bridge2.subscribe().await;
bridge_subscribtion
.sender
.send(bridge::BridgeRequests {
network: network.clone(),
payload: bridge::BridgeRequestsPayload::Watch(None),
})
.await?;
let bridge_res = bridge_subscribtion.receiver.recv().await?;
match bridge_res.payload {
bridge::BridgeResponsePayload::Watch(_, token_pub) => {
println!("watch this address {}", token_pub);
}
_ => {}
}
let bridge_subscribtion = bridge.subscribe().await;
bridge_subscribtion
@@ -42,7 +63,6 @@ async fn run() -> Result<()> {
let bridge_res = bridge_subscribtion.receiver.recv().await?;
// XXX this will not work
match bridge_res.payload {
bridge::BridgeResponsePayload::Watch(_, token_pub) => {
println!("watch this address {}", token_pub);

View File

@@ -39,16 +39,12 @@ struct SubscribeParams {
pub struct SolClient {
keypair: Keypair,
// subscriptions hashmap using sub_id as an index and a value of (Keypair, amount)
// subscriptions vecotr of puykey
subscriptions: Arc<Mutex<Vec<Pubkey>>>,
notify_channel: (
async_channel::Sender<TokenNotification>,
async_channel::Receiver<TokenNotification>,
),
subscribe_channel: (
async_channel::Sender<JsonRequest>,
async_channel::Receiver<JsonRequest>,
),
}
impl SolClient {
@@ -56,21 +52,135 @@ impl SolClient {
let keypair: Keypair = deserialize(&keypair)?;
let notify_channel = async_channel::unbounded();
let subscribe_channel = async_channel::unbounded();
Ok(Arc::new(Self {
keypair,
subscriptions: Arc::new(Mutex::new(Vec::new())),
notify_channel,
subscribe_channel,
}))
}
pub fn send_to_main_account(&self, keypair: &Keypair, mut amount: u64) -> SolResult<()> {
debug!(
target: "SOL BRIDGE",
"sending received token to main account"
);
let rpc = RpcClient::new(RPC_SERVER.to_string());
let fee = rpc
.get_fees()
.unwrap()
.fee_calculator
.lamports_per_signature;
if fee >= amount {
warn!(
target: "SOL BRIDGE",
"Received insufficient {} lamports, couldn't send it to
the main_keypair",
amount,
);
return Ok(());
}
// subtract fee from the new_bal
amount -= fee;
let instruction =
system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount);
let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey()));
let bhq = BlockhashQuery::default();
match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) {
Err(_) => panic!("Couldn't connect to RPC"),
Ok(v) => tx.sign(&[keypair], v.0),
}
let _signature = rpc.send_and_confirm_transaction(&tx)?;
Ok(())
}
async fn handle_subscribe_request(self: Arc<Self>, keypair: Keypair) -> Result<()> {
debug!(
target: "SOL BRIDGE",
"Handle subscribe request"
);
// check first if it's not already subscribed
if self.subscriptions.lock().await.contains(&keypair.pubkey()) {
return Ok(());
}
// Parameters for subscription to events related to `pubkey`.
let sub_params = SubscribeParams {
encoding: json!("jsonParsed"),
commitment: json!("finalized"),
};
let sub_msg = jsonrpc::request(
json!("accountSubscribe"),
json!([json!(keypair.pubkey().to_string()), json!(sub_params)]),
);
let rpc = RpcClient::new(RPC_SERVER.to_string());
let old_balance = rpc
.get_balance(&keypair.pubkey())
.map_err(|err| SolFailed::from(err))?;
// WebSocket handshake/connect
let builder = native_tls::TlsConnector::builder();
let tls = TlsConnector::from(builder);
let (stream, _) = connect(WSS_SERVER, tls).await?;
let (mut write, mut read) = stream.split();
let (unsubscribe_channel_sx, unsubscribe_channel_rv) = async_channel::unbounded();
let unsubscribe_channel_rv2 = unsubscribe_channel_rv.clone();
let ws_write_task: smol::Task<Result<()>> = smol::spawn(async move {
write.send(Message::text(serde_json::to_string(&sub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
let unsub_msg = unsubscribe_channel_rv2.recv().await?;
write
.send(Message::text(serde_json::to_string(&unsub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
Ok(())
});
let keypair = serialize(&keypair);
loop {
let message = read.next().await;
info!("msg: {:?}", message);
if let Some(msg) = message {
self.clone()
.read_ws_subscribe_msg(
msg,
&keypair,
old_balance,
unsubscribe_channel_sx.clone(),
)
.await?;
} else {
break;
}
}
ws_write_task.cancel().await;
Ok(())
}
async fn read_ws_subscribe_msg(
&self,
message: std::result::Result<Message, tungstenite::Error>,
keypair: &Vec<u8>,
old_balance: u64,
unsubscribe_channel_sx: async_channel::Sender<JsonRequest>,
) -> SolResult<()> {
let data = message?.into_text()?;
@@ -168,18 +278,20 @@ impl SolClient {
.await
.map_err(|err| Error::from(err))?;
self.unsubscribe(sub_id, &keypair.pubkey()).await?;
//self.send_to_main_account(&keypair, new_bal)?;
self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone())
.await?;
debug!(
target: "SOL BRIDGE",
"Received {} lamports, to the pubkey: {} ",
received_balance, keypair.pubkey().to_string(),
);
self.send_to_main_account(&keypair, new_bal)?;
}
false => {
self.unsubscribe(sub_id, &keypair.pubkey()).await?;
self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone())
.await?;
}
}
}
@@ -187,112 +299,26 @@ impl SolClient {
Ok(())
}
pub fn send_to_main_account(&self, keypair: &Keypair, amount: u64) -> SolResult<()> {
debug!(
target: "SOL BRIDGE",
"send received token to main account"
);
let rpc = RpcClient::new(RPC_SERVER.to_string());
let instruction =
system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount);
let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey()));
let bhq = BlockhashQuery::default();
match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) {
Err(_) => panic!("Couldn't connect to RPC"),
Ok(v) => tx.sign(&[keypair], v.0),
}
let _signature = rpc.send_and_confirm_transaction(&tx)?;
Ok(())
}
async fn handle_subscribe_request(self: Arc<Self>, keypair: Keypair) -> Result<()> {
debug!(
target: "SOL BRIDGE",
"Handle subscribe request"
);
// check first if it's not already subscribed
if self.subscriptions.lock().await.contains(&keypair.pubkey()) {
return Ok(());
}
// Parameters for subscription to events related to `pubkey`.
let sub_params = SubscribeParams {
encoding: json!("jsonParsed"),
// XXX: Use "finalized" for 100% certainty.
commitment: json!("confirmed"),
};
let sub_msg = jsonrpc::request(
json!("accountSubscribe"),
json!([json!(keypair.pubkey().to_string()), json!(sub_params)]),
);
let rpc = RpcClient::new(RPC_SERVER.to_string());
let old_balance = rpc
.get_balance(&keypair.pubkey())
.map_err(|err| SolFailed::from(err))?;
// WebSocket handshake/connect
let builder = native_tls::TlsConnector::builder();
let tls = TlsConnector::from(builder);
let (stream, _) = connect(WSS_SERVER, tls).await?;
let (mut write, read) = stream.split();
let unsub_recv = self.subscribe_channel.1.clone();
let ws_write_task: smol::Task<Result<()>> = smol::spawn(async move {
write
.send(Message::text(serde_json::to_string(&sub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
let unsub_msg = unsub_recv.recv().await?;
write
.send(Message::text(serde_json::to_string(&unsub_msg)?))
.await
.map_err(|err| SolFailed::from(err))?;
Ok(())
});
let keypair = serialize(&keypair);
futures::StreamExt::for_each(read, |message| async {
// read ws msg
self.clone()
.read_ws_subscribe_msg(message, &keypair, old_balance)
.await
.expect("read_ws_msg");
// TODO handle this error
})
.await;
ws_write_task.cancel().await;
Ok(())
}
async fn unsubscribe(&self, sub_id: u64, pubkey: &Pubkey) -> Result<()> {
async fn unsubscribe(
&self,
sub_id: u64,
pubkey: &Pubkey,
unsubscribe_channel_sx: async_channel::Sender<JsonRequest>,
) -> Result<()> {
let sub_msg = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id]));
self.subscribe_channel.0.send(sub_msg).await?;
unsubscribe_channel_sx.send(sub_msg).await?;
let index = self
.subscriptions
.lock()
.await
.iter()
.position(|p| p == pubkey)
.unwrap();
.position(|p| p == pubkey);
self.subscriptions.lock().await.remove(index);
debug!(
target: "SOL BRIDGE",
"Successfully unsubscribe : {:?}",
sub_id
);
if let Some(ind) = index {
self.subscriptions.lock().await.remove(ind);
}
Ok(())
}