Compare commits

..

3 Commits

Author SHA1 Message Date
Otto
54821bb872 refactor: address review feedback
- Simplify list-all: normalize keywords to empty string early, remove list_all var
- Move helpers after usage (_is_uuid, _get_library_agent_by_id, _library_agent_to_info)
- Extract _library_agent_to_info to deduplicate AgentInfo construction
- Add NotFoundError catch in graph_id lookup path
- Differentiate suggestions for empty library vs no search results
- Make session_id optional, query not required
- Update tool description: 'Search for or list'
2026-02-18 11:56:27 +00:00
Otto
a286b1d06e refactor: address review feedback
- Move helpers after search_agents function
- Simplify list_all logic by setting query='' early
- Update find_library_agent description to 'Search for or list'
2026-02-17 18:09:40 +00:00
Otto
2b0654b9e5 fix(copilot): handle 'all' keyword in find_library_agent tool
When users ask CoPilot to 'show all my agents', the LLM was passing
the literal string 'all' as a search query, which matched no agents.

Changes:
- Make query parameter optional in FindLibraryAgentTool
- Add _LIST_ALL_KEYWORDS set for special keywords ('all', '*', 'everything', 'any', '')
- When query matches a list-all keyword, pass None to list_library_agents
- Update response messages to reflect 'list all' vs 'search' behavior

Fixes SECRT-2002
2026-02-17 18:09:09 +00:00
19 changed files with 261 additions and 348 deletions

View File

@@ -164,23 +164,21 @@ class CoPilotExecutor(AppProcess):
self._cancel_thread, self.cancel_client, "[cleanup][cancel]"
)
# Clean up worker threads (closes per-loop workspace storage sessions)
# Shutdown executor
if self._executor:
from .processor import cleanup_worker
logger.info(f"[cleanup {pid}] Cleaning up workers...")
futures = []
for _ in range(self._executor._max_workers):
futures.append(self._executor.submit(cleanup_worker))
for f in futures:
try:
f.result(timeout=10)
except Exception as e:
logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}")
logger.info(f"[cleanup {pid}] Shutting down executor...")
self._executor.shutdown(wait=False)
# Close async resources (workspace storage aiohttp session, etc.)
try:
from backend.util.workspace_storage import shutdown_workspace_storage
loop = asyncio.new_event_loop()
loop.run_until_complete(shutdown_workspace_storage())
loop.close()
except Exception as e:
logger.warning(f"[cleanup {pid}] Error closing workspace storage: {e}")
# Release any remaining locks
for task_id, lock in list(self._task_locks.items()):
try:

View File

@@ -60,18 +60,6 @@ def init_worker():
_tls.processor.on_executor_start()
def cleanup_worker():
"""Clean up the processor for the current worker thread.
Should be called before the worker thread's event loop is destroyed so
that event-loop-bound resources (e.g. ``aiohttp.ClientSession``) are
closed on the correct loop.
"""
processor: CoPilotProcessor | None = getattr(_tls, "processor", None)
if processor is not None:
processor.cleanup()
# ============ Processor Class ============ #
@@ -110,28 +98,6 @@ class CoPilotProcessor:
logger.info(f"[CoPilotExecutor] Worker {self.tid} started")
def cleanup(self):
"""Clean up event-loop-bound resources before the loop is destroyed.
Shuts down the workspace storage instance that belongs to this
worker's event loop, ensuring ``aiohttp.ClientSession.close()``
runs on the same loop that created the session.
"""
from backend.util.workspace_storage import shutdown_workspace_storage
try:
future = asyncio.run_coroutine_threadsafe(
shutdown_workspace_storage(), self.execution_loop
)
future.result(timeout=5)
except Exception as e:
logger.warning(f"[CoPilotExecutor] Worker {self.tid} cleanup error: {e}")
# Stop the event loop
self.execution_loop.call_soon_threadsafe(self.execution_loop.stop)
self.execution_thread.join(timeout=5)
logger.info(f"[CoPilotExecutor] Worker {self.tid} cleaned up")
@error_logged(swallow=False)
def execute(
self,

View File

@@ -693,15 +693,11 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0.5)
raw_transcript = read_transcript_file(captured_transcript.path)
if raw_transcript:
try:
async with asyncio.timeout(30):
await _upload_transcript_bg(
user_id, session_id, raw_transcript
)
except asyncio.TimeoutError:
logger.warning(
f"[SDK] Transcript upload timed out for {session_id}"
)
task = asyncio.create_task(
_upload_transcript_bg(user_id, session_id, raw_transcript)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
else:
logger.debug("[SDK] Stop hook fired but transcript not usable")

View File

@@ -24,94 +24,24 @@ _UUID_PATTERN = re.compile(
re.IGNORECASE,
)
def _is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_PATTERN.match(text.strip()))
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
"""Fetch a library agent by ID (library agent ID or graph_id).
Tries multiple lookup strategies:
1. First by graph_id (AgentGraph primary key)
2. Then by library agent ID (LibraryAgent primary key)
Args:
user_id: The user ID
agent_id: The ID to look up (can be graph_id or library agent ID)
Returns:
AgentInfo if found, None otherwise
"""
lib_db = library_db()
try:
agent = await lib_db.get_library_agent_by_graph_id(user_id, agent_id)
if agent:
logger.debug(f"Found library agent by graph_id: {agent.name}")
return AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by graph_id {agent_id}: {e}",
exc_info=True,
)
try:
agent = await lib_db.get_library_agent(agent_id, user_id)
if agent:
logger.debug(f"Found library agent by library_id: {agent.name}")
return AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
except NotFoundError:
logger.debug(f"Library agent not found by library_id: {agent_id}")
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by library_id {agent_id}: {e}",
exc_info=True,
)
return None
# Keywords that should be treated as "list all" rather than a literal search
_LIST_ALL_KEYWORDS = frozenset({"all", "*", "everything", "any", ""})
async def search_agents(
query: str,
source: SearchSource,
session_id: str | None,
session_id: str | None = None,
user_id: str | None = None,
) -> ToolResponseBase:
"""
Search for agents in marketplace or user library.
For library searches, keywords like "all", "*", "everything", or an empty
query will list all agents without filtering.
Args:
query: Search query string
query: Search query string. Special keywords list all library agents.
source: "marketplace" or "library"
session_id: Chat session ID
user_id: User ID (required for library search)
@@ -119,7 +49,11 @@ async def search_agents(
Returns:
AgentsFoundResponse, NoResultsResponse, or ErrorResponse
"""
if not query:
# Normalize list-all keywords to empty string for library searches
if source == "library" and query.lower().strip() in _LIST_ALL_KEYWORDS:
query = ""
if source == "marketplace" and not query:
return ErrorResponse(
message="Please provide a search query", session_id=session_id
)
@@ -159,27 +93,19 @@ async def search_agents(
logger.info(f"Found agent by direct ID lookup: {agent.name}")
if not agents:
logger.info(f"Searching user library for: {query}")
search_term = query or None
logger.info(
f"{'Listing all agents in' if not query else 'Searching'} "
f"user library{'' if not query else f' for: {query}'}"
)
results = await library_db().list_library_agents(
user_id=user_id, # type: ignore[arg-type]
search_term=query,
search_term=search_term,
page_size=10,
)
for agent in results.agents:
agents.append(
AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
_library_agent_to_info(agent)
)
logger.info(f"Found {len(agents)} agents in {source}")
except NotFoundError:
@@ -193,42 +119,60 @@ async def search_agents(
)
if not agents:
suggestions = (
[
if source == "marketplace":
suggestions = [
"Try more general terms",
"Browse categories in the marketplace",
"Check spelling",
]
if source == "marketplace"
else [
no_results_msg = (
f"No agents found matching '{query}'. Let the user know they can "
"try different keywords or browse the marketplace. Also let them "
"know you can create a custom agent for them based on their needs."
)
elif not query:
# User asked to list all but library is empty
suggestions = [
"Browse the marketplace to find and add agents",
"Use find_agent to search the marketplace",
]
no_results_msg = (
"Your library is empty. Let the user know they can browse the "
"marketplace to find agents, or you can create a custom agent "
"for them based on their needs."
)
else:
suggestions = [
"Try different keywords",
"Use find_agent to search the marketplace",
"Check your library at /library",
]
)
no_results_msg = (
f"No agents found matching '{query}'. Let the user know they can try different keywords or browse the marketplace. Also let them know you can create a custom agent for them based on their needs."
if source == "marketplace"
else f"No agents matching '{query}' found in your library. Let the user know you can create a custom agent for them based on their needs."
)
no_results_msg = (
f"No agents matching '{query}' found in your library. Let the "
"user know you can create a custom agent for them based on "
"their needs."
)
return NoResultsResponse(
message=no_results_msg, session_id=session_id, suggestions=suggestions
)
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} "
title += (
f"for '{query}'"
if source == "marketplace"
else f"in your library for '{query}'"
)
if source == "marketplace":
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} for '{query}'"
elif not query:
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} in your library"
else:
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} in your library for '{query}'"
message = (
"Now you have found some options for the user to choose from. "
"You can add a link to a recommended agent at: /marketplace/agent/agent_id "
"Please ask the user if they would like to use any of these agents. Let the user know we can create a custom agent for them based on their needs."
"Please ask the user if they would like to use any of these agents. "
"Let the user know we can create a custom agent for them based on their needs."
if source == "marketplace"
else "Found agents in the user's library. You can provide a link to view an agent at: "
"/library/agents/{agent_id}. Use agent_output to get execution results, or run_agent to execute. Let the user know we can create a custom agent for them based on their needs."
else "Found agents in the user's library. You can provide a link to view "
"an agent at: /library/agents/{agent_id}. Use agent_output to get "
"execution results, or run_agent to execute. Let the user know we can "
"create a custom agent for them based on their needs."
)
return AgentsFoundResponse(
@@ -238,3 +182,67 @@ async def search_agents(
count=len(agents),
session_id=session_id,
)
def _is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_PATTERN.match(text.strip()))
def _library_agent_to_info(agent) -> AgentInfo:
"""Convert a library agent model to an AgentInfo."""
return AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
"""Fetch a library agent by ID (library agent ID or graph_id).
Tries multiple lookup strategies:
1. First by graph_id (AgentGraph primary key)
2. Then by library agent ID (LibraryAgent primary key)
"""
lib_db = library_db()
try:
agent = await lib_db.get_library_agent_by_graph_id(user_id, agent_id)
if agent:
logger.debug(f"Found library agent by graph_id: {agent.name}")
return _library_agent_to_info(agent)
except NotFoundError:
logger.debug(f"Library agent not found by graph_id: {agent_id}")
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by graph_id {agent_id}: {e}",
exc_info=True,
)
try:
agent = await lib_db.get_library_agent(agent_id, user_id)
if agent:
logger.debug(f"Found library agent by library_id: {agent.name}")
return _library_agent_to_info(agent)
except NotFoundError:
logger.debug(f"Library agent not found by library_id: {agent_id}")
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by library_id {agent_id}: {e}",
exc_info=True,
)
return None

