diff --git a/controlbox/controlbox.yaml b/controlbox/controlbox.yaml index 4de675e..aed3d26 100644 --- a/controlbox/controlbox.yaml +++ b/controlbox/controlbox.yaml @@ -52,11 +52,12 @@ spec: - status-service-bootstrap.status-go-test.svc.cluster.local - status-backend-light.status-go-test.svc.cluster.local - status-backend-relay.status-go-test.svc.cluster.local + - status-backend-relay-2.status-go-test.svc.cluster.local serviceAccountName: controlbox-sa containers: - name: controlbox image: soutullostatus/controlbox-status:v1.0.0 - imagePullPolicy: IfNotPresent + imagePullPolicy: Always ports: - containerPort: 22 command: ["/bin/bash", "-c"] diff --git a/docker-utils/status-init/README.md b/docker-utils/status-init/README.md index 5cbb25a..2f9439a 100644 --- a/docker-utils/status-init/README.md +++ b/docker-utils/status-init/README.md @@ -5,5 +5,9 @@ It will grab the env variables of BOOT_ENRS and STORE_ENRS (bootstrap and store ## Changelog +- v1.1.0 + - Working with status-backend `b22c58bd3bdd4a387dc09dccba1d866d5ae09adf` + - Moved bootstrap_enrs to only "discV5BootstrapNodes" so light clients are not disconnected. + - v1.0.0 - Working with status-backend `b22c58bd3bdd4a387dc09dccba1d866d5ae09adf` diff --git a/docker-utils/status-init/init_container.py b/docker-utils/status-init/init_container.py index ee0fa40..d061f0a 100644 --- a/docker-utils/status-init/init_container.py +++ b/docker-utils/status-init/init_container.py @@ -31,8 +31,8 @@ def update_config(): config = { "dst.dev": { - "wakuNodes": bootstrap_enrs, - "discV5BootstrapNodes": [], + "wakuNodes": bootstrap_enrs, # --staticnode + "discV5BootstrapNodes": bootstrap_enrs, # --discv5-bootstrap-node "clusterId": int(os.getenv("CLUSTER_ID", 16)), "storeNodes": [] } diff --git a/src/inject_messages.py b/src/inject_messages.py index b4de4a5..9a2f9f5 100644 --- a/src/inject_messages.py +++ b/src/inject_messages.py @@ -60,7 +60,7 @@ async def inject_messages_group_chat(pod: StatusBackend, delay_between_message: await asyncio.sleep(delay_between_message) except AssertionError as e: - logger.error(f"Error sending message: {e}") + logger.error(f"Error sending message from pod {pod.base_url}: {e}") await asyncio.sleep(1) logger.info(f"Finished sending {num_messages} messages") \ No newline at end of file diff --git a/src/signal_client.py b/src/signal_client.py index 3204451..5ffce55 100644 --- a/src/signal_client.py +++ b/src/signal_client.py @@ -20,11 +20,15 @@ SIGNALS_DIR = os.path.dirname(os.path.abspath(__file__)) class BufferedQueue: - def __init__(self, max_size: int = 100): + def __init__(self, max_size: int = 200): self.queue = asyncio.Queue() self.buffer = deque(maxlen=max_size) + self.messages = [] async def put(self, item): + if item.get("event") is not None and item.get("event").get("messages"): + for message in item["event"]["messages"]: + self.messages.append((item["timestamp"], message["text"])) self.buffer.append(item) await self.queue.put(item) @@ -80,6 +84,22 @@ class AsyncSignalClient: elif msg.type == WSMsgType.ERROR: logger.error(f"WebSocket error: {self.ws.exception()}") + def cleanup_signal_queues(self): + queue_names = [ + "messages.new", "message.delivered", "node.ready", + "node.started", "node.stopped" + ] # All but login, so we can find key uid + + if self.signal_queues: + for queue_name in queue_names: + queue = self.signal_queues.get(queue_name) + if queue and isinstance(queue, BufferedQueue): + queue.buffer.clear() + queue.messages.clear() + logger.debug(f"Cleaned queue: {queue_name}") + + logger.debug("Specified signal queues have been cleaned up.") + async def on_message(self, signal: str): signal_data = json.loads(signal) logger.debug(f"Received WebSocket message: {signal_data}") diff --git a/src/status_backend.py b/src/status_backend.py index 554bbb3..a13f062 100644 --- a/src/status_backend.py +++ b/src/status_backend.py @@ -1,6 +1,7 @@ # Python Imports import logging import json +import time from typing import List, Dict from aiohttp import ClientSession, ClientTimeout @@ -153,12 +154,16 @@ class StatusBackend: }) signal = await self.signal.wait_for_login() self.set_public_key(signal) + self.last_login = int(time.time()) return response - async def logout(self) -> dict: + async def logout(self, clean_signals = False) -> dict: json_response = await self.api_valid_request("Logout", {}) _ = await self.signal.wait_for_logout() logger.debug("Successfully logged out") + if clean_signals: + self.signal.cleanup_signal_queues() + return json_response def set_public_key(self, signal_data: dict):