From 2288aee72b4841b0295d3b6f84d7257b55673174 Mon Sep 17 00:00:00 2001 From: Eric Zhu Date: Tue, 23 Jul 2024 17:58:15 -0700 Subject: [PATCH] Adding example for tool intercept and human approval (#252) * Adding example for tool intercept * Format --- python/docs/src/core-concepts/ai-agents.md | 35 ++++++--- python/samples/README.md | 1 + .../tool-use/coding_one_agent_direct.py | 34 ++++++--- .../coding_one_agent_direct_intercept.py | 75 +++++++++++++++++++ 4 files changed, 125 insertions(+), 20 deletions(-) create mode 100644 python/samples/tool-use/coding_one_agent_direct_intercept.py diff --git a/python/docs/src/core-concepts/ai-agents.md b/python/docs/src/core-concepts/ai-agents.md index 7afee349a..006ea850c 100644 --- a/python/docs/src/core-concepts/ai-agents.md +++ b/python/docs/src/core-concepts/ai-agents.md @@ -163,6 +163,11 @@ from agnext.core import CancellationToken class MyMessage: content: str +@dataclass +class FunctionExecutionException(BaseException): + call_id: str + content: str + class ToolAgent(TypeRoutedAgent): def __init__(self, model_client: ChatCompletionClient, tools: List[Tool]) -> None: super().__init__("An agent with tools") @@ -184,12 +189,20 @@ class ToolAgent(TypeRoutedAgent): # Keep iterating until the model stops generating tool calls. while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content): # Execute functions called by the model by sending messages to itself. - results: List[FunctionExecutionResult] = await asyncio.gather( - *[self.send_message(call, self.id, cancellation_token=cancellation_token) for call in response.content] + results: List[FunctionExecutionResult | BaseException] = await asyncio.gather( + *[self.send_message(call, self.id) for call in response.content], + return_exceptions=True, ) - # Combine the results into a single response. - result = FunctionExecutionResultMessage(content=results) - session.append(result) + # Combine the results into a single response and handle exceptions. + function_results : List[FunctionExecutionResult] = [] + for result in results: + if isinstance(result, FunctionExecutionResult): + function_results.append(result) + elif isinstance(result, FunctionExecutionException): + function_results.append(FunctionExecutionResult(content=f"Error: {result}", call_id=result.call_id)) + elif isinstance(result, BaseException): + raise result # Unexpected exception. + session.append(FunctionExecutionResultMessage(content=function_results)) # Query the model again with the new response. response = await self._model_client.create( self._system_messages + session, tools=self._tools, cancellation_token=cancellation_token @@ -203,19 +216,18 @@ class ToolAgent(TypeRoutedAgent): async def handle_function_call( self, message: FunctionCall, cancellation_token: CancellationToken ) -> FunctionExecutionResult: - # Execute the function called by the model. tool = next((tool for tool in self._tools if tool.name == message.name), None) if tool is None: - result_as_str = f"Error: Tool not found: {message.name}" + raise FunctionExecutionException(call_id=message.id, content=f"Error: Tool not found: {message.name}") else: try: arguments = json.loads(message.arguments) result = await tool.run_json(args=arguments, cancellation_token=cancellation_token) result_as_str = tool.return_value_as_string(result) - except json.JSONDecodeError: - result_as_str = f"Error: Invalid arguments: {message.arguments}" + except json.JSONDecodeError as e: + raise FunctionExecutionException(call_id=message.id, content=f"Error: Invalid arguments: {message.arguments}") from e except Exception as e: - result_as_str = f"Error: {e}" + raise FunctionExecutionException(call_id=message.id, content=f"Error: {e}") from e return FunctionExecutionResult(content=result_as_str, call_id=message.id) ``` @@ -270,7 +282,8 @@ The stock price of NVDA on June 1, 2024, was $26.49. See [samples](https://github.com/microsoft/agnext/tree/main/python/samples#tool-use-examples) for more examples of using tools with agents, including how to use -broadcast communication model for tool execution. +broadcast communication model for tool execution, and how to intercept tool +execution for human-in-the-loop approval. ## Memory diff --git a/python/samples/README.md b/python/samples/README.md index 0c2207ce1..32aad3424 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -25,6 +25,7 @@ We provide examples to illustrate how to use tools in AGNext: - [`coding_one_agent_direct.py`](tool-use/coding_one_agent_direct.py): a code execution example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses direct communication. - [`coding_two_agent_pub_sub.py`](tool-use/coding_two_agent_pub_sub.py): a code execution example with two agents, one for calling tool and one for executing the tool, to demonstrate tool use and reflection on tool use. This example uses broadcast communication. - [`custom_function_tool_one_agent_direct.py`](tool-use/custom_function_tool_one_agent_direct.py): a custom function tool example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses direct communication. +- [`coding_one_agent_direct_intercept.py`](tool-use/coding_one_agent_direct_intercept.py): an example showing human-in-the-loop for approving or denying tool execution. ## Pattern examples diff --git a/python/samples/tool-use/coding_one_agent_direct.py b/python/samples/tool-use/coding_one_agent_direct.py index 507b7ae43..9facda9fc 100644 --- a/python/samples/tool-use/coding_one_agent_direct.py +++ b/python/samples/tool-use/coding_one_agent_direct.py @@ -42,6 +42,12 @@ class Message: content: str +@dataclass +class FunctionExecutionException(BaseException): + call_id: str + content: str + + class ToolEnabledAgent(TypeRoutedAgent): """An agent that uses tools to perform tasks. It executes the tools by itself by sending the tool execution task to itself.""" @@ -68,12 +74,20 @@ class ToolEnabledAgent(TypeRoutedAgent): # Keep executing the tools until the response is not a list of function calls. while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content): - results: List[FunctionExecutionResult] = await asyncio.gather( - *[self.send_message(call, self.id) for call in response.content] + results: List[FunctionExecutionResult | BaseException] = await asyncio.gather( + *[self.send_message(call, self.id) for call in response.content], + return_exceptions=True, ) - # Combine the results into a single response. - result = FunctionExecutionResultMessage(content=results) - session.append(result) + # Combine the results into a single response and handle exceptions. + function_results: List[FunctionExecutionResult] = [] + for result in results: + if isinstance(result, FunctionExecutionResult): + function_results.append(result) + elif isinstance(result, FunctionExecutionException): + function_results.append(FunctionExecutionResult(content=f"Error: {result}", call_id=result.call_id)) + elif isinstance(result, BaseException): + raise result + session.append(FunctionExecutionResultMessage(content=function_results)) # Execute the model again with the new response. response = await self._model_client.create(self._system_messages + session, tools=self._tools) session.append(AssistantMessage(content=response.content, source=self.metadata["name"])) @@ -89,16 +103,18 @@ class ToolEnabledAgent(TypeRoutedAgent): # Find the tool tool = next((tool for tool in self._tools if tool.name == message.name), None) if tool is None: - result_as_str = f"Error: Tool not found: {message.name}" + raise FunctionExecutionException(call_id=message.id, content=f"Error: Tool not found: {message.name}") else: try: arguments = json.loads(message.arguments) result = await tool.run_json(args=arguments, cancellation_token=cancellation_token) result_as_str = tool.return_value_as_string(result) - except json.JSONDecodeError: - result_as_str = f"Error: Invalid arguments: {message.arguments}" + except json.JSONDecodeError as e: + raise FunctionExecutionException( + call_id=message.id, content=f"Error: Invalid arguments: {message.arguments}" + ) from e except Exception as e: - result_as_str = f"Error: {e}" + raise FunctionExecutionException(call_id=message.id, content=f"Error: {e}") from e return FunctionExecutionResult(content=result_as_str, call_id=message.id) diff --git a/python/samples/tool-use/coding_one_agent_direct_intercept.py b/python/samples/tool-use/coding_one_agent_direct_intercept.py new file mode 100644 index 000000000..a7bdc7a67 --- /dev/null +++ b/python/samples/tool-use/coding_one_agent_direct_intercept.py @@ -0,0 +1,75 @@ +""" +This example show case how to intercept the tool execution using +intervention hanlder. +The intervention handler is used to intercept the FunctionCall message +before it is sent out, and prompt the user for permission to execute the tool. +""" + +import asyncio +import os +import sys +from typing import Any, List + +from agnext.application import SingleThreadedAgentRuntime +from agnext.components import FunctionCall +from agnext.components.code_executor import LocalCommandLineCodeExecutor +from agnext.components.models import SystemMessage +from agnext.components.tools import PythonCodeExecutionTool, Tool +from agnext.core import AgentId +from agnext.core.intervention import DefaultInterventionHandler, DropMessage + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from coding_one_agent_direct import FunctionExecutionException, Message, ToolEnabledAgent +from common.utils import get_chat_completion_client_from_envs + + +class ToolInterventionHandler(DefaultInterventionHandler): + async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]: + if isinstance(message, FunctionCall): + # Request user prompt for tool execution. + user_input = input( + f"Function call: {message.name}\nArguments: {message.arguments}\nDo you want to execute the tool? (y/n): " + ) + if user_input.strip().lower() != "y": + raise FunctionExecutionException(content="User denied tool execution.", call_id=message.id) + return message + + +async def main() -> None: + # Create the runtime with the intervention handler. + runtime = SingleThreadedAgentRuntime(intervention_handler=ToolInterventionHandler()) + # Define the tools. + tools: List[Tool] = [ + # A tool that executes Python code. + PythonCodeExecutionTool( + LocalCommandLineCodeExecutor(), + ) + ] + # Register agents. + tool_agent = await runtime.register_and_get( + "tool_enabled_agent", + lambda: ToolEnabledAgent( + description="Tool Use Agent", + system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")], + model_client=get_chat_completion_client_from_envs(model="gpt-3.5-turbo"), + tools=tools, + ), + ) + + run_context = runtime.start() + + # Send a task to the tool user. + response = await runtime.send_message(Message("Run the following Python code: print('Hello, World!')"), tool_agent) + print(response.content) + + # Run the runtime until the task is completed. + await run_context.stop() + + +if __name__ == "__main__": + import logging + + logging.basicConfig(level=logging.WARNING) + logging.getLogger("agnext").setLevel(logging.DEBUG) + asyncio.run(main())