From 070a97ceaa17efcb769b2cee66b02167b5f22fb6 Mon Sep 17 00:00:00 2001 From: peterychang <49209570+peterychang@users.noreply.github.com> Date: Fri, 26 Jul 2024 18:37:34 -0400 Subject: [PATCH] make code execution async (#219) * make code execution async * python 3.10 does not support asyncio.timeout() * make code execution cancellable * make code execution async * python 3.10 does not support asyncio.timeout() * make code execution cancellable * make entire callstack for code_executor async * Update python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py Co-authored-by: Jack Gerrits * fix variable description * remove unnecessary code * fix usage of execute_code_blocks * fix usage of execute_code_blocks --------- Co-authored-by: Jack Gerrits Co-authored-by: Eric Zhu --- .../HumanEval/Templates/TwoAgents/scenario.py | 6 +- python/samples/patterns/coder_executor.py | 4 +- .../agnext/components/code_executor/_base.py | 2 +- .../_impl/local_commandline_code_executor.py | 94 ++++++++++++------- .../components/tools/_code_execution.py | 9 +- .../team-one/src/team_one/agents/coder.py | 7 +- .../test_commandline_code_executor.py | 30 +++--- .../execution/test_user_defined_functions.py | 30 +++--- 8 files changed, 100 insertions(+), 82 deletions(-) diff --git a/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py b/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py index 835d6c18d..cadd340a7 100644 --- a/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py +++ b/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py @@ -169,11 +169,7 @@ class Executor(TypeRoutedAgent): code = self._extract_execution_request(message.execution_request) if code is not None: execution_requests = [CodeBlock(code=code, language="python")] - future = asyncio.get_event_loop().run_in_executor( - None, self._executor.execute_code_blocks, execution_requests - ) - cancellation_token.link_future(future) - result = await future + result = await self._executor.execute_code_blocks(execution_requests) await self.publish_message( CodeExecutionResultMessage( output=result.output, diff --git a/python/samples/patterns/coder_executor.py b/python/samples/patterns/coder_executor.py index 5b77c8500..c64d60201 100644 --- a/python/samples/patterns/coder_executor.py +++ b/python/samples/patterns/coder_executor.py @@ -155,9 +155,7 @@ class Executor(TypeRoutedAgent): ) return # Execute code blocks. - future = asyncio.get_event_loop().run_in_executor(None, self._executor.execute_code_blocks, code_blocks) - cancellation_token.link_future(future) - result = await future + result = await self._executor.execute_code_blocks(code_blocks=code_blocks) # Publish the code execution result. await self.publish_message( CodeExecutionTaskResult(output=result.output, exit_code=result.exit_code, session_id=message.session_id), diff --git a/python/src/agnext/components/code_executor/_base.py b/python/src/agnext/components/code_executor/_base.py index 2d05629bb..c4b3c02de 100644 --- a/python/src/agnext/components/code_executor/_base.py +++ b/python/src/agnext/components/code_executor/_base.py @@ -27,7 +27,7 @@ class CodeResult: class CodeExecutor(Protocol): """Executes code blocks and returns the result.""" - def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CodeResult: + async def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CodeResult: """Execute code blocks and return the result. This method should be implemented by the code executor. diff --git a/python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py b/python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py index 194a1eace..aa10507c4 100644 --- a/python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py +++ b/python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py @@ -1,17 +1,18 @@ # File based from: https://github.com/microsoft/autogen/blob/main/autogen/coding/local_commandline_code_executor.py # Credit to original authors +import asyncio import logging -import subprocess import sys import warnings from hashlib import md5 from pathlib import Path from string import Template -from typing import Any, Callable, ClassVar, List, Sequence, Union +from typing import Any, Callable, ClassVar, List, Sequence, Union, Optional from typing_extensions import ParamSpec +from ....core import CancellationToken from .._base import CodeBlock, CodeExecutor from .._func_with_reqs import ( FunctionWithRequirements, @@ -21,6 +22,7 @@ from .._func_with_reqs import ( ) from .command_line_code_result import CommandLineCodeResult from .utils import PYTHON_VARIANTS, get_file_name_from_content, lang_to_cmd, silence_pip # type: ignore +from ....core import CancellationToken __all__ = ("LocalCommandLineCodeExecutor",) @@ -75,7 +77,7 @@ $functions""" block. Args: - timeout (int): The timeout for code execution. Default is 60. + timeout (int): The timeout for the execution of any single code block. Default is 60. work_dir (str): The working directory for the code execution. If None, a default working directory will be used. The default working directory is the current directory ".". @@ -144,7 +146,7 @@ $functions""" """(Experimental) The working directory for the code execution.""" return self._work_dir - def _setup_functions(self) -> None: + async def _setup_functions(self, cancellation_token: Optional[CancellationToken]) -> None: func_file_content = build_python_functions_file(self._functions) func_file = self._work_dir / f"{self._functions_module}.py" func_file.write_text(func_file_content) @@ -156,46 +158,61 @@ $functions""" if len(required_packages) > 0: logging.info("Ensuring packages are installed in executor.") - cmd = [sys.executable, "-m", "pip", "install"] - cmd.extend(required_packages) + cmd_args = ["-m", "pip", "install"] + cmd_args.extend(required_packages) - try: - result = subprocess.run( - cmd, + task = asyncio.create_task( + asyncio.create_subprocess_exec( + sys.executable, + *cmd_args, cwd=self._work_dir, - capture_output=True, - text=True, - timeout=float(self._timeout), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - except subprocess.TimeoutExpired as e: + ) + if cancellation_token: + cancellation_token.link_future(task) + try: + proc = await task + stdout, stderr = await asyncio.wait_for(proc.communicate(), self._timeout) + except asyncio.TimeoutError as e: raise ValueError("Pip install timed out") from e + except asyncio.CancelledError as e: + raise ValueError("Pip install was cancelled") from e - if result.returncode != 0: - raise ValueError(f"Pip install failed. {result.stdout}, {result.stderr}") + if proc.returncode is not None and proc.returncode != 0: + raise ValueError(f"Pip install failed. {stdout.decode()}, {stderr.decode()}") # Attempt to load the function file to check for syntax errors, imports etc. - exec_result = self._execute_code_dont_check_setup([CodeBlock(code=func_file_content, language="python")]) + exec_result = await self._execute_code_dont_check_setup( + [CodeBlock(code=func_file_content, language="python")], cancellation_token + ) if exec_result.exit_code != 0: raise ValueError(f"Functions failed to load: {exec_result.output}") self._setup_functions_complete = True - def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult: + async def execute_code_blocks( + self, code_blocks: List[CodeBlock], cancellation_token: Optional[CancellationToken] = None + ) -> CommandLineCodeResult: """(Experimental) Execute the code blocks and return the result. Args: code_blocks (List[CodeBlock]): The code blocks to execute. + cancellation_token (CancellationToken|None): an optional token to cancel the operation Returns: CommandLineCodeResult: The result of the code execution.""" if not self._setup_functions_complete: - self._setup_functions() + await self._setup_functions(cancellation_token) - return self._execute_code_dont_check_setup(code_blocks) + return await self._execute_code_dont_check_setup(code_blocks, cancellation_token) - def _execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult: + async def _execute_code_dont_check_setup( + self, code_blocks: List[CodeBlock], cancellation_token: Optional[CancellationToken] + ) -> CommandLineCodeResult: logs_all: str = "" file_names: List[Path] = [] exitcode = 0 @@ -235,25 +252,38 @@ $functions""" file_names.append(written_file) program = sys.executable if lang.startswith("python") else lang_to_cmd(lang) - cmd = [program, str(written_file.absolute())] - - try: - result = subprocess.run( - cmd, + # Wrap in a task to make it cancellable + task = asyncio.create_task( + asyncio.create_subprocess_exec( + program, + str(written_file.absolute()), cwd=self._work_dir, - capture_output=True, - text=True, - timeout=float(self._timeout), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - except subprocess.TimeoutExpired: + ) + if cancellation_token: + cancellation_token.link_future(task) + try: + proc = await task + stdout, stderr = await asyncio.wait_for(proc.communicate(), self._timeout) + exitcode = proc.returncode or 0 + + except asyncio.TimeoutError: logs_all += "\n Timeout" # Same exit code as the timeout command on linux. exitcode = 124 break + except asyncio.CancelledError: + logs_all += "\n Cancelled" + # TODO: which exit code? 125 is Operation Canceled + exitcode = 125 + break - logs_all += result.stderr - logs_all += result.stdout - exitcode = result.returncode + self._running_cmd_task = None + + logs_all += stderr.decode() + logs_all += stdout.decode() if exitcode != 0: break diff --git a/python/src/agnext/components/tools/_code_execution.py b/python/src/agnext/components/tools/_code_execution.py index a959686ee..6f34f9247 100644 --- a/python/src/agnext/components/tools/_code_execution.py +++ b/python/src/agnext/components/tools/_code_execution.py @@ -1,6 +1,3 @@ -import asyncio -import functools - from pydantic import BaseModel, Field, model_serializer from ...core import CancellationToken @@ -28,10 +25,6 @@ class PythonCodeExecutionTool(BaseTool[CodeExecutionInput, CodeExecutionResult]) async def run(self, args: CodeExecutionInput, cancellation_token: CancellationToken) -> CodeExecutionResult: code_blocks = [CodeBlock(code=args.code, language="python")] - future = asyncio.get_event_loop().run_in_executor( - None, functools.partial(self._executor.execute_code_blocks, code_blocks=code_blocks) - ) - cancellation_token.link_future(future) - result = await future + result = await self._executor.execute_code_blocks(code_blocks=code_blocks) return CodeExecutionResult(success=result.exit_code == 0, output=result.output) diff --git a/python/teams/team-one/src/team_one/agents/coder.py b/python/teams/team-one/src/team_one/agents/coder.py index 7f8752839..03d64687f 100644 --- a/python/teams/team-one/src/team_one/agents/coder.py +++ b/python/teams/team-one/src/team_one/agents/coder.py @@ -1,4 +1,3 @@ -import asyncio import re from typing import List, Optional, Tuple, Union @@ -81,11 +80,7 @@ class Executor(BaseAgent): code = self._extract_execution_request(message_content_to_str(message.content)) if code is not None: execution_requests = [CodeBlock(code=code, language="python")] - future = asyncio.get_event_loop().run_in_executor( - None, self._executor.execute_code_blocks, execution_requests - ) - cancellation_token.link_future(future) - result = await future + result = await self._executor.execute_code_blocks(execution_requests) if result.output.strip() == "": # Sometimes agents forget to print(). Remind the to print something diff --git a/python/tests/execution/test_commandline_code_executor.py b/python/tests/execution/test_commandline_code_executor.py index 672ac3440..f49a0033b 100644 --- a/python/tests/execution/test_commandline_code_executor.py +++ b/python/tests/execution/test_commandline_code_executor.py @@ -12,15 +12,15 @@ UNIX_SHELLS = ["bash", "sh", "shell"] WINDOWS_SHELLS = ["ps1", "pwsh", "powershell"] PYTHON_VARIANTS = ["python", "Python", "py"] - -def test_execute_code() -> None: +@pytest.mark.asyncio +async def test_execute_code() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) # Test single code block. code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")] - code_result = executor.execute_code_blocks(code_blocks) + code_result = await executor.execute_code_blocks(code_blocks) assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None # Test multiple code blocks. @@ -28,7 +28,7 @@ def test_execute_code() -> None: CodeBlock(code="import sys; print('hello world!')", language="python"), CodeBlock(code="a = 100 + 100; print(a)", language="python"), ] - code_result = executor.execute_code_blocks(code_blocks) + code_result = await executor.execute_code_blocks(code_blocks) assert ( code_result.exit_code == 0 and "hello world!" in code_result.output @@ -39,13 +39,13 @@ def test_execute_code() -> None: # Test bash script. if sys.platform not in ["win32"]: code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")] - code_result = executor.execute_code_blocks(code_blocks) + code_result = await executor.execute_code_blocks(code_blocks) assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None # Test running code. file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"] code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")] - code_result = executor.execute_code_blocks(code_blocks) + code_result = await executor.execute_code_blocks(code_blocks) assert ( code_result.exit_code == 0 and "hello world!" in code_result.output @@ -59,12 +59,12 @@ def test_execute_code() -> None: for file_line, code_line in zip(file_lines, code_lines): assert file_line.strip() == code_line.strip() - -def test_commandline_code_executor_timeout() -> None: +@pytest.mark.asyncio +async def test_commandline_code_executor_timeout() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor(timeout=1, work_dir=temp_dir) code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")] - code_result = executor.execute_code_blocks(code_blocks) + code_result = await executor.execute_code_blocks(code_blocks) assert code_result.exit_code and "Timeout" in code_result.output @@ -75,18 +75,18 @@ def test_local_commandline_code_executor_restart() -> None: - -def test_invalid_relative_path() -> None: +@pytest.mark.asyncio +async def test_invalid_relative_path() -> None: executor = LocalCommandLineCodeExecutor() code = """# filename: /tmp/test.py print("hello world") """ - result = executor.execute_code_blocks([CodeBlock(code=code, language="python")]) + result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")]) assert result.exit_code == 1 and "Filename is not in the workspace" in result.output - -def test_valid_relative_path() -> None: +@pytest.mark.asyncio +async def test_valid_relative_path() -> None: with tempfile.TemporaryDirectory() as temp_dir_str: temp_dir = Path(temp_dir_str) executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) @@ -94,7 +94,7 @@ def test_valid_relative_path() -> None: print("hello world") """ - result = executor.execute_code_blocks([CodeBlock(code=code, language="python")]) + result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")]) assert result.exit_code == 0 assert "hello world" in result.output assert result.code_file is not None diff --git a/python/tests/execution/test_user_defined_functions.py b/python/tests/execution/test_user_defined_functions.py index f36edc72e..2176f0b65 100644 --- a/python/tests/execution/test_user_defined_functions.py +++ b/python/tests/execution/test_user_defined_functions.py @@ -47,7 +47,8 @@ def function_missing_reqs() -> "polars.DataFrame": return polars.DataFrame() -def test_can_load_function_with_reqs() -> None: +@pytest.mark.asyncio +async def test_can_load_function_with_reqs() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor( work_dir=temp_dir, functions=[load_data] @@ -59,7 +60,7 @@ import polars data = load_data() print(data['name'][0])""" - result = executor.execute_code_blocks( + result = await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ] @@ -68,7 +69,8 @@ print(data['name'][0])""" assert result.exit_code == 0 -def test_can_load_function() -> None: +@pytest.mark.asyncio +async def test_can_load_function() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor( work_dir=temp_dir, functions=[add_two_numbers] @@ -76,7 +78,7 @@ def test_can_load_function() -> None: code = f"""from {executor.functions_module} import add_two_numbers print(add_two_numbers(1, 2))""" - result = executor.execute_code_blocks( + result = await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ] @@ -85,7 +87,8 @@ print(add_two_numbers(1, 2))""" assert result.exit_code == 0 -def test_fails_for_function_incorrect_import() -> None: +@pytest.mark.asyncio +async def test_fails_for_function_incorrect_import() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor( work_dir=temp_dir, functions=[function_incorrect_import] @@ -94,14 +97,15 @@ def test_fails_for_function_incorrect_import() -> None: function_incorrect_import()""" with pytest.raises(ValueError): - executor.execute_code_blocks( + await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ] ) -def test_fails_for_function_incorrect_dep() -> None: +@pytest.mark.asyncio +async def test_fails_for_function_incorrect_dep() -> None: with tempfile.TemporaryDirectory() as temp_dir: executor = LocalCommandLineCodeExecutor( work_dir=temp_dir, functions=[function_incorrect_dep] @@ -110,7 +114,7 @@ def test_fails_for_function_incorrect_dep() -> None: function_incorrect_dep()""" with pytest.raises(ValueError): - executor.execute_code_blocks( + await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ] @@ -152,7 +156,8 @@ def add_two_numbers(a: int, b: int) -> int: ) -def test_can_load_str_function_with_reqs() -> None: +@pytest.mark.asyncio +async def test_can_load_str_function_with_reqs() -> None: with tempfile.TemporaryDirectory() as temp_dir: func = FunctionWithRequirements.from_str( ''' @@ -166,7 +171,7 @@ def add_two_numbers(a: int, b: int) -> int: code = f"""from {executor.functions_module} import add_two_numbers print(add_two_numbers(1, 2))""" - result = executor.execute_code_blocks( + result = await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ] @@ -187,7 +192,8 @@ invaliddef add_two_numbers(a: int, b: int) -> int: ) -def test_cant_run_broken_str_function_with_reqs() -> None: +@pytest.mark.asyncio +async def test_cant_run_broken_str_function_with_reqs() -> None: with tempfile.TemporaryDirectory() as temp_dir: func = FunctionWithRequirements.from_str( ''' @@ -201,7 +207,7 @@ def add_two_numbers(a: int, b: int) -> int: code = f"""from {executor.functions_module} import add_two_numbers print(add_two_numbers(object(), False))""" - result = executor.execute_code_blocks( + result = await executor.execute_code_blocks( code_blocks=[ CodeBlock(language="python", code=code), ]