Event-Based GroupChat and Update Examples (#62)

* initial

* convert examples to event-based; add user proxy agent.

* update examples
This commit is contained in:
Eric Zhu
2024-06-10 19:51:51 -07:00
committed by GitHub
parent 03efb87cd7
commit 1448cbc138
13 changed files with 416 additions and 138 deletions

0
examples/README.md Normal file
View File

View File

@@ -1,19 +1,20 @@
"""This is an example of a chat with an OpenAIAssistantAgent.
You must have OPENAI_API_KEY set up in your environment to
run this example.
"""
"""This is an example of a terminal-based ChatGPT clone
using an OpenAIAssistantAgent and event-based orchestration."""
import argparse
import asyncio
import logging
import os
import re
from typing import Any, List
from typing import List
import aiofiles
import openai
from agnext.application import SingleThreadedAgentRuntime
from agnext.chat.agents.oai_assistant import OpenAIAssistantAgent
from agnext.chat.patterns.group_chat import GroupChatOutput
from agnext.chat.patterns.two_agent_chat import TwoAgentChat
from agnext.chat.types import RespondNow, TextMessage
from agnext.chat.memory import BufferedChatMemory
from agnext.chat.patterns.group_chat_manager import GroupChatManager
from agnext.chat.types import PublishNow, TextMessage
from agnext.components import TypeRoutedAgent, message_handler
from agnext.core import AgentRuntime, CancellationToken
from openai import AsyncAssistantEventHandler
@@ -22,27 +23,15 @@ from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override
class TwoAgentChatOutput(GroupChatOutput): # type: ignore
def on_message_received(self, message: Any) -> None:
pass
def get_output(self) -> Any:
return None
def reset(self) -> None:
pass
sep = "-" * 50
class UserProxyAgent(TypeRoutedAgent): # type: ignore
def __init__(
def __init__( # type: ignore
self,
name: str,
runtime: AgentRuntime,
client: openai.AsyncClient,
runtime: AgentRuntime, # type: ignore
client: openai.AsyncClient, # type: ignore
assistant_id: str,
thread_id: str,
vector_store_id: str,
@@ -63,10 +52,14 @@ class UserProxyAgent(TypeRoutedAgent): # type: ignore
# print(f"{message.source}: {message.content}")
pass
async def _get_user_input(self, prompt: str) -> str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, input, prompt)
@message_handler() # type: ignore
async def on_respond_now(self, message: RespondNow, cancellation_token: CancellationToken) -> TextMessage: # type: ignore
async def on_publish_now(self, message: PublishNow, cancellation_token: CancellationToken) -> None: # type: ignore
while True:
user_input = input(f"\n{sep}\nYou: ")
user_input = await self._get_user_input(f"\n{sep}\nYou: ")
# Parse upload file command '[upload code_interpreter | file_search filename]'.
match = re.search(r"\[upload\s+(code_interpreter|file_search)\s+(.+)\]", user_input)
if match:
@@ -110,9 +103,13 @@ class UserProxyAgent(TypeRoutedAgent): # type: ignore
elif user_input.startswith("[upload"):
print("Invalid upload command. Please use '[upload code_interpreter | file_search filename]'.")
continue
elif user_input.strip().lower() == "exit":
# Exit handler.
return
else:
# Send user input to assistant.
return TextMessage(content=user_input, source=self.name)
# Publish user input and exit handler.
await self._publish_message(TextMessage(content=user_input, source=self.name))
return
class EventHandler(AsyncAssistantEventHandler):
@@ -169,7 +166,7 @@ class EventHandler(AsyncAssistantEventHandler):
print("\n".join(citations))
def assistant_chat(runtime: AgentRuntime) -> TwoAgentChat: # type: ignore
def assistant_chat(runtime: AgentRuntime) -> UserProxyAgent: # type: ignore
oai_assistant = openai.beta.assistants.create(
model="gpt-4-turbo",
description="An AI assistant that helps with everyday tasks.",
@@ -197,15 +194,15 @@ def assistant_chat(runtime: AgentRuntime) -> TwoAgentChat: # type: ignore
thread_id=thread.id,
vector_store_id=vector_store.id,
)
return TwoAgentChat(
name="AssistantChat",
description="A chat with an AI assistant",
# Create a group chat manager to facilitate a turn-based conversation.
_ = GroupChatManager(
name="GroupChatManager",
description="A group chat manager.",
runtime=runtime,
first_speaker=assistant,
second_speaker=user,
num_rounds=100,
output=TwoAgentChatOutput(),
memory=BufferedChatMemory(buffer_size=10),
participants=[assistant, user],
)
return user
async def main() -> None:
@@ -220,19 +217,25 @@ where 'code_interpreter' or 'file_search' is the purpose of the file and
[upload code_interpreter data.csv]
This will upload data.csv to the assistant for use with the code interpreter tool.
Type "exit" to exit the chat.
"""
runtime = SingleThreadedAgentRuntime()
chat = assistant_chat(runtime)
user = assistant_chat(runtime)
print(usage)
future = runtime.send_message(
TextMessage(content="Hello.", source="User"),
chat,
)
while not future.done():
# Request the user to start the conversation.
runtime.send_message(PublishNow(), user)
while True:
# TODO: have a way to exit the loop.
await runtime.process_next()
await asyncio.sleep(1)
if __name__ == "__main__":
import asyncio
parser = argparse.ArgumentParser(description="Chat with an AI assistant.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -1,6 +1,6 @@
"""This is an example of a chat room with AI agents. It demonstrates how to use the
`TypeRoutedAgent` class to create custom agents that can use custom message types,
and interact with other using event-based messaging."""
and interact with other using event-based messaging without an orchestrator."""
import argparse
import asyncio
@@ -85,7 +85,7 @@ Use the following JSON format to provide your thought on the latest message and
# Publish the response if needed.
if respond is True or str(respond).lower().strip() == "true":
self._publish_message(ChatRoomMessage(source=self.name, content=str(response)))
await self._publish_message(ChatRoomMessage(source=self.name, content=str(response)))
print(f"{sep}\n{self._color}{self.name}:{Style.RESET_ALL}\n{response}")
@@ -100,7 +100,6 @@ def chat_room(runtime: AgentRuntime) -> None: # type: ignore
model_client=OpenAI(model="gpt-4-turbo"), # type: ignore
color=Fore.CYAN,
)
_ = ChatRoomAgent(
name="Bob",
description="Bob in the chat room.",
@@ -110,7 +109,6 @@ def chat_room(runtime: AgentRuntime) -> None: # type: ignore
model_client=OpenAI(model="gpt-4-turbo"), # type: ignore
color=Fore.GREEN,
)
_ = ChatRoomAgent(
name="Charlie",
description="Charlie in the chat room.",
@@ -133,10 +131,11 @@ async def main(user_name: str, wait_seconds: int) -> None:
while True:
# TODO: allow user to input at any time while runtime is running.
# Get user input and send messages to the chat room.
# TODO: use Textual to build the UI.
user_input = await get_user_input(f"{sep}\nYou:\n")
if user_input.strip():
# Publish user message if it is not empty.
runtime.publish_message(ChatRoomMessage(source=user_name, content=user_input))
await runtime.publish_message(ChatRoomMessage(source=user_name, content=user_input))
# Wait for agents to respond.
while runtime.unprocessed_messages:
await runtime.process_next()

View File

@@ -1,8 +1,6 @@
"""This is an example of simulating a chess game with two agents
that play against each other, using tools to reason about the game state
and make moves.
You must have OPENAI_API_KEY set up in your environment to run this example.
"""
and make moves, and using a group chat manager to orchestrate the conversation."""
import argparse
import asyncio
@@ -12,8 +10,7 @@ from typing import Annotated, Literal
from agnext.application import SingleThreadedAgentRuntime
from agnext.chat.agents.chat_completion_agent import ChatCompletionAgent
from agnext.chat.memory import BufferedChatMemory
from agnext.chat.patterns.group_chat import GroupChat, GroupChatOutput
from agnext.chat.patterns.two_agent_chat import TwoAgentChat
from agnext.chat.patterns.group_chat_manager import GroupChatManager
from agnext.chat.types import TextMessage
from agnext.components.models import OpenAI, SystemMessage
from agnext.components.tools import FunctionTool
@@ -22,17 +19,6 @@ from chess import BLACK, SQUARE_NAMES, WHITE, Board, Move
from chess import piece_name as get_piece_name
class ChessGameOutput(GroupChatOutput): # type: ignore
def on_message_received(self, message: TextMessage) -> None: # type: ignore
pass
def get_output(self) -> None:
pass
def reset(self) -> None:
pass
def validate_turn(board: Board, player: Literal["white", "black"]) -> None:
"""Validate that it is the player's turn to move."""
last_move = board.peek() if board.move_stack else None
@@ -96,7 +82,7 @@ def make_move(
return f"Moved {piece_name} ({piece_symbol}) from {SQUARE_NAMES[newMove.from_square]} to {SQUARE_NAMES[newMove.to_square]}."
def chess_game(runtime: AgentRuntime) -> GroupChat: # type: ignore
def chess_game(runtime: AgentRuntime) -> None: # type: ignore
"""Create the agents and the group chat manager for a chess game."""
# Create the board.
@@ -196,37 +182,33 @@ def chess_game(runtime: AgentRuntime) -> GroupChat: # type: ignore
model_client=OpenAI(model="gpt-4-turbo"),
tools=white_tools,
)
game_chat = TwoAgentChat(
# Create a group chat manager for the chess game to orchestrate a turn-based
# conversation between the two agents.
_ = GroupChatManager(
name="ChessGame",
description="A chess game between two agents.",
runtime=runtime,
first_speaker=white,
second_speaker=black,
num_rounds=100,
output=ChessGameOutput(),
memory=BufferedChatMemory(buffer_size=10),
participants=[white, black], # white goes first
)
return game_chat
async def main(message: str) -> None:
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
game_chat = chess_game(runtime)
future = runtime.send_message(TextMessage(content=message, source="Human"), game_chat)
while not future.done():
chess_game(runtime)
# Publish an initial message to trigger the group chat manager to start orchestration.
runtime.publish_message(TextMessage(content="Game started.", source="System"))
while True:
await runtime.process_next()
await asyncio.sleep(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a chess game between two agents.")
parser.add_argument(
"--initial-message",
default="Please make a move.",
help="The initial message to send to the agent playing white.",
)
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main(args.initial_message))
asyncio.run(main())

88
examples/group_chat.py Normal file
View File

@@ -0,0 +1,88 @@
"""This is an example that demonstrates event-driven orchestration using a
group chat manager agent."""
import argparse
import asyncio
import logging
from agnext.application import SingleThreadedAgentRuntime
from agnext.chat.agents import ChatCompletionAgent, UserProxyAgent
from agnext.chat.memory import BufferedChatMemory
from agnext.chat.patterns.group_chat_manager import GroupChatManager
from agnext.chat.types import PublishNow
from agnext.components.models import OpenAI, SystemMessage
from agnext.core import AgentRuntime
def software_development(runtime: AgentRuntime) -> UserProxyAgent:  # type: ignore
    """Assemble a software-development team chat and return the user proxy.

    Creates four LLM-backed team members (engineer, data scientist,
    designer, product manager), a user proxy for terminal input, and a
    GroupChatManager that uses the model to pick the next speaker.

    Args:
        runtime: The agent runtime the agents are created with.

    Returns:
        The user proxy agent; send it a PublishNow to start the chat.
    """
    model_name = "gpt-4-turbo"
    window = 10

    def _team_member(name: str, description: str, system_prompt: str) -> ChatCompletionAgent:
        # All team members share the same model and a bounded memory window.
        return ChatCompletionAgent(
            name=name,
            description=description,
            runtime=runtime,
            system_messages=[SystemMessage(system_prompt)],
            model_client=OpenAI(model=model_name),
            memory=BufferedChatMemory(buffer_size=window),
        )

    alice = _team_member(
        "Alice",
        "A software engineer likes to code.",
        "Your name is Alice and you are a software engineer likes to code.",
    )
    bob = _team_member(
        "Bob",
        "A data scientist likes to analyze data.",
        "Your name is Bob and you are a data scientist likes to analyze data.",
    )
    charlie = _team_member(
        "Charlie",
        "A designer likes to design user interfaces.",
        "Your name is Charlie and you are a designer likes to design user interfaces.",
    )
    susan = _team_member(
        "Susan",
        "A product manager likes to understand user's requirement and bring it into software specifications.",
        "Your name is Susan and you are a product manager likes to understand user's requirement and bring it into software specifications.",
    )
    user_proxy = UserProxyAgent(
        name="User", description="A user requesting for help.", runtime=runtime, user_input_prompt=f"{'-'*50}\nYou:\n"
    )
    # The manager echoes every message to the terminal via the callback and
    # asks the model to choose who speaks next.
    _ = GroupChatManager(
        name="GroupChatManager",
        description="A group chat manager.",
        runtime=runtime,
        memory=BufferedChatMemory(buffer_size=window),
        model_client=OpenAI(model=model_name),
        participants=[alice, bob, charlie, susan, user_proxy],
        on_message_received=lambda msg: print(f"{'-'*50}\n{msg.source}: {msg.content}"),
    )
    return user_proxy
async def main() -> None:
    """Drive the software-development group chat from the terminal."""
    runtime = SingleThreadedAgentRuntime()
    user = software_development(runtime)
    # The user proxy speaks first: ask it to publish the opening message.
    runtime.send_message(PublishNow(), user)
    while True:
        # TODO: Add a way to stop the loop.
        await runtime.process_next()
        await asyncio.sleep(1)
if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser(description="Chat with software development team.")
    arg_parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
    parsed = arg_parser.parse_args()
    if parsed.verbose:
        # Quiet everything by default; surface agnext internals at DEBUG.
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger("agnext").setLevel(logging.DEBUG)
    asyncio.run(main())

View File

@@ -1,4 +1,5 @@
from .chat_completion_agent import ChatCompletionAgent
from .oai_assistant import OpenAIAssistantAgent
from .user_proxy import UserProxyAgent
__all__ = ["ChatCompletionAgent", "OpenAIAssistantAgent"]
__all__ = ["ChatCompletionAgent", "OpenAIAssistantAgent", "UserProxyAgent"]

View File

@@ -19,6 +19,7 @@ from ..memory import ChatMemory
from ..types import (
FunctionCallMessage,
Message,
PublishNow,
Reset,
RespondNow,
ResponseFormat,
@@ -59,49 +60,15 @@ class ChatCompletionAgent(TypeRoutedAgent):
async def on_respond_now(
self, message: RespondNow, cancellation_token: CancellationToken
) -> TextMessage | FunctionCallMessage:
# Get a response from the model.
response = await self._client.create(
self._system_messages + convert_messages_to_llm_messages(self._memory.get_messages(), self.name),
tools=self._tools,
json_output=message.response_format == ResponseFormat.json_object,
)
# If the agent has function executor, and the response is a list of
# tool calls, iterate with itself until we get a response that is not a
# list of tool calls.
while (
len(self._tools) > 0
and isinstance(response.content, list)
and all(isinstance(x, FunctionCall) for x in response.content)
):
# Send a function call message to itself.
response = await self._send_message(
message=FunctionCallMessage(content=response.content, source=self.name),
recipient=self,
cancellation_token=cancellation_token,
)
# Make an assistant message from the response.
response = await self._client.create(
self._system_messages + convert_messages_to_llm_messages(self._memory.get_messages(), self.name),
tools=self._tools,
json_output=message.response_format == ResponseFormat.json_object,
)
final_response: Message
if isinstance(response.content, str):
# If the response is a string, return a text message.
final_response = TextMessage(content=response.content, source=self.name)
elif isinstance(response.content, list) and all(isinstance(x, FunctionCall) for x in response.content):
# If the response is a list of function calls, return a function call message.
final_response = FunctionCallMessage(content=response.content, source=self.name)
else:
raise ValueError(f"Unexpected response: {response.content}")
# Add the response to the chat messages.
self._memory.add_message(final_response)
# Return the response.
return final_response
return await self._generate_response(message.response_format, cancellation_token)
@message_handler()
async def on_publish_now(self, message: PublishNow, cancellation_token: CancellationToken) -> None:
# Generate a response.
response = await self._generate_response(message.response_format, cancellation_token)
# Publish the response.
await self._publish_message(response)
@message_handler()
async def on_tool_call_message(
@@ -129,7 +96,7 @@ class ChatCompletionAgent(TypeRoutedAgent):
)
continue
# Execute the function.
future = self.execute_function(
future = self._execute_function(
function_call.name,
arguments,
function_call.id,
@@ -153,7 +120,55 @@ class ChatCompletionAgent(TypeRoutedAgent):
# Return the results.
return tool_call_result_msg
async def execute_function(
async def _generate_response(
    self,
    response_format: ResponseFormat,
    cancellation_token: CancellationToken,
) -> TextMessage | FunctionCallMessage:
    """Produce the agent's next message from the model.

    Runs the model over the system messages plus the buffered chat
    memory, resolves tool (function) calls by messaging itself until the
    model stops emitting them, records the final message in memory, and
    returns it.

    Args:
        response_format: Requested output format; JSON mode is enabled
            when this equals ResponseFormat.json_object.
        cancellation_token: Token forwarded to the self-addressed
            function-call message.

    Returns:
        A TextMessage for string output, or a FunctionCallMessage when
        the final model output is a list of function calls (e.g. when no
        tools are configured to execute them).

    Raises:
        ValueError: If the model returns content that is neither a
            string nor a list of FunctionCall objects.
    """
    # Get a response from the model.
    response = await self._client.create(
        self._system_messages + convert_messages_to_llm_messages(self._memory.get_messages(), self.name),
        tools=self._tools,
        json_output=response_format == ResponseFormat.json_object,
    )
    # If the agent has function executor, and the response is a list of
    # tool calls, iterate with itself until we get a response that is not a
    # list of tool calls.
    while (
        len(self._tools) > 0
        and isinstance(response.content, list)
        and all(isinstance(x, FunctionCall) for x in response.content)
    ):
        # Send a function call message to itself; presumably handled by the
        # agent's tool-call handler — the result lands in memory. TODO confirm.
        response = await self._send_message(
            message=FunctionCallMessage(content=response.content, source=self.name),
            recipient=self,
            cancellation_token=cancellation_token,
        )
        # Make an assistant message from the response.
        response = await self._client.create(
            self._system_messages + convert_messages_to_llm_messages(self._memory.get_messages(), self.name),
            tools=self._tools,
            json_output=response_format == ResponseFormat.json_object,
        )
    final_response: Message
    if isinstance(response.content, str):
        # If the response is a string, return a text message.
        final_response = TextMessage(content=response.content, source=self.name)
    elif isinstance(response.content, list) and all(isinstance(x, FunctionCall) for x in response.content):
        # If the response is a list of function calls, return a function call message.
        final_response = FunctionCallMessage(content=response.content, source=self.name)
    else:
        raise ValueError(f"Unexpected response: {response.content}")
    # Add the response to the chat messages so later turns can see it.
    self._memory.add_message(final_response)
    return final_response
async def _execute_function(
self,
name: str,
args: Dict[str, Any],

View File

@@ -6,7 +6,7 @@ from openai.types.beta import AssistantResponseFormatParam
from ...components import TypeRoutedAgent, message_handler
from ...core import AgentRuntime, CancellationToken
from ..types import Reset, RespondNow, ResponseFormat, TextMessage
from ..types import PublishNow, Reset, RespondNow, ResponseFormat, TextMessage
class OpenAIAssistantAgent(TypeRoutedAgent):
@@ -56,8 +56,18 @@ class OpenAIAssistantAgent(TypeRoutedAgent):
@message_handler()
async def on_respond_now(self, message: RespondNow, cancellation_token: CancellationToken) -> TextMessage:
return await self._generate_response(message.response_format, cancellation_token)
@message_handler()
async def on_publish_now(self, message: PublishNow, cancellation_token: CancellationToken) -> None:
response = await self._generate_response(message.response_format, cancellation_token)
await self._publish_message(response)
async def _generate_response(
self, requested_response_format: ResponseFormat, cancellation_token: CancellationToken
) -> TextMessage:
# Handle response format.
if message.response_format == ResponseFormat.json_object:
if requested_response_format == ResponseFormat.json_object:
response_format = AssistantResponseFormatParam(type="json_object")
else:
response_format = AssistantResponseFormatParam(type="text")

View File

@@ -0,0 +1,20 @@
import asyncio
from ...components import TypeRoutedAgent, message_handler
from ...core import AgentRuntime, CancellationToken
from ..types import PublishNow, TextMessage
class UserProxyAgent(TypeRoutedAgent):
    """A proxy agent for a human participant.

    When asked to publish (via PublishNow), it reads one line from the
    terminal and publishes it as a TextMessage attributed to this agent.
    """

    def __init__(self, name: str, description: str, runtime: AgentRuntime, user_input_prompt: str) -> None:
        """Initialize the proxy.

        Args:
            name: Agent name; used as the source of published messages.
            description: Human-readable description of the agent.
            runtime: The agent runtime to register with.
            user_input_prompt: Prompt string shown when requesting input.
        """
        super().__init__(name, description, runtime)
        self._user_input_prompt = user_input_prompt

    @message_handler()
    async def on_publish_now(self, message: PublishNow, cancellation_token: CancellationToken) -> None:
        """Collect one line of user input and publish it as a TextMessage."""
        user_input = await self.get_user_input(self._user_input_prompt)
        await self._publish_message(TextMessage(content=user_input, source=self.name))

    async def get_user_input(self, prompt: str) -> str:
        """Read a line from stdin without blocking the event loop.

        input() blocks, so it runs in the default executor thread.
        """
        # get_running_loop() is the supported API inside a coroutine;
        # get_event_loop() is deprecated for this use since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input, prompt)

View File

@@ -0,0 +1,70 @@
from typing import Callable, List
from ...components import TypeRoutedAgent, message_handler
from ...components.models import ChatCompletionClient
from ...core import Agent, AgentRuntime, CancellationToken
from ..memory import ChatMemory
from ..types import (
PublishNow,
Reset,
TextMessage,
)
from .group_chat_utils import select_speaker
class GroupChatManager(TypeRoutedAgent):
    """Orchestrates a group conversation by choosing who speaks next.

    Every TextMessage it receives is recorded in memory, then a speaker
    is selected — by the model when a client is provided, otherwise by
    round-robin — and that speaker is asked to publish its response.
    """

    def __init__(
        self,
        name: str,
        description: str,
        runtime: AgentRuntime,
        participants: List[Agent],
        memory: ChatMemory,
        model_client: ChatCompletionClient | None = None,
        termination_word: str = "TERMINATE",
        on_message_received: Callable[[TextMessage], None] | None = None,
    ):
        super().__init__(name, description, runtime)
        self._participants = participants
        self._memory = memory
        self._client = model_client
        self._termination_word = termination_word
        self._on_message_received = on_message_received

    @message_handler()
    async def on_reset(self, message: Reset, cancellation_token: CancellationToken) -> None:
        # Drop the recorded conversation.
        self._memory.clear()

    @message_handler()
    async def on_text_message(self, message: TextMessage, cancellation_token: CancellationToken) -> None:
        # Let the optional observer see every message first.
        if self._on_message_received is not None:
            self._on_message_received(message)
        # The termination word ends the chat: no next speaker is selected.
        if message.content.strip() == self._termination_word:
            return
        # Record the message before choosing who talks next.
        self._memory.add_message(message)
        if self._client is not None:
            # Model-driven selection based on the conversation so far.
            speaker = await select_speaker(self._memory, self._client, self._participants)
        else:
            speaker = self._round_robin_speaker(message.source)
        # Ask the chosen participant to publish its response.
        await self._send_message(PublishNow(), speaker)

    def _round_robin_speaker(self, last_speaker_name: str) -> Agent:
        # Pick the participant after the one who just spoke; an unknown
        # source (e.g. an outside sender) restarts the rotation at index 0.
        try:
            last_index = next(i for i, p in enumerate(self._participants) if p.name == last_speaker_name)
        except StopIteration:
            return self._participants[0]
        return self._participants[(last_index + 1) % len(self._participants)]

View File

@@ -0,0 +1,79 @@
"""Credit to the original authors: https://github.com/microsoft/autogen/blob/main/autogen/agentchat/groupchat.py"""
import re
from typing import Dict, List
from ...components.models import ChatCompletionClient, SystemMessage
from ...core import Agent
from ..memory import ChatMemory
from ..types import TextMessage
async def select_speaker(memory: ChatMemory, client: ChatCompletionClient, agents: List[Agent]) -> Agent:
    """Select the next speaker in a group chat using a chat completion model.

    Builds a role-play prompt from the agents' names/descriptions and the
    chat history, asks the model to name the next role, and resolves that
    name back to an Agent.

    Args:
        memory: Chat memory holding the conversation; every entry must be
            a TextMessage (asserted below).
        client: Model client used to make the selection.
        agents: Candidate speakers to choose from.

    Returns:
        The single agent mentioned in the model's reply.

    Raises:
        ValueError: If the model's reply mentions no agent or more than one.
    """
    # TODO: Handle multi-modal messages.
    # Construct formatted current message history.
    history_messages: List[str] = []
    for msg in memory.get_messages():
        assert isinstance(msg, TextMessage)
        history_messages.append(f"{msg.source}: {msg.content}")
    history = "\n".join(history_messages)
    # Construct agent roles (one "name: description" line per agent).
    roles = "\n".join([f"{agent.name}: {agent.description}".strip() for agent in agents])
    # Construct agent list as a Python-list-style string for the prompt.
    participants = str([agent.name for agent in agents])
    # Select the next speaker.
    select_speaker_prompt = f"""You are in a role play game. The following roles are available:
{roles}.
Read the following conversation. Then select the next role from {participants} to play. Only return the role.
{history}
Read the above conversation. Then select the next role from {participants} to play. Only return the role.
"""
    select_speaker_messages = [SystemMessage(select_speaker_prompt)]
    response = await client.create(messages=select_speaker_messages)
    assert isinstance(response.content, str)
    # Map the free-text reply back to agent names; require exactly one hit.
    mentions = mentioned_agents(response.content, agents)
    if len(mentions) != 1:
        raise ValueError(f"Expected exactly one agent to be mentioned, but got {mentions}")
    agent_name = list(mentions.keys())[0]
    agent = next((agent for agent in agents if agent.name == agent_name), None)
    assert agent is not None
    return agent
def mentioned_agents(message_content: str, agents: List[Agent]) -> Dict[str, int]:
    """Count how many times each agent is mentioned in *message_content*.

    A name matches (case-sensitively) when it appears:
    - exactly as given,
    - with underscores replaced by spaces (e.g. 'Story_writer' == 'Story writer'),
    - with underscores escaped as '\\_' (e.g. 'Story_writer' == 'Story\\_writer').

    Args:
        message_content: The text to scan for mentions.
        agents: Agents whose ``name`` attributes are searched for.

    Returns:
        A mapping from agent name to mention count; agents with zero
        mentions are omitted.
    """
    # Pad with spaces so the word-boundary lookarounds can also match at
    # the very start and end of the message.
    padded = f" {message_content} "
    mentions: Dict[str, int] = {}
    for agent in agents:
        variants = (
            agent.name,
            agent.name.replace("_", " "),
            agent.name.replace("_", r"\_"),
        )
        alternatives = "|".join(re.escape(v) for v in variants)
        # Require a non-word character on both sides to avoid partial matches.
        pattern = rf"(?<=\W)({alternatives})(?=\W)"
        hits = re.findall(pattern, padded)
        if hits:
            mentions[agent.name] = len(hits)
    return mentions

View File

@@ -51,7 +51,7 @@ class OrchestratorChat(TypeRoutedAgent):
while total_turns < self._max_turns:
# Reset all agents.
for agent in [*self._specialists, self._orchestrator]:
self._send_message(Reset(), agent)
await self._send_message(Reset(), agent)
# Create the task specs.
task_specs = f"""
@@ -73,7 +73,7 @@ Some additional points to consider:
# Send the task specs to the orchestrator and specialists.
for agent in [*self._specialists, self._orchestrator]:
self._send_message(TextMessage(content=task_specs, source=self.name), agent)
await self._send_message(TextMessage(content=task_specs, source=self.name), agent)
# Inner loop.
stalled_turns = 0
@@ -144,7 +144,7 @@ Some additional points to consider:
# Update all other agents with the speaker's response.
for agent in [agent for agent in self._specialists if agent != speaker] + [self._orchestrator]:
self._send_message(
await self._send_message(
TextMessage(
content=speaker_response.content,
source=speaker_response.source,
@@ -162,7 +162,7 @@ Some additional points to consider:
async def _prepare_task(self, task: str, sender: str) -> Tuple[str, str, str, str]:
# Reset planner.
self._send_message(Reset(), self._planner)
await self._send_message(Reset(), self._planner)
# A reusable description of the team.
team = "\n".join([agent.name + ": " + agent.description for agent in self._specialists])
@@ -197,7 +197,7 @@ When answering this survey, keep in mind that "facts" will typically be specific
""".strip()
# Ask the planner to obtain prior knowledge about facts.
self._send_message(TextMessage(content=closed_book_prompt, source=sender), self._planner)
await self._send_message(TextMessage(content=closed_book_prompt, source=sender), self._planner)
facts_response = await self._send_message(RespondNow(), self._planner)
facts = str(facts_response.content)
@@ -210,7 +210,7 @@ When answering this survey, keep in mind that "facts" will typically be specific
Based on the team composition, and known and unknown facts, please devise a short bullet-point plan for addressing the original request. Remember, there is no requirement to involve all team members -- a team member's particular expertise may not be needed for this task.""".strip()
# Send second message to the planner.
self._send_message(TextMessage(content=plan_prompt, source=sender), self._planner)
await self._send_message(TextMessage(content=plan_prompt, source=sender), self._planner)
plan_response = await self._send_message(RespondNow(), self._planner)
plan = str(plan_response.content)
@@ -263,7 +263,7 @@ Please output an answer in pure JSON format according to the following schema. T
request = step_prompt
while True:
# Send a message to the orchestrator.
self._send_message(TextMessage(content=request, source=sender), self._orchestrator)
await self._send_message(TextMessage(content=request, source=sender), self._orchestrator)
# Request a response.
step_response = await self._send_message(
RespondNow(response_format=ResponseFormat.json_object),
@@ -326,7 +326,7 @@ Please output an answer in pure JSON format according to the following schema. T
{facts}
""".strip()
# Send a message to the orchestrator.
self._send_message(TextMessage(content=new_facts_prompt, source=sender), self._orchestrator)
await self._send_message(TextMessage(content=new_facts_prompt, source=sender), self._orchestrator)
# Request a response.
new_facts_response = await self._send_message(RespondNow(), self._orchestrator)
return str(new_facts_response.content)
@@ -351,7 +351,7 @@ Please output an answer in pure JSON format according to the following schema. T
request = educated_guess_promt
while True:
# Send a message to the orchestrator.
self._send_message(
await self._send_message(
TextMessage(content=request, source=sender),
self._orchestrator,
)
@@ -385,7 +385,7 @@ Team membership:
{team}
""".strip()
# Send a message to the orchestrator.
self._send_message(TextMessage(content=new_plan_prompt, source=sender), self._orchestrator)
await self._send_message(TextMessage(content=new_plan_prompt, source=sender), self._orchestrator)
# Request a response.
new_plan_response = await self._send_message(RespondNow(), self._orchestrator)
return str(new_plan_response.content)

View File

@@ -39,6 +39,17 @@ class ResponseFormat(Enum):
@dataclass
class RespondNow:
    """A message to request a response from the addressed agent. The sender
    expects a response upon sending and waits for it synchronously."""

    # Requested format of the response (plain text by default).
    response_format: ResponseFormat = field(default=ResponseFormat.text)
@dataclass
class PublishNow:
    """A message requesting the addressed agent to publish a response as an
    event. Unlike RespondNow, the sender does not expect a reply upon
    sending and does not wait."""

    # Requested format of the published message (plain text by default).
    response_format: ResponseFormat = field(default=ResponseFormat.text)