diff --git a/python/docs/src/conf.py b/python/docs/src/conf.py index aa730556b..f0e1a7a63 100644 --- a/python/docs/src/conf.py +++ b/python/docs/src/conf.py @@ -18,7 +18,8 @@ extensions = [ "sphinx.ext.autosummary", "sphinx.ext.napoleon", "sphinxcontrib.apidoc", - "myst_parser" + "myst_parser", + "sphinx.ext.intersphinx" ] apidoc_module_dir = '../../src/agnext' @@ -54,4 +55,6 @@ html_theme_options = { autodoc_default_options = { "members": True, "undoc-members": True, -} \ No newline at end of file +} + +intersphinx_mapping = {'python': ('https://docs.python.org/3', None)} \ No newline at end of file diff --git a/python/examples/common/agents/_chat_completion_agent.py b/python/examples/common/agents/_chat_completion_agent.py index 8cb98a2d8..302c44aff 100644 --- a/python/examples/common/agents/_chat_completion_agent.py +++ b/python/examples/common/agents/_chat_completion_agent.py @@ -189,10 +189,12 @@ class ChatCompletionAgent(TypeRoutedAgent): and all(isinstance(x, FunctionCall) for x in response.content) ): # Send a function call message to itself. - response = await self.send_message( - message=FunctionCallMessage(content=response.content, source=self.metadata["name"]), - recipient=self.id, - cancellation_token=cancellation_token, + response = await ( + await self.send_message( + message=FunctionCallMessage(content=response.content, source=self.metadata["name"]), + recipient=self.id, + cancellation_token=cancellation_token, + ) ) # Make an assistant message from the response. hisorical_messages = await self._memory.get_messages() diff --git a/python/examples/common/agents/_image_generation_agent.py b/python/examples/common/agents/_image_generation_agent.py index 9c90003d9..f20d55c3e 100644 --- a/python/examples/common/agents/_image_generation_agent.py +++ b/python/examples/common/agents/_image_generation_agent.py @@ -57,7 +57,7 @@ class ImageGenerationAgent(TypeRoutedAgent): image is published as a MultiModalMessage.""" response = await self._generate_response(cancellation_token) - self.publish_message(response) + await self.publish_message(response) async def _generate_response(self, cancellation_token: CancellationToken) -> MultiModalMessage: messages = await self._memory.get_messages() diff --git a/python/examples/common/patterns/_orchestrator_chat.py b/python/examples/common/patterns/_orchestrator_chat.py index 84b974f7b..d59fd4819 100644 --- a/python/examples/common/patterns/_orchestrator_chat.py +++ b/python/examples/common/patterns/_orchestrator_chat.py @@ -51,7 +51,7 @@ class OrchestratorChat(TypeRoutedAgent): while total_turns < self._max_turns: # Reset all agents. for agent in [*self._specialists, self._orchestrator]: - await self.send_message(Reset(), agent) + await (await self.send_message(Reset(), agent)) # Create the task specs. task_specs = f""" @@ -73,7 +73,7 @@ Some additional points to consider: # Send the task specs to the orchestrator and specialists. for agent in [*self._specialists, self._orchestrator]: - await self.send_message(TextMessage(content=task_specs, source=self.metadata["name"]), agent) + await (await self.send_message(TextMessage(content=task_specs, source=self.metadata["name"]), agent)) # Inner loop. stalled_turns = 0 @@ -127,9 +127,11 @@ Some additional points to consider: # Update agents. 
for agent in [*self._specialists, self._orchestrator]: - _ = await self.send_message( - TextMessage(content=subtask, source=self.metadata["name"]), - agent, + _ = await ( + await self.send_message( + TextMessage(content=subtask, source=self.metadata["name"]), + agent, + ) ) # Find the speaker. @@ -139,17 +141,19 @@ Some additional points to consider: raise ValueError(f"Invalid next speaker: {data['next_speaker']['answer']}") from e # Ask speaker to speak. - speaker_response = await self.send_message(RespondNow(), speaker) + speaker_response = await (await self.send_message(RespondNow(), speaker)) assert speaker_response is not None # Update all other agents with the speaker's response. for agent in [agent for agent in self._specialists if agent != speaker] + [self._orchestrator]: - await self.send_message( - TextMessage( - content=speaker_response.content, - source=speaker_response.source, - ), - agent, + await ( + await self.send_message( + TextMessage( + content=speaker_response.content, + source=speaker_response.source, + ), + agent, + ) ) # Increment the total turns. @@ -162,7 +166,7 @@ Some additional points to consider: async def _prepare_task(self, task: str, sender: str) -> Tuple[str, str, str, str]: # Reset planner. - await self.send_message(Reset(), self._planner) + await (await self.send_message(Reset(), self._planner)) # A reusable description of the team. team = "\n".join( @@ -199,8 +203,8 @@ When answering this survey, keep in mind that "facts" will typically be specific """.strip() # Ask the planner to obtain prior knowledge about facts. - await self.send_message(TextMessage(content=closed_book_prompt, source=sender), self._planner) - facts_response = await self.send_message(RespondNow(), self._planner) + await (await self.send_message(TextMessage(content=closed_book_prompt, source=sender), self._planner)) + facts_response = await (await self.send_message(RespondNow(), self._planner)) facts = str(facts_response.content) @@ -213,7 +217,7 @@ Based on the team composition, and known and unknown facts, please devise a shor # Send second messag eto the planner. await self.send_message(TextMessage(content=plan_prompt, source=sender), self._planner) - plan_response = await self.send_message(RespondNow(), self._planner) + plan_response = await (await self.send_message(RespondNow(), self._planner)) plan = str(plan_response.content) return team, names, facts, plan @@ -265,11 +269,13 @@ Please output an answer in pure JSON format according to the following schema. T request = step_prompt while True: # Send a message to the orchestrator. - await self.send_message(TextMessage(content=request, source=sender), self._orchestrator) + await (await self.send_message(TextMessage(content=request, source=sender), self._orchestrator)) # Request a response. - step_response = await self.send_message( - RespondNow(response_format=ResponseFormat.json_object), - self._orchestrator, + step_response = await ( + await self.send_message( + RespondNow(response_format=ResponseFormat.json_object), + self._orchestrator, + ) ) # TODO: use typed dictionary. try: @@ -328,9 +334,9 @@ Please output an answer in pure JSON format according to the following schema. T {facts} """.strip() # Send a message to the orchestrator. - await self.send_message(TextMessage(content=new_facts_prompt, source=sender), self._orchestrator) + await (await self.send_message(TextMessage(content=new_facts_prompt, source=sender), self._orchestrator)) # Request a response. 
- new_facts_response = await self.send_message(RespondNow(), self._orchestrator) + new_facts_response = await (await self.send_message(RespondNow(), self._orchestrator)) return str(new_facts_response.content) async def _educated_guess(self, facts: str, sender: str) -> Any: @@ -353,14 +359,18 @@ Please output an answer in pure JSON format according to the following schema. T request = educated_guess_promt while True: # Send a message to the orchestrator. - await self.send_message( - TextMessage(content=request, source=sender), - self._orchestrator, + await ( + await self.send_message( + TextMessage(content=request, source=sender), + self._orchestrator, + ) ) # Request a response. - response = await self.send_message( - RespondNow(response_format=ResponseFormat.json_object), - self._orchestrator, + response = await ( + await self.send_message( + RespondNow(response_format=ResponseFormat.json_object), + self._orchestrator, + ) ) try: result = json.loads(str(response.content)) @@ -387,7 +397,7 @@ Team membership: {team} """.strip() # Send a message to the orchestrator. - await self.send_message(TextMessage(content=new_plan_prompt, source=sender), self._orchestrator) + await (await self.send_message(TextMessage(content=new_plan_prompt, source=sender), self._orchestrator)) # Request a response. - new_plan_response = await self.send_message(RespondNow(), self._orchestrator) + new_plan_response = await (await self.send_message(RespondNow(), self._orchestrator)) return str(new_plan_response.content) diff --git a/python/examples/core/inner_outer_direct.py b/python/examples/core/inner_outer_direct.py index c657aa3a3..ac67bc9a8 100644 --- a/python/examples/core/inner_outer_direct.py +++ b/python/examples/core/inner_outer_direct.py @@ -47,7 +47,7 @@ async def main() -> None: runtime = SingleThreadedAgentRuntime() inner = runtime.register_and_get("inner", Inner) outer = runtime.register_and_get("outer", lambda: Outer(inner)) - response = runtime.send_message(MessageType(body="Hello", sender="external"), outer) + response = await runtime.send_message(MessageType(body="Hello", sender="external"), outer) while not response.done(): await runtime.process_next() diff --git a/python/examples/core/one_agent_direct.py b/python/examples/core/one_agent_direct.py index 576f04eda..ae6dc2162 100644 --- a/python/examples/core/one_agent_direct.py +++ b/python/examples/core/one_agent_direct.py @@ -46,7 +46,7 @@ async def main() -> None: # Send a message to the agent. message = Message(content="Can you tell me something fun about SF?") - result = runtime.send_message(message, agent) + result = await runtime.send_message(message, agent) # Process messages until the agent responds. 
while result.done() is False: diff --git a/python/examples/core/two_agents_pub_sub_termination.py b/python/examples/core/two_agents_pub_sub_termination.py index 05a0e68f6..4b42fc903 100644 --- a/python/examples/core/two_agents_pub_sub_termination.py +++ b/python/examples/core/two_agents_pub_sub_termination.py @@ -64,7 +64,7 @@ class ChatCompletionAgent(TypeRoutedAgent): async def handle_message(self, message: Message, cancellation_token: CancellationToken) -> None: self._memory.append(message) if self._termination_word in message.content: - self.publish_message(Termination()) + await self.publish_message(Termination()) return llm_messages: List[LLMMessage] = [] for m in self._memory[-10:]: @@ -74,7 +74,7 @@ class ChatCompletionAgent(TypeRoutedAgent): llm_messages.append(UserMessage(content=m.content, source=m.source)) response = await self._model_client.create(self._system_messages + llm_messages) assert isinstance(response.content, str) - self.publish_message(Message(content=response.content, source=self.metadata["name"])) + await self.publish_message(Message(content=response.content, source=self.metadata["name"])) class TerminationHandler(DefaultInterventionHandler): @@ -126,7 +126,7 @@ async def main() -> None: # Send a message to Jack to start the conversation. message = Message(content="Can you tell me something fun about SF?", source="User") - runtime.send_message(message, jack) + await runtime.send_message(message, jack) # Process messages until termination. while not termination_handler.terminated: diff --git a/python/examples/demos/assistant.py b/python/examples/demos/assistant.py index f01b89d44..a753c0637 100644 --- a/python/examples/demos/assistant.py +++ b/python/examples/demos/assistant.py @@ -228,7 +228,7 @@ Type "exit" to exit the chat. user = assistant_chat(runtime) print(usage) # Request the user to start the conversation. - runtime.send_message(PublishNow(), user) + await runtime.send_message(PublishNow(), user) while True: # TODO: have a way to exit the loop. await runtime.process_next() diff --git a/python/examples/demos/chess_game.py b/python/examples/demos/chess_game.py index 4807fc1ac..d529131da 100644 --- a/python/examples/demos/chess_game.py +++ b/python/examples/demos/chess_game.py @@ -205,7 +205,7 @@ async def main() -> None: runtime = SingleThreadedAgentRuntime() chess_game(runtime) # Publish an initial message to trigger the group chat manager to start orchestration. - runtime.publish_message(TextMessage(content="Game started.", source="System"), namespace="default") + await runtime.publish_message(TextMessage(content="Game started.", source="System"), namespace="default") while True: await runtime.process_next() await asyncio.sleep(1) diff --git a/python/examples/patterns/coder_executor_pub_sub.py b/python/examples/patterns/coder_executor_pub_sub.py index 27a4b4960..7d97f56b3 100644 --- a/python/examples/patterns/coder_executor_pub_sub.py +++ b/python/examples/patterns/coder_executor_pub_sub.py @@ -100,7 +100,7 @@ Reply "TERMINATE" in the end when everything is done.""" ) # Publish the code execution task. - self.publish_message( + await self.publish_message( CodeExecutionTask(content=response.content, session_id=session_id), cancellation_token=cancellation_token ) @@ -120,11 +120,11 @@ Reply "TERMINATE" in the end when everything is done.""" if "TERMINATE" in response.content: # If the task is completed, publish a message with the completion content. 
- self.publish_message(TaskCompletion(content=response.content), cancellation_token=cancellation_token) + await self.publish_message(TaskCompletion(content=response.content), cancellation_token=cancellation_token) return # Publish the code execution task. - self.publish_message( + await self.publish_message( CodeExecutionTask(content=response.content, session_id=message.session_id), cancellation_token=cancellation_token, ) @@ -143,7 +143,7 @@ class Executor(TypeRoutedAgent): code_blocks = self._extract_code_blocks(message.content) if not code_blocks: # If no code block is found, publish a message with an error. - self.publish_message( + await self.publish_message( CodeExecutionTaskResult( output="Error: no Markdown code block found.", exit_code=1, session_id=message.session_id ), @@ -155,7 +155,7 @@ class Executor(TypeRoutedAgent): cancellation_token.link_future(future) result = await future # Publish the code execution result. - self.publish_message( + await self.publish_message( CodeExecutionTaskResult(output=result.output, exit_code=result.exit_code, session_id=message.session_id), cancellation_token=cancellation_token, ) @@ -202,7 +202,7 @@ async def main(task: str, temp_dir: str) -> None: runtime.register("executor", lambda: Executor(executor=LocalCommandLineCodeExecutor(work_dir=temp_dir))) # Publish the task message. - runtime.publish_message(TaskMessage(content=task), namespace="default") + await runtime.publish_message(TaskMessage(content=task), namespace="default") # Run the runtime until the termination condition is met. while not termination_handler.terminated: diff --git a/python/examples/patterns/coder_reviewer_direct.py b/python/examples/patterns/coder_reviewer_direct.py index d10b61f61..a57439341 100644 --- a/python/examples/patterns/coder_reviewer_direct.py +++ b/python/examples/patterns/coder_reviewer_direct.py @@ -184,7 +184,7 @@ Code: # Send the code review task to the reviewer. result = await self.send_message(code_review_task, self._reviewer) # Store the review result in the session memory. - memory.append(result) + memory.append(await result) # Obtain the request from previous messages. review_request = next(m for m in reversed(memory) if isinstance(m, CodeReviewTask)) assert review_request is not None @@ -222,7 +222,7 @@ async def main() -> None: reviewer=reviewer, ), ) - result = runtime.send_message( + result = await runtime.send_message( message=CodeWritingTask( task="Write a function to find the directory with the largest number of files using multi-processing." ), diff --git a/python/examples/patterns/coder_reviewer_pub_sub.py b/python/examples/patterns/coder_reviewer_pub_sub.py index d6f18ce7b..09117d3b4 100644 --- a/python/examples/patterns/coder_reviewer_pub_sub.py +++ b/python/examples/patterns/coder_reviewer_pub_sub.py @@ -281,7 +281,7 @@ async def main() -> None: model_client=OpenAIChatCompletionClient(model="gpt-3.5-turbo"), ), ) - runtime.publish_message( + await runtime.publish_message( message=CodeWritingTask( task="Write a function to find the directory with the largest number of files using multi-processing." 
), diff --git a/python/examples/patterns/group_chat_pub_sub.py b/python/examples/patterns/group_chat_pub_sub.py index 74da99ad0..f86dcf901 100644 --- a/python/examples/patterns/group_chat_pub_sub.py +++ b/python/examples/patterns/group_chat_pub_sub.py @@ -65,10 +65,10 @@ class RoundRobinGroupChatManager(TypeRoutedAgent): self._round_count += 1 if self._round_count == self._num_rounds * len(self._participants): # End the conversation after the specified number of rounds. - self.publish_message(Termination()) + await self.publish_message(Termination()) return # Send a request to speak message to the selected speaker. - self.send_message(RequestToSpeak(), speaker) + await self.send_message(RequestToSpeak(), speaker) class GroupChatParticipant(TypeRoutedAgent): @@ -102,7 +102,7 @@ class GroupChatParticipant(TypeRoutedAgent): assert isinstance(response.content, str) speach = Message(content=response.content, source=self.metadata["name"]) self._memory.append(speach) - self.publish_message(speach) + await self.publish_message(speach) class TerminationHandler(DefaultInterventionHandler): @@ -165,7 +165,7 @@ async def main() -> None: ) # Start the conversation. - runtime.publish_message(Message(content="Hello, everyone!", source="Moderator"), namespace="default") + await runtime.publish_message(Message(content="Hello, everyone!", source="Moderator"), namespace="default") # Run the runtime until termination. while not termination_handler.terminated: diff --git a/python/examples/patterns/mixture_of_agents_direct.py b/python/examples/patterns/mixture_of_agents_direct.py index bc8c8e3f5..f54f0fe5c 100644 --- a/python/examples/patterns/mixture_of_agents_direct.py +++ b/python/examples/patterns/mixture_of_agents_direct.py @@ -83,7 +83,7 @@ class AggregatorAgent(TypeRoutedAgent): and aggregates the results.""" ref_task = ReferenceAgentTask(task=message.task) results: List[ReferenceAgentTaskResult] = await asyncio.gather( - *[self.send_message(ref_task, ref) for ref in self._references] + *[await self.send_message(ref_task, ref) for ref in self._references] ) combined_result = "\n\n".join([r.result for r in results]) response = await self._model_client.create( @@ -132,7 +132,7 @@ async def main() -> None: references=[ref1, ref2, ref3], ), ) - result = runtime.send_message(AggregatorTask(task="What are something fun to do in SF?"), agg) + result = await runtime.send_message(AggregatorTask(task="What are something fun to do in SF?"), agg) while result.done() is False: await runtime.process_next() print(result.result()) diff --git a/python/examples/patterns/multi_agent_debate_pub_sub.py b/python/examples/patterns/multi_agent_debate_pub_sub.py index 38eb08174..2d9faddec 100644 --- a/python/examples/patterns/multi_agent_debate_pub_sub.py +++ b/python/examples/patterns/multi_agent_debate_pub_sub.py @@ -128,7 +128,11 @@ class MathSolver(TypeRoutedAgent): "Your final answer should be a single numerical number, " "in the form of {{answer}}, at the end of your response." # Send the question to the agent itself. - self.send_message(SolverRequest(content=prompt, session_id=message.session_id, question=question), self.id) + await ( + await self.send_message( + SolverRequest(content=prompt, session_id=message.session_id, question=question), self.id + ) + ) # Clear the buffer. self._buffer.clear() @@ -192,7 +196,7 @@ class MathAggregator(TypeRoutedAgent): answers = [resp.answer for resp in self._responses[message.session_id]] majority_answer = max(set(answers), key=answers.count) # Publish the aggregated response. 
- self.publish_message(Answer(content=majority_answer)) + await self.publish_message(Answer(content=majority_answer)) # Clear the responses. self._responses.pop(message.session_id) diff --git a/python/examples/patterns/orchestrator.py b/python/examples/patterns/orchestrator.py index a0b5e4fee..c171da867 100644 --- a/python/examples/patterns/orchestrator.py +++ b/python/examples/patterns/orchestrator.py @@ -130,7 +130,7 @@ def software_development(runtime: AgentRuntime) -> OrchestratorChat: # type: ig async def run(message: str, user: str, scenario: Callable[[AgentRuntime], OrchestratorChat]) -> None: # type: ignore runtime = SingleThreadedAgentRuntime() chat = scenario(runtime) - response = runtime.send_message(TextMessage(content=message, source=user), chat.id) + response = await runtime.send_message(TextMessage(content=message, source=user), chat.id) while not response.done(): await runtime.process_next() print((await response).content) # type: ignore diff --git a/python/examples/tool-use/coding_one_agent_direct.py b/python/examples/tool-use/coding_one_agent_direct.py index f2739d3bb..75e775b7b 100644 --- a/python/examples/tool-use/coding_one_agent_direct.py +++ b/python/examples/tool-use/coding_one_agent_direct.py @@ -78,8 +78,8 @@ class ToolEnabledAgent(TypeRoutedAgent): # Keep executing the tools until the response is not a list of function calls. while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content): - results = await asyncio.gather( - *[self.send_message(ToolExecutionTask(function_call=call), self.id) for call in response.content] + results: List[ToolExecutionTaskResult] = await asyncio.gather( + *[await self.send_message(ToolExecutionTask(function_call=call), self.id) for call in response.content] ) # Combine the results into a single response. result = FunctionExecutionResultMessage(content=[result.result for result in results]) @@ -136,7 +136,9 @@ async def main() -> None: ) # Send a task to the tool user. - result = runtime.send_message(UserRequest("Run the following Python code: print('Hello, World!')"), tool_agent) + result = await runtime.send_message( + UserRequest("Run the following Python code: print('Hello, World!')"), tool_agent + ) # Run the runtime until the task is completed. while not result.done(): diff --git a/python/examples/tool-use/coding_two_agent_pub_sub.py b/python/examples/tool-use/coding_two_agent_pub_sub.py index c4e2138a4..0f3a663c1 100644 --- a/python/examples/tool-use/coding_two_agent_pub_sub.py +++ b/python/examples/tool-use/coding_two_agent_pub_sub.py @@ -218,7 +218,9 @@ async def main() -> None: ) # Publish a task. - runtime.publish_message(UserRequest("Run the following Python code: print('Hello, World!')"), namespace="default") + await runtime.publish_message( + UserRequest("Run the following Python code: print('Hello, World!')"), namespace="default" + ) # Run the runtime until termination. while not termination_handler.terminated: diff --git a/python/examples/tool-use/custom_function_tool_one_agent_direct.py b/python/examples/tool-use/custom_function_tool_one_agent_direct.py index 6c6238397..0b6acf983 100644 --- a/python/examples/tool-use/custom_function_tool_one_agent_direct.py +++ b/python/examples/tool-use/custom_function_tool_one_agent_direct.py @@ -49,7 +49,7 @@ async def main() -> None: ) # Send a task to the tool user. 
- result = runtime.send_message(UserRequest("What is the stock price of NVDA on 2024/06/01"), tool_agent) + result = await runtime.send_message(UserRequest("What is the stock price of NVDA on 2024/06/01"), tool_agent) # Run the runtime until the task is completed. while not result.done(): diff --git a/python/pyproject.toml b/python/pyproject.toml index 172d32bf6..4c94d9a2e 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -18,8 +18,7 @@ dependencies = [ "pillow", "aiohttp", "typing-extensions", - "pydantic>=1.10,<3", - "types-aiofiles" + "pydantic>=1.10,<3" ] [tool.hatch.envs.default] diff --git a/python/src/agnext/application/_single_threaded_agent_runtime.py b/python/src/agnext/application/_single_threaded_agent_runtime.py index 564807162..58140d8a9 100644 --- a/python/src/agnext/application/_single_threaded_agent_runtime.py +++ b/python/src/agnext/application/_single_threaded_agent_runtime.py @@ -106,7 +106,7 @@ class SingleThreadedAgentRuntime(AgentRuntime): return set(self._agent_factories.keys()) # Returns the response of the message - def send_message( + async def send_message( self, message: Any, recipient: AgentId, @@ -150,14 +150,14 @@ class SingleThreadedAgentRuntime(AgentRuntime): return future - def publish_message( + async def publish_message( self, message: Any, *, namespace: str | None = None, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, - ) -> Future[None]: + ) -> None: if cancellation_token is None: cancellation_token = CancellationToken() @@ -196,10 +196,6 @@ class SingleThreadedAgentRuntime(AgentRuntime): ) ) - future = asyncio.get_event_loop().create_future() - future.set_result(None) - return future - def save_state(self) -> Mapping[str, Any]: state: Dict[str, Dict[str, Any]] = {} for agent_id in self._instantiated_agents: diff --git a/python/src/agnext/core/_agent.py b/python/src/agnext/core/_agent.py index e4068f95b..4bb07ed3c 100644 --- a/python/src/agnext/core/_agent.py +++ b/python/src/agnext/core/_agent.py @@ -27,8 +27,9 @@ class Agent(Protocol): Returns: Any: Response to the message. Can be None. - Notes: - If there was a cancellation, this function should raise a `CancelledError`. + Raises: + asyncio.CancelledError: If the message was cancelled. + CantHandleException: If the agent cannot handle the message. """ ... 
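Annotation on the runtime change above, with a sketch that is not part of the patch: `SingleThreadedAgentRuntime.send_message` is now a coroutine that still returns an `asyncio.Future`, so callers await once to enqueue the message and a second time to read the response, while `publish_message` is now a coroutine returning `None`. The `EchoAgent` class, its constructor arguments, and the import paths below are assumptions inferred from the examples in this diff, not guaranteed by the patch:

    import asyncio
    from dataclasses import dataclass

    # Import paths below are assumptions based on the package layout in this diff.
    from agnext.application import SingleThreadedAgentRuntime
    from agnext.components import TypeRoutedAgent, message_handler
    from agnext.core import CancellationToken


    @dataclass
    class Ping:
        content: str


    class EchoAgent(TypeRoutedAgent):
        """Hypothetical agent used only to illustrate the new awaiting pattern."""

        def __init__(self) -> None:
            super().__init__("An agent that echoes messages.")  # constructor signature assumed

        @message_handler
        async def on_ping(self, message: Ping, cancellation_token: CancellationToken) -> Ping:
            return Ping(content=message.content)


    async def main() -> None:
        runtime = SingleThreadedAgentRuntime()
        echo = runtime.register_and_get("echo", EchoAgent)

        # send_message is now a coroutine: the first await enqueues the message and
        # returns a Future; awaiting that Future (below) yields the response.
        response = await runtime.send_message(Ping(content="Hello"), echo)

        # Drive the runtime until the response future resolves.
        while not response.done():
            await runtime.process_next()
        print((await response).content)

        # publish_message is now a coroutine returning None: a single await enqueues
        # it, and no response is expected.
        await runtime.publish_message(Ping(content="Hello, everyone!"), namespace="default")
        await runtime.process_next()


    if __name__ == "__main__":
        asyncio.run(main())

This separation of enqueueing from response resolution is why the example and pattern files in this diff wrap calls as `await (await self.send_message(...))` when they need the response immediately.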
diff --git a/python/src/agnext/core/_agent_proxy.py b/python/src/agnext/core/_agent_proxy.py index f53890bfc..27b912c51 100644 --- a/python/src/agnext/core/_agent_proxy.py +++ b/python/src/agnext/core/_agent_proxy.py @@ -26,14 +26,14 @@ class AgentProxy: """Metadata of the agent.""" return self._runtime.agent_metadata(self._agent) - def send_message( + async def send_message( self, message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None, ) -> Future[Any]: - return self._runtime.send_message( + return await self._runtime.send_message( message, recipient=self._agent, sender=sender, diff --git a/python/src/agnext/core/_agent_runtime.py b/python/src/agnext/core/_agent_runtime.py index 5b423086c..c1e4a585a 100644 --- a/python/src/agnext/core/_agent_runtime.py +++ b/python/src/agnext/core/_agent_runtime.py @@ -20,24 +20,64 @@ agent_instantiation_context: ContextVar[tuple[AgentRuntime, AgentId]] = ContextV @runtime_checkable class AgentRuntime(Protocol): # Returns the response of the message - def send_message( + # Can raise CantHandleException + async def send_message( self, message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, - ) -> Future[Any]: ... + ) -> Future[Any]: + """Send a message to an agent and return a future that will resolve to the response. + + The act of sending a message may be asynchronous, and the response to the message itself is also asynchronous. For example: + + .. code-block:: python + + response_future = await runtime.send_message(MyMessage("Hello"), recipient=agent_id) + response = await response_future + + The returned future only needs to be awaited if the response is needed. If the response is not needed, the future can be ignored. + + Args: + message (Any): The message to send. + recipient (AgentId): The agent to send the message to. + sender (AgentId | None, optional): Agent which sent the message. Should **only** be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None. + cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None. + + Raises: + CantHandleException: If the recipient cannot handle the message. + UndeliverableException: If the message cannot be delivered. + + Returns: + Future[Any]: A future that will resolve to the response of the message. + """ + + ... # No responses from publishing - def publish_message( + async def publish_message( self, message: Any, *, namespace: str | None = None, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, - ) -> Future[None]: ... + ) -> None: + """Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender. + + No responses are expected from publishing. + + Args: + message (Any): The message to publish. + namespace (str | None, optional): The namespace to publish to. Defaults to None. + sender (AgentId | None, optional): The agent which sent the message. Defaults to None. + cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None. + + Raises: + UndeliverableException: If the message cannot be delivered. + """ @overload def register( @@ -62,7 +102,7 @@ class AgentRuntime(Protocol): Args: name (str): The name of the type agent this factory creates. - agent_factory (Callable[[], T] | Callable[[AgentRuntime, AgentId], T]): The factory that creates the agent. 
+ agent_factory (Callable[[], T] | Callable[[AgentRuntime, AgentId], T]): The factory that creates the agent, where T is a concrete Agent type. Example: @@ -82,8 +122,29 @@ class AgentRuntime(Protocol): ... - def get(self, name: str, *, namespace: str = "default") -> AgentId: ... - def get_proxy(self, name: str, *, namespace: str = "default") -> AgentProxy: ... + def get(self, name: str, *, namespace: str = "default") -> AgentId: + """Get an agent by name and namespace. + + Args: + name (str): The name of the agent. + namespace (str, optional): The namespace of the agent. Defaults to "default". + + Returns: + AgentId: The agent id. + """ + ... + + def get_proxy(self, name: str, *, namespace: str = "default") -> AgentProxy: + """Get a proxy for an agent by name and namespace. + + Args: + name (str): The name of the agent. + namespace (str, optional): The namespace of the agent. Defaults to "default". + + Returns: + AgentProxy: The agent proxy. + """ + ... @overload def register_and_get( @@ -110,6 +171,16 @@ class AgentRuntime(Protocol): *, namespace: str = "default", ) -> AgentId: + """Register an agent factory with the runtime associated with a specific name and get the agent id. The name must be unique. + + Args: + name (str): The name of the type agent this factory creates. + agent_factory (Callable[[], T] | Callable[[AgentRuntime, AgentId], T]): The factory that creates the agent, where T is a concrete Agent type. + namespace (str, optional): The namespace of the agent. Defaults to "default". + + Returns: + AgentId: The agent id. + """ self.register(name, agent_factory) return self.get(name, namespace=namespace) @@ -138,15 +209,66 @@ class AgentRuntime(Protocol): *, namespace: str = "default", ) -> AgentProxy: + """Register an agent factory with the runtime associated with a specific name and get the agent proxy. The name must be unique. + + Args: + name (str): The name of the type agent this factory creates. + agent_factory (Callable[[], T] | Callable[[AgentRuntime, AgentId], T]): The factory that creates the agent, where T is a concrete Agent type. + namespace (str, optional): The namespace of the agent. Defaults to "default". + + Returns: + AgentProxy: The agent proxy. + """ self.register(name, agent_factory) return self.get_proxy(name, namespace=namespace) - def save_state(self) -> Mapping[str, Any]: ... + def save_state(self) -> Mapping[str, Any]: + """Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to :meth:`load_state`. - def load_state(self, state: Mapping[str, Any]) -> None: ... + The structure of the state is implementation defined and can be any JSON serializable object. - def agent_metadata(self, agent: AgentId) -> AgentMetadata: ... + Returns: + Mapping[str, Any]: The saved state. + """ + ... - def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]: ... + def load_state(self, state: Mapping[str, Any]) -> None: + """Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by :meth:`save_state`. - def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None: ... + Args: + state (Mapping[str, Any]): The saved state. + """ + ... + + def agent_metadata(self, agent: AgentId) -> AgentMetadata: + """Get the metadata for an agent. + + Args: + agent (AgentId): The agent id. + + Returns: + AgentMetadata: The agent metadata. + """ + ... 
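Companion sketch for the state-management docstrings added here, reusing the hypothetical `runtime`, `echo`, and `EchoAgent` from the earlier sketch; whether a fresh runtime must re-register the same factories before `load_state` is an assumption, not something this patch specifies:

    # Checkpoint the whole runtime (all hosted agents) and restore it later.
    state = runtime.save_state()

    restored = SingleThreadedAgentRuntime()
    restored.register("echo", EchoAgent)  # assumed: factories re-registered before loading
    restored.load_state(state)

    # The per-agent variants operate on a single AgentId.
    echo_state = runtime.agent_save_state(echo)
    runtime.agent_load_state(echo, echo_state)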
+ + def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]: + """Save the state of a single agent. + + The structure of the state is implementation defined and can be any JSON serializable object. + + Args: + agent (AgentId): The agent id. + + Returns: + Mapping[str, Any]: The saved state. + """ + ... + + def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None: + """Load the state of a single agent. + + Args: + agent (AgentId): The agent id. + state (Mapping[str, Any]): The saved state. + """ + ... diff --git a/python/src/agnext/core/_base_agent.py b/python/src/agnext/core/_base_agent.py index 041328f62..909b68aad 100644 --- a/python/src/agnext/core/_base_agent.py +++ b/python/src/agnext/core/_base_agent.py @@ -49,18 +49,18 @@ class BaseAgent(ABC, Agent): @abstractmethod async def on_message(self, message: Any, cancellation_token: CancellationToken) -> Any: ... - # Returns the response of the message - def send_message( + async def send_message( self, message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, ) -> Future[Any]: + """See :py:meth:`agnext.core.AgentRuntime.send_message` for more information.""" if cancellation_token is None: cancellation_token = CancellationToken() - future = self._runtime.send_message( + future = await self._runtime.send_message( message, sender=self.id, recipient=recipient, @@ -69,17 +69,13 @@ class BaseAgent(ABC, Agent): cancellation_token.link_future(future) return future - def publish_message( + async def publish_message( self, message: Any, *, cancellation_token: CancellationToken | None = None, - ) -> Future[None]: - if cancellation_token is None: - cancellation_token = CancellationToken() - - future = self._runtime.publish_message(message, sender=self.id, cancellation_token=cancellation_token) - return future + ) -> None: + await self._runtime.publish_message(message, sender=self.id, cancellation_token=cancellation_token) def save_state(self) -> Mapping[str, Any]: warnings.warn("save_state not implemented", stacklevel=2) diff --git a/python/teams/team-one/examples/example_coder.py b/python/teams/team-one/examples/example_coder.py index 1d0ea6f2b..3d304ecce 100644 --- a/python/teams/team-one/examples/example_coder.py +++ b/python/teams/team-one/examples/example_coder.py @@ -18,7 +18,7 @@ async def main() -> None: task = TaskMessage(input(f"Enter a task for {coder.name}: ")) # Send a task to the tool user. - result = runtime.send_message(task, coder) + result = await runtime.send_message(task, coder) # Run the runtime until the task is completed. while not result.done(): diff --git a/python/teams/team-one/examples/example_file_surfer.py b/python/teams/team-one/examples/example_file_surfer.py index d6aacb190..2996d40f4 100644 --- a/python/teams/team-one/examples/example_file_surfer.py +++ b/python/teams/team-one/examples/example_file_surfer.py @@ -20,7 +20,7 @@ async def main() -> None: task = TaskMessage(input(f"Enter a task for {file_surfer.name}: ")) # Send a task to the tool user. - result = runtime.send_message(task, file_surfer) + result = await runtime.send_message(task, file_surfer) # Run the runtime until the task is completed. 
while not result.done(): diff --git a/python/teams/team-one/pyproject.toml b/python/teams/team-one/pyproject.toml index 78b192ca9..f3fb0117b 100644 --- a/python/teams/team-one/pyproject.toml +++ b/python/teams/team-one/pyproject.toml @@ -25,7 +25,9 @@ dependencies = [ "pyright==1.1.368", "mypy==1.10.0", "ruff==0.4.8", - "pytest" + "pytest", + "aiofiles", + "types-aiofiles" ] [tool.hatch.envs.default.extra-scripts] diff --git a/python/teams/team-one/src/team_one/agents/coder.py b/python/teams/team-one/src/team_one/agents/coder.py index 0a3ad612b..8c3b60e62 100644 --- a/python/teams/team-one/src/team_one/agents/coder.py +++ b/python/teams/team-one/src/team_one/agents/coder.py @@ -75,8 +75,9 @@ class Coder(TypeRoutedAgent): # Keep executing the tools until the response is not a list of function calls. while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content): + # TODO: gather internally too results = await asyncio.gather( - *[self.send_message(ToolMessage(function_call=call), self.id) for call in response.content] + *[await self.send_message(ToolMessage(function_call=call), self.id) for call in response.content] ) # Combine the results into a single response. result = FunctionExecutionResultMessage(content=[result.result for result in results]) diff --git a/python/teams/team-one/src/team_one/agents/file_surfer.py b/python/teams/team-one/src/team_one/agents/file_surfer.py index 25120d658..6bb801f51 100644 --- a/python/teams/team-one/src/team_one/agents/file_surfer.py +++ b/python/teams/team-one/src/team_one/agents/file_surfer.py @@ -103,8 +103,9 @@ class FileSurfer(TypeRoutedAgent): # Keep executing the tools until the response is not a list of function calls. while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content): + # TODO: gather internally too results = await asyncio.gather( - *[self.send_message(ToolMessage(function_call=call), self.id) for call in response.content] + *[await self.send_message(ToolMessage(function_call=call), self.id) for call in response.content] ) # Combine the results into a single response. 
result = FunctionExecutionResultMessage(content=[result.result for result in results]) diff --git a/python/tests/test_cancellation.py b/python/tests/test_cancellation.py index b17f35d04..00aad064d 100644 --- a/python/tests/test_cancellation.py +++ b/python/tests/test_cancellation.py @@ -43,7 +43,7 @@ class NestingLongRunningAgent(TypeRoutedAgent): @message_handler async def on_new_message(self, message: MessageType, cancellation_token: CancellationToken) -> MessageType: self.called = True - response = self.send_message(message, self._nested_agent, cancellation_token=cancellation_token) + response = await self.send_message(message, self._nested_agent, cancellation_token=cancellation_token) try: val = await response assert isinstance(val, MessageType) @@ -59,7 +59,7 @@ async def test_cancellation_with_token() -> None: long_running = runtime.register_and_get("long_running", LongRunningAgent) token = CancellationToken() - response = runtime.send_message(MessageType(), recipient=long_running, cancellation_token=token) + response = await runtime.send_message(MessageType(), recipient=long_running, cancellation_token=token) assert not response.done() await runtime.process_next() @@ -83,7 +83,7 @@ async def test_nested_cancellation_only_outer_called() -> None: nested = runtime.register_and_get("nested", lambda: NestingLongRunningAgent(long_running)) token = CancellationToken() - response = runtime.send_message(MessageType(), nested, cancellation_token=token) + response = await runtime.send_message(MessageType(), nested, cancellation_token=token) assert not response.done() await runtime.process_next() @@ -108,7 +108,7 @@ async def test_nested_cancellation_inner_called() -> None: nested = runtime.register_and_get("nested", lambda: NestingLongRunningAgent(long_running)) token = CancellationToken() - response = runtime.send_message(MessageType(), nested, cancellation_token=token) + response = await runtime.send_message(MessageType(), nested, cancellation_token=token) assert not response.done() await runtime.process_next() diff --git a/python/tests/test_intervention.py b/python/tests/test_intervention.py index 88e673528..8f29dd1ac 100644 --- a/python/tests/test_intervention.py +++ b/python/tests/test_intervention.py @@ -21,7 +21,7 @@ async def test_intervention_count_messages() -> None: runtime = SingleThreadedAgentRuntime(intervention_handler=handler) loopback = runtime.register_and_get("name", LoopbackAgent) - response = runtime.send_message(MessageType(), recipient=loopback) + response = await runtime.send_message(MessageType(), recipient=loopback) while not response.done(): await runtime.process_next() @@ -41,7 +41,7 @@ async def test_intervention_drop_send() -> None: runtime = SingleThreadedAgentRuntime(intervention_handler=handler) loopback = runtime.register_and_get("name", LoopbackAgent) - response = runtime.send_message(MessageType(), recipient=loopback) + response = await runtime.send_message(MessageType(), recipient=loopback) while not response.done(): await runtime.process_next() @@ -64,7 +64,7 @@ async def test_intervention_drop_response() -> None: runtime = SingleThreadedAgentRuntime(intervention_handler=handler) loopback = runtime.register_and_get("name", LoopbackAgent) - response = runtime.send_message(MessageType(), recipient=loopback) + response = await runtime.send_message(MessageType(), recipient=loopback) while not response.done(): await runtime.process_next() @@ -87,7 +87,7 @@ async def test_intervention_raise_exception_on_send() -> None: runtime = 
SingleThreadedAgentRuntime(intervention_handler=handler) long_running = runtime.register_and_get("name", LoopbackAgent) - response = runtime.send_message(MessageType(), recipient=long_running) + response = await runtime.send_message(MessageType(), recipient=long_running) while not response.done(): await runtime.process_next() @@ -112,7 +112,7 @@ async def test_intervention_raise_exception_on_respond() -> None: runtime = SingleThreadedAgentRuntime(intervention_handler=handler) long_running = runtime.register_and_get("name", LoopbackAgent) - response = runtime.send_message(MessageType(), recipient=long_running) + response = await runtime.send_message(MessageType(), recipient=long_running) while not response.done(): await runtime.process_next()
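The Coder, FileSurfer, and tool-use changes above all adopt the same fan-out shape: await each `send_message` call to obtain its Future, then gather the Futures. A condensed sketch of that pattern, not part of the patch; `FanOutAgent`, `ToolTask`, `ToolResult`, and `Batch` are illustrative stand-ins, with the same import and constructor assumptions as the earlier sketch:

    import asyncio
    from dataclasses import dataclass
    from typing import List

    # Import paths assumed, as in the earlier sketch.
    from agnext.components import TypeRoutedAgent, message_handler
    from agnext.core import CancellationToken


    @dataclass
    class ToolTask:
        call: str


    @dataclass
    class ToolResult:
        output: str


    @dataclass
    class Batch:
        calls: List[str]


    class FanOutAgent(TypeRoutedAgent):
        """Illustrative only: mirrors the gather pattern used by Coder and FileSurfer above."""

        def __init__(self) -> None:
            super().__init__("Fans tool tasks out to itself.")  # constructor signature assumed

        @message_handler
        async def on_batch(self, message: Batch, cancellation_token: CancellationToken) -> List[ToolResult]:
            # Each awaited send_message returns a Future; gather then awaits all
            # of those Futures concurrently.
            futures = [await self.send_message(ToolTask(call=c), self.id) for c in message.calls]
            results: List[ToolResult] = await asyncio.gather(*futures)
            return results

        @message_handler
        async def on_task(self, message: ToolTask, cancellation_token: CancellationToken) -> ToolResult:
            return ToolResult(output=f"ran {message.call}")

When the response is not needed, the Future returned by the first await can simply be ignored, which matches the send_message docstring added in this patch.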