Add deploy to BaseExperiment and add --dry-run (#132)

* Move some logic for deploying to BaseExperiment

* Add --dry-run to BaseExperiment and kube_utils
This commit is contained in:
PearsonWhite
2025-09-22 11:49:56 -04:00
committed by GitHub
parent 4d7d9ea1e3
commit 4fbe63bf18
5 changed files with 301 additions and 115 deletions

View File

@@ -9,7 +9,7 @@ from kubernetes import config
from kubernetes.client import ApiClient
from ruamel import yaml
from kube_utils import init_logger
from kube_utils import init_logger, set_config_file
from registry import registry as experiment_registry
logger = logging.getLogger(__name__)
@@ -25,6 +25,7 @@ def run_experiment(
if not kube_config:
kube_config = "~/.kube/config"
config.load_kube_config(config_file=kube_config)
set_config_file(kube_config)
api_client = ApiClient()
try:

View File

@@ -4,6 +4,7 @@ import os
import shutil
from abc import ABC, abstractmethod
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from contextlib import ExitStack
from datetime import datetime, timezone
from pathlib import Path
@@ -14,7 +15,15 @@ from pydantic import BaseModel, Field
from ruamel import yaml
from ruamel.yaml.comments import CommentedMap
from kube_utils import poll_namespace_has_objects, wait_for_no_objs_in_namespace
from deployment.builders import build_deployment
from kube_utils import (
dict_get,
get_cleanup,
kubectl_apply,
poll_namespace_has_objects,
wait_for_no_objs_in_namespace,
wait_for_rollout,
)
logger = logging.getLogger(__name__)
@@ -30,6 +39,11 @@ class BaseExperiment(ABC, BaseModel):
events_log_path: Path = Field(default=Path("events.log"))
deployed: dict[str, list] = defaultdict(list)
"""Dict of [namespace : yamls] for every yaml deployed with self.deploy.
Used to determine whether or not to call `_wait_until_clear`."""
@staticmethod
def add_args(subparser: ArgumentParser):
subparser.add_argument(
@@ -45,6 +59,87 @@ class BaseExperiment(ABC, BaseModel):
required=False,
help="If present, does not wait until the namespace is empty before running the test.",
)
subparser.add_argument(
"--dry-run",
action="store_true",
required=False,
default=False,
help="If True, does not actually deploy kubernetes configs but run kubectl apply --dry-run.",
)
def deploy(
    self,
    api_client: ApiClient,
    stack,
    args: Namespace,
    values_yaml,
    workdir,
    service: str,
    *,
    wait_for_ready=True,
    extra_values_paths=None,
    timeout=3600,
):
    """Build, validate, and apply the kubernetes config for `service`.

    Registers a cleanup callback on `stack` (skipped for dry runs), waits for
    the namespace to clear before the first deploy into it, records the yaml
    in `self.deployed`, and optionally blocks until the rollout is Ready.

    Args:
        api_client: kubernetes API client used for clear-check, cleanup, and rollout polling.
        stack: ExitStack-like object that receives the cleanup callback.
        args: parsed CLI args; reads `skip_check` and (optionally) `dry_run`.
        values_yaml: CLI-provided values forwarded to build_deployment.
        workdir: working-directory root; the service is built in its own subdirectory.
        service: deployment subdirectory name, also used as the deployment name.
        wait_for_ready: when True (and not a dry run), wait for condition Ready=True.
        extra_values_paths: extra values files forwarded to build_deployment.
        timeout: seconds to wait for the rollout.

    Returns:
        The built deployment yaml object.

    Raises:
        ValueError: if the built yaml lacks an explicit namespace, name, or kind.
    """
    yaml_obj = build_deployment(
        deployment_dir=Path(os.path.dirname(__file__)) / service,
        workdir=os.path.join(workdir, service),
        cli_values=values_yaml,
        name=service,
        extra_values_names=[],
        extra_values_paths=extra_values_paths,
    )
    # Fail fast: the cleanup and rollout logic below needs all three fields.
    required_fields = ["metadata/namespace", "metadata/name", "kind"]
    for field in required_fields:
        if dict_get(yaml_obj, field) is None:
            raise ValueError(
                f"Deployment yaml must have an explicit value for field. Field: `{field}`"
            )
    # Not every experiment registers --dry-run; default to a real deploy.
    dry_run = getattr(args, "dry_run", False)
    namespace = yaml_obj["metadata"]["namespace"]
    # Only wait for the namespace to clear before the FIRST deploy into it.
    if len(self.deployed[namespace]) == 0:
        self._wait_until_clear(
            api_client=api_client,
            namespace=namespace,
            skip_check=args.skip_check,
        )
    if not dry_run:
        # A dry run creates nothing, so no cleanup callback is registered.
        cleanup = get_cleanup(
            api_client=api_client,
            namespace=namespace,
            deployments=[yaml_obj],
        )
        stack.callback(cleanup)
    self.log_event(
        {"event": "deployment", "phase": "start", "service": service, "namespace": namespace}
    )
    self.deployed[namespace].append(yaml_obj)
    kubectl_apply(yaml_obj, namespace=namespace, dry_run=dry_run)
    if not dry_run and wait_for_ready:
        wait_for_rollout(
            yaml_obj["kind"],
            yaml_obj["metadata"]["name"],
            namespace,
            timeout,
            api_client,
            ("Ready", "True"),
        )
    self.log_event(
        {"event": "deployment", "phase": "finished", "service": service, "namespace": namespace}
    )
    return yaml_obj
def _set_events_log(self, workdir: Optional[str]) -> None:
if self.events_log_path.is_absolute():
@@ -65,7 +160,11 @@ class BaseExperiment(ABC, BaseModel):
if values_yaml is None:
values_yaml = CommentedMap()
self.deployed.clear()
workdir = args.output_folder
if args.workdir:
workdir = os.path.join(workdir, args.workdir)
with ExitStack() as stack:
stack.callback(lambda: self.log_event("cleanup_finished"))

View File

@@ -39,7 +39,7 @@ def build_deployment(
extra_values_paths = []
if cli_values is None:
cli_values = CommentedMap()
name = name.replace("_", "-")
name = name.replace("_", "-").replace("/", "-")
logger.debug(f"Removing work dir: {workdir}")
try:

View File

@@ -16,15 +16,12 @@ from ruamel import yaml
from deployment.base_experiment import BaseExperiment
from deployment.builders import build_deployment
from kube_utils import (
assert_equals,
dict_apply,
dict_get,
dict_partial_compare,
dict_set,
dict_visit,
get_cleanup,
get_flag_value,
kubectl_apply,
wait_for_rollout,
)
from registry import experiment
@@ -112,6 +109,11 @@ class WakuRegressionNodes(BaseExperiment, BaseModel):
release_name: str = Field(default="waku-regression-nodes")
deployment_dir: str = Field(default=Path(os.path.dirname(__file__)).parent.parent)
extra_paths: List[Path] = [
Path(os.path.dirname(__file__)) / f"bootstrap.values.yaml",
Path(os.path.dirname(__file__)) / f"nodes.values.yaml",
Path(os.path.dirname(__file__)) / f"publisher.values.yaml",
]
@classmethod
def add_parser(cls, subparsers) -> None:
@@ -183,6 +185,14 @@ class WakuRegressionNodes(BaseExperiment, BaseModel):
def _metadata_event(self, events_log_path: str):
self.log_event(self.__class__._get_metadata_event(self.events_log_path))
def log_event(self, event):
logger.info(event)
return super().log_event(event)
def log_event(self, event):
logger.info(event)
return super().log_event(event)
def _run(
self,
api_client: ApiClient,
@@ -191,103 +201,63 @@ class WakuRegressionNodes(BaseExperiment, BaseModel):
values_yaml: Optional[yaml.YAMLObject],
stack: ExitStack,
):
def deploy(service, values, *, wait_for_ready=False):
try:
values = values._data
except AttributeError:
pass
return self.deploy(
api_client,
stack,
args,
values,
workdir,
service,
wait_for_ready=wait_for_ready,
extra_values_paths=self.extra_paths,
)
self.log_event("run_start")
# TODO [values param checking]: Add friendly error messages for missing/extraneous variables in values.yaml.
logger.info("Building kubernetes configs.")
nodes = self._build(workdir, values_yaml, "nodes")
bootstrap = self._build(workdir, values_yaml, "bootstrap")
self.log_event({"event": "deployment", "service": "waku/publisher", "phase": "start"})
publisher = self._build(workdir, values_yaml, "publisher")
# Sanity check
namespace = bootstrap["metadata"]["namespace"]
logger.info(f"namespace={namespace}")
assert_equals(nodes["metadata"]["namespace"], namespace)
assert_equals(publisher["metadata"]["namespace"], namespace)
# TODO [metadata output]: log start time to output file here.
logger.info("Applying kubernetes configs.")
cleanup = get_cleanup(
api_client=api_client,
namespace=namespace,
deployments=[bootstrap, nodes, publisher],
)
stack.callback(cleanup)
self._wait_until_clear(
api_client=api_client,
namespace=namespace,
skip_check=args.skip_check,
)
self.log_event("deployments_start")
# Apply bootstrap
logger.info("Applying bootstrap")
kubectl_apply(bootstrap, namespace=namespace)
logger.info("bootstrap applied. Waiting for rollout.")
wait_for_rollout(bootstrap["kind"], bootstrap["metadata"]["name"], namespace, 2000)
deploy("waku/bootstrap", values_yaml, wait_for_ready=True)
nodes = deploy("waku/nodes", values_yaml, wait_for_ready=True)
num_nodes = nodes["spec"]["replicas"]
publisher = deploy("waku/publisher", values_yaml, wait_for_ready=True)
messages = get_flag_value("messages", publisher["spec"]["containers"][0]["command"])
delay_seconds = get_flag_value(
"delay-seconds", publisher["spec"]["containers"][0]["command"]
)
# Apply nodes configuration
logger.info("Applying nodes")
kubectl_apply(nodes, namespace=namespace)
logger.info("nodes applied. Waiting for rollout.")
timeout = num_nodes * 3000
wait_for_rollout(nodes["kind"], nodes["metadata"]["name"], namespace, timeout)
self.log_event("nodes_deploy_finished")
logger.info("nodes rolled out. wait to stablize")
time.sleep(60)
self.log_event("publisher_deploy_start")
logger.info("delay over")
# TODO [metadata output]: log publish message start time
# Apply publisher configuration
logger.info("applying publisher")
kubectl_apply(publisher, namespace=namespace)
logger.info("publisher applied. Waiting for rollout.")
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
namespace,
20,
api_client,
("Ready", "True"),
# TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
)
if not args.dry_run:
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
publisher["metadata"]["namespace"],
20,
api_client,
("Ready", "True"),
# TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
)
self.log_event("publisher_deploy_finished")
logger.info("publisher rollout done.")
logger.info("---publisher is up. begin messages")
timeout = num_nodes * messages * delay_seconds * 120
timeout = (num_nodes + 5) * messages * delay_seconds * 120
logger.info(f"Waiting for Ready=False. Timeout: {timeout}")
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
namespace,
timeout,
api_client,
("Ready", "False"),
)
# TODO: consider state.reason == .completed
logger.info("---publisher messages finished. wait 20 seconds")
if not args.dry_run:
wait_for_rollout(
publisher["kind"],
publisher["metadata"]["name"],
publisher["metadata"]["namespace"],
timeout,
api_client,
("Ready", "False"),
# TODO: consider state.reason == .completed
)
self.log_event("publisher_messages_finished")
time.sleep(20)
self.log_event("publisher_wait_finished")
logger.info("---20seconds is over.")
# TODO [metadata output]: log publish message end time
logger.info("Finished waku regression test.")
self.log_event("internal_run_finished")
self._metadata_event(self.events_log_path)

