mirror of
https://github.com/microsoft/autogen.git
synced 2026-02-12 12:34:55 -05:00
Add function and code execution (#34)
* WIP code execution * add tests, reorganize * fix polars test * credit statements * attributions
This commit is contained in:
105
tests/execution/test_commandline_code_executor.py
Normal file
105
tests/execution/test_commandline_code_executor.py
Normal file
@@ -0,0 +1,105 @@
|
||||
# File based from: https://github.com/microsoft/autogen/blob/main/test/coding/test_commandline_code_executor.py
|
||||
# Credit to original authors
|
||||
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from agnext.agent_components.code_executor import LocalCommandLineCodeExecutor, CodeBlock
|
||||
|
||||
UNIX_SHELLS = ["bash", "sh", "shell"]
|
||||
WINDOWS_SHELLS = ["ps1", "pwsh", "powershell"]
|
||||
PYTHON_VARIANTS = ["python", "Python", "py"]
|
||||
|
||||
|
||||
def test_execute_code() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
|
||||
|
||||
|
||||
# Test single code block.
|
||||
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
|
||||
code_result = executor.execute_code_blocks(code_blocks)
|
||||
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
|
||||
|
||||
# Test multiple code blocks.
|
||||
code_blocks = [
|
||||
CodeBlock(code="import sys; print('hello world!')", language="python"),
|
||||
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
|
||||
]
|
||||
code_result = executor.execute_code_blocks(code_blocks)
|
||||
assert (
|
||||
code_result.exit_code == 0
|
||||
and "hello world!" in code_result.output
|
||||
and "200" in code_result.output
|
||||
and code_result.code_file is not None
|
||||
)
|
||||
|
||||
# Test bash script.
|
||||
if sys.platform not in ["win32"]:
|
||||
code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")]
|
||||
code_result = executor.execute_code_blocks(code_blocks)
|
||||
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
|
||||
|
||||
# Test running code.
|
||||
file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
|
||||
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
|
||||
code_result = executor.execute_code_blocks(code_blocks)
|
||||
assert (
|
||||
code_result.exit_code == 0
|
||||
and "hello world!" in code_result.output
|
||||
and "200" in code_result.output
|
||||
and code_result.code_file is not None
|
||||
)
|
||||
|
||||
# Check saved code file.
|
||||
with open(code_result.code_file) as f:
|
||||
code_lines = f.readlines()
|
||||
for file_line, code_line in zip(file_lines, code_lines):
|
||||
assert file_line.strip() == code_line.strip()
|
||||
|
||||
|
||||
def test_commandline_code_executor_timeout() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(timeout=1, work_dir=temp_dir)
|
||||
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
|
||||
code_result = executor.execute_code_blocks(code_blocks)
|
||||
assert code_result.exit_code and "Timeout" in code_result.output
|
||||
|
||||
|
||||
def test_local_commandline_code_executor_restart() -> None:
|
||||
executor = LocalCommandLineCodeExecutor()
|
||||
with pytest.warns(UserWarning, match=r".*No action is taken."):
|
||||
executor.restart()
|
||||
|
||||
|
||||
|
||||
|
||||
def test_invalid_relative_path() -> None:
|
||||
executor = LocalCommandLineCodeExecutor()
|
||||
code = """# filename: /tmp/test.py
|
||||
|
||||
print("hello world")
|
||||
"""
|
||||
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
|
||||
assert result.exit_code == 1 and "Filename is not in the workspace" in result.output
|
||||
|
||||
|
||||
def test_valid_relative_path() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir_str:
|
||||
temp_dir = Path(temp_dir_str)
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
|
||||
code = """# filename: test.py
|
||||
|
||||
print("hello world")
|
||||
"""
|
||||
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
|
||||
assert result.exit_code == 0
|
||||
assert "hello world" in result.output
|
||||
assert result.code_file is not None
|
||||
assert "test.py" in result.code_file
|
||||
assert (temp_dir / Path("test.py")).resolve() == Path(result.code_file).resolve()
|
||||
assert (temp_dir / Path("test.py")).exists()
|
||||
|
||||
118
tests/execution/test_markdown_code_extractor.py
Normal file
118
tests/execution/test_markdown_code_extractor.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# File based from: https://github.com/microsoft/autogen/blob/main/test/coding/test_markdown_code_extractor.py
|
||||
# Credit to original authors
|
||||
|
||||
from agnext.agent_components.code_executor import MarkdownCodeExtractor
|
||||
|
||||
_message_1 = """
|
||||
Example:
|
||||
```
|
||||
print("hello extract code")
|
||||
```
|
||||
"""
|
||||
|
||||
_message_2 = """Example:
|
||||
```python
|
||||
def scrape(url):
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
response = requests.get(url)
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
title = soup.find("title").text
|
||||
text = soup.find("div", {"id": "bodyContent"}).text
|
||||
return title, text
|
||||
```
|
||||
Test:
|
||||
```python
|
||||
url = "https://en.wikipedia.org/wiki/Web_scraping"
|
||||
title, text = scrape(url)
|
||||
print(f"Title: {title}")
|
||||
print(f"Text: {text}")
|
||||
```
|
||||
"""
|
||||
|
||||
_message_3 = """
|
||||
Example:
|
||||
```python
|
||||
def scrape(url):
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
response = requests.get(url)
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
title = soup.find("title").text
|
||||
text = soup.find("div", {"id": "bodyContent"}).text
|
||||
return title, text
|
||||
```
|
||||
"""
|
||||
|
||||
_message_4 = """
|
||||
Example:
|
||||
``` python
|
||||
def scrape(url):
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
response = requests.get(url)
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
title = soup.find("title").text
|
||||
text = soup.find("div", {"id": "bodyContent"}).text
|
||||
return title, text
|
||||
```
|
||||
""".replace(
|
||||
"\n", "\r\n"
|
||||
)
|
||||
|
||||
_message_5 = """
|
||||
Test bash script:
|
||||
```bash
|
||||
echo 'hello world!'
|
||||
```
|
||||
"""
|
||||
|
||||
_message_6 = """
|
||||
Test some C# code, expecting ""
|
||||
```
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
|
||||
namespace ConsoleApplication1
|
||||
{
|
||||
class Program
|
||||
{
|
||||
static void Main(string[] args)
|
||||
{
|
||||
Console.WriteLine("Hello World");
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
"""
|
||||
|
||||
_message_7 = """
|
||||
Test some message that has no code block.
|
||||
"""
|
||||
|
||||
|
||||
def test_extract_code() -> None:
|
||||
extractor = MarkdownCodeExtractor()
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_1)
|
||||
assert len(code_blocks) == 1 and code_blocks[0].language == "python"
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_2)
|
||||
assert len(code_blocks) == 2 and code_blocks[0].language == "python" and code_blocks[1].language == "python"
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_3)
|
||||
assert len(code_blocks) == 1 and code_blocks[0].language == "python"
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_4)
|
||||
assert len(code_blocks) == 1 and code_blocks[0].language == "python"
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_5)
|
||||
assert len(code_blocks) == 1 and code_blocks[0].language == "bash"
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_6)
|
||||
assert len(code_blocks) == 1 and code_blocks[0].language == ""
|
||||
|
||||
code_blocks = extractor.extract_code_blocks(_message_7)
|
||||
assert len(code_blocks) == 0
|
||||
199
tests/execution/test_user_defined_functions.py
Normal file
199
tests/execution/test_user_defined_functions.py
Normal file
@@ -0,0 +1,199 @@
|
||||
# File based from: https://github.com/microsoft/autogen/blob/main/test/coding/test_user_defined_functions.py
|
||||
# Credit to original authors
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from agnext.agent_components.code_executor import LocalCommandLineCodeExecutor, CodeBlock
|
||||
from agnext.agent_components.func_with_reqs import FunctionWithRequirements, with_requirements
|
||||
|
||||
import polars
|
||||
|
||||
def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
return a + b
|
||||
|
||||
|
||||
@with_requirements(python_packages=["polars"], global_imports=["polars"])
|
||||
def load_data() -> polars.DataFrame:
|
||||
"""Load some sample data.
|
||||
|
||||
Returns:
|
||||
polars.DataFrame: A DataFrame with the following columns: name(str), location(str), age(int)
|
||||
"""
|
||||
data = {
|
||||
"name": ["John", "Anna", "Peter", "Linda"],
|
||||
"location": ["New York", "Paris", "Berlin", "London"],
|
||||
"age": [24, 13, 53, 33],
|
||||
}
|
||||
return polars.DataFrame(data)
|
||||
|
||||
|
||||
@with_requirements(global_imports=["NOT_A_REAL_PACKAGE"])
|
||||
def function_incorrect_import() -> "polars.DataFrame":
|
||||
return polars.DataFrame()
|
||||
|
||||
|
||||
@with_requirements(python_packages=["NOT_A_REAL_PACKAGE"])
|
||||
def function_incorrect_dep() -> "polars.DataFrame":
|
||||
return polars.DataFrame()
|
||||
|
||||
|
||||
def function_missing_reqs() -> "polars.DataFrame":
|
||||
return polars.DataFrame()
|
||||
|
||||
|
||||
def test_can_load_function_with_reqs() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[load_data])
|
||||
code = f"""from {executor.functions_module} import load_data
|
||||
import polars
|
||||
|
||||
# Get first row's name
|
||||
data = load_data()
|
||||
print(data['name'][0])"""
|
||||
|
||||
result = executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
assert result.output == "John\n"
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_can_load_function() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[add_two_numbers])
|
||||
code = f"""from {executor.functions_module} import add_two_numbers
|
||||
print(add_two_numbers(1, 2))"""
|
||||
|
||||
result = executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
assert result.output == "3\n"
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_fails_for_function_incorrect_import() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[function_incorrect_import])
|
||||
code = f"""from {executor.functions_module} import function_incorrect_import
|
||||
function_incorrect_import()"""
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_fails_for_function_incorrect_dep() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[function_incorrect_dep])
|
||||
code = f"""from {executor.functions_module} import function_incorrect_dep
|
||||
function_incorrect_dep()"""
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
|
||||
def test_formatted_prompt() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[add_two_numbers])
|
||||
|
||||
result = executor.format_functions_for_prompt()
|
||||
assert (
|
||||
'''def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
'''
|
||||
in result
|
||||
)
|
||||
|
||||
|
||||
def test_formatted_prompt_str_func() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
func = FunctionWithRequirements.from_str(
|
||||
'''
|
||||
def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
return a + b
|
||||
'''
|
||||
)
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[func])
|
||||
|
||||
result = executor.format_functions_for_prompt()
|
||||
assert (
|
||||
'''def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
'''
|
||||
in result
|
||||
)
|
||||
|
||||
|
||||
|
||||
def test_can_load_str_function_with_reqs() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
func = FunctionWithRequirements.from_str(
|
||||
'''
|
||||
def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
return a + b
|
||||
'''
|
||||
)
|
||||
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[func])
|
||||
code = f"""from {executor.functions_module} import add_two_numbers
|
||||
print(add_two_numbers(1, 2))"""
|
||||
|
||||
result = executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
assert result.output == "3\n"
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_cant_load_broken_str_function_with_reqs() -> None:
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
_ = FunctionWithRequirements.from_str(
|
||||
'''
|
||||
invaliddef add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
return a + b
|
||||
'''
|
||||
)
|
||||
|
||||
|
||||
def test_cant_run_broken_str_function_with_reqs() -> None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
func = FunctionWithRequirements.from_str(
|
||||
'''
|
||||
def add_two_numbers(a: int, b: int) -> int:
|
||||
"""Add two numbers together."""
|
||||
return a + b
|
||||
'''
|
||||
)
|
||||
|
||||
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir, functions=[func])
|
||||
code = f"""from {executor.functions_module} import add_two_numbers
|
||||
print(add_two_numbers(object(), False))"""
|
||||
|
||||
result = executor.execute_code_blocks(
|
||||
code_blocks=[
|
||||
CodeBlock(language="python", code=code),
|
||||
]
|
||||
)
|
||||
assert "TypeError: unsupported operand type(s) for +:" in result.output
|
||||
assert result.exit_code == 1
|
||||
Reference in New Issue
Block a user