Compare commits

...

8 Commits

Author SHA1 Message Date
Zamil Majdy
9190a28f24 refactor: move is_uuid and is_creator_slug helpers to shared utils 2026-03-17 13:52:12 +07:00
Zamil Majdy
0d21179a25 fix: normalize query input for case-insensitive UUID lookup and whitespace trimming 2026-03-17 13:31:25 +07:00
Zamil Majdy
a7c13d676c fix(backend/copilot): use specific message for MCP tool blocks in UUID lookup
MCP tools have their own run_mcp_tool path — don't tell users they can
"only be used in agent graphs" when looking up an MCP block by UUID.
Also use block.id instead of raw query in all excluded-block messages.
2026-03-17 13:16:07 +07:00
Zamil Majdy
d68de002f5 refactor(backend/copilot): simplify search_agents by splitting marketplace/library paths
Split the single search_agents function into _search_marketplace and
_search_library helpers. Each path is now linear with its own validation,
search logic, and response formatting — eliminating the scattered
source-based if/else branching.
2026-03-17 13:11:29 +07:00
Zamil Majdy
ea43bdf695 refactor(backend/copilot): extract _marketplace_agent_to_info helper
Consolidate duplicated AgentInfo construction for marketplace agents
into a single helper function, matching the existing pattern of
_library_agent_to_info.
2026-03-17 13:05:13 +07:00
Zamil Majdy
bbc4d9194f fix(backend/copilot): use canonical block.id in UUID lookup response
Use block.id instead of raw query string to ensure canonical ID is
returned regardless of input casing.
2026-03-17 12:59:25 +07:00
Zamil Majdy
98a0d7dcc5 fix(backend/copilot): address PR review comments
- Extract UUID_V4_PATTERN to shared utils.py, removing duplication
- Return explicit NoResultsResponse for disabled/excluded blocks on UUID lookup
- Add tests for direct ID lookup in find_block and agent_search
- Remove useless logger.debug calls, use f-string style consistently
2026-03-17 12:56:57 +07:00
Zamil Majdy
f2676de9d0 feat(backend/copilot): add direct ID lookup to find_agent and find_block tools
Enable copilot tools to look up agents by creator/slug and blocks by UUID
directly, avoiding unnecessary search queries when the user provides an
exact identifier. This improves accuracy and reduces latency for direct
lookups.
2026-03-17 12:48:55 +07:00
6 changed files with 538 additions and 127 deletions

View File

