From 98e6bba168c784cc3d8ae9491453241b10f4f5ee Mon Sep 17 00:00:00 2001 From: Eric Zhu Date: Sun, 27 Jul 2025 22:59:09 -0700 Subject: [PATCH] Supporting Teams as Participants in a GroupChat (#5863) --- .../src/autogen_agentchat/base/_team.py | 14 + .../teams/_group_chat/_base_group_chat.py | 52 +- .../_group_chat/_base_group_chat_manager.py | 19 +- .../_group_chat/_chat_agent_container.py | 109 ++- .../teams/_group_chat/_events.py | 16 +- .../_group_chat/_graph/_digraph_group_chat.py | 25 +- .../_magentic_one/_magentic_one_group_chat.py | 25 +- .../_magentic_one_orchestrator.py | 21 +- .../_group_chat/_round_robin_group_chat.py | 170 +++-- .../teams/_group_chat/_selector_group_chat.py | 50 +- .../teams/_group_chat/_swarm_group_chat.py | 31 +- .../tests/test_group_chat_nested.py | 668 ++++++++++++++++++ 12 files changed, 1089 insertions(+), 111 deletions(-) create mode 100644 python/packages/autogen-agentchat/tests/test_group_chat_nested.py diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_team.py b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_team.py index 4cbdc9741..e39aedaa6 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/base/_team.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/base/_team.py @@ -10,6 +10,20 @@ from ._task import TaskRunner class Team(ABC, TaskRunner, ComponentBase[BaseModel]): component_type = "team" + @property + @abstractmethod + def name(self) -> str: + """The name of the team. This is used by the team to uniquely identify itself + in a larger team of teams.""" + ... + + @property + @abstractmethod + def description(self) -> str: + """A description of the team. This is used to provide context about the + team and its purpose to its parent orchestrator.""" + ... + @abstractmethod async def reset(self) -> None: """Reset the team and all its participants to its initial state.""" diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py index 9a078bd42..60f222912 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py @@ -40,15 +40,34 @@ from ._sequential_routed_agent import SequentialRoutedAgent class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]): """The base class for group chat teams. + In a group chat team, participants share context by publishing their messages + to all other participants. + + If an :class:`~autogen_agentchat.base.ChatAgent` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` from the agent response's + :attr:`~autogen_agentchat.base.Response.chat_message` will be published + to other participants in the group chat. + + If a :class:`~autogen_agentchat.base.Team` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` + from the team result's :attr:`~autogen_agentchat.base.TaskResult.messages` will be published + to other participants in the group chat. + + To implement a group chat team, first create a subclass of :class:`BaseGroupChatManager` and + then create a subclass of :class:`BaseGroupChat` that uses the group chat manager. + + This base class provides the mapping between the agents of the AgentChat API + and the agent runtime of the Core API, and handles high-level features like + running, pausing, resuming, and resetting the team.
""" component_type = "team" def __init__( self, - participants: List[ChatAgent], + name: str, + description: str, + participants: List[ChatAgent | Team], group_chat_manager_name: str, group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, @@ -57,6 +76,8 @@ class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]): custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, ): + self._name = name + self._description = description if len(participants) == 0: raise ValueError("At least one participant is required.") if len(participants) != len(set(participant.name for participant in participants)): @@ -71,14 +92,15 @@ class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]): self._message_factory.register(message_type) for agent in participants: - for message_type in agent.produced_message_types: - try: - is_registered = self._message_factory.is_registered(message_type) # type: ignore[reportUnknownArgumentType] - if issubclass(message_type, StructuredMessage) and not is_registered: - self._message_factory.register(message_type) # type: ignore[reportUnknownArgumentType] - except TypeError: - # Not a class or not a valid subclassable type (skip) - pass + if isinstance(agent, ChatAgent): + for message_type in agent.produced_message_types: + try: + is_registered = self._message_factory.is_registered(message_type) # type: ignore[reportUnknownArgumentType] + if issubclass(message_type, StructuredMessage) and not is_registered: + self._message_factory.register(message_type) # type: ignore[reportUnknownArgumentType] + except TypeError: + # Not a class or not a valid subclassable type (skip) + pass # The team ID is a UUID that is used to identify the team and its participants # in the agent runtime. It is used to create unique topic types for each participant. @@ -128,6 +150,16 @@ class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]): # Flag to track if the team events should be emitted. 
self._emit_team_events = emit_team_events + @property + def name(self) -> str: + """The name of the group chat team.""" + return self._name + + @property + def description(self) -> str: + """A description of the group chat team.""" + return self._description + @abstractmethod def _create_group_chat_manager_factory( self, @@ -147,7 +179,7 @@ class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]): self, parent_topic_type: str, output_topic_type: str, - agent: ChatAgent, + agent: ChatAgent | Team, message_factory: MessageFactory, ) -> Callable[[], ChatAgentContainer]: def _factory() -> ChatAgentContainer: diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py index 48feee601..b0a0c1d55 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py @@ -15,6 +15,7 @@ from ._events import ( GroupChatReset, GroupChatResume, GroupChatStart, + GroupChatTeamResponse, GroupChatTermination, SerializableException, ) @@ -52,6 +53,7 @@ class BaseGroupChatManager(SequentialRoutedAgent, ABC): sequential_message_types=[ GroupChatStart, GroupChatAgentResponse, + GroupChatTeamResponse, GroupChatMessage, GroupChatReset, ], @@ -130,20 +132,25 @@ class BaseGroupChatManager(SequentialRoutedAgent, ABC): await self._transition_to_next_speakers(ctx.cancellation_token) @event - async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None: + async def handle_agent_response( + self, message: GroupChatAgentResponse | GroupChatTeamResponse, ctx: MessageContext + ) -> None: try: # Construct the delta from the agent response. delta: List[BaseAgentEvent | BaseChatMessage] = [] - if message.agent_response.inner_messages is not None: - for inner_message in message.agent_response.inner_messages: - delta.append(inner_message) - delta.append(message.agent_response.chat_message) + if isinstance(message, GroupChatAgentResponse): + if message.response.inner_messages is not None: + for inner_message in message.response.inner_messages: + delta.append(inner_message) + delta.append(message.response.chat_message) + else: + delta.extend(message.result.messages) # Append the messages to the message thread. + await self.update_message_thread(delta) # Remove the agent from the active speakers list. - self._active_speakers.remove(message.agent_name) + self._active_speakers.remove(message.name) if len(self._active_speakers) > 0: # If there are still active speakers, return without doing anything.
return diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py index df2d6d50a..ff660c6f7 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py @@ -4,7 +4,7 @@ from autogen_core import DefaultTopicId, MessageContext, event, rpc, trace_invok from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, MessageFactory -from ...base import ChatAgent, Response +from ...base import ChatAgent, Response, TaskResult, Team from ...state import ChatAgentContainerState from ._events import ( GroupChatAgentResponse, @@ -15,6 +15,7 @@ from ._events import ( GroupChatReset, GroupChatResume, GroupChatStart, + GroupChatTeamResponse, SerializableException, ) from ._sequential_routed_agent import SequentialRoutedAgent @@ -22,19 +23,19 @@ from ._sequential_routed_agent import SequentialRoutedAgent class ChatAgentContainer(SequentialRoutedAgent): """A core agent class that delegates message handling to an - :class:`autogen_agentchat.base.ChatAgent` so that it can be used in a - group chat team. + :class:`autogen_agentchat.base.ChatAgent` or :class:`autogen_agentchat.base.Team` + so that it can be used in a group chat team. Args: parent_topic_type (str): The topic type of the parent orchestrator. output_topic_type (str): The topic type for the output. - agent (ChatAgent): The agent to delegate message handling to. + agent (ChatAgent | Team): The agent or team to delegate message handling to. message_factory (MessageFactory): The message factory to use for creating messages from JSON data. """ def __init__( - self, parent_topic_type: str, output_topic_type: str, agent: ChatAgent, message_factory: MessageFactory + self, parent_topic_type: str, output_topic_type: str, agent: ChatAgent | Team, message_factory: MessageFactory ) -> None: super().__init__( description=agent.description, @@ -43,6 +44,7 @@ class ChatAgentContainer(SequentialRoutedAgent): GroupChatRequestPublish, GroupChatReset, GroupChatAgentResponse, + GroupChatTeamResponse, ], ) self._parent_topic_type = parent_topic_type @@ -61,40 +63,50 @@ class ChatAgentContainer(SequentialRoutedAgent): @event async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None: """Handle an agent response event by appending the content to the buffer.""" - self._buffer_message(message.agent_response.chat_message) + self._buffer_message(message.response.chat_message) + + @event + async def handle_team_response(self, message: GroupChatTeamResponse, ctx: MessageContext) -> None: + """Handle a team response event by appending the content to the buffer.""" + for msg in message.result.messages: + if isinstance(msg, BaseChatMessage): + self._buffer_message(msg) @rpc async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> None: """Handle a reset event by resetting the agent.""" self._message_buffer.clear() - await self._agent.on_reset(ctx.cancellation_token) + if isinstance(self._agent, Team): + # If the agent is a team, reset the team. 
+ await self._agent.reset() + else: + await self._agent.on_reset(ctx.cancellation_token) @event async def handle_request(self, message: GroupChatRequestPublish, ctx: MessageContext) -> None: """Handle a content request event by passing the messages in the buffer to the delegate agent and publish the response.""" - with trace_invoke_agent_span( - agent_name=self._agent.name, - agent_description=self._agent.description, - agent_id=str(self.id), - ): + if isinstance(self._agent, Team): try: - # Pass the messages in the buffer to the delegate agent. - response: Response | None = None - async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token): - if isinstance(msg, Response): - await self._log_message(msg.chat_message) - response = msg + stream = self._agent.run_stream( + task=self._message_buffer, + cancellation_token=ctx.cancellation_token, + output_task_messages=False, + ) + result: TaskResult | None = None + async for team_event in stream: + if isinstance(team_event, TaskResult): + result = team_event else: - await self._log_message(msg) - if response is None: - raise ValueError( - "The agent did not produce a final response. Check the agent's on_messages_stream method." + await self._log_message(team_event) + if result is None: + raise RuntimeError( + "The team did not produce a final TaskResult. Check the team's run_stream method." ) - # Publish the response to the group chat. self._message_buffer.clear() + # Publish the team response to the group chat. await self.publish_message( - GroupChatAgentResponse(agent_response=response, agent_name=self._agent.name), + GroupChatTeamResponse(result=result, name=self._agent.name), topic_id=DefaultTopicId(type=self._parent_topic_type), cancellation_token=ctx.cancellation_token, ) @@ -108,6 +120,43 @@ class ChatAgentContainer(SequentialRoutedAgent): ) # Raise the error to the runtime. raise + else: + # If the agent is not a team, handle it as a single agent. + with trace_invoke_agent_span( + agent_name=self._agent.name, + agent_description=self._agent.description, + agent_id=str(self.id), + ): + try: + # Pass the messages in the buffer to the delegate agent. + response: Response | None = None + async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token): + if isinstance(msg, Response): + await self._log_message(msg.chat_message) + response = msg + else: + await self._log_message(msg) + if response is None: + raise RuntimeError( + "The agent did not produce a final response. Check the agent's on_messages_stream method." + ) + # Publish the response to the group chat. + self._message_buffer.clear() + await self.publish_message( + GroupChatAgentResponse(response=response, name=self._agent.name), + topic_id=DefaultTopicId(type=self._parent_topic_type), + cancellation_token=ctx.cancellation_token, + ) + except Exception as e: + # Publish the error to the group chat. + error_message = SerializableException.from_exception(e) + await self.publish_message( + GroupChatError(error=error_message), + topic_id=DefaultTopicId(type=self._parent_topic_type), + cancellation_token=ctx.cancellation_token, + ) + # Raise the error to the runtime. 
+ raise def _buffer_message(self, message: BaseChatMessage) -> None: if not self._message_factory.is_registered(message.__class__): @@ -127,12 +176,20 @@ class ChatAgentContainer(SequentialRoutedAgent): @rpc async def handle_pause(self, message: GroupChatPause, ctx: MessageContext) -> None: """Handle a pause event by pausing the agent.""" - await self._agent.on_pause(ctx.cancellation_token) + if isinstance(self._agent, Team): + # If the agent is a team, pause the team. + await self._agent.pause() + else: + await self._agent.on_pause(ctx.cancellation_token) @rpc async def handle_resume(self, message: GroupChatResume, ctx: MessageContext) -> None: """Handle a resume event by resuming the agent.""" - await self._agent.on_resume(ctx.cancellation_token) + if isinstance(self._agent, Team): + # If the agent is a team, resume the team. + await self._agent.resume() + else: + await self._agent.on_resume(ctx.cancellation_token) async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None: raise ValueError(f"Unhandled message in agent container: {type(message)}") diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py index 49d9a6322..a954dd6e5 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_events.py @@ -3,7 +3,7 @@ from typing import List from pydantic import BaseModel -from ...base import Response +from ...base import Response, TaskResult from ...messages import BaseAgentEvent, BaseChatMessage, StopMessage @@ -48,13 +48,23 @@ class GroupChatStart(BaseModel): class GroupChatAgentResponse(BaseModel): """A response published to a group chat.""" - agent_response: Response + response: Response """The response from an agent.""" - agent_name: str + name: str """The name of the agent that produced the response.""" +class GroupChatTeamResponse(BaseModel): + """A response published to a group chat from a team.""" + + result: TaskResult + """The result from a team.""" + + name: str + """The name of the team that produced the response.""" + + class GroupChatRequestPublish(BaseModel): """A request to publish a message to a group chat.""" diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py index c269e1963..b9b607b22 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_graph/_digraph_group_chat.py @@ -539,6 +539,8 @@ class GraphFlowManager(BaseGroupChatManager): class GraphFlowConfig(BaseModel): """The declarative configuration for GraphFlow.""" + name: str | None = None + description: str | None = None participants: List[ComponentModel] termination_condition: ComponentModel | None = None max_turns: int | None = None @@ -774,10 +776,16 @@ class GraphFlow(BaseGroupChat, Component[GraphFlowConfig]): component_config_schema = GraphFlowConfig component_provider_override = "autogen_agentchat.teams.GraphFlow" + DEFAULT_NAME = "GraphFlow" + DEFAULT_DESCRIPTION = "A team of agents" + def __init__( self, participants: List[ChatAgent], graph: DiGraph, + *, + name: str | None = None, + description: str | None = None, termination_condition: 
TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, ) -> None: self._input_participants = participants self._input_termination_condition = termination_condition + for participant in participants: + if not isinstance(participant, ChatAgent): + raise TypeError(f"Participant {participant} must be a ChatAgent.") + # No longer add _StopAgent or StopMessageTermination # Termination is now handled directly in GraphFlowManager._apply_termination_condition super().__init__( - participants, + name=name or self.DEFAULT_NAME, + description=description or self.DEFAULT_DESCRIPTION, + participants=list(participants), group_chat_manager_name="GraphManager", group_chat_manager_class=GraphFlowManager, termination_condition=termination_condition, @@ -838,6 +852,8 @@ class GraphFlow(BaseGroupChat, Component[GraphFlowConfig]): self._input_termination_condition.dump_component() if self._input_termination_condition else None ) return GraphFlowConfig( + name=self._name, + description=self._description, participants=participants, termination_condition=termination_condition, max_turns=self._max_turns, @@ -852,5 +868,10 @@ class GraphFlow(BaseGroupChat, Component[GraphFlowConfig]): TerminationCondition.load_component(config.termination_condition) if config.termination_condition else None ) return cls( - participants, graph=config.graph, termination_condition=termination_condition, max_turns=config.max_turns + name=config.name, + description=config.description, + participants=participants, + graph=config.graph, + termination_condition=termination_condition, + max_turns=config.max_turns, ) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_group_chat.py index 3b637d459..e5fd0e85a 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_group_chat.py @@ -22,6 +22,8 @@ event_logger = logging.getLogger(EVENT_LOGGER_NAME) class MagenticOneGroupChatConfig(BaseModel): """The declarative configuration for a MagenticOneGroupChat.""" + name: str | None = None + description: str | None = None participants: List[ComponentModel] model_client: ComponentModel termination_condition: ComponentModel | None = None @@ -39,6 +41,9 @@ class MagenticOneGroupChat(BaseGroupChat, Component[MagenticOneGroupChatConfig]) The orchestrator is based on the Magentic-One architecture, which is a generalist multi-agent system for solving complex tasks (see references below). + Unlike :class:`~autogen_agentchat.teams.RoundRobinGroupChat` and :class:`~autogen_agentchat.teams.SelectorGroupChat`, + the MagenticOneGroupChat does not support using a team as a participant. + Args: participants (List[ChatAgent]): The participants in the group chat. model_client (ChatCompletionClient): The model client used for generating responses. @@ -98,11 +103,16 @@ class MagenticOneGroupChat(BaseGroupChat, Component[MagenticOneGroupChatConfig]) component_config_schema = MagenticOneGroupChatConfig component_provider_override = "autogen_agentchat.teams.MagenticOneGroupChat" + DEFAULT_NAME = "MagenticOneGroupChat" + DEFAULT_DESCRIPTION = "A team of agents."
+ def __init__( self, participants: List[ChatAgent], model_client: ChatCompletionClient, *, + name: str | None = None, + description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, runtime: AgentRuntime | None = None, @@ -111,8 +121,13 @@ class MagenticOneGroupChat(BaseGroupChat, Component[MagenticOneGroupChatConfig]) custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, ): + for participant in participants: + if not isinstance(participant, ChatAgent): + raise TypeError(f"Participant {participant} must be a ChatAgent.") super().__init__( - participants, + name=name or self.DEFAULT_NAME, + description=description or self.DEFAULT_DESCRIPTION, + participants=list(participants), group_chat_manager_name="MagenticOneOrchestrator", group_chat_manager_class=MagenticOneOrchestrator, termination_condition=termination_condition, @@ -163,6 +178,8 @@ class MagenticOneGroupChat(BaseGroupChat, Component[MagenticOneGroupChatConfig]) participants = [participant.dump_component() for participant in self._participants] termination_condition = self._termination_condition.dump_component() if self._termination_condition else None return MagenticOneGroupChatConfig( + name=self.name, + description=self.description, participants=participants, model_client=self._model_client.dump_component(), termination_condition=termination_condition, @@ -180,8 +197,10 @@ class MagenticOneGroupChat(BaseGroupChat, Component[MagenticOneGroupChatConfig]) TerminationCondition.load_component(config.termination_condition) if config.termination_condition else None ) return cls( - participants, - model_client, + participants=participants, + name=config.name, + description=config.description, + model_client=model_client, termination_condition=termination_condition, max_turns=config.max_turns, max_stalls=config.max_stalls, diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_orchestrator.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_orchestrator.py index 2f1d2fd67..176789257 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_orchestrator.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_magentic_one/_magentic_one_orchestrator.py @@ -37,6 +37,7 @@ from .._events import ( GroupChatRequestPublish, GroupChatReset, GroupChatStart, + GroupChatTeamResponse, GroupChatTermination, SerializableException, ) @@ -189,14 +190,18 @@ class MagenticOneOrchestrator(BaseGroupChatManager): await self._reenter_outer_loop(ctx.cancellation_token) @event - async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None: # type: ignore + async def handle_agent_response( # type: ignore + self, message: GroupChatAgentResponse | GroupChatTeamResponse, ctx: MessageContext + ) -> None: # type: ignore try: + if not isinstance(message, GroupChatAgentResponse): + raise RuntimeError("MagenticOneOrchestrator does not support GroupChatTeamResponse messages.") delta: List[BaseAgentEvent | BaseChatMessage] = [] - if message.agent_response.inner_messages is not None: - for inner_message in message.agent_response.inner_messages: + if message.response.inner_messages is not None: + for inner_message in message.response.inner_messages: delta.append(inner_message) - await 
self.update_message_thread([message.agent_response.chat_message]) - delta.append(message.agent_response.chat_message) + await self.update_message_thread([message.response.chat_message]) + delta.append(message.response.chat_message) if self._termination_condition is not None: stop_message = await self._termination_condition(delta) @@ -285,7 +290,7 @@ class MagenticOneOrchestrator(BaseGroupChatManager): # Broadcast await self.publish_message( - GroupChatAgentResponse(agent_response=Response(chat_message=ledger_message), agent_name=self._name), + GroupChatAgentResponse(response=Response(chat_message=ledger_message), name=self._name), topic_id=DefaultTopicId(type=self._group_topic_type), ) @@ -415,7 +420,7 @@ class MagenticOneOrchestrator(BaseGroupChatManager): # Broadcast it await self.publish_message( # Broadcast - GroupChatAgentResponse(agent_response=Response(chat_message=message), agent_name=self._name), + GroupChatAgentResponse(response=Response(chat_message=message), name=self._name), topic_id=DefaultTopicId(type=self._group_topic_type), cancellation_token=cancellation_token, ) @@ -496,7 +501,7 @@ class MagenticOneOrchestrator(BaseGroupChatManager): # Broadcast await self.publish_message( - GroupChatAgentResponse(agent_response=Response(chat_message=message), agent_name=self._name), + GroupChatAgentResponse(response=Response(chat_message=message), name=self._name), topic_id=DefaultTopicId(type=self._group_topic_type), cancellation_token=cancellation_token, ) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py index d6b43afb2..3f529f0c4 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_round_robin_group_chat.py @@ -5,7 +5,7 @@ from autogen_core import AgentRuntime, Component, ComponentModel from pydantic import BaseModel from typing_extensions import Self -from ...base import ChatAgent, TerminationCondition +from ...base import ChatAgent, Team, TerminationCondition from ...messages import BaseAgentEvent, BaseChatMessage, MessageFactory from ...state import RoundRobinManagerState from ._base_group_chat import BaseGroupChat @@ -85,6 +85,8 @@ class RoundRobinGroupChatManager(BaseGroupChatManager): class RoundRobinGroupChatConfig(BaseModel): """The declarative configuration for RoundRobinGroupChat.""" + name: str | None = None + description: str | None = None participants: List[ComponentModel] termination_condition: ComponentModel | None = None max_turns: int | None = None @@ -95,10 +97,23 @@ class RoundRobinGroupChat(BaseGroupChat, Component[RoundRobinGroupChatConfig]): """A team that runs a group chat with participants taking turns in a round-robin fashion to publish a message to all. + If an :class:`~autogen_agentchat.base.ChatAgent` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` from the agent response's + :attr:`~autogen_agentchat.base.Response.chat_message` will be published + to other participants in the group chat. + + If a :class:`~autogen_agentchat.base.Team` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` + from the team result's :attr:`~autogen_agentchat.base.TaskResult.messages` will be published + to other participants in the group chat. + If a single participant is in the team, the participant will be the only speaker.
Args: - participants (List[BaseChatAgent]): The participants in the group chat. + participants (List[ChatAgent | Team]): The participants in the group chat. + name (str | None, optional): The name of the group chat, using :attr:`~autogen_agentchat.teams.RoundRobinGroupChat.DEFAULT_NAME` if not provided. + The name is used by a parent team to identify this group chat so it must be unique within the parent team. + description (str | None, optional): The description of the group chat, using :attr:`~autogen_agentchat.teams.RoundRobinGroupChat.DEFAULT_DESCRIPTION` if not provided. termination_condition (TerminationCondition, optional): The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely. max_turns (int, optional): The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit. @@ -112,69 +127,124 @@ class RoundRobinGroupChat(BaseGroupChat, Component[RoundRobinGroupChatConfig]): Examples: - A team with one participant with tools: + A team with one participant with tools: - .. code-block:: python + .. code-block:: python - import asyncio - from autogen_ext.models.openai import OpenAIChatCompletionClient - from autogen_agentchat.agents import AssistantAgent - from autogen_agentchat.teams import RoundRobinGroupChat - from autogen_agentchat.conditions import TextMentionTermination - from autogen_agentchat.ui import Console + import asyncio + from autogen_ext.models.openai import OpenAIChatCompletionClient + from autogen_agentchat.agents import AssistantAgent + from autogen_agentchat.teams import RoundRobinGroupChat + from autogen_agentchat.conditions import TextMentionTermination + from autogen_agentchat.ui import Console - async def main() -> None: - model_client = OpenAIChatCompletionClient(model="gpt-4o") + async def main() -> None: + model_client = OpenAIChatCompletionClient(model="gpt-4o") - async def get_weather(location: str) -> str: - return f"The weather in {location} is sunny." + async def get_weather(location: str) -> str: + return f"The weather in {location} is sunny." - assistant = AssistantAgent( - "Assistant", - model_client=model_client, - tools=[get_weather], - ) - termination = TextMentionTermination("TERMINATE") - team = RoundRobinGroupChat([assistant], termination_condition=termination) - await Console(team.run_stream(task="What's the weather in New York?")) + assistant = AssistantAgent( + "Assistant", + model_client=model_client, + tools=[get_weather], + ) + termination = TextMentionTermination("TERMINATE") + team = RoundRobinGroupChat([assistant], termination_condition=termination) + await Console(team.run_stream(task="What's the weather in New York?")) - asyncio.run(main()) + asyncio.run(main()) - A team with multiple participants: + A team with multiple participants: - .. code-block:: python + .. 
code-block:: python - import asyncio - from autogen_ext.models.openai import OpenAIChatCompletionClient - from autogen_agentchat.agents import AssistantAgent - from autogen_agentchat.teams import RoundRobinGroupChat - from autogen_agentchat.conditions import TextMentionTermination - from autogen_agentchat.ui import Console + import asyncio + from autogen_ext.models.openai import OpenAIChatCompletionClient + from autogen_agentchat.agents import AssistantAgent + from autogen_agentchat.teams import RoundRobinGroupChat + from autogen_agentchat.conditions import TextMentionTermination + from autogen_agentchat.ui import Console - async def main() -> None: - model_client = OpenAIChatCompletionClient(model="gpt-4o") + async def main() -> None: + model_client = OpenAIChatCompletionClient(model="gpt-4o") - agent1 = AssistantAgent("Assistant1", model_client=model_client) - agent2 = AssistantAgent("Assistant2", model_client=model_client) - termination = TextMentionTermination("TERMINATE") - team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) - await Console(team.run_stream(task="Tell me some jokes.")) + agent1 = AssistantAgent("Assistant1", model_client=model_client) + agent2 = AssistantAgent("Assistant2", model_client=model_client) + termination = TextMentionTermination("TERMINATE") + team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) + await Console(team.run_stream(task="Tell me some jokes.")) - asyncio.run(main()) + asyncio.run(main()) + + A team of a user proxy and a nested team of writer and reviewer agents: + + .. code-block:: python + + import asyncio + + from autogen_agentchat.agents import UserProxyAgent, AssistantAgent + from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination + from autogen_agentchat.teams import RoundRobinGroupChat + from autogen_agentchat.ui import Console + from autogen_ext.models.openai import OpenAIChatCompletionClient + + + async def main() -> None: + model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano") + + writer = AssistantAgent( + "writer", model_client=model_client, system_message="You are a writer.", model_client_stream=True + ) + + reviewer = AssistantAgent( + "reviewer", + model_client=model_client, + system_message="Provide feedback to the input and suggest improvements.", + model_client_stream=True, + ) + + # NOTE: you can skip input by pressing Enter. + user_proxy = UserProxyAgent("user_proxy") + + # Maximum 1 round of review and revision. + inner_termination = MaxMessageTermination(max_messages=4) + + # The outer-loop termination condition that will terminate the team when the user types "exit". + outer_termination = TextMentionTermination("exit", sources=["user_proxy"]) + + team = RoundRobinGroupChat( + [ + # For each turn, the writer writes a summary and the reviewer reviews it. + RoundRobinGroupChat([writer, reviewer], termination_condition=inner_termination), + # The user proxy gets user input once the writer and reviewer have finished their actions. + user_proxy, + ], + termination_condition=outer_termination, + ) + # Start the team and wait for it to terminate. + await Console(team.run_stream(task="Write a short essay about the impact of AI on society.")) + + + asyncio.run(main()) """ component_config_schema = RoundRobinGroupChatConfig component_provider_override = "autogen_agentchat.teams.RoundRobinGroupChat" - # TODO: Add * to the constructor to separate the positional parameters from the kwargs.
- # This may be a breaking change so let's wait until a good time to do it. + DEFAULT_NAME = "RoundRobinGroupChat" + DEFAULT_DESCRIPTION = "A team of agents." + def __init__( self, - participants: List[ChatAgent], + participants: List[ChatAgent | Team], + *, + name: str | None = None, + description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, @@ -182,7 +252,9 @@ class RoundRobinGroupChat(BaseGroupChat, Component[RoundRobinGroupChatConfig]): emit_team_events: bool = False, ) -> None: super().__init__( - participants, + name=name or self.DEFAULT_NAME, + description=description or self.DEFAULT_DESCRIPTION, + participants=participants, group_chat_manager_name="RoundRobinGroupChatManager", group_chat_manager_class=RoundRobinGroupChatManager, termination_condition=termination_condition, @@ -226,6 +298,8 @@ class RoundRobinGroupChat(BaseGroupChat, Component[RoundRobinGroupChatConfig]): participants = [participant.dump_component() for participant in self._participants] termination_condition = self._termination_condition.dump_component() if self._termination_condition else None return RoundRobinGroupChatConfig( + name=self._name, + description=self._description, participants=participants, termination_condition=termination_condition, max_turns=self._max_turns, @@ -234,12 +308,20 @@ class RoundRobinGroupChat(BaseGroupChat, Component[RoundRobinGroupChatConfig]): @classmethod def _from_config(cls, config: RoundRobinGroupChatConfig) -> Self: - participants = [ChatAgent.load_component(participant) for participant in config.participants] + participants: List[ChatAgent | Team] = [] + for participant in config.participants: + if participant.component_type == Team.component_type: + participants.append(Team.load_component(participant)) + else: + participants.append(ChatAgent.load_component(participant)) + termination_condition = ( TerminationCondition.load_component(config.termination_condition) if config.termination_condition else None ) return cls( participants, + name=config.name, + description=config.description, termination_condition=termination_condition, max_turns=config.max_turns, emit_team_events=config.emit_team_events, diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py index eaa3c320d..480dc6b71 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_selector_group_chat.py @@ -22,8 +22,7 @@ from pydantic import BaseModel from typing_extensions import Self from ... 
import TRACE_LOGGER_NAME -from ...agents import BaseChatAgent -from ...base import ChatAgent, TerminationCondition +from ...base import ChatAgent, Team, TerminationCondition from ...messages import ( BaseAgentEvent, BaseChatMessage, @@ -345,6 +344,8 @@ class SelectorGroupChatManager(BaseGroupChatManager): class SelectorGroupChatConfig(BaseModel): """The declarative configuration for SelectorGroupChat.""" + name: str | None = None + description: str | None = None participants: List[ComponentModel] model_client: ComponentModel termination_condition: ComponentModel | None = None @@ -362,11 +363,27 @@ class SelectorGroupChat(BaseGroupChat, Component[SelectorGroupChatConfig]): """A group chat team in which participants take turns publishing a message to all, using a ChatCompletion model to select the next speaker after each message. + If an :class:`~autogen_agentchat.base.ChatAgent` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` from the agent response's + :attr:`~autogen_agentchat.base.Response.chat_message` will be published + to other participants in the group chat. + + If a :class:`~autogen_agentchat.base.Team` is a participant, + the :class:`~autogen_agentchat.messages.BaseChatMessage` + from the team result's :attr:`~autogen_agentchat.base.TaskResult.messages` will be published + to other participants in the group chat. + Args: - participants (List[ChatAgent]): The participants in the group chat, + participants (List[ChatAgent | Team]): The participants in the group chat, must have unique names and at least two participants. model_client (ChatCompletionClient): The ChatCompletion model client used to select the next speaker. + name (str | None, optional): The name of the group chat, using + :attr:`~autogen_agentchat.teams.SelectorGroupChat.DEFAULT_NAME` if not provided. + The name is used by a parent team to identify this group chat so it must + be unique within the parent team. + description (str | None, optional): The description of the group chat, using + :attr:`~autogen_agentchat.teams.SelectorGroupChat.DEFAULT_DESCRIPTION` if not provided. termination_condition (TerminationCondition, optional): The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely. max_turns (int, optional): The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit. @@ -574,11 +591,16 @@ class SelectorGroupChat(BaseGroupChat, Component[SelectorGroupChatConfig]): component_config_schema = SelectorGroupChatConfig component_provider_override = "autogen_agentchat.teams.SelectorGroupChat" + DEFAULT_NAME = "SelectorGroupChat" + DEFAULT_DESCRIPTION = "A team of agents." + def __init__( self, - participants: List[ChatAgent], + participants: List[ChatAgent | Team], model_client: ChatCompletionClient, *, + name: str | None = None, + description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, @@ -600,7 +622,9 @@ Read the above conversation.
Then select the next role from {participants} to pl model_context: ChatCompletionContext | None = None, ): super().__init__( - participants, + name=name or self.DEFAULT_NAME, + description=description or self.DEFAULT_DESCRIPTION, + participants=participants, group_chat_manager_name="SelectorGroupChatManager", group_chat_manager_class=SelectorGroupChatManager, termination_condition=termination_condition, @@ -658,6 +682,8 @@ Read the above conversation. Then select the next role from {participants} to pl def _to_config(self) -> SelectorGroupChatConfig: return SelectorGroupChatConfig( + name=self._name, + description=self._description, participants=[participant.dump_component() for participant in self._participants], model_client=self._model_client.dump_component(), termination_condition=self._termination_condition.dump_component() if self._termination_condition else None, @@ -673,9 +699,21 @@ Read the above conversation. Then select the next role from {participants} to pl @classmethod def _from_config(cls, config: SelectorGroupChatConfig) -> Self: + participants: List[ChatAgent | Team] = [] + for participant in config.participants: + if participant.component_type == ChatAgent.component_type: + participants.append(ChatAgent.load_component(participant)) + elif participant.component_type == Team.component_type: + participants.append(Team.load_component(participant)) + else: + raise ValueError( + f"Invalid participant component type: {participant.component_type}. " "Expected ChatAgent or Team." + ) return cls( - participants=[BaseChatAgent.load_component(participant) for participant in config.participants], + participants=participants, model_client=ChatCompletionClient.load_component(config.model_client), + name=config.name, + description=config.description, termination_condition=TerminationCondition.load_component(config.termination_condition) if config.termination_condition else None, diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py index 344994015..c9b495083 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_swarm_group_chat.py @@ -115,6 +115,8 @@ class SwarmGroupChatManager(BaseGroupChatManager): class SwarmConfig(BaseModel): """The declarative configuration for Swarm.""" + name: str | None = None + description: str | None = None participants: List[ComponentModel] termination_condition: ComponentModel | None = None max_turns: int | None = None @@ -129,8 +131,17 @@ class Swarm(BaseGroupChat, Component[SwarmConfig]): sent by the current speaker. If no handoff message is sent, the current speaker continues to be the speaker. + .. note:: + + Unlike :class:`~autogen_agentchat.teams.RoundRobinGroupChat` and + :class:`~autogen_agentchat.teams.SelectorGroupChat`, this group chat + team does not support inner teams as participants. + Args: participants (List[ChatAgent]): The agents participating in the group chat. The first agent in the list is the initial speaker. + name (str | None, optional): The name of the group chat, using :attr:`~autogen_agentchat.teams.Swarm.DEFAULT_NAME` if not provided. + The name is used by a parent team to identify this group chat so it must be unique within the parent team. 
+ description (str | None, optional): The description of the group chat, using :attr:`~autogen_agentchat.teams.Swarm.DEFAULT_DESCRIPTION` if not provided. termination_condition (TerminationCondition, optional): The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely. max_turns (int, optional): The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit. @@ -216,19 +227,28 @@ class Swarm(BaseGroupChat, Component[SwarmConfig]): component_config_schema = SwarmConfig component_provider_override = "autogen_agentchat.teams.Swarm" - # TODO: Add * to the constructor to separate the positional parameters from the kwargs. - # This may be a breaking change so let's wait until a good time to do it. + DEFAULT_NAME = "Swarm" + DEFAULT_DESCRIPTION = "A team of agents." + def __init__( self, participants: List[ChatAgent], + *, + name: str | None = None, + description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, ) -> None: + for participant in participants: + if not isinstance(participant, ChatAgent): + raise TypeError(f"Participant {participant} must be a ChatAgent.") super().__init__( - participants, + name=name or self.DEFAULT_NAME, + description=description or self.DEFAULT_DESCRIPTION, + participants=[participant for participant in participants], group_chat_manager_name="SwarmGroupChatManager", group_chat_manager_class=SwarmGroupChatManager, termination_condition=termination_condition, @@ -239,6 +259,7 @@ class Swarm(BaseGroupChat, Component[SwarmConfig]): ) # The first participant must be able to produce handoff messages. 
first_participant = self._participants[0] + assert isinstance(first_participant, ChatAgent) if HandoffMessage not in first_participant.produced_message_types: raise ValueError("The first participant must be able to produce a handoff messages.") @@ -276,6 +297,8 @@ class Swarm(BaseGroupChat, Component[SwarmConfig]): participants = [participant.dump_component() for participant in self._participants] termination_condition = self._termination_condition.dump_component() if self._termination_condition else None return SwarmConfig( + name=self._name, + description=self._description, participants=participants, termination_condition=termination_condition, max_turns=self._max_turns, @@ -290,6 +313,8 @@ class Swarm(BaseGroupChat, Component[SwarmConfig]): ) return cls( participants, + name=config.name, + description=config.description, termination_condition=termination_condition, max_turns=config.max_turns, emit_team_events=config.emit_team_events, diff --git a/python/packages/autogen-agentchat/tests/test_group_chat_nested.py b/python/packages/autogen-agentchat/tests/test_group_chat_nested.py new file mode 100644 index 000000000..40301863e --- /dev/null +++ b/python/packages/autogen-agentchat/tests/test_group_chat_nested.py @@ -0,0 +1,668 @@ +import logging +import tempfile +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from autogen_agentchat import EVENT_LOGGER_NAME +from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent +from autogen_agentchat.base import TaskResult +from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination +from autogen_agentchat.messages import ( + BaseAgentEvent, + BaseChatMessage, + TextMessage, +) +from autogen_agentchat.teams import RoundRobinGroupChat, SelectorGroupChat, Swarm +from autogen_core import AgentRuntime, SingleThreadedAgentRuntime +from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor +from autogen_ext.models.replay import ReplayChatCompletionClient + +# Import test utilities from the main test file +from utils import FileLogHandler + +logger = logging.getLogger(EVENT_LOGGER_NAME) +logger.setLevel(logging.DEBUG) +logger.addHandler(FileLogHandler("test_group_chat_nested.log")) + + +@pytest_asyncio.fixture(params=["single_threaded", "embedded"]) # type: ignore +async def runtime(request: pytest.FixtureRequest) -> AsyncGenerator[AgentRuntime | None, None]: + if request.param == "single_threaded": + runtime = SingleThreadedAgentRuntime() + runtime.start() + yield runtime + await runtime.stop() + elif request.param == "embedded": + yield None + + +@pytest.mark.asyncio +async def test_round_robin_group_chat_nested_teams_run(runtime: AgentRuntime | None) -> None: + """Test RoundRobinGroupChat with nested teams using run method.""" + model_client = ReplayChatCompletionClient( + [ + 'Here is the program\n ```python\nprint("Hello, world!")\n```', + "TERMINATE", + "Good job", + "TERMINATE", + ], + ) + with tempfile.TemporaryDirectory() as temp_dir: + code_executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) + assistant = AssistantAgent( + "assistant", + model_client=model_client, + description="An assistant agent that writes code.", + ) + code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) + termination = TextMentionTermination("TERMINATE") + + # Create inner team (assistant + code executor) + inner_team = RoundRobinGroupChat( + participants=[assistant, code_executor_agent], + termination_condition=termination, + runtime=runtime, + ) 
+ + # Create reviewer agent + reviewer = AssistantAgent( + "reviewer", + model_client=model_client, + description="A reviewer agent that reviews code.", + ) + + # Create outer team with nested inner team + outer_team = RoundRobinGroupChat( + participants=[inner_team, reviewer], + termination_condition=termination, + runtime=runtime, + ) + + result = await outer_team.run(task="Write a program that prints 'Hello, world!'") + + # Should have task message + inner team result + reviewer response + termination + assert len(result.messages) >= 4 + assert isinstance(result.messages[0], TextMessage) + assert result.messages[0].content == "Write a program that prints 'Hello, world!'" + assert result.stop_reason is not None and "TERMINATE" in result.stop_reason + + +@pytest.mark.asyncio +async def test_round_robin_group_chat_nested_teams_run_stream(runtime: AgentRuntime | None) -> None: + """Test RoundRobinGroupChat with nested teams using run_stream method.""" + model_client = ReplayChatCompletionClient( + [ + 'Here is the program\n ```python\nprint("Hello, world!")\n```', + "TERMINATE", + "Good job", + "TERMINATE", + ], + ) + with tempfile.TemporaryDirectory() as temp_dir: + code_executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) + assistant = AssistantAgent( + "assistant", + model_client=model_client, + description="An assistant agent that writes code.", + ) + code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) + termination = TextMentionTermination("TERMINATE") + + # Create inner team (assistant + code executor) + inner_team = RoundRobinGroupChat( + participants=[assistant, code_executor_agent], + termination_condition=termination, + runtime=runtime, + ) + + # Create reviewer agent + reviewer = AssistantAgent( + "reviewer", + model_client=model_client, + description="A reviewer agent that reviews code.", + ) + + # Create outer team with nested inner team + outer_team = RoundRobinGroupChat( + participants=[inner_team, reviewer], + termination_condition=termination, + runtime=runtime, + ) + + messages: list[BaseAgentEvent | BaseChatMessage] = [] + result = None + async for message in outer_team.run_stream(task="Write a program that prints 'Hello, world!'"): + if isinstance(message, TaskResult): + result = message + else: + messages.append(message) + + assert result is not None + assert len(result.messages) >= 4 + assert isinstance(result.messages[0], TextMessage) + assert result.messages[0].content == "Write a program that prints 'Hello, world!'" + assert result.stop_reason is not None and "TERMINATE" in result.stop_reason + + +@pytest.mark.asyncio +async def test_round_robin_group_chat_nested_teams_dump_load_component(runtime: AgentRuntime | None) -> None: + """Test RoundRobinGroupChat with nested teams dump_component and load_component.""" + model_client = ReplayChatCompletionClient(["Hello from agent1", "Hello from agent2", "Hello from agent3"]) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + termination = MaxMessageTermination(2) + + # Create inner team + inner_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + description="Inner team description", + ) + + # Create outer team with nested inner team + outer_team = 
RoundRobinGroupChat( + participants=[inner_team, agent3], + termination_condition=termination, + runtime=runtime, + name="OuterTeam", + description="Outer team description", + ) + + # Test dump_component + config = outer_team.dump_component() + assert config.config["name"] == "OuterTeam" + assert config.config["description"] == "Outer team description" + assert len(config.config["participants"]) == 2 + + # First participant should be the inner team + inner_team_config = config.config["participants"][0]["config"] + assert inner_team_config["name"] == "InnerTeam" + assert inner_team_config["description"] == "Inner team description" + assert len(inner_team_config["participants"]) == 2 + + # Second participant should be agent3 + agent3_config = config.config["participants"][1]["config"] + assert agent3_config["name"] == "agent3" + + # Test load_component + loaded_team = RoundRobinGroupChat.load_component(config) + assert loaded_team.name == "OuterTeam" + assert loaded_team.description == "Outer team description" + assert len(loaded_team._participants) == 2 # type: ignore[reportPrivateUsage] + + # Verify the loaded team has the same structure + loaded_config = loaded_team.dump_component() + assert loaded_config == config + + +@pytest.mark.asyncio +async def test_round_robin_group_chat_nested_teams_save_load_state(runtime: AgentRuntime | None) -> None: + """Test RoundRobinGroupChat with nested teams save_state and load_state.""" + model_client = ReplayChatCompletionClient(["Hello from agent1", "Hello from agent2", "TERMINATE"]) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + termination = TextMentionTermination("TERMINATE") # Use TextMentionTermination + + # Create inner team + inner_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=termination, + runtime=runtime, + ) + + # Create outer team with nested inner team + outer_team1 = RoundRobinGroupChat( + participants=[inner_team, agent3], + termination_condition=termination, + runtime=runtime, + ) + + # Run the team to generate state + await outer_team1.run(task="Test message") + + # Save state + state = await outer_team1.save_state() + + # Create new agents and teams + agent4 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent5 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent6 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + + inner_team2 = RoundRobinGroupChat( + participants=[agent4, agent5], + termination_condition=termination, + runtime=runtime, + ) + + outer_team2 = RoundRobinGroupChat( + participants=[inner_team2, agent6], + termination_condition=termination, + runtime=runtime, + ) + + # Load state + await outer_team2.load_state(state) + + # Verify state was loaded correctly + state2 = await outer_team2.save_state() + assert state == state2 + + +@pytest.mark.asyncio +async def test_selector_group_chat_nested_teams_run(runtime: AgentRuntime | None) -> None: + """Test SelectorGroupChat with nested teams using run method.""" + model_client = ReplayChatCompletionClient( + [ + "InnerTeam", # Select inner team first + 'Here is the program\n ```python\nprint("Hello, world!")\n```', + "TERMINATE", + "agent3", # Select agent3 (reviewer) + "Good job", + "TERMINATE", 
+ ], + ) + with tempfile.TemporaryDirectory() as temp_dir: + code_executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) + assistant = AssistantAgent( + "assistant", + model_client=model_client, + description="An assistant agent that writes code.", + ) + code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) + termination = TextMentionTermination("TERMINATE") + + # Create inner team (assistant + code executor) + inner_team = RoundRobinGroupChat( + participants=[assistant, code_executor_agent], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + description="Team that writes and executes code", + ) + + # Create reviewer agent + reviewer = AssistantAgent( + "agent3", + model_client=model_client, + description="A reviewer agent that reviews code.", + ) + + # Create outer team with nested inner team + outer_team = SelectorGroupChat( + participants=[inner_team, reviewer], + model_client=model_client, + termination_condition=termination, + runtime=runtime, + ) + + result = await outer_team.run(task="Write a program that prints 'Hello, world!'") + + # Should have task message + selector events + inner team result + reviewer response + assert len(result.messages) >= 4 + assert isinstance(result.messages[0], TextMessage) + assert result.messages[0].content == "Write a program that prints 'Hello, world!'" + assert result.stop_reason is not None and "TERMINATE" in result.stop_reason + + +@pytest.mark.asyncio +async def test_selector_group_chat_nested_teams_run_stream(runtime: AgentRuntime | None) -> None: + """Test SelectorGroupChat with nested teams using run_stream method.""" + model_client = ReplayChatCompletionClient( + [ + "InnerTeam", # Select inner team first + 'Here is the program\n ```python\nprint("Hello, world!")\n```', + "TERMINATE", + "agent3", # Select agent3 (reviewer) + "Good job", + "TERMINATE", + ], + ) + with tempfile.TemporaryDirectory() as temp_dir: + code_executor = LocalCommandLineCodeExecutor(work_dir=temp_dir) + assistant = AssistantAgent( + "assistant", + model_client=model_client, + description="An assistant agent that writes code.", + ) + code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) + termination = TextMentionTermination("TERMINATE") + + # Create inner team (assistant + code executor) + inner_team = RoundRobinGroupChat( + participants=[assistant, code_executor_agent], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + description="Team that writes and executes code", + ) + + # Create reviewer agent + reviewer = AssistantAgent( + "agent3", + model_client=model_client, + description="A reviewer agent that reviews code.", + ) + + # Create outer team with nested inner team + outer_team = SelectorGroupChat( + participants=[inner_team, reviewer], + model_client=model_client, + termination_condition=termination, + runtime=runtime, + ) + + messages: list[BaseAgentEvent | BaseChatMessage] = [] + result = None + async for message in outer_team.run_stream(task="Write a program that prints 'Hello, world!'"): + if isinstance(message, TaskResult): + result = message + else: + messages.append(message) + + assert result is not None + assert len(result.messages) >= 4 + assert isinstance(result.messages[0], TextMessage) + assert result.messages[0].content == "Write a program that prints 'Hello, world!'" + assert result.stop_reason is not None and "TERMINATE" in result.stop_reason + + +@pytest.mark.asyncio +async def 
test_selector_group_chat_nested_teams_dump_load_component(runtime: AgentRuntime | None) -> None: + """Test SelectorGroupChat with nested teams dump_component and load_component.""" + model_client = ReplayChatCompletionClient(["agent1", "Hello from agent1", "agent3", "Hello from agent3"]) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + termination = MaxMessageTermination(2) + + # Create inner team + inner_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + description="Inner team description", + ) + + # Create outer team with nested inner team + outer_team = SelectorGroupChat( + participants=[inner_team, agent3], + model_client=model_client, + termination_condition=termination, + runtime=runtime, + name="OuterTeam", + description="Outer team description", + ) + + # Test dump_component + config = outer_team.dump_component() + assert config.config["name"] == "OuterTeam" + assert config.config["description"] == "Outer team description" + assert len(config.config["participants"]) == 2 + + # First participant should be the inner team + inner_team_config = config.config["participants"][0]["config"] + assert inner_team_config["name"] == "InnerTeam" + assert inner_team_config["description"] == "Inner team description" + assert len(inner_team_config["participants"]) == 2 + + # Second participant should be agent3 + agent3_config = config.config["participants"][1]["config"] + assert agent3_config["name"] == "agent3" + + # Test load_component + loaded_team = SelectorGroupChat.load_component(config) + assert loaded_team.name == "OuterTeam" + assert loaded_team.description == "Outer team description" + assert len(loaded_team._participants) == 2 # type: ignore[reportPrivateUsage] + + # Verify the loaded team has the same structure + loaded_config = loaded_team.dump_component() + assert loaded_config == config + + +@pytest.mark.asyncio +async def test_selector_group_chat_nested_teams_save_load_state(runtime: AgentRuntime | None) -> None: + """Test SelectorGroupChat with nested teams save_state and load_state.""" + model_client = ReplayChatCompletionClient(["InnerTeam", "Hello from inner team", "agent3", "TERMINATE"]) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + termination = TextMentionTermination("TERMINATE") + + # Create inner team + inner_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + ) + + # Create outer team with nested inner team + outer_team1 = SelectorGroupChat( + participants=[inner_team, agent3], + model_client=model_client, + termination_condition=termination, + runtime=runtime, + ) + + # Run the team to generate state + await outer_team1.run(task="Test message") + + # Save state + state = await outer_team1.save_state() + + # Create new agents and teams + agent4 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent5 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + 
agent6 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + + inner_team2 = RoundRobinGroupChat( + participants=[agent4, agent5], + termination_condition=termination, + runtime=runtime, + name="InnerTeam", + ) + + outer_team2 = SelectorGroupChat( + participants=[inner_team2, agent6], + model_client=model_client, + termination_condition=termination, + runtime=runtime, + ) + + # Load state + await outer_team2.load_state(state) + + # Verify state was loaded correctly + state2 = await outer_team2.save_state() + assert state == state2 + + +@pytest.mark.asyncio +async def test_swarm_doesnt_support_nested_teams() -> None: + """Test that Swarm raises TypeError when provided with nested teams.""" + model_client = ReplayChatCompletionClient(["Hello", "TERMINATE"]) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + termination = TextMentionTermination("TERMINATE") + + # Create inner team + inner_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=termination, + ) + + # Verify that Swarm raises TypeError when trying to use a team as participant + with pytest.raises(TypeError, match="Participant .* must be a ChatAgent"): + Swarm( + participants=[inner_team, agent3], # type: ignore + termination_condition=termination, + ) + + +@pytest.mark.asyncio +async def test_round_robin_deeply_nested_teams(runtime: AgentRuntime | None) -> None: + """Test RoundRobinGroupChat with deeply nested teams (3 levels).""" + model_client = ReplayChatCompletionClient( + [ + "Hello from agent1", + "TERMINATE from agent2", + "World from agent3", + "Hello from agent1", + "Hello from agent2", + "TERMINATE from agent1", + "TERMINATE from agent3", + "Review from agent4", + "TERMINATE from agent2", + "TERMINATE from agent3", + "TERMINATE from agent4", + ] + ) + + # Create agents + agent1 = AssistantAgent("agent1", model_client=model_client, description="First agent") + agent2 = AssistantAgent("agent2", model_client=model_client, description="Second agent") + agent3 = AssistantAgent("agent3", model_client=model_client, description="Third agent") + agent4 = AssistantAgent("agent4", model_client=model_client, description="Fourth agent") + + # Create innermost team (level 1) + innermost_team = RoundRobinGroupChat( + participants=[agent1, agent2], + termination_condition=TextMentionTermination("TERMINATE", sources=["agent1", "agent2"]), + runtime=runtime, + name="InnermostTeam", + ) + + # Create middle team (level 2) + middle_team = RoundRobinGroupChat( + participants=[innermost_team, agent3], + termination_condition=TextMentionTermination("TERMINATE", sources=["agent3"]), + runtime=runtime, + name="MiddleTeam", + ) + + # Create outermost team (level 3) + outermost_team = RoundRobinGroupChat( + participants=[middle_team, agent4], + termination_condition=TextMentionTermination("TERMINATE", sources=["agent4"]), + runtime=runtime, + name="OutermostTeam", + ) + + result: TaskResult | None = None + async for msg in outermost_team.run_stream(task="Test deep nesting"): + if isinstance(msg, TaskResult): + result = msg + assert result is not None + # Should have task message + responses from each level + assert len(result.messages) == 12 + assert isinstance(result.messages[0], TextMessage) + assert result.messages[0].content == "Test deep nesting" 
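+    # The stop reason below is produced by the outermost team's termination condition,
+    # which watches only agent4's messages (sources=["agent4"]).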
+    assert result.stop_reason is not None and "TERMINATE" in result.stop_reason
+
+    # Test component serialization of deeply nested structure
+    config = outermost_team.dump_component()
+    loaded_team = RoundRobinGroupChat.load_component(config)
+    assert loaded_team.name == "OutermostTeam"
+
+    # Verify nested structure is preserved
+    loaded_config = loaded_team.dump_component()
+    assert loaded_config == config
+
+
+@pytest.mark.asyncio
+async def test_selector_deeply_nested_teams(runtime: AgentRuntime | None) -> None:
+    """Test SelectorGroupChat with deeply nested teams (3 levels)."""
+    model_client_inner = ReplayChatCompletionClient(
+        [
+            "Hello from innermost agent 1",
+            "Hello from innermost agent 2",
+            "TERMINATE from innermost agent 1",
+        ]
+    )
+    model_client_middle = ReplayChatCompletionClient(
+        [
+            "InnermostTeam",  # Select innermost team
+            "TERMINATE from agent3",
+        ]
+    )
+    model_client_outer = ReplayChatCompletionClient(
+        [
+            "MiddleTeam",  # Select middle team
+            "agent4",  # Select agent4
+            "Hello from outermost agent 4",
+            "agent4",  # Select agent4 again
+            "TERMINATE from agent4",
+        ]
+    )
+
+    # Create agents
+    agent1 = AssistantAgent("agent1", model_client=model_client_inner, description="First agent")
+    agent2 = AssistantAgent("agent2", model_client=model_client_inner, description="Second agent")
+    agent3 = AssistantAgent("agent3", model_client=model_client_middle, description="Third agent")
+    agent4 = AssistantAgent("agent4", model_client=model_client_outer, description="Fourth agent")
+
+    # Create innermost team (level 1) - RoundRobin for simplicity
+    innermost_team = RoundRobinGroupChat(
+        participants=[agent1, agent2],
+        termination_condition=TextMentionTermination("TERMINATE", sources=["agent1", "agent2"]),
+        runtime=runtime,
+        name="InnermostTeam",
+    )
+
+    # Create middle team (level 2) - Selector
+    middle_team = SelectorGroupChat(
+        participants=[innermost_team, agent3],
+        model_client=model_client_middle,
+        termination_condition=TextMentionTermination("TERMINATE", sources=["agent3"]),
+        runtime=runtime,
+        name="MiddleTeam",
+    )
+
+    # Create outermost team (level 3) - Selector
+    outermost_team = SelectorGroupChat(
+        participants=[middle_team, agent4],
+        model_client=model_client_outer,
+        termination_condition=TextMentionTermination("TERMINATE", sources=["agent4"]),
+        runtime=runtime,
+        name="OutermostTeam",
+        allow_repeated_speaker=True,
+    )
+
+    result: TaskResult | None = None
+    async for msg in outermost_team.run_stream(task="Test deep nesting"):
+        if isinstance(msg, TaskResult):
+            result = msg
+    assert result is not None
+
+    # Should have task message + selector events + responses from each level
+    assert len(result.messages) == 7
+    assert isinstance(result.messages[0], TextMessage)
+    assert result.messages[0].content == "Test deep nesting"
+    assert result.stop_reason is not None and "TERMINATE" in result.stop_reason
+
+    # Test component serialization of deeply nested structure
+    config = outermost_team.dump_component()
+    loaded_team = SelectorGroupChat.load_component(config)
+    assert loaded_team.name == "OutermostTeam"
+
+    # Verify nested structure is preserved
+    loaded_config = loaded_team.dump_component()
+    assert loaded_config == config