feat(a2a): add async execution support for A2A delegation

This commit adds async versions of A2A functions to support calling from
async contexts without creating new event loops.

Changes:
- Add afetch_agent_card() async function in utils.py
- Add aexecute_a2a_delegation() async function in utils.py
- Add async helper functions in wrapper.py:
  - _afetch_card_from_config()
  - _afetch_agent_cards_concurrently()
  - _aexecute_task_with_a2a()
  - _ahandle_agent_response_and_continue()
  - _adelegate_to_a2a()
- Update wrap_agent_with_a2a_instance() to wrap both execute_task and
  aexecute_task methods
- Add comprehensive tests for async A2A execution

Fixes #4162

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-12-30 07:54:43 +00:00
parent b9dd166a6b
commit 4f0b6f6427
3 changed files with 992 additions and 4 deletions

View File

@@ -134,6 +134,52 @@ def fetch_agent_card(
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Async version of fetch_agent_card for use in async contexts.
Fetches AgentCard from an A2A endpoint with optional caching.
This version should be used when calling from an async context
to avoid creating a new event loop.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
use_cache: Whether to use caching (default True)
Returns:
AgentCard object with agent capabilities and skills
Raises:
httpx.HTTPStatusError: If the request fails
A2AClientHTTPError: If authentication fails
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
return await _fetch_agent_card_async_cached(
endpoint=endpoint, auth_hash=auth_hash, timeout=timeout
)
return await _fetch_agent_card_async(endpoint=endpoint, auth=auth, timeout=timeout)
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _fetch_agent_card_async_cached(
endpoint: str,
@@ -329,6 +375,114 @@ def execute_a2a_delegation(
loop.close()
async def aexecute_a2a_delegation(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
task_description: str,
context: str | None = None,
context_id: str | None = None,
task_id: str | None = None,
reference_task_ids: list[str] | None = None,
metadata: dict[str, Any] | None = None,
extensions: dict[str, Any] | None = None,
conversation_history: list[Message] | None = None,
agent_id: str | None = None,
agent_role: Role | None = None,
agent_branch: Any | None = None,
response_model: type[BaseModel] | None = None,
turn_number: int | None = None,
) -> dict[str, Any]:
"""Async version of execute_a2a_delegation for use in async contexts.
Execute a task delegation to a remote A2A agent with multi-turn support.
This version should be used when calling from an async context
to avoid creating a new event loop.
Handles:
- AgentCard discovery
- Authentication setup
- Message creation and sending
- Response parsing
- Multi-turn conversations
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
Returns:
Dictionary with:
- status: "completed", "input_required", "failed", etc.
- result: Result string (if completed)
- error: Error message (if failed)
- history: List of new Message objects from this exchange
Raises:
ImportError: If a2a-sdk is not installed
"""
is_multiturn = bool(conversation_history and len(conversation_history) > 0)
if turn_number is None:
turn_number = (
len([m for m in (conversation_history or []) if m.role == Role.user]) + 1
)
crewai_event_bus.emit(
agent_branch,
A2ADelegationStartedEvent(
endpoint=endpoint,
task_description=task_description,
agent_id=agent_id,
is_multiturn=is_multiturn,
turn_number=turn_number,
),
)
result = await _execute_a2a_delegation_async(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history or [],
is_multiturn=is_multiturn,
turn_number=turn_number,
agent_branch=agent_branch,
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
)
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status=result["status"],
result=result.get("result"),
error=result.get("error"),
is_multiturn=is_multiturn,
),
)
return result
async def _execute_a2a_delegation_async(
endpoint: str,
auth: AuthScheme | None,

View File

@@ -5,7 +5,8 @@ Wraps agent classes with A2A delegation capabilities.
from __future__ import annotations
from collections.abc import Callable
import asyncio
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from types import MethodType
@@ -24,6 +25,8 @@ from crewai.a2a.templates import (
)
from crewai.a2a.types import AgentResponseProtocol
from crewai.a2a.utils import (
aexecute_a2a_delegation,
afetch_agent_card,
execute_a2a_delegation,
fetch_agent_card,
get_a2a_agents_and_response_model,
@@ -46,11 +49,11 @@ if TYPE_CHECKING:
def wrap_agent_with_a2a_instance(
agent: Agent, extension_registry: ExtensionRegistry | None = None
) -> None:
"""Wrap an agent instance's execute_task method with A2A support.
"""Wrap an agent instance's execute_task and aexecute_task methods with A2A support.
This function modifies the agent instance by wrapping its execute_task
method to add A2A delegation capabilities. Should only be called when
the agent has a2a configuration set.
and aexecute_task methods to add A2A delegation capabilities. Should only
be called when the agent has a2a configuration set.
Args:
agent: The agent instance to wrap
@@ -99,6 +102,49 @@ def wrap_agent_with_a2a_instance(
object.__setattr__(agent, "execute_task", MethodType(execute_task_with_a2a, agent))
original_aexecute_task = agent.aexecute_task.__func__ # type: ignore[attr-defined]
@wraps(original_aexecute_task)
async def aexecute_task_with_a2a(
self: Agent,
task: Task,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> str:
"""Execute task asynchronously with A2A delegation support.
This async version should be used when calling from an async context
to avoid creating a new event loop.
Args:
self: The agent instance
task: The task to execute
context: Optional context for task execution
tools: Optional tools available to the agent
Returns:
Task execution result
"""
if not self.a2a:
return await original_aexecute_task(self, task, context, tools) # type: ignore[no-any-return]
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
return await _aexecute_task_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=original_aexecute_task,
task=task,
agent_response_model=agent_response_model,
context=context,
tools=tools,
extension_registry=extension_registry,
)
object.__setattr__(
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
)
def _fetch_card_from_config(
config: A2AConfig,
@@ -701,3 +747,497 @@ def _delegate_to_a2a(
finally:
task.description = original_task_description
async def _afetch_card_from_config(
config: A2AConfig,
) -> tuple[A2AConfig, AgentCard | Exception]:
"""Async version of _fetch_card_from_config.
Fetch agent card from A2A config asynchronously.
Args:
config: A2A configuration
Returns:
Tuple of (config, card or exception)
"""
try:
card = await afetch_agent_card(
endpoint=config.endpoint,
auth=config.auth,
timeout=config.timeout,
)
return config, card
except Exception as e:
return config, e
async def _afetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Async version of _fetch_agent_cards_concurrently.
Fetch agent cards concurrently for multiple A2A agents using asyncio.gather.
Args:
a2a_agents: List of A2A agent configurations
Returns:
Tuple of (agent_cards dict, failed_agents dict mapping endpoint to error message)
"""
agent_cards: dict[str, AgentCard] = {}
failed_agents: dict[str, str] = {}
if not a2a_agents:
return agent_cards, failed_agents
results = await asyncio.gather(
*[_afetch_card_from_config(config) for config in a2a_agents],
return_exceptions=False,
)
for config, result in results:
if isinstance(result, Exception):
if config.fail_fast:
raise RuntimeError(
f"Failed to fetch agent card from {config.endpoint}. "
f"Ensure the A2A agent is running and accessible. Error: {result}"
) from result
failed_agents[config.endpoint] = str(result)
else:
agent_cards[config.endpoint] = result
return agent_cards, failed_agents
async def _aexecute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task,
agent_response_model: type[BaseModel],
context: str | None,
tools: list[BaseTool] | None,
extension_registry: ExtensionRegistry,
) -> str:
"""Async version of _execute_task_with_a2a.
Wrap aexecute_task with A2A delegation logic.
Args:
self: The agent instance
a2a_agents: Dictionary of A2A agent configurations
original_fn: The original aexecute_task method
task: The task to execute
context: Optional context for task execution
tools: Optional tools available to the agent
agent_response_model: Optional agent response model
extension_registry: Registry of A2A extensions
Returns:
Task execution result (either from LLM or A2A agent)
"""
original_description: str = task.description
original_output_pydantic = task.output_pydantic
original_response_model = task.response_model
agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents)
if not agent_cards and a2a_agents and failed_agents:
unavailable_agents_text = ""
for endpoint, error in failed_agents.items():
unavailable_agents_text += f" - {endpoint}: {error}\n"
notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute(
unavailable_agents=unavailable_agents_text
)
task.description = f"{original_description}{notice}"
try:
return await original_fn(self, task, context, tools)
finally:
task.description = original_description
task.description, _ = _augment_prompt_with_a2a(
a2a_agents=a2a_agents,
task_description=original_description,
agent_cards=agent_cards,
failed_agents=failed_agents,
extension_registry=extension_registry,
)
task.response_model = agent_response_model
try:
raw_result = await original_fn(self, task, context, tools)
agent_response = _parse_agent_response(
raw_result=raw_result, agent_response_model=agent_response_model
)
if extension_registry and isinstance(agent_response, BaseModel):
agent_response = extension_registry.process_response_with_all(
agent_response, {}
)
if isinstance(agent_response, BaseModel) and isinstance(
agent_response, AgentResponseProtocol
):
if agent_response.is_a2a:
return await _adelegate_to_a2a(
self,
agent_response=agent_response,
task=task,
original_fn=original_fn,
context=context,
tools=tools,
agent_cards=agent_cards,
original_task_description=original_description,
extension_registry=extension_registry,
)
return str(agent_response.message)
return raw_result
finally:
task.description = original_description
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model
async def _ahandle_agent_response_and_continue(
self: Agent,
a2a_result: dict[str, Any],
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig],
original_task_description: str,
conversation_history: list[Message],
turn_num: int,
max_turns: int,
task: Task,
original_fn: Callable[..., Coroutine[Any, Any, str]],
context: str | None,
tools: list[BaseTool] | None,
agent_response_model: type[BaseModel],
) -> tuple[str | None, str | None]:
"""Async version of _handle_agent_response_and_continue.
Handle A2A result and get CrewAI agent's response asynchronously.
Args:
self: The agent instance
a2a_result: Result from A2A delegation
agent_id: ID of the A2A agent
agent_cards: Pre-fetched agent cards
a2a_agents: List of A2A configurations
original_task_description: Original task description
conversation_history: Conversation history
turn_num: Current turn number
max_turns: Maximum turns allowed
task: The task being executed
original_fn: Original aexecute_task method
context: Optional context
tools: Optional tools
agent_response_model: Response model for parsing
Returns:
Tuple of (final_result, current_request) where:
- final_result is not None if conversation should end
- current_request is the next message to send if continuing
"""
agent_cards_dict = agent_cards or {}
if "agent_card" in a2a_result and agent_id not in agent_cards_dict:
agent_cards_dict[agent_id] = a2a_result["agent_card"]
task.description, disable_structured_output = _augment_prompt_with_a2a(
a2a_agents=a2a_agents,
task_description=original_task_description,
conversation_history=conversation_history,
turn_num=turn_num,
max_turns=max_turns,
agent_cards=agent_cards_dict,
)
original_response_model = task.response_model
if disable_structured_output:
task.response_model = None
raw_result = await original_fn(self, task, context, tools)
if disable_structured_output:
task.response_model = original_response_model
if disable_structured_output:
final_turn_number = turn_num + 1
result_text = str(raw_result)
crewai_event_bus.emit(
None,
A2AMessageSentEvent(
message=result_text,
turn_number=final_turn_number,
is_multiturn=True,
agent_role=self.role,
),
)
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=result_text,
error=None,
total_turns=final_turn_number,
),
)
return result_text, None
llm_response = _parse_agent_response(
raw_result=raw_result, agent_response_model=agent_response_model
)
if isinstance(llm_response, BaseModel) and isinstance(
llm_response, AgentResponseProtocol
):
if not llm_response.is_a2a:
final_turn_number = turn_num + 1
crewai_event_bus.emit(
None,
A2AMessageSentEvent(
message=str(llm_response.message),
turn_number=final_turn_number,
is_multiturn=True,
agent_role=self.role,
),
)
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=str(llm_response.message),
error=None,
total_turns=final_turn_number,
),
)
return str(llm_response.message), None
return None, str(llm_response.message)
return str(raw_result), None
async def _adelegate_to_a2a(
self: Agent,
agent_response: AgentResponseProtocol,
task: Task,
original_fn: Callable[..., Coroutine[Any, Any, str]],
context: str | None,
tools: list[BaseTool] | None,
agent_cards: dict[str, AgentCard] | None = None,
original_task_description: str | None = None,
extension_registry: ExtensionRegistry | None = None,
) -> str:
"""Async version of _delegate_to_a2a.
Delegate to A2A agent with multi-turn conversation support asynchronously.
Args:
self: The agent instance
agent_response: The AgentResponse indicating delegation
task: The task being executed (for extracting A2A fields)
original_fn: The original aexecute_task method for follow-ups
context: Optional context for task execution
tools: Optional tools available to the agent
agent_cards: Pre-fetched agent cards from _aexecute_task_with_a2a
original_task_description: The original task description before A2A augmentation
extension_registry: Optional registry of A2A extensions
Returns:
Result from A2A agent
Raises:
ImportError: If a2a-sdk is not installed
"""
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
agent_ids = tuple(config.endpoint for config in a2a_agents)
current_request = str(agent_response.message)
if hasattr(agent_response, "a2a_ids") and agent_response.a2a_ids:
agent_id = agent_response.a2a_ids[0]
else:
agent_id = agent_ids[0] if agent_ids else ""
if agent_id and agent_id not in agent_ids:
raise ValueError(
f"Unknown A2A agent ID(s): {agent_response.a2a_ids} not in {agent_ids}"
)
agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents))
task_config = task.config or {}
context_id = task_config.get("context_id")
task_id_config = task_config.get("task_id")
metadata = task_config.get("metadata")
extensions = task_config.get("extensions")
reference_task_ids = task_config.get("reference_task_ids", [])
if original_task_description is None:
original_task_description = task.description
conversation_history: list[Message] = []
max_turns = agent_config.max_turns
try:
for turn_num in range(max_turns):
console_formatter = getattr(crewai_event_bus, "_console", None)
agent_branch = None
if console_formatter:
agent_branch = getattr(
console_formatter, "current_agent_branch", None
) or getattr(console_formatter, "current_task_branch", None)
a2a_result = await aexecute_a2a_delegation(
endpoint=agent_config.endpoint,
auth=agent_config.auth,
timeout=agent_config.timeout,
task_description=current_request,
context_id=context_id,
task_id=task_id_config,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
agent_id=agent_id,
agent_role=Role.user,
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
)
conversation_history = a2a_result.get("history", [])
if conversation_history:
latest_message = conversation_history[-1]
if latest_message.task_id is not None:
task_id_config = latest_message.task_id
if latest_message.context_id is not None:
context_id = latest_message.context_id
if a2a_result["status"] in ["completed", "input_required"]:
if (
a2a_result["status"] == "completed"
and agent_config.trust_remote_completion_status
):
if (
task_id_config is not None
and task_id_config not in reference_task_ids
):
reference_task_ids.append(task_id_config)
if task.config is None:
task.config = {}
task.config["reference_task_ids"] = reference_task_ids
result_text = a2a_result.get("result", "")
final_turn_number = turn_num + 1
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=result_text,
error=None,
total_turns=final_turn_number,
),
)
return cast(str, result_text)
final_result, next_request = await _ahandle_agent_response_and_continue(
self=self,
a2a_result=a2a_result,
agent_id=agent_id,
agent_cards=agent_cards,
a2a_agents=a2a_agents,
original_task_description=original_task_description,
conversation_history=conversation_history,
turn_num=turn_num,
max_turns=max_turns,
task=task,
original_fn=original_fn,
context=context,
tools=tools,
agent_response_model=agent_response_model,
)
if final_result is not None:
return final_result
if next_request is not None:
current_request = next_request
continue
error_msg = a2a_result.get("error", "Unknown error")
final_result, next_request = await _ahandle_agent_response_and_continue(
self=self,
a2a_result=a2a_result,
agent_id=agent_id,
agent_cards=agent_cards,
a2a_agents=a2a_agents,
original_task_description=original_task_description,
conversation_history=conversation_history,
turn_num=turn_num,
max_turns=max_turns,
task=task,
original_fn=original_fn,
context=context,
tools=tools,
agent_response_model=agent_response_model,
)
if final_result is not None:
return final_result
if next_request is not None:
current_request = next_request
continue
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="failed",
final_result=None,
error=error_msg,
total_turns=turn_num + 1,
),
)
return f"A2A delegation failed: {error_msg}"
if conversation_history:
for msg in reversed(conversation_history):
if msg.role == Role.agent:
text_parts = [
part.root.text for part in msg.parts if part.root.kind == "text"
]
final_message = (
" ".join(text_parts) if text_parts else "Conversation completed"
)
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=final_message,
error=None,
total_turns=max_turns,
),
)
return final_message
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="failed",
final_result=None,
error=f"Conversation exceeded maximum turns ({max_turns})",
total_turns=max_turns,
),
)
raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})")
finally:
task.description = original_task_description

View File

@@ -0,0 +1,294 @@
"""Test A2A async execution support.
Tests that verify async execution works correctly without creating new event loops.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai import Agent
from crewai.a2a.config import A2AConfig
try:
from a2a.types import Message, Role
A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_agent_with_a2a_has_async_wrapper():
"""Verify that agents with a2a get the async wrapper applied to aexecute_task."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
assert agent.a2a is not None
assert callable(agent.aexecute_task)
assert hasattr(agent.aexecute_task, "__wrapped__")
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_async_wrapper_is_applied_differently_per_instance():
"""Verify that agents with and without a2a have different aexecute_task methods."""
agent_without_a2a = Agent(
role="agent without a2a",
goal="test goal",
backstory="test backstory",
)
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
agent_with_a2a = Agent(
role="agent with a2a",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
assert (
agent_without_a2a.aexecute_task.__func__
is not agent_with_a2a.aexecute_task.__func__
)
assert not hasattr(agent_without_a2a.aexecute_task, "__wrapped__")
assert hasattr(agent_with_a2a.aexecute_task, "__wrapped__")
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_async_delegate_to_a2a_does_not_create_new_event_loop():
"""Verify that async A2A delegation doesn't create a new event loop."""
from crewai.a2a.wrapper import _adelegate_to_a2a
from crewai import Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
async def mock_original_fn(self, task, context, tools):
return '{"is_a2a": false, "message": "Done", "a2a_ids": []}'
with (
patch("crewai.a2a.wrapper.aexecute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._afetch_agent_cards_concurrently") as mock_fetch,
patch("asyncio.new_event_loop") as mock_new_loop,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
}
result = await _adelegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=mock_original_fn,
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
assert result == "Done by remote"
mock_new_loop.assert_not_called()
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_aexecute_a2a_delegation_does_not_create_new_event_loop():
"""Verify that aexecute_a2a_delegation doesn't create a new event loop."""
from crewai.a2a.utils import aexecute_a2a_delegation
with (
patch(
"crewai.a2a.utils._execute_a2a_delegation_async"
) as mock_execute_async,
patch("asyncio.new_event_loop") as mock_new_loop,
):
mock_execute_async.return_value = {
"status": "completed",
"result": "Done",
"history": [],
}
result = await aexecute_a2a_delegation(
endpoint="http://test-endpoint.com",
auth=None,
timeout=30,
task_description="test task",
agent_id="test-agent",
)
assert result["status"] == "completed"
mock_new_loop.assert_not_called()
mock_execute_async.assert_called_once()
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_afetch_agent_card_does_not_create_new_event_loop():
"""Verify that afetch_agent_card doesn't create a new event loop."""
from crewai.a2a.utils import afetch_agent_card
with (
patch("crewai.a2a.utils._fetch_agent_card_async") as mock_fetch_async,
patch("asyncio.new_event_loop") as mock_new_loop,
):
mock_card = MagicMock()
mock_card.name = "Test Agent"
mock_fetch_async.return_value = mock_card
result = await afetch_agent_card(
endpoint="http://test-endpoint.com",
auth=None,
timeout=30,
use_cache=False,
)
assert result.name == "Test Agent"
mock_new_loop.assert_not_called()
mock_fetch_async.assert_called_once()
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_afetch_agent_cards_concurrently():
"""Verify that _afetch_agent_cards_concurrently fetches cards using asyncio.gather."""
from crewai.a2a.wrapper import _afetch_agent_cards_concurrently
a2a_configs = [
A2AConfig(endpoint="http://test-endpoint-1.com"),
A2AConfig(endpoint="http://test-endpoint-2.com"),
]
with patch("crewai.a2a.wrapper.afetch_agent_card") as mock_fetch:
mock_card1 = MagicMock()
mock_card1.name = "Agent 1"
mock_card2 = MagicMock()
mock_card2.name = "Agent 2"
async def side_effect(endpoint, auth, timeout):
if "endpoint-1" in endpoint:
return mock_card1
return mock_card2
mock_fetch.side_effect = side_effect
agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_configs)
assert len(agent_cards) == 2
assert len(failed_agents) == 0
assert mock_fetch.call_count == 2
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_aexecute_task_with_a2a_uses_async_path():
"""Verify that _aexecute_task_with_a2a uses the async delegation path."""
from crewai.a2a.wrapper import _aexecute_task_with_a2a
from crewai.a2a.utils import get_a2a_agents_and_response_model
from crewai import Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
task = Task(description="test task", expected_output="test output", agent=agent)
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(a2a_config)
async def mock_original_fn(self, task, context, tools):
return '{"is_a2a": false, "message": "Direct response", "a2a_ids": []}'
with (
patch("crewai.a2a.wrapper._afetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
from crewai.a2a.extensions.base import ExtensionRegistry
result = await _aexecute_task_with_a2a(
self=agent,
a2a_agents=a2a_agents,
original_fn=mock_original_fn,
task=task,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=ExtensionRegistry(),
)
assert result == "Direct response"
mock_fetch.assert_called_once()
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
@pytest.mark.asyncio
async def test_async_execution_in_running_event_loop():
"""Verify that async A2A execution works correctly within a running event loop.
This test simulates the scenario described in issue #4162 where A2A is called
from an async context that already has a running event loop.
"""
from crewai.a2a.utils import aexecute_a2a_delegation
current_loop = asyncio.get_running_loop()
assert current_loop is not None
with patch(
"crewai.a2a.utils._execute_a2a_delegation_async"
) as mock_execute_async:
mock_execute_async.return_value = {
"status": "completed",
"result": "Success from async context",
"history": [],
}
result = await aexecute_a2a_delegation(
endpoint="http://test-endpoint.com",
auth=None,
timeout=30,
task_description="test task from async context",
agent_id="test-agent",
)
assert result["status"] == "completed"
assert result["result"] == "Success from async context"