@@ -3,8 +3,7 @@
from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
from backend.api.features.library.model import LibraryAgent
@@ -19,16 +18,12 @@ from .models import (
NoResultsResponse,
ToolResponseBase,
)
from .utils import is_creator_slug, is_uuid
logger = logging.getLogger(__name__)
SearchSource = Literal["marketplace", "library"]
_UUID_PATTERN = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
re.IGNORECASE,
)
# Keywords that should be treated as "list all" rather than a literal search
_LIST_ALL_KEYWORDS = frozenset({"all", "*", "everything", "any", ""})
@@ -39,149 +34,158 @@ async def search_agents(
session_id: str | None = None,
user_id: str | None = None,
) -> ToolResponseBase:
"""
Search for agents in marketplace or user library.
"""Search for agents in marketplace or user library."""
if source == "marketplace":
return await _search_marketplace(query, session_id)
else:
return await _search_library(query, session_id, user_id)
For library searches, keywords like "all", "*", "everything", or an empty
query will list all agents without filtering.
Args:
query: Search query string. Special keywords list all library agents.
source: "marketplace" or "library"
session_id: Chat session ID
user_id: User ID (required for library search)
Returns:
AgentsFoundResponse, NoResultsResponse, or ErrorResponse
"""
# Normalize list-all keywords to empty string for library searches
if source == "library" and query.lower().strip() in _LIST_ALL_KEYWORDS:
query = ""
if source == "marketplace" and not query:
async def _search_marketplace(query: str, session_id: str | None) -> ToolResponseBase:
query = query.strip()
if not query:
return ErrorResponse(
message="Please provide a search query", session_id=session_id
)
if source == "library" and not user_id:
return ErrorResponse(
message="User authentication required to search library",
session_id=session_id,
)
agents: list[AgentInfo] = []
try:
if source == "marketplace":
# Direct lookup if query matches "creator/slug" pattern
if is_creator_slug(query):
logger.info(f"Query looks like creator/slug, trying direct lookup: {query}")
creator, slug = query.split("/", 1)
agent_info = await _get_marketplace_agent_by_slug(creator, slug)
if agent_info:
agents.append(agent_info)
if not agents:
logger.info(f"Searching marketplace for: {query}")
results = await store_db().get_store_agents(search_query=query, page_size=5)
for agent in results.agents:
agents.append(
AgentInfo(
id=f"{agent.creator}/{agent.slug}",
name=agent.agent_name,
description=agent.description or "",
source="marketplace",
in_library=False,
creator=agent.creator,
category="general",
rating=agent.rating,
runs=agent.runs,
is_featured=False,
)
)
else:
if _is_uuid(query):
logger.info(f"Query looks like UUID, trying direct lookup: {query}")
agent = await _get_library_agent_by_id(user_id, query) # type: ignore[arg-type]
if agent:
agents.append(agent)
logger.info(f"Found agent by direct ID lookup: {agent.name}")
if not agents:
search_term = query or None
logger.info(
f"{'Listing all agents in' if not query else 'Searching'} "
f"user library{'' if not query else f' for: {query}'}"
)
results = await library_db().list_library_agents(
user_id=user_id, # type: ignore[arg-type]
search_term=search_term,
page_size=50 if not query else 10,
)
for agent in results.agents:
agents.append(_library_agent_to_info(agent))
logger.info(f"Found {len(agents)} agents in {source}")
agents.append(_marketplace_agent_to_info(agent))
except NotFoundError:
pass
except DatabaseError as e:
logger.error(f"Error searching {source}: {e}", exc_info=True)
logger.error(f"Error searching marketplace: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to search {source}. Please try again.",
message="Failed to search marketplace. Please try again.",
error=str(e),
session_id=session_id,
)
if not agents:
if source == "marketplace":
suggestions = [
"Try more general terms",
"Browse categories in the marketplace",
"Check spelling",
]
no_results_msg = (
return NoResultsResponse(
message=(
f"No agents found matching '{query}'. Let the user know they can "
"try different keywords or browse the marketplace. Also let them "
"know you can create a custom agent for them based on their needs."
),
suggestions=[
"Try more general terms",
"Browse categories in the marketplace",
"Check spelling",
],
session_id=session_id,
)
return AgentsFoundResponse(
message=(
"Now you have found some options for the user to choose from. "
"You can add a link to a recommended agent at: /marketplace/agent/agent_id "
"Please ask the user if they would like to use any of these agents. "
"Let the user know we can create a custom agent for them based on their needs."
),
title=f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} for '{query}'",
agents=agents,
count=len(agents),
session_id=session_id,
)
async def _search_library(
query: str, session_id: str | None, user_id: str | None
) -> ToolResponseBase:
if not user_id:
return ErrorResponse(
message="User authentication required to search library",
session_id=session_id,
)
query = query.strip()
# Normalize list-all keywords to empty string
if query.lower() in _LIST_ALL_KEYWORDS:
query = ""
agents: list[AgentInfo] = []
try:
if is_uuid(query):
logger.info(f"Query looks like UUID, trying direct lookup: {query}")
agent = await _get_library_agent_by_id(user_id, query)
if agent:
agents.append(agent)
if not agents:
logger.info(
f"{'Listing all agents in' if not query else 'Searching'} "
f"user library{'' if not query else f' for: {query}'}"
)
elif not query:
# User asked to list all but library is empty
suggestions = [
"Browse the marketplace to find and add agents",
"Use find_agent to search the marketplace",
]
no_results_msg = (
"Your library is empty. Let the user know they can browse the "
"marketplace to find agents, or you can create a custom agent "
"for them based on their needs."
results = await library_db().list_library_agents(
user_id=user_id,
search_term=query or None,
page_size=50 if not query else 10,
)
else:
suggestions = [
"Try different keywords",
"Use find_agent to search the marketplace",
"Check your library at /library",
]
no_results_msg = (
for agent in results.agents:
agents.append(_library_agent_to_info(agent))
except NotFoundError:
pass
except DatabaseError as e:
logger.error(f"Error searching library: {e}", exc_info=True)
return ErrorResponse(
message="Failed to search library. Please try again.",
error=str(e),
session_id=session_id,
)
if not agents:
if not query:
return NoResultsResponse(
message=(
"Your library is empty. Let the user know they can browse the "
"marketplace to find agents, or you can create a custom agent "
"for them based on their needs."
),
suggestions=[
"Browse the marketplace to find and add agents",
"Use find_agent to search the marketplace",
],
session_id=session_id,
)
return NoResultsResponse(
message=(
f"No agents matching '{query}' found in your library. Let the "
"user know you can create a custom agent for them based on "
"their needs."
)
return NoResultsResponse(
message=no_results_msg, session_id=session_id, suggestions=suggestions
),
suggestions=[
"Try different keywords",
"Use find_agent to search the marketplace",
"Check your library at /library",
],
session_id=session_id,
)
if source == "marketplace":
title = (
f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} for '{query}'"
)
elif not query:
if not query:
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} in your library"
else:
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} in your library for '{query}'"
message = (
"Now you have found some options for the user to choose from. "
"You can add a link to a recommended agent at: /marketplace/agent/agent_id "
"Please ask the user if they would like to use any of these agents. "
"Let the user know we can create a custom agent for them based on their needs."
if source == "marketplace"
else "Found agents in the user's library. You can provide a link to view "
"an agent at: /library/agents/{agent_id}. Use agent_output to get "
"execution results, or run_agent to execute. Let the user know we can "
"create a custom agent for them based on their needs."
)
return AgentsFoundResponse(
message=message,
message=(
"Found agents in the user's library. You can provide a link to view "
"an agent at: /library/agents/{agent_id}. Use agent_output to get "
"execution results, or run_agent to execute. Let the user know we can "
"create a custom agent for them based on their needs."
),
title=title,
agents=agents,
count=len(agents),
@@ -189,9 +193,20 @@ async def search_agents(
)
def _is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_PATTERN.match(text.strip()))
def _marketplace_agent_to_info(agent: Any) -> AgentInfo:
"""Convert a marketplace agent (StoreAgent or StoreAgentDetails) to an AgentInfo."""
return AgentInfo(
id=f"{agent.creator}/{agent.slug}",
name=agent.agent_name,
description=agent.description or "",
source="marketplace",
in_library=False,
creator=agent.creator,
category="general",
rating=agent.rating,
runs=agent.runs,
is_featured=False,
)
def _library_agent_to_info(agent: LibraryAgent) -> AgentInfo:
@@ -214,6 +229,23 @@ def _library_agent_to_info(agent: LibraryAgent) -> AgentInfo:
)
async def _get_marketplace_agent_by_slug(creator: str, slug: str) -> AgentInfo | None:
"""Fetch a marketplace agent by creator/slug identifier."""
try:
details = await store_db().get_store_agent_details(creator, slug)
return _marketplace_agent_to_info(details)
except NotFoundError:
pass
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch marketplace agent {creator}/{slug}: {e}",
exc_info=True,
)
return None
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
"""Fetch a library agent by ID (library agent ID or graph_id).
@@ -226,10 +258,9 @@ async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | N
try:
agent = await lib_db.get_library_agent_by_graph_id(user_id, agent_id)
if agent:
logger.debug(f"Found library agent by graph_id: {agent.name}")
return _library_agent_to_info(agent)
except NotFoundError:
logger.debug(f"Library agent not found by graph_id: {agent_id}")
pass
except DatabaseError:
raise
except Exception as e:
@@ -241,10 +272,9 @@ async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | N
try:
agent = await lib_db.get_library_agent(agent_id, user_id)
if agent:
logger.debug(f"Found library agent by library_id: {agent.name}")
return _library_agent_to_info(agent)
except NotFoundError:
logger.debug(f"Library agent not found by library_id: {agent_id}")
pass
except DatabaseError:
raise
except Exception as e:

View File

@@ -0,0 +1,170 @@
"""Tests for agent search direct lookup functionality."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from .agent_search import search_agents
from .models import AgentsFoundResponse, NoResultsResponse
_TEST_USER_ID = "test-user-agent-search"
class TestMarketplaceSlugLookup:
"""Tests for creator/slug direct lookup in marketplace search."""
@pytest.mark.asyncio(loop_scope="session")
async def test_slug_lookup_found(self):
"""creator/slug query returns the agent directly."""
mock_details = MagicMock()
mock_details.creator = "testuser"
mock_details.slug = "my-agent"
mock_details.agent_name = "My Agent"
mock_details.description = "A test agent"
mock_details.rating = 4.5
mock_details.runs = 100
mock_store = MagicMock()
mock_store.get_store_agent_details = AsyncMock(return_value=mock_details)
with patch(
"backend.copilot.tools.agent_search.store_db",
return_value=mock_store,
):
response = await search_agents(
query="testuser/my-agent",
source="marketplace",
session_id="test-session",
)
assert isinstance(response, AgentsFoundResponse)
assert response.count == 1
assert response.agents[0].id == "testuser/my-agent"
assert response.agents[0].name == "My Agent"
@pytest.mark.asyncio(loop_scope="session")
async def test_slug_lookup_not_found_falls_back_to_search(self):
"""creator/slug not found falls back to general search."""
from backend.util.exceptions import NotFoundError
mock_store = MagicMock()
mock_store.get_store_agent_details = AsyncMock(side_effect=NotFoundError(""))
# Fallback search returns results
mock_search_results = MagicMock()
mock_agent = MagicMock()
mock_agent.creator = "other"
mock_agent.slug = "similar-agent"
mock_agent.agent_name = "Similar Agent"
mock_agent.description = "A similar agent"
mock_agent.rating = 3.0
mock_agent.runs = 50
mock_search_results.agents = [mock_agent]
mock_store.get_store_agents = AsyncMock(return_value=mock_search_results)
with patch(
"backend.copilot.tools.agent_search.store_db",
return_value=mock_store,
):
response = await search_agents(
query="testuser/my-agent",
source="marketplace",
session_id="test-session",
)
assert isinstance(response, AgentsFoundResponse)
assert response.count == 1
assert response.agents[0].id == "other/similar-agent"
@pytest.mark.asyncio(loop_scope="session")
async def test_slug_lookup_not_found_no_search_results(self):
"""creator/slug not found and search returns nothing."""
from backend.util.exceptions import NotFoundError
mock_store = MagicMock()
mock_store.get_store_agent_details = AsyncMock(side_effect=NotFoundError(""))
mock_search_results = MagicMock()
mock_search_results.agents = []
mock_store.get_store_agents = AsyncMock(return_value=mock_search_results)
with patch(
"backend.copilot.tools.agent_search.store_db",
return_value=mock_store,
):
response = await search_agents(
query="testuser/nonexistent",
source="marketplace",
session_id="test-session",
)
assert isinstance(response, NoResultsResponse)
@pytest.mark.asyncio(loop_scope="session")
async def test_non_slug_query_goes_to_search(self):
"""Regular keyword query skips slug lookup and goes to search."""
mock_store = MagicMock()
mock_search_results = MagicMock()
mock_agent = MagicMock()
mock_agent.creator = "creator1"
mock_agent.slug = "email-agent"
mock_agent.agent_name = "Email Agent"
mock_agent.description = "Sends emails"
mock_agent.rating = 4.0
mock_agent.runs = 200
mock_search_results.agents = [mock_agent]
mock_store.get_store_agents = AsyncMock(return_value=mock_search_results)
with patch(
"backend.copilot.tools.agent_search.store_db",
return_value=mock_store,
):
response = await search_agents(
query="email",
source="marketplace",
session_id="test-session",
)
assert isinstance(response, AgentsFoundResponse)
# get_store_agent_details should NOT have been called
mock_store.get_store_agent_details.assert_not_called()
class TestLibraryUUIDLookup:
"""Tests for UUID direct lookup in library search."""
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_found_by_graph_id(self):
"""UUID query matching a graph_id returns the agent directly."""
agent_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
mock_agent = MagicMock()
mock_agent.id = "lib-agent-id"
mock_agent.name = "My Library Agent"
mock_agent.description = "A library agent"
mock_agent.creator_name = "testuser"
mock_agent.status.value = "HEALTHY"
mock_agent.can_access_graph = True
mock_agent.has_external_trigger = False
mock_agent.new_output = False
mock_agent.graph_id = agent_id
mock_agent.graph_version = 1
mock_agent.input_schema = {}
mock_agent.output_schema = {}
mock_lib_db = MagicMock()
mock_lib_db.get_library_agent_by_graph_id = AsyncMock(return_value=mock_agent)
with patch(
"backend.copilot.tools.agent_search.library_db",
return_value=mock_lib_db,
):
response = await search_agents(
query=agent_id,
source="library",
session_id="test-session",
user_id=_TEST_USER_ID,
)
assert isinstance(response, AgentsFoundResponse)
assert response.count == 1
assert response.agents[0].name == "My Library Agent"

