Update termination handler docs to keep it up-to-date (#486)

* Update termination handler docs to keep it up-to-date

* fix type
This commit is contained in:
Eric Zhu
2024-09-12 03:59:56 -07:00
committed by GitHub
parent cb9ae44f23
commit 06554c5a45
2 changed files with 167 additions and 84 deletions

View File

@@ -0,0 +1,167 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Termination using Intervention Handler\n",
"\n",
"```{note}\n",
"This method is valid when using {py:class}`~autogen_core.application.SingleThreadedAgentRuntime`.\n",
"```\n",
"\n",
"There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any\n",
"\n",
"from autogen_core.application import SingleThreadedAgentRuntime\n",
"from autogen_core.components import RoutedAgent, message_handler, DefaultSubscription, DefaultTopicId\n",
"from autogen_core.base import AgentId, MessageContext\n",
"from autogen_core.base.intervention import DefaultInterventionHandler"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we define a dataclass that will be used to signal termination."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Termination:\n",
" reason: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We code our agent to publish a termination message when it decides it is time to terminate."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"class AnAgent(RoutedAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
" self.received = 0\n",
"\n",
" @message_handler\n",
" async def on_new_message(self, message: str, ctx: MessageContext) -> None:\n",
" self.received += 1\n",
" if self.received > 3:\n",
" await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"class TerminationHandler(DefaultInterventionHandler):\n",
" def __init__(self) -> None:\n",
" self._termination_value: Termination | None = None\n",
"\n",
" async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n",
" if isinstance(message, Termination):\n",
" self._termination_value = message\n",
" return message\n",
"\n",
" @property\n",
" def termination_value(self) -> Termination | None:\n",
" return self._termination_value\n",
"\n",
" @property\n",
" def has_terminated(self) -> bool:\n",
" return self._termination_value is not None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Termination(reason='Reached maximum number of messages')\n"
]
}
],
"source": [
"termination_handler = TerminationHandler()\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
"\n",
"await runtime.register(\"my_agent\", AnAgent, lambda: [DefaultSubscription()])\n",
"\n",
"runtime.start()\n",
"\n",
"# Publish more than 3 messages to trigger termination.\n",
"await runtime.publish_message(\"hello\", DefaultTopicId())\n",
"await runtime.publish_message(\"hello\", DefaultTopicId())\n",
"await runtime.publish_message(\"hello\", DefaultTopicId())\n",
"await runtime.publish_message(\"hello\", DefaultTopicId())\n",
"\n",
"# Wait for termination.\n",
"await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
"\n",
"print(termination_handler.termination_value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -1,84 +0,0 @@
# Termination using Intervention Handler
```{note}
This method is only really valid for single-tenant applications. If multiple parallel users are using the application via namespaces this approach will not work without modification.
```
There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an `InterventionHandler` to detect a termination message and then act on it.
```python
import asyncio
from dataclasses import dataclass
from typing import Any
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.components import RoutedAgent, message_handler
from autogen_core.base import AgentId, CancellationToken
from autogen_core.base.intervention import DefaultInterventionHandler
```
First, we define a dataclass that will be used to signal termination.
```python
@dataclass
class Termination:
reason: str
```
We code our agent to publish a termination message when it decides it is time to terminate.
```python
class AnAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("MyAgent")
self.received = 0
@message_handler
async def on_new_message(self, message: str, cancellation_token: CancellationToken) -> None:
self.received += 1
if self.received > 3:
self.publish_message(Termination(reason="Reached maximum number of messages"))
```
Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested.
```python
class TerminationHandler(DefaultInterventionHandler):
def __init__(self):
self.termination_value: Termination | None = None
async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
if isinstance(message, Termination):
self.termination_value = message
return message
@property
def termination_value(self) -> Termination | None:
return self.termination_value
@property
def has_terminated(self) -> bool:
return self.termination_value is not None
```
Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received.
```python
async def main() -> None:
termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(
intervention_handlers=[termination_handler]
)
# Add Agents and kick off task
runtime.start()
await runtime.stop_when(lambda: termination_handler.has_terminated)
print(termination_handler.termination_value)
if __name__ == "__main__":
asyncio.run(main())
```