From bcec0502d705755ee14efc492fa9941d4b70db37 Mon Sep 17 00:00:00 2001 From: gagb Date: Tue, 2 Jul 2024 16:18:48 -0700 Subject: [PATCH] Numerous fixes for agbench (#170) * Shift to new runtime API * Add pretty printing * Reformat * Fix linting errors --- python/benchmarks/.gitignore | 2 + .../HumanEval/Templates/TeamOne/scenario.py | 54 ++++++-- .../HumanEval/Templates/TwoAgents/scenario.py | 130 +++++++++++++----- .../src/team_one/agents/orchestrator.py | 35 ++++- .../teams/team-one/src/team_one/messages.py | 6 + 5 files changed, 177 insertions(+), 50 deletions(-) create mode 100644 python/benchmarks/.gitignore diff --git a/python/benchmarks/.gitignore b/python/benchmarks/.gitignore new file mode 100644 index 000000000..fc9b39c1e --- /dev/null +++ b/python/benchmarks/.gitignore @@ -0,0 +1,2 @@ +*/Results/ +*/Tasks/ \ No newline at end of file diff --git a/python/benchmarks/HumanEval/Templates/TeamOne/scenario.py b/python/benchmarks/HumanEval/Templates/TeamOne/scenario.py index 01464ece7..7b14453f4 100644 --- a/python/benchmarks/HumanEval/Templates/TeamOne/scenario.py +++ b/python/benchmarks/HumanEval/Templates/TeamOne/scenario.py @@ -1,30 +1,36 @@ import asyncio -#from typing import Any, Dict, List, Tuple, Union +import logging + +# from typing import Any, Dict, List, Tuple, Union from agnext.application import SingleThreadedAgentRuntime from agnext.components.models import ( AzureOpenAIChatCompletionClient, - LLMMessage, ModelCapabilities, UserMessage, ) from agnext.components.code_executor import LocalCommandLineCodeExecutor +from agnext.application.logging import EVENT_LOGGER_NAME + from team_one.agents.coder import Coder, Executor from team_one.agents.orchestrator import RoundRobinOrchestrator -from team_one.messages import BroadcastMessage +from team_one.messages import BroadcastMessage, OrchestrationEvent + async def main() -> None: # Create the runtime. runtime = SingleThreadedAgentRuntime() # Create the AzureOpenAI client, with AAD auth - #token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") + # token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") client = AzureOpenAIChatCompletionClient( api_version="2024-02-15-preview", azure_endpoint="https://aif-complex-tasks-west-us-3.openai.azure.com/", model="gpt-4o-2024-05-13", - model_capabilities=ModelCapabilities(function_calling=True, json_output=True, vision=True), - #azure_ad_token_provider=token_provider + model_capabilities=ModelCapabilities( + function_calling=True, json_output=True, vision=True + ), + # azure_ad_token_provider=token_provider ) # Register agents. @@ -34,7 +40,9 @@ async def main() -> None: ) executor = runtime.register_and_get_proxy( "Executor", - lambda: Executor("A agent for executing code", executor=LocalCommandLineCodeExecutor()) + lambda: Executor( + "A agent for executing code", executor=LocalCommandLineCodeExecutor() + ), ) runtime.register("orchestrator", lambda: RoundRobinOrchestrator([coder, executor])) @@ -43,7 +51,7 @@ async def main() -> None: with open("prompt.txt", "rt") as fh: prompt = fh.read() - entry_point = "__ENTRY_POINT__" + entry_point = "__ENTRY_POINT__" task = f""" The following python code imports the `run_tests` function from unit_tests.py, and runs @@ -64,16 +72,32 @@ run_tests({entry_point}) ``` """.strip() + run_context = runtime.start() - await runtime.publish_message(BroadcastMessage(content=UserMessage(content=task, source="human")), namespace="default") + await runtime.publish_message( + BroadcastMessage(content=UserMessage(content=task, source="human")), + namespace="default", + ) + + await run_context.stop_when_idle() + + +class MyHandler(logging.Handler): + def __init__(self) -> None: + super().__init__() + + def emit(self, record: logging.LogRecord) -> None: + try: + if isinstance(record.msg, OrchestrationEvent): + print(f"[{record.msg.timestamp}]: {record.msg.message}", flush=True) + except Exception: + self.handleError(record) - # Run the runtime until the task is completed. - await runtime.process_until_idle() if __name__ == "__main__": - import logging - logging.basicConfig(level=logging.WARNING) - logging.getLogger("agnext").setLevel(logging.DEBUG) + logger = logging.getLogger(EVENT_LOGGER_NAME) + logger.setLevel(logging.INFO) + my_handler = MyHandler() + logger.handlers = [my_handler] asyncio.run(main()) - diff --git a/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py b/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py index 72bbda2ed..cbb1e2386 100644 --- a/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py +++ b/python/benchmarks/HumanEval/Templates/TwoAgents/scenario.py @@ -7,7 +7,11 @@ from typing import Any, Dict, List, Tuple, Union from agnext.application import SingleThreadedAgentRuntime from agnext.components import FunctionCall, TypeRoutedAgent, message_handler -from agnext.components.code_executor import CodeBlock, CodeExecutor, LocalCommandLineCodeExecutor +from agnext.components.code_executor import ( + CodeBlock, + CodeExecutor, + LocalCommandLineCodeExecutor, +) from agnext.components.models import ( AssistantMessage, AzureOpenAIChatCompletionClient, @@ -23,30 +27,35 @@ from agnext.components.models import ( from agnext.components.tools import CodeExecutionResult, PythonCodeExecutionTool from agnext.core import AgentId, CancellationToken -#from azure.identity import DefaultAzureCredential, get_bearer_token_provider +# from azure.identity import DefaultAzureCredential, get_bearer_token_provider + @dataclass class TaskMessage: content: str + @dataclass class CodeExecutionRequestMessage: session_id: str execution_request: str + @dataclass class CodeExecutionResultMessage: session_id: str output: str exit_code: int + class Coder(TypeRoutedAgent): """An agent that uses tools to write, execute, and debug Python code.""" DEFAULT_DESCRIPTION = "A Python coder assistant." DEFAULT_SYSTEM_MESSAGES = [ - SystemMessage("""You are a helpful AI assistant. Solve tasks using your Python coding skills. The code you output must be formatted in Markdown code blocks demarcated by triple backticks (```). As an example: + SystemMessage( + """You are a helpful AI assistant. Solve tasks using your Python coding skills. The code you output must be formatted in Markdown code blocks demarcated by triple backticks (```). As an example: ```python @@ -62,7 +71,8 @@ The user cannot provide any feedback or perform any other action beyond executin Check the execution result returned by the user. If the result indicates there is an error, fix the error and output the code again. Suggest the full code instead of partial code or code changes -- code blocks must stand alone and be ready to execute without modification. If the error can't be fixed or if the task is not solved even after the code is executed successfully, analyze the problem, revisit your assumption, and think of a different approach to try. If the code has executed successfully, and the problem is stolved, reply "TERMINATE". -""") +""" + ) ] def __init__( @@ -85,60 +95,105 @@ If the code has executed successfully, and the problem is stolved, reply "TERMIN """Handle a user message, execute the model and tools, and returns the response.""" # Create a new session. session_id = str(uuid.uuid4()) - self._session_memory.setdefault(session_id, []).append(UserMessage(content=message.content, source="user")) + self._session_memory.setdefault(session_id, []).append( + UserMessage(content=message.content, source="user") + ) # Make an inference to the model. - response = await self._model_client.create(self._system_messages + self._session_memory[session_id]) + response = await self._model_client.create( + self._system_messages + self._session_memory[session_id] + ) assert isinstance(response.content, str) - self._session_memory[session_id].append(AssistantMessage(content=response.content, source=self.metadata["name"])) + self._session_memory[session_id].append( + AssistantMessage(content=response.content, source=self.metadata["name"]) + ) - await self.publish_message(CodeExecutionRequestMessage(execution_request=response.content, session_id=session_id), cancellation_token=cancellation_token) + await self.publish_message( + CodeExecutionRequestMessage( + execution_request=response.content, session_id=session_id + ), + cancellation_token=cancellation_token, + ) - @message_handler - async def handle_code_execution_result(self, message: CodeExecutionResultMessage, cancellation_token: CancellationToken) -> None: + async def handle_code_execution_result( + self, message: CodeExecutionResultMessage, cancellation_token: CancellationToken + ) -> None: execution_result = f"The script ran, then exited with Unix exit code: {message.exit_code}\nIts output was:\n{message.output}" # Store the code execution output. - self._session_memory[message.session_id].append(UserMessage(content=execution_result, source="user")) + self._session_memory[message.session_id].append( + UserMessage(content=execution_result, source="user") + ) # Count the number of rounds so far if self._max_turns is not None: - n_turns = sum(1 for message in self._session_memory[message.session_id] if isinstance(message, AssistantMessage)) + n_turns = sum( + 1 + for message in self._session_memory[message.session_id] + if isinstance(message, AssistantMessage) + ) if n_turns >= self._max_turns: return # Make an inference to the model. - response = await self._model_client.create(self._system_messages + self._session_memory[message.session_id]) + response = await self._model_client.create( + self._system_messages + self._session_memory[message.session_id] + ) assert isinstance(response.content, str) - self._session_memory[message.session_id].append(AssistantMessage(content=response.content, source=self.metadata["name"])) + self._session_memory[message.session_id].append( + AssistantMessage(content=response.content, source=self.metadata["name"]) + ) if "TERMINATE" in response.content: - return + return else: - await self.publish_message(CodeExecutionRequestMessage(execution_request=response.content, session_id=message.session_id), cancellation_token=cancellation_token) - + await self.publish_message( + CodeExecutionRequestMessage( + execution_request=response.content, session_id=message.session_id + ), + cancellation_token=cancellation_token, + ) + class Executor(TypeRoutedAgent): def __init__(self, description: str, executor: CodeExecutor) -> None: super().__init__(description) self._executor = executor - + @message_handler - async def handle_code_execution(self, message: CodeExecutionRequestMessage, cancellation_token: CancellationToken) -> None: + async def handle_code_execution( + self, + message: CodeExecutionRequestMessage, + cancellation_token: CancellationToken, + ) -> None: # Extract code block from the message. code = self._extract_execution_request(message.execution_request) if code is not None: execution_requests = [CodeBlock(code=code, language="python")] - future = asyncio.get_event_loop().run_in_executor(None, self._executor.execute_code_blocks, execution_requests) + future = asyncio.get_event_loop().run_in_executor( + None, self._executor.execute_code_blocks, execution_requests + ) cancellation_token.link_future(future) result = await future - await self.publish_message(CodeExecutionResultMessage(output=result.output, exit_code=result.exit_code, session_id=message.session_id)) + await self.publish_message( + CodeExecutionResultMessage( + output=result.output, + exit_code=result.exit_code, + session_id=message.session_id, + ) + ) else: - await self.publish_message(CodeExecutionResultMessage(output="No code block detected. Please provide a markdown-encoded code block to execute.", exit_code=1, session_id=message.session_id)) + await self.publish_message( + CodeExecutionResultMessage( + output="No code block detected. Please provide a markdown-encoded code block to execute.", + exit_code=1, + session_id=message.session_id, + ) + ) def _extract_execution_request(self, markdown_text: str) -> Union[str, None]: pattern = r"```(\w+)\n(.*?)\n```" @@ -149,18 +204,21 @@ class Executor(TypeRoutedAgent): return match.group(2) return None + async def main() -> None: # Create the runtime. runtime = SingleThreadedAgentRuntime() # Create the AzureOpenAI client, with AAD auth - #token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") + # token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default") client = AzureOpenAIChatCompletionClient( api_version="2024-02-15-preview", azure_endpoint="https://aif-complex-tasks-west-us-3.openai.azure.com/", model="gpt-4o-2024-05-13", - model_capabilities=ModelCapabilities(function_calling=True, json_output=True, vision=True), - #azure_ad_token_provider=token_provider + model_capabilities=ModelCapabilities( + function_calling=True, json_output=True, vision=True + ), + # azure_ad_token_provider=token_provider ) # Register agents. @@ -170,16 +228,19 @@ async def main() -> None: ) runtime.register( "Executor", - lambda: Executor("A agent for executing code", executor=LocalCommandLineCodeExecutor()) + lambda: Executor( + "A agent for executing code", executor=LocalCommandLineCodeExecutor() + ), ) prompt = "" with open("prompt.txt", "rt") as fh: prompt = fh.read() - entry_point = "__ENTRY_POINT__" + entry_point = "__ENTRY_POINT__" - task = TaskMessage(f""" + task = TaskMessage( + f""" The following python code imports the `run_tests` function from unit_tests.py, and runs it on the function `{entry_point}`. This will run a set of automated unit tests to verify the correct implementation of `{entry_point}`. However, `{entry_point}` is only partially @@ -196,13 +257,15 @@ from unit_tests import run_tests # Run the unit tests run_tests({entry_point}) ``` -""".strip()) - - # Send a task to the tool user. - await runtime.send_message(task, coder) +""".strip() + ) # Run the runtime until the task is completed. - await runtime.process_until_idle() + run_context = runtime.start() + # Send a task to the tool user. + await runtime.send_message(task, coder) + await run_context.stop_when_idle() + if __name__ == "__main__": import logging @@ -210,4 +273,3 @@ if __name__ == "__main__": logging.basicConfig(level=logging.WARNING) logging.getLogger("agnext").setLevel(logging.DEBUG) asyncio.run(main()) - diff --git a/python/teams/team-one/src/team_one/agents/orchestrator.py b/python/teams/team-one/src/team_one/agents/orchestrator.py index cdb21ef00..56f89f5da 100644 --- a/python/teams/team-one/src/team_one/agents/orchestrator.py +++ b/python/teams/team-one/src/team_one/agents/orchestrator.py @@ -1,9 +1,15 @@ +import logging +from datetime import datetime from typing import List +from agnext.application.logging import EVENT_LOGGER_NAME from agnext.components import TypeRoutedAgent, message_handler +from agnext.components.models import AssistantMessage, UserMessage from agnext.core import AgentProxy, CancellationToken -from ..messages import BroadcastMessage, RequestReplyMessage +from ..messages import BroadcastMessage, OrchestrationEvent, RequestReplyMessage + +logger = logging.getLogger(EVENT_LOGGER_NAME + ".orchestrator") class RoundRobinOrchestrator(TypeRoutedAgent): @@ -19,12 +25,39 @@ class RoundRobinOrchestrator(TypeRoutedAgent): @message_handler async def handle_incoming_message(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None: """Handle an incoming message.""" + source = "Unknown" + if isinstance(message.content, UserMessage) or isinstance(message.content, AssistantMessage): + source = message.content.source + + assert isinstance(source, str) + + current_timestamp = datetime.now().isoformat() + logger.info( + OrchestrationEvent( + current_timestamp, + f""" +------------------------------------- +{source}: {message.content.content} +------------------------------------- +""", + ) + ) if self._num_rounds > 20: return next_agent = self._select_next_agent() request_reply_message = RequestReplyMessage() + # emit an event + + current_timestamp = datetime.now().isoformat() + logger.info( + OrchestrationEvent( + current_timestamp, + f"Orchestrator (thought): Next speaker {next_agent.metadata['name']}", + ) + ) + await self.send_message(request_reply_message, next_agent.id) self._num_rounds += 1 diff --git a/python/teams/team-one/src/team_one/messages.py b/python/teams/team-one/src/team_one/messages.py index 2cbf5a57b..df207ac98 100644 --- a/python/teams/team-one/src/team_one/messages.py +++ b/python/teams/team-one/src/team_one/messages.py @@ -11,3 +11,9 @@ class BroadcastMessage: @dataclass class RequestReplyMessage: pass + + +@dataclass +class OrchestrationEvent: + timestamp: str + message: str