diff --git a/python/docs/src/getting-started/message-and-communication.ipynb b/python/docs/src/getting-started/message-and-communication.ipynb index 3f12d7e91..a38204f1f 100644 --- a/python/docs/src/getting-started/message-and-communication.ipynb +++ b/python/docs/src/getting-started/message-and-communication.ipynb @@ -1,424 +1,419 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Message and Communication\n", - "\n", - "An agent in AGNext can react to, send, and publish messages,\n", - "and messages are the only means through which agents can communicate\n", - "with each other." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Messages\n", - "\n", - "Messages are serializable objects, they can be defined using:\n", - "\n", - "- A subclass of Pydantic's {py:class}`pydantic.BaseModel`, or\n", - "- A dataclass\n", - "\n", - "For example:" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "from dataclasses import dataclass\n", - "\n", - "\n", - "@dataclass\n", - "class TextMessage:\n", - " content: str\n", - " source: str\n", - "\n", - "\n", - "@dataclass\n", - "class ImageMessage:\n", - " url: str\n", - " source: str" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "```{note}\n", - "Messages are purely data, and should not contain any logic.\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Message Handlers\n", - "\n", - "When an agent receives a message the runtime will invoke the agent's message handler\n", - "({py:meth}`~agnext.core.Agent.on_message`) which should implement the agents message handling logic.\n", - "If this message cannot be handled by the agent, the agent should raise a\n", - "{py:class}`~agnext.core.exceptions.CantHandleException`.\n", - "\n", - "For convenience, the {py:class}`~agnext.components.RoutedAgent` base class\n", - "provides the {py:meth}`~agnext.components.message_handler` decorator\n", - "for associating message types with message handlers,\n", - "so developers do not need to implement the {py:meth}`~agnext.core.Agent.on_message` method.\n", - "\n", - "For example, the following type-routed agent responds to `TextMessage` and `ImageMessage`\n", - "using different message handlers:" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "from agnext.application import SingleThreadedAgentRuntime\n", - "from agnext.components import RoutedAgent, message_handler\n", - "from agnext.core import AgentId, MessageContext\n", - "\n", - "\n", - "class MyAgent(RoutedAgent):\n", - " @message_handler\n", - " async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", - " print(f\"Hello, {message.source}, you said {message.content}!\")\n", - "\n", - " @message_handler\n", - " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", - " print(f\"Hello, {message.source}, you sent me {message.url}!\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create the agent runtime and register the agent (see [Agent and Agent Runtime](agent-and-agent-runtime.ipynb)):" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [], - "source": [ - "runtime = SingleThreadedAgentRuntime()\n", - "await runtime.register(\"my_agent\", lambda: MyAgent(\"My Agent\"))\n", - "agent = AgentId(\"my_agent\", \"default\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Test this agent with `TextMessage` and `ImageMessage`." - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Hello, User, you said Hello, World!!\n", - "Hello, User, you sent me https://example.com/image.jpg!\n" - ] - } - ], - "source": [ - "runtime.start()\n", - "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent)\n", - "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent)\n", - "await runtime.stop_when_idle()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Communication\n", - "\n", - "There are two types of communication in AGNext:\n", - "\n", - "- **Direct communication**: An agent sends a direct message to another agent.\n", - "- **Broadcast communication**: An agent publishes a message to all agents in the same namespace.\n", - "\n", - "### Direct Communication\n", - "\n", - "To send a direct message to another agent, within a message handler use\n", - "the {py:meth}`agnext.core.BaseAgent.send_message` method,\n", - "from the runtime use the {py:meth}`agnext.core.AgentRuntime.send_message` method.\n", - "Awaiting calls to these methods will return the return value of the\n", - "receiving agent's message handler.\n", - "\n", - "```{note}\n", - "If the invoked agent raises an exception while the sender is awaiting,\n", - "the exception will be propagated back to the sender.\n", - "```\n", - "\n", - "#### Request/Response\n", - "\n", - "Direct communication can be used for request/response scenarios,\n", - "where the sender expects a response from the receiver.\n", - "The receiver can respond to the message by returning a value from its message handler.\n", - "You can think of this as a function call between agents.\n", - "\n", - "For example, consider the following type-routed agent:" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "metadata": {}, - "outputs": [], - "source": [ - "from dataclasses import dataclass\n", - "\n", - "from agnext.application import SingleThreadedAgentRuntime\n", - "from agnext.components import RoutedAgent, message_handler\n", - "from agnext.core import MessageContext\n", - "\n", - "\n", - "@dataclass\n", - "class Message:\n", - " content: str\n", - "\n", - "\n", - "class InnerAgent(RoutedAgent):\n", - " @message_handler\n", - " async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:\n", - " return Message(content=f\"Hello from inner, {message.content}\")\n", - "\n", - "\n", - "class OuterAgent(RoutedAgent):\n", - " def __init__(self, description: str, inner_agent_type: str):\n", - " super().__init__(description)\n", - " self.inner_agent_id = AgentId(inner_agent_type, self.id.key)\n", - "\n", - " @message_handler\n", - " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", - " print(f\"Received message: {message.content}\")\n", - " # Send a direct message to the inner agent and receves a response.\n", - " response = await self.send_message(Message(f\"Hello from outer, {message.content}\"), self.inner_agent_id)\n", - " print(f\"Received inner response: {response.content}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Upone receving a message, the `OuterAgent` sends a direct message to the `InnerAgent` and receives\n", - "a message in response.\n", - "\n", - "We can test these agents by sending a `Message` to the `OuterAgent`." - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Received message: Hello, World!\n", - "Received inner response: Hello from inner, Hello from outer, Hello, World!\n" - ] - } - ], - "source": [ - "runtime = SingleThreadedAgentRuntime()\n", - "await runtime.register(\"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n", - "await runtime.register(\"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"InnerAgent\"))\n", - "runtime.start()\n", - "outer = AgentId(\"outer_agent\", \"default\")\n", - "await runtime.send_message(Message(content=\"Hello, World!\"), outer)\n", - "await runtime.stop_when_idle()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Both outputs are produced by the `OuterAgent`'s message handler, however the second output is based on the response from the `InnerAgent`." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Broadcast Communication\n", - "\n", - "Broadcast communication is effectively the publish/subscribe model.\n", - "As part of the base agent ({py:class}`~agnext.core.BaseAgent`) implementation,\n", - "it must advertise the message types that\n", - "it would like to receive when published ({py:attr}`~agnext.core.AgentMetadata.subscriptions`).\n", - "If one of these messages is published, the agent's message handler will be invoked.\n", - "\n", - "The key difference between direct and broadcast communication is that broadcast\n", - "communication cannot be used for request/response scenarios.\n", - "When an agent publishes a message it is one way only, it cannot receive a response\n", - "from any other agent, even if a receiving agent sends a response.\n", - "\n", - "```{note}\n", - "An agent receiving a message does not know if it is handling a published or direct message.\n", - "So, if a response is given to a published message, it will be thrown away.\n", - "```\n", - "\n", - "To publish a message to all agents in the same namespace,\n", - "use the {py:meth}`agnext.core.BaseAgent.publish_message` method.\n", - "This call must still be awaited to allow the runtime to deliver the message to all agents,\n", - "but it will always return `None`.\n", - "If an agent raises an exception while handling a published message,\n", - "this will be logged but will not be propagated back to the publishing agent.\n", - "\n", - "The following example shows a `BroadcastingAgent` that publishes a message\n", - "upong receiving a message. A `ReceivingAgent` that prints the message\n", - "it receives." - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "metadata": {}, - "outputs": [], - "source": [ - "from agnext.application import SingleThreadedAgentRuntime\n", - "from agnext.components import RoutedAgent, message_handler\n", - "from agnext.core import MessageContext, TopicId\n", - "\n", - "\n", - "class BroadcastingAgent(RoutedAgent):\n", - " @message_handler\n", - " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", - " # Publish a message to all agents in the same namespace.\n", - " assert ctx.topic_id is not None\n", - " await self.publish_message(\n", - " Message(f\"Publishing a message: {message.content}!\"), topic_id=TopicId(\"deafult\", self.id.key)\n", - " )\n", - "\n", - "\n", - "class ReceivingAgent(RoutedAgent):\n", - " @message_handler\n", - " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", - " print(f\"Received a message: {message.content}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Sending a direct message to the `BroadcastingAgent` will result in a message being published by\n", - "the `BroadcastingAgent` and received by the `ReceivingAgent`." - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Received a message: Publishing a message: Hello, World!!\n" - ] - } - ], - "source": [ - "from agnext.components import TypeSubscription\n", - "\n", - "runtime = SingleThreadedAgentRuntime()\n", - "await runtime.register(\"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"))\n", - "await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", - "await runtime.add_subscription(TypeSubscription(\"default\", \"broadcasting_agent\"))\n", - "await runtime.add_subscription(TypeSubscription(\"default\", \"receiving_agent\"))\n", - "runtime.start()\n", - "await runtime.send_message(Message(\"Hello, World!\"), AgentId(\"broadcasting_agent\", \"default\"))\n", - "await runtime.stop()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To publish a message to all agents outside of an agent handling a message,\n", - "the message should be published via the runtime with the\n", - "{py:meth}`agnext.core.AgentRuntime.publish_message` method." - ] - }, - { - "cell_type": "code", - "execution_count": 22, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Received a message: Hello, World! From the runtime!\n", - "Received a message: Publishing a message: Hello, World! From the runtime!!\n" - ] - } - ], - "source": [ - "# Replace send_message with publish_message in the above example.\n", - "\n", - "runtime = SingleThreadedAgentRuntime()\n", - "await runtime.register(\"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"))\n", - "await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", - "await runtime.add_subscription(TypeSubscription(\"default\", \"broadcasting_agent\"))\n", - "await runtime.add_subscription(TypeSubscription(\"default\", \"receiving_agent\"))\n", - "runtime.start()\n", - "await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=TopicId(\"default\", \"default\"))\n", - "await runtime.stop_when_idle()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The first output is from the `ReceivingAgent` that received a message published\n", - "by the runtime. The second output is from the `ReceivingAgent` that received\n", - "a message published by the `BroadcastingAgent`.\n", - "\n", - "```{note}\n", - "If an agent publishes a message type for which it is subscribed it will not\n", - "receive the message it published. This is to prevent infinite loops.\n", - "```" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "agnext", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} \ No newline at end of file + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Message and Communication\n", + "\n", + "An agent in AGNext can react to, send, and publish messages,\n", + "and messages are the only means through which agents can communicate\n", + "with each other." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Messages\n", + "\n", + "Messages are serializable objects, they can be defined using:\n", + "\n", + "- A subclass of Pydantic's {py:class}`pydantic.BaseModel`, or\n", + "- A dataclass\n", + "\n", + "For example:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "\n", + "\n", + "@dataclass\n", + "class TextMessage:\n", + " content: str\n", + " source: str\n", + "\n", + "\n", + "@dataclass\n", + "class ImageMessage:\n", + " url: str\n", + " source: str" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```{note}\n", + "Messages are purely data, and should not contain any logic.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Message Handlers\n", + "\n", + "When an agent receives a message the runtime will invoke the agent's message handler\n", + "({py:meth}`~agnext.core.Agent.on_message`) which should implement the agents message handling logic.\n", + "If this message cannot be handled by the agent, the agent should raise a\n", + "{py:class}`~agnext.core.exceptions.CantHandleException`.\n", + "\n", + "For convenience, the {py:class}`~agnext.components.RoutedAgent` base class\n", + "provides the {py:meth}`~agnext.components.message_handler` decorator\n", + "for associating message types with message handlers,\n", + "so developers do not need to implement the {py:meth}`~agnext.core.Agent.on_message` method.\n", + "\n", + "For example, the following type-routed agent responds to `TextMessage` and `ImageMessage`\n", + "using different message handlers:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from agnext.application import SingleThreadedAgentRuntime\n", + "from agnext.components import RoutedAgent, message_handler\n", + "from agnext.core import AgentId, MessageContext\n", + "\n", + "\n", + "class MyAgent(RoutedAgent):\n", + " @message_handler\n", + " async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", + " print(f\"Hello, {message.source}, you said {message.content}!\")\n", + "\n", + " @message_handler\n", + " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", + " print(f\"Hello, {message.source}, you sent me {message.url}!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create the agent runtime and register the agent (see [Agent and Agent Runtime](agent-and-agent-runtime.ipynb)):" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "await runtime.register(\"my_agent\", lambda: MyAgent(\"My Agent\"))\n", + "agent = AgentId(\"my_agent\", \"default\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Test this agent with `TextMessage` and `ImageMessage`." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Hello, User, you said Hello, World!!\n", + "Hello, User, you sent me https://example.com/image.jpg!\n" + ] + } + ], + "source": [ + "runtime.start()\n", + "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent)\n", + "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent)\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Communication\n", + "\n", + "There are two types of communication in AGNext:\n", + "\n", + "- **Direct communication**: An agent sends a direct message to another agent.\n", + "- **Broadcast communication**: An agent publishes a message to all agents in the same namespace.\n", + "\n", + "### Direct Communication\n", + "\n", + "To send a direct message to another agent, within a message handler use\n", + "the {py:meth}`agnext.core.BaseAgent.send_message` method,\n", + "from the runtime use the {py:meth}`agnext.core.AgentRuntime.send_message` method.\n", + "Awaiting calls to these methods will return the return value of the\n", + "receiving agent's message handler.\n", + "\n", + "```{note}\n", + "If the invoked agent raises an exception while the sender is awaiting,\n", + "the exception will be propagated back to the sender.\n", + "```\n", + "\n", + "#### Request/Response\n", + "\n", + "Direct communication can be used for request/response scenarios,\n", + "where the sender expects a response from the receiver.\n", + "The receiver can respond to the message by returning a value from its message handler.\n", + "You can think of this as a function call between agents.\n", + "\n", + "For example, consider the following type-routed agent:" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "\n", + "from agnext.application import SingleThreadedAgentRuntime\n", + "from agnext.components import RoutedAgent, message_handler\n", + "from agnext.core import MessageContext\n", + "\n", + "\n", + "@dataclass\n", + "class Message:\n", + " content: str\n", + "\n", + "\n", + "class InnerAgent(RoutedAgent):\n", + " @message_handler\n", + " async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:\n", + " return Message(content=f\"Hello from inner, {message.content}\")\n", + "\n", + "\n", + "class OuterAgent(RoutedAgent):\n", + " def __init__(self, description: str, inner_agent_type: str):\n", + " super().__init__(description)\n", + " self.inner_agent_id = AgentId(inner_agent_type, self.id.key)\n", + "\n", + " @message_handler\n", + " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", + " print(f\"Received message: {message.content}\")\n", + " # Send a direct message to the inner agent and receves a response.\n", + " response = await self.send_message(Message(f\"Hello from outer, {message.content}\"), self.inner_agent_id)\n", + " print(f\"Received inner response: {response.content}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Upone receving a message, the `OuterAgent` sends a direct message to the `InnerAgent` and receives\n", + "a message in response.\n", + "\n", + "We can test these agents by sending a `Message` to the `OuterAgent`." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Received message: Hello, World!\n", + "Received inner response: Hello from inner, Hello from outer, Hello, World!\n" + ] + } + ], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "await runtime.register(\"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n", + "await runtime.register(\"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"InnerAgent\"))\n", + "runtime.start()\n", + "outer = AgentId(\"outer_agent\", \"default\")\n", + "await runtime.send_message(Message(content=\"Hello, World!\"), outer)\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Both outputs are produced by the `OuterAgent`'s message handler, however the second output is based on the response from the `InnerAgent`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Broadcast Communication\n", + "\n", + "Broadcast communication is effectively the publish/subscribe model.\n", + "As part of the base agent ({py:class}`~agnext.core.BaseAgent`) implementation,\n", + "it must advertise the message types that\n", + "it would like to receive when published ({py:attr}`~agnext.core.AgentMetadata.subscriptions`).\n", + "If one of these messages is published, the agent's message handler will be invoked.\n", + "\n", + "The key difference between direct and broadcast communication is that broadcast\n", + "communication cannot be used for request/response scenarios.\n", + "When an agent publishes a message it is one way only, it cannot receive a response\n", + "from any other agent, even if a receiving agent sends a response.\n", + "\n", + "```{note}\n", + "An agent receiving a message does not know if it is handling a published or direct message.\n", + "So, if a response is given to a published message, it will be thrown away.\n", + "```\n", + "\n", + "To publish a message to all agents in the same namespace,\n", + "use the {py:meth}`agnext.core.BaseAgent.publish_message` method.\n", + "This call must still be awaited to allow the runtime to deliver the message to all agents,\n", + "but it will always return `None`.\n", + "If an agent raises an exception while handling a published message,\n", + "this will be logged but will not be propagated back to the publishing agent.\n", + "\n", + "The following example shows a `BroadcastingAgent` that publishes a message\n", + "upong receiving a message. A `ReceivingAgent` that prints the message\n", + "it receives." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "from agnext.application import SingleThreadedAgentRuntime\n", + "from agnext.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler\n", + "from agnext.core import MessageContext\n", + "\n", + "\n", + "class BroadcastingAgent(RoutedAgent):\n", + " @message_handler\n", + " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", + " # Publish a message to all agents in the same namespace.\n", + " await self.publish_message(Message(f\"Publishing a message: {message.content}!\"), topic_id=DefaultTopicId())\n", + "\n", + "\n", + "class ReceivingAgent(RoutedAgent):\n", + " @message_handler\n", + " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", + " print(f\"Received a message: {message.content}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Sending a direct message to the `BroadcastingAgent` will result in a message being published by\n", + "the `BroadcastingAgent` and received by the `ReceivingAgent`." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Received a message: Publishing a message: Hello, World!!\n" + ] + } + ], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "await runtime.register(\n", + " \"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"), lambda: [DefaultSubscription()]\n", + ")\n", + "await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"), lambda: [DefaultSubscription()])\n", + "runtime.start()\n", + "await runtime.send_message(Message(\"Hello, World!\"), AgentId(\"broadcasting_agent\", \"default\"))\n", + "await runtime.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To publish a message to all agents outside of an agent handling a message,\n", + "the message should be published via the runtime with the\n", + "{py:meth}`agnext.core.AgentRuntime.publish_message` method." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Received a message: Hello, World! From the runtime!\n", + "Received a message: Publishing a message: Hello, World! From the runtime!!\n" + ] + } + ], + "source": [ + "# Replace send_message with publish_message in the above example.\n", + "\n", + "runtime = SingleThreadedAgentRuntime()\n", + "await runtime.register(\n", + " \"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"), lambda: [DefaultSubscription()]\n", + ")\n", + "await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"), lambda: [DefaultSubscription()])\n", + "runtime.start()\n", + "await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=DefaultTopicId())\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first output is from the `ReceivingAgent` that received a message published\n", + "by the runtime. The second output is from the `ReceivingAgent` that received\n", + "a message published by the `BroadcastingAgent`.\n", + "\n", + "```{note}\n", + "If an agent publishes a message type for which it is subscribed it will not\n", + "receive the message it published. This is to prevent infinite loops.\n", + "```" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "agnext", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/samples/common/agents/_chat_completion_agent.py b/python/samples/common/agents/_chat_completion_agent.py index 213c90bfc..c71589dd5 100644 --- a/python/samples/common/agents/_chat_completion_agent.py +++ b/python/samples/common/agents/_chat_completion_agent.py @@ -3,6 +3,7 @@ import json from typing import Any, Coroutine, Dict, List, Mapping, Sequence, Tuple from agnext.components import ( + DefaultTopicId, FunctionCall, RoutedAgent, message_handler, @@ -110,9 +111,8 @@ class ChatCompletionAgent(RoutedAgent): # Generate a response. response = await self._generate_response(message.response_format, ctx) - assert ctx.topic_id is not None # Publish the response. - await self.publish_message(response, topic_id=ctx.topic_id) + await self.publish_message(response, topic_id=DefaultTopicId()) @message_handler() async def on_tool_call_message( diff --git a/python/samples/common/agents/_image_generation_agent.py b/python/samples/common/agents/_image_generation_agent.py index 878991762..8b0f105a1 100644 --- a/python/samples/common/agents/_image_generation_agent.py +++ b/python/samples/common/agents/_image_generation_agent.py @@ -2,6 +2,7 @@ from typing import Literal import openai from agnext.components import ( + DefaultTopicId, Image, RoutedAgent, message_handler, @@ -57,8 +58,7 @@ class ImageGenerationAgent(RoutedAgent): image is published as a MultiModalMessage.""" response = await self._generate_response(ctx.cancellation_token) - assert ctx.topic_id is not None - await self.publish_message(response, topic_id=ctx.topic_id) + await self.publish_message(response, topic_id=DefaultTopicId()) async def _generate_response(self, cancellation_token: CancellationToken) -> MultiModalMessage: messages = await self._memory.get_messages() diff --git a/python/samples/common/agents/_oai_assistant.py b/python/samples/common/agents/_oai_assistant.py index 6539d578a..2a1d72d0f 100644 --- a/python/samples/common/agents/_oai_assistant.py +++ b/python/samples/common/agents/_oai_assistant.py @@ -1,7 +1,7 @@ from typing import Any, Callable, List, Mapping import openai -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.core import ( CancellationToken, MessageContext, # type: ignore @@ -80,8 +80,7 @@ class OpenAIAssistantAgent(RoutedAgent): async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None: """Handle a publish now message. This method generates a response and publishes it.""" response = await self._generate_response(message.response_format, ctx.cancellation_token) - assert ctx.topic_id is not None - await self.publish_message(response, ctx.topic_id) + await self.publish_message(response, DefaultTopicId()) async def _generate_response( self, diff --git a/python/samples/common/agents/_user_proxy.py b/python/samples/common/agents/_user_proxy.py index 69d1a491e..442955050 100644 --- a/python/samples/common/agents/_user_proxy.py +++ b/python/samples/common/agents/_user_proxy.py @@ -1,6 +1,6 @@ import asyncio -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.core import MessageContext from ..types import PublishNow, TextMessage @@ -23,8 +23,9 @@ class UserProxyAgent(RoutedAgent): async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None: """Handle a publish now message. This method prompts the user for input, then publishes it.""" user_input = await self.get_user_input(self._user_input_prompt) - assert ctx.topic_id is not None - await self.publish_message(TextMessage(content=user_input, source=self.metadata["type"]), topic_id=ctx.topic_id) + await self.publish_message( + TextMessage(content=user_input, source=self.metadata["type"]), topic_id=DefaultTopicId() + ) async def get_user_input(self, prompt: str) -> str: """Get user input from the console. Override this method to customize how user input is retrieved.""" diff --git a/python/samples/core/two_agents_pub_sub.py b/python/samples/core/two_agents_pub_sub.py index fd18f23ab..0889d55ea 100644 --- a/python/samples/core/two_agents_pub_sub.py +++ b/python/samples/core/two_agents_pub_sub.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from typing import List from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.models import ( AssistantMessage, @@ -74,7 +74,7 @@ class ChatCompletionAgent(RoutedAgent): if ctx.topic_id is not None: await self.publish_message( - Message(content=response.content, source=self.metadata["type"]), topic_id=ctx.topic_id + Message(content=response.content, source=self.metadata["type"]), topic_id=DefaultTopicId() ) diff --git a/python/samples/demos/assistant.py b/python/samples/demos/assistant.py index 2f9ce3654..471bcd018 100644 --- a/python/samples/demos/assistant.py +++ b/python/samples/demos/assistant.py @@ -12,7 +12,7 @@ from typing import List import aiofiles import openai from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.core import AgentId, AgentRuntime, MessageContext from openai import AsyncAssistantEventHandler from openai.types.beta.thread import ToolResources @@ -109,9 +109,8 @@ class UserProxyAgent(RoutedAgent): return else: # Publish user input and exit handler. - assert ctx.topic_id is not None await self.publish_message( - TextMessage(content=user_input, source=self.metadata["type"]), topic_id=ctx.topic_id + TextMessage(content=user_input, source=self.metadata["type"]), topic_id=DefaultTopicId() ) return diff --git a/python/samples/demos/chat_room.py b/python/samples/demos/chat_room.py index 842c8c6af..83ce6b0b2 100644 --- a/python/samples/demos/chat_room.py +++ b/python/samples/demos/chat_room.py @@ -6,7 +6,8 @@ import os import sys from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler +from agnext.components._default_subscription import DefaultSubscription from agnext.components.memory import ChatMemory from agnext.components.models import ChatCompletionClient, SystemMessage from agnext.core import AgentId, AgentInstantiationContext, AgentProxy, AgentRuntime @@ -76,9 +77,8 @@ Use the following JSON format to provide your thought on the latest message and # Publish the response if needed. if respond is True or str(respond).lower().strip() == "true": - assert ctx.topic_id is not None await self.publish_message( - TextMessage(source=self.metadata["type"], content=str(response)), topic_id=ctx.topic_id + TextMessage(source=self.metadata["type"], content=str(response)), topic_id=DefaultTopicId() ) @@ -98,6 +98,7 @@ async def chat_room(runtime: AgentRuntime, app: TextualChatApp) -> None: description="The user in the chat room.", app=app, ), + lambda: [DefaultSubscription()], ) await runtime.register( "Alice", @@ -108,6 +109,7 @@ async def chat_room(runtime: AgentRuntime, app: TextualChatApp) -> None: memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), + lambda: [DefaultSubscription()], ) alice = AgentProxy(AgentId("Alice", "default"), runtime) await runtime.register( @@ -119,6 +121,7 @@ async def chat_room(runtime: AgentRuntime, app: TextualChatApp) -> None: memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), + lambda: [DefaultSubscription()], ) bob = AgentProxy(AgentId("Bob", "default"), runtime) await runtime.register( @@ -130,6 +133,7 @@ async def chat_room(runtime: AgentRuntime, app: TextualChatApp) -> None: memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), + lambda: [DefaultSubscription()], ) charlie = AgentProxy(AgentId("Charlie", "default"), runtime) app.welcoming_notice = f"""Welcome to the chat room demo with the following participants: diff --git a/python/samples/demos/chess_game.py b/python/samples/demos/chess_game.py index 5e4a9a85c..ddf14ad92 100644 --- a/python/samples/demos/chess_game.py +++ b/python/samples/demos/chess_game.py @@ -10,10 +10,11 @@ import sys from typing import Annotated, Literal from agnext.application import SingleThreadedAgentRuntime +from agnext.components import DefaultTopicId from agnext.components._type_subscription import TypeSubscription from agnext.components.models import SystemMessage from agnext.components.tools import FunctionTool -from agnext.core import AgentInstantiationContext, AgentRuntime, TopicId +from agnext.core import AgentInstantiationContext, AgentRuntime from chess import BLACK, SQUARE_NAMES, WHITE, Board, Move from chess import piece_name as get_piece_name @@ -214,9 +215,7 @@ async def main() -> None: await chess_game(runtime) runtime.start() # Publish an initial message to trigger the group chat manager to start orchestration. - await runtime.publish_message( - TextMessage(content="Game started.", source="System"), topic_id=TopicId("default", "default") - ) + await runtime.publish_message(TextMessage(content="Game started.", source="System"), topic_id=DefaultTopicId()) await runtime.stop_when_idle() diff --git a/python/samples/demos/utils.py b/python/samples/demos/utils.py index 96e21d761..6fadc8979 100644 --- a/python/samples/demos/utils.py +++ b/python/samples/demos/utils.py @@ -4,7 +4,7 @@ import random import sys from asyncio import Future -from agnext.components import Image, RoutedAgent, message_handler +from agnext.components import DefaultTopicId, Image, RoutedAgent, message_handler from agnext.core import AgentRuntime, CancellationToken from textual.app import App, ComposeResult from textual.containers import ScrollableContainer @@ -13,7 +13,6 @@ from textual_imageview.viewer import ImageViewer sys.path.append(os.path.join(os.path.dirname(__file__), "..")) -from agnext.core import TopicId from common.types import ( MultiModalMessage, PublishNow, @@ -136,9 +135,7 @@ class TextualChatApp(App): # type: ignore chat_messages.query("#typing").remove() # Publish the user message to the runtime. await self._runtime.publish_message( - # TODO fix hard coded topic_id - TextMessage(source=self._user_name, content=user_input), - topic_id=TopicId("default", "default"), + TextMessage(source=self._user_name, content=user_input), topic_id=DefaultTopicId() ) async def post_runtime_message(self, message: TextMessage | MultiModalMessage) -> None: # type: ignore diff --git a/python/samples/marketing-agents/auditor.py b/python/samples/marketing-agents/auditor.py index 0d4e197dc..3a9ff9cd9 100644 --- a/python/samples/marketing-agents/auditor.py +++ b/python/samples/marketing-agents/auditor.py @@ -1,4 +1,4 @@ -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components.models import ChatCompletionClient from agnext.components.models._types import SystemMessage from agnext.core import MessageContext @@ -30,7 +30,6 @@ class AuditAgent(RoutedAgent): assert isinstance(completion.content, str) if "NOTFORME" in completion.content: return - assert ctx.topic_id is not None await self.publish_message( - AuditorAlert(UserId=message.UserId, auditorAlertMessage=completion.content), topic_id=ctx.topic_id + AuditorAlert(UserId=message.UserId, auditorAlertMessage=completion.content), topic_id=DefaultTopicId() ) diff --git a/python/samples/marketing-agents/graphic_designer.py b/python/samples/marketing-agents/graphic_designer.py index 75c4641ae..cdbb52ed6 100644 --- a/python/samples/marketing-agents/graphic_designer.py +++ b/python/samples/marketing-agents/graphic_designer.py @@ -3,6 +3,7 @@ from typing import Literal import openai from agnext.components import ( + DefaultTopicId, RoutedAgent, message_handler, ) @@ -33,9 +34,8 @@ class GraphicDesignerAgent(RoutedAgent): image_uri = response.data[0].url logger.info(f"Generated image for article. Got response: '{image_uri}'") - assert ctx.topic_id is not None await self.publish_message( - GraphicDesignCreated(UserId=message.UserId, imageUri=image_uri), topic_id=ctx.topic_id + GraphicDesignCreated(UserId=message.UserId, imageUri=image_uri), topic_id=DefaultTopicId() ) except Exception as e: logger.error(f"Failed to generate image for article. Error: {e}") diff --git a/python/samples/marketing-agents/test_usage.py b/python/samples/marketing-agents/test_usage.py index 55d61689c..648b2f6d7 100644 --- a/python/samples/marketing-agents/test_usage.py +++ b/python/samples/marketing-agents/test_usage.py @@ -2,8 +2,8 @@ import asyncio import os from agnext.application import SingleThreadedAgentRuntime -from agnext.components import Image, RoutedAgent, message_handler -from agnext.core import MessageContext, TopicId +from agnext.components import DefaultTopicId, Image, RoutedAgent, message_handler +from agnext.core import MessageContext from app import build_app from dotenv import load_dotenv from messages import ArticleCreated, AuditorAlert, AuditText, GraphicDesignCreated @@ -34,15 +34,13 @@ async def main() -> None: runtime.start() - topic_id = TopicId("default", "default") - await runtime.publish_message( - AuditText(text="Buy my product for a MASSIVE 50% discount.", UserId="user-1"), topic_id=topic_id + AuditText(text="Buy my product for a MASSIVE 50% discount.", UserId="user-1"), topic_id=DefaultTopicId() ) await runtime.publish_message( ArticleCreated(article="The best article ever written about trees and rocks", UserId="user-2"), - topic_id=topic_id, + topic_id=DefaultTopicId(), ) await runtime.stop_when_idle() diff --git a/python/samples/patterns/coder_executor.py b/python/samples/patterns/coder_executor.py index 45529600a..c2910ca12 100644 --- a/python/samples/patterns/coder_executor.py +++ b/python/samples/patterns/coder_executor.py @@ -21,7 +21,7 @@ from dataclasses import dataclass from typing import Dict, List from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.code_executor import CodeBlock, CodeExecutor, LocalCommandLineCodeExecutor from agnext.components.models import ( @@ -102,12 +102,11 @@ Reply "TERMINATE" in the end when everything is done.""" AssistantMessage(content=response.content, source=self.metadata["type"]) ) - assert ctx.topic_id is not None # Publish the code execution task. await self.publish_message( CodeExecutionTask(content=response.content, session_id=session_id), cancellation_token=ctx.cancellation_token, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) @message_handler @@ -124,11 +123,10 @@ Reply "TERMINATE" in the end when everything is done.""" if "TERMINATE" in response.content: # If the task is completed, publish a message with the completion content. - assert ctx.topic_id is not None await self.publish_message( TaskCompletion(content=response.content), cancellation_token=ctx.cancellation_token, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) print("--------------------") print("Task completed:") @@ -136,11 +134,10 @@ Reply "TERMINATE" in the end when everything is done.""" return # Publish the code execution task. - assert ctx.topic_id is not None await self.publish_message( CodeExecutionTask(content=response.content, session_id=message.session_id), cancellation_token=ctx.cancellation_token, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) @@ -157,13 +154,12 @@ class Executor(RoutedAgent): code_blocks = self._extract_code_blocks(message.content) if not code_blocks: # If no code block is found, publish a message with an error. - assert ctx.topic_id is not None await self.publish_message( CodeExecutionTaskResult( output="Error: no Markdown code block found.", exit_code=1, session_id=message.session_id ), cancellation_token=ctx.cancellation_token, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) return # Execute code blocks. @@ -171,11 +167,10 @@ class Executor(RoutedAgent): code_blocks=code_blocks, cancellation_token=ctx.cancellation_token ) # Publish the code execution result. - assert ctx.topic_id is not None await self.publish_message( CodeExecutionTaskResult(output=result.output, exit_code=result.exit_code, session_id=message.session_id), cancellation_token=ctx.cancellation_token, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) def _extract_code_blocks(self, markdown_text: str) -> List[CodeBlock]: diff --git a/python/samples/patterns/coder_reviewer.py b/python/samples/patterns/coder_reviewer.py index 80cf76d44..46cc5a9ce 100644 --- a/python/samples/patterns/coder_reviewer.py +++ b/python/samples/patterns/coder_reviewer.py @@ -21,7 +21,7 @@ from dataclasses import dataclass from typing import Dict, List, Union from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.models import ( AssistantMessage, @@ -112,14 +112,13 @@ Please review the code and provide feedback. review_text = "Code review:\n" + "\n".join([f"{k}: {v}" for k, v in review.items()]) approved = review["approval"].lower().strip() == "approve" # Publish the review result. - assert ctx.topic_id is not None await self.publish_message( CodeReviewResult( review=review_text, approved=approved, session_id=message.session_id, ), - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) @@ -183,10 +182,9 @@ Code: # Store the code review task in the session memory. self._session_memory[session_id].append(code_review_task) # Publish a code review task. - assert ctx.topic_id is not None await self.publish_message( code_review_task, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) @message_handler @@ -201,14 +199,13 @@ Code: # Check if the code is approved. if message.approved: # Publish the code writing result. - assert ctx.topic_id is not None await self.publish_message( CodeWritingResult( code=review_request.code, task=review_request.code_writing_task, review=message.review, ), - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) print("Code Writing Result:") print("-" * 80) @@ -247,10 +244,9 @@ Code: # Store the code review task in the session memory. self._session_memory[message.session_id].append(code_review_task) # Publish a new code review task. - assert ctx.topic_id is not None await self.publish_message( code_review_task, - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) def _extract_code_block(self, markdown_text: str) -> Union[str, None]: diff --git a/python/samples/patterns/group_chat.py b/python/samples/patterns/group_chat.py index 5d48d5ad7..d1c80c3c2 100644 --- a/python/samples/patterns/group_chat.py +++ b/python/samples/patterns/group_chat.py @@ -18,7 +18,7 @@ from dataclasses import dataclass from typing import List from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components.models import ( AssistantMessage, ChatCompletionClient, @@ -69,8 +69,7 @@ class RoundRobinGroupChatManager(RoutedAgent): self._round_count += 1 if self._round_count > self._num_rounds * len(self._participants): # End the conversation after the specified number of rounds. - assert ctx.topic_id is not None - await self.publish_message(Termination(), ctx.topic_id) + await self.publish_message(Termination(), DefaultTopicId()) return # Send a request to speak message to the selected speaker. await self.send_message(RequestToSpeak(), speaker) @@ -107,8 +106,7 @@ class GroupChatParticipant(RoutedAgent): assert isinstance(response.content, str) speech = Message(content=response.content, source=self.metadata["type"]) self._memory.append(speech) - assert ctx.topic_id is not None - await self.publish_message(speech, topic_id=ctx.topic_id) + await self.publish_message(speech, topic_id=DefaultTopicId()) async def main() -> None: diff --git a/python/samples/patterns/mixture_of_agents.py b/python/samples/patterns/mixture_of_agents.py index c7d0159e6..1344a7719 100644 --- a/python/samples/patterns/mixture_of_agents.py +++ b/python/samples/patterns/mixture_of_agents.py @@ -15,7 +15,7 @@ from dataclasses import dataclass from typing import Dict, List from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.models import ChatCompletionClient, SystemMessage, UserMessage from agnext.core import MessageContext @@ -68,8 +68,7 @@ class ReferenceAgent(RoutedAgent): response = await self._model_client.create(self._system_messages + [task_message]) assert isinstance(response.content, str) task_result = ReferenceAgentTaskResult(session_id=message.session_id, result=response.content) - assert ctx.topic_id is not None - await self.publish_message(task_result, topic_id=ctx.topic_id) + await self.publish_message(task_result, topic_id=DefaultTopicId()) class AggregatorAgent(RoutedAgent): @@ -93,8 +92,7 @@ class AggregatorAgent(RoutedAgent): """Handle a task message. This method publishes the task to the reference agents.""" session_id = str(uuid.uuid4()) ref_task = ReferenceAgentTask(session_id=session_id, task=message.task) - assert ctx.topic_id is not None - await self.publish_message(ref_task, topic_id=ctx.topic_id) + await self.publish_message(ref_task, topic_id=DefaultTopicId()) @message_handler async def handle_result(self, message: ReferenceAgentTaskResult, ctx: MessageContext) -> None: @@ -108,8 +106,7 @@ class AggregatorAgent(RoutedAgent): ) assert isinstance(response.content, str) task_result = AggregatorTaskResult(result=response.content) - assert ctx.topic_id is not None - await self.publish_message(task_result, topic_id=ctx.topic_id) + await self.publish_message(task_result, topic_id=DefaultTopicId()) self._session_results.pop(message.session_id) print(f"Aggregator result: {response.content}") diff --git a/python/samples/patterns/multi_agent_debate.py b/python/samples/patterns/multi_agent_debate.py index f819465f3..e1dc29079 100644 --- a/python/samples/patterns/multi_agent_debate.py +++ b/python/samples/patterns/multi_agent_debate.py @@ -40,7 +40,7 @@ from dataclasses import dataclass from typing import Dict, List, Tuple from agnext.application import SingleThreadedAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.models import ( AssistantMessage, @@ -165,11 +165,10 @@ class MathSolver(RoutedAgent): answer = match.group(1) # Increment the counter. self._counters[message.session_id] = self._counters.get(message.session_id, 0) + 1 - assert ctx.topic_id is not None if self._counters[message.session_id] == self._max_round: # If the counter reaches the maximum round, publishes a final response. await self.publish_message( - FinalSolverResponse(answer=answer, session_id=message.session_id), topic_id=ctx.topic_id + FinalSolverResponse(answer=answer, session_id=message.session_id), topic_id=DefaultTopicId() ) else: # Publish intermediate response. @@ -181,7 +180,7 @@ class MathSolver(RoutedAgent): session_id=message.session_id, round=self._counters[message.session_id], ), - topic_id=ctx.topic_id, + topic_id=DefaultTopicId(), ) @@ -199,9 +198,8 @@ class MathAggregator(RoutedAgent): "in the form of {{answer}}, at the end of your response." ) session_id = str(uuid.uuid4()) - assert ctx.topic_id is not None await self.publish_message( - SolverRequest(content=prompt, session_id=session_id, question=message.content), topic_id=ctx.topic_id + SolverRequest(content=prompt, session_id=session_id, question=message.content), topic_id=DefaultTopicId() ) @message_handler @@ -212,8 +210,7 @@ class MathAggregator(RoutedAgent): answers = [resp.answer for resp in self._responses[message.session_id]] majority_answer = max(set(answers), key=answers.count) # Publish the aggregated response. - assert ctx.topic_id is not None - await self.publish_message(Answer(content=majority_answer), topic_id=ctx.topic_id) + await self.publish_message(Answer(content=majority_answer), topic_id=DefaultTopicId()) # Clear the responses. self._responses.pop(message.session_id) print(f"Aggregated answer: {majority_answer}") diff --git a/python/samples/tool-use/coding_pub_sub.py b/python/samples/tool-use/coding_pub_sub.py index 81d462af0..f0ef2121f 100644 --- a/python/samples/tool-use/coding_pub_sub.py +++ b/python/samples/tool-use/coding_pub_sub.py @@ -20,7 +20,7 @@ from dataclasses import dataclass from typing import Dict, List from agnext.application import SingleThreadedAgentRuntime -from agnext.components import FunctionCall, RoutedAgent, message_handler +from agnext.components import DefaultTopicId, FunctionCall, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.components.code_executor import LocalCommandLineCodeExecutor from agnext.components.models import ( @@ -90,8 +90,7 @@ class ToolExecutorAgent(RoutedAgent): session_id=message.session_id, result=FunctionExecutionResult(content=result_as_str, call_id=message.function_call.id), ) - assert ctx.topic_id is not None - await self.publish_message(task_result, topic_id=ctx.topic_id) + await self.publish_message(task_result, topic_id=DefaultTopicId()) class ToolUseAgent(RoutedAgent): @@ -129,8 +128,7 @@ class ToolUseAgent(RoutedAgent): if isinstance(response.content, str): # If the response is a string, just publish the response. response_message = AgentResponse(content=response.content) - assert ctx.topic_id is not None - await self.publish_message(response_message, topic_id=ctx.topic_id) + await self.publish_message(response_message, topic_id=DefaultTopicId()) print(f"AI Response: {response.content}") return @@ -143,8 +141,7 @@ class ToolUseAgent(RoutedAgent): for function_call in response.content: task = ToolExecutionTask(session_id=session_id, function_call=function_call) self._tool_counter[session_id] += 1 - assert ctx.topic_id is not None - await self.publish_message(task, topic_id=ctx.topic_id) + await self.publish_message(task, topic_id=DefaultTopicId()) @message_handler async def handle_tool_result(self, message: ToolExecutionTaskResult, ctx: MessageContext) -> None: @@ -170,11 +167,10 @@ class ToolUseAgent(RoutedAgent): self._sessions[message.session_id].append( AssistantMessage(content=response.content, source=self.metadata["type"]) ) - assert ctx.topic_id is not None # If the response is a string, just publish the response. if isinstance(response.content, str): response_message = AgentResponse(content=response.content) - await self.publish_message(response_message, topic_id=ctx.topic_id) + await self.publish_message(response_message, topic_id=DefaultTopicId()) self._tool_results.pop(message.session_id) self._tool_counter.pop(message.session_id) print(f"AI Response: {response.content}") @@ -185,7 +181,7 @@ class ToolUseAgent(RoutedAgent): for function_call in response.content: task = ToolExecutionTask(session_id=message.session_id, function_call=function_call) self._tool_counter[message.session_id] += 1 - await self.publish_message(task, topic_id=ctx.topic_id) + await self.publish_message(task, topic_id=DefaultTopicId()) async def main() -> None: diff --git a/python/samples/worker/run_worker_pub_sub.py b/python/samples/worker/run_worker_pub_sub.py index a2c8f7ae3..6c8d485c6 100644 --- a/python/samples/worker/run_worker_pub_sub.py +++ b/python/samples/worker/run_worker_pub_sub.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from typing import Any, NoReturn from agnext.application import WorkerAgentRuntime -from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, message_handler from agnext.components._type_subscription import TypeSubscription from agnext.core import MESSAGE_TYPE_REGISTRY, MessageContext, TopicId @@ -40,13 +40,11 @@ class ReceiveAgent(RoutedAgent): @message_handler async def on_greet(self, message: Greeting, ctx: MessageContext) -> None: - assert ctx.topic_id is not None - await self.publish_message(ReturnedGreeting(f"Returned greeting: {message.content}"), topic_id=ctx.topic_id) + await self.publish_message(ReturnedGreeting(f"Returned greeting: {message.content}"), topic_id=DefaultTopicId()) @message_handler async def on_feedback(self, message: Feedback, ctx: MessageContext) -> None: - assert ctx.topic_id is not None - await self.publish_message(ReturnedFeedback(f"Returned feedback: {message.content}"), topic_id=ctx.topic_id) + await self.publish_message(ReturnedFeedback(f"Returned feedback: {message.content}"), topic_id=DefaultTopicId()) async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> NoReturn: # type: ignore print(f"Unhandled message: {message}") @@ -58,13 +56,11 @@ class GreeterAgent(RoutedAgent): @message_handler async def on_ask(self, message: AskToGreet, ctx: MessageContext) -> None: - assert ctx.topic_id is not None - await self.publish_message(Greeting(f"Hello, {message.content}!"), topic_id=ctx.topic_id) + await self.publish_message(Greeting(f"Hello, {message.content}!"), topic_id=DefaultTopicId()) @message_handler async def on_returned_greet(self, message: ReturnedGreeting, ctx: MessageContext) -> None: - assert ctx.topic_id is not None - await self.publish_message(Feedback(f"Feedback: {message.content}"), topic_id=ctx.topic_id) + await self.publish_message(Feedback(f"Feedback: {message.content}"), topic_id=DefaultTopicId()) async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> NoReturn: # type: ignore print(f"Unhandled message: {message}") diff --git a/python/samples/worker/run_worker_rpc.py b/python/samples/worker/run_worker_rpc.py index 8980c1ea9..39f1372b2 100644 --- a/python/samples/worker/run_worker_rpc.py +++ b/python/samples/worker/run_worker_rpc.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from typing import Any, NoReturn from agnext.application import WorkerAgentRuntime -from agnext.components import RoutedAgent, TypeSubscription, message_handler +from agnext.components import DefaultTopicId, RoutedAgent, TypeSubscription, message_handler from agnext.core import MESSAGE_TYPE_REGISTRY, AgentId, AgentInstantiationContext, MessageContext, TopicId @@ -47,8 +47,7 @@ class GreeterAgent(RoutedAgent): @message_handler async def on_ask(self, message: AskToGreet, ctx: MessageContext) -> None: response = await self.send_message(Greeting(f"Hello, {message.content}!"), recipient=self._receive_agent_id) - assert ctx.topic_id is not None - await self.publish_message(Feedback(f"Feedback: {response.content}"), topic_id=ctx.topic_id) + await self.publish_message(Feedback(f"Feedback: {response.content}"), topic_id=DefaultTopicId()) async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> NoReturn: # type: ignore print(f"Unhandled message: {message}") diff --git a/python/src/agnext/components/__init__.py b/python/src/agnext/components/__init__.py index 17fe80ac2..4d61b3180 100644 --- a/python/src/agnext/components/__init__.py +++ b/python/src/agnext/components/__init__.py @@ -6,7 +6,7 @@ from ._closure_agent import ClosureAgent from ._default_subscription import DefaultSubscription from ._default_topic import DefaultTopicId from ._image import Image -from ._routed_agent import RoutedAgent, message_handler, TypeRoutedAgent +from ._routed_agent import RoutedAgent, TypeRoutedAgent, message_handler from ._type_subscription import TypeSubscription from ._types import FunctionCall diff --git a/python/tests/test_utils/__init__.py b/python/tests/test_utils/__init__.py index f77793247..acec46e8c 100644 --- a/python/tests/test_utils/__init__.py +++ b/python/tests/test_utils/__init__.py @@ -2,6 +2,7 @@ from dataclasses import dataclass from typing import Any from agnext.components import RoutedAgent, message_handler +from agnext.components import DefaultTopicId from agnext.core import BaseAgent from agnext.core import MessageContext @@ -38,8 +39,7 @@ class CascadingAgent(RoutedAgent): self.num_calls += 1 if message.round == self.max_rounds: return - assert ctx.topic_id is not None - await self.publish_message(CascadingMessageType(round=message.round + 1), topic_id=ctx.topic_id) + await self.publish_message(CascadingMessageType(round=message.round + 1), topic_id=DefaultTopicId()) class NoopAgent(BaseAgent): def __init__(self) -> None: