Services (#9)

* Add service class

* Add account service class

* Add wallet service class

* Add wakuext service class

* Add missing typing

* Add more missing typing
This commit is contained in:
Alberto Soutullo
2025-07-01 00:01:26 +02:00
committed by GitHub
parent f43ae0630f
commit a82a2a743b
4 changed files with 102 additions and 0 deletions

19
src/account_service.py Normal file
View File

@@ -0,0 +1,19 @@
# Python Imports
from typing import Dict
# Project Imports
from src.rpc_client import RpcClient
from src.service import Service
class AccountService(Service):
def __init__(self, client: RpcClient):
super().__init__(client, "accounts")
def get_accounts(self) -> Dict:
response = self.rpc_request("getAccounts")
return response.json()
def get_account_keypairs(self) -> Dict:
response = self.rpc_request("getKeypairs")
return response.json()

17
src/service.py Normal file
View File

@@ -0,0 +1,17 @@
# Python Imports
from typing import List
from requests import Response
# Project Imports
from src.rpc_client import RpcClient
class Service:
def __init__(self, client: RpcClient, name: str):
assert name != ""
self.rpc_client = client
self.name = name
def rpc_request(self, method: str, params: List = None, skip_validation: bool = False, enable_logging: bool = True) -> Response:
full_method_name = f"{self.name}_{method}"
return self.rpc_client.rpc_valid_request(full_method_name, params, skip_validation=skip_validation, enable_logging=enable_logging)

52
src/wakuext_service.py Normal file
View File

@@ -0,0 +1,52 @@
# Python Imports
from typing import Dict
# Project Imports
from src.rpc_client import RpcClient
from src.service import Service
class WakuextService(Service):
def __init__(self, client: RpcClient):
super().__init__(client, "wakuext")
def start_messenger(self):
response = self.rpc_request("startMessenger")
json_response = response.json()
if "error" in json_response:
assert json_response["error"]["code"] == -32000
assert json_response["error"]["message"] == "messenger already started"
return
def create_community(self, name: str, color="#ffffff", membership:int = 3) -> Dict:
# TODO check what is membership = 3
params = [{"membership": membership, "name": name, "color": color, "description": name}]
response = self.rpc_request("createCommunity", params)
return response.json()
def fetch_community(self, community_key: str) -> Dict:
params = [{"communityKey": community_key, "waitForResponse": True, "tryDatabase": True}]
response = self.rpc_request("fetchCommunity", params)
return response.json()
def request_to_join_community(self, community_id: str, address: str = "fakeaddress") -> Dict:
params = [{"communityId": community_id, "addressesToReveal": [address], "airdropAddress": address}]
response = self.rpc_request("requestToJoinCommunity", params)
return response.json()
def accept_request_to_join_community(self, request_to_join_id: str) -> Dict:
params = [{"id": request_to_join_id}]
response = self.rpc_request("acceptRequestToJoinCommunity", params)
return response.json()
def send_chat_message(self, chat_id: str, message: str, content_type:int = 1) -> Dict:
# TODO content type can always be 1? (plain TEXT), does it need to be community type for communities?
params = [{"chatId": chat_id, "text": message, "contentType": content_type}]
response = self.rpc_request("sendChatMessage", params)
return response.json()
def send_contact_request(self, contact_id: str, message: str) -> Dict:
params = [{"id": contact_id, "message": message}]
response = self.rpc_request("sendContactRequest", params)
return response.json()

14
src/wallet_service.py Normal file
View File

@@ -0,0 +1,14 @@
# Python Imports
from requests import Response
# Project Imports
from src.rpc_client import RpcClient
from src.service import Service
class WalletService(Service):
def __init__(self, client: RpcClient):
super().__init__(client, "wallet")
def start_wallet(self) -> Response:
return self.rpc_request("startWallet")