Compare commits

..

5 Commits

Author SHA1 Message Date
Ubbe
efe2f18719 Merge branch 'dev' into fix/spinner-height-copilot 2026-02-18 20:44:42 +08:00
Lluis Agusti
23eda10cb1 chore: format 2026-02-18 20:34:48 +08:00
Lluis Agusti
7bfc5140b0 fix: make spinner centred when loading 2026-02-18 20:29:27 +08:00
Otto
15bcdae4e8 fix(backend/copilot): Clean up GCSWorkspaceStorage per worker (#12153)
The copilot executor runs each worker in its own thread with a dedicated
event loop (`asyncio.new_event_loop()`). `aiohttp.ClientSession` is
bound to the event loop where it was created — using it from a different
loop causes `asyncio.timeout()` to fail with:

```
RuntimeError: Timeout context manager should be used inside a task
```

This was the root cause of transcript upload failures tracked in
SECRT-2009 and [Sentry
#7272473694](https://significant-gravitas.sentry.io/issues/7272473694/).

### Fix

**One `GCSWorkspaceStorage` instance per event loop** instead of a
single shared global.

- `get_workspace_storage()` now returns a per-loop GCS instance (keyed
by `id(asyncio.get_running_loop())`). Local storage remains shared since
it has no async I/O.
- `shutdown_workspace_storage()` closes the instance for the **current**
loop only, so `session.close()` always runs on the loop that created the
session.
- `CoPilotProcessor.cleanup()` shuts down workspace storage on the
worker's own loop, then stops the loop.
- Manager cleanup submits `cleanup_worker` to each thread pool worker
before shutting down the executor — replacing the old approach of
creating a temporary event loop that couldn't close cross-loop sessions.

### Changes

| File | Change |
|------|--------|
| `util/workspace_storage.py` | `GCSWorkspaceStorage` back to simple
single-session class; `get_workspace_storage()` returns per-loop GCS
instance; `shutdown_workspace_storage()` scoped to current loop |
| `copilot/executor/processor.py` | Added `CoPilotProcessor.cleanup()`
and `cleanup_worker()` |
| `copilot/executor/manager.py` | Calls `cleanup_worker` on each thread
pool worker during shutdown |

Fixes SECRT-2009

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2026-02-18 11:17:39 +00:00
Otto
e9ba7e51db fix(copilot): Route workspace through db_accessors, fix transcript upload (#12148)
## Summary

Fixes two bugs in the copilot executor:

### SECRT-2008: WorkspaceManager bypasses db_accessors
`backend/util/workspace.py` imported 6 workspace functions directly from
`backend/data/workspace.py`, which call `prisma()` directly. In the
copilot executor (no Prisma connection), these fail.

**Fix:** Replace direct imports with `workspace_db()` from
`db_accessors`, routing through the database_manager HTTP client when
Prisma is unavailable. Also:
- Register all 6 workspace functions in `DatabaseManager` and
`DatabaseManagerAsyncClient`
- Add `UniqueViolationError` to the service `EXCEPTION_MAPPING` so it's
properly re-raised over HTTP (needed for race-condition retry logic)

### SECRT-2009: Transcript upload asyncio.timeout error
`asyncio.create_task()` at line 696 of `service.py` creates an orphaned
background task in the executor's thread event loop.
`gcloud-aio-storage`'s `asyncio.timeout()` fails in this context.

**Fix:** Replace `create_task` with direct `await`. The upload runs
after streaming completes (all chunks already yielded), so no
user-facing latency impact. The function already has internal try/except
error handling.
2026-02-17 22:24:19 +00:00
18 changed files with 197 additions and 428 deletions

View File

@@ -50,7 +50,6 @@ from backend.copilot.tools.models import (
OperationPendingResponse,
OperationStartedResponse,
SetupRequirementsResponse,
SuggestedGoalResponse,
UnderstandingUpdatedResponse,
)
from backend.copilot.tracking import track_user_message
@@ -985,7 +984,6 @@ ToolResponseUnion = (
| AgentPreviewResponse
| AgentSavedResponse
| ClarificationNeededResponse
| SuggestedGoalResponse
| BlockListResponse
| BlockDetailsResponse
| BlockOutputResponse

View File

@@ -164,21 +164,23 @@ class CoPilotExecutor(AppProcess):
self._cancel_thread, self.cancel_client, "[cleanup][cancel]"
)
# Shutdown executor
# Clean up worker threads (closes per-loop workspace storage sessions)
if self._executor:
from .processor import cleanup_worker
logger.info(f"[cleanup {pid}] Cleaning up workers...")
futures = []
for _ in range(self._executor._max_workers):
futures.append(self._executor.submit(cleanup_worker))
for f in futures:
try:
f.result(timeout=10)
except Exception as e:
logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}")
logger.info(f"[cleanup {pid}] Shutting down executor...")
self._executor.shutdown(wait=False)
# Close async resources (workspace storage aiohttp session, etc.)
try:
from backend.util.workspace_storage import shutdown_workspace_storage
loop = asyncio.new_event_loop()
loop.run_until_complete(shutdown_workspace_storage())
loop.close()
except Exception as e:
logger.warning(f"[cleanup {pid}] Error closing workspace storage: {e}")
# Release any remaining locks
for task_id, lock in list(self._task_locks.items()):
try:

View File

@@ -60,6 +60,18 @@ def init_worker():
_tls.processor.on_executor_start()
def cleanup_worker():
"""Clean up the processor for the current worker thread.
Should be called before the worker thread's event loop is destroyed so
that event-loop-bound resources (e.g. ``aiohttp.ClientSession``) are
closed on the correct loop.
"""
processor: CoPilotProcessor | None = getattr(_tls, "processor", None)
if processor is not None:
processor.cleanup()
# ============ Processor Class ============ #
@@ -98,6 +110,28 @@ class CoPilotProcessor:
logger.info(f"[CoPilotExecutor] Worker {self.tid} started")
def cleanup(self):
"""Clean up event-loop-bound resources before the loop is destroyed.
Shuts down the workspace storage instance that belongs to this
worker's event loop, ensuring ``aiohttp.ClientSession.close()``
runs on the same loop that created the session.
"""
from backend.util.workspace_storage import shutdown_workspace_storage
try:
future = asyncio.run_coroutine_threadsafe(
shutdown_workspace_storage(), self.execution_loop
)
future.result(timeout=5)
except Exception as e:
logger.warning(f"[CoPilotExecutor] Worker {self.tid} cleanup error: {e}")
# Stop the event loop
self.execution_loop.call_soon_threadsafe(self.execution_loop.stop)
self.execution_thread.join(timeout=5)
logger.info(f"[CoPilotExecutor] Worker {self.tid} cleaned up")
@error_logged(swallow=False)
def execute(
self,

View File

@@ -693,11 +693,15 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0.5)
raw_transcript = read_transcript_file(captured_transcript.path)
if raw_transcript:
task = asyncio.create_task(
_upload_transcript_bg(user_id, session_id, raw_transcript)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
try:
async with asyncio.timeout(30):
await _upload_transcript_bg(
user_id, session_id, raw_transcript
)
except asyncio.TimeoutError:
logger.warning(
f"[SDK] Transcript upload timed out for {session_id}"
)
else:
logger.debug("[SDK] Stop hook fired but transcript not usable")

View File

@@ -118,8 +118,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
@@ -166,11 +164,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""

View File

@@ -22,7 +22,6 @@ from .models import (
ClarificationNeededResponse,
ClarifyingQuestion,
ErrorResponse,
SuggestedGoalResponse,
ToolResponseBase,
)
@@ -187,25 +186,26 @@ class CreateAgentTool(BaseTool):
if decomposition_result.get("type") == "unachievable_goal":
suggested = decomposition_result.get("suggested_goal", "")
reason = decomposition_result.get("reason", "")
return SuggestedGoalResponse(
return ErrorResponse(
message=(
f"This goal cannot be accomplished with the available blocks. {reason}"
f"This goal cannot be accomplished with the available blocks. "
f"{reason} "
f"Suggestion: {suggested}"
),
suggested_goal=suggested,
reason=reason,
original_goal=description,
goal_type="unachievable",
error="unachievable_goal",
details={"suggested_goal": suggested, "reason": reason},
session_id=session_id,
)
if decomposition_result.get("type") == "vague_goal":
suggested = decomposition_result.get("suggested_goal", "")
return SuggestedGoalResponse(
message="The goal is too vague to create a specific workflow.",
suggested_goal=suggested,
reason="The goal needs more specific details",
original_goal=description,
goal_type="vague",
return ErrorResponse(
message=(
f"The goal is too vague to create a specific workflow. "
f"Suggestion: {suggested}"
),
error="vague_goal",
details={"suggested_goal": suggested},
session_id=session_id,
)

View File

@@ -1,142 +0,0 @@
"""Tests for CreateAgentTool response types."""
from unittest.mock import AsyncMock, patch
import pytest
from backend.copilot.tools.create_agent import CreateAgentTool
from backend.copilot.tools.models import (
ClarificationNeededResponse,
ErrorResponse,
SuggestedGoalResponse,
)
from ._test_data import make_session
_TEST_USER_ID = "test-user-create-agent"
@pytest.fixture
def tool():
return CreateAgentTool()
@pytest.fixture
def session():
return make_session(_TEST_USER_ID)
@pytest.mark.asyncio
async def test_missing_description_returns_error(tool, session):
"""Missing description returns ErrorResponse."""
result = await tool._execute(user_id=_TEST_USER_ID, session=session, description="")
assert isinstance(result, ErrorResponse)
assert result.error == "Missing description parameter"
@pytest.mark.asyncio
async def test_vague_goal_returns_suggested_goal_response(tool, session):
"""vague_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
vague_result = {
"type": "vague_goal",
"suggested_goal": "Monitor Twitter mentions for a specific keyword and send a daily digest email",
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=vague_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="monitor social media",
)
assert isinstance(result, SuggestedGoalResponse)
assert result.goal_type == "vague"
assert result.suggested_goal == vague_result["suggested_goal"]
assert result.original_goal == "monitor social media"
assert result.reason == "The goal needs more specific details"
assert not isinstance(result, ErrorResponse)
@pytest.mark.asyncio
async def test_unachievable_goal_returns_suggested_goal_response(tool, session):
"""unachievable_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
unachievable_result = {
"type": "unachievable_goal",
"suggested_goal": "Summarize the latest news articles on a topic and send them by email",
"reason": "There are no blocks for mind-reading.",
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=unachievable_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="read my mind",
)
assert isinstance(result, SuggestedGoalResponse)
assert result.goal_type == "unachievable"
assert result.suggested_goal == unachievable_result["suggested_goal"]
assert result.original_goal == "read my mind"
assert result.reason == unachievable_result["reason"]
assert not isinstance(result, ErrorResponse)
@pytest.mark.asyncio
async def test_clarifying_questions_returns_clarification_needed_response(
tool, session
):
"""clarifying_questions decomposition result returns ClarificationNeededResponse."""
clarifying_result = {
"type": "clarifying_questions",
"questions": [
{
"question": "What platform should be monitored?",
"keyword": "platform",
"example": "Twitter, Reddit",
}
],
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=clarifying_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="monitor social media and alert me",
)
assert isinstance(result, ClarificationNeededResponse)
assert len(result.questions) == 1
assert result.questions[0].keyword == "platform"

View File

@@ -50,8 +50,6 @@ class ResponseType(str, Enum):
# Feature request types
FEATURE_REQUEST_SEARCH = "feature_request_search"
FEATURE_REQUEST_CREATED = "feature_request_created"
# Goal refinement
SUGGESTED_GOAL = "suggested_goal"
# Base response model
@@ -298,22 +296,6 @@ class ClarificationNeededResponse(ToolResponseBase):
questions: list[ClarifyingQuestion] = Field(default_factory=list)
class SuggestedGoalResponse(ToolResponseBase):
"""Response when the goal needs refinement with a suggested alternative."""
type: ResponseType = ResponseType.SUGGESTED_GOAL
suggested_goal: str = Field(description="The suggested alternative goal")
reason: str = Field(
default="", description="Why the original goal needs refinement"
)
original_goal: str = Field(
default="", description="The user's original goal for context"
)
goal_type: str = Field(
default="vague", description="Type: 'vague' or 'unachievable'"
)
# Documentation search models
class DocSearchResult(BaseModel):
"""A single documentation search result."""

View File

@@ -93,7 +93,15 @@ from backend.data.user import (
get_user_notification_preference,
update_user_integrations,
)
from backend.data.workspace import get_or_create_workspace
from backend.data.workspace import (
count_workspace_files,
create_workspace_file,
get_or_create_workspace,
get_workspace_file,
get_workspace_file_by_path,
list_workspace_files,
soft_delete_workspace_file,
)
from backend.util.service import (
AppService,
AppServiceClient,
@@ -274,7 +282,13 @@ class DatabaseManager(AppService):
get_user_execution_summary_data = _(get_user_execution_summary_data)
# ============ Workspace ============ #
count_workspace_files = _(count_workspace_files)
create_workspace_file = _(create_workspace_file)
get_or_create_workspace = _(get_or_create_workspace)
get_workspace_file = _(get_workspace_file)
get_workspace_file_by_path = _(get_workspace_file_by_path)
list_workspace_files = _(list_workspace_files)
soft_delete_workspace_file = _(soft_delete_workspace_file)
# ============ Understanding ============ #
get_business_understanding = _(get_business_understanding)
@@ -438,7 +452,13 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_user_execution_summary_data = d.get_user_execution_summary_data
# ============ Workspace ============ #
count_workspace_files = d.count_workspace_files
create_workspace_file = d.create_workspace_file
get_or_create_workspace = d.get_or_create_workspace
get_workspace_file = d.get_workspace_file
get_workspace_file_by_path = d.get_workspace_file_by_path
list_workspace_files = d.list_workspace_files
soft_delete_workspace_file = d.soft_delete_workspace_file
# ============ Understanding ============ #
get_business_understanding = d.get_business_understanding

View File

@@ -164,21 +164,23 @@ async def create_workspace_file(
async def get_workspace_file(
file_id: str,
workspace_id: Optional[str] = None,
workspace_id: str,
) -> Optional[WorkspaceFile]:
"""
Get a workspace file by ID.
Args:
file_id: The file ID
workspace_id: Optional workspace ID for validation
workspace_id: Workspace ID for scoping (required)
Returns:
WorkspaceFile instance or None
"""
where_clause: dict = {"id": file_id, "isDeleted": False}
if workspace_id:
where_clause["workspaceId"] = workspace_id
where_clause: UserWorkspaceFileWhereInput = {
"id": file_id,
"isDeleted": False,
"workspaceId": workspace_id,
}
file = await UserWorkspaceFile.prisma().find_first(where=where_clause)
return WorkspaceFile.from_db(file) if file else None
@@ -268,7 +270,7 @@ async def count_workspace_files(
Returns:
Number of files
"""
where_clause: dict = {"workspaceId": workspace_id}
where_clause: UserWorkspaceFileWhereInput = {"workspaceId": workspace_id}
if not include_deleted:
where_clause["isDeleted"] = False
@@ -283,7 +285,7 @@ async def count_workspace_files(
async def soft_delete_workspace_file(
file_id: str,
workspace_id: Optional[str] = None,
workspace_id: str,
) -> Optional[WorkspaceFile]:
"""
Soft-delete a workspace file.
@@ -293,7 +295,7 @@ async def soft_delete_workspace_file(
Args:
file_id: The file ID
workspace_id: Optional workspace ID for validation
workspace_id: Workspace ID for scoping (required)
Returns:
Updated WorkspaceFile instance or None if not found

View File

@@ -28,7 +28,7 @@ from typing import (
import httpx
import uvicorn
from fastapi import FastAPI, Request, responses
from prisma.errors import DataError
from prisma.errors import DataError, UniqueViolationError
from pydantic import BaseModel, TypeAdapter, create_model
import backend.util.exceptions as exceptions
@@ -201,6 +201,7 @@ EXCEPTION_MAPPING = {
UnhealthyServiceError,
HTTPClientError,
HTTPServerError,
UniqueViolationError,
*[
ErrorType
for _, ErrorType in inspect.getmembers(exceptions)
@@ -416,6 +417,9 @@ class AppService(BaseAppService, ABC):
self.fastapi_app.add_exception_handler(
DataError, self._handle_internal_http_error(400)
)
self.fastapi_app.add_exception_handler(
UniqueViolationError, self._handle_internal_http_error(400)
)
self.fastapi_app.add_exception_handler(
Exception, self._handle_internal_http_error(500)
)
@@ -478,6 +482,7 @@ def get_service_client(
# Don't retry these specific exceptions that won't be fixed by retrying
ValueError, # Invalid input/parameters
DataError, # Prisma data integrity errors (foreign key, unique constraints)
UniqueViolationError, # Unique constraint violations
KeyError, # Missing required data
TypeError, # Wrong data types
AttributeError, # Missing attributes

View File

@@ -12,15 +12,8 @@ from typing import Optional
from prisma.errors import UniqueViolationError
from backend.data.workspace import (
WorkspaceFile,
count_workspace_files,
create_workspace_file,
get_workspace_file,
get_workspace_file_by_path,
list_workspace_files,
soft_delete_workspace_file,
)
from backend.data.db_accessors import workspace_db
from backend.data.workspace import WorkspaceFile
from backend.util.settings import Config
from backend.util.virus_scanner import scan_content_safe
from backend.util.workspace_storage import compute_file_checksum, get_workspace_storage
@@ -125,8 +118,9 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
db = workspace_db()
resolved_path = self._resolve_path(path)
file = await get_workspace_file_by_path(self.workspace_id, resolved_path)
file = await db.get_workspace_file_by_path(self.workspace_id, resolved_path)
if file is None:
raise FileNotFoundError(f"File not found at path: {resolved_path}")
@@ -146,7 +140,8 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
file = await get_workspace_file(file_id, self.workspace_id)
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
if file is None:
raise FileNotFoundError(f"File not found: {file_id}")
@@ -204,8 +199,10 @@ class WorkspaceManager:
# For overwrite=True, we let the write proceed and handle via UniqueViolationError
# This ensures the new file is written to storage BEFORE the old one is deleted,
# preventing data loss if the new write fails
db = workspace_db()
if not overwrite:
existing = await get_workspace_file_by_path(self.workspace_id, path)
existing = await db.get_workspace_file_by_path(self.workspace_id, path)
if existing is not None:
raise ValueError(f"File already exists at path: {path}")
@@ -232,7 +229,7 @@ class WorkspaceManager:
# Create database record - handle race condition where another request
# created a file at the same path between our check and create
try:
file = await create_workspace_file(
file = await db.create_workspace_file(
workspace_id=self.workspace_id,
file_id=file_id,
name=filename,
@@ -246,12 +243,12 @@ class WorkspaceManager:
# Race condition: another request created a file at this path
if overwrite:
# Re-fetch and delete the conflicting file, then retry
existing = await get_workspace_file_by_path(self.workspace_id, path)
existing = await db.get_workspace_file_by_path(self.workspace_id, path)
if existing:
await self.delete_file(existing.id)
# Retry the create - if this also fails, clean up storage file
try:
file = await create_workspace_file(
file = await db.create_workspace_file(
workspace_id=self.workspace_id,
file_id=file_id,
name=filename,
@@ -314,8 +311,9 @@ class WorkspaceManager:
List of WorkspaceFile instances
"""
effective_path = self._get_effective_path(path, include_all_sessions)
db = workspace_db()
return await list_workspace_files(
return await db.list_workspace_files(
workspace_id=self.workspace_id,
path_prefix=effective_path,
limit=limit,
@@ -332,7 +330,8 @@ class WorkspaceManager:
Returns:
True if deleted, False if not found
"""
file = await get_workspace_file(file_id, self.workspace_id)
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
if file is None:
return False
@@ -345,7 +344,7 @@ class WorkspaceManager:
# Continue with database soft-delete even if storage delete fails
# Soft-delete database record
result = await soft_delete_workspace_file(file_id, self.workspace_id)
result = await db.soft_delete_workspace_file(file_id, self.workspace_id)
return result is not None
async def get_download_url(self, file_id: str, expires_in: int = 3600) -> str:
@@ -362,7 +361,8 @@ class WorkspaceManager:
Raises:
FileNotFoundError: If file doesn't exist
"""
file = await get_workspace_file(file_id, self.workspace_id)
db = workspace_db()
file = await db.get_workspace_file(file_id, self.workspace_id)
if file is None:
raise FileNotFoundError(f"File not found: {file_id}")
@@ -379,7 +379,8 @@ class WorkspaceManager:
Returns:
WorkspaceFile instance or None
"""
return await get_workspace_file(file_id, self.workspace_id)
db = workspace_db()
return await db.get_workspace_file(file_id, self.workspace_id)
async def get_file_info_by_path(self, path: str) -> Optional[WorkspaceFile]:
"""
@@ -394,8 +395,9 @@ class WorkspaceManager:
Returns:
WorkspaceFile instance or None
"""
db = workspace_db()
resolved_path = self._resolve_path(path)
return await get_workspace_file_by_path(self.workspace_id, resolved_path)
return await db.get_workspace_file_by_path(self.workspace_id, resolved_path)
async def get_file_count(
self,
@@ -417,7 +419,8 @@ class WorkspaceManager:
Number of files
"""
effective_path = self._get_effective_path(path, include_all_sessions)
db = workspace_db()
return await count_workspace_files(
return await db.count_workspace_files(
self.workspace_id, path_prefix=effective_path
)

View File

@@ -93,7 +93,14 @@ class WorkspaceStorageBackend(ABC):
class GCSWorkspaceStorage(WorkspaceStorageBackend):
"""Google Cloud Storage implementation for workspace storage."""
"""Google Cloud Storage implementation for workspace storage.
Each instance owns a single ``aiohttp.ClientSession`` and GCS async
client. Because ``ClientSession`` is bound to the event loop on which it
was created, callers that run on separate loops (e.g. copilot executor
worker threads) **must** obtain their own ``GCSWorkspaceStorage`` instance
via :func:`get_workspace_storage` which is event-loop-aware.
"""
def __init__(self, bucket_name: str):
self.bucket_name = bucket_name
@@ -337,60 +344,73 @@ class LocalWorkspaceStorage(WorkspaceStorageBackend):
raise ValueError(f"Invalid storage path format: {storage_path}")
# Global storage backend instance
_workspace_storage: Optional[WorkspaceStorageBackend] = None
# ---------------------------------------------------------------------------
# Storage instance management
# ---------------------------------------------------------------------------
# ``aiohttp.ClientSession`` is bound to the event loop where it is created.
# The copilot executor runs each worker in its own thread with a dedicated
# event loop, so a single global ``GCSWorkspaceStorage`` instance would break.
#
# For **local storage** a single shared instance is fine (no async I/O).
# For **GCS storage** we keep one instance *per event loop* so every loop
# gets its own ``ClientSession``.
# ---------------------------------------------------------------------------
_local_storage: Optional[LocalWorkspaceStorage] = None
_gcs_storages: dict[int, GCSWorkspaceStorage] = {}
_storage_lock = asyncio.Lock()
async def get_workspace_storage() -> WorkspaceStorageBackend:
"""Return a workspace storage backend for the **current** event loop.
* Local storage → single shared instance (no event-loop affinity).
* GCS storage → one instance per event loop to avoid cross-loop
``aiohttp`` errors.
"""
Get the workspace storage backend instance.
global _local_storage
Uses GCS if media_gcs_bucket_name is configured, otherwise uses local storage.
"""
global _workspace_storage
config = Config()
if _workspace_storage is None:
async with _storage_lock:
if _workspace_storage is None:
config = Config()
# --- Local storage (shared) ---
if not config.media_gcs_bucket_name:
if _local_storage is None:
storage_dir = (
config.workspace_storage_dir if config.workspace_storage_dir else None
)
logger.info(f"Using local workspace storage: {storage_dir or 'default'}")
_local_storage = LocalWorkspaceStorage(storage_dir)
return _local_storage
if config.media_gcs_bucket_name:
logger.info(
f"Using GCS workspace storage: {config.media_gcs_bucket_name}"
)
_workspace_storage = GCSWorkspaceStorage(
config.media_gcs_bucket_name
)
else:
storage_dir = (
config.workspace_storage_dir
if config.workspace_storage_dir
else None
)
logger.info(
f"Using local workspace storage: {storage_dir or 'default'}"
)
_workspace_storage = LocalWorkspaceStorage(storage_dir)
return _workspace_storage
# --- GCS storage (per event loop) ---
loop_id = id(asyncio.get_running_loop())
if loop_id not in _gcs_storages:
logger.info(
f"Creating GCS workspace storage for loop {loop_id}: "
f"{config.media_gcs_bucket_name}"
)
_gcs_storages[loop_id] = GCSWorkspaceStorage(config.media_gcs_bucket_name)
return _gcs_storages[loop_id]
async def shutdown_workspace_storage() -> None:
"""
Properly shutdown the global workspace storage backend.
"""Shut down workspace storage for the **current** event loop.
Closes aiohttp sessions and other resources for GCS backend.
Should be called during application shutdown.
Closes the ``aiohttp`` session owned by the current loop's GCS instance.
Each worker thread should call this on its own loop before the loop is
destroyed. The REST API lifespan hook calls it for the main server loop.
"""
global _workspace_storage
global _local_storage
if _workspace_storage is not None:
async with _storage_lock:
if _workspace_storage is not None:
if isinstance(_workspace_storage, GCSWorkspaceStorage):
await _workspace_storage.close()
_workspace_storage = None
loop_id = id(asyncio.get_running_loop())
storage = _gcs_storages.pop(loop_id, None)
if storage is not None:
await storage.close()
# Clear local storage only when the last GCS instance is gone
# (i.e. full shutdown, not just a single worker stopping).
if not _gcs_storages:
_local_storage = None
def compute_file_checksum(content: bytes) -> str:

View File

@@ -169,7 +169,10 @@ export const ChatMessagesContainer = ({
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
{headerSlot}
{isLoading && messages.length === 0 && (
<div className="flex min-h-full flex-1 items-center justify-center">
<div
className="flex flex-1 items-center justify-center"
style={{ minHeight: "calc(100vh - 12rem)" }}
>
<LoadingSpinner className="text-neutral-600" />
</div>
)}

View File

@@ -26,7 +26,6 @@ import {
} from "./components/ClarificationQuestionsCard";
import sparklesImg from "./components/MiniGame/assets/sparkles.png";
import { MiniGame } from "./components/MiniGame/MiniGame";
import { SuggestedGoalCard } from "./components/SuggestedGoalCard";
import {
AccordionIcon,
formatMaybeJson,
@@ -39,7 +38,6 @@ import {
isOperationInProgressOutput,
isOperationPendingOutput,
isOperationStartedOutput,
isSuggestedGoalOutput,
ToolIcon,
truncateText,
type CreateAgentToolOutput,
@@ -79,13 +77,6 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
expanded: true,
};
}
if (isSuggestedGoalOutput(output)) {
return {
icon,
title: "Goal needs refinement",
expanded: true,
};
}
if (
isOperationStartedOutput(output) ||
isOperationPendingOutput(output) ||
@@ -134,13 +125,8 @@ export function CreateAgentTool({ part }: Props) {
isAgentPreviewOutput(output) ||
isAgentSavedOutput(output) ||
isClarificationNeededOutput(output) ||
isSuggestedGoalOutput(output) ||
isErrorOutput(output));
function handleUseSuggestedGoal(goal: string) {
onSend(`Please create an agent with this goal: ${goal}`);
}
function handleClarificationAnswers(answers: Record<string, string>) {
const questions =
output && isClarificationNeededOutput(output)
@@ -259,15 +245,6 @@ export function CreateAgentTool({ part }: Props) {
/>
)}
{isSuggestedGoalOutput(output) && (
<SuggestedGoalCard
message={output.message}
suggestedGoal={output.suggested_goal}
goalType={output.goal_type ?? "vague"}
onUseSuggestedGoal={handleUseSuggestedGoal}
/>
)}
{isErrorOutput(output) && (
<ContentGrid>
<ContentMessage>{output.message}</ContentMessage>
@@ -281,22 +258,6 @@ export function CreateAgentTool({ part }: Props) {
{formatMaybeJson(output.details)}
</ContentCodeBlock>
)}
<div className="flex gap-2">
<Button
variant="outline"
size="small"
onClick={() => onSend("Please try creating the agent again.")}
>
Try again
</Button>
<Button
variant="outline"
size="small"
onClick={() => onSend("Can you help me simplify this goal?")}
>
Simplify goal
</Button>
</div>
</ContentGrid>
)}
</ToolAccordion>

View File

@@ -1,61 +0,0 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { ArrowRightIcon, LightbulbIcon } from "@phosphor-icons/react";
interface Props {
message: string;
suggestedGoal: string;
goalType: string;
onUseSuggestedGoal: (goal: string) => void;
}
export function SuggestedGoalCard({
message,
suggestedGoal,
goalType,
onUseSuggestedGoal,
}: Props) {
return (
<div className="rounded-xl border border-amber-200 bg-amber-50/50 p-4">
<div className="flex items-start gap-3">
<LightbulbIcon
size={20}
weight="fill"
className="mt-0.5 text-amber-600"
/>
<div className="flex-1 space-y-3">
<div>
<Text variant="body-medium" className="font-medium text-slate-900">
{goalType === "unachievable"
? "Goal cannot be accomplished"
: "Goal needs more detail"}
</Text>
<Text variant="small" className="text-slate-600">
{message}
</Text>
</div>
<div className="rounded-lg border border-amber-300 bg-white p-3">
<Text variant="small" className="mb-1 font-semibold text-amber-800">
Suggested alternative:
</Text>
<Text variant="body-medium" className="text-slate-900">
{suggestedGoal}
</Text>
</div>
<Button
onClick={() => onUseSuggestedGoal(suggestedGoal)}
variant="primary"
>
<span className="inline-flex items-center gap-1.5">
Use this goal <ArrowRightIcon size={14} weight="bold" />
</span>
</Button>
</div>
</div>
</div>
);
}

View File

@@ -6,7 +6,6 @@ import type { OperationInProgressResponse } from "@/app/api/__generated__/models
import type { OperationPendingResponse } from "@/app/api/__generated__/models/operationPendingResponse";
import type { OperationStartedResponse } from "@/app/api/__generated__/models/operationStartedResponse";
import { ResponseType } from "@/app/api/__generated__/models/responseType";
import type { SuggestedGoalResponse } from "@/app/api/__generated__/models/suggestedGoalResponse";
import {
PlusCircleIcon,
PlusIcon,
@@ -22,7 +21,6 @@ export type CreateAgentToolOutput =
| AgentPreviewResponse
| AgentSavedResponse
| ClarificationNeededResponse
| SuggestedGoalResponse
| ErrorResponse;
function parseOutput(output: unknown): CreateAgentToolOutput | null {
@@ -45,7 +43,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
type === ResponseType.agent_preview ||
type === ResponseType.agent_saved ||
type === ResponseType.clarification_needed ||
type === ResponseType.suggested_goal ||
type === ResponseType.error
) {
return output as CreateAgentToolOutput;
@@ -58,7 +55,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
if ("agent_id" in output && "library_agent_id" in output)
return output as AgentSavedResponse;
if ("questions" in output) return output as ClarificationNeededResponse;
if ("suggested_goal" in output) return output as SuggestedGoalResponse;
if ("error" in output || "details" in output)
return output as ErrorResponse;
}
@@ -118,14 +114,6 @@ export function isClarificationNeededOutput(
);
}
export function isSuggestedGoalOutput(
output: CreateAgentToolOutput,
): output is SuggestedGoalResponse {
return (
output.type === ResponseType.suggested_goal || "suggested_goal" in output
);
}
export function isErrorOutput(
output: CreateAgentToolOutput,
): output is ErrorResponse {
@@ -151,7 +139,6 @@ export function getAnimationText(part: {
if (isAgentSavedOutput(output)) return `Saved ${output.agent_name}`;
if (isAgentPreviewOutput(output)) return `Preview "${output.agent_name}"`;
if (isClarificationNeededOutput(output)) return "Needs clarification";
if (isSuggestedGoalOutput(output)) return "Goal needs refinement";
return "Error creating agent";
}
case "output-error":

View File

@@ -1052,7 +1052,6 @@
{
"$ref": "#/components/schemas/ClarificationNeededResponse"
},
{ "$ref": "#/components/schemas/SuggestedGoalResponse" },
{ "$ref": "#/components/schemas/BlockListResponse" },
{ "$ref": "#/components/schemas/BlockDetailsResponse" },
{ "$ref": "#/components/schemas/BlockOutputResponse" },
@@ -10797,8 +10796,7 @@
"bash_exec",
"operation_status",
"feature_request_search",
"feature_request_created",
"suggested_goal"
"feature_request_created"
],
"title": "ResponseType",
"description": "Types of tool responses."
@@ -11679,46 +11677,6 @@
"enum": ["DRAFT", "PENDING", "APPROVED", "REJECTED"],
"title": "SubmissionStatus"
},
"SuggestedGoalResponse": {
"properties": {
"type": {
"$ref": "#/components/schemas/ResponseType",
"default": "suggested_goal"
},
"message": { "type": "string", "title": "Message" },
"session_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Session Id"
},
"suggested_goal": {
"type": "string",
"title": "Suggested Goal",
"description": "The suggested alternative goal"
},
"reason": {
"type": "string",
"title": "Reason",
"description": "Why the original goal needs refinement",
"default": ""
},
"original_goal": {
"type": "string",
"title": "Original Goal",
"description": "The user's original goal for context",
"default": ""
},
"goal_type": {
"type": "string",
"title": "Goal Type",
"description": "Type: 'vague' or 'unachievable'",
"default": "vague"
}
},
"type": "object",
"required": ["message", "suggested_goal"],
"title": "SuggestedGoalResponse",
"description": "Response when the goal needs refinement with a suggested alternative."
},
"SuggestionsResponse": {
"properties": {
"otto_suggestions": {