Files
autogen/test/coding/test_kubernetes_commandline_code_executor.py
kiyoung 02977ee250 K8s code executor (#3419)
* remove coding directory from gitignore

* add k8s PodCommandLineCodeExecutor

* add error handlings and custom pod spec parameter

* change parameter name to kube_config_file

* add param container_name

* add test case for PodCommandLineCodeExecutor

* add test guide

* draft for docs notebook

* test code fix indent

* add document

* add license info

* revise documentation

* modify document: install autogen-agentchat

* apply pre-commit

* revert change to gitignore

* error handling: move import block into try block

---------

Co-authored-by: Ryan Sweet <rysweet@microsoft.com>
Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>
2024-10-16 12:22:50 -07:00

204 lines
8.3 KiB
Python

import importlib
import os
import sys
from pathlib import Path
import pytest
from autogen.code_utils import TIMEOUT_MSG
from autogen.coding.base import CodeBlock, CodeExecutor
try:
from autogen.coding.kubernetes import PodCommandLineCodeExecutor
client = importlib.import_module("kubernetes.client")
config = importlib.import_module("kubernetes.config")
kubeconfig = Path(".kube/config")
if os.environ.get("KUBECONFIG", None):
kubeconfig = Path(os.environ["KUBECONFIG"])
elif sys.platform == "win32":
kubeconfig = os.environ["userprofile"] / kubeconfig
else:
kubeconfig = os.environ["HOME"] / kubeconfig
if kubeconfig.is_file():
config.load_config(config_file=str(kubeconfig))
api_client = client.CoreV1Api()
api_client.list_namespace()
skip_kubernetes_tests = False
else:
skip_kubernetes_tests = True
pod_spec = client.V1Pod(
metadata=client.V1ObjectMeta(
name="abcd", namespace="default", annotations={"sidecar.istio.io/inject": "false"}
),
spec=client.V1PodSpec(
restart_policy="Never",
containers=[
client.V1Container(
args=["-c", "while true;do sleep 5; done"],
command=["/bin/sh"],
name="abcd",
image="python:3.11-slim",
env=[
client.V1EnvVar(name="TEST", value="TEST"),
client.V1EnvVar(
name="POD_NAME",
value_from=client.V1EnvVarSource(
field_ref=client.V1ObjectFieldSelector(field_path="metadata.name")
),
),
],
)
],
),
)
except Exception:
skip_kubernetes_tests = True
@pytest.mark.skipif(skip_kubernetes_tests, reason="kubernetes not accessible")
def test_create_default_pod_executor():
with PodCommandLineCodeExecutor(namespace="default", kube_config_file=str(kubeconfig)) as executor:
assert executor.timeout == 60
assert executor.work_dir == Path("/workspace")
assert executor._container_name == "autogen-code-exec"
assert executor._pod.metadata.name.startswith("autogen-code-exec-")
_test_execute_code(executor)
@pytest.mark.skipif(skip_kubernetes_tests, reason="kubernetes not accessible")
def test_create_node_pod_executor():
with PodCommandLineCodeExecutor(
image="node:22-alpine",
namespace="default",
work_dir="./app",
timeout=30,
kube_config_file=str(kubeconfig),
execution_policies={"javascript": True},
) as executor:
assert executor.timeout == 30
assert executor.work_dir == Path("./app")
assert executor._container_name == "autogen-code-exec"
assert executor._pod.metadata.name.startswith("autogen-code-exec-")
assert executor.execution_policies["javascript"]
# Test single code block.
code_blocks = [CodeBlock(code="console.log('hello world!')", language="javascript")]
code_result = executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test multiple code blocks.
code_blocks = [
CodeBlock(code="console.log('hello world!')", language="javascript"),
CodeBlock(code="let a = 100 + 100; console.log(a)", language="javascript"),
]
code_result = executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test running code.
file_lines = ["console.log('hello world!')", "let a = 100 + 100", "console.log(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="javascript")]
code_result = executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
@pytest.mark.skipif(skip_kubernetes_tests, reason="kubernetes not accessible")
def test_create_pod_spec_pod_executor():
with PodCommandLineCodeExecutor(
pod_spec=pod_spec, container_name="abcd", kube_config_file=str(kubeconfig)
) as executor:
assert executor.timeout == 60
assert executor._container_name == "abcd"
assert executor._pod.metadata.name == pod_spec.metadata.name
assert executor._pod.metadata.namespace == pod_spec.metadata.namespace
_test_execute_code(executor)
# Test bash script.
if sys.platform not in ["win32"]:
code_blocks = [CodeBlock(code="echo $TEST $POD_NAME", language="bash")]
code_result = executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0 and "TEST abcd" in code_result.output and code_result.code_file is not None
)
@pytest.mark.skipif(skip_kubernetes_tests, reason="kubernetes not accessible")
def test_pod_executor_timeout():
with PodCommandLineCodeExecutor(namespace="default", timeout=5, kube_config_file=str(kubeconfig)) as executor:
assert executor.timeout == 5
assert executor.work_dir == Path("/workspace")
assert executor._container_name == "autogen-code-exec"
assert executor._pod.metadata.name.startswith("autogen-code-exec-")
# Test running code.
file_lines = ["import time", "time.sleep(10)", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 124 and TIMEOUT_MSG in code_result.output and code_result.code_file is not None
def _test_execute_code(executor: CodeExecutor) -> None:
# Test single code block.
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test multiple code blocks.
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test bash script.
if sys.platform not in ["win32"]:
code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")]
code_result = executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test running code.
file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test running code has filename.
file_lines = ["# filename: test.py", "import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = executor.execute_code_blocks(code_blocks)
print(code_result.code_file)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file.find("test.py") > 0
)
# Test error code.
code_blocks = [CodeBlock(code="print(sys.platform)", language="python")]
code_result = executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 1 and "Traceback" in code_result.output and code_result.code_file is not None