mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-01-07 22:54:01 -05:00
Asyncio (#13)
* Change RPCClient to be Async with aiohttp * Change Service to be async with AsyncRPCClient * Change AccountService to async due to Service changes * Change WakuextService to async due to Service changes * Change WalletService to async due to Service changes * Change SignalClient to async * Favored composition over inheritance in StatusBackend, also make async needed changes * Make inject messages async
This commit is contained in:
@@ -1,19 +1,18 @@
|
||||
# Python Imports
|
||||
from typing import Dict
|
||||
|
||||
# Project Imports
|
||||
from src.rpc_client import RpcClient
|
||||
from src.service import Service
|
||||
from src.rpc_client import AsyncRpcClient
|
||||
from src.service import AsyncService
|
||||
|
||||
|
||||
class AccountService(Service):
|
||||
def __init__(self, client: RpcClient):
|
||||
super().__init__(client, "accounts")
|
||||
class AccountAsyncService(AsyncService):
|
||||
def __init__(self, rpc: AsyncRpcClient):
|
||||
super().__init__(rpc, "accounts")
|
||||
|
||||
def get_accounts(self) -> Dict:
|
||||
response = self.rpc_request("getAccounts")
|
||||
return response.json()
|
||||
async def get_accounts(self) -> dict:
|
||||
response_dict = await self.rpc.rpc_valid_request("getAccounts")
|
||||
return response_dict
|
||||
|
||||
def get_account_keypairs(self) -> Dict:
|
||||
response = self.rpc_request("getKeypairs")
|
||||
return response.json()
|
||||
async def get_account_keypairs(self) -> dict:
|
||||
response_dict = await self.rpc.rpc_valid_request("getKeypairs")
|
||||
return response_dict
|
||||
|
||||
@@ -1,39 +1,28 @@
|
||||
# Python Imports
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
|
||||
# Project Imports
|
||||
from src.status_backend import StatusBackend
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, num_messages: int):
|
||||
delay = 1 / msg_per_sec
|
||||
for message_count in range(num_messages):
|
||||
try:
|
||||
logger.info(f"Sending message {message_count}")
|
||||
await pod.wakuext_service.send_chat_message(chat_id, f"Message {message_count}")
|
||||
|
||||
# TODO times can get blocked by response time of the other node. Improve with true concurrency.
|
||||
def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, num_messages: int):
|
||||
def message_sender():
|
||||
message_count = 0
|
||||
while message_count < num_messages:
|
||||
try:
|
||||
logger.info(f"Sending message {message_count}")
|
||||
_ = pod.wakuext_service.send_chat_message(chat_id, f"Message {message_count}")
|
||||
if message_count == 0:
|
||||
logger.info("Successfully began sending messages")
|
||||
elif message_count % 10 == 0:
|
||||
logger.debug(f"Sent {message_count} messages")
|
||||
|
||||
if message_count == 0:
|
||||
logger.info("Successfully began sending messages")
|
||||
elif message_count % 10 == 0:
|
||||
logger.debug(f"Sent {message_count} messages")
|
||||
message_count += 1
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
time.sleep(1 / msg_per_sec)
|
||||
except AssertionError as e:
|
||||
logger.error(f"Error sending message: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
except AssertionError as e:
|
||||
logger.error(f"Error sending message: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
logger.info(f"Finished sending {num_messages} messages")
|
||||
|
||||
# Start the message sender in a background thread
|
||||
sender_thread = threading.Thread(target=message_sender, daemon=True)
|
||||
sender_thread.start()
|
||||
logger.info(f"Message injection started in background thread for {num_messages} messages")
|
||||
return sender_thread
|
||||
logger.info(f"Finished sending {num_messages} messages")
|
||||
|
||||
@@ -1,79 +1,82 @@
|
||||
# Python Imports
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
from typing import List
|
||||
from requests import Session, Response
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from json import JSONDecodeError
|
||||
from typing import List, Optional, Any
|
||||
from aiohttp import ClientSession, ClientTimeout, ClientError
|
||||
from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type
|
||||
|
||||
# Project Imports
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RpcClient:
|
||||
|
||||
def __init__(self, rpc_url: str, client: Session = requests.Session()):
|
||||
self.client = client
|
||||
class AsyncRpcClient:
|
||||
def __init__(self, rpc_url: str, session: Optional[ClientSession] = None):
|
||||
self.rpc_url = rpc_url
|
||||
self._owns_session = session is None
|
||||
self.session = session or ClientSession(timeout=ClientTimeout(total=10))
|
||||
self.request_counter = 0
|
||||
|
||||
def _check_decode_and_key_errors_in_response(self, response: Response, key: str) -> str:
|
||||
try:
|
||||
return response.json()[key]
|
||||
except json.JSONDecodeError:
|
||||
raise AssertionError(f"Invalid JSON in response: {response.content}")
|
||||
except KeyError:
|
||||
raise AssertionError(f"Key '{key}' not found in the JSON response: {response.content}")
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
def verify_is_valid_json_rpc_response(self, response: Response, _id: str = None) -> Response:
|
||||
assert response.status_code == 200, f"Got response {response.content}, status code {response.status_code}"
|
||||
assert response.content
|
||||
self._check_decode_and_key_errors_in_response(response, "result")
|
||||
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
|
||||
await self.close()
|
||||
|
||||
if _id:
|
||||
try:
|
||||
if _id != response.json()["id"]:
|
||||
raise AssertionError(f"got id: {response.json()['id']} instead of expected id: {_id}")
|
||||
except KeyError:
|
||||
raise AssertionError(f"no id in response {response.json()}")
|
||||
return response
|
||||
async def close(self):
|
||||
if self._owns_session:
|
||||
await self.session.close()
|
||||
|
||||
def verify_is_json_rpc_error(self, response: Response):
|
||||
assert response.status_code == 200
|
||||
assert response.content
|
||||
self._check_decode_and_key_errors_in_response(response, "error")
|
||||
def _check_key_in_json(self, data: dict, key: str) -> str:
|
||||
if key not in data:
|
||||
raise AssertionError(f"Key '{key}' missing in response: {data}")
|
||||
return data[key]
|
||||
|
||||
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.5), reraise=True)
|
||||
def rpc_request(self, method: str, params: List = None, request_id: str = None, url: str = None,
|
||||
enable_logging: bool = True) -> Response:
|
||||
if not request_id:
|
||||
def verify_is_valid_json_rpc_response(self, data: dict, request_id: Optional[str] = None):
|
||||
self._check_key_in_json(data, "result")
|
||||
if request_id is not None and str(data.get("id")) != str(request_id):
|
||||
raise AssertionError(f"Expected ID {request_id}, got {data.get('id')}")
|
||||
|
||||
def verify_is_json_rpc_error(self, data: dict):
|
||||
self._check_key_in_json(data, "error")
|
||||
|
||||
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.5), reraise=True, retry=retry_if_exception_type((
|
||||
ClientError, json.JSONDecodeError, AssertionError, asyncio.TimeoutError
|
||||
)))
|
||||
async def rpc_request(self, method: str, params: Optional[List] = None, request_id: Optional[str] = None,
|
||||
url: Optional[str] = None, enable_logging: bool = True) -> dict:
|
||||
if request_id is None:
|
||||
request_id = self.request_counter
|
||||
self.request_counter += 1
|
||||
if params is None:
|
||||
params = []
|
||||
url = url if url else self.rpc_url
|
||||
data = {"jsonrpc": "2.0", "method": method, "id": request_id}
|
||||
if params:
|
||||
data["params"] = params
|
||||
|
||||
url = url or self.rpc_url
|
||||
payload = {"jsonrpc": "2.0", "method": method, "id": request_id, "params": params or []}
|
||||
|
||||
if enable_logging:
|
||||
logging.debug(f"Sending POST request to url {url} with data: {json.dumps(data, sort_keys=True)}")
|
||||
response = self.client.post(url, json=data)
|
||||
try:
|
||||
resp_json = response.json()
|
||||
logger.debug(f"Sending async POST to {url} with data: {json.dumps(payload, sort_keys=True)}")
|
||||
|
||||
async with self.session.post(url, json=payload) as response:
|
||||
resp_text = await response.text()
|
||||
|
||||
if response.status != 200:
|
||||
raise AssertionError(f"Bad HTTP status: {response.status}, body: {resp_text}")
|
||||
|
||||
try:
|
||||
resp_json = await response.json()
|
||||
except json.JSONDecodeError:
|
||||
raise AssertionError(f"Invalid JSON in response: {resp_text}")
|
||||
|
||||
if enable_logging:
|
||||
logging.debug(f"Got response: {json.dumps(resp_json, sort_keys=True)}")
|
||||
if resp_json.get("error"):
|
||||
assert "JSON-RPC client is unavailable" != resp_json["error"]
|
||||
except JSONDecodeError:
|
||||
if enable_logging:
|
||||
logging.debug(f"Got response: {response.content}")
|
||||
return response
|
||||
logger.debug(f"Received response: {json.dumps(resp_json, sort_keys=True)}")
|
||||
|
||||
def rpc_valid_request(self, method: str, params: List = None, _id: str = None, url: str = None,
|
||||
skip_validation: bool = False, enable_logging: bool = True) -> Response:
|
||||
response = self.rpc_request(method, params, _id, url, enable_logging=enable_logging)
|
||||
if "error" in resp_json:
|
||||
raise AssertionError(f"JSON-RPC Error: {resp_json['error']}")
|
||||
|
||||
if not skip_validation:
|
||||
self.verify_is_valid_json_rpc_response(response, _id)
|
||||
return resp_json
|
||||
|
||||
return response
|
||||
async def rpc_valid_request(self, method: str, params: Optional[List] = None, request_id: Optional[str] = None,
|
||||
url: Optional[str] = None, enable_logging: bool = True) -> dict:
|
||||
resp_json = await self.rpc_request(method, params, request_id, url, enable_logging=enable_logging)
|
||||
self.verify_is_valid_json_rpc_response(resp_json, request_id)
|
||||
return resp_json
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
# Python Imports
|
||||
from typing import List
|
||||
from requests import Response
|
||||
from typing import Optional, Any
|
||||
|
||||
# Project Imports
|
||||
from src.rpc_client import RpcClient
|
||||
from src.rpc_client import AsyncRpcClient
|
||||
|
||||
|
||||
class Service:
|
||||
def __init__(self, client: RpcClient, name: str):
|
||||
assert name != ""
|
||||
self.rpc_client = client
|
||||
class AsyncService:
|
||||
def __init__(self, async_rpc_client: AsyncRpcClient, name: str):
|
||||
self.rpc = async_rpc_client
|
||||
self.name = name
|
||||
|
||||
def rpc_request(self, method: str, params: List = None, skip_validation: bool = False, enable_logging: bool = True) -> Response:
|
||||
async def rpc_request(self, method: str, params: Optional[list] = None, enable_logging: bool = True) -> Any:
|
||||
full_method_name = f"{self.name}_{method}"
|
||||
return self.rpc_client.rpc_valid_request(full_method_name, params, skip_validation=skip_validation, enable_logging=enable_logging)
|
||||
return await self.rpc.rpc_valid_request(full_method_name, params or [], enable_logging=enable_logging)
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
# Python Imports
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import websocket
|
||||
import os
|
||||
from typing import List, Any, Dict, Optional
|
||||
from enum import Enum
|
||||
from typing import Optional, Callable
|
||||
from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from websocket import WebSocketApp
|
||||
|
||||
from src.enums import SignalType
|
||||
# Project Imports
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LOG_SIGNALS_TO_FILE = False
|
||||
SIGNALS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
@@ -24,14 +26,17 @@ class SignalType(Enum):
|
||||
NODE_LOGOUT = "node.stopped"
|
||||
|
||||
|
||||
|
||||
class SignalClient:
|
||||
def __init__(self, ws_url: str, await_signals: List[str]):
|
||||
class AsyncSignalClient:
|
||||
def __init__(self, ws_url: str, await_signals: list[str]):
|
||||
self.url = f"{ws_url}/signals"
|
||||
|
||||
self.await_signals = await_signals
|
||||
# TODO: Improve delta_count explanation
|
||||
self.received_signals = {
|
||||
self.ws: Optional[ClientWebSocketResponse] = None
|
||||
self.session: Optional[ClientSession] = None
|
||||
self.signal_file_path = None
|
||||
self.signal_lock = asyncio.Lock()
|
||||
|
||||
self.received_signals: dict[str, dict] = {
|
||||
# For each signal type, store:
|
||||
# - list of received signals
|
||||
# - expected received event delta count (resets to 1 after each wait_for_event call)
|
||||
@@ -45,102 +50,96 @@ class SignalClient:
|
||||
}
|
||||
for signal in self.await_signals
|
||||
}
|
||||
if LOG_SIGNALS_TO_FILE:
|
||||
|
||||
if LOG_SIGNALS_TO_FILE: # Not being used currently
|
||||
Path(SIGNALS_DIR).mkdir(parents=True, exist_ok=True)
|
||||
self.signal_file_path = os.path.join(
|
||||
SIGNALS_DIR,
|
||||
f"signal_{ws_url.split(':')[-1]}_{datetime.now().strftime('%H%M%S')}.log",
|
||||
)
|
||||
Path(SIGNALS_DIR).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def on_message(self, ws: WebSocketApp, signal: str):
|
||||
async def __aenter__(self):
|
||||
self.session = ClientSession()
|
||||
self.ws = await self.session.ws_connect(self.url)
|
||||
asyncio.create_task(self._listen())
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
if self.ws:
|
||||
await self.ws.close()
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
|
||||
async def _listen(self):
|
||||
async for msg in self.ws:
|
||||
if msg.type == WSMsgType.TEXT:
|
||||
await self.on_message(msg.data)
|
||||
elif msg.type == WSMsgType.ERROR:
|
||||
logger.error(f"WebSocket error: {self.ws.exception()}")
|
||||
|
||||
async def on_message(self, signal: str):
|
||||
signal_data = json.loads(signal)
|
||||
if LOG_SIGNALS_TO_FILE:
|
||||
self.write_signal_to_file(signal_data)
|
||||
pass # TODO
|
||||
|
||||
signal_type = signal_data.get("type")
|
||||
if signal_type in self.await_signals:
|
||||
accept_fn = self.received_signals[signal_type]["accept_fn"]
|
||||
if not accept_fn or accept_fn(signal_data):
|
||||
self.received_signals[signal_type]["received"].append(signal_data)
|
||||
|
||||
def check_signal_type(self, signal_type: str):
|
||||
if signal_type not in self.await_signals:
|
||||
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
|
||||
async with self.signal_lock:
|
||||
accept_fn = self.received_signals[signal_type]["accept_fn"]
|
||||
if not accept_fn or accept_fn(signal_data):
|
||||
self.received_signals[signal_type]["received"].append(signal_data)
|
||||
|
||||
# Used to set up how many instances of a signal to wait for, before triggering the actions
|
||||
# that cause them to be emitted.
|
||||
def prepare_wait_for_signal(self, signal_type: str, delta_count: int, accept_fn=None):
|
||||
self.check_signal_type(signal_type)
|
||||
async def prepare_wait_for_signal(self, signal_type: str, delta_count: int, accept_fn: Optional[Callable] = None):
|
||||
if signal_type not in self.await_signals:
|
||||
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
|
||||
async with self.signal_lock:
|
||||
self.received_signals[signal_type]["delta_count"] = delta_count
|
||||
self.received_signals[signal_type]["expected_count"] = (
|
||||
len(self.received_signals[signal_type]["received"]) + delta_count
|
||||
)
|
||||
self.received_signals[signal_type]["accept_fn"] = accept_fn
|
||||
|
||||
if delta_count < 1:
|
||||
raise ValueError("delta_count must be greater than 0")
|
||||
self.received_signals[signal_type]["delta_count"] = delta_count
|
||||
self.received_signals[signal_type]["expected_count"] = len(self.received_signals[signal_type]["received"]) + delta_count
|
||||
self.received_signals[signal_type]["accept_fn"] = accept_fn
|
||||
async def wait_for_signal(self, signal_type: str, timeout: int = 20) -> dict | list[dict]:
|
||||
if signal_type not in self.await_signals:
|
||||
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
|
||||
|
||||
def wait_for_signal(self, signal_type: str, timeout: int = 20) -> Dict | List[Dict]:
|
||||
self.check_signal_type(signal_type)
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
while True:
|
||||
async with self.signal_lock:
|
||||
received = self.received_signals[signal_type]["received"]
|
||||
expected = self.received_signals[signal_type]["expected_count"]
|
||||
delta_count = self.received_signals[signal_type]["delta_count"]
|
||||
|
||||
start_time = time.time()
|
||||
received_signals = self.received_signals.get(signal_type)
|
||||
while (not received_signals) or len(received_signals["received"]) < received_signals["expected_count"]:
|
||||
if time.time() - start_time >= timeout:
|
||||
raise TimeoutError(f"Signal {signal_type} is not received in {timeout} seconds")
|
||||
time.sleep(0.2)
|
||||
logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds")
|
||||
# TODO: Improve delta_count explanation
|
||||
delta_count = received_signals["delta_count"]
|
||||
self.prepare_wait_for_signal(signal_type, 1)
|
||||
if delta_count == 1:
|
||||
return self.received_signals[signal_type]["received"][-1]
|
||||
return self.received_signals[signal_type]["received"][-delta_count:]
|
||||
if len(received) >= expected:
|
||||
await self.prepare_wait_for_signal(signal_type, 1)
|
||||
return received[-1] if delta_count == 1 else received[-delta_count:]
|
||||
|
||||
def wait_for_login(self) -> Dict:
|
||||
signal = self.wait_for_signal(SignalType.NODE_LOGIN.value)
|
||||
if asyncio.get_event_loop().time() - start_time >= timeout:
|
||||
raise TimeoutError(f"Signal {signal_type} not received in {timeout} seconds")
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
async def wait_for_login(self) -> dict:
|
||||
signal = await self.wait_for_signal(SignalType.NODE_LOGIN.value)
|
||||
if "error" in signal["event"]:
|
||||
error_details = signal["event"]["error"]
|
||||
assert not error_details, f"Unexpected error during login: {error_details}"
|
||||
self.node_login_event = signal
|
||||
return signal
|
||||
|
||||
def wait_for_logout(self) -> Dict:
|
||||
signal = self.wait_for_signal(SignalType.NODE_LOGOUT.value)
|
||||
return signal
|
||||
async def wait_for_logout(self) -> dict:
|
||||
return await self.wait_for_signal(SignalType.NODE_LOGOUT.value)
|
||||
|
||||
def find_signal_containing_string(self, signal_type: str, event_string: str, timeout=20) -> Optional[Dict]:
|
||||
start_time = time.time()
|
||||
async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout=20) -> Optional[dict]:
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
while True:
|
||||
if time.time() - start_time >= timeout:
|
||||
raise TimeoutError(f"Signal {signal_type} containing {event_string} is not received in {timeout} seconds")
|
||||
if not self.received_signals.get(signal_type):
|
||||
time.sleep(0.2)
|
||||
continue
|
||||
for event in self.received_signals[signal_type]["received"]:
|
||||
if event_string in json.dumps(event):
|
||||
logging.info(f"Signal {signal_type} containing {event_string} is received in {round(time.time() - start_time)} seconds")
|
||||
return event
|
||||
time.sleep(0.2)
|
||||
async with self.signal_lock:
|
||||
for event in self.received_signals.get(signal_type, {}).get("received", []):
|
||||
if event_string in json.dumps(event):
|
||||
logger.info(f"Found {signal_type} containing '{event_string}'")
|
||||
return event
|
||||
|
||||
def _on_error(self, ws: WebSocketApp, error: Any):
|
||||
logging.error(f"Error: {error}")
|
||||
|
||||
def _on_close(self, ws: WebSocketApp, close_status_code: Any, close_msg: Any):
|
||||
logging.info(f"Connection closed: {close_status_code}, {close_msg}")
|
||||
|
||||
def _on_open(self, ws: WebSocketApp):
|
||||
logging.info("Connection opened")
|
||||
|
||||
def _connect(self):
|
||||
ws = websocket.WebSocketApp(
|
||||
self.url,
|
||||
on_message=self.on_message,
|
||||
on_error=self._on_error,
|
||||
on_close=self._on_close,
|
||||
)
|
||||
ws.on_open = self._on_open
|
||||
ws.run_forever()
|
||||
|
||||
def write_signal_to_file(self, signal_data: Dict):
|
||||
with open(self.signal_file_path, "a+") as file:
|
||||
json.dump(signal_data, file)
|
||||
file.write("\n")
|
||||
if asyncio.get_event_loop().time() - start_time >= timeout:
|
||||
raise TimeoutError(f"Signal {signal_type} containing '{event_string}' not received in {timeout} seconds")
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
@@ -1,75 +1,76 @@
|
||||
# Python Imports
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import requests
|
||||
from typing import List, Dict
|
||||
from requests import Response
|
||||
from aiohttp import ClientSession, ClientTimeout, ClientResponse
|
||||
|
||||
# Project Imports
|
||||
from src.account_service import AccountService
|
||||
from src.rpc_client import RpcClient
|
||||
from src.signal_client import SignalClient
|
||||
from src.wakuext_service import WakuextService
|
||||
from src.wallet_service import WalletService
|
||||
from src.account_service import AccountAsyncService
|
||||
from src.rpc_client import AsyncRpcClient
|
||||
from src.signal_client import AsyncSignalClient
|
||||
from src.wakuext_service import WakuextAsyncService
|
||||
from src.wallet_service import WalletAsyncService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class StatusBackend(RpcClient, SignalClient):
|
||||
|
||||
class StatusBackend:
|
||||
def __init__(self, url: str, await_signals: List[str] = None):
|
||||
self.base_url = url
|
||||
self.api_url = f"{url}/statusgo"
|
||||
self.ws_url = f"{url}".replace("http", "ws")
|
||||
self.ws_url = url.replace("http", "ws")
|
||||
self.rpc_url = f"{url}/statusgo/CallRPC"
|
||||
self.public_key = ""
|
||||
|
||||
RpcClient.__init__(self, self.rpc_url)
|
||||
SignalClient.__init__(self, self.ws_url, await_signals)
|
||||
self.rpc = AsyncRpcClient(self.rpc_url)
|
||||
self.signal = AsyncSignalClient(self.ws_url, await_signals)
|
||||
self.session = ClientSession(timeout=ClientTimeout(total=10))
|
||||
|
||||
websocket_thread = threading.Thread(target=self._connect)
|
||||
websocket_thread.daemon = True
|
||||
websocket_thread.start()
|
||||
self.wakuext_service = WakuextAsyncService(self.rpc)
|
||||
self.wallet_service = WalletAsyncService(self.rpc)
|
||||
self.accounts_service = AccountAsyncService(self.rpc)
|
||||
|
||||
self.wakuext_service = WakuextService(self)
|
||||
self.wallet_service = WalletService(self)
|
||||
self.accounts_service = AccountService(self)
|
||||
async def __aenter__(self):
|
||||
await self.rpc.__aenter__()
|
||||
await self.signal.__aenter__()
|
||||
return self
|
||||
|
||||
def api_request(self, method: str, data: Dict, url: str = None) -> Response:
|
||||
url = url if url else self.api_url
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.signal.__aexit__(exc_type, exc_val, exc_tb)
|
||||
await self.rpc.__aexit__(exc_type, exc_val, exc_tb)
|
||||
await self.session.close()
|
||||
|
||||
async def call_rpc(self, method: str, params: List = None):
|
||||
return await self.rpc.rpc_valid_request(method, params or [])
|
||||
|
||||
async def api_request(self, method: str, data: Dict, url: str = None) -> ClientResponse:
|
||||
url = url or self.api_url
|
||||
url = f"{url}/{method}"
|
||||
logger.debug(f"Sending async POST request to {url} with data: {json.dumps(data, sort_keys=True)}")
|
||||
async with self.session.post(url, json=data) as response:
|
||||
logger.debug(f"Got response: {await response.text()}")
|
||||
return response
|
||||
|
||||
logger.debug(f"Sending POST request to url {url} with data: {json.dumps(data, sort_keys=True)}")
|
||||
response = requests.post(url, json=data)
|
||||
async def verify_is_valid_api_response(self, response: ClientResponse):
|
||||
if response.status != 200:
|
||||
raise AssertionError(f"Bad HTTP status: {response.status}")
|
||||
try:
|
||||
json_data = await response.json()
|
||||
if "error" in json_data:
|
||||
raise AssertionError(f"API error: {json_data['error']}")
|
||||
except Exception as e:
|
||||
raise AssertionError(f"Invalid JSON response: {e}")
|
||||
|
||||
logger.debug(f"Got response: {response.content}")
|
||||
async def api_valid_request(self, method: str, data: Dict, url: str = None) -> ClientResponse:
|
||||
response = await self.api_request(method, data, url)
|
||||
await self.verify_is_valid_api_response(response)
|
||||
return response
|
||||
|
||||
def verify_is_valid_api_response(self, response: Response):
|
||||
assert response.status_code == 200, f"Got response {response.content}, status code {response.status_code}"
|
||||
assert response.content
|
||||
logger.debug(f"Got response: {response.content}")
|
||||
async def start_status_backend(self) -> ClientResponse:
|
||||
try:
|
||||
error = response.json()["error"]
|
||||
assert not error, f"Error: {error}"
|
||||
except json.JSONDecodeError:
|
||||
raise AssertionError(f"Invalid JSON in response: {response.content}")
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def api_valid_request(self, method: str, data: Dict, url: str = None) -> Response:
|
||||
response = self.api_request(method, data, url)
|
||||
self.verify_is_valid_api_response(response)
|
||||
return response
|
||||
|
||||
def start_status_backend(self) -> Response:
|
||||
logger.debug("Automatically logging out before InitializeApplication")
|
||||
try:
|
||||
self.logout()
|
||||
logger.debug("successfully logged out")
|
||||
await self.logout()
|
||||
logger.debug("Successfully logged out")
|
||||
except AssertionError:
|
||||
logger.debug("failed to log out")
|
||||
pass
|
||||
logger.debug("Failed to log out")
|
||||
|
||||
method = "InitializeApplication"
|
||||
data = {
|
||||
@@ -80,31 +81,28 @@ class StatusBackend(RpcClient, SignalClient):
|
||||
"wakuFleetsConfigFilePath": "/static/configs/config.json"
|
||||
# TODO check wakuFleetsConfigFilePath?
|
||||
}
|
||||
|
||||
return self.api_valid_request(method, data)
|
||||
return await self.api_valid_request(method, data)
|
||||
|
||||
def _set_networks(self, data: Dict):
|
||||
anvil_network = {
|
||||
"ChainID": 31337,
|
||||
"ChainName": "Anvil",
|
||||
"Enabled": True,
|
||||
"IsTest": False,
|
||||
"Layer": 1,
|
||||
"NativeCurrencyDecimals": 18,
|
||||
"NativeCurrencyName": "Ether",
|
||||
"NativeCurrencySymbol": "ETH",
|
||||
"RpcProviders": [
|
||||
{
|
||||
"authType": "no-auth",
|
||||
"chainId": 31337,
|
||||
"enableRpsLimiter": False,
|
||||
"enabled": True,
|
||||
"name": "Anvil Direct",
|
||||
"type": "embedded-direct",
|
||||
"url": "http://127.0.0.1:8545"
|
||||
}
|
||||
],
|
||||
"ShortName": "eth"
|
||||
"ChainID": 31337,
|
||||
"ChainName": "Anvil",
|
||||
"Enabled": True,
|
||||
"IsTest": False,
|
||||
"Layer": 1,
|
||||
"NativeCurrencyDecimals": 18,
|
||||
"NativeCurrencyName": "Ether",
|
||||
"NativeCurrencySymbol": "ETH",
|
||||
"RpcProviders": [{
|
||||
"authType": "no-auth",
|
||||
"chainId": 31337,
|
||||
"enableRpsLimiter": False,
|
||||
"enabled": True,
|
||||
"name": "Anvil Direct",
|
||||
"type": "embedded-direct",
|
||||
"url": "http://127.0.0.1:8545"
|
||||
}],
|
||||
"ShortName": "eth"
|
||||
}
|
||||
data["testNetworksEnabled"] = False
|
||||
data["networkId"] = 31337
|
||||
@@ -128,27 +126,22 @@ class StatusBackend(RpcClient, SignalClient):
|
||||
self._set_networks(data)
|
||||
return data
|
||||
|
||||
def create_account_and_login(self, **kwargs) -> Response:
|
||||
method = "CreateAccountAndLogin"
|
||||
data = self._create_account_request(**kwargs)
|
||||
return self.api_valid_request(method, data)
|
||||
async def create_account_and_login(self, **kwargs) -> ClientResponse:
|
||||
return await self.api_valid_request("CreateAccountAndLogin", self._create_account_request(**kwargs))
|
||||
|
||||
def login(self, key_uid: str) -> Response:
|
||||
method = "LoginAccount"
|
||||
data = {
|
||||
async def login(self, key_uid: str) -> ClientResponse:
|
||||
return await self.api_valid_request("LoginAccount", {
|
||||
"password": "Strong12345",
|
||||
"keyUid": key_uid,
|
||||
"kdfIterations": 256000,
|
||||
}
|
||||
return self.api_valid_request(method, data)
|
||||
})
|
||||
|
||||
def logout(self) -> Response:
|
||||
method = "Logout"
|
||||
return self.api_valid_request(method, {})
|
||||
async def logout(self) -> ClientResponse:
|
||||
return await self.api_valid_request("Logout", {})
|
||||
|
||||
def set_public_key(self):
|
||||
# Only make sense to call this method if the lodes are logged in, otherwise public_key will be set to None.
|
||||
self.public_key = self.node_login_event.get("event", {}).get("settings", {}).get("public-key")
|
||||
self.public_key = self.signal.node_login_event.get("event", {}).get("settings", {}).get("public-key")
|
||||
|
||||
def find_key_uid(self) -> str:
|
||||
return self.node_login_event.get("event", {}).get("account", {}).get("key-uid")
|
||||
return self.signal.node_login_event.get("event", {}).get("account", {}).get("key-uid")
|
||||
|
||||
@@ -2,51 +2,51 @@
|
||||
from typing import Dict
|
||||
|
||||
# Project Imports
|
||||
from src.rpc_client import RpcClient
|
||||
from src.service import Service
|
||||
from src.rpc_client import AsyncRpcClient
|
||||
from src.service import AsyncService
|
||||
|
||||
|
||||
class WakuextService(Service):
|
||||
def __init__(self, client: RpcClient):
|
||||
super().__init__(client, "wakuext")
|
||||
class WakuextAsyncService(AsyncService):
|
||||
def __init__(self, async_rpc_client: AsyncRpcClient):
|
||||
super().__init__(async_rpc_client, "wakuext")
|
||||
|
||||
def start_messenger(self):
|
||||
response = self.rpc_request("startMessenger")
|
||||
json_response = response.json()
|
||||
async def start_messenger(self):
|
||||
response = await self.rpc_request("startMessenger")
|
||||
json_response = await response.json()
|
||||
|
||||
if "error" in json_response:
|
||||
assert json_response["error"]["code"] == -32000
|
||||
assert json_response["error"]["message"] == "messenger already started"
|
||||
return
|
||||
|
||||
def create_community(self, name: str, color="#ffffff", membership:int = 3) -> Dict:
|
||||
async def create_community(self, name: str, color="#ffffff", membership: int = 3) -> Dict:
|
||||
# TODO check what is membership = 3
|
||||
params = [{"membership": membership, "name": name, "color": color, "description": name}]
|
||||
response = self.rpc_request("createCommunity", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("createCommunity", params)
|
||||
return await response.json()
|
||||
|
||||
def fetch_community(self, community_key: str) -> Dict:
|
||||
async def fetch_community(self, community_key: str) -> Dict:
|
||||
params = [{"communityKey": community_key, "waitForResponse": True, "tryDatabase": True}]
|
||||
response = self.rpc_request("fetchCommunity", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("fetchCommunity", params)
|
||||
return await response.json()
|
||||
|
||||
def request_to_join_community(self, community_id: str, address: str = "fakeaddress") -> Dict:
|
||||
async def request_to_join_community(self, community_id: str, address: str = "fakeaddress") -> Dict:
|
||||
params = [{"communityId": community_id, "addressesToReveal": [address], "airdropAddress": address}]
|
||||
response = self.rpc_request("requestToJoinCommunity", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("requestToJoinCommunity", params)
|
||||
return await response.json()
|
||||
|
||||
def accept_request_to_join_community(self, request_to_join_id: str) -> Dict:
|
||||
async def accept_request_to_join_community(self, request_to_join_id: str) -> Dict:
|
||||
params = [{"id": request_to_join_id}]
|
||||
response = self.rpc_request("acceptRequestToJoinCommunity", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("acceptRequestToJoinCommunity", params)
|
||||
return await response.json()
|
||||
|
||||
def send_chat_message(self, chat_id: str, message: str, content_type:int = 1) -> Dict:
|
||||
async def send_chat_message(self, chat_id: str, message: str, content_type: int = 1) -> Dict:
|
||||
# TODO content type can always be 1? (plain TEXT), does it need to be community type for communities?
|
||||
params = [{"chatId": chat_id, "text": message, "contentType": content_type}]
|
||||
response = self.rpc_request("sendChatMessage", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("sendChatMessage", params)
|
||||
return await response.json()
|
||||
|
||||
def send_contact_request(self, contact_id: str, message: str) -> Dict:
|
||||
async def send_contact_request(self, contact_id: str, message: str) -> Dict:
|
||||
params = [{"id": contact_id, "message": message}]
|
||||
response = self.rpc_request("sendContactRequest", params)
|
||||
return response.json()
|
||||
response = await self.rpc_request("sendContactRequest", params)
|
||||
return await response.json()
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
# Python Imports
|
||||
from requests import Response
|
||||
from typing import Any
|
||||
|
||||
# Project Imports
|
||||
from src.rpc_client import RpcClient
|
||||
from src.service import Service
|
||||
from src.rpc_client import AsyncRpcClient
|
||||
from src.service import AsyncService
|
||||
|
||||
|
||||
class WalletService(Service):
|
||||
def __init__(self, client: RpcClient):
|
||||
super().__init__(client, "wallet")
|
||||
class WalletAsyncService(AsyncService):
|
||||
def __init__(self, async_rpc_client: AsyncRpcClient):
|
||||
super().__init__(async_rpc_client, "wallet")
|
||||
|
||||
def start_wallet(self) -> Response:
|
||||
return self.rpc_request("startWallet")
|
||||
async def start_wallet(self) -> Any:
|
||||
response = await self.rpc_request("startWallet")
|
||||
return await response.json()
|
||||
|
||||
Reference in New Issue
Block a user