diff --git a/deployments/deployment.py b/deployments/deployment.py
index a316b12..74e1943 100644
--- a/deployments/deployment.py
+++ b/deployments/deployment.py
@@ -9,7 +9,7 @@
 from kubernetes import config
 from kubernetes.client import ApiClient
 from ruamel import yaml
-from kube_utils import init_logger
+from kube_utils import init_logger, set_config_file
 from registry import registry as experiment_registry
 
 logger = logging.getLogger(__name__)
@@ -25,6 +25,7 @@ def run_experiment(
     if not kube_config:
         kube_config = "~/.kube/config"
     config.load_kube_config(config_file=kube_config)
+    set_config_file(kube_config)
     api_client = ApiClient()
 
     try:
diff --git a/deployments/deployment/base_experiment.py b/deployments/deployment/base_experiment.py
index e73057d..d8d13f4 100755
--- a/deployments/deployment/base_experiment.py
+++ b/deployments/deployment/base_experiment.py
@@ -4,6 +4,7 @@ import os
 import shutil
 from abc import ABC, abstractmethod
 from argparse import ArgumentParser, Namespace
+from collections import defaultdict
 from contextlib import ExitStack
 from datetime import datetime, timezone
 from pathlib import Path
@@ -14,7 +15,15 @@ from pydantic import BaseModel, Field
 from ruamel import yaml
 from ruamel.yaml.comments import CommentedMap
 
-from kube_utils import poll_namespace_has_objects, wait_for_no_objs_in_namespace
+from deployment.builders import build_deployment
+from kube_utils import (
+    dict_get,
+    get_cleanup,
+    kubectl_apply,
+    poll_namespace_has_objects,
+    wait_for_no_objs_in_namespace,
+    wait_for_rollout,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -30,6 +39,11 @@ class BaseExperiment(ABC, BaseModel):
 
     events_log_path: Path = Field(default=Path("events.log"))
 
+    deployed: dict[str, list] = defaultdict(list)
+    """Dict of [namespace: yamls] for every YAML deployed via `self.deploy`.
+
+    Used to determine whether `_wait_until_clear` still needs to run for a namespace."""
+
     @staticmethod
     def add_args(subparser: ArgumentParser):
         subparser.add_argument(
@@ -45,6 +59,87 @@ class BaseExperiment(ABC, BaseModel):
             required=False,
             help="If present, does not wait until the namespace is empty before running the test.",
         )
+        subparser.add_argument(
+            "--dry-run",
+            action="store_true",
+            required=False,
+            default=False,
+            help="If set, does not actually deploy Kubernetes configs; runs `kubectl apply --dry-run` instead.",
+        )
+
+    def deploy(
+        self,
+        api_client: ApiClient,
+        stack,
+        args: Namespace,
+        values_yaml,
+        workdir,
+        service: str,
+        *,
+        wait_for_ready=True,
+        extra_values_paths=None,
+        timeout=3600,
+    ):
+
+        yaml_obj = build_deployment(
+            deployment_dir=Path(os.path.dirname(__file__)) / service,
+            workdir=os.path.join(workdir, service),
+            cli_values=values_yaml,
+            name=service,
+            extra_values_names=[],
+            extra_values_paths=extra_values_paths,
+        )
+
+        required_fields = ["metadata/namespace", "metadata/name", "kind"]
+        for field in required_fields:
+            if dict_get(yaml_obj, field) is None:
+                raise ValueError(
+                    "Deployment yaml must have an explicit value for field. "
+                    f"Field: `{field}`"
+                )
Field: `{field}`" + ) + + try: + dry_run = args.dry_run + except AttributeError: + dry_run = False + + namespace = yaml_obj["metadata"]["namespace"] + + if len(self.deployed[namespace]) == 0: + self._wait_until_clear( + api_client=api_client, + namespace=namespace, + skip_check=args.skip_check, + ) + + if not dry_run: + cleanup = get_cleanup( + api_client=api_client, + namespace=namespace, + deployments=[yaml_obj], + ) + stack.callback(cleanup) + + self.log_event( + {"event": "deployment", "phase": "start", "service": service, "namespace": namespace} + ) + self.deployed[namespace].append(yaml_obj) + kubectl_apply(yaml_obj, namespace=namespace, dry_run=dry_run) + + if not dry_run: + if wait_for_ready: + wait_for_rollout( + yaml_obj["kind"], + yaml_obj["metadata"]["name"], + namespace, + timeout, + api_client, + ("Ready", "True"), + ) + self.log_event( + {"event": "deployment", "phase": "finished", "service": service, "namespace": namespace} + ) + + return yaml_obj def _set_events_log(self, workdir: Optional[str]) -> None: if self.events_log_path.is_absolute(): @@ -65,7 +160,11 @@ class BaseExperiment(ABC, BaseModel): if values_yaml is None: values_yaml = CommentedMap() + self.deployed.clear() + workdir = args.output_folder + if args.workdir: + workdir = os.path.join(workdir, args.workdir) with ExitStack() as stack: stack.callback(lambda: self.log_event("cleanup_finished")) diff --git a/deployments/deployment/builders.py b/deployments/deployment/builders.py index 4a2e62a..1216cc5 100644 --- a/deployments/deployment/builders.py +++ b/deployments/deployment/builders.py @@ -39,7 +39,7 @@ def build_deployment( extra_values_paths = [] if cli_values is None: cli_values = CommentedMap() - name = name.replace("_", "-") + name = name.replace("_", "-").replace("/", "-") logger.debug(f"Removing work dir: {workdir}") try: diff --git a/deployments/deployment/waku/experiments/regression/regression.py b/deployments/deployment/waku/experiments/regression/regression.py index 2a38078..8863f06 100755 --- a/deployments/deployment/waku/experiments/regression/regression.py +++ b/deployments/deployment/waku/experiments/regression/regression.py @@ -16,15 +16,12 @@ from ruamel import yaml from deployment.base_experiment import BaseExperiment from deployment.builders import build_deployment from kube_utils import ( - assert_equals, dict_apply, dict_get, dict_partial_compare, dict_set, dict_visit, - get_cleanup, get_flag_value, - kubectl_apply, wait_for_rollout, ) from registry import experiment @@ -112,6 +109,11 @@ class WakuRegressionNodes(BaseExperiment, BaseModel): release_name: str = Field(default="waku-regression-nodes") deployment_dir: str = Field(default=Path(os.path.dirname(__file__)).parent.parent) + extra_paths: List[Path] = [ + Path(os.path.dirname(__file__)) / f"bootstrap.values.yaml", + Path(os.path.dirname(__file__)) / f"nodes.values.yaml", + Path(os.path.dirname(__file__)) / f"publisher.values.yaml", + ] @classmethod def add_parser(cls, subparsers) -> None: @@ -183,6 +185,14 @@ class WakuRegressionNodes(BaseExperiment, BaseModel): def _metadata_event(self, events_log_path: str): self.log_event(self.__class__._get_metadata_event(self.events_log_path)) + def log_event(self, event): + logger.info(event) + return super().log_event(event) + + def log_event(self, event): + logger.info(event) + return super().log_event(event) + def _run( self, api_client: ApiClient, @@ -191,103 +201,63 @@ class WakuRegressionNodes(BaseExperiment, BaseModel): values_yaml: Optional[yaml.YAMLObject], stack: ExitStack, ): + def 
+            try:
+                values = values._data
+            except AttributeError:
+                pass
+            return self.deploy(
+                api_client,
+                stack,
+                args,
+                values,
+                workdir,
+                service,
+                wait_for_ready=wait_for_ready,
+                extra_values_paths=self.extra_paths,
+            )
+
         self.log_event("run_start")
 
-        # TODO [values param checking]: Add friendly error messages for missing/extraneous variables in values.yaml.
-        logger.info("Building kubernetes configs.")
-        nodes = self._build(workdir, values_yaml, "nodes")
-        bootstrap = self._build(workdir, values_yaml, "bootstrap")
-
-        self.log_event({"event": "deployment", "service": "waku/publisher", "phase": "start"})
-        publisher = self._build(workdir, values_yaml, "publisher")
-
-        # Sanity check
-        namespace = bootstrap["metadata"]["namespace"]
-        logger.info(f"namespace={namespace}")
-        assert_equals(nodes["metadata"]["namespace"], namespace)
-        assert_equals(publisher["metadata"]["namespace"], namespace)
-
-        # TODO [metadata output]: log start time to output file here.
-        logger.info("Applying kubernetes configs.")
-
-        cleanup = get_cleanup(
-            api_client=api_client,
-            namespace=namespace,
-            deployments=[bootstrap, nodes, publisher],
-        )
-        stack.callback(cleanup)
-
-        self._wait_until_clear(
-            api_client=api_client,
-            namespace=namespace,
-            skip_check=args.skip_check,
-        )
-
-        self.log_event("deployments_start")
-
-        # Apply bootstrap
-        logger.info("Applying bootstrap")
-        kubectl_apply(bootstrap, namespace=namespace)
-        logger.info("bootstrap applied. Waiting for rollout.")
-        wait_for_rollout(bootstrap["kind"], bootstrap["metadata"]["name"], namespace, 2000)
+        deploy("waku/bootstrap", values_yaml, wait_for_ready=True)
+        nodes = deploy("waku/nodes", values_yaml, wait_for_ready=True)
         num_nodes = nodes["spec"]["replicas"]
+
+        publisher = deploy("waku/publisher", values_yaml, wait_for_ready=True)
         messages = get_flag_value("messages", publisher["spec"]["containers"][0]["command"])
         delay_seconds = get_flag_value(
             "delay-seconds", publisher["spec"]["containers"][0]["command"]
         )
 
-        # Apply nodes configuration
-        logger.info("Applying nodes")
-        kubectl_apply(nodes, namespace=namespace)
-        logger.info("nodes applied. Waiting for rollout.")
-        timeout = num_nodes * 3000
-        wait_for_rollout(nodes["kind"], nodes["metadata"]["name"], namespace, timeout)
-
-        self.log_event("nodes_deploy_finished")
-        logger.info("nodes rolled out. wait to stablize")
-        time.sleep(60)
-        self.log_event("publisher_deploy_start")
-        logger.info("delay over")
-
-        # TODO [metadata output]: log publish message start time
-        # Apply publisher configuration
-        logger.info("applying publisher")
-        kubectl_apply(publisher, namespace=namespace)
-        logger.info("publisher applied. Waiting for rollout.")
-        wait_for_rollout(
-            publisher["kind"],
-            publisher["metadata"]["name"],
-            namespace,
-            20,
-            api_client,
-            ("Ready", "True"),
-            # TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
-        )
+        if not args.dry_run:
+            wait_for_rollout(
+                publisher["kind"],
+                publisher["metadata"]["name"],
+                publisher["metadata"]["namespace"],
+                20,
+                api_client,
+                ("Ready", "True"),
+                # TODO [extend condition checks] lambda cond : cond.type == "Ready" and cond.status == "True"
+            )
 
         self.log_event("publisher_deploy_finished")
-        logger.info("publisher rollout done.")
-        logger.info("---publisher is up. begin messages")
-
-        timeout = num_nodes * messages * delay_seconds * 120
+        timeout = (num_nodes + 5) * messages * delay_seconds * 120
         logger.info(f"Waiting for Ready=False. Timeout: {timeout}")
Timeout: {timeout}") - wait_for_rollout( - publisher["kind"], - publisher["metadata"]["name"], - namespace, - timeout, - api_client, - ("Ready", "False"), - ) - # TODO: consider state.reason == .completed - logger.info("---publisher messages finished. wait 20 seconds") + if not args.dry_run: + wait_for_rollout( + publisher["kind"], + publisher["metadata"]["name"], + publisher["metadata"]["namespace"], + timeout, + api_client, + ("Ready", "False"), + # TODO: consider state.reason == .completed + ) self.log_event("publisher_messages_finished") time.sleep(20) self.log_event("publisher_wait_finished") - logger.info("---20seconds is over.") - # TODO [metadata output]: log publish message end time - logger.info("Finished waku regression test.") self.log_event("internal_run_finished") self._metadata_event(self.events_log_path) diff --git a/deployments/kube_utils.py b/deployments/kube_utils.py index 1bd0e89..47bb94f 100644 --- a/deployments/kube_utils.py +++ b/deployments/kube_utils.py @@ -17,6 +17,7 @@ from pathlib import Path from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Union import dateparser +import ruamel.yaml from kubernetes import client, utils from kubernetes.client import ApiClient from kubernetes.client.rest import ApiException @@ -96,13 +97,101 @@ def init_logger(logger: logging.Logger, verbosity: Union[str, int], log_path: Op logger = logging.getLogger(__name__) +_kube_config = None -def kubectl_apply(kube_yaml: yaml.YAMLObject, namespace="zerotesting"): + +def set_config_file(config: str): + global _kube_config + _kube_config = config + + +def kubectl_apply( + kube_yaml: yaml.YAMLObject, namespace="zerotesting", *, config_file=None, dry_run=False +): + if dry_run: + _kubectl_apply_dry_run(kube_yaml, namespace, config_file=config_file) + else: + _kubectl_apply(kube_yaml, namespace) + + +def _kubectl_apply(kube_yaml: yaml.YAMLObject, namespace="zerotesting"): logger.debug(f"kubectl_apply the following config:\n{str(yaml.dump(kube_yaml))}") + kind = kube_yaml.get("kind") + name = kube_yaml.get("metadata", {}).get("name") + if not kind or not name: + raise ValueError( + f"YAML missing nessesary attributes 'kind' and 'metadata.name'. 
yaml: `{kube_yaml}`" + ) + + api_client = client.ApiClient() + + replace_map = { + "Deployment": lambda: client.AppsV1Api(api_client).replace_namespaced_deployment( + name, namespace, kube_yaml + ), + "StatefulSet": lambda: client.AppsV1Api(api_client).replace_namespaced_stateful_set( + name, namespace, kube_yaml + ), + "DaemonSet": lambda: client.AppsV1Api(api_client).replace_namespaced_daemon_set( + name, namespace, kube_yaml + ), + "ReplicaSet": lambda: client.AppsV1Api(api_client).replace_namespaced_replica_set( + name, namespace, kube_yaml + ), + "Job": lambda: client.BatchV1Api(api_client).replace_namespaced_job( + name, namespace, kube_yaml + ), + "CronJob": lambda: client.BatchV1Api(api_client).replace_namespaced_cron_job( + name, namespace, kube_yaml + ), + "ReplicationController": lambda: client.CoreV1Api( + api_client + ).replace_namespaced_replication_controller(name, namespace, kube_yaml), + "Pod": lambda: client.CoreV1Api(api_client).replace_namespaced_pod( + name, namespace, kube_yaml + ), + "Service": lambda: client.CoreV1Api(api_client).replace_namespaced_service( + name, namespace, kube_yaml + ), + } + + try: + with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml", delete=False) as temp: + yaml.dump(kube_yaml, temp) + temp.flush() + utils.create_from_yaml(client.ApiClient(), yaml_file=temp.name, namespace=namespace) + except utils.FailToCreateError: + replace_fn = replace_map.get(kind) + if not replace_fn: + raise ValueError(f"Replace operation not supported for resource. kind: `{kind}`") + replace_fn() + + +def _kubectl_apply_dry_run(kube_yaml: yaml.YAMLObject, namespace: str, *, config_file: str): + config = config_file if config_file else _kube_config + with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml", delete=False) as temp: yaml.dump(kube_yaml, temp) temp.flush() - utils.create_from_yaml(client.ApiClient(), yaml_file=temp.name, namespace=namespace) + config_segment = ["--kubeconfig", config] if config else [] + cmd = ( + ["kubectl"] + + config_segment + + ["apply", "-f", temp.name, "--namespace", namespace, "--dry-run=server"] + ) + logger.info(f"Running command: `{' '.join(cmd)}`") + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + logger.error( + f"Dry run for applying kubernetes config failed." 
+            f"deploy.yaml: `{kube_yaml}`\t"
+            f"returncode: `{result.returncode}`\t"
+            f"stdout: `{result.stdout}`\t"
+            f"stderr: `{result.stderr}`"
+        )
+        raise ValueError("Dry run for applying kubernetes config failed.")
+    logger.debug(f"Dry run deploying `{kube_yaml}`\tstdout: `{result.stdout}`")
+    return result
 
 
 def get_cleanup_resources(yamls: List[yaml.YAMLObject], types: Optional[List[str]] = None):
@@ -134,6 +223,17 @@ def get_cleanup_resources(yamls: List[yaml.YAMLObject], types: Optional[List[str
     return {resource[0]: resource[1] for resource in resources.items() if resource[1]}
 
 
+def delete_pod(name, namespace, *, grace_period=0):
+    v1 = client.CoreV1Api()
+    v1.delete_namespaced_pod(
+        name=name,
+        namespace=namespace,
+        body=client.V1DeleteOptions(grace_period_seconds=grace_period),
+        grace_period_seconds=grace_period,
+        propagation_policy="Foreground",
+    )
+
+
 def cleanup_resources(
     resources: dict,
     namespace: str,
@@ -293,15 +393,21 @@ def get_cleanup(
     api_client: ApiClient, namespace: str, deployments: List[yaml.YAMLObject]
 ) -> Callable[[], None]:
     def cleanup():
-        logger.info("Cleaning up resources.")
+        logger.debug("Cleaning up resources.")
         resources_to_cleanup = get_cleanup_resources(deployments)
-        logger.info(f"Resources to clean up: `{resources_to_cleanup}`")
+        logger.debug(f"Resources to clean up: `{resources_to_cleanup}`")
 
-        logger.info("Start cleanup.")
-        cleanup_resources(resources_to_cleanup, namespace, api_client)
-        logger.info("Waiting for cleanup.")
+        logger.debug("Start cleanup.")
+        try:
+            cleanup_resources(resources_to_cleanup, namespace, api_client)
+        except client.exceptions.ApiException as e:
+            logger.error(
+                f"Exception cleaning up resources. Resources: `{resources_to_cleanup}`\texception: `{e}`",
+                exc_info=True,
+            )
+        logger.debug(f"Waiting for cleanup. Resources: `{resources_to_cleanup}`")
         wait_for_cleanup(resources_to_cleanup, namespace, api_client)
-        logger.info("Finished cleanup.")
+        logger.info(f"Finished cleanup. Resources: `{resources_to_cleanup}`")
 
     return cleanup
 
@@ -591,30 +697,39 @@ def gen_argparse(arg_defs):
 
 
 def dict_get(
-    dict: Dict, path: str | List[str], *, default: Any = None, sep: Optional[str] = os.path.sep
+    obj: Dict | list,
+    path: str | List[str | int] | Path,
+    *,
+    default: Any = None,
+    sep: Optional[str] = "/",
 ) -> Any:
     if isinstance(path, str):
         path = [node for node in path.split(sep) if node]
+    if isinstance(path, Path):
+        path = list(path.parts)
 
     if len(path) < 1:
         raise KeyError(f"Invalid path. Path: `{path}`")
     if len(path) == 1:
-        return dict.get(path[0], default)
+        if isinstance(obj, list):
+            return obj[int(path[0])]
+        return obj.get(path[0], default)
 
     try:
-        return dict_get(dict[path[0]], path[1:], default=default, sep=sep)
-    except (TypeError, KeyError):
+        key = int(path[0]) if isinstance(obj, list) else path[0]
+        return dict_get(obj[key], path[1:], default=default, sep=sep)
+    except (TypeError, KeyError, IndexError, ValueError):
         return default
 
 
 def dict_set(
-    dict: Dict,
-    path: str | List[str],
+    obj: Dict,
+    path: str | List[str] | Path,
     value: Any,
     *,
     replace_leaf: bool = False,
     replace_nondict_stems: bool = False,
-    sep: Optional[str] = os.path.sep,
+    sep: Optional[str] = "/",
 ) -> Optional[Any]:
     """Set value in `obj` at `path`, creating sub-dicts at path nodes if they do not already exist.
@@ -646,22 +761,27 @@ def dict_set(
     for i, node in enumerate(path[:-1]):
         node = path[i]
         try:
-            if node not in dict.keys() or replace_nondict_stems:
-                dict[node] = {}
-            dict = dict[node]
+            if isinstance(obj, list):
+                node = int(node)
+                if node == len(obj):
+                    # Path points one past the end of the list: extend it with a new dict.
+                    obj.append({})
+                obj = obj[node]
+            else:
+                if node not in obj.keys() or replace_nondict_stems:
+                    obj[node] = {}
+                obj = obj[node]
         except (AttributeError, TypeError):
             raise KeyError(
-                f"Non-dict value already exists at path. Path: `{path[0:i]}`\tKey: `{node}`\tValue: `{dict}`"
+                f"Non-dict value already exists at path. Path: `{path[0:i]}`\tKey: `{node}`\tValue: `{obj}`"
             )
 
     previous = None
-    if path[-1] in dict:
+    key = int(path[-1]) if isinstance(obj, list) else path[-1]
+    # For lists, check whether the index exists; `in` would test value membership.
+    exists = key < len(obj) if isinstance(obj, list) else key in obj
+    if exists:
         if not replace_leaf:
-            raise KeyError(
-                f"Value already exists at path. Path: `{path}`\tValue: `{dict[path[-1]]}`"
-            )
-        previous = dict[path[-1]]
-    dict[path[-1]] = value
+            raise KeyError(f"Value already exists at path. Path: `{path}`\tValue: `{obj[key]}`")
+        previous = obj[key]
+    obj[key] = value
 
     return previous
 
@@ -762,7 +882,6 @@ def helm_build_dir(workdir: str, values_paths: List[str], name: str) -> yaml.YAM
         itertools.chain(*values)
     )
     logger.info(f"Running helm template command. cwd: `{workdir}`\tcommand: `{command}`")
-    # import pdb; pdb.set_trace()  # todo asdf
    logger.info(f"Usable command: `{' '.join(command)}`")
     result = subprocess.run(
         command,
@@ -778,9 +897,6 @@ def helm_build_dir(workdir: str, values_paths: List[str], name: str) -> yaml.YAM
     return yaml.safe_load(result.stdout)
 
 
-import ruamel.yaml
-
-
 def get_YAML():
     """Return a ruamel.yaml.YAML() that dumps multiline strings as multiple lines instead of escaping newlines."""
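
A minimal sketch of the path semantics the reworked `dict_get`/`dict_set` helpers now support, assuming the patched `deployments/kube_utils.py` is importable as `kube_utils`; the `manifest` dict below is illustrative only:

    from kube_utils import dict_get, dict_set

    manifest = {
        "kind": "Deployment",
        "metadata": {"name": "publisher"},
        "spec": {"containers": [{"image": "waku:latest"}]},
    }

    # "/" is now the default separator, and integer path nodes index into lists.
    assert dict_get(manifest, "metadata/name") == "publisher"
    assert dict_get(manifest, "spec/containers/0/image") == "waku:latest"
    # Missing keys fall back to `default` instead of raising.
    assert dict_get(manifest, "metadata/labels/app") is None

    # dict_set creates intermediate dicts and refuses to overwrite an existing
    # leaf unless replace_leaf=True; it returns the previous value, if any.
    dict_set(manifest, "metadata/namespace", "zerotesting")
    previous = dict_set(manifest, "metadata/namespace", "staging", replace_leaf=True)
    assert previous == "zerotesting"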
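
The new `_kubectl_apply` follows a create-then-replace idiom against the Kubernetes API. A standalone sketch of the same pattern for a single Deployment, with hypothetical `apply_deployment`/`manifest` names and assuming `kubernetes.config.load_kube_config()` has already been called (as `run_experiment` does):

    from kubernetes import client, utils

    def apply_deployment(manifest: dict, namespace: str = "zerotesting") -> None:
        api_client = client.ApiClient()
        try:
            # First attempt a create, mirroring utils.create_from_yaml above.
            utils.create_from_dict(api_client, manifest, namespace=namespace)
        except utils.FailToCreateError:
            # The object already exists (or the create was rejected): fall back
            # to a full replace, as _kubectl_apply's replace_map does. Note the
            # API server may require metadata.resourceVersion to be set on replace.
            client.AppsV1Api(api_client).replace_namespaced_deployment(
                manifest["metadata"]["name"], namespace, manifest
            )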