Replace requirements.txt with uv and cleanup (#130)

* Remove mesh_creation from the repo

* Remove visualizer from the repo

* Remove old Python files

* Rename kubernetes.py to avoid clashing with the kubernetes library import

* Update kubernetes imports

* Add uv files

* Update README.md

* Update pyproject.toml

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>

* Delete old requirements.txt

---------

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>
Author: Alberto Soutullo
Date: 2025-08-19 15:30:21 +02:00
Committed by: GitHub
Parent: 1bb251d6a2
Commit: e74babf6c5
35 changed files with 1212 additions and 1637 deletions

.python-version (new file, 1 line)

@@ -0,0 +1 @@
3.11.5

README.md

@@ -13,6 +13,16 @@ Note: This is a work in progress. Tooling and folder structure may change in the
---
## Dependencies
### uv
Install [uv](https://docs.astral.sh/uv/#installation) and just run:
```shell
uv sync
```
The required Python version will be installed if it is not present on the system, along with the necessary dependencies.
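Once synced, project commands run inside uv's managed environment via `uv run`; for example (using the scrape tool from the repository structure below):
```shell
uv run python analysis/scrape.py
```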
## Repository Structure
```
@@ -20,7 +30,6 @@ analysis/
├── scrape.py # Scrape tool
├── example_log_analysis.py # Analysis tool
├── scrape.yaml # Scrape config
└── requirements.txt # Requirements for scrape and analysis tools
deployment/
├── docker_utilities/ # Dockerfiles & resources to build experiment containers
├── kubernetes-utilities/ # Services required on Kubernetes for certain experiments
@@ -28,7 +37,6 @@ deployment/
experiments/
├── deployment.py # Experiment deployment script (generates & deploys)
├── README.md # Usage guide for deployment script
└── requirements.txt # Requirements for deployment script
```
---

Binary file not shown.


@@ -1,13 +0,0 @@
# Message Trace Viewer
![Example](images/example.jpg)
### Steps to run it:
1. Run `app.py`
2. Open `http://127.0.0.1:8050/` (the default address)
3. Upload the `csv` file generated by the analyzer

app.py (deleted)

@@ -1,13 +0,0 @@
# Python Imports
from dash import Dash

# Project Imports
from layout import layout
from callbacks import register_callbacks
app = Dash(__name__)
app.layout = layout
register_callbacks(app)
if __name__ == '__main__':
app.run_server(debug=True)

callbacks.py (deleted)

@@ -1,144 +0,0 @@
# Python Imports
import io
import json
import networkx as nx
import pandas as pd
import plotly.graph_objects as go
# Project Imports
from dash import Input, Output, State, ctx
from utils import read_csv_from_file, get_fixed_positions
def register_callbacks(app):
@app.callback(
[Output('df-storage', 'children'),
Output('positions-storage', 'children')],
[Input('upload-data', 'contents')],
)
def upload_files(content):
if content is None:
return None, None
df = read_csv_from_file(content)
positions = get_fixed_positions(df)
positions_json = json.dumps(positions)
return df.to_json(date_format='iso', orient='split'), positions_json
@app.callback(
[Output('timestamp-dropdown', 'options'),
Output('timestamp-dropdown', 'value'),
Output('hash-dropdown', 'options'),
Output('hash-dropdown', 'value'),
Output('timestamp-index-store', 'data')],
[Input('df-storage', 'children'),
Input('previous-button', 'n_clicks'),
Input('next-button', 'n_clicks')],
[State('timestamp-index-store', 'data')]
)
def update_dropdowns_and_index(df_json, prev_clicks, next_clicks, timestamp_index):
if df_json is None:
return [], None, [], None, {'index': 0}
df = pd.read_json(io.StringIO(df_json), orient='split')
hash_options = [{'label': h, 'value': h} for h in df['msg_hash'].unique()]
hash_value = hash_options[0]['value'] if hash_options else None
if hash_value:
timestamps = df.loc[df['msg_hash'] == hash_value, 'timestamp']
timestamp_options = [{'label': str(ts), 'value': str(ts)} for ts in timestamps]
total_timestamps = len(timestamp_options)
else:
timestamp_options = []
total_timestamps = 0
if timestamp_index is None:
timestamp_index = {'index': 0}
index = timestamp_index.get('index', 0)
if ctx.triggered_id == 'next-button' and index + 1 < total_timestamps:
index += 1
elif ctx.triggered_id == 'previous-button' and index - 1 >= 0:
index -= 1
timestamp_value = timestamp_options[index]['value'] if total_timestamps > 0 else None
return timestamp_options, timestamp_value, hash_options, hash_value, {'index': index}
@app.callback(
Output('networkx-trace-graph', 'figure'),
[Input('hash-dropdown', 'value'),
Input('timestamp-dropdown', 'value')],
[State('df-storage', 'children'),
State('positions-storage', 'children')]
)
def update_graph(selected_hash, selected_timestamp, df_json, positions):
if not selected_hash or not selected_timestamp or df_json is None or positions is None:
return go.Figure()
positions = json.loads(positions)
df = pd.read_json(io.StringIO(df_json), orient='split')
filtered_df = df[(df['msg_hash'] == selected_hash) & (df['timestamp'] <= pd.to_datetime(selected_timestamp))]
directed_graph = nx.DiGraph()
for _, row in filtered_df.iterrows():
sender = row['sender_peer_id']
receiver = row['receiver_peer_id']
if not directed_graph.has_node(sender):
directed_graph.add_node(sender)
if not directed_graph.has_node(receiver):
directed_graph.add_node(receiver)
directed_graph.add_edge(sender, receiver, timestamp=row['timestamp'], pod=row['pod-name'])
edge_x = []
edge_y = []
node_x = []
node_y = []
node_text = []
for edge in directed_graph.edges():
x0, y0 = positions[edge[0]]
x1, y1 = positions[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
for node in directed_graph.nodes(data=True):
x, y = positions[node[0]]
node_x.append(x)
node_y.append(y)
node_text.append(f"{node[0]}")
fig = go.Figure()
fig.add_trace(go.Scatter(
x=edge_x, y=edge_y,
mode='lines+markers',  # arrow markers along the lines show edge direction
marker=dict(size=10, symbol="arrow-bar-up", angleref="previous"),
hoverinfo='none'
))
fig.add_trace(go.Scatter(
x=node_x, y=node_y,
mode='markers+text',
text=node_text,
textposition='top center',
hoverinfo='text',
marker=dict(
color='blue',
size=12,
line=dict(color='black', width=1)
)
))
fig.update_layout(
title=f"Message Trace for {selected_hash}",
showlegend=False,
margin=dict(l=40, r=40, t=60, b=40),
xaxis=dict(visible=False),
yaxis=dict(visible=False),
height=800,
uirevision=True
)
return fig

Binary file not shown.

(deleted image: images/example.jpg, 95 KiB)

layout.py (deleted)

@@ -1,44 +0,0 @@
# Python Imports
from dash import dcc, html
layout = html.Div([
html.H1("Message Trace Viewer", style={'textAlign': 'center'}),
html.Label("Upload CSV File"),
dcc.Upload(
id='upload-data',
children=html.Button('Upload File'),
multiple=False
),
html.Label("Select Message Hash:"),
dcc.Dropdown(
id='hash-dropdown',
options=[],
value=None,
placeholder="Select a message hash"
),
html.Label("Selected Timestamp:"),
dcc.Dropdown(
id='timestamp-dropdown',
options=[],
value=None,
placeholder="Select a timestamp"
),
html.Div([
html.Button('Previous', id='previous-button', n_clicks=0),
html.Button('Next', id='next-button', n_clicks=0),
], style={'marginTop': '10px'}),
dcc.Graph(
id='networkx-trace-graph',
config={'scrollZoom': True},
style={'width': '100%', 'height': '800px'}
),
html.Div(id='df-storage', style={'display': 'none'}),
html.Div(id='positions-storage', style={'display': 'none'}),
dcc.Store(id='timestamp-index-store', storage_type='memory')
])

utils.py (deleted)

@@ -1,30 +0,0 @@
# Python Imports
import base64
import io
import networkx as nx
import pandas as pd
def read_csv_from_file(contents):
content_type, content_string = contents.split(',')
decoded = base64.b64decode(content_string)
return pd.read_csv(io.StringIO(decoded.decode('utf-8')))
def get_fixed_positions(df):
graph = nx.DiGraph()
for _, row in df.iterrows():
sender = row['sender_peer_id']
receiver = row['receiver_peer_id']
if not graph.has_node(sender):
graph.add_node(sender)
if not graph.has_node(receiver):
graph.add_node(receiver)
# Generate fixed positions for the nodes using a layout algorithm
fixed_pos = nx.spring_layout(graph, seed=0)
fixed_pos_serializable = {node: pos.tolist() for node, pos in fixed_pos.items()}
return fixed_pos_serializable


@@ -1,126 +0,0 @@
# Mesh Creation Module
A Python module for creating and managing mesh networks of p2p nodes in Kubernetes, with support for custom topologies and different node protocols.
## Components
### Core Classes
1. `TopologyManager`
- Generates network topologies
- Supports custom degree constraints
- Imports Pajek format networks
- Configures node connections
2. `PodManager`
- Manages Kubernetes pod deployments
- Handles pod-to-pod communication
- Tracks pod states and identifiers
- Executes commands in pods
3. `NodeProtocol` (Abstract Base Class)
- Base class for protocol implementations
- Defines interface for node communication
- Handles identifier retrieval and connections
### Protocol Implementations
1. `WakuProtocol`
- Implementation for Waku nodes
- Handles ENR URI retrieval
- Manages node connections via HTTP API
2. `LibP2PProtocol`
- Generic LibP2P implementation
- Handles peer ID management
- Configures direct connections
## Usage Examples
### Basic Node Deployment
```python
from mesh_creation.topology_creation import TopologyManager
from mesh_creation.protocols.waku_protocol import WakuProtocol
# Initialize manager with Waku protocol
manager = TopologyManager(
kube_config="config.yaml",
namespace="test",
protocol=WakuProtocol(port=8645)
)
# Deploy nodes from YAML
manager.setup_nodes("waku-nodes.yaml")
# Generate and configure topology
graph = manager.generate_topology(
"libp2p_custom",
n=5,
d_low=2,
d_high=4
)
manager.configure_node_connections(graph)
```
### Custom Topology from Pajek
```python
# Read existing topology
graph = manager.read_pajek("topology.net")
manager.configure_node_connections(graph)
```
## Configuration Files
### Node Deployment (YAML)
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: waku-node
spec:
replicas: 5
template:
spec:
containers:
- name: waku
image: wakuorg/node:latest
ports:
- containerPort: 8645
```
## Development
### Adding New Protocol Support
1. Create new protocol class:
```python
from mesh_creation.protocols.base_protocol import BaseProtocol
class CustomProtocol(BaseProtocol):
def get_node_identifier(self) -> List[str]:
return ["curl", "-s", "http://localhost:8080/id"]
def get_connection_command(self, identifier: str) -> List[str]:
return ["curl", "-s", "-X", "POST", f"http://localhost:8080/connect/{identifier}"]
def parse_identifier_response(self, response: str) -> str:
return json.loads(response)["id"]
```
2. Use with topology manager:
```python
manager = TopologyManager(protocol=CustomProtocol())
```
### Running Tests
```bash
# Run all mesh_creation tests
pytest src/mesh_creation/tests/
# Run specific test file
pytest src/mesh_creation/tests/test_pod_manager.py
```

mesh_manager.py (deleted)

@@ -1,133 +0,0 @@
# Python Imports
import logging
import yaml
from pathlib import Path
from typing import List, Dict, Any, Tuple
from result import Result, Err, Ok
# Project Imports
import src.logger.logger
from topology_creation import TopologyManager
from protocols.base_protocol import BaseProtocol
logger = logging.getLogger("src.mesh_creation.mesh_manager")
class MeshManager:
def __init__(self, kube_config: str, namespace: str):
self.kube_config = kube_config
self.namespace = namespace
def create_mesh(self, statefulset_configs: List[Tuple[str, Dict[str, Any], BaseProtocol]]) -> None:
"""
Create a mesh network using the provided StatefulSets and their respective topology configurations
and protocols.
Args:
statefulset_configs: List of tuples (statefulset_yaml_path, topology_config, protocol)
where:
- statefulset_yaml_path: path to the StatefulSet YAML file
- topology_config: dictionary containing:
- type: implemented topology type (e.g., "custom", "random", etc.)
- parameters: dictionary of parameters for the topology
- protocol: protocol implementation for this StatefulSet
Example:
configs = [
("statefulset1.yaml", {
"type": "libp2p_custom",
"parameters": {"n": 5, "d_low": 4, "d_high": 8}
}, WakuProtocol(port=8645)),
("statefulset2.yaml", {
"type": "random",
"parameters": {"n": 3, "p": 0.5}
}, LibP2PProtocol(port=8080))
]
"""
topology_managers = {
yaml_file: TopologyManager(
kube_config=self.kube_config,
namespace=self.namespace,
protocol=protocol
)
for yaml_file, _, protocol in statefulset_configs
}
yaml_files = [config[0] for config in statefulset_configs]
for yaml_file in yaml_files:
logger.info(f"Deploying nodes from {yaml_file}")
topology_managers[yaml_file].setup_nodes(Path(yaml_file))
for yaml_file, topology_config, _ in statefulset_configs:
logger.info(f"Generating {topology_config['type']} topology for {yaml_file}")
topology_manager = topology_managers[yaml_file]
result = topology_manager.generate_topology(
topology_config["type"],
**topology_config.get("parameters", {})
)
if result.is_err():
break
result = topology_manager.configure_node_connections(result.ok_value)
if result.is_err():
break
def create_mesh_from_pajek_files(self, statefulset_configs: List[Tuple[str, str, BaseProtocol]]) -> None:
"""
Create a mesh network using Pajek format topology files.
Args:
statefulset_configs: List of tuples (statefulset_yaml_path, pajek_file_path, protocol)
"""
try:
# Create topology managers for each StatefulSet with its protocol
topology_managers = {
yaml_file: TopologyManager(
kube_config=self.kube_config,
namespace=self.namespace,
protocol=protocol
)
for yaml_file, _, protocol in statefulset_configs
}
# Deploy all nodes first
yaml_files = [config[0] for config in statefulset_configs]
logger.info(f"Deploying nodes from {yaml_files}")
# Deploy nodes for each StatefulSet using its specific topology manager
for yaml_file in yaml_files:
topology_managers[yaml_file].setup_nodes([yaml_file])
# Configure topology for each StatefulSet
for yaml_file, pajek_path, _ in statefulset_configs:
logger.info(f"Loading topology from {pajek_path} for {yaml_file}")
topology_manager = topology_managers[yaml_file]
graph = topology_manager.read_pajek(pajek_path)
# Configure the connections for this StatefulSet
logger.info(f"Configuring node connections for {yaml_file}")
topology_manager.configure_node_connections(graph)
logger.info("Mesh network creation completed successfully")
except Exception as e:
logger.error(f"Failed to create mesh network from Pajek files: {str(e)}")
raise
def _create_protocol_instance(self, protocol_type: str, **params) -> Result[BaseProtocol, str]:
"""Create a protocol instance based on type and parameters."""
from protocols.waku_protocol import WakuProtocol
from protocols.libp2p_protocol import LibP2PProtocol
protocol_classes = {
"waku": WakuProtocol,
"libp2p": LibP2PProtocol
}
if protocol_type not in protocol_classes:
return Err(f"Unsupported protocol type: {protocol_type}")
return Ok(protocol_classes[protocol_type](**params))

nodes.yaml (deleted)

@@ -1,66 +0,0 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nodes-0
namespace: zerotesting
spec:
replicas: 10
podManagementPolicy: "Parallel"
serviceName: zerotesting-service
selector:
matchLabels:
app: zerotenkay
template:
metadata:
labels:
app: zerotenkay
spec:
dnsConfig:
searches:
- zerotesting-service.zerotesting.svc.cluster.local
containers:
- name: waku
image: soutullostatus/nwaku-jq-curl:v0.35.0-rc.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8645
- containerPort: 8008
readinessProbe:
httpGet:
path: /health
port: 8008
successThreshold: 5
initialDelaySeconds: 5
periodSeconds: 1
failureThreshold: 2
timeoutSeconds: 5
resources:
requests:
memory: "64Mi"
cpu: "150m"
limits:
memory: "600Mi"
cpu: "400m"
env:
- name: IP
valueFrom:
fieldRef:
fieldPath: status.podIP
command:
- sh
- -c
- /usr/bin/wakunode \
--relay=true \
--max-connections=200 \
--rest=true \
--rest-admin=true \
--rest-address=0.0.0.0 \
--discv5-discovery=false \
--discv5-enr-auto-update=True \
--log-level=INFO \
--metrics-server=True \
--metrics-server-address=0.0.0.0 \
--nat=extip:${IP} \
--cluster-id=2 \
--shard=0 \
--rendezvous=false

pod_manager.py (deleted)

@@ -1,222 +0,0 @@
# Python Imports
import logging
import time
import yaml
from pathlib import Path
from typing import List, Dict, Optional
# Project Imports
from kubernetes import client, config
from kubernetes.stream import stream
from result import Result, Err, Ok
from src.mesh_creation.protocols.base_protocol import BaseProtocol
from src.mesh_creation.protocols.waku_protocol import WakuProtocol
logger = logging.getLogger("src.mesh_creation.pod_manager")
class PodManager:
def __init__(self,
kube_config: str = "rubi3.yaml",
namespace: str = "zerotesting",
protocol: Optional[BaseProtocol] = None):
self.namespace = namespace
self.protocol = protocol or WakuProtocol()
config.load_kube_config(config_file=kube_config)
self.api = client.CoreV1Api()
self.apps_api = client.AppsV1Api()
self.deployed_pods = {}
def execute_pod_command(self, pod_name: str, command: list, container_name: str) -> Result[str, None]:
try:
resp = stream(
self.api.connect_get_namespaced_pod_exec,
pod_name,
self.namespace,
container=container_name,
command=command,
stderr=True,
stdin=True,
stdout=True,
tty=False
)
# If this is a curl command, try to extract only the JSON part
if command[0] == "curl":
try:
json_start = resp.find('{')
if json_start != -1:
return Ok(resp[json_start:])
except Exception as e:
logger.debug(f"Failed to extract JSON from curl response: {str(e)}")
return Err(resp)
return Ok(resp)
except Exception as e:
logger.error(f"Error executing command in pod {pod_name}: {str(e)}")
return Err(None)
def get_pod_identifier(self, pod_name: str, container_name: str) -> Result[str, None]:
"""Get the node identifier (ENR, peer ID, etc.) of a pod."""
command = self.protocol.get_node_identifier()
result = self.execute_pod_command(pod_name, command, container_name)
if result.is_ok():
return Ok(self.protocol.parse_identifier_response(result.ok_value))
logger.error(f"Error getting identifier for pod {pod_name}")
return Err(None)
def connect_pods(self, source_pod: Dict[str, str], target_pod: Dict[str, str]) -> Result[None, None]:
command = self.protocol.get_connection_command(target_pod["identifier"])
result = self.execute_pod_command(
source_pod["name"],
command,
self.deployed_pods['container_name']
)
if result.is_err():
logger.error(f"Error connecting pods: {result.err_value}")
return Err(None)
logger.info(f"Connected pod {source_pod['name']} to {target_pod['name']}")
return Ok(None)
def configure_connections(self, node_to_pod: Dict[int, str], graph) -> Result:
logger.info("Configuring pod connections based on topology")
pod_lookup = {pod['name']: pod for pod in self.deployed_pods['pods']}
for source_idx, target_idx in graph.edges():
source_name = node_to_pod[source_idx]
target_name = node_to_pod[target_idx]
source_pod = pod_lookup.get(source_name)
target_pod = pod_lookup.get(target_name)
if not source_pod or not target_pod:
logger.error(f"Could not find pods for nodes {source_idx} -> {target_idx} ({source_name} -> {target_name})")
return Err(None)
if not source_pod['identifier'] or not target_pod['identifier']:
logger.error(f"Missing identifier for pod connection {source_name} -> {target_name}")
return Err(None)
logger.info(f"Establishing connection: {source_name} -> {target_name}")
result = self.connect_pods(source_pod, target_pod)
if result.is_err():
return Err(result)
logger.info("Successfully configured all pod connections")
return Ok(None)
def apply_yaml_file(self, yaml_path: Path) -> Result[None, str]:
logger.info(f"Applying YAML file: {yaml_path}")
with open(yaml_path, 'r') as f:
docs = yaml.safe_load_all(f)
for doc in docs:
if doc["kind"] != "StatefulSet":
# Only handled for StatefulSets
return Err(f"Yaml file is not a StatefulSet: {yaml_path}")
ss_name = doc["metadata"]["name"]
logger.info(f"Found StatefulSet: {ss_name}")
# Extract container name from the StatefulSet spec
try:
container_name = doc["spec"]["template"]["spec"]["containers"][0]["name"]
replicas = doc["spec"].get("replicas", 1) # Default to 1 if not specified
logger.info(f"Found container name: {container_name} for StatefulSet {ss_name} with {replicas} replicas")
except (KeyError, IndexError) as e:
logger.error(f"Failed to extract container name from StatefulSet {ss_name}: {str(e)}")
return Err(f"StatefulSet {ss_name} must specify a container name")
try:
self.apps_api.create_namespaced_stateful_set(
namespace=self.namespace,
body=doc
)
except client.exceptions.ApiException as e:
if e.status == 409:  # Already exists
return Err(f"StatefulSet {ss_name} already exists")
# Any other API error should also fail fast instead of being silently ignored
return Err(f"Failed to create StatefulSet {ss_name}: {e}")
pods = [
{
"name": f"{ss_name}-{i}",
"identifier": "" # Will be filled when pods are ready
}
for i in range(replicas)
]
self.deployed_pods = {
'ss_name': ss_name,
'pods': pods,
'container_name': container_name
}
logger.debug(f"Successfully applied StatefulSet: {ss_name} with expected pods: {[p['name'] for p in pods]}")
return Ok(None)
def wait_for_pods_ready(self, timeout: int = 300) -> Result:
"""
Wait for all pods in the managed StatefulSets to be ready and collect their identifiers.
"""
start_time = time.time()
ss_name = self.deployed_pods['ss_name']
logger.info("Waiting for pods to be ready and collecting identifiers...")
while time.time() - start_time < timeout:
try:
ss = self.apps_api.read_namespaced_stateful_set(
name=ss_name,
namespace=self.namespace
)
if not ss.status.ready_replicas or ss.status.ready_replicas != ss.spec.replicas:
logger.info(f"StatefulSet {ss_name} not ready: "
f"{ss.status.ready_replicas}/{ss.spec.replicas} replicas")
time.sleep(5)
continue
selector = ss.spec.selector.match_labels
selector_str = ",".join([f"{k}={v}" for k, v in selector.items()])
logger.debug(f"Using selector: {selector_str} for StatefulSet {ss_name}")
pods = self.api.list_namespaced_pod(
namespace=self.namespace,
label_selector=selector_str
)
if not pods.items:
logger.warning(f"No pods found for StatefulSet {ss_name} with selector {selector_str}")
break
# Keep the existing pod list structure but update identifiers
logger.info(f"Collecting identifiers for pods")
for pod in pods.items:
pod_name = pod.metadata.name
for managed_pod in self.deployed_pods['pods']:
if managed_pod['name'] == pod_name:
logger.debug(f"Collecting identifier for pod: {pod_name}")
identifier = self.get_pod_identifier(
pod_name,
self.deployed_pods['container_name']
)
if identifier.is_err():
logger.debug(f"No identifier for pod: {pod_name}")
return Err(None)
managed_pod['identifier'] = identifier.ok_value
logger.info("All pods are ready and identifiers collected!")
return Ok(None)
except client.exceptions.ApiException as e:
error = f"Error checking StatefulSet {ss_name}: {str(e)}"
logger.error(error)
return Err(error)
logger.error("Timeout waiting for pods to be ready")
return Err(None)
def get_all_pods(self) -> List[Dict[str, str]]:
# deployed_pods tracks a single StatefulSet as a flat dict
return self.deployed_pods.get('pods', [])
def get_pods_by_statefulset(self, statefulset_name: str) -> List[Dict[str, str]]:
return self.deployed_pods['pods'] if self.deployed_pods.get('ss_name') == statefulset_name else []

base_protocol.py (deleted)

@@ -1,20 +0,0 @@
# Python Imports
import logging
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class BaseProtocol(ABC):
"""Abstract base class for node communication protocols."""
@abstractmethod
def get_node_identifier(self) -> list:
pass
@abstractmethod
def get_connection_command(self, target_identifier: str) -> list:
pass
@abstractmethod
def parse_identifier_response(self, response: str) -> str:
pass

libp2p_protocol.py (deleted)

@@ -1,19 +0,0 @@
# Python Imports
# Project Imports
from src.mesh_creation.protocols.base_protocol import BaseProtocol
class LibP2PProtocol(BaseProtocol):
def __init__(self, port: int = 8080):
self.port = port
def get_node_identifier(self) -> list:
pass
def get_connection_command(self, target_identifier: str) -> list:
pass
def parse_identifier_response(self, response: str) -> str:
pass

waku_protocol.py (deleted)

@@ -1,44 +0,0 @@
# Python Imports
import ast
import json
import logging
# Project Imports
from src.mesh_creation.protocols.base_protocol import BaseProtocol
logger = logging.getLogger(__name__)
class WakuProtocol(BaseProtocol):
def __init__(self, port: int = 8645):
self.port = port
def get_node_identifier(self) -> list:
return [
"curl", "-s", "-X", "GET",
f"http://127.0.0.1:{self.port}/debug/v1/info",
"-H", "accept: application/json"
]
def get_connection_command(self, target_identifier: str) -> list:
return [
"curl", "-s", "-X", "POST",
f"http://127.0.0.1:{self.port}/admin/v1/peers",
"-H", "Content-Type: application/json",
"-d", f"[{json.dumps(target_identifier)}]"
]
def parse_identifier_response(self, response: str) -> str:
try:
# The node may return a Python-repr-like payload (single quotes); try literal_eval first
try:
data = ast.literal_eval(response)
except (ValueError, SyntaxError):
# Fall back to JSON after normalising the quotes
data = json.loads(response.replace("'", '"'))
return data["listenAddresses"][0]
except Exception as e:
logger.error(f"Error parsing Waku node identifier: {str(e)}")
logger.error(f"Failed response content: {response}")
return ""

run_mesh_creation.py (deleted)

@@ -1,54 +0,0 @@
# Python Imports
import logging
# Project Imports
import src.logger.logger
from mesh_manager import MeshManager
from protocols.waku_protocol import WakuProtocol
from protocols.libp2p_protocol import LibP2PProtocol
logger = logging.getLogger("src.mesh_creation.run_mesh_creation")
def main():
logger.info("Starting mesh creation process")
mesh_manager = MeshManager(
kube_config="rubi3.yaml",
namespace="zerotesting"
)
# Example 1: Create mesh with different protocols and topology configs for each StatefulSet
statefulset_configs = [
# First StatefulSet with Waku protocol and custom topology
("nodes.yaml", {
"type": "libp2p_custom",
"parameters": {
"n": 10,
"d_low": 4,
"d_high": 6
}
}, WakuProtocol(port=8645)),
## Second StatefulSet with LibP2P protocol and random topology
#("statefulset2.yaml", {
# "type": "random",
# "parameters": {
# "n": 3,
# "p": 0.5
# }
#}, LibP2PProtocol(port=8080))
]
logger.info("Creating mesh with configurations: %s", statefulset_configs)
mesh_manager.create_mesh(statefulset_configs)
logger.info("Mesh creation completed")
# Example 3: Create mesh from Pajek files with different protocols
# pajek_configs = [
# ("statefulset1.yaml", "topology1.net", WakuProtocol(port=8645)),
# ("statefulset2.yaml", "topology2.net", LibP2PProtocol(port=8080))
# ]
# mesh_manager.create_mesh_from_pajek_files(pajek_configs)
if __name__ == "__main__":
main()

conftest.py (deleted)

@@ -1,52 +0,0 @@
# Python imports
import pytest
from unittest.mock import Mock
from pathlib import Path
# Project imports
from src.mesh_creation.protocols.base_protocol import BaseProtocol
@pytest.fixture(autouse=True)
def mock_kubernetes():
"""Mock kubernetes configuration and clients for all tests"""
with pytest.MonkeyPatch.context() as mp:
mp.setattr('kubernetes.config.load_kube_config', Mock())
mp.setattr('kubernetes.client.CoreV1Api', Mock)
mp.setattr('kubernetes.client.AppsV1Api', Mock)
yield
@pytest.fixture
def test_data_dir():
"""Return a Path object for the test data directory"""
return Path(__file__).parent / 'data'
@pytest.fixture
def mock_statefulset_yaml():
"""Return a mock StatefulSet YAML configuration"""
return {
"kind": "StatefulSet",
"metadata": {
"name": "test-statefulset"
},
"spec": {
"replicas": 3,
"template": {
"spec": {
"containers": [
{
"name": "test-container"
}
]
}
}
}
}
@pytest.fixture
def mock_protocol():
"""Create a mock protocol that implements BaseProtocol"""
protocol = Mock(spec=BaseProtocol)
protocol.get_node_identifier.return_value = ["curl", "-s", "http://localhost:8645/enr"]
protocol.get_connection_command.return_value = ["curl", "-s", "-X", "POST", "http://localhost:8645/connect"]
protocol.parse_identifier_response.return_value = "enr:-123abc"
return protocol

test_pod_manager.py (deleted)

@@ -1,184 +0,0 @@
# Python imports
import pytest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
from kubernetes import client
from result import Ok, Err
# Project imports
from src.mesh_creation.pod_manager import PodManager
from src.mesh_creation.protocols.base_protocol import BaseProtocol
@pytest.fixture
def mock_protocol():
protocol = Mock(spec=BaseProtocol)
protocol.get_node_identifier.return_value = Ok(["curl", "-s", "http://localhost:8645/enr"])
protocol.get_connection_command.return_value = Ok(["curl", "-s", "-X", "POST", "http://localhost:8645/connect"])
protocol.parse_identifier_response.return_value = Ok("enr:-123abc")
return protocol
@pytest.fixture
def pod_manager(mock_protocol):
with patch('kubernetes.config.load_kube_config'):
with patch('kubernetes.client.CoreV1Api'):
with patch('kubernetes.client.AppsV1Api'):
return PodManager(
kube_config="test_config.yaml",
namespace="test",
protocol=mock_protocol
)
def test_init(pod_manager):
assert pod_manager.namespace == "test"
assert pod_manager.deployed_pods == {}
def test_execute_pod_command_success(pod_manager):
mock_stream = Mock(return_value="test response")
with patch('kubernetes.stream.stream', mock_stream):
result = pod_manager.execute_pod_command(
"test-pod",
["echo", "hello"],
"test-container"
)
assert result.is_ok()
assert result.ok_value == "test response"
def test_execute_pod_command_failure(pod_manager):
with patch('kubernetes.stream.stream', side_effect=Exception("Connection failed")):
result = pod_manager.execute_pod_command(
"test-pod",
["echo", "hello"],
"test-container"
)
assert result.is_err()
assert "Failed to execute command" in result.err_value
def test_execute_pod_command_curl(pod_manager):
mock_stream = Mock(return_value='Progress... {"key": "value"}')
with patch('kubernetes.stream.stream', mock_stream):
result = pod_manager.execute_pod_command(
"test-pod",
["curl", "http://test"],
"test-container"
)
assert result.is_ok()
assert result.ok_value == '{"key": "value"}'
def test_get_pod_identifier_success(pod_manager, mock_protocol):
with patch.object(pod_manager, 'execute_pod_command', return_value=Ok("test response")):
result = pod_manager.get_pod_identifier("test-pod", "test-container")
assert result.is_ok()
assert result.ok_value == "enr:-123abc"
def test_get_pod_identifier_failure(pod_manager, mock_protocol):
with patch.object(pod_manager, 'execute_pod_command', return_value=Err("Command failed")):
result = pod_manager.get_pod_identifier("test-pod", "test-container")
assert result.is_err()
assert "Failed to get pod identifier" in result.err_value
def test_connect_pods_success(pod_manager):
source_pod = {"name": "pod1", "identifier": "id1"}
target_pod = {"name": "pod2", "identifier": "id2"}
pod_manager.deployed_pods = {'container_name': 'test-container'}
with patch.object(pod_manager, 'execute_pod_command', return_value=Ok("success")):
result = pod_manager.connect_pods(source_pod, target_pod)
assert result.is_ok()
def test_connect_pods_failure(pod_manager):
source_pod = {"name": "pod1", "identifier": "id1"}
target_pod = {"name": "pod2", "identifier": "id2"}
pod_manager.deployed_pods = {'container_name': 'test-container'}
with patch.object(pod_manager, 'execute_pod_command', return_value=Err("Connection failed")):
result = pod_manager.connect_pods(source_pod, target_pod)
assert result.is_err()
assert "Failed to connect pods" in result.err_value
def test_apply_yaml_file_success(pod_manager):
mock_yaml = {
"kind": "StatefulSet",
"metadata": {"name": "test-ss"},
"spec": {
"replicas": 3,
"template": {
"spec": {
"containers": [{"name": "test-container"}]
}
}
}
}
with patch('builtins.open', create=True) as mock_open:
mock_open.return_value.__enter__.return_value.read.return_value = mock_yaml
result = pod_manager.apply_yaml_file(Path("test.yaml"))
assert result.is_ok()
assert pod_manager.deployed_pods['ss_name'] == "test-ss"
assert len(pod_manager.deployed_pods['pods']) == 3
assert pod_manager.deployed_pods['container_name'] == "test-container"
def test_apply_yaml_file_invalid_yaml(pod_manager):
with patch('builtins.open', create=True) as mock_open:
mock_open.return_value.__enter__.return_value.read.side_effect = Exception("Invalid YAML")
result = pod_manager.apply_yaml_file(Path("test.yaml"))
assert result.is_err()
assert "Failed to read YAML file" in result.err_value
def test_wait_for_pods_ready_success(pod_manager):
pod_manager.deployed_pods = {
'ss_name': 'test-ss',
'container_name': 'test-container',
'pods': [{'name': 'test-ss-0', 'identifier': ''}]
}
mock_ss = Mock()
mock_ss.status.ready_replicas = 1
mock_ss.spec.replicas = 1
mock_ss.spec.selector.match_labels = {"app": "test"}
mock_pod = Mock()
mock_pod.metadata.name = "test-ss-0"
mock_pod_list = Mock()
mock_pod_list.items = [mock_pod]
with patch.object(pod_manager.apps_api, 'read_namespaced_stateful_set', return_value=mock_ss):
with patch.object(pod_manager.api, 'list_namespaced_pod', return_value=mock_pod_list):
with patch.object(pod_manager, 'get_pod_identifier', return_value=Ok("test-id")):
result = pod_manager.wait_for_pods_ready()
assert result.is_ok()
assert pod_manager.deployed_pods['pods'][0]['identifier'] == "test-id"
def test_wait_for_pods_ready_timeout(pod_manager):
pod_manager.deployed_pods = {
'ss_name': 'test-ss',
'container_name': 'test-container',
'pods': [{'name': 'test-ss-0', 'identifier': ''}]
}
mock_ss = Mock()
mock_ss.status.ready_replicas = 0
mock_ss.spec.replicas = 1
with patch.object(pod_manager.apps_api, 'read_namespaced_stateful_set', return_value=mock_ss):
result = pod_manager.wait_for_pods_ready(timeout=1)
assert result.is_err()
assert "Timeout waiting for pods" in result.err_value
def test_configure_connections_success(pod_manager):
pod_manager.deployed_pods = {
'pods': [
{'name': 'test-ss-0', 'identifier': 'id0'},
{'name': 'test-ss-1', 'identifier': 'id1'}
],
'container_name': 'test-container'
}
mock_graph = Mock()
mock_graph.edges.return_value = [(0, 1)]
node_to_pod = {0: 'test-ss-0', 1: 'test-ss-1'}
with patch.object(pod_manager, 'connect_pods', return_value=Ok(None)):
result = pod_manager.configure_connections(node_to_pod, mock_graph)
assert result.is_ok()


@@ -1,55 +0,0 @@
# Python imports
import pytest
from unittest.mock import Mock
from result import Ok, Err
# Project imports
from src.mesh_creation.protocols.base_protocol import BaseProtocol
from src.mesh_creation.protocols.waku_protocol import WakuProtocol
def test_waku_protocol_init():
protocol = WakuProtocol(port=8645)
assert protocol.port == 8645
def test_waku_protocol_get_node_identifier():
protocol = WakuProtocol(port=8645)
result = protocol.get_node_identifier()
assert result.is_ok()
assert result.ok_value == ["curl", "-s", "http://localhost:8645/enr"]
def test_waku_protocol_get_connection_command():
protocol = WakuProtocol(port=8645)
enr = "enr:-123abc"
result = protocol.get_connection_command(enr)
assert result.is_ok()
assert result.ok_value == ["curl", "-s", "-X", "POST", f"http://localhost:8645/connect/{enr}"]
def test_waku_protocol_parse_identifier_response_success():
protocol = WakuProtocol(port=8645)
response = '{"enr": "enr:-123abc"}'
result = protocol.parse_identifier_response(response)
assert result.is_ok()
assert result.ok_value == "enr:-123abc"
def test_waku_protocol_parse_identifier_response_invalid():
protocol = WakuProtocol(port=8645)
response = 'invalid json'
result = protocol.parse_identifier_response(response)
assert result.is_err()
assert "Failed to parse JSON response" in result.err_value
def test_base_protocol_abstract_methods():
class ConcreteProtocol(BaseProtocol):
def get_node_identifier(self):
return Ok(["test"])
def get_connection_command(self, identifier):
return Ok(["test", identifier])
def parse_identifier_response(self, response):
return Ok(response)
protocol = ConcreteProtocol()
assert protocol.get_node_identifier().ok_value == ["test"]
assert protocol.get_connection_command("id").ok_value == ["test", "id"]
assert protocol.parse_identifier_response("response").ok_value == "response"


@@ -1,94 +0,0 @@
# Python imports
import pytest
from unittest.mock import Mock, patch
import networkx as nx
from pathlib import Path
from result import Ok, Err
# Project imports
from src.mesh_creation.topology_creation import TopologyManager
from src.mesh_creation.protocols.base_protocol import BaseProtocol
@pytest.fixture
def mock_protocol():
return Mock(spec=BaseProtocol)
@pytest.fixture
def topology_manager(mock_protocol):
with patch('kubernetes.config.load_kube_config'):
with patch('kubernetes.client.CoreV1Api'):
return TopologyManager(
kube_config="test_config.yaml",
namespace="test",
protocol=mock_protocol
)
def test_init(topology_manager, mock_protocol):
assert topology_manager.namespace == "test"
assert topology_manager.protocol == mock_protocol
def test_setup_nodes_success(topology_manager):
with patch.object(topology_manager.pod_manager, 'apply_yaml_file', return_value=Ok(None)):
with patch.object(topology_manager.pod_manager, 'wait_for_pods_ready', return_value=Ok(None)):
result = topology_manager.setup_nodes(Path("test.yaml"))
assert result.is_ok()
def test_setup_nodes_failure(topology_manager):
with patch.object(topology_manager.pod_manager, 'apply_yaml_file', return_value=Ok(None)):
with patch.object(topology_manager.pod_manager, 'wait_for_pods_ready', return_value=Err("Failed to wait for pods")):
result = topology_manager.setup_nodes(Path("test.yaml"))
assert result.is_err()
assert "Failed to wait for pods" in result.err_value
def test_generate_topology_libp2p_custom(topology_manager):
result = topology_manager.generate_topology(
"libp2p_custom",
n=5,
d_low=2,
d_high=4
)
assert result.is_ok()
graph = result.ok_value
assert isinstance(graph, nx.Graph)
assert len(graph.nodes) == 5
assert all(2 <= d <= 4 for _, d in graph.degree())
def test_generate_topology_invalid_type(topology_manager):
result = topology_manager.generate_topology("invalid_type")
assert result.is_err()
def test_configure_libp2p_custom(topology_manager):
graph = topology_manager.configure_libp2p_custom(n=5, d_high=4, d_low=2)
assert isinstance(graph, nx.Graph)
assert len(graph.nodes) == 5
assert all(2 <= d <= 4 for _, d in graph.degree())
def test_configure_node_connections_success(topology_manager):
mock_graph = nx.Graph()
mock_graph.add_edges_from([(0, 1), (1, 2)])
topology_manager.pod_manager.deployed_pods = {
'pods': [
{'name': 'test-0', 'identifier': 'id0'},
{'name': 'test-1', 'identifier': 'id1'},
{'name': 'test-2', 'identifier': 'id2'}
]
}
with patch.object(topology_manager.pod_manager, 'configure_connections', return_value=Ok(None)):
result = topology_manager.configure_node_connections(mock_graph)
assert result.is_ok()
def test_configure_node_connections_not_enough_pods(topology_manager):
mock_graph = nx.Graph()
mock_graph.add_edges_from([(0, 1), (1, 2)])
topology_manager.pod_manager.deployed_pods = {
'pods': [
{'name': 'test-0', 'identifier': 'id0'},
{'name': 'test-1', 'identifier': 'id1'}
]
}
result = topology_manager.configure_node_connections(mock_graph)
assert result.is_err()


@@ -1,19 +0,0 @@
# Topology Configuration File
# Available topology types: random, scale_free, small_world
topology_type: "random"
# Parameters for topology generation
parameters:
# For random (Erdős-Rényi) topology:
n: 10 # number of nodes
p: 0.2 # probability of edge creation
# For scale-free (Barabási-Albert) topology:
# n: 10 # number of nodes
# m: 2 # number of edges to attach from a new node
# For small-world (Watts-Strogatz) topology:
# n: 10 # number of nodes
# k: 4 # each node is connected to k nearest neighbors
# p: 0.1 # probability of rewiring each edge
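For reference, these topology types map onto standard networkx generators; a minimal sketch (this dispatch helper is illustrative, not part of the removed code):
```python
import networkx as nx

def build_topology(topology_type: str, **params) -> nx.Graph:
    """Map the config's topology types onto networkx generators."""
    if topology_type == "random":
        # Erdős-Rényi: n nodes, edge probability p
        return nx.erdos_renyi_graph(params["n"], params["p"])
    if topology_type == "scale_free":
        # Barabási-Albert: n nodes, m edges attached from each new node
        return nx.barabasi_albert_graph(params["n"], params["m"])
    if topology_type == "small_world":
        # Watts-Strogatz: n nodes, k nearest neighbors, rewiring probability p
        return nx.watts_strogatz_graph(params["n"], params["k"], params["p"])
    raise ValueError(f"Unsupported topology type: {topology_type}")
```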

topology_creation.py (deleted)

@@ -1,82 +0,0 @@
# Python Imports
from pathlib import Path
import random
import networkx as nx
import logging
from kubernetes import client, config
from result import Result, Err, Ok
# Project Imports
import src.logger.logger
from typing import Optional
from src.mesh_creation.pod_manager import PodManager
from src.mesh_creation.protocols.base_protocol import BaseProtocol
logger = logging.getLogger(__name__)
class TopologyManager:
def __init__(self, kube_config: str, namespace: str, protocol: Optional[BaseProtocol]):
self.namespace = namespace
self.protocol = protocol
self.pod_manager = PodManager(kube_config, namespace, protocol=self.protocol)
config.load_kube_config(config_file=kube_config)
self.api = client.CoreV1Api()
def setup_nodes(self, yaml_file: Path) -> Result[None, str]:
result = self.pod_manager.apply_yaml_file(yaml_file)
if result.is_err():
return result
result = self.pod_manager.wait_for_pods_ready()
if result.is_err():
return result
return Ok(None)
def read_pajek(self, pajek_path: str) -> nx.Graph:
return nx.read_pajek(pajek_path)
def generate_topology(self, topology_type: str, **params) -> Result[nx.Graph, None]:
if topology_type == "libp2p_custom":
return Ok(self.configure_libp2p_custom(**params))
else:
logger.error(f"Unsupported topology type: {topology_type}")
return Err(None)
def configure_libp2p_custom(self, n: int, d_high: int, d_low: int) -> nx.Graph:
G = nx.Graph()
G.add_nodes_from(range(n))
# Ensure minimum degree by adding edges
while any(G.degree(node) < d_low for node in G.nodes):
node = random.choice([v for v in G.nodes if G.degree(v) < d_low])
possible_neighbors = [v for v in G.nodes if
v != node and not G.has_edge(node, v) and G.degree(v) < d_high]
# Add edges until the node reaches d_low or no valid neighbors remain
while G.degree(node) < d_low and possible_neighbors:
neighbor = random.choice(possible_neighbors)
G.add_edge(node, neighbor)
possible_neighbors.remove(neighbor)
assert all(d_low <= d <= d_high for _, d in G.degree()), "Some nodes do not meet the degree constraints"
return G
def configure_node_connections(self, graph: nx.Graph) -> Result[None, None]:
logger.info("Starting node connection configuration")
pods = self.pod_manager.deployed_pods['pods']
if len(pods) < graph.number_of_nodes():
logger.error(f"Not enough pods ({len(pods)}) for the topology ({graph.number_of_nodes()} nodes)")
return Err(None)
node_to_pod = {i: pod['name'] for i, pod in enumerate(pods[:graph.number_of_nodes()])}
logger.debug(f"Node to pod mapping: {node_to_pod}")
result = self.pod_manager.configure_connections(node_to_pod, graph)
if result.is_err():
return Err(None)
return Ok(None)


@@ -6,7 +6,7 @@ from result import Ok, Err
# Project Imports
from src.metrics import scrape_utils
from src.metrics import kubernetes
from src.metrics import kubernetes_manager
from src.data.data_request_handler import DataRequestHandler
from src.utils.file_utils import read_yaml_file
@@ -19,7 +19,7 @@ class Scrapper:
self._query_config = None
self._query_config_file = query_config_file
self._set_query_config()
self._k8s = kubernetes.KubernetesManager(kube_config)
self._k8s = kubernetes_manager.KubernetesManager(kube_config)
def query_and_dump_metrics(self):
# https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py


@@ -1,71 +0,0 @@
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import ticker
sns.set_theme()
language = ["Nim", "Rust", "Go"]
data_to_plot = ["Rx", "Tx", "Rp", "Tp", "Rpd", "Tpd"]
y_label = ["KBytes/s", "KBytes/s", "KPackets/s", "KPackets/s", "Packets/s", "Packets/s"]
scale = [True, True, False, False, False, False]
folders_grouped = [("../gossipsubdata_2nd/csv/rx/", "../gossipsubdatarust/csv/rx/", "../gossipsubdatago/csv/rx/"),
("../gossipsubdata_2nd/csv/tx/", "../gossipsubdatarust/csv/tx/", "../gossipsubdatago/csv/tx/"),
("../gossipsubdata_2nd/csv/rp/", "../gossipsubdatarust/csv/rp/", "../gossipsubdatago/csv/rp/"),
("../gossipsubdata_2nd/csv/tp/", "../gossipsubdatarust/csv/tp/", "../gossipsubdatago/csv/tp/"),
("../gossipsubdata_2nd/csv/rpd/", "../gossipsubdatarust/csv/rpd/", "../gossipsubdatago/csv/rpd/"),
("../gossipsubdata_2nd/csv/tpd/", "../gossipsubdatarust/csv/tpd/", "../gossipsubdatago/csv/tpd/"), ]
# file_data = ["Rx-500B-1.csv", "Rx-1KB-1.csv", "Rx-2.5KB-1.csv", "Rx-10KB-1.csv", "Rx-20KB-1.csv"]
# file_data = ["Tx-500B-1.csv", "Tx-1KB-1.csv", "Tx-2.5KB-1.csv", "Tx-10KB-1.csv", "Tx-20KB-1.csv"]
fig, axs = plt.subplots(nrows=3, ncols=2, figsize=(14, 16), sharex=True, sharey='row')
for j, group in enumerate(folders_grouped):
final_df = pd.DataFrame()
for i, folder_path in enumerate(group):
if not os.path.exists(folder_path): continue
files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
files = sorted(files, key=lambda x: float(x.split("-")[1].split("KB")[0]))
folder_df = pd.DataFrame()
for file in files:
df = pd.read_csv(folder_path + file, parse_dates=['Time'], index_col='Time')
column_name = file.split("-")[1]
df_avg = df.mean()
df_avg_mean = df_avg.median()
vertical_offset = df_avg.median() * 0.05 # offset from median for display
folder_df = pd.concat([folder_df, df_avg.rename(column_name)], axis=1)
folder_df["node"] = language[i]
final_df = pd.concat([final_df, folder_df])
final_df = pd.melt(final_df, id_vars=["node"])
box_plot = sns.boxplot(data=final_df, x="variable", y="value", hue="node", ax=axs[j // 2, j % 2])
box_plot.set_title(f'{data_to_plot[j]} (N=300)')
box_plot.set(xlabel='Payload size (KB)', ylabel=f"{y_label[j]}")
box_plot.tick_params(labelbottom=True)
# plt.ylabel(f"{y_label[j]}")
# plt.xlabel('Payload size (KB)')
# sns.move_legend(box_plot, "upper left", bbox_to_anchor=(1, 1))
# plt.tight_layout()
if scale[j]:
# Create a custom formatter to divide x-axis ticks by 1000
formatter = ticker.FuncFormatter(lambda x, pos: '{:.0f}'.format(x / 1000))
# Apply the custom formatter to the x-axis ticks
box_plot.yaxis.set_major_formatter(formatter)
plt.tight_layout()
plt.savefig(f"all.png")
plt.show()
# box_plot.figure.savefig(f"{data_to_plot[j]}-melted.png")


@@ -1,37 +0,0 @@
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme()
language = ["Nim", "Rust", "Go"]
server = ["metal-01.he-eu-hel1.misc.vacdst"]
folders_grouped = [("../gossipsubdata_2nd/load/109/", "../gossipsubdatarust/load/109/", "../gossipsubdatago/load/109/"),
("../gossipsubdata_2nd/load/198/", "../gossipsubdatarust/load/198/", "../gossipsubdatago/load/198/")]
for j, group in enumerate(folders_grouped):
for i, folder_path in enumerate(group):
if not os.path.exists(folder_path): continue
files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
files = sorted(files, key=lambda x: float(x.split("-")[1].split("KB")[0]))
for file in files:
df = pd.read_csv(folder_path + file)
# get column as list
df = df['1m load average'].tolist()
sns.lineplot(df, label=file.split("-")[1], legend="full")
plt.xlabel('Time (1min step)')
plt.ylabel('1m load average')
# Wrap the server index so a single server label works for every folder group
plt.title(f'{language[i]} Loads ({server[j % len(server)]})')
plt.legend(title='Nodes', bbox_to_anchor=(1, 1), loc='upper left')
plt.tight_layout()
plt.savefig(f"{language[i]}-{server[j % len(server)]}.png")
plt.show()


@@ -1,66 +0,0 @@
import os
import re
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
sns.set_theme()
#data_folders = ["logs-0.5KB-1/", "logs-1KB-1/", "logs-2.5KB-1/", "logs-5KB-1/", "logs-10KB-1/",
# "logs-20KB-1/", "logs-40KB-1/"]
#node_folders = ["../gossipsubdata_2nd/logs/", "../gossipsubdatarust/logs/", "../gossipsubdatago/logs/"]
#language = ["Nim", "Rust", "Go"]
data_folders = ["logs-10KB-1/"]
node_folders = ["../gossipsubdata_2nd/logs/", "../test/logs/"]
language = ["Before", "After"]
pattern = r'nds:\s[0-9]+'
final_df = pd.DataFrame()
for j, node_folder in enumerate(node_folders):
folder_df = pd.DataFrame()
for i, data_folder in enumerate(data_folders):
file_df = pd.DataFrame()
column_name = data_folder.split("-")[1][:-1]
files = os.listdir(node_folder + data_folder)
for log_file in files:
values = []
with open(node_folder + data_folder + log_file, 'r') as file:
for line in file:
match = re.search(pattern, line)
if match:
value = int(match.group().split(":")[1].strip())
values.append(value)
file_df = pd.concat([file_df, pd.Series(values)], ignore_index=True)
file_df = file_df.rename({file_df.columns[0]: column_name}, axis=1)
folder_df = pd.concat([folder_df, file_df], axis=1)
folder_df["node"] = language[j]
final_df = pd.concat([final_df, folder_df])
final_df = pd.melt(final_df, id_vars=["node"])
final_df = final_df.dropna()
box_plot = sns.boxplot(data=final_df, x="variable", y="value", hue="node")
plt.ylabel('Arrival Time (ms)')
plt.xlabel('Payload size (KB)')
plt.title('Times (N=300)')
plt.show()
box_plot.figure.savefig("test.png")
# remove outliers
final_df = final_df[final_df["value"] < 1000]
box_plot = sns.boxplot(data=final_df, x="variable", y="value", hue="node")
plt.ylabel('Arrival Time (ms)')
plt.xlabel('Payload size (KB)')
plt.title('Times (N=300)')
plt.show()
box_plot.figure.savefig("test_noo.png")


@@ -1,33 +0,0 @@
import os
import re
pattern = r'[0-9]+\s[A-Za-z]+:\s[0-9]+'
folder_dir = "../gossipsubdata_2nd/logs/logs-20KB-1/"
def parse_file(data, folder):
log_files = os.listdir(folder)
for log_file in log_files:
with open(folder + log_file, 'r') as file:
for line in file:
match = re.search(pattern, line)
if match:
value = int(match.group().split(" ")[0])
if value not in data:
data[value] = [[log_file], 1]
else:
data[value][0].append(log_file)
data[value][1] += 1
all_data = {}
parse_file(all_data, folder_dir)
for key, value in sorted(all_data.items()):
print(f"{key}: {value[1]}")
# Wednesday, 3 January 2024 13:40:03.052
# Wednesday, 3 January 2024 13:55:26.148
# Tuesday, 2 January 2024 14:16:03.825
# Tuesday, 2 January 2024 14:31:27.839

requirements.txt (deleted)

@@ -1,12 +0,0 @@
PyYAML==6.0.2
Pygments==2.19.1
black==25.1.0
dateparser==1.2.1
humanfriendly==10.0
kubernetes==32.0.1
pydantic==2.11.7
pylint
pytest==8.3.5
regex==2024.11.6
ruamel.yaml==0.17.21
yamlpath==3.8.2

pyproject.toml (new file, 18 lines)

@@ -0,0 +1,18 @@
[project]
name = "10ksim"
version = "0.1.0"
description = "Add your description here"
requires-python = ">=3.11"
dependencies = [
"dateparser>=1.2.2",
"httpx>=0.28.1",
"humanfriendly>=10.0",
"kubernetes>=33.1.0",
"pandas>=2.3.1",
"pydantic>=2.11.7",
"pyyaml>=6.0.2",
"requests>=2.32.4",
"result>=0.17.0",
"ruamel-yaml==0.17.21",
"seaborn>=0.13.2",
]
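With dependencies declared here instead of a requirements.txt, routine changes go through uv rather than hand-edited pins (a minimal sketch using standard uv commands; `rich` is only a placeholder package):
```shell
uv add rich   # add a new dependency to pyproject.toml and uv.lock (placeholder package)
uv lock       # re-resolve the lockfile after manual edits to pyproject.toml
uv sync       # install the locked environment
```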

uv.lock (generated new file, 1181 lines)
Diff suppressed because it is too large.