Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-02-18 10:41:49 -05:00)

Compare commits (5 commits): feat/fix-h ... fix/spinne
| Author | SHA1 | Date |
|---|---|---|
| | efe2f18719 | |
| | 23eda10cb1 | |
| | 7bfc5140b0 | |
| | 15bcdae4e8 | |
| | e9ba7e51db | |
@@ -50,7 +50,6 @@ from backend.copilot.tools.models import (
    OperationPendingResponse,
    OperationStartedResponse,
    SetupRequirementsResponse,
    SuggestedGoalResponse,
    UnderstandingUpdatedResponse,
)
from backend.copilot.tracking import track_user_message
@@ -985,7 +984,6 @@ ToolResponseUnion = (
    | AgentPreviewResponse
    | AgentSavedResponse
    | ClarificationNeededResponse
    | SuggestedGoalResponse
    | BlockListResponse
    | BlockDetailsResponse
    | BlockOutputResponse
@@ -164,21 +164,23 @@ class CoPilotExecutor(AppProcess):
            self._cancel_thread, self.cancel_client, "[cleanup][cancel]"
        )

        # Shutdown executor
        # Clean up worker threads (closes per-loop workspace storage sessions)
        if self._executor:
            from .processor import cleanup_worker

            logger.info(f"[cleanup {pid}] Cleaning up workers...")
            futures = []
            for _ in range(self._executor._max_workers):
                futures.append(self._executor.submit(cleanup_worker))
            for f in futures:
                try:
                    f.result(timeout=10)
                except Exception as e:
                    logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}")

            logger.info(f"[cleanup {pid}] Shutting down executor...")
            self._executor.shutdown(wait=False)

        # Close async resources (workspace storage aiohttp session, etc.)
        try:
            from backend.util.workspace_storage import shutdown_workspace_storage

            loop = asyncio.new_event_loop()
            loop.run_until_complete(shutdown_workspace_storage())
            loop.close()
        except Exception as e:
            logger.warning(f"[cleanup {pid}] Error closing workspace storage: {e}")

        # Release any remaining locks
        for task_id, lock in list(self._task_locks.items()):
            try:
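The hunk above tears down per-thread state by submitting one `cleanup_worker` job per executor worker, since thread-local resources can only be closed from the thread that owns them. A minimal, self-contained sketch of that pattern (illustrative names only, not code from this diff):

```python
# Sketch: per-worker cleanup in a ThreadPoolExecutor via one job per worker slot.
import threading
from concurrent.futures import ThreadPoolExecutor

_tls = threading.local()

def init_worker() -> None:
    # Hypothetical per-thread resource; stands in for the processor/event loop.
    _tls.resource = f"resource-{threading.get_ident()}"

def cleanup_worker() -> None:
    # Runs on the worker thread itself, so it sees that thread's _tls.
    resource = getattr(_tls, "resource", None)
    if resource is not None:
        print(f"closing {resource} on {threading.current_thread().name}")

executor = ThreadPoolExecutor(max_workers=4, initializer=init_worker)
# ... submit real work here ...
# Best-effort: one cleanup job per worker slot. A fast worker can pick up more
# than one job, which is why the real code treats this as best-effort and
# bounds each wait with a timeout.
futures = [executor.submit(cleanup_worker) for _ in range(executor._max_workers)]
for f in futures:
    f.result(timeout=10)
executor.shutdown(wait=False)
```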
@@ -60,6 +60,18 @@ def init_worker():
    _tls.processor.on_executor_start()


def cleanup_worker():
    """Clean up the processor for the current worker thread.

    Should be called before the worker thread's event loop is destroyed so
    that event-loop-bound resources (e.g. ``aiohttp.ClientSession``) are
    closed on the correct loop.
    """
    processor: CoPilotProcessor | None = getattr(_tls, "processor", None)
    if processor is not None:
        processor.cleanup()


# ============ Processor Class ============ #
@@ -98,6 +110,28 @@ class CoPilotProcessor:

        logger.info(f"[CoPilotExecutor] Worker {self.tid} started")

    def cleanup(self):
        """Clean up event-loop-bound resources before the loop is destroyed.

        Shuts down the workspace storage instance that belongs to this
        worker's event loop, ensuring ``aiohttp.ClientSession.close()``
        runs on the same loop that created the session.
        """
        from backend.util.workspace_storage import shutdown_workspace_storage

        try:
            future = asyncio.run_coroutine_threadsafe(
                shutdown_workspace_storage(), self.execution_loop
            )
            future.result(timeout=5)
        except Exception as e:
            logger.warning(f"[CoPilotExecutor] Worker {self.tid} cleanup error: {e}")

        # Stop the event loop
        self.execution_loop.call_soon_threadsafe(self.execution_loop.stop)
        self.execution_thread.join(timeout=5)
        logger.info(f"[CoPilotExecutor] Worker {self.tid} cleaned up")

    @error_logged(swallow=False)
    def execute(
        self,
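The `cleanup()` method above has to run a coroutine on another thread's event loop before stopping that loop. A small sketch of that hand-off, using only the standard library (names are illustrative, not the repo's classes):

```python
# Sketch: close loop-bound resources via run_coroutine_threadsafe, then stop the loop.
import asyncio
import threading

class LoopOwner:
    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    async def _close_resources(self) -> None:
        # Stand-in for shutdown_workspace_storage(): must run on self.loop.
        await asyncio.sleep(0)

    def cleanup(self) -> None:
        future = asyncio.run_coroutine_threadsafe(self._close_resources(), self.loop)
        future.result(timeout=5)                      # wait for loop-bound teardown
        self.loop.call_soon_threadsafe(self.loop.stop)  # then stop the loop
        self.thread.join(timeout=5)

owner = LoopOwner()
owner.cleanup()
```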
@@ -693,11 +693,15 @@ async def stream_chat_completion_sdk(
        await asyncio.sleep(0.5)
        raw_transcript = read_transcript_file(captured_transcript.path)
        if raw_transcript:
            task = asyncio.create_task(
                _upload_transcript_bg(user_id, session_id, raw_transcript)
            )
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
            try:
                async with asyncio.timeout(30):
                    await _upload_transcript_bg(
                        user_id, session_id, raw_transcript
                    )
            except asyncio.TimeoutError:
                logger.warning(
                    f"[SDK] Transcript upload timed out for {session_id}"
                )
        else:
            logger.debug("[SDK] Stop hook fired but transcript not usable")
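The hunk above replaces a fire-and-forget background upload task with a directly awaited upload under a hard time bound. A sketch of the same idea, assuming Python 3.11+ for `asyncio.timeout` and a stand-in `upload()` coroutine:

```python
# Sketch: bound an awaited upload with asyncio.timeout instead of spawning a task.
import asyncio
import logging

logger = logging.getLogger(__name__)

async def upload(data: str) -> None:
    await asyncio.sleep(0.1)  # pretend network call

async def finish_stream(data: str) -> None:
    try:
        async with asyncio.timeout(30):
            await upload(data)
    except asyncio.TimeoutError:
        logger.warning("transcript upload timed out")

asyncio.run(finish_stream("transcript"))
```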
@@ -118,8 +118,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.

5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
@@ -166,11 +164,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches

**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation

## CRITICAL REMINDER

You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
@@ -22,7 +22,6 @@ from .models import (
    ClarificationNeededResponse,
    ClarifyingQuestion,
    ErrorResponse,
    SuggestedGoalResponse,
    ToolResponseBase,
)
@@ -187,25 +186,26 @@ class CreateAgentTool(BaseTool):
        if decomposition_result.get("type") == "unachievable_goal":
            suggested = decomposition_result.get("suggested_goal", "")
            reason = decomposition_result.get("reason", "")
            return SuggestedGoalResponse(
            return ErrorResponse(
                message=(
                    f"This goal cannot be accomplished with the available blocks. {reason}"
                    f"This goal cannot be accomplished with the available blocks. "
                    f"{reason} "
                    f"Suggestion: {suggested}"
                ),
                suggested_goal=suggested,
                reason=reason,
                original_goal=description,
                goal_type="unachievable",
                error="unachievable_goal",
                details={"suggested_goal": suggested, "reason": reason},
                session_id=session_id,
            )

        if decomposition_result.get("type") == "vague_goal":
            suggested = decomposition_result.get("suggested_goal", "")
            return SuggestedGoalResponse(
                message="The goal is too vague to create a specific workflow.",
                suggested_goal=suggested,
                reason="The goal needs more specific details",
                original_goal=description,
                goal_type="vague",
            return ErrorResponse(
                message=(
                    f"The goal is too vague to create a specific workflow. "
                    f"Suggestion: {suggested}"
                ),
                error="vague_goal",
                details={"suggested_goal": suggested},
                session_id=session_id,
            )
@@ -1,142 +0,0 @@
"""Tests for CreateAgentTool response types."""

from unittest.mock import AsyncMock, patch

import pytest

from backend.copilot.tools.create_agent import CreateAgentTool
from backend.copilot.tools.models import (
    ClarificationNeededResponse,
    ErrorResponse,
    SuggestedGoalResponse,
)

from ._test_data import make_session

_TEST_USER_ID = "test-user-create-agent"


@pytest.fixture
def tool():
    return CreateAgentTool()


@pytest.fixture
def session():
    return make_session(_TEST_USER_ID)


@pytest.mark.asyncio
async def test_missing_description_returns_error(tool, session):
    """Missing description returns ErrorResponse."""
    result = await tool._execute(user_id=_TEST_USER_ID, session=session, description="")
    assert isinstance(result, ErrorResponse)
    assert result.error == "Missing description parameter"


@pytest.mark.asyncio
async def test_vague_goal_returns_suggested_goal_response(tool, session):
    """vague_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
    vague_result = {
        "type": "vague_goal",
        "suggested_goal": "Monitor Twitter mentions for a specific keyword and send a daily digest email",
    }

    with (
        patch(
            "backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
            new_callable=AsyncMock,
            return_value=[],
        ),
        patch(
            "backend.copilot.tools.create_agent.decompose_goal",
            new_callable=AsyncMock,
            return_value=vague_result,
        ),
    ):
        result = await tool._execute(
            user_id=_TEST_USER_ID,
            session=session,
            description="monitor social media",
        )

    assert isinstance(result, SuggestedGoalResponse)
    assert result.goal_type == "vague"
    assert result.suggested_goal == vague_result["suggested_goal"]
    assert result.original_goal == "monitor social media"
    assert result.reason == "The goal needs more specific details"
    assert not isinstance(result, ErrorResponse)


@pytest.mark.asyncio
async def test_unachievable_goal_returns_suggested_goal_response(tool, session):
    """unachievable_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
    unachievable_result = {
        "type": "unachievable_goal",
        "suggested_goal": "Summarize the latest news articles on a topic and send them by email",
        "reason": "There are no blocks for mind-reading.",
    }

    with (
        patch(
            "backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
            new_callable=AsyncMock,
            return_value=[],
        ),
        patch(
            "backend.copilot.tools.create_agent.decompose_goal",
            new_callable=AsyncMock,
            return_value=unachievable_result,
        ),
    ):
        result = await tool._execute(
            user_id=_TEST_USER_ID,
            session=session,
            description="read my mind",
        )

    assert isinstance(result, SuggestedGoalResponse)
    assert result.goal_type == "unachievable"
    assert result.suggested_goal == unachievable_result["suggested_goal"]
    assert result.original_goal == "read my mind"
    assert result.reason == unachievable_result["reason"]
    assert not isinstance(result, ErrorResponse)


@pytest.mark.asyncio
async def test_clarifying_questions_returns_clarification_needed_response(
    tool, session
):
    """clarifying_questions decomposition result returns ClarificationNeededResponse."""
    clarifying_result = {
        "type": "clarifying_questions",
        "questions": [
            {
                "question": "What platform should be monitored?",
                "keyword": "platform",
                "example": "Twitter, Reddit",
            }
        ],
    }

    with (
        patch(
            "backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
            new_callable=AsyncMock,
            return_value=[],
        ),
        patch(
            "backend.copilot.tools.create_agent.decompose_goal",
            new_callable=AsyncMock,
            return_value=clarifying_result,
        ),
    ):
        result = await tool._execute(
            user_id=_TEST_USER_ID,
            session=session,
            description="monitor social media and alert me",
        )

    assert isinstance(result, ClarificationNeededResponse)
    assert len(result.questions) == 1
    assert result.questions[0].keyword == "platform"
@@ -50,8 +50,6 @@ class ResponseType(str, Enum):
    # Feature request types
    FEATURE_REQUEST_SEARCH = "feature_request_search"
    FEATURE_REQUEST_CREATED = "feature_request_created"
    # Goal refinement
    SUGGESTED_GOAL = "suggested_goal"


# Base response model
@@ -298,22 +296,6 @@ class ClarificationNeededResponse(ToolResponseBase):
    questions: list[ClarifyingQuestion] = Field(default_factory=list)


class SuggestedGoalResponse(ToolResponseBase):
    """Response when the goal needs refinement with a suggested alternative."""

    type: ResponseType = ResponseType.SUGGESTED_GOAL
    suggested_goal: str = Field(description="The suggested alternative goal")
    reason: str = Field(
        default="", description="Why the original goal needs refinement"
    )
    original_goal: str = Field(
        default="", description="The user's original goal for context"
    )
    goal_type: str = Field(
        default="vague", description="Type: 'vague' or 'unachievable'"
    )


# Documentation search models
class DocSearchResult(BaseModel):
    """A single documentation search result."""
@@ -93,7 +93,15 @@ from backend.data.user import (
    get_user_notification_preference,
    update_user_integrations,
)
from backend.data.workspace import get_or_create_workspace
from backend.data.workspace import (
    count_workspace_files,
    create_workspace_file,
    get_or_create_workspace,
    get_workspace_file,
    get_workspace_file_by_path,
    list_workspace_files,
    soft_delete_workspace_file,
)
from backend.util.service import (
    AppService,
    AppServiceClient,
@@ -274,7 +282,13 @@ class DatabaseManager(AppService):
    get_user_execution_summary_data = _(get_user_execution_summary_data)

    # ============ Workspace ============ #
    count_workspace_files = _(count_workspace_files)
    create_workspace_file = _(create_workspace_file)
    get_or_create_workspace = _(get_or_create_workspace)
    get_workspace_file = _(get_workspace_file)
    get_workspace_file_by_path = _(get_workspace_file_by_path)
    list_workspace_files = _(list_workspace_files)
    soft_delete_workspace_file = _(soft_delete_workspace_file)

    # ============ Understanding ============ #
    get_business_understanding = _(get_business_understanding)
@@ -438,7 +452,13 @@ class DatabaseManagerAsyncClient(AppServiceClient):
    get_user_execution_summary_data = d.get_user_execution_summary_data

    # ============ Workspace ============ #
    count_workspace_files = d.count_workspace_files
    create_workspace_file = d.create_workspace_file
    get_or_create_workspace = d.get_or_create_workspace
    get_workspace_file = d.get_workspace_file
    get_workspace_file_by_path = d.get_workspace_file_by_path
    list_workspace_files = d.list_workspace_files
    soft_delete_workspace_file = d.soft_delete_workspace_file

    # ============ Understanding ============ #
    get_business_understanding = d.get_business_understanding
@@ -164,21 +164,23 @@ async def create_workspace_file(

async def get_workspace_file(
    file_id: str,
    workspace_id: Optional[str] = None,
    workspace_id: str,
) -> Optional[WorkspaceFile]:
    """
    Get a workspace file by ID.

    Args:
        file_id: The file ID
        workspace_id: Optional workspace ID for validation
        workspace_id: Workspace ID for scoping (required)

    Returns:
        WorkspaceFile instance or None
    """
    where_clause: dict = {"id": file_id, "isDeleted": False}
    if workspace_id:
        where_clause["workspaceId"] = workspace_id
    where_clause: UserWorkspaceFileWhereInput = {
        "id": file_id,
        "isDeleted": False,
        "workspaceId": workspace_id,
    }

    file = await UserWorkspaceFile.prisma().find_first(where=where_clause)
    return WorkspaceFile.from_db(file) if file else None
@@ -268,7 +270,7 @@ async def count_workspace_files(
    Returns:
        Number of files
    """
    where_clause: dict = {"workspaceId": workspace_id}
    where_clause: UserWorkspaceFileWhereInput = {"workspaceId": workspace_id}
    if not include_deleted:
        where_clause["isDeleted"] = False
@@ -283,7 +285,7 @@ async def soft_delete_workspace_file(

async def soft_delete_workspace_file(
    file_id: str,
    workspace_id: Optional[str] = None,
    workspace_id: str,
) -> Optional[WorkspaceFile]:
    """
    Soft-delete a workspace file.
@@ -293,7 +295,7 @@ async def soft_delete_workspace_file(

    Args:
        file_id: The file ID
        workspace_id: Optional workspace ID for validation
        workspace_id: Workspace ID for scoping (required)

    Returns:
        Updated WorkspaceFile instance or None if not found
@@ -28,7 +28,7 @@ from typing import (
import httpx
import uvicorn
from fastapi import FastAPI, Request, responses
from prisma.errors import DataError
from prisma.errors import DataError, UniqueViolationError
from pydantic import BaseModel, TypeAdapter, create_model

import backend.util.exceptions as exceptions
@@ -201,6 +201,7 @@ EXCEPTION_MAPPING = {
        UnhealthyServiceError,
        HTTPClientError,
        HTTPServerError,
        UniqueViolationError,
        *[
            ErrorType
            for _, ErrorType in inspect.getmembers(exceptions)
@@ -416,6 +417,9 @@ class AppService(BaseAppService, ABC):
        self.fastapi_app.add_exception_handler(
            DataError, self._handle_internal_http_error(400)
        )
        self.fastapi_app.add_exception_handler(
            UniqueViolationError, self._handle_internal_http_error(400)
        )
        self.fastapi_app.add_exception_handler(
            Exception, self._handle_internal_http_error(500)
        )
@@ -478,6 +482,7 @@ def get_service_client(
            # Don't retry these specific exceptions that won't be fixed by retrying
            ValueError,  # Invalid input/parameters
            DataError,  # Prisma data integrity errors (foreign key, unique constraints)
            UniqueViolationError,  # Unique constraint violations
            KeyError,  # Missing required data
            TypeError,  # Wrong data types
            AttributeError,  # Missing attributes
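The hunks above register `UniqueViolationError` so it surfaces as a 400 instead of a generic 500, and exclude it from retries. A minimal FastAPI sketch of that kind of registration (the error class here is a local stand-in, not the repo's wiring):

```python
# Sketch: map a library exception to a 400 response with add_exception_handler.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

class UniqueViolationError(Exception):
    """Stand-in for prisma.errors.UniqueViolationError."""

app = FastAPI()

def handle_as(status_code: int):
    async def handler(request: Request, exc: Exception) -> JSONResponse:
        return JSONResponse(status_code=status_code, content={"detail": str(exc)})
    return handler

app.add_exception_handler(UniqueViolationError, handle_as(400))

@app.get("/boom")
async def boom():
    raise UniqueViolationError("duplicate path for this workspace")
```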
@@ -12,15 +12,8 @@ from typing import Optional

from prisma.errors import UniqueViolationError

from backend.data.workspace import (
    WorkspaceFile,
    count_workspace_files,
    create_workspace_file,
    get_workspace_file,
    get_workspace_file_by_path,
    list_workspace_files,
    soft_delete_workspace_file,
)
from backend.data.db_accessors import workspace_db
from backend.data.workspace import WorkspaceFile
from backend.util.settings import Config
from backend.util.virus_scanner import scan_content_safe
from backend.util.workspace_storage import compute_file_checksum, get_workspace_storage
@@ -125,8 +118,9 @@ class WorkspaceManager:
        Raises:
            FileNotFoundError: If file doesn't exist
        """
        db = workspace_db()
        resolved_path = self._resolve_path(path)
        file = await get_workspace_file_by_path(self.workspace_id, resolved_path)
        file = await db.get_workspace_file_by_path(self.workspace_id, resolved_path)
        if file is None:
            raise FileNotFoundError(f"File not found at path: {resolved_path}")

@@ -146,7 +140,8 @@ class WorkspaceManager:
        Raises:
            FileNotFoundError: If file doesn't exist
        """
        file = await get_workspace_file(file_id, self.workspace_id)
        db = workspace_db()
        file = await db.get_workspace_file(file_id, self.workspace_id)
        if file is None:
            raise FileNotFoundError(f"File not found: {file_id}")

@@ -204,8 +199,10 @@ class WorkspaceManager:
        # For overwrite=True, we let the write proceed and handle via UniqueViolationError
        # This ensures the new file is written to storage BEFORE the old one is deleted,
        # preventing data loss if the new write fails
        db = workspace_db()

        if not overwrite:
            existing = await get_workspace_file_by_path(self.workspace_id, path)
            existing = await db.get_workspace_file_by_path(self.workspace_id, path)
            if existing is not None:
                raise ValueError(f"File already exists at path: {path}")

@@ -232,7 +229,7 @@ class WorkspaceManager:
        # Create database record - handle race condition where another request
        # created a file at the same path between our check and create
        try:
            file = await create_workspace_file(
            file = await db.create_workspace_file(
                workspace_id=self.workspace_id,
                file_id=file_id,
                name=filename,
@@ -246,12 +243,12 @@ class WorkspaceManager:
            # Race condition: another request created a file at this path
            if overwrite:
                # Re-fetch and delete the conflicting file, then retry
                existing = await get_workspace_file_by_path(self.workspace_id, path)
                existing = await db.get_workspace_file_by_path(self.workspace_id, path)
                if existing:
                    await self.delete_file(existing.id)
                # Retry the create - if this also fails, clean up storage file
                try:
                    file = await create_workspace_file(
                    file = await db.create_workspace_file(
                        workspace_id=self.workspace_id,
                        file_id=file_id,
                        name=filename,
@@ -314,8 +311,9 @@ class WorkspaceManager:
            List of WorkspaceFile instances
        """
        effective_path = self._get_effective_path(path, include_all_sessions)
        db = workspace_db()

        return await list_workspace_files(
        return await db.list_workspace_files(
            workspace_id=self.workspace_id,
            path_prefix=effective_path,
            limit=limit,
@@ -332,7 +330,8 @@ class WorkspaceManager:
        Returns:
            True if deleted, False if not found
        """
        file = await get_workspace_file(file_id, self.workspace_id)
        db = workspace_db()
        file = await db.get_workspace_file(file_id, self.workspace_id)
        if file is None:
            return False

@@ -345,7 +344,7 @@ class WorkspaceManager:
            # Continue with database soft-delete even if storage delete fails

        # Soft-delete database record
        result = await soft_delete_workspace_file(file_id, self.workspace_id)
        result = await db.soft_delete_workspace_file(file_id, self.workspace_id)
        return result is not None

    async def get_download_url(self, file_id: str, expires_in: int = 3600) -> str:
@@ -362,7 +361,8 @@ class WorkspaceManager:
        Raises:
            FileNotFoundError: If file doesn't exist
        """
        file = await get_workspace_file(file_id, self.workspace_id)
        db = workspace_db()
        file = await db.get_workspace_file(file_id, self.workspace_id)
        if file is None:
            raise FileNotFoundError(f"File not found: {file_id}")

@@ -379,7 +379,8 @@ class WorkspaceManager:
        Returns:
            WorkspaceFile instance or None
        """
        return await get_workspace_file(file_id, self.workspace_id)
        db = workspace_db()
        return await db.get_workspace_file(file_id, self.workspace_id)

    async def get_file_info_by_path(self, path: str) -> Optional[WorkspaceFile]:
        """
@@ -394,8 +395,9 @@ class WorkspaceManager:
        Returns:
            WorkspaceFile instance or None
        """
        db = workspace_db()
        resolved_path = self._resolve_path(path)
        return await get_workspace_file_by_path(self.workspace_id, resolved_path)
        return await db.get_workspace_file_by_path(self.workspace_id, resolved_path)

    async def get_file_count(
        self,
@@ -417,7 +419,8 @@ class WorkspaceManager:
            Number of files
        """
        effective_path = self._get_effective_path(path, include_all_sessions)
        db = workspace_db()

        return await count_workspace_files(
        return await db.count_workspace_files(
            self.workspace_id, path_prefix=effective_path
        )
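The comments in the write path above describe an overwrite flow that stores the new bytes first and then relies on the unique path constraint to detect a conflicting record, delete it, and retry. A hedged sketch of that flow with hypothetical `db`/`storage` helpers (not the repo's actual API):

```python
# Sketch: check-then-create with a unique-violation fallback for overwrite=True.
class UniqueViolationError(Exception):
    """Stand-in for prisma.errors.UniqueViolationError used in the real code."""

async def write_file(db, storage, workspace_id: str, path: str, content: bytes, overwrite: bool):
    if not overwrite and await db.get_workspace_file_by_path(workspace_id, path):
        raise ValueError(f"File already exists at path: {path}")

    # New bytes land in storage BEFORE any existing record is deleted,
    # so a failed write never loses the old file.
    await storage.store(workspace_id, path, content)
    try:
        return await db.create_workspace_file(workspace_id=workspace_id, path=path)
    except UniqueViolationError:
        if not overwrite:
            raise
        # Race or overwrite conflict: remove the existing record, then retry once.
        existing = await db.get_workspace_file_by_path(workspace_id, path)
        if existing:
            await db.soft_delete_workspace_file(existing.id, workspace_id)
        return await db.create_workspace_file(workspace_id=workspace_id, path=path)
```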
@@ -93,7 +93,14 @@ class WorkspaceStorageBackend(ABC):


class GCSWorkspaceStorage(WorkspaceStorageBackend):
    """Google Cloud Storage implementation for workspace storage."""
    """Google Cloud Storage implementation for workspace storage.

    Each instance owns a single ``aiohttp.ClientSession`` and GCS async
    client. Because ``ClientSession`` is bound to the event loop on which it
    was created, callers that run on separate loops (e.g. copilot executor
    worker threads) **must** obtain their own ``GCSWorkspaceStorage`` instance
    via :func:`get_workspace_storage` which is event-loop-aware.
    """

    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name
@@ -337,60 +344,73 @@ class LocalWorkspaceStorage(WorkspaceStorageBackend):
            raise ValueError(f"Invalid storage path format: {storage_path}")


# Global storage backend instance
_workspace_storage: Optional[WorkspaceStorageBackend] = None
# ---------------------------------------------------------------------------
# Storage instance management
# ---------------------------------------------------------------------------
# ``aiohttp.ClientSession`` is bound to the event loop where it is created.
# The copilot executor runs each worker in its own thread with a dedicated
# event loop, so a single global ``GCSWorkspaceStorage`` instance would break.
#
# For **local storage** a single shared instance is fine (no async I/O).
# For **GCS storage** we keep one instance *per event loop* so every loop
# gets its own ``ClientSession``.
# ---------------------------------------------------------------------------

_local_storage: Optional[LocalWorkspaceStorage] = None
_gcs_storages: dict[int, GCSWorkspaceStorage] = {}
_storage_lock = asyncio.Lock()


async def get_workspace_storage() -> WorkspaceStorageBackend:
    """Return a workspace storage backend for the **current** event loop.

    * Local storage → single shared instance (no event-loop affinity).
    * GCS storage → one instance per event loop to avoid cross-loop
      ``aiohttp`` errors.
    """
    Get the workspace storage backend instance.
    global _local_storage

    Uses GCS if media_gcs_bucket_name is configured, otherwise uses local storage.
    """
    global _workspace_storage
    config = Config()

    if _workspace_storage is None:
        async with _storage_lock:
            if _workspace_storage is None:
                config = Config()
    # --- Local storage (shared) ---
    if not config.media_gcs_bucket_name:
        if _local_storage is None:
            storage_dir = (
                config.workspace_storage_dir if config.workspace_storage_dir else None
            )
            logger.info(f"Using local workspace storage: {storage_dir or 'default'}")
            _local_storage = LocalWorkspaceStorage(storage_dir)
        return _local_storage

                if config.media_gcs_bucket_name:
                    logger.info(
                        f"Using GCS workspace storage: {config.media_gcs_bucket_name}"
                    )
                    _workspace_storage = GCSWorkspaceStorage(
                        config.media_gcs_bucket_name
                    )
                else:
                    storage_dir = (
                        config.workspace_storage_dir
                        if config.workspace_storage_dir
                        else None
                    )
                    logger.info(
                        f"Using local workspace storage: {storage_dir or 'default'}"
                    )
                    _workspace_storage = LocalWorkspaceStorage(storage_dir)

    return _workspace_storage
    # --- GCS storage (per event loop) ---
    loop_id = id(asyncio.get_running_loop())
    if loop_id not in _gcs_storages:
        logger.info(
            f"Creating GCS workspace storage for loop {loop_id}: "
            f"{config.media_gcs_bucket_name}"
        )
        _gcs_storages[loop_id] = GCSWorkspaceStorage(config.media_gcs_bucket_name)
    return _gcs_storages[loop_id]


async def shutdown_workspace_storage() -> None:
    """
    Properly shutdown the global workspace storage backend.
    """Shut down workspace storage for the **current** event loop.

    Closes aiohttp sessions and other resources for GCS backend.
    Should be called during application shutdown.
    Closes the ``aiohttp`` session owned by the current loop's GCS instance.
    Each worker thread should call this on its own loop before the loop is
    destroyed. The REST API lifespan hook calls it for the main server loop.
    """
    global _workspace_storage
    global _local_storage

    if _workspace_storage is not None:
        async with _storage_lock:
            if _workspace_storage is not None:
                if isinstance(_workspace_storage, GCSWorkspaceStorage):
                    await _workspace_storage.close()
                _workspace_storage = None
    loop_id = id(asyncio.get_running_loop())
    storage = _gcs_storages.pop(loop_id, None)
    if storage is not None:
        await storage.close()

    # Clear local storage only when the last GCS instance is gone
    # (i.e. full shutdown, not just a single worker stopping).
    if not _gcs_storages:
        _local_storage = None


def compute_file_checksum(content: bytes) -> str:
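The storage hunk above keys GCS instances by the running event loop so each loop owns, and later closes, its own `aiohttp.ClientSession`. A minimal sketch of the same per-loop caching, using `aiohttp` directly and illustrative names:

```python
# Sketch: cache one loop-bound client per event loop, keyed by the loop's id().
import asyncio
import aiohttp

_sessions: dict[int, aiohttp.ClientSession] = {}

async def get_session() -> aiohttp.ClientSession:
    loop_id = id(asyncio.get_running_loop())
    if loop_id not in _sessions or _sessions[loop_id].closed:
        _sessions[loop_id] = aiohttp.ClientSession()
    return _sessions[loop_id]

async def shutdown_session() -> None:
    # Close only the session that belongs to the calling loop, and drop its
    # cache entry so a stale loop id is never reused.
    session = _sessions.pop(id(asyncio.get_running_loop()), None)
    if session is not None:
        await session.close()
```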
@@ -169,7 +169,10 @@ export const ChatMessagesContainer = ({
        <ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
          {headerSlot}
          {isLoading && messages.length === 0 && (
            <div className="flex min-h-full flex-1 items-center justify-center">
            <div
              className="flex flex-1 items-center justify-center"
              style={{ minHeight: "calc(100vh - 12rem)" }}
            >
              <LoadingSpinner className="text-neutral-600" />
            </div>
          )}
@@ -26,7 +26,6 @@ import {
} from "./components/ClarificationQuestionsCard";
import sparklesImg from "./components/MiniGame/assets/sparkles.png";
import { MiniGame } from "./components/MiniGame/MiniGame";
import { SuggestedGoalCard } from "./components/SuggestedGoalCard";
import {
  AccordionIcon,
  formatMaybeJson,
@@ -39,7 +38,6 @@ import {
  isOperationInProgressOutput,
  isOperationPendingOutput,
  isOperationStartedOutput,
  isSuggestedGoalOutput,
  ToolIcon,
  truncateText,
  type CreateAgentToolOutput,
@@ -79,13 +77,6 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
      expanded: true,
    };
  }
  if (isSuggestedGoalOutput(output)) {
    return {
      icon,
      title: "Goal needs refinement",
      expanded: true,
    };
  }
  if (
    isOperationStartedOutput(output) ||
    isOperationPendingOutput(output) ||
@@ -134,13 +125,8 @@ export function CreateAgentTool({ part }: Props) {
      isAgentPreviewOutput(output) ||
      isAgentSavedOutput(output) ||
      isClarificationNeededOutput(output) ||
      isSuggestedGoalOutput(output) ||
      isErrorOutput(output));

  function handleUseSuggestedGoal(goal: string) {
    onSend(`Please create an agent with this goal: ${goal}`);
  }

  function handleClarificationAnswers(answers: Record<string, string>) {
    const questions =
      output && isClarificationNeededOutput(output)
@@ -259,15 +245,6 @@ export function CreateAgentTool({ part }: Props) {
        />
      )}

      {isSuggestedGoalOutput(output) && (
        <SuggestedGoalCard
          message={output.message}
          suggestedGoal={output.suggested_goal}
          goalType={output.goal_type ?? "vague"}
          onUseSuggestedGoal={handleUseSuggestedGoal}
        />
      )}

      {isErrorOutput(output) && (
        <ContentGrid>
          <ContentMessage>{output.message}</ContentMessage>
@@ -281,22 +258,6 @@ export function CreateAgentTool({ part }: Props) {
              {formatMaybeJson(output.details)}
            </ContentCodeBlock>
          )}
          <div className="flex gap-2">
            <Button
              variant="outline"
              size="small"
              onClick={() => onSend("Please try creating the agent again.")}
            >
              Try again
            </Button>
            <Button
              variant="outline"
              size="small"
              onClick={() => onSend("Can you help me simplify this goal?")}
            >
              Simplify goal
            </Button>
          </div>
        </ContentGrid>
      )}
    </ToolAccordion>
@@ -1,61 +0,0 @@
"use client";

import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { ArrowRightIcon, LightbulbIcon } from "@phosphor-icons/react";

interface Props {
  message: string;
  suggestedGoal: string;
  goalType: string;
  onUseSuggestedGoal: (goal: string) => void;
}

export function SuggestedGoalCard({
  message,
  suggestedGoal,
  goalType,
  onUseSuggestedGoal,
}: Props) {
  return (
    <div className="rounded-xl border border-amber-200 bg-amber-50/50 p-4">
      <div className="flex items-start gap-3">
        <LightbulbIcon
          size={20}
          weight="fill"
          className="mt-0.5 text-amber-600"
        />
        <div className="flex-1 space-y-3">
          <div>
            <Text variant="body-medium" className="font-medium text-slate-900">
              {goalType === "unachievable"
                ? "Goal cannot be accomplished"
                : "Goal needs more detail"}
            </Text>
            <Text variant="small" className="text-slate-600">
              {message}
            </Text>
          </div>

          <div className="rounded-lg border border-amber-300 bg-white p-3">
            <Text variant="small" className="mb-1 font-semibold text-amber-800">
              Suggested alternative:
            </Text>
            <Text variant="body-medium" className="text-slate-900">
              {suggestedGoal}
            </Text>
          </div>

          <Button
            onClick={() => onUseSuggestedGoal(suggestedGoal)}
            variant="primary"
          >
            <span className="inline-flex items-center gap-1.5">
              Use this goal <ArrowRightIcon size={14} weight="bold" />
            </span>
          </Button>
        </div>
      </div>
    </div>
  );
}
@@ -6,7 +6,6 @@ import type { OperationInProgressResponse } from "@/app/api/__generated__/models
import type { OperationPendingResponse } from "@/app/api/__generated__/models/operationPendingResponse";
import type { OperationStartedResponse } from "@/app/api/__generated__/models/operationStartedResponse";
import { ResponseType } from "@/app/api/__generated__/models/responseType";
import type { SuggestedGoalResponse } from "@/app/api/__generated__/models/suggestedGoalResponse";
import {
  PlusCircleIcon,
  PlusIcon,
@@ -22,7 +21,6 @@ export type CreateAgentToolOutput =
  | AgentPreviewResponse
  | AgentSavedResponse
  | ClarificationNeededResponse
  | SuggestedGoalResponse
  | ErrorResponse;

function parseOutput(output: unknown): CreateAgentToolOutput | null {
@@ -45,7 +43,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
    type === ResponseType.agent_preview ||
    type === ResponseType.agent_saved ||
    type === ResponseType.clarification_needed ||
    type === ResponseType.suggested_goal ||
    type === ResponseType.error
  ) {
    return output as CreateAgentToolOutput;
@@ -58,7 +55,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
    if ("agent_id" in output && "library_agent_id" in output)
      return output as AgentSavedResponse;
    if ("questions" in output) return output as ClarificationNeededResponse;
    if ("suggested_goal" in output) return output as SuggestedGoalResponse;
    if ("error" in output || "details" in output)
      return output as ErrorResponse;
  }
@@ -118,14 +114,6 @@ export function isClarificationNeededOutput(
  );
}

export function isSuggestedGoalOutput(
  output: CreateAgentToolOutput,
): output is SuggestedGoalResponse {
  return (
    output.type === ResponseType.suggested_goal || "suggested_goal" in output
  );
}

export function isErrorOutput(
  output: CreateAgentToolOutput,
): output is ErrorResponse {
@@ -151,7 +139,6 @@ export function getAnimationText(part: {
      if (isAgentSavedOutput(output)) return `Saved ${output.agent_name}`;
      if (isAgentPreviewOutput(output)) return `Preview "${output.agent_name}"`;
      if (isClarificationNeededOutput(output)) return "Needs clarification";
      if (isSuggestedGoalOutput(output)) return "Goal needs refinement";
      return "Error creating agent";
    }
    case "output-error":
@@ -1052,7 +1052,6 @@
            {
              "$ref": "#/components/schemas/ClarificationNeededResponse"
            },
            { "$ref": "#/components/schemas/SuggestedGoalResponse" },
            { "$ref": "#/components/schemas/BlockListResponse" },
            { "$ref": "#/components/schemas/BlockDetailsResponse" },
            { "$ref": "#/components/schemas/BlockOutputResponse" },
@@ -10797,8 +10796,7 @@
        "bash_exec",
        "operation_status",
        "feature_request_search",
        "feature_request_created",
        "suggested_goal"
        "feature_request_created"
      ],
      "title": "ResponseType",
      "description": "Types of tool responses."
@@ -11679,46 +11677,6 @@
      "enum": ["DRAFT", "PENDING", "APPROVED", "REJECTED"],
      "title": "SubmissionStatus"
    },
    "SuggestedGoalResponse": {
      "properties": {
        "type": {
          "$ref": "#/components/schemas/ResponseType",
          "default": "suggested_goal"
        },
        "message": { "type": "string", "title": "Message" },
        "session_id": {
          "anyOf": [{ "type": "string" }, { "type": "null" }],
          "title": "Session Id"
        },
        "suggested_goal": {
          "type": "string",
          "title": "Suggested Goal",
          "description": "The suggested alternative goal"
        },
        "reason": {
          "type": "string",
          "title": "Reason",
          "description": "Why the original goal needs refinement",
          "default": ""
        },
        "original_goal": {
          "type": "string",
          "title": "Original Goal",
          "description": "The user's original goal for context",
          "default": ""
        },
        "goal_type": {
          "type": "string",
          "title": "Goal Type",
          "description": "Type: 'vague' or 'unachievable'",
          "default": "vague"
        }
      },
      "type": "object",
      "required": ["message", "suggested_goal"],
      "title": "SuggestedGoalResponse",
      "description": "Response when the goal needs refinement with a suggested alternative."
    },
    "SuggestionsResponse": {
      "properties": {
        "otto_suggestions": {