View File

@@ -33,6 +33,7 @@ query SearchFeatureRequests($term: String!, $filter: IssueFilter, $first: Int) {
id
identifier
title
description
}
}
}
@@ -204,6 +205,7 @@ class SearchFeatureRequestsTool(BaseTool):
id=node["id"],
identifier=node["identifier"],
title=node["title"],
description=node.get("description"),
)
for node in nodes
]
@@ -237,11 +239,7 @@ class CreateFeatureRequestTool(BaseTool):
"Create a new feature request or add a customer need to an existing one. "
"Always search first with search_feature_requests to avoid duplicates. "
"If a matching request exists, pass its ID as existing_issue_id to add "
"the user's need to it instead of creating a duplicate. "
"IMPORTANT: Never include personally identifiable information (PII) in "
"the title or description — no names, emails, phone numbers, company "
"names, or other identifying details. Write titles and descriptions in "
"generic, feature-focused language."
"the user's need to it instead of creating a duplicate."
)
@property
@@ -251,20 +249,11 @@ class CreateFeatureRequestTool(BaseTool):
"properties": {
"title": {
"type": "string",
"description": (
"Title for the feature request. Must be generic and "
"feature-focused — do not include any user names, emails, "
"company names, or other PII."
),
"description": "Title for the feature request.",
},
"description": {
"type": "string",
"description": (
"Detailed description of what the user wants and why. "
"Must not contain any personally identifiable information "
"(PII) — describe the feature need generically without "
"referencing specific users, companies, or contact details."
),
"description": "Detailed description of what the user wants and why.",
},
"existing_issue_id": {
"type": "string",

View File

@@ -117,11 +117,13 @@ class TestSearchFeatureRequestsTool:
"id": "id-1",
"identifier": "FR-1",
"title": "Dark mode",
"description": "Add dark mode support",
},
{
"id": "id-2",
"identifier": "FR-2",
"title": "Dark theme",
"description": None,
},
]
patcher, _ = _mock_linear_config(query_return=_search_response(nodes))