View File

@@ -19,7 +19,8 @@ class FindAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Discover agents from the marketplace based on capabilities and user needs."
"Discover agents from the marketplace based on capabilities and "
"user needs, or look up a specific agent by its creator/slug ID."
)
@property
@@ -29,7 +30,7 @@ class FindAgentTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": "Search query describing what the user wants to accomplish. Use single keywords for best results.",
"description": "Search query describing what the user wants to accomplish, or a creator/slug ID (e.g. 'username/agent-name') for direct lookup. Use single keywords for best results.",
},
},
"required": ["query"],

View File

@@ -15,6 +15,7 @@ from .models import (
ErrorResponse,
NoResultsResponse,
)
from .utils import is_uuid
logger = logging.getLogger(__name__)
@@ -52,7 +53,8 @@ class FindBlockTool(BaseTool):
@property
def description(self) -> str:
return (
"Search for available blocks by name or description. "
"Search for available blocks by name or description, or look up a "
"specific block by its ID. "
"Blocks are reusable components that perform specific tasks like "
"sending emails, making API calls, processing text, etc. "
"IMPORTANT: Use this tool FIRST to get the block's 'id' before calling run_block. "
@@ -68,7 +70,8 @@ class FindBlockTool(BaseTool):
"query": {
"type": "string",
"description": (
"Search query to find blocks by name or description. "
"Search query to find blocks by name or description, "
"or a block ID (UUID) for direct lookup. "
"Use keywords like 'email', 'http', 'text', 'ai', etc."
),
},
@@ -113,11 +116,77 @@ class FindBlockTool(BaseTool):
if not query:
return ErrorResponse(
message="Please provide a search query",
message="Please provide a search query or block ID",
session_id=session_id,
)
try:
# Direct ID lookup if query looks like a UUID
if is_uuid(query):
block = get_block(query.lower())
if block:
if block.disabled:
return NoResultsResponse(
message=f"Block '{block.name}' (ID: {query}) is disabled and cannot be used.",
suggestions=["Search for an alternative block by name"],
session_id=session_id,
)
if (
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
):
if block.block_type == BlockType.MCP_TOOL:
return NoResultsResponse(
message=(
f"Block '{block.name}' (ID: {block.id}) is not "
"runnable through find_block/run_block. Use "
"run_mcp_tool instead."
),
suggestions=[
"Use run_mcp_tool to discover and run this MCP tool",
"Search for an alternative block by name",
],
session_id=session_id,
)
return NoResultsResponse(
message=(
f"Block '{block.name}' (ID: {block.id}) is not available "
"in CoPilot. It can only be used within agent graphs."
),
suggestions=[
"Search for an alternative block by name",
"Use this block in an agent graph instead",
],
session_id=session_id,
)
summary = BlockInfoSummary(
id=block.id,
name=block.name,
description=(
block.optimized_description or block.description or ""
),
categories=[c.value for c in block.categories],
)
if include_schemas:
info = block.get_info()
summary.input_schema = info.inputSchema
summary.output_schema = info.outputSchema
summary.static_output = info.staticOutput
return BlockListResponse(
message=(
f"Found block '{block.name}' by ID. "
"To see inputs/outputs and execute it, use "
"run_block with the block's 'id' - providing "
"no inputs."
),
blocks=[summary],
count=1,
query=query,
session_id=session_id,
)
# Search for blocks using hybrid search
results, total = await search().unified_hybrid_search(
query=query,

View File

@@ -499,3 +499,123 @@ class TestFindBlockFiltering:
assert response.blocks[0].input_schema == input_schema
assert response.blocks[0].output_schema == output_schema
assert response.blocks[0].static_output is True
class TestFindBlockDirectLookup:
"""Tests for direct UUID lookup in FindBlockTool."""
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_found(self):
"""UUID query returns the block directly without search."""
session = make_session(user_id=_TEST_USER_ID)
block_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
block = make_mock_block(block_id, "Test Block", BlockType.STANDARD)
with patch(
"backend.copilot.tools.find_block.get_block",
return_value=block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query=block_id
)
assert isinstance(response, BlockListResponse)
assert response.count == 1
assert response.blocks[0].id == block_id
assert response.blocks[0].name == "Test Block"
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_not_found_falls_through(self):
"""UUID that doesn't match any block falls through to search."""
session = make_session(user_id=_TEST_USER_ID)
block_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
mock_search_db = MagicMock()
mock_search_db.unified_hybrid_search = AsyncMock(return_value=([], 0))
with (
patch(
"backend.copilot.tools.find_block.get_block",
return_value=None,
),
patch(
"backend.copilot.tools.find_block.search",
return_value=mock_search_db,
),
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query=block_id
)
from .models import NoResultsResponse
assert isinstance(response, NoResultsResponse)
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_disabled_block(self):
"""UUID matching a disabled block returns NoResultsResponse."""
session = make_session(user_id=_TEST_USER_ID)
block_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
block = make_mock_block(
block_id, "Disabled Block", BlockType.STANDARD, disabled=True
)
with patch(
"backend.copilot.tools.find_block.get_block",
return_value=block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query=block_id
)
from .models import NoResultsResponse
assert isinstance(response, NoResultsResponse)
assert "disabled" in response.message.lower()
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_excluded_block_type(self):
"""UUID matching an excluded block type returns NoResultsResponse."""
session = make_session(user_id=_TEST_USER_ID)
block_id = "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
block = make_mock_block(block_id, "Input Block", BlockType.INPUT)
with patch(
"backend.copilot.tools.find_block.get_block",
return_value=block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query=block_id
)
from .models import NoResultsResponse
assert isinstance(response, NoResultsResponse)
assert "not available" in response.message.lower()
@pytest.mark.asyncio(loop_scope="session")
async def test_uuid_lookup_excluded_block_id(self):
"""UUID matching an excluded block ID returns NoResultsResponse."""
session = make_session(user_id=_TEST_USER_ID)
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
block = make_mock_block(
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
)
with patch(
"backend.copilot.tools.find_block.get_block",
return_value=block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query=smart_decision_id
)
from .models import NoResultsResponse
assert isinstance(response, NoResultsResponse)
assert "not available" in response.message.lower()

View File

@@ -1,6 +1,7 @@
"""Shared utilities for chat tools."""
import logging
import re
from typing import Any
from backend.api.features.library import model as library_model
@@ -19,6 +20,26 @@ from backend.util.exceptions import NotFoundError
logger = logging.getLogger(__name__)
# Shared UUID v4 pattern used by multiple tools for direct ID lookups.
_UUID_V4_PATTERN = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
re.IGNORECASE,
)
def is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_V4_PATTERN.match(text.strip()))
# Matches "creator/slug" identifiers used in the marketplace
_CREATOR_SLUG_PATTERN = re.compile(r"^[\w-]+/[\w-]+$")
def is_creator_slug(text: str) -> bool:
"""Check if text matches a 'creator/slug' marketplace identifier."""
return bool(_CREATOR_SLUG_PATTERN.match(text.strip()))
async def fetch_graph_from_store_slug(
username: str,