Refactor python scaffold MVP

Minimum Viable Product for scaffold refactor.

Changes structure of scaffold to:
- Run experiments by building from different types of services
- Copy whole folder to workdir for more organized debugging
- Use decorator to scan for experiment and add subparsers
Pearson White
2025-06-03 17:16:36 -04:00
parent 877e1ee36a
commit 1d64902ebc
60 changed files with 1529 additions and 462 deletions

@@ -1,18 +1,59 @@
# Purpose
This tool helps with setting up and running tests.
Primarily, it:
- Generates deployment yamls.
- Applies the deployments.
- Waits for the test.
- Cleans up the environment based on the parameters passed in.
However, it is flexible enough that it can be adapted to other workflows.
# Example usage
```
python ./main.py -vv --config ~/sapphire.yaml regression-nodes --type waku --workdir ./workdir
```
Here we use the values.yaml already in the deployment folder, which has all the required parameters.
# Requirements
- [helm](https://helm.sh/docs/intro/install/) should be installed and in $PATH.
The Python code uses `helm` in a subprocess to generate the deployment yamls.
- `pip install -r requirements.txt`
# Pitfalls
Make sure you do not create a Python virtual environment in this folder;
otherwise `make format` and the `registry.py` scan will raise errors.
# Structure
Essentially, this script consists of several parts:
- `kube_utils.py` - A bunch of utilities for interacting with kubernetes
and a few misc utilities as well.
- `main.py` - Parses common parameters, does a small amount of setup, and selects experiment type.
- `experiment/dispatch.py` - Contains the information necessary to set up and run the experiment. This may be broken into smaller pieces, such as `regression_tests/waku.py` and `regression_tests/nimlibp2p.py`. Each experiment should contain a subparser for its own parameters and a function to run that can be selected by `main.py:run_experiment`.
- `deployments/` - Contains experiments and helm template info
## Experiments
Experiments are gathered by `registry.py`,
which scans files looking for the `@experiment(...)` decorator.
Each experiment should contain the following functions:
### add_parser
- `def add_parser(subparsers)`
- Called in `main` to add subparsers for CLI arguments.
### run
- `def run(self, api_client: ApiClient, args: argparse.Namespace, values_yaml: Optional[yaml.YAMLObject])`
- Called in `main.py/run_experiment` with `args` and `values_yaml`
from CLI args and `api_client` created with the kubeconfig value from `--config`.
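For illustration, the decorator-based registration described above can be sketched as follows. This is a hypothetical minimal version: the actual `registry.py` is not part of this diff excerpt, and it scans files for the decorator rather than relying on import side effects. `EXPERIMENTS` and `DemoNodes` are illustrative names.

```python
# Minimal sketch of a decorator-based experiment registry (hypothetical).
EXPERIMENTS = {}


def experiment(name, type="experiment"):
    """Register a class under `name` so main.py can look it up."""
    def decorator(cls):
        EXPERIMENTS[name] = cls
        return cls
    return decorator


@experiment(name="demo-nodes")
class DemoNodes:
    @staticmethod
    def add_parser(subparsers):
        subparsers.add_parser("demo-nodes", help="Demo experiment.")

    def run(self, api_client, args, values_yaml):
        pass
```

`main.py` can then map the chosen subcommand name to `EXPERIMENTS[name]` and call its `run`.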

experiments/__init__.py Normal file

@@ -0,0 +1,18 @@
Each subfolder (nimlibp2p, waku, etc.) contains all the resources to
set up and run experiments for that type of deployment.
See the README.md under each subfolder for more details.
When creating a new deployment type:
- Use the file suffix `.values.yaml` for helper values yamls.
Within each deployment, for each `*.values.yaml` file under the project dir,
`--values <file.values.yaml>` will be added to the `helm template` command.
- `.values.yaml` files should be under `./templates`.
That is where `get_values_yamls` looks for values yamls.
- Common values should be in `<project_dir>/values.yaml`.
This is in line with `helm` conventions[¹](https://helm.sh/docs/chart_template_guide/values_files/).
- Helper templates (`.tpl` files) should have a leading underscore (e.g. `_metrics.tpl`).
Otherwise, the file will be treated as a deployment, and the output file from
`helm template` will have an extra yaml document from the `.tpl` file.
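For illustration, collecting the `*.values.yaml` files described above could look like this. This is a hypothetical sketch: the real `get_values_yamls` lives in `kube_utils.py` and is not shown in this diff.

```python
import os


def get_values_yamls(project_dir):
    """Collect *.values.yaml files under ./templates, relative to project_dir.

    Hypothetical sketch of the helper; paths are returned relative to
    project_dir so they can be passed to `helm template --values`.
    """
    templates = os.path.join(project_dir, "templates")
    found = []
    for root, _dirs, files in os.walk(templates):
        for fname in sorted(files):
            if fname.endswith(".values.yaml"):
                found.append(os.path.relpath(os.path.join(root, fname), project_dir))
    return found
```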

@@ -0,0 +1,83 @@
import logging
import shutil
from abc import ABC, abstractmethod
from argparse import ArgumentParser, Namespace
from contextlib import ExitStack
from typing import Optional
from kubernetes.client import ApiClient
from pydantic import BaseModel
from ruamel import yaml
from kube_utils import maybe_dir, poll_namespace_has_objects, wait_for_no_objs_in_namespace
logger = logging.getLogger(__name__)
class BaseExperiment(ABC, BaseModel):
    """Base experiment that adds an ExitStack with `workdir` to `run` and delegates to an internal `_run`.

    How to use:
    - Inherit from this class.
    - Call `BaseExperiment.add_args` in the child class's `add_parser`.
    - Implement `_run` in the child class.
    """

    @staticmethod
    def add_args(subparser: ArgumentParser):
        subparser.add_argument(
            "--workdir",
            type=str,
            required=False,
            default=None,
            help="Folder to use for generating the deployment files.",
        )
        subparser.add_argument(
            "--skip-check",
            action="store_true",
            required=False,
            help="If present, does not wait until the namespace is empty before running the test.",
        )

    def run(
        self,
        api_client: ApiClient,
        args: Namespace,
        values_yaml: Optional[yaml.YAMLObject],
    ):
        with ExitStack() as stack:
            workdir = args.workdir
            stack.enter_context(maybe_dir(workdir))
            try:
                shutil.rmtree(workdir)
            except FileNotFoundError:
                pass
            self._run(
                api_client=api_client,
                workdir=workdir,
                args=args,
                values_yaml=values_yaml,
                stack=stack,
            )

    @abstractmethod
    def _run(
        self,
        api_client: ApiClient,
        workdir: str,
        args: Namespace,
        values_yaml: Optional[yaml.YAMLObject],
        stack: ExitStack,
    ):
        pass

    def _wait_until_clear(self, api_client: ApiClient, namespace: str, skip_check: bool):
        # Wait for the namespace to be clear unless the --skip-check flag was used.
        if not skip_check:
            wait_for_no_objs_in_namespace(namespace=namespace, api_client=api_client)
        else:
            namespace_has_objects = poll_namespace_has_objects(
                namespace=namespace, api_client=api_client
            )
            if namespace_has_objects:
                logger.warning(f"Namespace is not empty! Namespace: `{namespace}`")

@@ -1,21 +1,18 @@
#!/usr/bin/env python3
import itertools
import logging
import os
import re
import shutil
import time
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
from typing import Literal, Optional
import humanfriendly
from kubernetes import client
from kubernetes.client import ApiClient
from pydantic import BaseModel, ConfigDict, Field
from ruamel import yaml
from deployment.common import BaseExperiment
from kube_utils import (
    cleanup_resources,
    get_cleanup_resources,
@@ -37,9 +34,26 @@ logger = logging.getLogger(__name__)
class NimRegressionNodes(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    api_client: ApiClient = Field(default=client.ApiClient())
    release_name: str = Field(default="nim-regression-nodes")

    @staticmethod
    def add_args(subparser: ArgumentParser):
        subparser.add_argument(
            "--delay",
            type=str,
            dest="delay",
            required=False,
            help="For nimlibp2p tests only. The delay before nodes activate in string format (e.g. 1hr20min)",
        )

    @staticmethod
    def add_parser(subparsers) -> None:
        subparser = subparsers.add_parser(
            "nimlibp2p-regression-nodes", help="Run a regression_nodes test using nimlibp2p."
        )
        BaseExperiment.add_args(subparser)
        NimRegressionNodes.add_args(subparser)

    def run(
        self,
        values_yaml: yaml.YAMLObject,

@@ -0,0 +1,9 @@
import logging

from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Builder(BaseModel):
    pass

@@ -0,0 +1,42 @@
import logging
from argparse import Namespace

from ruamel import yaml

from deployment.common import BaseExperiment
from deployment.nimlibp2p.experiments.regression import NimRegressionNodes
from deployment.waku.experiments.regression import WakuRegressionNodes
from registry import experiment

logger = logging.getLogger(__name__)


@experiment(name="regression-nodes", type="dispatch")
class RegressionNodes:
    """Proxy for running waku-regression-nodes or nim-regression-nodes."""

    @staticmethod
    def add_parser(subparsers) -> None:
        regression_nodes = subparsers.add_parser(
            "regression-nodes", help="Run a regression_nodes test."
        )
        regression_nodes.add_argument(
            "--type",
            type=str,
            choices=["waku", "nim"],
            required=True,
            help="Which node implementation to test.",
        )
        BaseExperiment.add_args(regression_nodes)
        NimRegressionNodes.add_args(regression_nodes)

    def run(self, api_client, args: Namespace, values_yaml: yaml.YAMLObject):
        logger.debug(f"args: {args}")
        run_args = {
            "api_client": api_client,
            "args": args,
            "values_yaml": values_yaml,
        }
        if args.type == "waku":
            WakuRegressionNodes().run(**run_args)
        elif args.type == "nim":
            NimRegressionNodes().run(**run_args)
        else:
            raise ValueError(f"Unknown regression experiment type: `{args.type}`")

@@ -0,0 +1,16 @@
File structure
.
├── README.md
├── builders.py
├── experiments
│   └── ...
├── <service> (bootstrap/publisher/etc)
│   ├── Chart.yaml
│   ├── values.yaml  # helm will use ./values.yaml as the base/default values.yaml
│   ├── templates
│   │   └── ...
│   └── ...
└── templates  # Templates common to multiple services (nodes, publisher, etc.)
    └── helpers

@@ -0,0 +1,4 @@
apiVersion: v2
name: my-chart
version: 0.1.0
description: A Helm chart for Kubernetes

@@ -6,7 +6,7 @@ metadata:
spec:
  replicas: 3
  podManagementPolicy: "Parallel"
  serviceName: {{ default "zerotesting-bootstrap" .Values.serviceName }}
  selector:
    matchLabels:
      app: zerotenkay-bootstrap
@@ -17,11 +17,10 @@ spec:
    spec:
      dnsConfig:
        searches:
          {{- .Values.waku.bootstrap.dnsConfig.searches | toYaml | nindent 10}}
      containers:
        - name: waku
          image: {{ default "soutullostatus/nwaku-jq-curl" (.Values.waku.bootstrap.image).repository }}:{{ default "v0.34.0-rc1" (.Values.waku.bootstrap.image).tag }}
          imagePullPolicy: IfNotPresent
          readinessProbe:
            httpGet:
@@ -48,7 +47,7 @@ spec:
            - containerPort: 8645
            - containerPort: 8008
          command:
            {{- include "waku.container.command" ( dict
              "includes" (dict)
              "command" .Values.waku.bootstrap.command
            ) | nindent 14 }}

@@ -0,0 +1,17 @@
waku:
  bootstrap:
    command:
      presets:
        regression:
          relay: false
          rest: true
          restAddress: "0.0.0.0"
          maxConnections: 1000
          discv5Discovery: true
          discv5EnrAutoUpdate: True
          logLevel: "INFO"
          metricsServer: True
          metricsServerAddress: "0.0.0.0"
          nat: "extip:$IP"
          clusterId: 2

@@ -0,0 +1,6 @@
waku:
  bootstrap:
    dnsConfig:
      searches:
        - zerotesting-service.zerotesting.svc.cluster.local
        - zerotesting-bootstrap.zerotesting.svc.cluster.local

@@ -0,0 +1,9 @@
waku:
  bootstrap:
    command:
      type: "regression"
      args:
        maxConnections: 1000
        logLevel: "INFO"
        clusterId: 2

@@ -0,0 +1,96 @@
import logging
import os
import shutil
from typing import List, Literal, Optional

from kubernetes import client
from kubernetes.client import ApiClient
from pydantic import BaseModel, ConfigDict, Field
from ruamel.yaml import YAMLObject
from ruamel.yaml.comments import CommentedMap

from kube_utils import get_values_yamls, get_YAML, helm_build_dir

logger = logging.getLogger(__name__)


class WakuBuilder(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    api_client: ApiClient = Field(default=client.ApiClient())
    deployment_dir: str = Field(default=os.path.dirname(__file__))

    def build(
        self,
        workdir: str,
        cli_values: Optional[YAMLObject],
        service: Literal["nodes", "publisher", "bootstrap"],
        extra_values_names: Optional[List[str]] = None,
        name: Optional[str] = None,
    ) -> YAMLObject:
        """
        :param cli_values: Yaml object of the values.yaml passed to the main CLI.
        :type cli_values: Optional[YAMLObject]
        :param extra_values_names: The names of the extra values yamls to use from the ./values/ subdirectory. E.g. ["regression.values.yaml"]
        :type extra_values_names: Optional[List[str]]
        """
        logger.debug(f"Building waku deployment file. Deployment type: `{service}`")
        if name is None:
            name = service
        if extra_values_names is None:
            extra_values_names = []
        if cli_values is None:
            cli_values = CommentedMap()
        work_sub_dir = os.path.join(workdir, service)
        logger.debug(f"Removing work subdir: {work_sub_dir}")
        try:
            shutil.rmtree(work_sub_dir)
        except FileNotFoundError:
            pass
        shutil.copytree(
            os.path.join(self.deployment_dir, service),
            work_sub_dir,
        )
        # TODO [error checking] Check for collision between service dir and common templates.
        shutil.copytree(
            os.path.join(self.deployment_dir, "templates"),
            os.path.join(work_sub_dir, "templates"),
            dirs_exist_ok=True,
        )
        values_path = os.path.join(work_sub_dir, "cli_values.yaml")
        yaml = get_YAML()
        assert not os.path.exists(
            values_path
        ), "Unexpected: cli_values.yaml already exists in template path."
        with open(values_path, "w") as out:
            yaml.dump(cli_values, out)
        all_values = (
            get_values_yamls(work_sub_dir)
            + [os.path.join("values", name) for name in extra_values_names]
            + [
                os.path.relpath(values_path, work_sub_dir)
            ]  # It is significant that [values_path] is at the end.
        )
        deployment = helm_build_dir(
            workdir=work_sub_dir,
            values_paths=all_values,
            name=name,
        )
        # Dump the constructed deployment yaml for debugging/reference.
        with open(os.path.join(work_sub_dir, "out_deployment.yaml"), "w") as out:
            yaml.dump(deployment, out)
        return deployment
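The comment about `values_path` being last matters because `helm template` applies `--values` files left to right, with later files overriding earlier ones; putting `cli_values.yaml` last lets CLI-supplied values win over the chart defaults and extra presets. A minimal sketch of that override semantics, with plain dicts standing in for values files (`deep_merge` is illustrative, not code from this commit):

```python
def deep_merge(base, override):
    """Later values win, merging nested maps, like helm's --values ordering."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


# Chart defaults, then CLI-supplied values layered on top (last wins).
defaults = {"waku": {"nodes": {"numNodes": 10, "command": {"type": "regression"}}}}
cli_values = {"waku": {"nodes": {"numNodes": 50}}}
final = deep_merge(defaults, cli_values)
```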

@@ -0,0 +1,116 @@
import logging
import time
from argparse import Namespace
from contextlib import ExitStack
from typing import Optional

from kubernetes.client import ApiClient
from pydantic import BaseModel, ConfigDict, Field
from ruamel import yaml

from deployment.common import BaseExperiment
from deployment.waku.builders import WakuBuilder
from kube_utils import assert_equals, get_cleanup, get_flag_value, kubectl_apply, wait_for_rollout
from registry import experiment

logger = logging.getLogger(__name__)


@experiment(name="waku-regression-nodes")
class WakuRegressionNodes(BaseExperiment, BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    release_name: str = Field(default="waku-regression-nodes")

    @staticmethod
    def add_parser(subparsers) -> None:
        subparser = subparsers.add_parser(
            "waku-regression-nodes", help="Run a regression_nodes test using waku."
        )
        BaseExperiment.add_args(subparser)

    def _run(
        self,
        api_client: ApiClient,
        workdir: str,
        args: Namespace,
        values_yaml: Optional[yaml.YAMLObject],
        stack: ExitStack,
    ):
        # TODO [values param checking]: Add friendly error messages for missing/extraneous variables in values.yaml.
        logger.info("Building kubernetes configs.")
        builder = WakuBuilder(api_client=api_client)
        nodes = builder.build(workdir, values_yaml, "nodes", ["regression.values.yaml"])
        bootstrap = builder.build(workdir, values_yaml, "bootstrap", ["regression.values.yaml"])
        publisher = builder.build(workdir, values_yaml, "publisher", ["regression.values.yaml"])
        # Sanity check
        namespace = bootstrap["metadata"]["namespace"]
        logger.info(f"namespace={namespace}")
        assert_equals(nodes["metadata"]["namespace"], namespace)
        assert_equals(publisher["metadata"]["namespace"], namespace)
        # TODO [metadata output]: log start time to output file here.
        logger.info("Applying kubernetes configs.")
        cleanup = get_cleanup(
            api_client=api_client,
            namespace=namespace,
            deployments=[bootstrap, nodes, publisher],
        )
        stack.callback(cleanup)
        self._wait_until_clear(
            api_client=api_client,
            namespace=namespace,
            skip_check=args.skip_check,
        )
        # Apply bootstrap
        logger.info("Applying bootstrap")
        kubectl_apply(bootstrap, namespace=namespace)
        logger.info("bootstrap applied. Waiting for rollout.")
        wait_for_rollout(bootstrap["kind"], bootstrap["metadata"]["name"], namespace, 2000)
        num_nodes = nodes["spec"]["replicas"]
        messages = get_flag_value("messages", publisher["spec"]["containers"][0]["command"])
        delay_seconds = get_flag_value(
            "delay-seconds", publisher["spec"]["containers"][0]["command"]
        )
        # Apply nodes configuration
        logger.info("Applying nodes")
        kubectl_apply(nodes, namespace=namespace)
        logger.info("nodes applied. Waiting for rollout.")
        timeout = num_nodes * 3000
        wait_for_rollout(nodes["kind"], nodes["metadata"]["name"], namespace, timeout)
        # TODO [metadata output]: log publish message start time
        # Apply publisher configuration
        logger.info("applying publisher")
        kubectl_apply(publisher, namespace=namespace)
        logger.info("publisher applied. Waiting for rollout.")
        wait_for_rollout(
            publisher["kind"],
            publisher["metadata"]["name"],
            namespace,
            20,
            api_client,
            ("Ready", "True"),
            # TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
        )
        logger.info("publisher rollout done.")
        timeout = num_nodes * messages * delay_seconds * 120
        logger.info(f"Waiting for Ready=False. Timeout: {timeout}")
        wait_for_rollout(
            publisher["kind"],
            publisher["metadata"]["name"],
            namespace,
            timeout,
            api_client,
            ("Ready", "False"),
        )
        # TODO: consider state.reason == .completed
        time.sleep(20)
        # TODO [metadata output]: log publish message end time
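The test sizes its timeouts from publisher flags via `get_flag_value`. That helper is defined in `kube_utils.py` and not shown in this diff; the following is a plausible sketch only. Returning `int` is an assumption here, needed because the experiment multiplies these values into a timeout.

```python
import re


def get_flag_value(flag, command):
    """Hypothetical sketch of kube_utils.get_flag_value: pull the value of
    `--flag=value` (or `--flag value`) out of a container command list."""
    pattern = re.compile(rf"--{re.escape(flag)}[=\s]+(\S+)")
    for part in command:
        match = pattern.search(str(part))
        if match:
            # Assumed to return int so callers can do timeout arithmetic.
            return int(match.group(1))
    raise ValueError(f"Flag --{flag} not found in command.")
```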

@@ -0,0 +1,4 @@
apiVersion: v2
name: my-chart
version: 0.1.0
description: A Helm chart for Kubernetes

@@ -0,0 +1,27 @@
{{- define "waku.nodes.postgress.container" -}}
- name: postgres
  image: postgres:15.1-alpine
  imagePullPolicy: IfNotPresent
  volumeMounts:
    - name: postgres-data
      mountPath: /var/lib/postgresql/data
  env:
    - name: POSTGRES_DB
      value: wakumessages
    - name: POSTGRES_USER
      value: wakuuser
    - name: POSTGRES_PASSWORD
      value: wakupassword
  ports:
    - containerPort: 5432
  readinessProbe:
    exec:
      command:
        - sh
        - -c
        - |
          pg_isready -U wakuuser -d wakumessages
    initialDelaySeconds: 5
    periodSeconds: 2
    timeoutSeconds: 5
{{- end }}

@@ -0,0 +1,21 @@
waku:
  command:
    presets:
      basic:
        relay: true
        maxConnections: 150
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        shard: 0

@@ -0,0 +1,21 @@
waku:
  command:
    presets:
      filter:
        relay: true
        maxConnections: 150
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        shard: 0

@@ -0,0 +1,15 @@
waku:
  command:
    presets:
      lightpush:
        lightpushnode: $addrs1
        relay: false
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        nat: extip:${IP}
        clusterId: 2
        shard: 0

@@ -0,0 +1,24 @@
{{- define "waku.commandSets.regression" -}}
{{- $values := .values -}}
waku:
  command:
    presets:
      regression:
        relay: {{ $values.waku.nodes.isRelay }}
        maxConnections: 150
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: {{ $values.logLevel }}
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        shard: 0
{{- end -}}

@@ -0,0 +1,24 @@
{{- define "waku.commandSets.regression" -}}
{{- $value := .values -}}
waku:
  command:
    presets:
      regression:
        relay: true
        maxConnections: 150
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        shard: 0
{{- end -}}

@@ -0,0 +1,22 @@
waku:
  nodes:
    command:
      presets:
        regression:
          relay: true
          maxConnections: 150
          rest: true
          restAdmin: true
          restAddress: 0.0.0.0
          discv5Discovery: true
          discv5EnrAutoUpdate: True
          logLevel: INFO
          metricsServer: True
          metricsServerAddress: 0.0.0.0
          discv5BootstrapNode:
            - $ENR1
            - $ENR2
            - $ENR3
          nat: extip:${IP}
          clusterId: 2
          shard: 0

@@ -0,0 +1,20 @@
waku:
  command:
    presets:
      relay:
        relay: true
        maxConnections: 150
        rest: true
        restAdmin: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        shard: 0

@@ -0,0 +1,21 @@
waku:
  command:
    presets:
      store:
        relay: true
        maxConnections: 150
        rest: true
        restAdmin: true
        restPrivate: true
        restAddress: 0.0.0.0
        discv5Discovery: true
        discv5EnrAutoUpdate: True
        logLevel: INFO
        metricsServer: True
        metricsServerAddress: 0.0.0.0
        discv5BootstrapNode:
          - $ENR1
          - $ENR2
          - $ENR3
        nat: extip:${IP}
        clusterId: 2
        pubsubTopic: "/waku/2/rs/2/0"

@@ -0,0 +1,13 @@
{{- define "waku.nodes.getAddress" -}}
- name: grabaddress
  image: {{ default "soutullostatus/getaddress" .Values.initContainers.getAddress.repo }}:{{ default "v0.1.0" .Values.initContainers.getAddress.tag }}
  imagePullPolicy: IfNotPresent
  volumeMounts:
    - name: address-data
      mountPath: /etc/addrs
  command:
    - /app/getaddress.sh
  args:
    - {{ include "ensureQuoted" (default "1" .Values.initContainers.getAddress.numAddrs) }}
    - {{ default "" .Values.initContainers.getAddress.serviceName }}
{{- end }}

@@ -0,0 +1,34 @@
{{- define "waku.nodes.getEnr" -}}
{{- $values := .Values }}
- name: grabenr
  image: {{ default "soutullostatus/getenr" $values.getEnr.repo }}:{{ default "v0.5.0" $values.getEnr.tag }}
  imagePullPolicy: IfNotPresent
  volumeMounts:
    - name: enr-data
      mountPath: /etc/enr
  command:
    - /app/getenr.sh
  args:
    # Check to make sure the number of environment variables matches the numEnrs arg we give to the shell script.
    # TODO [waku-regression-nodes sanity checks]: add this same sanity check to getAddress.tpl.
    {{- if not ($values.command.full).container }}
    {{- if ($values.command.full).waku }}
    {{ include "assertFlagCountInCommand" ( dict
      "command" $values.command.full.waku
      "flag" "--discv5-bootstrap-node"
      "expectedCount" (default 3 $values.getEnr.numEnrs)) | indent 2 }}
    {{ else }}
    {{- $preset := $values.command.type | default "basic" }}
    {{- $wakuCommand := include "command.genArgs" ( dict
      "args" $values.command.args
      "presets" $values.command.presets
      "preset" $preset) | indent 4 }}
    {{- include "assertFlagCountInCommand" ( dict
      "command" $wakuCommand
      "flag" "--discv5-bootstrap-node"
      "expectedCount" (default 3 $values.getEnr.numEnrs)) | indent 2}}
    {{- end }}
    {{- end }}
    {{- toYaml (list (toString (default 3 $values.getEnr.numEnrs))) | nindent 4 }}
    {{- toYaml (list ( default "" $values.getEnr.serviceName )) | nindent 4 }}
{{- end }}

@@ -0,0 +1,103 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ .Values.name }}
  namespace: {{ default "zerotesting" .Values.namespace }}
spec:
  replicas: {{ .Values.waku.nodes.numNodes }}
  podManagementPolicy: "Parallel"
  serviceName: {{ .Values.serviceName }}
  selector:
    matchLabels:
      app: {{ .Values.app }}
  template:
    metadata:
      labels:
        app: {{ .Values.app }}
    spec:
      dnsConfig:
        {{- toYaml .Values.waku.nodes.dnsConfig | nindent 8 }}
      volumes:
        {{ with .Values.volumes }}
        {{ toYaml . }}
        {{ end }}
        {{ if .Values.waku.nodes.includes.getEnr }}
        - name: enr-data
          emptyDir: {}
        {{ end }}
        {{ if .Values.waku.nodes.includes.getAddress }}
        - name: address-data
          emptyDir: {}
        {{ end }}
        {{ if .Values.waku.nodes.storeNode }}
        - name: postgres-data
          emptyDir: {}
        {{ end }}
      initContainers:
        {{ if .Values.waku.nodes.initContainers }}
        {{ toYaml .Values.waku.nodes.initContainers }}
        {{ end }}
        {{- if .Values.waku.nodes.includes.getAddress }}
        {{- include "waku.nodes.getAddress" ( dict "Values" .Values.waku.nodes ) | nindent 8 }}
        {{- end }}
        {{- if .Values.waku.nodes.includes.getEnr }}
        {{- include "waku.nodes.getEnr" ( dict "Values" .Values.waku.nodes ) | nindent 8 }}
        {{- end }}
      containers:
        {{- if .Values.waku.nodes.storeNode }}
        {{- include "waku.nodes.postgress.container" . | indent 5 }}
        {{- end }}
        - name: waku
          image: {{ default "soutullostatus/nwaku-jq-curl" .Values.waku.nodes.image.repository }}:{{ default "v0.34.0-rc1" .Values.waku.nodes.image.tag }}
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8645
            - containerPort: 8008
          volumeMounts:
            {{- with .Values.waku.nodes.volumesMounts }}
            {{ toYaml . | indent 7 }}
            {{- end }}
            {{- if .Values.waku.nodes.includes.getAddress }}
            - name: address-data
              mountPath: /etc/addrs
            {{- end }}
            {{- if .Values.waku.nodes.includes.getEnr }}
            - name: enr-data
              mountPath: /etc/enr
            {{- end }}
          readinessProbe:
            exec:
              command:
                {{- include "valueOrPreset"
                  (dict "value" .Values.waku.nodes.readinessProbe.command
                    "presetKey" (default "health" .Values.waku.nodes.readinessProbe.type)
                    "presets" .Values.waku.nodes.readinessProbe.presets ) | nindent 16 }}
            successThreshold: 5
            initialDelaySeconds: 5
            periodSeconds: 1
            failureThreshold: 2
            timeoutSeconds: 5
          resources:
            requests:
              memory: "64Mi"
              cpu: "150m"
            limits:
              memory: "600Mi"
              cpu: "400m"
          env:
            - name: IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            {{- if .Values.waku.nodes.storeNode }}
            - name: POSTGRES_URL
              value: "postgres://wakuuser:wakupassword@127.0.0.1:5432/wakumessages"
            {{- end }}
          command:
            {{- include "waku.container.command" ( dict
              "includes" (dict
                "getEnr" .Values.waku.nodes.includes.getEnr
                "getAddress" .Values.waku.nodes.includes.getAddress
              )
              "command" .Values.waku.nodes.command
            ) | nindent 14 }}

@@ -0,0 +1,10 @@
{{- define "waku.nodes.readiness-probe.health" -}}
- /bin/sh
- -c
- >
  if curl -s http://127.0.0.1:8008/health | grep -q 'OK'; then
    exit 0; # success, healthy state
  else
    exit 1; # failure, unhealthy state
  fi
{{- end }}

@@ -0,0 +1,22 @@
{{- define "waku.nodes.readiness-probe.metrics" -}}
- /bin/sh
- -c
- >
  curl_output=$(curl -s http://127.0.0.1:8008/metrics);
  curl_status=$?;
  if [ $curl_status -ne 0 ]; then
    echo "Curl failed with status $curl_status";
    exit 1; # failure, unhealthy state
  fi;
  echo "$curl_output" | awk '
  !/^#/ && /^libp2p_gossipsub_healthy_peers_topics / {
    print "Found gossipsub:", $0;
    if ($2 == 1.0) {
      exit 0; # success, healthy state
    } else {
      exit 1; # failure, unhealthy state
    }
  }
  END { if (NR == 0) exit 1 } # If no matching line is found, exit with failure
  '
{{- end }}

@@ -0,0 +1,6 @@
waku:
  nodes:
    readinessProbe:
      presets:
        health: "include:waku.nodes.readiness-probe.health"
        metrics: "include:waku.nodes.readiness-probe.metrics"

@@ -0,0 +1,4 @@
waku:
  command:
    args: {}
    container: {}

@@ -0,0 +1,4 @@
# TODO [lightpush]: incomplete
includes:
  getAddress: true

@@ -0,0 +1,29 @@
name: nodes-0
namespace: zerotesting
serviceName: zerotesting-service
app: zerotenkay
waku:
  nodes:
    numNodes: 10
    getEnr:
      repo: "soutullostatus/getenr"
      tag: "v0.5.0"
      numEnrs: 3
      serviceName: "zerotesting-bootstrap.zerotesting"
    command:
      type: "regression"
      args:
        maxConnections: 200
    readinessProbe:
      type: "metrics"
    includes:
      getEnr: True
    image:
      repository: "soutullostatus/nwaku-jq-curl"
      tag: "v0.34.0-rc1"
    dnsConfig:
      searches:
        - zerotesting-service.zerotesting.svc.cluster.local

@@ -0,0 +1,4 @@
apiVersion: v2
name: my-chart
version: 0.1.0
description: A Helm chart for Kubernetes

@@ -0,0 +1,27 @@
{{- define "waku.publisher.container.command" -}}
{{- $values := .values -}}
{{- if $values.full }}
{{ $values.full }}
{{- else }}
- sh
- -c
- |
  python /app/traffic.py
  {{- $preset := $values.type }}
  {{- print " --protocols" }}
  {{- $protocolDict := (include "valueOrPreset" (dict
    "value" $values.protocols
    "presetKey" $preset
    "presets" $values.protocolPresets
    "asYaml" true
  )) | fromYaml }}
  {{- $protocolDict = (include "map.keepTrue" $protocolDict) | fromYaml }}
  {{- range $protocol, $_ := $protocolDict }} {{ $protocol }} {{- end }}
  {{- printf " \\" -}}
  {{- include "command.genArgs" (dict
    "args" $values.args
    "presets" $values.presets
    "preset" $preset
  ) | nindent 4 }}
{{- end }}
{{- end }}

@@ -0,0 +1,13 @@
waku:
  publisher:
    command:
      protocolPresets:
        regression:
          relay: true
      presets:
        regression:
          messages: 600
          msgSizeKbytes: 1
          delaySeconds: 1
          pubsubTopic: "/waku/2/rs/2/"

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: {{ default "publisher" .Values.name }}
  namespace: {{ default "zerotesting" .Values.namespace }}
spec:
  restartPolicy: Never
  dnsConfig:
    {{- toYaml .Values.waku.publisher.dnsConfig | nindent 4 }}
  containers:
    - name: publisher-container
      image: {{ default "soutullostatus/publisher" (.Values.waku.publisher.image).repository }}:{{ default "testing" (.Values.waku.publisher.image).tag }}
      imagePullPolicy: Always
      command:
        {{- include "waku.publisher.container.command" ( dict "values" .Values.waku.publisher.command ) | nindent 8 }}

@@ -0,0 +1,11 @@
waku:
  publisher:
    container: {}
    command:
      args: {}
      protocols: {}
    image: {}
    dnsConfig:
      searches:
        - zerotesting-service.zerotesting.svc.cluster.local

@@ -0,0 +1,9 @@
# Example of regression values.yaml.
waku:
publisher:
command:
type: "regression"
args:
messages: 10
msgSizeKbytes: 10
delaySeconds: 1

@@ -1,109 +0,0 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nodes-0
namespace: {{ default "zerotesting" .Values.namespace }}
spec:
replicas: {{ .Values.numNodes }}
podManagementPolicy: "Parallel"
serviceName: zerotesting-service
selector:
matchLabels:
app: zerotenkay
template:
metadata:
labels:
app: zerotenkay
tags: AutomationTesting
spec:
dnsConfig:
searches:
- zerotesting-service.zerotesting.svc.cluster.local
volumes:
- name: enr-data
emptyDir: {}
initContainers:
- name: grabenr
image: {{ default "soutullostatus/getenr" .Values.initContainers.repository }}:{{ default "v0.5.0" .Values.initContainers.image.tag }}
imagePullPolicy: IfNotPresent
volumeMounts:
- name: enr-data
mountPath: /etc/enr
command:
- /app/getenr.sh
args:
- "3"
- "zerotesting-bootstrap.zerotesting"
containers:
- name: waku
image: {{ default "soutullostatus/nwaku-jq-curl" .Values.image.repository }}:{{ default "v0.34.0-rc1" .Values.image.tag }}
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8645
- containerPort: 8008
volumeMounts:
- name: enr-data
mountPath: /etc/enr
readinessProbe:
exec:
command:
- /bin/sh
- -c
- >
curl_output=$(curl -s http://127.0.0.1:8008/metrics);
curl_status=$?;
if [ $curl_status -ne 0 ]; then
echo "Curl failed with status $curl_status";
exit 1; # failure, unhealthy state
fi;
echo "$curl_output" | awk '
!/^#/ && /^libp2p_gossipsub_healthy_peers_topics / {
print "Found gossipsub:", $0;
if ($2 == 1.0) {
exit 0; # success, healthy state
} else {
exit 1; # failure, unhealthy state
}
}
END { if (NR == 0) exit 1 } # If no matching line is found, exit with failure
'
successThreshold: 5
initialDelaySeconds: 5
periodSeconds: 1
failureThreshold: 2
timeoutSeconds: 5
resources:
requests:
memory: "64Mi"
cpu: "150m"
limits:
memory: "600Mi"
cpu: "400m"
env:
- name: IP
valueFrom:
fieldRef:
fieldPath: status.podIP
command:
- sh
- -c
- |
. /etc/enr/enr.env
echo ENRs are $ENR1 $ENR2 $ENR3
nice -n 19 /usr/bin/wakunode \
--relay=true \
--max-connections=150 \
--rest=true \
--rest-admin=true \
--rest-address=0.0.0.0 \
--discv5-discovery=true \
--discv5-enr-auto-update=True \
--log-level=INFO \
--metrics-server=True \
--metrics-server-address=0.0.0.0 \
--discv5-bootstrap-node=$ENR1 \
--discv5-bootstrap-node=$ENR2 \
--discv5-bootstrap-node=$ENR3 \
--nat=extip:${IP} \
--cluster-id=2 \
--shard=0

@@ -1,24 +0,0 @@
apiVersion: v1
kind: Pod
metadata:
name: publisher
namespace: {{ default "zerotesting" .Values.namespace }}
spec:
restartPolicy: Never
dnsConfig:
searches:
- zerotesting-service.zerotesting.svc.cluster.local
containers:
- name: publisher-container
image: soutullostatus/publisher:testing
imagePullPolicy: Always
command:
- sh
- -c
- |
python /app/traffic.py \
--messages={{ .Values.messages }} \
--msg-size-kbytes={{ .Values.msgSizeKbytes }} \
--delay-seconds={{ .Values.delaySeconds }} \
--pubsub-topic="/waku/2/rs/2/" \
--protocols relay

View File

@@ -1,13 +0,0 @@
numNodes: 3000
messages: 600
msgSizeKbytes: 1
delaySeconds: 1
namespace: zerotesting
initContainers:
image:
tag: null
repository: null
image:
image:
tag: null
repository: null

View File

@@ -0,0 +1,102 @@
{{/*
toHyphenCase
Converts a lowerCamelCase or PascalCase string to hyphen-case (kebab-case).
For example: "maxConnections" becomes "max-connections".
Useful for mapping YAML/Helm value keys to command-line flag names.
Usage:
{{ include "toHyphenCase" "maxConnections" }}
*/}}
{{- define "toHyphenCase" -}}
{{- regexReplaceAll "(?m)([a-z0-9])([A-Z])" . "${1}-${2}" | lower -}}
{{- end }}
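The same key-to-flag rewrite can be sketched in Python (illustrative only; the chart itself uses Sprig's `regexReplaceAll` as shown above):

```python
import re

def to_hyphen_case(key: str) -> str:
    # Insert a hyphen between a lower-case letter or digit and the
    # following upper-case letter, then lower-case the whole string.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", key).lower()
```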
{{- define "quoteIfNeeded" -}}
{{- $val := . -}}
{{- if not (kindIs "string" $val) -}}
{{- printf "%v" $val -}}
{{- else -}}
{{- $str := $val | trim -}}
{{- if and (gt (len $str) 1) (or (and (hasPrefix "\"" $str) (hasSuffix "\"" $str)) (and (hasPrefix "'" $str) (hasSuffix "'" $str))) -}}
{{- /* Already quoted; checked via prefix/suffix because Go's RE2 regex engine has no backreferences */ -}}
{{- $val -}}
{{- else -}}
{{- if regexMatch "[\\s]" $val -}}
{{- $escaped := replace $val "\"" "\\\"" -}}
"{{ $escaped }}"
{{- else -}}
{{- $val -}}
{{- end -}}
{{- end -}}
{{- end -}}
{{- end }}
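A Python analogue of the intended `quoteIfNeeded` behavior (a sketch of the logic, not the template itself; the already-quoted check is done with a first/last-character comparison here):

```python
import re

def quote_if_needed(val) -> str:
    # Non-strings pass through via str(); already-quoted strings are kept;
    # strings containing whitespace get wrapped in escaped double quotes.
    if not isinstance(val, str):
        return str(val)
    stripped = val.strip()
    if len(stripped) >= 2 and stripped[0] == stripped[-1] and stripped[0] in "'\"":
        return val  # already quoted
    if re.search(r"\s", val):
        return '"' + val.replace('"', '\\"') + '"'
    return val
```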
{{/*
command.genArgs
Generates command-line arguments from a combination of user overrides and preset values.
- Accepts a dict with:
- "args": map of user-supplied arguments (can be empty or undefined).
- "presets": map of preset argument sets (e.g., .Values.presets).
- "preset": name of the preset to use.
- For each unique key in either `args` or the selected preset:
- If the value is nil or empty, outputs a switch: --flag
- If the value is a list, outputs multiple: --flag="item" (one per list item)
- `value` is from `args` if in `args`, otherwise from the preset.
Usage:
{{ include "command.genArgs" (dict "args" .Values.command.args "presets" .Values.presets "preset" .Values.preset) }}
Example output:
--log-level=INFO \
--enable-debug \
--discv5-bootstrap-node=$ENR1 \
--discv5-bootstrap-node=$ENR2
*/}}
{{- define "command.genArgs" -}}
{{- $args := .args | default dict -}}
{{- $presets := .presets | default dict -}}
{{- $presetName := .preset | default "" -}}
{{- $preset := (index $presets $presetName) | default dict -}}
{{- /* Collect all unique keys */ -}}
{{- $allKeys := dict -}}
{{- range $key, $value := $preset }} {{- $_ := set $allKeys $key true }} {{- end -}}
{{- range $key, $value := $args }} {{- $_ := set $allKeys $key true }} {{- end -}}
{{- /* Convert keys dict to list */ -}}
{{- $keys := list -}}
{{- range $key, $_ := $allKeys }} {{- $keys = append $keys $key }} {{- end -}}
{{- /* Collect all argument lines into a slice */ -}}
{{- $lines := list -}}
{{- range $i, $key := $keys }}
{{- $value := (index $args $key) | default (index $preset $key) -}}
{{- $flag := include "toHyphenCase" $key -}}
{{- if eq $value nil }}
{{- $lines = append $lines (printf "--%s" $flag) }}
{{- else if kindIs "slice" $value }}
{{- range $item := $value }}
{{- $arg := printf "--%s=%s" $flag (include "quoteIfNeeded" $item) }}
{{- $lines = append $lines $arg }}
{{- end }}
{{- else }}
{{- $arg := printf "--%s=%s" $flag (include "quoteIfNeeded" $value) }}
{{- $lines = append $lines $arg }}
{{- end }}
{{- end }}
{{- /* Print lines with trailing backslash except last */ -}}
{{- $lastIndex := sub (len $lines) 1 -}}
{{- range $i, $line := $lines }}
{{- if lt $i $lastIndex }}
{{- printf "%s \\\n" $line }}
{{- else }}
{{- printf "%s\n" $line }}
{{- end }}
{{- end }}
{{- end }}
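The merge logic of `command.genArgs` can be mirrored in Python (a simplified sketch: it ignores the trailing-backslash joining and the `quoteIfNeeded` escaping; names are illustrative):

```python
import re
from typing import Any, Dict, List, Optional

def to_hyphen_case(key: str) -> str:
    # Same rewrite as the toHyphenCase helper: maxConnections -> max-connections.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", key).lower()

def gen_args(args: Optional[Dict[str, Any]],
             presets: Dict[str, Dict[str, Any]],
             preset: str) -> List[str]:
    # User-supplied args override the selected preset, key by key.
    merged = {**presets.get(preset, {}), **(args or {})}
    lines: List[str] = []
    for key, value in merged.items():
        flag = to_hyphen_case(key)
        if value is None:
            lines.append(f"--{flag}")  # nil value -> bare switch
        elif isinstance(value, list):
            lines.extend(f"--{flag}={item}" for item in value)  # one flag per item
        else:
            lines.append(f"--{flag}={value}")
    return lines
```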

View File

@@ -0,0 +1,162 @@
{{/*
valueOrPreset
Returns .value if it is set (not nil, not empty).
Otherwise, checks the value from .presets[.presetKey]:
- If it starts with "include:", treats the rest as a template name and includes it.
- Otherwise, returns the preset value.
Optionally, pass `"asYaml": true` to serialize the result as YAML (allowing for piping to `fromYaml`).
If `"asYaml"` is omitted or false, the result is returned as a string (for direct YAML insertion).
Parameters:
.value - The direct value to use if set
.presetKey - The key to use for the preset
.presets - Map of preset values or template references (e.g., from values.yaml)
.asYaml - (optional, bool) If true, output is YAML (for use with `fromYaml`); if false or omitted, output is a string
Usage:
# For direct YAML insertion (string result):
{{ include "valueOrPreset" (dict "value" .Values.container.command "presetKey" .Values.type "presets" .Values.commandPresets) }}
# For dictionary/object use (YAML result, e.g., for fromYaml):
{{- $myDict := (include "valueOrPreset" (dict "value" .Values.command "presetKey" .Values.type "presets" .Values.presets "asYaml" true) | fromYaml) }}
Examples:
# In values.yaml:
commandPresets:
type_1: "include:commandTemplateType1"
type_2: "echo Goodbye"
# In _helpers.tpl:
{{- define "commandTemplateType1" -}}
echo "Hello from template 1"
{{- end }}
# In template:
{{ include "valueOrPreset" (dict "value" .Values.container.command "presetKey" .Values.type "presets" .Values.commandPresets) }}
*/}}
{{- define "valueOrPreset" -}}
{{- $value := .value -}}
{{- $presetKey := .presetKey -}}
{{- $presets := .presets -}}
{{- $asYaml := .asYaml | default false -}}
{{- $result := "" -}}
{{- if $value }}
{{- $result = $value }}
{{- else }}
{{- $presetValue := index $presets $presetKey }}
{{- if and $presetValue (kindIs "string" $presetValue) (hasPrefix "include:" $presetValue) }}
{{- $tplName := trimPrefix "include:" $presetValue | trim }}
{{- $result = include $tplName . }}
{{- else if $presetValue }}
{{- $result = $presetValue }}
{{- end }}
{{- end }}
{{- if $asYaml }}
{{- toYaml $result }}
{{- else }}
{{- $result }}
{{- end }}
{{- end }}
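The resolution order of `valueOrPreset` (explicit value wins, then the preset, with `include:` delegating to a named template) can be sketched in Python; `render` here is a stand-in for Helm's `include`:

```python
from typing import Any, Callable, Dict

def value_or_preset(value: Any, preset_key: str, presets: Dict[str, Any],
                    render: Callable[[str], str]) -> Any:
    # Explicit value takes precedence over any preset.
    if value:
        return value
    preset = presets.get(preset_key)
    # An "include:<name>" preset delegates to the named template.
    if isinstance(preset, str) and preset.startswith("include:"):
        return render(preset[len("include:"):].strip())
    return preset
```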
{{/*
applyAll
- Outputs .value if it is set (not nil, not empty).
- For each item in .list (if set), looks up the value in .dict:
- If the value starts with "include:", treats the rest as a template name and includes it.
- Otherwise, outputs the value as a string.
Parameters:
.value - A single value to output if set.
.dict - A dictionary (map) of key-value pairs or template references.
.list - A list of keys to look up in .dict. If not set, treated as an empty list.
Usage:
{{ include "applyAll" (dict
"value" .Values.current.sportTool
"dict" (dict "baseball" "bat" "hockey" "stick" "bowling" "include:bowlingTemplate")
"list" .Values.current.sports
) }}
Example:
If .Values.current.sportTool is "helmet", and .Values.current.sports is ["baseball", "bowling"], output:
helmet
bat
[contents of bowlingTemplate]
*/}}
{{- define "applyAll" -}}
{{- $value := .value -}}
{{- $dict := .dict -}}
{{- $list := .list | default (list) -}}
{{- if $value }}
{{ $value }}
{{- end }}
{{- range $item := $list }}
{{- $dictVal := index $dict $item }}
{{- if $dictVal }}
{{- if hasPrefix "include:" $dictVal }}
{{- $tplName := trimPrefix "include:" $dictVal | trim }}
{{- include $tplName . }}
{{- else }}
{{ $dictVal }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{/*
assertFlagCountInCommand
Asserts that the number of times a given flag (e.g., "--flag1") appears in a command string equals the expected count.
Fails template rendering if not.
Parameters:
.command (string) - The command string to check.
.flag (string) - The flag to search for (e.g., "--flag1").
.expectedCount (int) - The expected number of times the flag should appear.
Usage:
{{ include "assertFlagCountInCommand" (dict
"command" .Values.command
"flag" "--flag1"
"expectedCount" .Values.expectedNumFlags
) }}
*/}}
{{- define "assertFlagCountInCommand" -}}
{{- $command := .command | toString -}}
{{- $flag := .flag | toString -}}
{{- $expectedCount := .expectedCount | int -}}
{{- $pattern := printf "(^|\\s)%s(\\s|=|$)" $flag -}}
{{- $matches := regexFindAll $pattern $command -1 -}}
{{- $actualCount := len $matches -}}
{{- if ne $expectedCount $actualCount }}
{{- fail (printf "Assertion failed: expected %d instances of flag '%s' in command, but found %d" $expectedCount $flag $actualCount) }}
{{- end }}
{{- end }}
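The flag-counting assertion can be sketched in Python; lookarounds are used here so the delimiters are not consumed, which keeps flags separated by a single space countable (the template relies on Sprig's `regexFindAll` with consuming groups):

```python
import re

def assert_flag_count(command: str, flag: str, expected: int) -> None:
    # Match the flag when delimited by start/end of string, whitespace, or '='.
    pattern = rf"(?:^|(?<=\s)){re.escape(flag)}(?=\s|=|$)"
    actual = len(re.findall(pattern, command))
    if actual != expected:
        raise AssertionError(
            f"expected {expected} instances of flag '{flag}' in command, found {actual}")
```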
{{/*
map.keepTrue
Given a dictionary, return a new dictionary comprised of all key value pairs for which value is true.
Usage:
{{ include "map.keepTrue" .Values.shouldInclude }}
*/}}
{{- define "map.keepTrue" -}}
{{- $out := dict -}}
{{- range $key, $value := . }}
{{- if $value }}
{{- $out = set $out $key $value }}
{{- end }}
{{- end }}
{{- toYaml $out }}
{{- end }}

View File

@@ -0,0 +1,33 @@
{{- define "waku.container.command" -}}
{{- $includes := .includes -}}
{{- $command := .command -}}
{{- if ($command.full).container -}}
{{ $command.full.container }}
{{- else -}}
- sh
- -c
- |
{{- if $includes.getAddress }}
. /etc/addrs/addrs.env
echo addrs are{{- range $i, $_ := until (int $includes.getAddress.numEnrs) }} $ENR{{ add1 $i }}{{- end }}
{{- end }}
{{- if $includes.getEnr }}
. /etc/enr/enr.env
echo ENRs are{{- range $i, $_ := until (int $includes.getEnr.numEnrs) }} $ENR{{ add1 $i }}{{- end }}
{{- end }}
{{- if ($command.full).waku }}
{{- $command.full.waku | indent 1 }}
{{- else }}
{{- if $command.sleep }}
sleep 10
{{- end }}
/usr/bin/wakunode \
{{- $preset := $command.type | default "basic" -}}
{{- include "command.genArgs" ( dict
"args" $command.args
"presets" $command.presets
"preset" $preset) | nindent 4 -}}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -1,10 +1,9 @@
#!/usr/bin/env python3
import argparse
import contextlib
import glob
import itertools
import logging
import logging.config
import os
import re
import shutil
@@ -13,7 +12,7 @@ import tempfile
import time
from datetime import datetime, timedelta
from datetime import timezone as dt_timezone
from typing import Dict, Iterator, List, Optional, Tuple, Union
from typing import Callable, Dict, Iterator, List, Optional, Tuple, Union
import dateparser
from kubernetes import client, utils
@@ -96,7 +95,7 @@ def init_logger(logger: logging.Logger, verbosity: Union[str, int], log_path: Op
logger = logging.getLogger(__name__)
def kubectl_apply(kube_yaml, namespace="zerotesting"):
def kubectl_apply(kube_yaml: yaml.YAMLObject, namespace="zerotesting"):
logger.debug(f"kubectl_apply the following config:\n{str(yaml.dump(kube_yaml))}")
with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml", delete=False) as temp:
yaml.dump(kube_yaml, temp)
@@ -104,7 +103,7 @@ def kubectl_apply(kube_yaml, namespace="zerotesting"):
utils.create_from_yaml(client.ApiClient(), yaml_file=temp.name, namespace=namespace)
def get_cleanup_resources(yamls: list[yaml.YAMLObject], types: Optional[List[str]] = None):
def get_cleanup_resources(yamls: List[yaml.YAMLObject], types: Optional[List[str]] = None):
"""
Get dict of resources to cleanup based on yamls.
@@ -288,6 +287,23 @@ def wait_for_cleanup(
time.sleep(polling_interval)
def get_cleanup(
api_client: ApiClient, namespace: str, deployments: List[yaml.YAMLObject]
) -> Callable[[], None]:
def cleanup():
logger.info("Cleaning up resources.")
resources_to_cleanup = get_cleanup_resources(deployments)
logger.info(f"Resources to clean up: `{resources_to_cleanup}`")
logger.info("Start cleanup.")
cleanup_resources(resources_to_cleanup, namespace, api_client)
logger.info("Waiting for cleanup.")
wait_for_cleanup(resources_to_cleanup, namespace, api_client)
logger.info("Finished cleanup.")
return cleanup
def poll_rollout_status(
kind: str,
name: str,
@@ -559,7 +575,7 @@ def get_defs_from_template(template_path):
def validate_values_yaml(values_yaml, template_yamls: List[yaml.YAMLObject]):
# TODO: ensure bijection between values.yaml and deployments.yaml.
# Consider experiments with multiple deployments.yaml. For example:
# boostrap, nodes, publishers.
# bootstrap, nodes, publishers.
raise NotImplementedError()
@@ -603,7 +619,7 @@ def dict_add(dict: Dict, path: str | List[str], value, sep=os.path.sep) -> None:
dict[path[-1]] = value
def default_chart_yaml(name):
def default_chart_yaml_str(name) -> str:
return """
apiVersion: v2
name: {name}
@@ -615,19 +631,42 @@ def default_chart_yaml(name):
def helm_build_dir(workdir: str, values_paths: List[str], name: str) -> yaml.YAMLObject:
values = [["--values", values_path] for values_path in values_paths]
command = ["helm", "template", ".", "--name-template", name, "--debug"] + list(
itertools.chain(*values)
)
logger.info(f"Running helm template command. cwd: `{workdir}`\tcommand: `{command}`")
logger.info(f"Usable command: `{' '.join(command)}`")
result = subprocess.run(
["helm", "template", ".", "--name-template", name, "--debug"]
+ list(itertools.chain(*values)),
command,
cwd=workdir,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise Exception(result.stderr)
raise Exception(
f"Failed to build helm template. cwd: `{workdir}`\tcommand: `{command}`\tstderr: `{result.stderr}`"
)
return yaml.safe_load(result.stdout)
import ruamel.yaml
def get_YAML():
"""Return a ruamel.yaml.YAML() that dumps multiline strings as multiple lines instead of escaping newlines."""
def str_representer(dumper, data):
if "\n" in data:
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
return dumper.represent_scalar("tag:yaml.org,2002:str", data)
yaml = ruamel.yaml.YAML()
yaml.Representer.add_representer(str, str_representer)
yaml.indent(mapping=2, sequence=4, offset=2)
return yaml
def helm_build(
# list of (source_path, target_path) or (source_path)
deployment_template_paths: Union[List[Tuple[str, str]], List[str]],
@@ -672,7 +711,6 @@ def helm_build(
def helm_build_from_params(
# template_path, values_yaml, out_path, workdir
template_path,
values_yaml: yaml.YAMLObject,
workdir: str,
@@ -684,11 +722,32 @@ def helm_build_from_params(
which will be used for `.Release.Name` when making the deployment template.
"""
values = [("values.yaml", values_yaml)]
chart_yaml = default_chart_yaml("my-chart")
chart_yaml = default_chart_yaml_str("my-chart")
name = name if name else "noname"
return helm_build([template_path], values, workdir, name, chart_yaml)
def relative_paths(base_path: str, paths: List[str]) -> List[str]:
return [
os.path.relpath(
os.path.join(base_path, path) if not os.path.isabs(path) else path, base_path
)
for path in paths
]
def get_values_yamls(work_sub_dir):
"""Get all *.values.yaml files under this experiment's templates/ directory that should be included as `--values <file>` args.
Make sure to add your own values.yaml passed through the CLI.
"""
templates_dir = os.path.join(work_sub_dir, "templates")
return [
os.path.relpath(path, work_sub_dir)
for path in glob.glob(os.path.join(templates_dir, "**", "*.values.yaml"), recursive=True)
]
@contextlib.contextmanager
def maybe_dir(dir: Optional[str]) -> Iterator[str]:
if dir:
@@ -765,3 +824,13 @@ def str_to_timedelta(duration: str):
if parsed_date is None:
raise ValueError(f"Failed to parse duration: `{duration}`")
return utc_now - parsed_date
def get_flag_value(flag: str, command: List[str]) -> Optional[int]:
    """Return the integer value of the first `--<flag>=<digits>` occurrence, or None."""
    for node in command:
        match = re.search(f"--{flag}=(?P<value>\\d+)", node)
        if match:
            return int(match["value"])
    return None
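A quick usage sketch of the `get_flag_value` idea against a rendered publisher command (self-contained; the command string and values are illustrative):

```python
import re
from typing import List, Optional

def get_flag_value(flag: str, command: List[str]) -> Optional[int]:
    # Return the integer value of the first `--<flag>=<digits>` occurrence.
    for node in command:
        match = re.search(rf"--{flag}=(?P<value>\d+)", node)
        if match:
            return int(match["value"])
    return None

command = ["sh", "-c", "python /app/traffic.py --messages=600 --delay-seconds=1"]
```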

View File

@@ -1,32 +1,40 @@
#!/usr/bin/env python3
import argparse
import logging
import os
from typing import Optional
from kubernetes import config
from kubernetes.client import ApiClient
from ruamel import yaml
from kube_utils import (
init_logger,
)
from regression_tests.dispatch import add_subparser as add_regression_tests_subparser
from regression_tests.dispatch import run_regression_tests
from kube_utils import init_logger
from registry import registry as experiment_registry
logger = logging.getLogger(__name__)
def run_experiment(experiment, params, values_path, kube_config=None):
logger.debug(f"params: {params}")
def run_experiment(
name: str,
args: argparse.Namespace,
values_path: Optional[str],
kube_config=None,
):
logger.debug(f"params: {args}")
if not kube_config:
kube_config = "~/.kube/config"
config.load_kube_config(config_file=kube_config)
api_client = ApiClient()
# TODO [automatic experiment collection]: Programmatically gather tests by searching in test folders.
if experiment == "regression_nodes":
run_regression_tests(api_client, params, values_path)
else:
raise NotImplementedError()
try:
with open(values_path, "r") as values:
values_yaml = yaml.safe_load(values.read())
except TypeError:
# values_path is None.
values_yaml = None
info = experiment_registry[name]
experiment = info.cls()
experiment.run(api_client, args, values_yaml)
def main():
@@ -36,7 +44,7 @@ def main():
subparsers = parser.add_subparsers(dest="experiment", required=True)
parser.add_argument("--values", required=True, help="", dest="values_path")
parser.add_argument("--values", default=None, help="Path to values.yaml", dest="values_path")
parser.add_argument(
"--config",
required=True,
@@ -60,17 +68,23 @@ def main():
help="Pipes the log to given file in addition to stdout/stderr.",
)
# Add more subparsers as needed for new experiments here.
add_regression_tests_subparser(subparsers)
# Scan for experiments.
experiment_registry.scan(os.path.join(os.path.dirname(__file__), "deployment"), mode="skip")
# Add subparsers for all experiments.
for info in experiment_registry.items():
try:
info.cls.add_parser(subparsers)
except AttributeError as e:
raise AttributeError(f"{info}") from e
args = parser.parse_args()
verbosity = args.verbosity or 2
init_logger(logging.getLogger(), verbosity, args.log_file_path)
params = vars(args)
try:
run_experiment(
experiment=args.experiment,
params=params,
name=args.experiment,
args=args,
values_path=args.values_path,
kube_config=args.kube_config,
)

107
experiments/registry.py Normal file
View File

@@ -0,0 +1,107 @@
import importlib.util
import logging
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
logger = logging.getLogger(__name__)
@dataclass
class ExperimentInfo:
name: str
cls: type
metadata: Dict[str, Any]
class Registry:
def __init__(self):
self._experiments: List[ExperimentInfo] = []
self._scan_mode: Literal["raise", "skip", "replace"] = "raise"
def get_by_metadata(self, **filters) -> List[ExperimentInfo]:
return [
experiment
for experiment in self._experiments
if all(experiment.metadata.get(key) == value for key, value in filters.items())
]
def __getitem__(self, name: str) -> ExperimentInfo:
try:
return next(info for info in self._experiments if info.name == name)
except StopIteration as e:
raise KeyError(f"No experiment for name exists: `{name}`") from e
def get(self, name: str) -> Optional[ExperimentInfo]:
try:
return self[name]
except KeyError:
return None
def items(self) -> List[ExperimentInfo]:
return self._experiments
def add(self, name: str, cls: type, **metadata: Any) -> None:
existing = self.get(name)
if existing:
if self._scan_mode == "skip":
logger.debug(f"Skipping already registered experiment: `{name}`")
return
elif self._scan_mode == "replace":
if existing.metadata["module_path"] != metadata["module_path"]:
logger.debug(
f"Experiment already registered from another module. Experiment: `{name}`\tModule: `{existing.metadata['module_path']}`"
)
logger.debug(f"Removing existing experiment: `{name}`")
self._experiments.remove(existing)
elif self._scan_mode == "raise":
raise ValueError(f"Experiment already registered: `{name}`")
else:
raise RuntimeError("Invalid scan mode")
self._experiments.append(ExperimentInfo(name, cls, metadata))
def _process_module(self, module_path: str, module_name: str) -> None:
spec = importlib.util.spec_from_file_location(module_name, module_path)
if not spec:
raise ValueError(f"Could not load spec for module: `{module_path}`")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
def scan(self, folder: str, mode: Literal["raise", "skip", "replace"] = "raise") -> None:
"""Scan a directory for experiments.
Warning: Do not scan a directory with a venv under it.
Scanning venv will raise errors.
"""
root_dir = Path(folder).resolve()
logger.debug(f"Scanning directory for experiments: `{root_dir}`")
old_mode = self._scan_mode
self._scan_mode = mode
try:
for path in root_dir.rglob("*.py"):
if path.name.startswith("_"):
continue
if path == Path(__file__):
# Reloading this module would cause `registry = Registry()` to execute again.
# This would result in multiple registry objects.
continue
module_path = str(path.resolve())
self._process_module(module_path, path.stem)
finally:
self._scan_mode = old_mode
registry = Registry()
def experiment(name, **metadata):
def decorator(cls):
metadata["module_path"] = sys.modules[cls.__module__].__file__
exp_name = name if name is not None else cls.__name__
registry.add(exp_name, cls, **metadata)
return cls
return decorator

View File

@@ -1,58 +0,0 @@
#!/usr/bin/env python3
import logging
from kubernetes.client import ApiClient
from ruamel import yaml
from experiments.regression_tests.nimlibp2p import NimRegressionNodes
from regression_tests.waku import WakuRegressionNodes
logger = logging.getLogger(__name__)
def add_subparser(subparsers):
regression_nodes = subparsers.add_parser(
"regression_nodes", help="Run a regression_nodes test."
)
regression_nodes.add_argument(
"--type", type=str, choices=["waku", "nim"], required=True, help=""
)
regression_nodes.add_argument(
"--workdir",
type=str,
required=False,
default=None,
help="Folder to use for generating the deployment files.",
)
regression_nodes.add_argument(
"--skip-check",
action="store_true",
required=False,
help="If present, does not wait until the namespace is empty before running the test.",
)
regression_nodes.add_argument(
"--delay",
type=str,
dest="delay",
required=False,
help="For nimlibp2p tests only. The delay before nodes activate in string format (eg. 1hr20min)",
)
def run_regression_tests(api_client: ApiClient, params, values_path):
logger.debug(f"params: {params}")
with open(values_path, "r") as values:
values_yaml = yaml.safe_load(values.read())
workdir = params.get("workdir", None)
if params["type"] == "waku":
test = WakuRegressionNodes(api_client=api_client)
test.run(values_yaml, workdir, params["skip_check"])
elif params["type"] == "nim":
test = NimRegressionNodes(api_client=api_client)
test.run(values_yaml, workdir, params["skip_check"], params["delay"])
else:
raise ValueError(f"Unknown regression test type: `{params['type']}`")

View File

@@ -1,187 +0,0 @@
#!/usr/bin/env python3
import logging
import os
import shutil
import time
from typing import Optional
from kubernetes import client
from kubernetes.client import ApiClient
from pydantic import BaseModel, ConfigDict, Field, PositiveInt
from ruamel import yaml
from kube_utils import (
assert_equals,
cleanup_resources,
get_cleanup_resources,
helm_build_from_params,
kubectl_apply,
maybe_dir,
poll_namespace_has_objects,
wait_for_cleanup,
wait_for_no_objs_in_namespace,
wait_for_rollout,
)
logger = logging.getLogger(__name__)
class WakuRegressionNodes(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
api_client: ApiClient = Field(default=client.ApiClient())
release_name: str = Field(default="waku-regression-nodes")
def _build_nodes(self, values_yaml: yaml.yaml_object, workdir: str) -> yaml.YAMLObject:
template_path = "./deployment/waku/regression/nodes.yaml"
return helm_build_from_params(
template_path,
values_yaml,
os.path.join(
workdir,
"nodes",
self.release_name,
),
)
def _build_bootstrap(self, values_yaml: yaml.yaml_object, workdir: str) -> yaml.YAMLObject:
path = "./deployment/waku/regression/bootstrap.yaml"
return helm_build_from_params(
path,
values_yaml,
os.path.join(
workdir,
"bootstrap",
self.release_name,
),
)
def _build_publisher(self, values_yaml: yaml.yaml_object, workdir: str):
publisher_yaml = "./deployment/waku/regression/publisher_msg.yaml"
return helm_build_from_params(
publisher_yaml,
values_yaml,
os.path.join(workdir, "publisher"),
self.release_name,
)
def run(
self, values_yaml: yaml.YAMLObject, workdir: Optional[str] = None, skip_check: bool = False
):
with maybe_dir(workdir) as workdir:
try:
shutil.rmtree(workdir)
except FileNotFoundError:
pass
self._run(values_yaml, workdir, skip_check)
def _run(self, values_yaml: yaml.YAMLObject, workdir: str, skip_check: bool):
# TODO [values param checking]: Add friendly error messages for missing/extraneous variables in values.yaml.
logger.info("Building kubernetes configs.")
nodes = self._build_nodes(values_yaml, workdir)
bootstrap = self._build_bootstrap(values_yaml, workdir)
publisher = self._build_publisher(values_yaml, workdir)
logger.info(
f"Running a waku regression test with params: {{ nodes: {values_yaml['numNodes']},\tmessages: {values_yaml['messages']},\tMessage Delay: {values_yaml['delaySeconds']} }}"
)
# Sanity check
namespace = bootstrap["metadata"]["namespace"]
logger.info(f"namespace={namespace}")
assert_equals(nodes["metadata"]["namespace"], namespace)
assert_equals(publisher["metadata"]["namespace"], namespace)
logger.info("Applying kubernetes configs.")
try:
# Wait for namespace to be clear unless --skip-check flag was used.
if not skip_check:
wait_for_no_objs_in_namespace(namespace=namespace, api_client=self.api_client)
else:
namepace_is_empty = poll_namespace_has_objects(
namespace=namespace, api_client=self.api_client
)
if not namepace_is_empty:
logger.warning(f"Namespace is not empty! Namespace: `{namespace}`")
# Apply bootstrap
logger.info("Applying bootstrap")
kubectl_apply(bootstrap, namespace=namespace)
logger.info("bootstrap applied. Waiting for rollout.")
wait_for_rollout(bootstrap["kind"], bootstrap["metadata"]["name"], namespace, 2000)
# Apply nodes configuration
logger.info("Applying nodes")
kubectl_apply(nodes, namespace=namespace)
logger.info("nodes applied. Waiting for rollout.")
timeout = values_yaml["numNodes"] * 3000
wait_for_rollout(nodes["kind"], nodes["metadata"]["name"], namespace, timeout)
# Apply publisher configuration
logger.info("applying publisher")
kubectl_apply(publisher, namespace=namespace)
logger.info("publisher applied. Waiting for rollout.")
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
namespace,
20,
self.api_client,
("Ready", "True"),
# TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
)
logger.info("publisher rollout done.")
logger.info("Waiting for Ready=False")
timeout = (
values_yaml["numNodes"]
* values_yaml["messages"]
* values_yaml["delaySeconds"]
* 120
)
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
namespace,
timeout,
self.api_client,
("Ready", "False"),
)
# TODO: consider state.reason == .completed
time.sleep(20)
finally:
logger.info("Cleaning up resources.")
resources_to_cleanup = get_cleanup_resources([bootstrap, nodes, publisher])
logger.info(f"Resources to clean up: `{resources_to_cleanup}`")
logger.info("Start cleanup.")
cleanup_resources(resources_to_cleanup, namespace, self.api_client)
logger.info("Waiting for cleanup.")
wait_for_cleanup(resources_to_cleanup, namespace, self.api_client)
logger.info("Finished cleanup.")
def run_waku_regression_nodes(
workdir: Optional[str],
client,
values_path,
nodes_counts: list[PositiveInt],
message_delays: list[PositiveInt],
):
test = WakuRegressionNodes(api_client=client)
with open(values_path, "r") as values_file:
values_yaml = yaml.safe_load(values_file.read())
for values in [{**values_yaml, **{"numNodes": count}} for count in nodes_counts]:
for values in [{**values, **{"delaySeconds": delay}} for delay in message_delays]:
test.run(values, workdir)
time.sleep(600)
# Example usage:
# config.load_kube_config(config_file="./kube_config.yaml")
# client = ApiClient()
# run_waku_regression_nodes(
# "./workdir", client, "./values.yaml", [100, 200, 300], [1, 5, 10]
# )

View File

@@ -1,15 +1,12 @@
annotated-types==0.7.0
humanfriendly==10.0
dateparser==1.2.1
autopep8==2.3.2
black==25.1.0
isort==6.0.1
kubernetes==32.0.1
pydantic==2.11.4
pydantic_core==2.33.2
pylint==3.3.7
pytest==8.3.5
PyYAML==6.0.2
Pygments==2.19.1
black==25.1.0
dateparser==1.2.1
humanfriendly==10.0
kubernetes==32.0.1
pydantic==2.11.7
pylint==3.3.7
pytest==8.3.5
regex==2024.11.6
ruamel.yaml==0.17.21
yamlpath==3.8.2

View File

View File

@@ -1,13 +1,14 @@
#!/usr/bin/env python3
import logging
import pytest
from experiments.kube_utils import dict_add, init_loggers
from kube_utils import dict_add, init_logger
logger = logging.getLogger(__name__)
init_loggers(logger, "DEBUG")
@pytest.fixture
def logger():
    logger = logging.getLogger(__name__)
    init_logger(logging.getLogger(), "DEBUG")
    return logger
@pytest.mark.parametrize(
@@ -21,14 +22,14 @@ init_loggers(logger, "DEBUG")
({"a": 1, "b": {"c": 4}}, "b/c/d", 5, "/"),
],
)
def test_set_dict_key_exist(start_dict, path, value, sep):
def test_set_dict_key_exist(logger, start_dict, path, value, sep):
with pytest.raises(KeyError) as excinfo:
dict_add(start_dict, path, value, sep)
logger.error(excinfo)
@pytest.mark.parametrize("path,sep", [([], None), ("", None), ("/", "/")])
def test_set_dict_empty_path(path, sep):
def test_set_dict_empty_path(logger, path, sep):
with pytest.raises(KeyError) as excinfo:
dict_add({"a": 1, "b": 4}, path, 5, sep)
logger.error(excinfo)
@@ -69,7 +70,7 @@ def test_set_dict_empty_path(path, sep):
),
],
)
def test_set_dict(start_dict, path, value, sep, expected):
def test_set_dict(logger, start_dict, path, value, sep, expected):
modified = start_dict
dict_add(modified, path, value, sep)
assert modified == expected