View File

@@ -19,9 +19,10 @@ class FindLibraryAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Search for agents in the user's library. Use this to find agents "
"the user has already added to their library, including agents they "
"created or added from the marketplace."
"Search for or list agents in the user's library. Use this to find "
"agents the user has already added to their library, including agents "
"they created or added from the marketplace. "
"Omit the query to list all agents."
)
@property
@@ -31,10 +32,13 @@ class FindLibraryAgentTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": "Search query to find agents by name or description.",
"description": (
"Search query to find agents by name or description. "
"Omit to list all agents in the library."
),
},
},
"required": ["query"],
"required": [],
}
@property

View File

@@ -486,6 +486,7 @@ class FeatureRequestInfo(BaseModel):
id: str
identifier: str
title: str
description: str | None = None
class FeatureRequestSearchResponse(ToolResponseBase):

View File

@@ -93,15 +93,7 @@ from backend.data.user import (
get_user_notification_preference,
update_user_integrations,
)
from backend.data.workspace import (
count_workspace_files,
create_workspace_file,
get_or_create_workspace,
get_workspace_file,
get_workspace_file_by_path,
list_workspace_files,
soft_delete_workspace_file,
)
from backend.data.workspace import get_or_create_workspace
from backend.util.service import (
AppService,
AppServiceClient,
@@ -282,13 +274,7 @@ class DatabaseManager(AppService):
get_user_execution_summary_data = _(get_user_execution_summary_data)
# ============ Workspace ============ #
count_workspace_files = _(count_workspace_files)
create_workspace_file = _(create_workspace_file)
get_or_create_workspace = _(get_or_create_workspace)
get_workspace_file = _(get_workspace_file)
get_workspace_file_by_path = _(get_workspace_file_by_path)
list_workspace_files = _(list_workspace_files)
soft_delete_workspace_file = _(soft_delete_workspace_file)
# ============ Understanding ============ #
get_business_understanding = _(get_business_understanding)
@@ -452,13 +438,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_user_execution_summary_data = d.get_user_execution_summary_data
# ============ Workspace ============ #
count_workspace_files = d.count_workspace_files
create_workspace_file = d.create_workspace_file
get_or_create_workspace = d.get_or_create_workspace
get_workspace_file = d.get_workspace_file
get_workspace_file_by_path = d.get_workspace_file_by_path
list_workspace_files = d.list_workspace_files
soft_delete_workspace_file = d.soft_delete_workspace_file
# ============ Understanding ============ #
get_business_understanding = d.get_business_understanding

View File

@@ -164,23 +164,21 @@ async def create_workspace_file(
async def get_workspace_file(
file_id: str,
workspace_id: str,
workspace_id: Optional[str] = None,
) -> Optional[WorkspaceFile]:
"""
Get a workspace file by ID.
Args:
file_id: The file ID
workspace_id: Workspace ID for scoping (required)
workspace_id: Optional workspace ID for validation
Returns:
WorkspaceFile instance or None
"""
where_clause: UserWorkspaceFileWhereInput = {
"id": file_id,
"isDeleted": False,
"workspaceId": workspace_id,
}
where_clause: dict = {"id": file_id, "isDeleted": False}
if workspace_id:
where_clause["workspaceId"] = workspace_id
file = await UserWorkspaceFile.prisma().find_first(where=where_clause)
return WorkspaceFile.from_db(file) if file else None
@@ -270,7 +268,7 @@ async def count_workspace_files(
Returns:
Number of files
"""
where_clause: UserWorkspaceFileWhereInput = {"workspaceId": workspace_id}
where_clause: dict = {"workspaceId": workspace_id}
if not include_deleted:
where_clause["isDeleted"] = False
@@ -285,7 +283,7 @@ async def count_workspace_files(
async def soft_delete_workspace_file(
file_id: str,
workspace_id: str,
workspace_id: Optional[str] = None,
) -> Optional[WorkspaceFile]:
"""
Soft-delete a workspace file.
@@ -295,7 +293,7 @@ async def soft_delete_workspace_file(
Args:
file_id: The file ID
workspace_id: Workspace ID for scoping (required)
workspace_id: Optional workspace ID for validation
Returns:
Updated WorkspaceFile instance or None if not found

View File

@@ -28,7 +28,7 @@ from typing import (
import httpx
import uvicorn
from fastapi import FastAPI, Request, responses
from prisma.errors import DataError, UniqueViolationError
from prisma.errors import DataError
from pydantic import BaseModel, TypeAdapter, create_model
import backend.util.exceptions as exceptions
@@ -201,7 +201,6 @@ EXCEPTION_MAPPING = {
UnhealthyServiceError,
HTTPClientError,
HTTPServerError,
UniqueViolationError,
*[
ErrorType
for _, ErrorType in inspect.getmembers(exceptions)
@@ -417,9 +416,6 @@ class AppService(BaseAppService, ABC):
self.fastapi_app.add_exception_handler(
DataError, self._handle_internal_http_error(400)
)
self.fastapi_app.add_exception_handler(
UniqueViolationError, self._handle_internal_http_error(400)
)
self.fastapi_app.add_exception_handler(
Exception, self._handle_internal_http_error(500)
)
@@ -482,7 +478,6 @@ def get_service_client(
# Don't retry these specific exceptions that won't be fixed by retrying
ValueError, # Invalid input/parameters
DataError, # Prisma data integrity errors (foreign key, unique constraints)
UniqueViolationError, # Unique constraint violations
KeyError, # Missing required data
TypeError, # Wrong data types
AttributeError, # Missing attributes

View File

@@ -12,8 +12,15 @@ from typing import Optional
from prisma.errors import UniqueViolationError
from backend.data.db_accessors import workspace_db
from backend.data.workspace import WorkspaceFile
from backend.data.workspace import (
WorkspaceFile,
count_workspace_files,
create_workspace_file,
get_workspace_file,
get_workspace_file_by_path,
list_workspace_files,
soft_delete_workspace_file,
)
from backend.util.settings import Config
from backend.util.virus_scanner import scan_content_safe
from backend.util.workspace_storage import compute_file_checksum, get_workspace_storage
@@ -118,9 +125,8 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
db = workspace_db()
resolved_path = self._resolve_path(path)
file = await db.get_workspace_file_by_path(self.workspace_id, resolved_path)
file = await get_workspace_file_by_path(self.workspace_id, resolved_path)
if file is None:
raise FileNotFoundError(f"File not found at path: {resolved_path}")
@@ -140,8 +146,7 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
file = await get_workspace_file(file_id, self.workspace_id)
if file is None:
raise FileNotFoundError(f"File not found: {file_id}")
@@ -199,10 +204,8 @@ class WorkspaceManager:
# For overwrite=True, we let the write proceed and handle via UniqueViolationError
# This ensures the new file is written to storage BEFORE the old one is deleted,
# preventing data loss if the new write fails
db = workspace_db()
if not overwrite:
existing = await db.get_workspace_file_by_path(self.workspace_id, path)
existing = await get_workspace_file_by_path(self.workspace_id, path)
if existing is not None:
raise ValueError(f"File already exists at path: {path}")
@@ -229,7 +232,7 @@ class WorkspaceManager:
# Create database record - handle race condition where another request
# created a file at the same path between our check and create
try:
file = await db.create_workspace_file(
file = await create_workspace_file(
workspace_id=self.workspace_id,
file_id=file_id,
name=filename,
@@ -243,12 +246,12 @@ class WorkspaceManager:
# Race condition: another request created a file at this path
if overwrite:
# Re-fetch and delete the conflicting file, then retry
existing = await db.get_workspace_file_by_path(self.workspace_id, path)
existing = await get_workspace_file_by_path(self.workspace_id, path)
if existing:
await self.delete_file(existing.id)
# Retry the create - if this also fails, clean up storage file
try:
file = await db.create_workspace_file(
file = await create_workspace_file(
workspace_id=self.workspace_id,
file_id=file_id,
name=filename,
@@ -311,9 +314,8 @@ class WorkspaceManager:
List of WorkspaceFile instances
"""
effective_path = self._get_effective_path(path, include_all_sessions)
db = workspace_db()
return await db.list_workspace_files(
return await list_workspace_files(
workspace_id=self.workspace_id,
path_prefix=effective_path,
limit=limit,
@@ -330,8 +332,7 @@ class WorkspaceManager:
Returns:
True if deleted, False if not found
"""
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
file = await get_workspace_file(file_id, self.workspace_id)
if file is None:
return False
@@ -344,7 +345,7 @@ class WorkspaceManager:
# Continue with database soft-delete even if storage delete fails
# Soft-delete database record
result = await db.soft_delete_workspace_file(file_id, self.workspace_id)
result = await soft_delete_workspace_file(file_id, self.workspace_id)
return result is not None
async def get_download_url(self, file_id: str, expires_in: int = 3600) -> str:
@@ -361,8 +362,7 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
file = await get_workspace_file(file_id, self.workspace_id)
if file is None:
raise FileNotFoundError(f"File not found: {file_id}")
@@ -379,8 +379,7 @@ class WorkspaceManager:
Returns:
WorkspaceFile instance or None
"""
db = workspace_db()
return await db.get_workspace_file(file_id, self.workspace_id)
return await get_workspace_file(file_id, self.workspace_id)
async def get_file_info_by_path(self, path: str) -> Optional[WorkspaceFile]:
"""
@@ -395,9 +394,8 @@ class WorkspaceManager:
Returns:
WorkspaceFile instance or None
"""
db = workspace_db()
resolved_path = self._resolve_path(path)
return await db.get_workspace_file_by_path(self.workspace_id, resolved_path)
return await get_workspace_file_by_path(self.workspace_id, resolved_path)
async def get_file_count(
self,
@@ -419,8 +417,7 @@ class WorkspaceManager:
Number of files
"""
effective_path = self._get_effective_path(path, include_all_sessions)
db = workspace_db()
return await db.count_workspace_files(
return await count_workspace_files(
self.workspace_id, path_prefix=effective_path
)

View File

@@ -93,14 +93,7 @@ class WorkspaceStorageBackend(ABC):
class GCSWorkspaceStorage(WorkspaceStorageBackend):
"""Google Cloud Storage implementation for workspace storage.
Each instance owns a single ``aiohttp.ClientSession`` and GCS async
client. Because ``ClientSession`` is bound to the event loop on which it
was created, callers that run on separate loops (e.g. copilot executor
worker threads) **must** obtain their own ``GCSWorkspaceStorage`` instance
via :func:`get_workspace_storage` which is event-loop-aware.
"""
"""Google Cloud Storage implementation for workspace storage."""
def __init__(self, bucket_name: str):
self.bucket_name = bucket_name
@@ -344,73 +337,60 @@ class LocalWorkspaceStorage(WorkspaceStorageBackend):
raise ValueError(f"Invalid storage path format: {storage_path}")
# ---------------------------------------------------------------------------
# Storage instance management
# ---------------------------------------------------------------------------
# ``aiohttp.ClientSession`` is bound to the event loop where it is created.
# The copilot executor runs each worker in its own thread with a dedicated
# event loop, so a single global ``GCSWorkspaceStorage`` instance would break.
#
# For **local storage** a single shared instance is fine (no async I/O).
# For **GCS storage** we keep one instance *per event loop* so every loop
# gets its own ``ClientSession``.
# ---------------------------------------------------------------------------
_local_storage: Optional[LocalWorkspaceStorage] = None
_gcs_storages: dict[int, GCSWorkspaceStorage] = {}
# Global storage backend instance
_workspace_storage: Optional[WorkspaceStorageBackend] = None
_storage_lock = asyncio.Lock()
async def get_workspace_storage() -> WorkspaceStorageBackend:
"""Return a workspace storage backend for the **current** event loop.
* Local storage → single shared instance (no event-loop affinity).
* GCS storage → one instance per event loop to avoid cross-loop
``aiohttp`` errors.
"""
global _local_storage
Get the workspace storage backend instance.
config = Config()
Uses GCS if media_gcs_bucket_name is configured, otherwise uses local storage.
"""
global _workspace_storage
# --- Local storage (shared) ---
if not config.media_gcs_bucket_name:
if _local_storage is None:
storage_dir = (
config.workspace_storage_dir if config.workspace_storage_dir else None
)
logger.info(f"Using local workspace storage: {storage_dir or 'default'}")
_local_storage = LocalWorkspaceStorage(storage_dir)
return _local_storage
if _workspace_storage is None:
async with _storage_lock:
if _workspace_storage is None:
config = Config()
# --- GCS storage (per event loop) ---
loop_id = id(asyncio.get_running_loop())
if loop_id not in _gcs_storages:
logger.info(
f"Creating GCS workspace storage for loop {loop_id}: "
f"{config.media_gcs_bucket_name}"
)
_gcs_storages[loop_id] = GCSWorkspaceStorage(config.media_gcs_bucket_name)
return _gcs_storages[loop_id]
if config.media_gcs_bucket_name:
logger.info(
f"Using GCS workspace storage: {config.media_gcs_bucket_name}"
)
_workspace_storage = GCSWorkspaceStorage(
config.media_gcs_bucket_name
)
else:
storage_dir = (
config.workspace_storage_dir
if config.workspace_storage_dir
else None
)
logger.info(
f"Using local workspace storage: {storage_dir or 'default'}"
)
_workspace_storage = LocalWorkspaceStorage(storage_dir)
return _workspace_storage
async def shutdown_workspace_storage() -> None:
"""Shut down workspace storage for the **current** event loop.
Closes the ``aiohttp`` session owned by the current loop's GCS instance.
Each worker thread should call this on its own loop before the loop is
destroyed. The REST API lifespan hook calls it for the main server loop.
"""
global _local_storage
Properly shutdown the global workspace storage backend.
loop_id = id(asyncio.get_running_loop())
storage = _gcs_storages.pop(loop_id, None)
if storage is not None:
await storage.close()
Closes aiohttp sessions and other resources for GCS backend.
Should be called during application shutdown.
"""
global _workspace_storage
# Clear local storage only when the last GCS instance is gone
# (i.e. full shutdown, not just a single worker stopping).
if not _gcs_storages:
_local_storage = None
if _workspace_storage is not None:
async with _storage_lock:
if _workspace_storage is not None:
if isinstance(_workspace_storage, GCSWorkspaceStorage):
await _workspace_storage.close()
_workspace_storage = None
def compute_file_checksum(content: bytes) -> str:

View File

@@ -69,11 +69,12 @@ test.describe("Marketplace Creator Page Basic Functionality", () => {
await marketplacePage.getFirstCreatorProfile(page);
await firstCreatorProfile.click();
await page.waitForURL("**/marketplace/creator/**");
await page.waitForLoadState("networkidle").catch(() => {});
const firstAgent = page
.locator('[data-testid="store-card"]:visible')
.first();
await firstAgent.waitFor({ state: "visible", timeout: 15000 });
await firstAgent.waitFor({ state: "visible", timeout: 30000 });
await firstAgent.click();
await page.waitForURL("**/marketplace/agent/**");

View File

@@ -115,11 +115,18 @@ test.describe("Marketplace Basic Functionality", () => {
const searchTerm = page.getByText("DummyInput").first();
await isVisible(searchTerm);
await expect
.poll(() => marketplacePage.getSearchResultsCount(page), {
timeout: 15000,
})
.toBeGreaterThan(0);
await page.waitForLoadState("networkidle").catch(() => {});
await page
.waitForFunction(
() =>
document.querySelectorAll('[data-testid="store-card"]').length > 0,
{ timeout: 15000 },
)
.catch(() => console.log("No search results appeared within timeout"));
const results = await marketplacePage.getSearchResultsCount(page);
expect(results).toBeGreaterThan(0);
console.log("Complete search flow works correctly test passed ✅");
});
@@ -128,9 +135,7 @@ test.describe("Marketplace Basic Functionality", () => {
});
test.describe("Marketplace Edge Cases", () => {
test("Search for non-existent item renders search page correctly", async ({
page,
}) => {
test("Search for non-existent item shows no results", async ({ page }) => {
const marketplacePage = new MarketplacePage(page);
await marketplacePage.goto(page);
@@ -146,18 +151,9 @@ test.describe("Marketplace Edge Cases", () => {
const searchTerm = page.getByText("xyznonexistentitemxyz123");
await isVisible(searchTerm);
// The search page should render either results or a "No results found" message
await page.waitForLoadState("networkidle").catch(() => {});
const hasResults =
(await page.locator('[data-testid="store-card"]').count()) > 0;
const hasNoResultsMsg = await page
.getByText("No results found")
.isVisible()
.catch(() => false);
expect(hasResults || hasNoResultsMsg).toBe(true);
const results = await marketplacePage.getSearchResultsCount(page);
expect(results).toBe(0);
console.log(
"Search for non-existent item renders search page correctly test passed ✅",
);
console.log("Search for non-existent item shows no results test passed ✅");
});
});

View File

@@ -125,8 +125,16 @@ export class BuildPage extends BasePage {
`[data-id="block-card-${blockCardId}"]`,
);
await blockCard.waitFor({ state: "visible", timeout: 10000 });
await blockCard.click();
try {
// Wait for the block card to be visible with a reasonable timeout
await blockCard.waitFor({ state: "visible", timeout: 10000 });
await blockCard.click();
} catch (error) {
console.log(
`Block ${block.name} (display: ${displayName}) returned from the API but not found in block list`,
);
console.log(`Error: ${error}`);
}
}
async hasBlock(_block: Block) {

View File

@@ -65,7 +65,7 @@ export class LoginPage {
await this.page.waitForLoadState("load", { timeout: 10_000 });
console.log("➡️ Navigating to /marketplace ...");
await this.page.goto("/marketplace", { timeout: 20_000 });
await this.page.goto("/marketplace", { timeout: 10_000 });
console.log("✅ Login process complete");
// If Wallet popover auto-opens, close it to avoid blocking account menu interactions

View File

@@ -9,12 +9,7 @@ export class MarketplacePage extends BasePage {
async goto(page: Page) {
await page.goto("/marketplace");
await page
.locator(
'[data-testid="store-card"], [data-testid="featured-store-card"]',
)
.first()
.waitFor({ state: "visible", timeout: 20000 });
await page.waitForLoadState("networkidle").catch(() => {});
}
async getMarketplaceTitle(page: Page) {
@@ -116,7 +111,7 @@ export class MarketplacePage extends BasePage {
async getFirstFeaturedAgent(page: Page) {
const { getId } = getSelectors(page);
const card = getId("featured-store-card").first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}
@@ -124,14 +119,14 @@ export class MarketplacePage extends BasePage {
const card = this.page
.locator('[data-testid="store-card"]:visible')
.first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}
async getFirstCreatorProfile(page: Page) {
const { getId } = getSelectors(page);
const card = getId("creator-card").first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}

View File

@@ -45,9 +45,8 @@ export async function isEnabled(el: Locator) {
}
export async function hasMinCount(el: Locator, minCount: number) {
await expect
.poll(async () => await el.count(), { timeout: 10000 })
.toBeGreaterThanOrEqual(minCount);
const count = await el.count();
expect(count).toBeGreaterThanOrEqual(minCount);
}
export async function matchesUrl(page: Page, pattern: RegExp) {