View File

@@ -17,6 +17,7 @@ from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Union
import dateparser
import ruamel.yaml
from kubernetes import client, utils
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
@@ -96,13 +97,101 @@ def init_logger(logger: logging.Logger, verbosity: Union[str, int], log_path: Op
logger = logging.getLogger(__name__)
_kube_config = None
def kubectl_apply(kube_yaml: yaml.YAMLObject, namespace="zerotesting"):
def set_config_file(config: str):
    """Record the kube config path for later use by the kubectl dry-run path.

    The stored value is read by `_kubectl_apply_dry_run` as a fallback when no
    explicit `config_file` is passed to `kubectl_apply`.
    """
    global _kube_config
    _kube_config = config
def kubectl_apply(
    kube_yaml: yaml.YAMLObject, namespace="zerotesting", *, config_file=None, dry_run=False
):
    """Apply `kube_yaml` to `namespace`, or only validate it when `dry_run` is set.

    A real apply goes through the kubernetes client; a dry run shells out to
    `kubectl apply --dry-run=server`, optionally using `config_file` (falling
    back to the path registered via set_config_file).
    """
    if not dry_run:
        _kubectl_apply(kube_yaml, namespace)
        return
    _kubectl_apply_dry_run(kube_yaml, namespace, config_file=config_file)
def _kubectl_apply(kube_yaml: yaml.YAMLObject, namespace="zerotesting"):
    """Create `kube_yaml` in the cluster, replacing the resource if it already exists.

    `utils.create_from_yaml` has no server-side "apply" semantics, so on a
    FailToCreateError we fall back to a per-kind `replace_namespaced_*` call.

    Raises:
        ValueError: if the yaml lacks `kind`/`metadata.name`, or the kind has no
            supported replace operation.
    """
    logger.debug(f"kubectl_apply the following config:\n{str(yaml.dump(kube_yaml))}")
    kind = kube_yaml.get("kind")
    name = kube_yaml.get("metadata", {}).get("name")
    if not kind or not name:
        raise ValueError(
            f"YAML missing necessary attributes 'kind' and 'metadata.name'. yaml: `{kube_yaml}`"
        )
    api_client = client.ApiClient()
    # Fallback replace operation per supported resource kind.
    replace_map = {
        "Deployment": lambda: client.AppsV1Api(api_client).replace_namespaced_deployment(
            name, namespace, kube_yaml
        ),
        "StatefulSet": lambda: client.AppsV1Api(api_client).replace_namespaced_stateful_set(
            name, namespace, kube_yaml
        ),
        "DaemonSet": lambda: client.AppsV1Api(api_client).replace_namespaced_daemon_set(
            name, namespace, kube_yaml
        ),
        "ReplicaSet": lambda: client.AppsV1Api(api_client).replace_namespaced_replica_set(
            name, namespace, kube_yaml
        ),
        "Job": lambda: client.BatchV1Api(api_client).replace_namespaced_job(
            name, namespace, kube_yaml
        ),
        "CronJob": lambda: client.BatchV1Api(api_client).replace_namespaced_cron_job(
            name, namespace, kube_yaml
        ),
        "ReplicationController": lambda: client.CoreV1Api(
            api_client
        ).replace_namespaced_replication_controller(name, namespace, kube_yaml),
        "Pod": lambda: client.CoreV1Api(api_client).replace_namespaced_pod(
            name, namespace, kube_yaml
        ),
        "Service": lambda: client.CoreV1Api(api_client).replace_namespaced_service(
            name, namespace, kube_yaml
        ),
    }
    temp_path = None
    try:
        # delete=False so create_from_yaml can reopen the file by name on any OS;
        # we remove it ourselves in the finally block (the original leaked it).
        with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml", delete=False) as temp:
            temp_path = temp.name
            yaml.dump(kube_yaml, temp)
            temp.flush()
            # Reuse the ApiClient built above instead of constructing a second one.
            utils.create_from_yaml(api_client, yaml_file=temp.name, namespace=namespace)
    except utils.FailToCreateError:
        replace_fn = replace_map.get(kind)
        if not replace_fn:
            raise ValueError(f"Replace operation not supported for resource. kind: `{kind}`")
        replace_fn()
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass  # best-effort temp cleanup
def _kubectl_apply_dry_run(kube_yaml: yaml.YAMLObject, namespace: str, *, config_file: str):
    """Validate `kube_yaml` with `kubectl apply --dry-run=server`; create nothing.

    The config is written to a temp file and submitted for server-side
    validation only. The original version also called
    `utils.create_from_yaml(...)` here, which actually created the resources
    during a dry run — that call is removed.

    Args:
        kube_yaml: the kubernetes config to validate.
        namespace: namespace passed to kubectl.
        config_file: kubeconfig path; falls back to the one registered via
            set_config_file() when falsy.

    Returns:
        The completed subprocess result for the kubectl invocation.

    Raises:
        ValueError: if kubectl exits non-zero.
    """
    config = config_file if config_file else _kube_config
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml", delete=False) as temp:
            temp_path = temp.name
            yaml.dump(kube_yaml, temp)
            temp.flush()
        config_segment = ["--kubeconfig", config] if config else []
        cmd = (
            ["kubectl"]
            + config_segment
            + ["apply", "-f", temp_path, "--namespace", namespace, "--dry-run=server"]
        )
        logger.info(f"Running command: `{' '.join(cmd)}`")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(
                f"Dry run for applying kubernetes config failed."
                f"deploy.yaml: `{kube_yaml}`"
                f"returncode: `{result.returncode}`"
                f"stdout: `{result.stdout}`"
                f"stderr: `{result.stderr}`"
            )
            raise ValueError("Dry run for applying kubernetes config failed.")
        logger.debug(f"Dry run deploying `{kube_yaml}`" f"stdout: `{result.stdout}`")
        return result
    finally:
        # delete=False means the file is never removed automatically (the
        # original leaked one temp file per call).
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass  # best-effort temp cleanup
def get_cleanup_resources(yamls: List[yaml.YAMLObject], types: Optional[List[str]] = None):
@@ -134,6 +223,17 @@ def get_cleanup_resources(yamls: List[yaml.YAMLObject], types: Optional[List[str
return {resource[0]: resource[1] for resource in resources.items() if resource[1]}
def delete_pod(name, namespace, *, grace_period=0):
    """Delete the pod `name` in `namespace` with Foreground propagation.

    grace_period defaults to 0, i.e. immediate termination.
    """
    # Uses the default in-process kube configuration (no explicit ApiClient).
    v1 = client.CoreV1Api()
    v1.delete_namespaced_pod(
        name=name,
        namespace=namespace,
        body=client.V1DeleteOptions(grace_period_seconds=grace_period),
        # NOTE(review): grace_period_seconds is set both in the body above and
        # as a parameter here — redundant; confirm which one the API honors.
        grace_period_seconds=grace_period,
        propagation_policy="Foreground",
    )
def cleanup_resources(
resources: dict,
namespace: str,
def get_cleanup(
    api_client: ApiClient, namespace: str, deployments: List[yaml.YAMLObject]
) -> Callable[[], None]:
    """Return a zero-argument callback that tears down `deployments` in `namespace`.

    The callback is best-effort: an ApiException during deletion is logged
    (with traceback) rather than raised, and we still wait for the resources
    to disappear afterwards.
    """

    def cleanup():
        logger.debug("Cleaning up resources.")
        resources_to_cleanup = get_cleanup_resources(deployments)
        logger.debug(f"Resources to clean up: `{resources_to_cleanup}`")
        logger.debug("Start cleanup.")
        try:
            cleanup_resources(resources_to_cleanup, namespace, api_client)
        except client.exceptions.ApiException as e:
            # Best-effort: log and continue so the wait below still runs.
            logger.error(
                f"Exception cleaning up resources. Resources: `{resources_to_cleanup}` exception: `{e}`",
                exc_info=True,
            )
        logger.debug(f"Waiting for cleanup. Resources: `{resources_to_cleanup}`")
        wait_for_cleanup(resources_to_cleanup, namespace, api_client)
        logger.info(f"Finished cleanup. Resources: `{resources_to_cleanup}`")

    return cleanup
@@ -591,30 +697,39 @@ def gen_argparse(arg_defs):
def dict_get(
dict: Dict, path: str | List[str], *, default: Any = None, sep: Optional[str] = os.path.sep
obj: Dict | list,
path: str | List[str | int] | Path,
*,
default: Any = None,
sep: Optional[str] = "/",
) -> Any:
if isinstance(path, str):
path = [node for node in path.split(sep) if node]
if isinstance(path, Path):
path = [node for node in path.parts]
if len(path) < 1:
raise KeyError(f"Invalid path. Path: `{path}`")
if len(path) == 1:
return dict.get(path[0], default)
if isinstance(obj, list):
return obj[int(path[0])]
return obj.get(path[0], default)
try:
return dict_get(dict[path[0]], path[1:], default=default, sep=sep)
key = int(path[0]) if isinstance(obj, list) else path[0]
return dict_get(obj[key], path[1:], default=default, sep=sep)
except (TypeError, KeyError):
return default
def dict_set(
dict: Dict,
path: str | List[str],
obj: Dict,
path: str | List[str] | Path,
value: Any,
*,
replace_leaf: bool = False,
replace_nondict_stems: bool = False,
sep: Optional[str] = os.path.sep,
sep: Optional[str] = "/",
) -> Optional[Any]:
"""Set value in `dict` at `path`, creating sub-dicts at path nodes if they do not already exist.
@@ -646,22 +761,27 @@ def dict_set(
for i, node in enumerate(path[:-1]):
node = path[i]
try:
if node not in dict.keys() or replace_nondict_stems:
dict[node] = {}
dict = dict[node]
if isinstance(obj, list):
node = int(node)
if node == len(obj):
obj[node].append({})
obj = obj[node]
else:
if node not in obj.keys() or replace_nondict_stems:
obj[node] = {}
obj = obj[node]
except (AttributeError, TypeError):
raise KeyError(
f"Non-dict value already exists at path. Path: `{path[0:i]}`\tKey: `{node}`\tValue: `{dict}`"
f"Non-dict value already exists at path. Path: `{path[0:i]}`\tKey: `{node}`\tValue: `{obj}`"
)
previous = None
if path[-1] in dict:
key = int(path[-1]) if isinstance(obj, list) else path[-1]
if key in obj:
if not replace_leaf:
raise KeyError(
f"Value already exists at path. Path: `{path}`\tValue: `{dict[path[-1]]}`"
)
previous = dict[path[-1]]
dict[path[-1]] = value
raise KeyError(f"Value already exists at path. Path: `{path}`\tValue: `{obj[key]}`")
previous = obj[key]
obj[key] = value
return previous
@@ -762,7 +882,6 @@ def helm_build_dir(workdir: str, values_paths: List[str], name: str) -> yaml.YAM
itertools.chain(*values)
)
logger.info(f"Running helm template command. cwd: `{workdir}`\tcommand: `{command}`")
# import pdb; pdb.set_trace() # todo asdf
logger.info(f"Usable command: `{' '.join(command)}`")
result = subprocess.run(
command,
@@ -778,9 +897,6 @@ def helm_build_dir(workdir: str, values_paths: List[str], name: str) -> yaml.YAM
return yaml.safe_load(result.stdout)
import ruamel.yaml
def get_YAML():
"""Return a ruamel.yaml.YAML() that dumps multipline strings as multiple lines instead of escaping newlines."""