mirror of
https://github.com/MAGICGrants/autoforward-autoconvert.git
synced 2026-01-08 21:18:09 -05:00
112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
import json
from datetime import datetime
from time import sleep
from typing import Any, Literal, Optional

import requests
from requests.auth import HTTPDigestAuth

from helpers import *
|
|
|
|
# Connection settings for the Bitcoin and Monero RPC endpoints, read from the
# process environment. Each value is None when its variable is not set.
BITCOIN_RPC_URL = os.environ.get('BITCOIN_RPC_URL')
BITCOIN_RPC_USERNAME = os.environ.get('BITCOIN_RPC_USERNAME')
BITCOIN_RPC_PASSWORD = os.environ.get('BITCOIN_RPC_PASSWORD')

MONERO_RPC_URL = os.environ.get('MONERO_RPC_URL')
MONERO_RPC_USERNAME = os.environ.get('MONERO_RPC_USERNAME')
MONERO_RPC_PASSWORD = os.environ.get('MONERO_RPC_PASSWORD')
|
|
|
|
def request_bitcoin_rpc(method: str, params: Optional[list[Any]] = None) -> Any:
    """Call a method on the Bitcoin JSON-RPC endpoint and return its result.

    The request is POSTed to ``BITCOIN_RPC_URL`` with HTTP basic auth built
    from ``BITCOIN_RPC_USERNAME`` / ``BITCOIN_RPC_PASSWORD``.

    Args:
        method: Name of the RPC method to invoke (e.g. ``'getbalance'``).
        params: Positional parameters for the method. ``None`` (the default)
            sends an empty parameter list.

    Returns:
        The value stored under ``'result'`` in the JSON response.

    Raises:
        RuntimeError: If the response carries a non-null ``'error'`` member.
    """
    headers = {'content-type': 'application/json'}

    data = {
        'jsonrpc': '1.0',
        'id': 'python-bitcoin',
        # Forward the caller's method; this used to be hard-coded to
        # 'getbalance', so every call (including sends) only read the balance.
        'method': method,
        # A fresh list per call instead of a shared mutable default argument.
        'params': [] if params is None else params,
    }

    body = requests.post(
        BITCOIN_RPC_URL,
        headers=headers,
        data=json.dumps(data),
        auth=(BITCOIN_RPC_USERNAME, BITCOIN_RPC_PASSWORD),
    ).json()

    # Surface RPC-level failures instead of silently returning a null result.
    if body.get('error') is not None:
        raise RuntimeError(f"Bitcoin RPC '{method}' failed: {body['error']}")

    return body['result']
|
|
|
|
def request_monero_rpc(method: str, params: Optional[dict[str, Any]] = None) -> Any:
    """Call a method on the Monero JSON-RPC endpoint and return its result.

    The request is POSTed to ``MONERO_RPC_URL`` with HTTP digest auth built
    from ``MONERO_RPC_USERNAME`` / ``MONERO_RPC_PASSWORD``.

    Args:
        method: Name of the RPC method to invoke (e.g. ``'get_balance'``).
        params: Named parameters for the method. ``None`` (the default) sends
            an empty parameter object.

    Returns:
        The value stored under ``'result'`` in the JSON response.

    Raises:
        RuntimeError: If the response carries a non-null ``'error'`` member.
    """
    headers = {'content-type': 'application/json'}

    data = {
        'jsonrpc': '2.0',
        'id': '0',
        'method': method,
        # A fresh dict per call instead of a shared mutable default argument.
        'params': {} if params is None else params,
    }

    body = requests.post(
        MONERO_RPC_URL,
        headers=headers,
        json=data,
        auth=HTTPDigestAuth(MONERO_RPC_USERNAME, MONERO_RPC_PASSWORD),
    ).json()

    # Report the RPC error itself rather than an opaque KeyError on 'result'.
    if body.get('error') is not None:
        raise RuntimeError(f"Monero RPC '{method}' failed: {body['error']}")

    return body['result']
