diff --git a/src/bin/sol-test.rs b/src/bin/sol-test.rs index 1d547583e..9811aa4bb 100644 --- a/src/bin/sol-test.rs +++ b/src/bin/sol-test.rs @@ -11,14 +11,15 @@ async fn run() -> Result<()> { let main_keypair: Keypair; main_keypair = Keypair::from_bytes(&[ - 80, 20, 135, 18, 220, 115, 132, 178, 122, 1, 195, 250, 182, 241, 4, 82, 88, 12, 232, 249, - 154, 105, 222, 221, 229, 138, 22, 232, 197, 151, 155, 28, 173, 182, 189, 174, 66, 196, 63, - 98, 201, 68, 203, 60, 72, 93, 179, 244, 39, 158, 223, 249, 102, 160, 217, 245, 24, 153, - 152, 52, 41, 248, 226, 32, + 60, 233, 151, 189, 143, 1, 111, 173, 241, 1, 171, 31, 123, 156, 160, 235, 32, 108, 157, 75, + 100, 150, 255, 154, 36, 254, 230, 97, 83, 248, 213, 223, 183, 146, 221, 49, 146, 156, 140, + 27, 196, 234, 193, 229, 174, 93, 126, 232, 9, 85, 58, 45, 95, 105, 168, 167, 153, 123, 246, + 110, 193, 70, 192, 186, ]) .unwrap(); let bridge = bridge::Bridge::new(); + println!("main keypair {:?}", main_keypair.to_bytes()); println!("main pubkey {}", main_keypair.pubkey().to_string()); let network = String::from("sol"); @@ -30,6 +31,26 @@ async fn run() -> Result<()> { .add_clients(network.clone(), sol_client) .await?; + let bridge2 = bridge.clone(); + let bridge_subscribtion = bridge2.subscribe().await; + + bridge_subscribtion + .sender + .send(bridge::BridgeRequests { + network: network.clone(), + payload: bridge::BridgeRequestsPayload::Watch(None), + }) + .await?; + + let bridge_res = bridge_subscribtion.receiver.recv().await?; + + match bridge_res.payload { + bridge::BridgeResponsePayload::Watch(_, token_pub) => { + println!("watch this address {}", token_pub); + } + _ => {} + } + let bridge_subscribtion = bridge.subscribe().await; bridge_subscribtion @@ -42,7 +63,6 @@ async fn run() -> Result<()> { let bridge_res = bridge_subscribtion.receiver.recv().await?; - // XXX this will not work match bridge_res.payload { bridge::BridgeResponsePayload::Watch(_, token_pub) => { println!("watch this address {}", token_pub); diff --git a/src/service/sol.rs b/src/service/sol.rs index 4e893c9e9..549e688fc 100644 --- a/src/service/sol.rs +++ b/src/service/sol.rs @@ -39,16 +39,12 @@ struct SubscribeParams { pub struct SolClient { keypair: Keypair, - // subscriptions hashmap using sub_id as an index and a value of (Keypair, amount) + // subscriptions vecotr of puykey subscriptions: Arc>>, notify_channel: ( async_channel::Sender, async_channel::Receiver, ), - subscribe_channel: ( - async_channel::Sender, - async_channel::Receiver, - ), } impl SolClient { @@ -56,21 +52,135 @@ impl SolClient { let keypair: Keypair = deserialize(&keypair)?; let notify_channel = async_channel::unbounded(); - let subscribe_channel = async_channel::unbounded(); Ok(Arc::new(Self { keypair, subscriptions: Arc::new(Mutex::new(Vec::new())), notify_channel, - subscribe_channel, })) } + pub fn send_to_main_account(&self, keypair: &Keypair, mut amount: u64) -> SolResult<()> { + debug!( + target: "SOL BRIDGE", + "sending received token to main account" + ); + + let rpc = RpcClient::new(RPC_SERVER.to_string()); + + let fee = rpc + .get_fees() + .unwrap() + .fee_calculator + .lamports_per_signature; + + if fee >= amount { + warn!( + target: "SOL BRIDGE", + "Received insufficient {} lamports, couldn't send it to + the main_keypair", + amount, + ); + return Ok(()); + } + + // subtract fee from the new_bal + amount -= fee; + + let instruction = + system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount); + + let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey())); + let bhq = BlockhashQuery::default(); + match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) { + Err(_) => panic!("Couldn't connect to RPC"), + Ok(v) => tx.sign(&[keypair], v.0), + } + let _signature = rpc.send_and_confirm_transaction(&tx)?; + Ok(()) + } + + async fn handle_subscribe_request(self: Arc, keypair: Keypair) -> Result<()> { + debug!( + target: "SOL BRIDGE", + "Handle subscribe request" + ); + + // check first if it's not already subscribed + if self.subscriptions.lock().await.contains(&keypair.pubkey()) { + return Ok(()); + } + + // Parameters for subscription to events related to `pubkey`. + let sub_params = SubscribeParams { + encoding: json!("jsonParsed"), + commitment: json!("finalized"), + }; + + let sub_msg = jsonrpc::request( + json!("accountSubscribe"), + json!([json!(keypair.pubkey().to_string()), json!(sub_params)]), + ); + + let rpc = RpcClient::new(RPC_SERVER.to_string()); + let old_balance = rpc + .get_balance(&keypair.pubkey()) + .map_err(|err| SolFailed::from(err))?; + + // WebSocket handshake/connect + let builder = native_tls::TlsConnector::builder(); + let tls = TlsConnector::from(builder); + let (stream, _) = connect(WSS_SERVER, tls).await?; + + + let (mut write, mut read) = stream.split(); + + let (unsubscribe_channel_sx, unsubscribe_channel_rv) = async_channel::unbounded(); + + let unsubscribe_channel_rv2 = unsubscribe_channel_rv.clone(); + let ws_write_task: smol::Task> = smol::spawn(async move { + write.send(Message::text(serde_json::to_string(&sub_msg)?)) + .await + .map_err(|err| SolFailed::from(err))?; + + let unsub_msg = unsubscribe_channel_rv2.recv().await?; + write + .send(Message::text(serde_json::to_string(&unsub_msg)?)) + .await + .map_err(|err| SolFailed::from(err))?; + + Ok(()) + }); + + let keypair = serialize(&keypair); + + loop { + let message = read.next().await; + info!("msg: {:?}", message); + if let Some(msg) = message { + self.clone() + .read_ws_subscribe_msg( + msg, + &keypair, + old_balance, + unsubscribe_channel_sx.clone(), + ) + .await?; + } else { + break; + } + } + + ws_write_task.cancel().await; + Ok(()) + } + async fn read_ws_subscribe_msg( &self, message: std::result::Result, keypair: &Vec, old_balance: u64, + unsubscribe_channel_sx: async_channel::Sender, ) -> SolResult<()> { let data = message?.into_text()?; @@ -168,18 +278,20 @@ impl SolClient { .await .map_err(|err| Error::from(err))?; - self.unsubscribe(sub_id, &keypair.pubkey()).await?; - - //self.send_to_main_account(&keypair, new_bal)?; + self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone()) + .await?; debug!( target: "SOL BRIDGE", "Received {} lamports, to the pubkey: {} ", received_balance, keypair.pubkey().to_string(), ); + + self.send_to_main_account(&keypair, new_bal)?; } false => { - self.unsubscribe(sub_id, &keypair.pubkey()).await?; + self.unsubscribe(sub_id, &keypair.pubkey(), unsubscribe_channel_sx.clone()) + .await?; } } } @@ -187,112 +299,26 @@ impl SolClient { Ok(()) } - pub fn send_to_main_account(&self, keypair: &Keypair, amount: u64) -> SolResult<()> { - debug!( - target: "SOL BRIDGE", - "send received token to main account" - ); - let rpc = RpcClient::new(RPC_SERVER.to_string()); - - let instruction = - system_instruction::transfer(&keypair.pubkey(), &self.keypair.pubkey(), amount); - - let mut tx = Transaction::new_with_payer(&[instruction], Some(&keypair.pubkey())); - let bhq = BlockhashQuery::default(); - match bhq.get_blockhash_and_fee_calculator(&rpc, rpc.commitment()) { - Err(_) => panic!("Couldn't connect to RPC"), - Ok(v) => tx.sign(&[keypair], v.0), - } - let _signature = rpc.send_and_confirm_transaction(&tx)?; - Ok(()) - } - - async fn handle_subscribe_request(self: Arc, keypair: Keypair) -> Result<()> { - debug!( - target: "SOL BRIDGE", - "Handle subscribe request" - ); - - // check first if it's not already subscribed - if self.subscriptions.lock().await.contains(&keypair.pubkey()) { - return Ok(()); - } - - // Parameters for subscription to events related to `pubkey`. - let sub_params = SubscribeParams { - encoding: json!("jsonParsed"), - // XXX: Use "finalized" for 100% certainty. - commitment: json!("confirmed"), - }; - - let sub_msg = jsonrpc::request( - json!("accountSubscribe"), - json!([json!(keypair.pubkey().to_string()), json!(sub_params)]), - ); - - let rpc = RpcClient::new(RPC_SERVER.to_string()); - let old_balance = rpc - .get_balance(&keypair.pubkey()) - .map_err(|err| SolFailed::from(err))?; - - // WebSocket handshake/connect - let builder = native_tls::TlsConnector::builder(); - let tls = TlsConnector::from(builder); - let (stream, _) = connect(WSS_SERVER, tls).await?; - - let (mut write, read) = stream.split(); - - let unsub_recv = self.subscribe_channel.1.clone(); - let ws_write_task: smol::Task> = smol::spawn(async move { - write - .send(Message::text(serde_json::to_string(&sub_msg)?)) - .await - .map_err(|err| SolFailed::from(err))?; - - let unsub_msg = unsub_recv.recv().await?; - write - .send(Message::text(serde_json::to_string(&unsub_msg)?)) - .await - .map_err(|err| SolFailed::from(err))?; - Ok(()) - }); - - let keypair = serialize(&keypair); - - futures::StreamExt::for_each(read, |message| async { - // read ws msg - self.clone() - .read_ws_subscribe_msg(message, &keypair, old_balance) - .await - .expect("read_ws_msg"); - // TODO handle this error - }) - .await; - - ws_write_task.cancel().await; - Ok(()) - } - - async fn unsubscribe(&self, sub_id: u64, pubkey: &Pubkey) -> Result<()> { + async fn unsubscribe( + &self, + sub_id: u64, + pubkey: &Pubkey, + unsubscribe_channel_sx: async_channel::Sender, + ) -> Result<()> { let sub_msg = jsonrpc::request(json!("accountUnsubscribe"), json!([sub_id])); - self.subscribe_channel.0.send(sub_msg).await?; + unsubscribe_channel_sx.send(sub_msg).await?; let index = self .subscriptions .lock() .await .iter() - .position(|p| p == pubkey) - .unwrap(); + .position(|p| p == pubkey); - self.subscriptions.lock().await.remove(index); - - debug!( - target: "SOL BRIDGE", - "Successfully unsubscribe : {:?}", - sub_id - ); + if let Some(ind) = index { + self.subscriptions.lock().await.remove(ind); + } Ok(()) }