mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-01-08 06:04:02 -05:00
Second benchmark additions (#32)
* Change controlbox pull to Always * Update status-init to fix discovery issues. * Improve logging in inject_messages_group_chat * Save messages in a list with timestamps * Add method for cleanup signals on logout (probably to avoid issues when loging in and searching for signals) * Save last loging time
This commit is contained in:
@@ -52,11 +52,12 @@ spec:
|
|||||||
- status-service-bootstrap.status-go-test.svc.cluster.local
|
- status-service-bootstrap.status-go-test.svc.cluster.local
|
||||||
- status-backend-light.status-go-test.svc.cluster.local
|
- status-backend-light.status-go-test.svc.cluster.local
|
||||||
- status-backend-relay.status-go-test.svc.cluster.local
|
- status-backend-relay.status-go-test.svc.cluster.local
|
||||||
|
- status-backend-relay-2.status-go-test.svc.cluster.local
|
||||||
serviceAccountName: controlbox-sa
|
serviceAccountName: controlbox-sa
|
||||||
containers:
|
containers:
|
||||||
- name: controlbox
|
- name: controlbox
|
||||||
image: soutullostatus/controlbox-status:v1.0.0
|
image: soutullostatus/controlbox-status:v1.0.0
|
||||||
imagePullPolicy: IfNotPresent
|
imagePullPolicy: Always
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 22
|
- containerPort: 22
|
||||||
command: ["/bin/bash", "-c"]
|
command: ["/bin/bash", "-c"]
|
||||||
|
|||||||
@@ -5,5 +5,9 @@ It will grab the env variables of BOOT_ENRS and STORE_ENRS (bootstrap and store
|
|||||||
|
|
||||||
## Changelog
|
## Changelog
|
||||||
|
|
||||||
|
- v1.1.0
|
||||||
|
- Working with status-backend `b22c58bd3bdd4a387dc09dccba1d866d5ae09adf`
|
||||||
|
- Moved bootstrap_enrs to only "discV5BootstrapNodes" so light clients are not disconnected.
|
||||||
|
|
||||||
- v1.0.0
|
- v1.0.0
|
||||||
- Working with status-backend `b22c58bd3bdd4a387dc09dccba1d866d5ae09adf`
|
- Working with status-backend `b22c58bd3bdd4a387dc09dccba1d866d5ae09adf`
|
||||||
|
|||||||
@@ -31,8 +31,8 @@ def update_config():
|
|||||||
|
|
||||||
config = {
|
config = {
|
||||||
"dst.dev": {
|
"dst.dev": {
|
||||||
"wakuNodes": bootstrap_enrs,
|
"wakuNodes": bootstrap_enrs, # --staticnode
|
||||||
"discV5BootstrapNodes": [],
|
"discV5BootstrapNodes": bootstrap_enrs, # --discv5-bootstrap-node
|
||||||
"clusterId": int(os.getenv("CLUSTER_ID", 16)),
|
"clusterId": int(os.getenv("CLUSTER_ID", 16)),
|
||||||
"storeNodes": []
|
"storeNodes": []
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ async def inject_messages_group_chat(pod: StatusBackend, delay_between_message:
|
|||||||
await asyncio.sleep(delay_between_message)
|
await asyncio.sleep(delay_between_message)
|
||||||
|
|
||||||
except AssertionError as e:
|
except AssertionError as e:
|
||||||
logger.error(f"Error sending message: {e}")
|
logger.error(f"Error sending message from pod {pod.base_url}: {e}")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
logger.info(f"Finished sending {num_messages} messages")
|
logger.info(f"Finished sending {num_messages} messages")
|
||||||
@@ -20,11 +20,15 @@ SIGNALS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|||||||
|
|
||||||
|
|
||||||
class BufferedQueue:
|
class BufferedQueue:
|
||||||
def __init__(self, max_size: int = 100):
|
def __init__(self, max_size: int = 200):
|
||||||
self.queue = asyncio.Queue()
|
self.queue = asyncio.Queue()
|
||||||
self.buffer = deque(maxlen=max_size)
|
self.buffer = deque(maxlen=max_size)
|
||||||
|
self.messages = []
|
||||||
|
|
||||||
async def put(self, item):
|
async def put(self, item):
|
||||||
|
if item.get("event") is not None and item.get("event").get("messages"):
|
||||||
|
for message in item["event"]["messages"]:
|
||||||
|
self.messages.append((item["timestamp"], message["text"]))
|
||||||
self.buffer.append(item)
|
self.buffer.append(item)
|
||||||
await self.queue.put(item)
|
await self.queue.put(item)
|
||||||
|
|
||||||
@@ -80,6 +84,22 @@ class AsyncSignalClient:
|
|||||||
elif msg.type == WSMsgType.ERROR:
|
elif msg.type == WSMsgType.ERROR:
|
||||||
logger.error(f"WebSocket error: {self.ws.exception()}")
|
logger.error(f"WebSocket error: {self.ws.exception()}")
|
||||||
|
|
||||||
|
def cleanup_signal_queues(self):
|
||||||
|
queue_names = [
|
||||||
|
"messages.new", "message.delivered", "node.ready",
|
||||||
|
"node.started", "node.stopped"
|
||||||
|
] # All but login, so we can find key uid
|
||||||
|
|
||||||
|
if self.signal_queues:
|
||||||
|
for queue_name in queue_names:
|
||||||
|
queue = self.signal_queues.get(queue_name)
|
||||||
|
if queue and isinstance(queue, BufferedQueue):
|
||||||
|
queue.buffer.clear()
|
||||||
|
queue.messages.clear()
|
||||||
|
logger.debug(f"Cleaned queue: {queue_name}")
|
||||||
|
|
||||||
|
logger.debug("Specified signal queues have been cleaned up.")
|
||||||
|
|
||||||
async def on_message(self, signal: str):
|
async def on_message(self, signal: str):
|
||||||
signal_data = json.loads(signal)
|
signal_data = json.loads(signal)
|
||||||
logger.debug(f"Received WebSocket message: {signal_data}")
|
logger.debug(f"Received WebSocket message: {signal_data}")
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
# Python Imports
|
# Python Imports
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
from aiohttp import ClientSession, ClientTimeout
|
from aiohttp import ClientSession, ClientTimeout
|
||||||
|
|
||||||
@@ -153,12 +154,16 @@ class StatusBackend:
|
|||||||
})
|
})
|
||||||
signal = await self.signal.wait_for_login()
|
signal = await self.signal.wait_for_login()
|
||||||
self.set_public_key(signal)
|
self.set_public_key(signal)
|
||||||
|
self.last_login = int(time.time())
|
||||||
return response
|
return response
|
||||||
|
|
||||||
async def logout(self) -> dict:
|
async def logout(self, clean_signals = False) -> dict:
|
||||||
json_response = await self.api_valid_request("Logout", {})
|
json_response = await self.api_valid_request("Logout", {})
|
||||||
_ = await self.signal.wait_for_logout()
|
_ = await self.signal.wait_for_logout()
|
||||||
logger.debug("Successfully logged out")
|
logger.debug("Successfully logged out")
|
||||||
|
if clean_signals:
|
||||||
|
self.signal.cleanup_signal_queues()
|
||||||
|
|
||||||
return json_response
|
return json_response
|
||||||
|
|
||||||
def set_public_key(self, signal_data: dict):
|
def set_public_key(self, signal_data: dict):
|
||||||
|
|||||||
Reference in New Issue
Block a user