|
|
|
|
def get_bitcoin_fee_rate() -> int:
    """Return the ``halfHourFee`` entry of mempool.space's recommended fees."""
    response = requests.get('https://mempool.space/api/v1/fees/recommended')
    recommended_fees = response.json()
    return recommended_fees['halfHourFee']
|
|
|
|
def get_bitcoin_balance() -> float:
    """Return whatever the Bitcoin ``getbalance`` RPC call reports."""
    wallet_balance = request_bitcoin_rpc('getbalance')
    return wallet_balance
|
|
|
|
def send_bitcoin(address: str, amount: float, fee_rate: int) -> None:
    """Send *amount* to *address* through the ``sendtoaddress`` wallet RPC.

    Args:
        address: Destination Bitcoin address.
        amount: Amount to send, passed through unchanged as the RPC amount.
        fee_rate: Fee rate passed as the RPC's ``fee_rate`` argument.
    """
    # Positional arguments as listed in
    # https://bitcoincore.org/en/doc/24.0.0/rpc/wallet/sendtoaddress/
    # Python's None/True serialize to JSON null/true; the bare names
    # `null` and `true` used previously raised NameError.
    params = [
        address,
        amount,
        None,      # comment
        None,      # comment_to
        True,      # subtractfeefromamount
        True,      # replaceable
        None,      # conf_target
        None,      # estimate_mode
        None,      # avoid_reuse
        fee_rate,  # fee_rate
    ]
    request_bitcoin_rpc('sendtoaddress', params)
|
|
|
|
def get_monero_balance() -> float:
    """Return the ``balance`` field of ``get_balance`` for account index 0."""
    response = request_monero_rpc('get_balance', {'account_index': 0})
    return response['balance']
|
|
|
|
def sweep_all_monero(address: str) -> None:
    """Issue a ``sweep_all`` RPC for account index 0 with *address* as target."""
    sweep_params = dict(account_index=0, address=address)
    request_monero_rpc('sweep_all', sweep_params)
|
|
|
|
def get_new_kraken_address(asset: Literal['XBT', 'XMR']) -> Optional[str]:
    """Return a Kraken deposit address flagged as new for *asset*.

    Args:
        asset: ``'XBT'`` selects the ``'Bitcoin'`` deposit method; any other
            value selects ``'Monero'``.

    Returns:
        The ``'address'`` of the first entry whose ``'new'`` flag is truthy,
        or ``None`` when no entry is flagged as new.
    """
    payload = {
        'asset': asset,
        'method': 'Bitcoin' if asset == 'XBT' else 'Monero',
    }

    result = kraken_request('/0/private/DepositAddresses', payload)

    # The result was previously bound to a misspelled name (`adddress`) while
    # `address` was returned, so the lookup was lost. `.get` tolerates entries
    # that omit the 'new' flag instead of raising KeyError.
    return next(
        (entry['address'] for entry in result['result'] if entry.get('new')),
        None,
    )
|
|
|
|
# Main loop: every five minutes, forward any Bitcoin and Monero wallet balance
# to a Kraken deposit address. Each coin is handled in its own try block so a
# failure on one does not prevent the other from being processed.
while True:
    try:
        balance = get_bitcoin_balance()

        if balance > 0:
            fee_rate = get_bitcoin_fee_rate()
            address = get_new_kraken_address('XBT')
            amount = balance
            send_bitcoin(address, amount, fee_rate)
            print(get_time(), f'Sent {amount} BTC to {address}!')
        else:
            print(get_time(), 'No bitcoin balance to sweep.')
    except Exception as e:
        print(get_time(), 'Error autoforwarding Bitcoin:', e)

    try:
        balance = get_monero_balance()

        if balance > 0:
            address = get_new_kraken_address('XMR')
            sweep_all_monero(address)
            # Log success, mirroring the Bitcoin branch.
            print(get_time(), f'Swept Monero balance to {address}!')
        else:
            # This message used to say "bitcoin" (copy-paste error).
            print(get_time(), 'No monero balance to sweep.')
    except Exception as e:
        print(get_time(), 'Error autoforwarding Monero:', e)

    sleep(60 * 5)
|