From 76ef0789c0f12198fee20ebb2a559e265f59a688 Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Thu, 29 Feb 2024 09:54:11 -0500 Subject: [PATCH] Implement docker based jupyter executor (#1794) * Implement docker based jupyter client * formatting * skip docker tests when asked * feedback * add log * update build * formatting * structural changes * update setup.py * update tests --------- Co-authored-by: Chi Wang --- .github/workflows/build.yml | 3 +- autogen/coding/__init__.py | 9 +- autogen/coding/factory.py | 6 +- autogen/coding/jupyter/__init__.py | 13 +- .../coding/jupyter/docker_jupyter_server.py | 167 ++++++++++++++++++ .../embedded_ipython_code_executor.py | 6 +- autogen/coding/jupyter/jupyter_client.py | 21 ++- .../{ => jupyter}/jupyter_code_executor.py | 35 ++-- setup.py | 11 +- .../test_embedded_ipython_code_executor.py | 38 +++- 10 files changed, 268 insertions(+), 41 deletions(-) create mode 100644 autogen/coding/jupyter/docker_jupyter_server.py rename autogen/coding/{ => jupyter}/embedded_ipython_code_executor.py (98%) rename autogen/coding/{ => jupyter}/jupyter_code_executor.py (91%) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2abbc6bd6..c02e7bc2e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -46,9 +46,8 @@ jobs: # code executors auto skip without deps, so only run for python 3.11 if: matrix.python-version == '3.11' run: | - pip install jupyter-client ipykernel + pip install -e ".[jupyter-executor]" python -m ipykernel install --user --name python3 - pip install -e ".[local-jupyter-exec]" - name: Set AUTOGEN_USE_DOCKER based on OS shell: bash run: | diff --git a/autogen/coding/__init__.py b/autogen/coding/__init__.py index 7c223401d..cf75d1143 100644 --- a/autogen/coding/__init__.py +++ b/autogen/coding/__init__.py @@ -2,4 +2,11 @@ from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult from .factory import CodeExecutorFactory from .markdown_code_extractor import MarkdownCodeExtractor -__all__ = ("CodeBlock", "CodeResult", "CodeExtractor", "CodeExecutor", "CodeExecutorFactory", "MarkdownCodeExtractor") +__all__ = ( + "CodeBlock", + "CodeResult", + "CodeExtractor", + "CodeExecutor", + "CodeExecutorFactory", + "MarkdownCodeExtractor", +) diff --git a/autogen/coding/factory.py b/autogen/coding/factory.py index ceb01ca3d..e4ff09c56 100644 --- a/autogen/coding/factory.py +++ b/autogen/coding/factory.py @@ -30,16 +30,12 @@ class CodeExecutorFactory: # If the executor is already an instance of CodeExecutor, return it. return executor if executor == "ipython-embedded": - from .embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor + from .jupyter.embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor return EmbeddedIPythonCodeExecutor(**code_execution_config.get("ipython-embedded", {})) elif executor == "commandline-local": from .local_commandline_code_executor import LocalCommandlineCodeExecutor return LocalCommandlineCodeExecutor(**code_execution_config.get("commandline-local", {})) - elif executor == "jupyter-local": - from .jupyter_code_executor import LocalJupyterCodeExecutor - - return LocalJupyterCodeExecutor(**code_execution_config.get("jupyter-local", {})) else: raise ValueError(f"Unknown code executor {executor}") diff --git a/autogen/coding/jupyter/__init__.py b/autogen/coding/jupyter/__init__.py index 96c8cf4a6..5c1a9607f 100644 --- a/autogen/coding/jupyter/__init__.py +++ b/autogen/coding/jupyter/__init__.py @@ -1,5 +1,16 @@ from .base import JupyterConnectable, JupyterConnectionInfo from .jupyter_client import JupyterClient from .local_jupyter_server import LocalJupyterServer +from .docker_jupyter_server import DockerJupyterServer +from .embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor +from .jupyter_code_executor import JupyterCodeExecutor -__all__ = ["JupyterConnectable", "JupyterConnectionInfo", "JupyterClient", "LocalJupyterServer"] +__all__ = [ + "JupyterConnectable", + "JupyterConnectionInfo", + "JupyterClient", + "LocalJupyterServer", + "DockerJupyterServer", + "EmbeddedIPythonCodeExecutor", + "JupyterCodeExecutor", +] diff --git a/autogen/coding/jupyter/docker_jupyter_server.py b/autogen/coding/jupyter/docker_jupyter_server.py new file mode 100644 index 000000000..5288d295c --- /dev/null +++ b/autogen/coding/jupyter/docker_jupyter_server.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +from pathlib import Path +import sys +from time import sleep +from types import TracebackType +import uuid +from typing import Dict, Optional, Union +import docker +import secrets +import io +import atexit +import logging + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + + +from .jupyter_client import JupyterClient +from .base import JupyterConnectable, JupyterConnectionInfo + + +def _wait_for_ready(container: docker.Container, timeout: int = 60, stop_time: int = 0.1) -> None: + elapsed_time = 0 + while container.status != "running" and elapsed_time < timeout: + sleep(stop_time) + elapsed_time += stop_time + container.reload() + continue + if container.status != "running": + raise ValueError("Container failed to start") + + +class DockerJupyterServer(JupyterConnectable): + DEFAULT_DOCKERFILE = """FROM quay.io/jupyter/docker-stacks-foundation + +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +USER ${NB_UID} +RUN mamba install --yes jupyter_kernel_gateway ipykernel && \ + mamba clean --all -f -y && \ + fix-permissions "${CONDA_DIR}" && \ + fix-permissions "/home/${NB_USER}" + +ENV TOKEN="UNSET" +CMD python -m jupyter kernelgateway --KernelGatewayApp.ip=0.0.0.0 \ + --KernelGatewayApp.port=8888 \ + --KernelGatewayApp.auth_token="${TOKEN}" \ + --JupyterApp.answer_yes=true \ + --JupyterWebsocketPersonality.list_kernels=true + +EXPOSE 8888 + +WORKDIR "${HOME}" +""" + + class GenerateToken: + pass + + def __init__( + self, + *, + custom_image_name: Optional[str] = None, + container_name: Optional[str] = None, + auto_remove: bool = True, + stop_container: bool = True, + docker_env: Dict[str, str] = {}, + token: Union[str, GenerateToken] = GenerateToken(), + ): + """Start a Jupyter kernel gateway server in a Docker container. + + Args: + custom_image_name (Optional[str], optional): Custom image to use. If this is None, + then the bundled image will be built and used. The default image is based on + quay.io/jupyter/docker-stacks-foundation and extended to include jupyter_kernel_gateway + container_name (Optional[str], optional): Name of the container to start. + A name will be generated if None. + auto_remove (bool, optional): If true the Docker container will be deleted + when it is stopped. + stop_container (bool, optional): If true the container will be stopped, + either by program exit or using the context manager + docker_env (Dict[str, str], optional): Extra environment variables to pass + to the running Docker container. + token (Union[str, GenerateToken], optional): Token to use for authentication. + If GenerateToken is used, a random token will be generated. Empty string + will be unauthenticated. + """ + if container_name is None: + container_name = f"autogen-jupyterkernelgateway-{uuid.uuid4()}" + + client = docker.from_env() + if custom_image_name is None: + image_name = "autogen-jupyterkernelgateway" + # Make sure the image exists + try: + client.images.get(image_name) + except docker.errors.ImageNotFound: + # Build the image + # Get this script directory + here = Path(__file__).parent + dockerfile = io.BytesIO(self.DEFAULT_DOCKERFILE.encode("utf-8")) + logging.info(f"Image {image_name} not found. Building it now.") + client.images.build(path=here, fileobj=dockerfile, tag=image_name) + logging.info(f"Image {image_name} built successfully.") + else: + image_name = custom_image_name + # Check if the image exists + try: + client.images.get(image_name) + except docker.errors.ImageNotFound: + raise ValueError(f"Custom image {image_name} does not exist") + + if isinstance(token, DockerJupyterServer.GenerateToken): + self._token = secrets.token_hex(32) + else: + self._token = token + + # Run the container + env = {"TOKEN": self._token} + env.update(docker_env) + container = client.containers.run( + image_name, + detach=True, + auto_remove=auto_remove, + environment=env, + publish_all_ports=True, + name=container_name, + ) + _wait_for_ready(container) + container_ports = container.ports + self._port = int(container_ports["8888/tcp"][0]["HostPort"]) + self._container_id = container.id + + def cleanup(): + try: + inner_container = client.containers.get(container.id) + inner_container.stop() + except docker.errors.NotFound: + pass + + atexit.unregister(cleanup) + + if stop_container: + atexit.register(cleanup) + + self._cleanup_func = cleanup + self._stop_container = stop_container + + @property + def connection_info(self) -> JupyterConnectionInfo: + return JupyterConnectionInfo(host="127.0.0.1", use_https=False, port=self._port, token=self._token) + + def stop(self): + self._cleanup_func() + + def get_client(self) -> JupyterClient: + return JupyterClient(self.connection_info) + + def __enter__(self) -> Self: + return self + + def __exit__( + self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: + self.stop() diff --git a/autogen/coding/embedded_ipython_code_executor.py b/autogen/coding/jupyter/embedded_ipython_code_executor.py similarity index 98% rename from autogen/coding/embedded_ipython_code_executor.py rename to autogen/coding/jupyter/embedded_ipython_code_executor.py index a83dab233..7758c0a2b 100644 --- a/autogen/coding/embedded_ipython_code_executor.py +++ b/autogen/coding/jupyter/embedded_ipython_code_executor.py @@ -11,9 +11,9 @@ from jupyter_client import KernelManager # type: ignore[attr-defined] from jupyter_client.kernelspec import KernelSpecManager from pydantic import BaseModel, Field, field_validator -from ..agentchat.agent import LLMAgent -from .base import CodeBlock, CodeExtractor, IPythonCodeResult -from .markdown_code_extractor import MarkdownCodeExtractor +from ...agentchat.agent import LLMAgent +from ..base import CodeBlock, CodeExtractor, IPythonCodeResult +from ..markdown_code_extractor import MarkdownCodeExtractor __all__ = "EmbeddedIPythonCodeExecutor" diff --git a/autogen/coding/jupyter/jupyter_client.py b/autogen/coding/jupyter/jupyter_client.py index edecc415c..459add85b 100644 --- a/autogen/coding/jupyter/jupyter_client.py +++ b/autogen/coding/jupyter/jupyter_client.py @@ -14,6 +14,7 @@ import json import uuid import datetime import requests +from requests.adapters import HTTPAdapter, Retry import websocket from websocket import WebSocket @@ -26,6 +27,9 @@ class JupyterClient: def __init__(self, connection_info: JupyterConnectionInfo): self._connection_info = connection_info + self._session = requests.Session() + retries = Retry(total=5, backoff_factor=0.1) + self._session.mount("http://", HTTPAdapter(max_retries=retries)) def _get_headers(self) -> Dict[str, str]: if self._connection_info.token is None: @@ -40,11 +44,11 @@ class JupyterClient: return f"ws://{self._connection_info.host}:{self._connection_info.port}" def list_kernel_specs(self) -> Dict[str, Dict[str, str]]: - response = requests.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers()) + response = self._session.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers()) return cast(Dict[str, Dict[str, str]], response.json()) def list_kernels(self) -> List[Dict[str, str]]: - response = requests.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers()) + response = self._session.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers()) return cast(List[Dict[str, str]], response.json()) def start_kernel(self, kernel_spec_name: str) -> str: @@ -57,15 +61,21 @@ class JupyterClient: str: ID of the started kernel """ - response = requests.post( + response = self._session.post( f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers(), json={"name": kernel_spec_name}, ) return cast(str, response.json()["id"]) + def delete_kernel(self, kernel_id: str) -> None: + response = self._session.delete( + f"{self._get_api_base_url()}/api/kernels/{kernel_id}", headers=self._get_headers() + ) + response.raise_for_status() + def restart_kernel(self, kernel_id: str) -> None: - response = requests.post( + response = self._session.post( f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers() ) response.raise_for_status() @@ -100,6 +110,9 @@ class JupyterKernelClient: def __exit__( self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> None: + self.stop() + + def stop(self) -> None: self._websocket.close() def _send_message(self, *, content: Dict[str, Any], channel: str, message_type: str) -> str: diff --git a/autogen/coding/jupyter_code_executor.py b/autogen/coding/jupyter/jupyter_code_executor.py similarity index 91% rename from autogen/coding/jupyter_code_executor.py rename to autogen/coding/jupyter/jupyter_code_executor.py index 551aea18a..5e190d5f1 100644 --- a/autogen/coding/jupyter_code_executor.py +++ b/autogen/coding/jupyter/jupyter_code_executor.py @@ -3,18 +3,22 @@ import json import os from pathlib import Path import re +from types import TracebackType import uuid -from typing import Any, ClassVar, List, Union +from typing import Any, ClassVar, List, Optional, Union +import sys -from pydantic import Field +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self -from ..agentchat.agent import LLMAgent -from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult, IPythonCodeResult -from .markdown_code_extractor import MarkdownCodeExtractor -from .jupyter import JupyterConnectable, JupyterConnectionInfo, LocalJupyterServer, JupyterClient - -__all__ = ("JupyterCodeExecutor", "LocalJupyterCodeExecutor") +from ...agentchat.agent import LLMAgent +from ..base import CodeBlock, CodeExecutor, CodeExtractor, IPythonCodeResult +from ..markdown_code_extractor import MarkdownCodeExtractor +from .base import JupyterConnectable, JupyterConnectionInfo +from .jupyter_client import JupyterClient class JupyterCodeExecutor(CodeExecutor): @@ -214,9 +218,14 @@ the output will be a path to the image instead of the image itself. lines[i] = line.replace(match.group(0), match.group(0) + " -qqq") return "\n".join(lines) + def stop(self) -> None: + """Stop the kernel.""" + self._jupyter_client.delete_kernel(self._kernel_id) -class LocalJupyterCodeExecutor(JupyterCodeExecutor): - def __init__(self, **kwargs: Any): - """Creates a LocalJupyterServer and passes it to JupyterCodeExecutor, see JupyterCodeExecutor for args""" - jupyter_server = LocalJupyterServer() - super().__init__(jupyter_server=jupyter_server, **kwargs) + def __enter__(self) -> Self: + return self + + def __exit__( + self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: + self.stop() diff --git a/setup.py b/setup.py index 768ad708c..b90711546 100644 --- a/setup.py +++ b/setup.py @@ -56,10 +56,15 @@ setuptools.setup( "websurfer": ["beautifulsoup4", "markdownify", "pdfminer.six", "pathvalidate"], "redis": ["redis"], # Dependencies for EmbeddedIPythonExecutor, to be removed once upstream bug fixed + # jupyter-client # https://github.com/jupyter-server/kernel_gateway/issues/398 - "ipython": ["jupyter-client>=8.6.0", "ipykernel>=6.29.0"], - # Dependencies for LocalJupyterExecutor - "local-jupyter-exec": ["jupyter-kernel-gateway", "websocket-client", "requests", "ipykernel"], + "jupyter-executor": [ + "jupyter-kernel-gateway", + "websocket-client", + "requests", + "jupyter-client>=8.6.0", + "ipykernel>=6.29.0", + ], }, classifiers=[ "Programming Language :: Python :: 3", diff --git a/test/coding/test_embedded_ipython_code_executor.py b/test/coding/test_embedded_ipython_code_executor.py index fcd423497..75d827fdf 100644 --- a/test/coding/test_embedded_ipython_code_executor.py +++ b/test/coding/test_embedded_ipython_code_executor.py @@ -9,11 +9,25 @@ from autogen.agentchat.conversable_agent import ConversableAgent from autogen.coding.base import CodeBlock, CodeExecutor from autogen.coding.factory import CodeExecutorFactory from autogen.oai.openai_utils import config_list_from_json -from conftest import MOCK_OPEN_AI_API_KEY, skip_openai # noqa: E402 +from conftest import MOCK_OPEN_AI_API_KEY, skip_openai, skip_docker # noqa: E402 try: - from autogen.coding.embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor - from autogen.coding.jupyter_code_executor import LocalJupyterCodeExecutor + from autogen.coding.jupyter import ( + DockerJupyterServer, + EmbeddedIPythonCodeExecutor, + JupyterCodeExecutor, + LocalJupyterServer, + ) + + class DockerJupyterExecutor(JupyterCodeExecutor): + def __init__(self, **kwargs): + jupyter_server = DockerJupyterServer() + super().__init__(jupyter_server=jupyter_server, **kwargs) + + class LocalJupyterCodeExecutor(JupyterCodeExecutor): + def __init__(self, **kwargs): + jupyter_server = LocalJupyterServer() + super().__init__(jupyter_server=jupyter_server, **kwargs) # Skip on windows due to kernelgateway bug https://github.com/jupyter-server/kernel_gateway/issues/398 if sys.platform == "win32": @@ -21,21 +35,27 @@ try: else: classes_to_test = [EmbeddedIPythonCodeExecutor, LocalJupyterCodeExecutor] + if not skip_docker: + classes_to_test.append(DockerJupyterExecutor) + skip = False skip_reason = "" -except ImportError: +except ImportError as e: skip = True - skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor or LocalJupyterCodeExecutor not installed." + skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor or LocalJupyterCodeExecutor not installed. " + e.msg classes_to_test = [] +@pytest.mark.skipif(skip, reason=skip_reason) +def test_create_dict() -> None: + config: Dict[str, Union[str, CodeExecutor]] = {"executor": "ipython-embedded"} + executor = CodeExecutorFactory.create(config) + assert isinstance(executor, EmbeddedIPythonCodeExecutor) + + @pytest.mark.skipif(skip, reason=skip_reason) @pytest.mark.parametrize("cls", classes_to_test) def test_create(cls) -> None: - config: Dict[str, Union[str, CodeExecutor]] = {"executor": "ipython-embedded"} - executor = CodeExecutorFactory.create(config) - assert isinstance(executor, EmbeddedIPythonCodeExecutor) - config = {"executor": cls()} executor = CodeExecutorFactory.create(config) assert executor is config["executor"]