feat(server): more complicated blocks

This commit is contained in:
Nicholas Tindle
2024-08-14 21:42:04 -05:00
parent 9198a86c0e
commit 05c9931c11

View File

@@ -1,26 +1,26 @@
import enum
import io
import logging
import sys
import json
import multiprocessing
import subprocess
import time
import venv
import os
import tempfile
import shutil
import time
from typing import Any, Union, Dict, List
from abc import ABC, abstractmethod
from autogpt_server.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from autogpt_server.data.model import SchemaField
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ExecutionMode(enum.Enum):
FAST = "fast"
FLEXIBLE = "flexible"
class BasePythonExecutionBlock(Block, ABC):
class FastPythonExecutionBlock(Block):
class Input(BlockSchema):
code: str = SchemaField(
description="Python code to execute", placeholder="print(f'Hello, {name}!')"
@@ -38,32 +38,14 @@ class BasePythonExecutionBlock(Block, ABC):
result: str = SchemaField(description="Execution result or output")
error: str = SchemaField(description="Error message if execution failed")
def __init__(self, block_id: str, description: str) -> None:
def __init__(self):
super().__init__(
id=block_id,
description=description,
id="ffb7dd8e-8a9e-42cd-a620-dc58a6c78d8c",
description="This block executes Python code quickly using multiprocessing.",
categories={BlockCategory.BASIC},
input_schema=self.Input,
output_schema=self.Output,
)
self.allowed_packages = set(
[
"pandas",
"numpy",
"matplotlib",
"scipy",
"sklearn",
"requests",
"beautifulsoup4",
"lxml",
"pyyaml",
"jinja2",
]
)
@abstractmethod
def execute_code(self, code: str, args: Any, timeout: float) -> str:
pass
def run(self, input_data: Input) -> BlockOutput:
try:
@@ -74,14 +56,6 @@ class BasePythonExecutionBlock(Block, ABC):
except Exception as e:
yield "error", str(e)
class FastPythonExecutionBlock(BasePythonExecutionBlock):
def __init__(self) -> None:
super().__init__(
block_id="ffb7dd8e-8a9e-42cd-a620-dc58a6c78d8c",
description="This block executes Python code quickly using multiprocessing.",
)
def execute_code(self, code: str, args: Any, timeout: float) -> str:
result_queue = multiprocessing.Queue()
process = multiprocessing.Process(
@@ -119,14 +93,62 @@ class FastPythonExecutionBlock(BasePythonExecutionBlock):
sys.stdout = sys.__stdout__
class FlexiblePythonExecutionBlock(BasePythonExecutionBlock):
def __init__(self) -> None:
class FlexiblePythonExecutionBlock(Block):
class Input(BlockSchema):
code: str = SchemaField(
description="Python code to execute", placeholder="print(f'Hello, {name}!')"
)
args: Union[Dict[str, Any], List[Dict[str, Any]], str] = SchemaField(
description="Arguments to pass to the code. Can be a dictionary, list of dictionaries, or a JSON string.",
default={},
placeholder='{"name": "World", "number": 42}',
)
timeout: float = SchemaField(
description="Execution timeout in seconds", default=30.0
)
dependencies: List[str] = SchemaField(
description="List of Python packages to install",
default=[],
)
class Output(BlockSchema):
result: str = SchemaField(description="Execution result or output")
error: str = SchemaField(description="Error message if execution failed")
def __init__(self):
super().__init__(
block_id="96e5a653-6b3b-46c3-87ed-56a7ff098f28",
id="96e5a653-6b3b-46c3-87ed-56a7ff098f28",
description="This block executes Python code with dynamic dependencies using virtual environments.",
categories={BlockCategory.BASIC},
input_schema=self.Input,
output_schema=self.Output,
)
self.venv_path = tempfile.mkdtemp()
self.create_venv()
self.allowed_packages = set(
[
"pandas",
"numpy",
"matplotlib",
"scipy",
"sklearn",
"requests",
"beautifulsoup4",
"lxml",
"pyyaml",
"jinja2",
]
)
def run(self, input_data: Input) -> BlockOutput:
try:
self.install_dependencies(input_data.dependencies)
result = self.execute_code(
input_data.code, input_data.args, input_data.timeout
)
yield "result", result
except Exception as e:
yield "error", str(e)
def create_venv(self):
venv.create(self.venv_path, with_pip=True)
@@ -166,74 +188,67 @@ class FlexiblePythonExecutionBlock(BasePythonExecutionBlock):
shutil.rmtree(self.venv_path, ignore_errors=True)
class UnifiedPythonExecutionBlock(BasePythonExecutionBlock):
class Input(BasePythonExecutionBlock.Input):
mode: str = SchemaField(
description="Execution mode: 'fast' or 'flexible'", default="fast"
class UnifiedPythonExecutionBlock(Block):
class Input(BlockSchema):
code: str = SchemaField(
description="Python code to execute", placeholder="print(f'Hello, {name}!')"
)
args: Union[Dict[str, Any], List[Dict[str, Any]], str] = SchemaField(
description="Arguments to pass to the code. Can be a dictionary, list of dictionaries, or a JSON string.",
default={},
placeholder='{"name": "World", "number": 42}',
)
timeout: float = SchemaField(
description="Execution timeout in seconds", default=30.0
)
mode: ExecutionMode = SchemaField(
description="Execution mode: 'fast' or 'flexible'",
default=ExecutionMode.FAST,
)
dependencies: List[str] = SchemaField(
description="List of Python packages to install (only for 'flexible' mode)",
default=[],
)
class Output(BasePythonExecutionBlock.Output):
class Output(BlockSchema):
result: str = SchemaField(description="Execution result or output")
error: str = SchemaField(description="Error message if execution failed")
execution_time: float = SchemaField(description="Execution time in seconds")
memory_usage: float = SchemaField(description="Peak memory usage in MB")
def __init__(self) -> None:
def __init__(self):
super().__init__(
block_id="d72ba52d-45ee-4c3b-baa0-75bafcae0183",
id="d72ba52d-45ee-4c3b-baa0-75bafcae0183",
description="This block executes Python code in either 'fast' or 'flexible' mode, with optional dependencies.",
categories={BlockCategory.BASIC},
input_schema=self.Input,
output_schema=self.Output,
)
self.fast_executor = FastPythonExecutionBlock()
self.flexible_executor = FlexiblePythonExecutionBlock()
def execute_code(self, code: str, args: Any, timeout: float) -> str:
self.start_time = time.time()
peak_memory = 0
try:
if self.input_data.mode == "fast":
result = self.fast_executor.execute_code(code, args, timeout)
elif self.input_data.mode == "flexible":
self.flexible_executor.install_dependencies(
self.input_data.dependencies
)
result = self.flexible_executor.execute_code(code, args, timeout)
else:
raise ValueError(f"Invalid mode: {self.input_data.mode}")
peak_memory = self.get_peak_memory_usage()
return result
finally:
execution_time = time.time() - self.start_time
logger.info(f"Execution completed in {execution_time:.2f} seconds")
logger.info(f"Peak memory usage: {peak_memory:.2f} MB")
def run(self, input_data: Input) -> BlockOutput:
self.input_data = input_data
self.validate_input()
start_time = time.time()
try:
result = self.execute_code(
input_data.code, input_data.args, input_data.timeout
)
if input_data.mode == ExecutionMode.FAST:
result = self.fast_executor.execute_code(
input_data.code, input_data.args, input_data.timeout
)
elif input_data.mode == ExecutionMode.FLEXIBLE:
self.flexible_executor.install_dependencies(input_data.dependencies)
result = self.flexible_executor.execute_code(
input_data.code, input_data.args, input_data.timeout
)
else:
raise ValueError(f"Invalid mode: {input_data.mode}")
yield "result", result
yield "execution_time", time.time() - self.start_time
yield "execution_time", time.time() - start_time
yield "memory_usage", self.get_peak_memory_usage()
except Exception as e:
logger.error(f"Error during execution: {str(e)}")
yield "error", str(e)
def validate_input(self):
if self.input_data.mode not in ["fast", "flexible"]:
raise ValueError(f"Invalid mode: {self.input_data.mode}")
if self.input_data.mode == "flexible":
for pkg in self.input_data.dependencies:
if pkg not in self.allowed_packages:
raise ValueError(f"Package '{pkg}' is not in the allowed list.")
yield "execution_time", time.time() - start_time
yield "memory_usage", self.get_peak_memory_usage()
@staticmethod
def get_peak_memory_usage():