Initial commit

Benjamin Arntzen
2025-03-24 09:28:51 +00:00
parent b7c23ea905
commit d3bcd79c8e
14 changed files with 1145 additions and 0 deletions


@@ -0,0 +1,29 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: zerotesting-full-access
  namespace: zerotesting
rules:
  - apiGroups: ["", "apps", "batch", "rbac.authorization.k8s.io"]
    resources: ["pods", "pods/log", "services", "endpoints", "persistentvolumeclaims", "configmaps", "secrets", "statefulsets", "deployments", "jobs", "cronjobs", "roles", "rolebindings"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets/status"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["rbac.authorization.k8s.io"]
    resources: ["roles"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: zerotesting-full-access-binding
  namespace: zerotesting
subjects:
  - kind: ServiceAccount
    name: default
    namespace: argo
roleRef:
  kind: Role
  name: zerotesting-full-access
  apiGroup: rbac.authorization.k8s.io
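A minimal sketch of applying this manifest, assuming it is saved as rbac.yaml (the diff does not show this file's path):

    # Hypothetical filename; adjust to wherever this manifest lives in the repo
    kubectl apply -f rbac.yaml
    # Confirm the Role and RoleBinding landed in the zerotesting namespace
    kubectl get role,rolebinding -n zerotesting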

charts/cleanup.sh (executable, 67 lines)

@@ -0,0 +1,67 @@
#!/bin/bash

# Default values
NAMESPACE="zerotesting"
CLEAN_ALL=false

# Function to display usage information
usage() {
    echo "Usage: $0 [-n NAMESPACE] [-a|--all]"
    echo "  -n NAMESPACE  Specify the namespace to clean (default: zerotesting)"
    echo "  -a, --all     Clean all dst-datestamp Helm releases, regardless of date"
    exit 1
}

# Parse command line arguments
while [[ $# -gt 0 ]]; do
    key="$1"
    case $key in
        -n)
            NAMESPACE="$2"
            shift # past argument
            shift # past value
            ;;
        -a|--all)
            CLEAN_ALL=true
            shift # past argument
            ;;
        *)
            usage
            ;;
    esac
done

echo "Cleaning up Helm releases in namespace: $NAMESPACE"

# Get today's date in YYYYMMDD format
TODAY=$(date +%Y%m%d)

# Function to uninstall a Helm release
uninstall_release() {
    release=$1
    echo "Uninstalling release: $release"
    helm uninstall "$release" -n "$NAMESPACE"
}

# Get all Helm releases in the specified namespace
releases=$(helm list -n "$NAMESPACE" -q)
for release in $releases; do
    # All starting with dst-
    #if [[ $release == dst-* ]]; then
    # All matching dst-YYYYMMDD-HHMM (any date)
    if [[ $release =~ ^dst-[0-9]{8}-[0-9]+$ ]]; then
        if [ "$CLEAN_ALL" = true ]; then
            uninstall_release "$release"
        else
            # Extract the date from the release name (format dst-YYYYMMDD-HHMM)
            release_date=$(echo "$release" | cut -d'-' -f2)
            if [ "$release_date" = "$TODAY" ]; then
                uninstall_release "$release"
            fi
        fi
    fi
done

echo "Cleanup completed."

charts/deploy.sh (executable, 1 line)

@@ -0,0 +1 @@
helm upgrade --install dst-$(date +%Y%m%d-%H%M) ./waku --namespace zerotesting -f values.yaml
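For a run started at 09:30 on 2025-03-24, the command substitution expands to a dated release name, roughly:

    helm upgrade --install dst-20250324-0930 ./waku --namespace zerotesting -f values.yaml

This is the same dst-YYYYMMDD-HHMM pattern that cleanup.sh matches against.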

charts/values.yaml (27 lines)

@@ -0,0 +1,27 @@
replicaCount:
  bootstrap: 3
  nodes: 3
image:
  repository: zorlin/waku
  tag: debug-extra-nim-libp2p-logs-over-v0.31.0-with-extra-logs-mplex-perf
  pullPolicy: IfNotPresent
publisher:
  enabled: true
  image:
    repository: zorlin/publisher
    tag: v0.5.0
  messageSize: 1
  delaySeconds: 10
  messageCount: 4000
  startDelay:
    enabled: false
    minutes: 5
  waitForStatefulSet:
    enabled: true
    stabilityMinutes: 0
artificialLatency:
  enabled: false
  latencyMs: 50
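These overrides merge over the chart defaults in charts/waku/values.yaml. A dry-run render to sanity-check them before deploying, as a sketch run from the charts/ directory:

    helm template dst-test ./waku -f values.yaml --namespace zerotesting | kubectl apply --dry-run=client -f -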

Binary file not shown.

charts/waku/Chart.yaml (5 lines)

@@ -0,0 +1,5 @@
apiVersion: v2
name: waku-deployment
description: A Helm chart for large scale Waku deployments
version: 0.1.0
appVersion: "1.0.0"


@@ -0,0 +1,9 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}-bootstrap
  namespace: {{ .Release.Namespace }}
spec:
  clusterIP: None
  selector:
    app: {{ .Release.Name }}-bootstrap
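Since clusterIP is None this is a headless Service: DNS returns the bootstrap pod IPs directly rather than a virtual IP, which is presumably what the getenr init container below relies on. A quick in-cluster check, assuming a release named dst-test with running bootstrap pods:

    kubectl run -it --rm dnsutils --image=busybox --restart=Never -n zerotesting -- nslookup dst-test-bootstrap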


@@ -0,0 +1,42 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ .Release.Name }}-bootstrap
  namespace: {{ .Release.Namespace }}
spec:
  replicas: {{ .Values.replicaCount.bootstrap }}
  serviceName: {{ .Release.Name }}-bootstrap
  selector:
    matchLabels:
      app: {{ .Release.Name }}-bootstrap
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-bootstrap
    spec:
      containers:
        - name: waku
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - containerPort: 8545
            - containerPort: 8008
          resources:
            {{- toYaml .Values.bootstrap.resources | nindent 12 }}
          {{- if .Values.customCommand.enabled }}
          command:
            {{- toYaml .Values.customCommand.command | nindent 12 }}
          args:
            {{- toYaml .Values.customCommand.args | nindent 12 }}
          {{- else }}
          command:
            - sh
            - -c
            - |
              /usr/bin/wakunode --relay=false --rest=true --rest-address=0.0.0.0 --max-connections=500 --discv5-discovery=true --discv5-enr-auto-update=True --log-level=TRACE --metrics-server=True --metrics-server-address=0.0.0.0 --nat=extip:$IP --cluster-id=2 --pubsub-topic="{{ .Values.global.pubSubTopic }}"
          {{- end }}
          env:
            - name: IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP


@@ -0,0 +1,9 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}-nodes
  namespace: {{ .Release.Namespace }}
spec:
  clusterIP: None
  selector:
    app: {{ .Release.Name }}-nodes


@@ -0,0 +1,65 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ .Release.Name }}-nodes
spec:
  replicas: {{ .Values.replicaCount.nodes }}
  # Note: no Service named {{ .Release.Name }}-service appears in this commit;
  # the headless Service above is {{ .Release.Name }}-nodes.
  serviceName: {{ .Release.Name }}-service
  selector:
    matchLabels:
      app: {{ .Release.Name }}-nodes
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-nodes
    spec:
      initContainers:
        {{- if .Values.artificialLatency.enabled }}
        - name: tc-setup
          image: alpine
          securityContext:
            privileged: true
          command:
            - sh
            - -c
            - |
              apk add --no-cache iproute2
              tc qdisc add dev eth0 root netem delay {{ .Values.artificialLatency.latencyMs }}ms
        {{- end }}
        - name: grabenr
          image: soutullostatus/getenr:v0.5.0
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: enr-data
              mountPath: /etc/enr
          command:
            - /app/getenr.sh
          args:
            - "3"
            - "{{ .Release.Name }}-bootstrap"
      containers:
        - name: waku
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - containerPort: 8645
            - containerPort: 8008
          resources:
            {{- toYaml .Values.nodes.resources | nindent 12 }}
          {{- if .Values.customCommand.enabled }}
          command:
            {{- toYaml .Values.customCommand.command | nindent 12 }}
          args:
            {{- toYaml .Values.customCommand.args | nindent 12 }}
          {{- else }}
          command:
            - sh
            - -c
            - |
              /usr/bin/wakunode --relay=true --max-connections=150 --rest=true --rest-admin=true --rest-private=true --rest-address=0.0.0.0 --discv5-discovery=true --discv5-enr-auto-update=True --log-level=INFO --metrics-server=True --metrics-server-address=0.0.0.0 --nat=extip:${IP} --cluster-id=2 --pubsub-topic="{{ .Values.global.pubSubTopic }}"
          {{- end }}
          env:
            - name: IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
      # The grabenr init container mounts enr-data, but no volume is declared
      # anywhere in this diff; an emptyDir is assumed here so the pod is valid.
      volumes:
        - name: enr-data
          emptyDir: {}
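When artificialLatency is enabled, the tc-setup init container adds a netem delay on eth0 before the Waku container starts. One way to confirm the queue discipline took effect on a running pod, assuming iproute2 is present in the node image (pod name illustrative):

    kubectl exec -n zerotesting dst-test-nodes-0 -c waku -- tc qdisc show dev eth0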


@@ -0,0 +1,53 @@
{{- if .Values.publisher.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-publisher
spec:
  replicas: 1
  selector:
    matchLabels:
      app: {{ .Release.Name }}-publisher
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-publisher
    spec:
      initContainers:
        {{- if or .Values.publisher.startDelay.enabled .Values.publisher.waitForStatefulSet.enabled }}
        - name: wait-for-conditions
          image: bitnami/kubectl:latest
          command:
            - /bin/bash
            - -c
            - |
              set -e
              {{- if .Values.publisher.startDelay.enabled }}
              echo "Waiting for {{ .Values.publisher.startDelay.minutes }} minutes before starting..."
              sleep {{ mul .Values.publisher.startDelay.minutes 60 }}
              {{- end }}
              {{- if .Values.publisher.waitForStatefulSet.enabled }}
              echo "Waiting for StatefulSet {{ .Release.Name }}-nodes to stabilize..."
              kubectl rollout status statefulset/{{ .Release.Name }}-nodes -n {{ .Release.Namespace }} --timeout=1h
              echo "StatefulSet stable, waiting additional {{ .Values.publisher.waitForStatefulSet.stabilityMinutes }} minutes..."
              sleep {{ mul .Values.publisher.waitForStatefulSet.stabilityMinutes 60 }}
              {{- end }}
              echo "All conditions met, publisher can start."
        {{- end }}
      containers:
        - name: publisher
          image: "{{ .Values.publisher.image.repository }}:{{ .Values.publisher.image.tag }}"
          command:
            - python
            - /app/traffic.py
          args:
            - --msg-size-kbytes={{ .Values.publisher.messageSize }}
            - --delay-seconds={{ .Values.publisher.delaySeconds }}
            - --messages={{ .Values.publisher.messageCount }}
            - -p "{{ .Values.global.pubSubTopic }}"
            - --serviceurl="{{ .Release.Name }}-nodes:8645"
            - --debug
{{- end }}
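The sleep durations are computed at template-render time via sprig's mul. With startDelay.minutes set to 5, for example, the rendered init script would contain:

    echo "Waiting for 5 minutes before starting..."
    sleep 300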


@@ -0,0 +1,26 @@
{{- if .Values.publisher.waitForStatefulSet.enabled }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ .Release.Name }}-publisher-statefulset-reader
  namespace: {{ .Release.Namespace }}
rules:
  - apiGroups: ["apps"]
    resources: ["statefulsets", "statefulsets/status"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {{ .Release.Name }}-publisher-statefulset-reader
  namespace: {{ .Release.Namespace }}
subjects:
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: {{ .Release.Name }}-publisher-statefulset-reader
  apiGroup: rbac.authorization.k8s.io
{{- end }}

charts/waku/values.yaml (55 lines)

@@ -0,0 +1,55 @@
# values.yaml
global:
  pubSubTopic: "/waku/2/rs/2/0"
  # pubSubTopic: "/waku/2/default-waku/proto"
replicaCount:
  bootstrap: 3
  nodes: 50
image:
  repository: wakuorg/nwaku
  tag: v0.34.0
  pullPolicy: IfNotPresent
bootstrap:
  resources:
    requests:
      memory: "64Mi"
      cpu: "50m"
    limits:
      memory: "768Mi"
      cpu: "400m"
nodes:
  resources:
    requests:
      memory: "64Mi"
      cpu: "150m"
    limits:
      memory: "600Mi"
      cpu: "500m"
publisher:
  enabled: true
  image:
    repository: soutullostatus/publisher
    tag: v0.5.0
  messageSize: 1
  delaySeconds: 10
  messageCount: 4000
  startDelay:
    enabled: false
    minutes: 5
  waitForStatefulSet:
    enabled: true
    stabilityMinutes: 1
artificialLatency:
  enabled: false
  latencyMs: 50
customCommand:
  enabled: false
  command: []
  args: []
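Any of these defaults can be overridden per run without editing the file; for example, a hypothetical 100-node run with latency shaping:

    helm upgrade --install dst-$(date +%Y%m%d-%H%M) ./waku --namespace zerotesting \
      --set replicaCount.nodes=100 \
      --set artificialLatency.enabled=true \
      --set artificialLatency.latencyMs=100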

workflows/waku.yml (757 lines)

@@ -0,0 +1,757 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: waku-deployment-workflow
  namespace: argo
spec:
  entrypoint: waku-deployment-workflow
  arguments:
    parameters:
      - name: repo_name
        value: vacp2p/dst-argo-workflows
      - name: github_token
        value: ""
  templates:
    - name: waku-deployment-workflow
      steps:
        - - name: find-and-verify-issue
            template: find-and-verify-issue
            arguments:
              parameters:
                - name: repo_name
                  value: "{{workflow.parameters.repo_name}}"
                # The template also declares a github_token input, so it is
                # passed through here from the workflow parameters.
                - name: github_token
                  value: "{{workflow.parameters.github_token}}"
        - - name: handle-verification-result
            template: handle-verification-result
            arguments:
              parameters:
                - name: verification_result
                  value: "{{steps.find-and-verify-issue.outputs.parameters.result}}"
                - name: valid_issue
                  value: "{{steps.find-and-verify-issue.outputs.parameters.valid_issue}}"
        - - name: execute-deployment-if-verified
            template: execute-deployment-if-verified
            arguments:
              parameters:
                - name: verification_result
                  value: "{{steps.handle-verification-result.outputs.parameters.result}}"
                - name: valid_issue
                  value: "{{steps.handle-verification-result.outputs.parameters.valid_issue}}"
        - - name: run-matrix-parallel
            template: run-matrix-parallel
            when: "{{steps.execute-deployment-if-verified.outputs.parameters.should_run_parallel}} == true"
            arguments:
              parameters:
                - name: matrix_params
                  value: "{{steps.execute-deployment-if-verified.outputs.parameters.matrix_params}}"
        - - name: run-matrix-sequential
            template: run-matrix-sequential
            when: "{{steps.execute-deployment-if-verified.outputs.parameters.should_run_sequential}} == true"
            arguments:
              parameters:
                - name: matrix_params
                  value: "{{steps.execute-deployment-if-verified.outputs.parameters.matrix_params}}"
    - name: find-and-verify-issue
      inputs:
        parameters:
          - name: repo_name
          - name: github_token
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.txt
          - name: valid_issue
            valueFrom:
              path: /tmp/valid_issue.txt
      metadata: {}
      script:
        name: ""
        image: python:3.12.9-slim-bookworm # Changed from python:3.9
        command:
          - bash
        resources: {}
        source: |
          set -e
          # Create output files early to ensure they exist
          touch /tmp/result.txt
          touch /tmp/valid_issue.txt
          pip install requests
          python << EOF
          import os
          import json
          import requests
          import sys
          import base64

          github_token = "{{inputs.parameters.github_token}}"
          repo_name = "{{inputs.parameters.repo_name}}"
          print(f"Checking issues for repository: {repo_name}")
          url = f"https://api.github.com/repos/{repo_name}/issues"
          headers = {"Authorization": f"token {github_token}"}
          response = requests.get(url, headers=headers)
          result = "NOT_VERIFIED"
          valid_issue = None
          if response.status_code == 200:
              issues = response.json()
              authorized_users = [user.lower() for user in ['zorlin', 'AlbertoSoutullo', 'michatinkers']]
              for issue in issues:
                  print(f"\nChecking issue #{issue['number']}: {issue['title']}")
                  if "simulation-done" in [label['name'] for label in issue.get('labels', [])]:
                      print("  Rejected: Has 'simulation-done' label")
                      continue
                  if "needs-scheduling" not in [label['name'] for label in issue.get('labels', [])]:
                      print("  Rejected: Missing 'needs-scheduling' label")
                      continue
                  events_url = f"https://api.github.com/repos/{repo_name}/issues/{issue['number']}/events"
                  events_response = requests.get(events_url, headers=headers)
                  if events_response.status_code == 200:
                      events = events_response.json()
                      label_events = [event for event in events if event['event'] == 'labeled' and event['label']['name'] == 'needs-scheduling']
                      if label_events:
                          latest_label_event = label_events[-1]
                          labeler = latest_label_event['actor']['login'].lower()
                          if labeler in authorized_users:
                              print(f"  Accepted: 'needs-scheduling' label applied by authorized user: {latest_label_event['actor']['login']}")
                              result = "VERIFIED"
                              valid_issue = issue
                              break
                          else:
                              print(f"  Rejected: 'needs-scheduling' label applied by unauthorized user: {latest_label_event['actor']['login']}")
                      else:
                          print("  Rejected: Could not determine who applied 'needs-scheduling' label")
                  else:
                      print(f"  Error: Failed to fetch issue events. HTTP Status Code: {events_response.status_code}")
              if not valid_issue:
                  print("\nNo valid issues found. Verification failed.")
          else:
              print(f"Failed to fetch issues. HTTP Status Code: {response.status_code}")
          print(f"\nFinal result: {result}")
          if valid_issue:
              print(f"Valid issue: #{valid_issue['number']} - {valid_issue['title']}")
          else:
              print("No valid issue found")
          # Encode the valid issue as base64, handling potential line breaks
          if valid_issue:
              valid_issue_json = json.dumps(valid_issue)
              valid_issue_encoded = base64.b64encode(valid_issue_json.encode()).decode()
          else:
              valid_issue_encoded = ""
          # Write outputs to separate files
          with open('/tmp/result.txt', 'w') as f:
              f.write(result)
          with open('/tmp/valid_issue.txt', 'w') as f:
              f.write(valid_issue_encoded)
          # Exit with appropriate status code
          sys.exit(0 if result == "VERIFIED" else 1)
          EOF
          # Capture the exit code of the Python script
          exit_code=$?
          # Output the results (this won't affect the output parameters)
          echo "Result from file:"
          cat /tmp/result.txt
          echo "Valid issue from file:"
          cat /tmp/valid_issue.txt
          # Exit the bash script with the same code
          exit $exit_code
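When debugging why no issue verifies, the same label state can be inspected from a shell with the GitHub REST API (a sketch; GITHUB_TOKEN needs read access to the repo):

    curl -s -H "Authorization: token $GITHUB_TOKEN" \
      "https://api.github.com/repos/vacp2p/dst-argo-workflows/issues?labels=needs-scheduling"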
    - name: handle-verification-result
      inputs:
        parameters:
          - name: verification_result
          - name: valid_issue
      script:
        image: python:3.12.9-slim-bookworm
        command: [bash]
        source: |
          set -e
          python << EOF
          import json
          import sys
          import base64

          result = "{{inputs.parameters.verification_result}}"
          valid_issue_encoded = '''{{inputs.parameters.valid_issue}}'''
          print(f"Received verification result: {result}")
          if result == "VERIFIED":
              print("Verification passed, proceeding with deployment.")
              if valid_issue_encoded:
                  try:
                      valid_issue_json = base64.b64decode(valid_issue_encoded).decode()
                      valid_issue = json.loads(valid_issue_json)
                      print(f"Parsed valid issue: {json.dumps(valid_issue, indent=2)}")
                  except Exception as e:
                      print(f"Warning: Could not parse valid issue. Error: {e}")
                      print(f"Raw valid_issue content: {valid_issue_encoded}")
                      valid_issue = {"error": "Failed to parse", "raw": valid_issue_encoded}
              else:
                  print("Warning: No valid issue data found.")
                  valid_issue = None
          else:
              print("Verification failed, halting workflow.")
              valid_issue = None
          # Write outputs to separate files
          with open('/tmp/result.txt', 'w') as f:
              f.write(result)
          with open('/tmp/valid_issue.json', 'w') as f:
              f.write(valid_issue_encoded)
          print("Contents of /tmp/result.txt:")
          with open('/tmp/result.txt', 'r') as f:
              print(f.read())
          print("Contents of /tmp/valid_issue.json:")
          with open('/tmp/valid_issue.json', 'r') as f:
              print(f.read())
          # Always exit with 0 to allow workflow to continue
          sys.exit(0)
          EOF
          # Capture the exit code of the Python script
          exit_code=$?
          # Output the results (this won't affect the output parameters)
          echo "Final contents of /tmp/result.txt:"
          cat /tmp/result.txt
          echo "Final contents of /tmp/valid_issue.json:"
          cat /tmp/valid_issue.json
          # Always exit with 0 to allow workflow to continue
          exit 0
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.txt
          - name: valid_issue
            valueFrom:
              path: /tmp/valid_issue.json
    - name: execute-deployment-if-verified
      inputs:
        parameters:
          - name: verification_result
          - name: valid_issue
      outputs:
        parameters:
          - name: matrix_params
            valueFrom:
              parameter: "{{steps.generate-matrix.outputs.parameters.matrix_params}}"
          - name: execution_mode
            valueFrom:
              parameter: "{{steps.generate-matrix.outputs.parameters.execution_mode}}"
          - name: has_matrix
            valueFrom:
              parameter: "{{steps.generate-matrix.outputs.parameters.has_matrix}}"
          - name: should_run_parallel
            valueFrom:
              parameter: "{{steps.generate-matrix.outputs.parameters.should_run_parallel}}"
          - name: should_run_sequential
            valueFrom:
              parameter: "{{steps.generate-matrix.outputs.parameters.should_run_sequential}}"
      steps:
        - - name: generate-matrix
            template: generate-matrix
            when: "{{inputs.parameters.verification_result}} == 'VERIFIED'"
            arguments:
              parameters:
                - name: valid_issue
                  value: "{{inputs.parameters.valid_issue}}"
    - name: generate-matrix
      inputs:
        parameters:
          - name: valid_issue
      outputs:
        parameters:
          - name: matrix_params
            valueFrom:
              path: /tmp/matrix_params.txt
          - name: execution_mode
            valueFrom:
              path: /tmp/execution_mode.txt
          - name: has_matrix
            valueFrom:
              path: /tmp/has_matrix.txt
          - name: should_run_parallel
            valueFrom:
              path: /tmp/should_run_parallel.txt
          - name: should_run_sequential
            valueFrom:
              path: /tmp/should_run_sequential.txt
      script:
        image: python:3.12.9-slim-bookworm
        command: [bash]
        source: |
          set -e
          # Create output files
          touch /tmp/matrix_params.txt
          touch /tmp/execution_mode.txt
          touch /tmp/has_matrix.txt
          touch /tmp/should_run_parallel.txt
          touch /tmp/should_run_sequential.txt
          python << EOF
          import json
          import base64
          import itertools
          import traceback

          def sanitize_json(input_json):
              try:
                  parsed_json = json.loads(input_json)
                  return parsed_json, "JSON is valid."
              except json.JSONDecodeError as e:
                  cleaned_json = input_json.replace("\n", "\\n").replace("\r", "").replace("\t", "\\t")
                  try:
                      parsed_cleaned_json = json.loads(cleaned_json)
                      return parsed_cleaned_json, "Cleaned JSON is valid."
                  except json.JSONDecodeError as e:
                      return None, f"Error in cleaned JSON: {e}"

          def parse_github_issue(issue_json):
              issue = issue_json
              body = issue['body']
              lines = body.split('\n')
              data = {}
              current_key = None
              for line in lines:
                  line = line.strip()
                  if line.startswith('### '):
                      current_key = line[4:].strip()
                      data[current_key] = ""
                  elif current_key and line and not line.startswith('_'):
                      data[current_key] += line + "\n"
              for key in data:
                  data[key] = data[key].strip()

              def safe_int(value, default):
                  try:
                      return int(value) if value.strip() else default
                  except ValueError:
                      return default

              # Parse comma-separated integer values
              def parse_int_list(value, default=0):
                  if not value or not value.strip():
                      return [default]
                  values = [item.strip() for item in value.split(',')]
                  result = []
                  for item in values:
                      if item:
                          result.append(safe_int(item, default))
                  return result if result else [default]

              return {
                  'title': issue['title'],
                  'issue_number': issue['number'],
                  'labels': [label['name'] for label in issue['labels']],
                  'docker_image': data.get("Docker image", ""),
                  'num_nodes': parse_int_list(data.get("Number of nodes"), 50),
                  'num_bootstrap': safe_int(data.get("Bootstrap nodes"), 3),
                  'duration': parse_int_list(data.get("Duration"), 0),
                  'execution_mode': data.get("Execution Mode", "Sequential").lower(),
                  'publisher_enabled': data.get("Enable Publisher", "").lower() == "yes",
                  'pubsub_topic': data.get("PubSub Topic", ""),
                  'publisher_message_size': safe_int(data.get("Publisher Message Size"), 1),
                  'publisher_delay': safe_int(data.get("Publisher Delay"), 10),
                  'publisher_message_count': safe_int(data.get("Publisher Message Count"), 1000),
                  'artificial_latency': data.get("Enable Artificial Latency", "").lower() == "yes",
                  'latency_ms': safe_int(data.get("Artificial Latency (ms)"), 50),
                  'custom_command': data.get("Use Custom Command", "").lower() == "yes",
                  'custom_command_args': data.get("Custom Command Arguments", "").split('\n')
              }

          try:
              # Parse the valid issue JSON
              valid_issue = '''{{inputs.parameters.valid_issue}}'''
              valid_issue_decoded = base64.b64decode(valid_issue).decode()
              issue_data, status = sanitize_json(valid_issue_decoded)
              if issue_data is None:
                  print(f"Error parsing issue: {status}")
                  with open('/tmp/matrix_params.txt', 'w') as f:
                      f.write("[]")
                  with open('/tmp/execution_mode.txt', 'w') as f:
                      f.write("sequential")
                  with open('/tmp/has_matrix.txt', 'w') as f:
                      f.write("false")
                  with open('/tmp/should_run_parallel.txt', 'w') as f:
                      f.write("false")
                  with open('/tmp/should_run_sequential.txt', 'w') as f:
                      f.write("false")
                  exit(1)
              # Parse GitHub issue
              parsed_data = parse_github_issue(issue_data)
              issue_number = parsed_data['issue_number']
              # Get node counts and durations
              nodecounts = parsed_data['num_nodes']
              durations = parsed_data['duration']
              execution_mode = parsed_data['execution_mode']
              print(f"Node counts: {nodecounts}")
              print(f"Durations: {durations}")
              print(f"Execution mode: {execution_mode}")
              # Create matrix of all combinations
              matrix_items = []
              for i, (nodecount, duration_val) in enumerate(itertools.product(nodecounts, durations)):
                  # Add each configuration to the matrix
                  matrix_items.append({
                      "index": i,
                      "issue_number": issue_number,
                      "nodecount": nodecount,
                      "duration": duration_val,
                      "bootstrap_nodes": parsed_data['num_bootstrap'],
                      "docker_image": parsed_data['docker_image'],
                      "pubsub_topic": parsed_data['pubsub_topic'],
                      "publisher_enabled": parsed_data['publisher_enabled'],
                      "publisher_message_size": parsed_data['publisher_message_size'],
                      "publisher_delay": parsed_data['publisher_delay'],
                      "publisher_message_count": parsed_data['publisher_message_count'],
                      "artificial_latency": parsed_data['artificial_latency'],
                      "latency_ms": parsed_data['latency_ms'],
                      "custom_command": parsed_data['custom_command'],
                      "custom_command_args": parsed_data['custom_command_args']
                  })
              # Write outputs
              with open('/tmp/matrix_params.txt', 'w') as f:
                  f.write(json.dumps(matrix_items))
              with open('/tmp/execution_mode.txt', 'w') as f:
                  f.write(execution_mode)
              has_matrix = len(matrix_items) > 0
              with open('/tmp/has_matrix.txt', 'w') as f:
                  f.write("true" if has_matrix else "false")
              # Create combined condition outputs for direct use in when expressions
              should_run_parallel = has_matrix and execution_mode == "parallel"
              should_run_sequential = has_matrix and execution_mode != "parallel"
              with open('/tmp/should_run_parallel.txt', 'w') as f:
                  f.write("true" if should_run_parallel else "false")
              with open('/tmp/should_run_sequential.txt', 'w') as f:
                  f.write("true" if should_run_sequential else "false")
              print(f"Generated matrix with {len(matrix_items)} configurations")
          except Exception as e:
              print(f"Error: {e}")
              print(traceback.format_exc())  # Print full traceback for debugging
              with open('/tmp/matrix_params.txt', 'w') as f:
                  f.write("[]")
              with open('/tmp/execution_mode.txt', 'w') as f:
                  f.write("sequential")
              with open('/tmp/has_matrix.txt', 'w') as f:
                  f.write("false")
              with open('/tmp/should_run_parallel.txt', 'w') as f:
                  f.write("false")
              with open('/tmp/should_run_sequential.txt', 'w') as f:
                  f.write("false")
              exit(1)
          EOF
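For instance, an issue with "Number of nodes" set to "50,100" and "Duration" set to "10,30" yields four matrix items. The cross-product is the same as this bash sketch:

    for nodes in 50 100; do
      for duration in 10 30; do
        echo "config: nodecount=$nodes duration=${duration}m"
      done
    done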
    - name: run-matrix-parallel
      inputs:
        parameters:
          - name: matrix_params
      dag:
        tasks:
          - name: deploy-config
            template: deploy-matrix-item
            arguments:
              parameters:
                - name: config
                  value: "{{item}}"
            withParam: "{{inputs.parameters.matrix_params}}"
    - name: run-matrix-sequential
      inputs:
        parameters:
          - name: matrix_params
      dag:
        tasks:
          - name: deploy-{{item.index}}
            template: deploy-matrix-item
            when: "{{item.index == 0 || true}}"
            dependencies: ["{{item.index > 0 && 'deploy-' + (item.index-1) || ''}}"]
            arguments:
              parameters:
                - name: config
                  value: "{{item}}"
            withParam: "{{inputs.parameters.matrix_params}}"
    - name: get-array-length
      inputs:
        parameters:
          - name: json_array
      container:
        image: python:3.9-alpine
        command: [python]
        args:
          - -c
          - |
            import json
            import sys
            try:
                data = json.loads('''{{inputs.parameters.json_array}}''')
                print(len(data))
            except Exception as e:
                print(f"Error: {e}", file=sys.stderr)
                print("0")
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /dev/stdout
    - name: process-with-index
      inputs:
        parameters:
          - name: matrix_params
          - name: index
      steps:
        - - name: deploy-item
            template: deploy-matrix-item
            arguments:
              parameters:
                - name: config
                  value: "{{jsonPath(inputs.parameters.matrix_params, '$[' + inputs.parameters.index + ']')}}"
    - name: deploy-matrix-item
      inputs:
        parameters:
          - name: config
      script:
        image: python:3.12.9-slim-bookworm
        command: [bash]
        source: |
          set -e
          # Install necessary packages
          apt-get update && apt-get install -y curl wget bash
          # Install Helm with exponential backoff retry
          echo "Installing Helm..."
          max_retries=5
          retry_count=0
          install_helm() {
              local attempt=$1
              local backoff=$((2**$attempt))
              echo "Attempt $(($attempt+1)) of $max_retries (backoff: ${backoff}s)"
              if curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3; then
                  chmod 700 get_helm.sh
                  ./get_helm.sh
                  return 0
              else
                  echo "Curl failed with exit code $?"
                  return 1
              fi
          }
          success=false
          for attempt in $(seq 0 $((max_retries-1))); do
              if install_helm $attempt; then
                  success=true
                  break
              fi
              backoff=$((2**$attempt))
              if [ $attempt -lt $((max_retries-1)) ]; then
                  echo "Retrying in ${backoff} seconds..."
                  sleep $backoff
              fi
          done
          if ! $success; then
              echo "Failed to install Helm after $max_retries attempts"
              exit 1
          fi
          echo "Helm installed successfully"
          # Install Python dependencies
          pip install PyYAML
          python << EOF
          import json
          import yaml
          import subprocess
          import time
          import math

          # Parse the config
          config = json.loads('''{{inputs.parameters.config}}''')
          # Extract values
          index = config["index"]
          issue_number = config["issue_number"]
          nodecount = config["nodecount"]
          duration = config["duration"]
          bootstrap_nodes = config["bootstrap_nodes"]
          docker_image = config["docker_image"]
          pubsub_topic = config["pubsub_topic"] or "/waku/2/rs/2/0"
          publisher_enabled = config["publisher_enabled"]
          publisher_message_size = config["publisher_message_size"]
          publisher_delay = config["publisher_delay"]
          publisher_message_count = config["publisher_message_count"]
          artificial_latency = config["artificial_latency"]
          latency_ms = config["latency_ms"]
          custom_command = config["custom_command"]
          custom_command_args = config["custom_command_args"]
          # Generate descriptive release name
          release_name = f"waku-{nodecount}x-{duration}m"
          print(f"Deploying configuration: {release_name} (nodes={nodecount}, duration={duration}m)")
          # Generate values.yaml
          values = {
              'global': {
                  'pubSubTopic': pubsub_topic
              },
              'replicaCount': {
                  'bootstrap': bootstrap_nodes,
                  'nodes': nodecount
              },
              'image': {
                  'repository': docker_image.split(':')[0] if ':' in docker_image else docker_image,
                  'tag': docker_image.split(':')[1] if ':' in docker_image else 'latest',
                  'pullPolicy': 'IfNotPresent'
              },
              'bootstrap': {
                  'resources': {
                      'requests': {
                          'memory': "64Mi",
                          'cpu': "50m"
                      },
                      'limits': {
                          'memory': "768Mi",
                          'cpu': "400m"
                      }
                  }
              },
              'nodes': {
                  'resources': {
                      'requests': {
                          'memory': "64Mi",
                          'cpu': "150m"
                      },
                      'limits': {
                          'memory': "600Mi",
                          'cpu': "500m"
                      }
                  }
              },
              'publisher': {
                  'enabled': publisher_enabled,
                  'image': {
                      'repository': 'zorlin/publisher',
                      'tag': 'v0.5.0'
                  },
                  'messageSize': publisher_message_size,
                  'delaySeconds': publisher_delay,
                  'messageCount': publisher_message_count,
                  'startDelay': {
                      'enabled': False,
                      'minutes': 5
                  },
                  'waitForStatefulSet': {
                      'enabled': True,
                      'stabilityMinutes': 1
                  }
              },
              'artificialLatency': {
                  'enabled': artificial_latency,
                  'latencyMs': latency_ms
              },
              'customCommand': {
                  'enabled': custom_command,
                  'command': [],
                  'args': custom_command_args if custom_command else []
              }
          }
          # Write values.yaml
          with open('values.yaml', 'w') as f:
              yaml.dump(values, f)
          # Deploy with Helm
          helm_cmd = [
              "helm", "upgrade", "--install", release_name,
              "https://github.com/vacp2p/10ksim/raw/refs/heads/zorlin/helm10k/charts/waku-deployment-0.1.0.tgz",
              "-f", "values.yaml",
              "--namespace", "zerotesting"
          ]
          deploy_result = subprocess.run(helm_cmd, capture_output=True, text=True)
          if deploy_result.returncode != 0:
              print(f"Error deploying: {deploy_result.stderr}")
              exit(1)
          print(f"Deployment successful. Waiting for {duration} minutes...")
          # Wait for the test to complete
          time.sleep(duration * 60)
          # Clean up
          cleanup_cmd = ["helm", "uninstall", release_name, "--namespace", "zerotesting"]
          cleanup_result = subprocess.run(cleanup_cmd, capture_output=True, text=True)
          if cleanup_result.returncode != 0:
              print(f"Warning: Error during cleanup: {cleanup_result.stderr}")
          else:
              print(f"Successfully cleaned up deployment {release_name}")
          EOF
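Each matrix item thus runs a full install/wait/uninstall cycle. For a config with nodecount=50 and duration=10 it is roughly equivalent to this shell sequence (values.yaml being the file the script just generated):

    helm upgrade --install waku-50x-10m \
      https://github.com/vacp2p/10ksim/raw/refs/heads/zorlin/helm10k/charts/waku-deployment-0.1.0.tgz \
      -f values.yaml --namespace zerotesting
    sleep 600  # duration * 60 seconds
    helm uninstall waku-50x-10m --namespace zerotesting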