mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-03-17 03:00:27 -04:00
Compare commits
1 Commits
hotfix/tra
...
claude/fil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9ee5309c1 |
@@ -79,6 +79,7 @@ class StreamChatRequest(BaseModel):
|
||||
message: str
|
||||
is_user_message: bool = True
|
||||
context: dict[str, str] | None = None # {url: str, content: str}
|
||||
file_ids: list[str] | None = None # workspace file IDs attached to this message
|
||||
|
||||
|
||||
class CreateSessionResponse(BaseModel):
|
||||
@@ -445,6 +446,7 @@ async def stream_chat_post(
|
||||
turn_id=turn_id,
|
||||
is_user_message=request.is_user_message,
|
||||
context=request.context,
|
||||
file_ids=request.file_ids,
|
||||
)
|
||||
|
||||
setup_time = (time.perf_counter() - stream_start_time) * 1000
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
"""
|
||||
Workspace API routes for managing user file storage.
|
||||
|
||||
Provides endpoints for uploading, downloading, listing, and deleting files
|
||||
in a user's workspace. Uploads are session-scoped by default and support
|
||||
per-user storage quotas.
|
||||
"""
|
||||
|
||||
import logging
|
||||
@@ -8,12 +12,52 @@ from typing import Annotated
|
||||
from urllib.parse import quote
|
||||
|
||||
import fastapi
|
||||
import pydantic
|
||||
from autogpt_libs.auth.dependencies import get_user_id, requires_user
|
||||
from fastapi.responses import Response
|
||||
|
||||
from backend.data.workspace import WorkspaceFile, get_workspace, get_workspace_file
|
||||
from backend.util.settings import Config
|
||||
from backend.util.workspace import WorkspaceManager
|
||||
from backend.util.workspace_storage import get_workspace_storage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------- Allowed MIME types for user uploads ----------
|
||||
# Phase 1: text, PDF, CSV/spreadsheets, images. Video/audio is future.
|
||||
ALLOWED_UPLOAD_MIME_TYPES: set[str] = {
|
||||
# Text
|
||||
"text/plain",
|
||||
"text/markdown",
|
||||
"text/csv",
|
||||
"text/html",
|
||||
"text/xml",
|
||||
"application/json",
|
||||
"application/xml",
|
||||
# PDF
|
||||
"application/pdf",
|
||||
# Spreadsheets
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # .xlsx
|
||||
"application/vnd.ms-excel", # .xls
|
||||
# Documents
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document", # .docx
|
||||
"application/msword", # .doc
|
||||
# Images
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/gif",
|
||||
"image/webp",
|
||||
"image/svg+xml",
|
||||
}
|
||||
|
||||
# Extensions allowed when MIME is generic application/octet-stream
|
||||
_EXTENSION_ALLOWLIST: set[str] = {
|
||||
".txt", ".md", ".csv", ".json", ".xml", ".html",
|
||||
".pdf",
|
||||
".xlsx", ".xls", ".docx", ".doc",
|
||||
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg",
|
||||
}
|
||||
|
||||
|
||||
def _sanitize_filename_for_header(filename: str) -> str:
|
||||
"""
|
||||
@@ -37,13 +81,63 @@ def _sanitize_filename_for_header(filename: str) -> str:
|
||||
return f"attachment; filename*=UTF-8''{encoded}"
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
def _is_allowed_upload(content_type: str | None, filename: str) -> bool:
|
||||
"""Check whether a file's MIME type or extension is permitted for upload."""
|
||||
if content_type and content_type in ALLOWED_UPLOAD_MIME_TYPES:
|
||||
return True
|
||||
# Fall back to extension check (handles octet-stream or missing MIME)
|
||||
ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
|
||||
return ext in _EXTENSION_ALLOWLIST
|
||||
|
||||
|
||||
# ---------- Response models ----------
|
||||
|
||||
class FileInfoResponse(pydantic.BaseModel):
|
||||
file_id: str
|
||||
name: str
|
||||
path: str
|
||||
mime_type: str
|
||||
size_bytes: int
|
||||
created_at: str
|
||||
|
||||
|
||||
class UploadFileResponse(pydantic.BaseModel):
|
||||
file_id: str
|
||||
name: str
|
||||
path: str
|
||||
mime_type: str
|
||||
size_bytes: int
|
||||
|
||||
|
||||
class ListFilesResponse(pydantic.BaseModel):
|
||||
files: list[FileInfoResponse]
|
||||
total_count: int
|
||||
|
||||
|
||||
class StorageUsageResponse(pydantic.BaseModel):
|
||||
used_bytes: int
|
||||
quota_bytes: int
|
||||
used_pct: float
|
||||
file_count: int
|
||||
|
||||
|
||||
# ---------- Router ----------
|
||||
|
||||
router = fastapi.APIRouter(
|
||||
dependencies=[fastapi.Security(requires_user)],
|
||||
)
|
||||
|
||||
|
||||
# ---------- Helpers ----------
|
||||
|
||||
async def _get_manager(user_id: str, session_id: str | None = None) -> WorkspaceManager:
|
||||
"""Build a WorkspaceManager, creating the workspace if needed."""
|
||||
from backend.data.db_accessors import workspace_db
|
||||
|
||||
workspace = await workspace_db().get_or_create_workspace(user_id)
|
||||
return WorkspaceManager(user_id, workspace.id, session_id)
|
||||
|
||||
|
||||
def _create_streaming_response(content: bytes, file: WorkspaceFile) -> Response:
|
||||
"""Create a streaming response for file content."""
|
||||
return Response(
|
||||
@@ -98,6 +192,174 @@ async def _create_file_download_response(file: WorkspaceFile) -> Response:
|
||||
raise
|
||||
|
||||
|
||||
# ---------- Endpoints ----------
|
||||
|
||||
|
||||
@router.post(
|
||||
"/files/upload",
|
||||
summary="Upload a file to the workspace",
|
||||
status_code=201,
|
||||
)
|
||||
async def upload_file(
|
||||
user_id: Annotated[str, fastapi.Security(get_user_id)],
|
||||
session_id: str = fastapi.Form(..., description="Chat session ID to scope this upload"),
|
||||
file: fastapi.UploadFile = fastapi.File(...),
|
||||
) -> UploadFileResponse:
|
||||
"""
|
||||
Upload a file to the user's workspace, scoped to a chat session.
|
||||
|
||||
The file is stored under ``/sessions/{session_id}/uploads/{filename}``
|
||||
and is immediately accessible to the CoPilot agent for that session.
|
||||
Files persist across runs within the same thread.
|
||||
|
||||
Enforces per-user storage quotas and file-type restrictions.
|
||||
"""
|
||||
filename = file.filename or "untitled"
|
||||
|
||||
# Validate file type
|
||||
if not _is_allowed_upload(file.content_type, filename):
|
||||
raise fastapi.HTTPException(
|
||||
status_code=415,
|
||||
detail=(
|
||||
f"File type not supported: {file.content_type or 'unknown'}. "
|
||||
"Allowed types: text, PDF, CSV/spreadsheets, images."
|
||||
),
|
||||
)
|
||||
|
||||
# Read content (enforcing max size during read)
|
||||
config = Config()
|
||||
max_size = config.max_file_size_mb * 1024 * 1024
|
||||
content = await file.read()
|
||||
if len(content) > max_size:
|
||||
raise fastapi.HTTPException(
|
||||
status_code=413,
|
||||
detail=f"File too large ({len(content)} bytes). "
|
||||
f"Maximum size is {config.max_file_size_mb} MB.",
|
||||
)
|
||||
|
||||
manager = await _get_manager(user_id, session_id)
|
||||
|
||||
# Quota check
|
||||
within_quota, current_usage, quota_bytes = await manager.check_quota(len(content))
|
||||
if not within_quota:
|
||||
logger.warning(
|
||||
f"User {user_id} hit storage quota: "
|
||||
f"{current_usage / 1024 / 1024:.1f} MB / {quota_bytes / 1024 / 1024:.1f} MB"
|
||||
)
|
||||
raise fastapi.HTTPException(
|
||||
status_code=413,
|
||||
detail=(
|
||||
f"Storage quota exceeded. Using {current_usage / 1024 / 1024:.1f} MB "
|
||||
f"of {quota_bytes / 1024 / 1024:.1f} MB. "
|
||||
"Please delete some files and try again."
|
||||
),
|
||||
)
|
||||
|
||||
# Write file via WorkspaceManager (handles storage, DB, virus scan, checksums)
|
||||
# Path: /sessions/{session_id}/uploads/{filename}
|
||||
upload_path = f"/uploads/{filename}"
|
||||
try:
|
||||
workspace_file = await manager.write_file(
|
||||
content=content,
|
||||
filename=filename,
|
||||
path=upload_path,
|
||||
mime_type=file.content_type,
|
||||
overwrite=True,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise fastapi.HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
logger.info(
|
||||
f"User {user_id} uploaded {filename} ({len(content)} bytes) "
|
||||
f"to session {session_id}"
|
||||
)
|
||||
|
||||
return UploadFileResponse(
|
||||
file_id=workspace_file.id,
|
||||
name=workspace_file.name,
|
||||
path=workspace_file.path,
|
||||
mime_type=workspace_file.mime_type,
|
||||
size_bytes=workspace_file.size_bytes,
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/files",
|
||||
summary="List files in workspace",
|
||||
)
|
||||
async def list_files(
|
||||
user_id: Annotated[str, fastapi.Security(get_user_id)],
|
||||
session_id: str | None = fastapi.Query(
|
||||
default=None, description="Scope to files in this session"
|
||||
),
|
||||
limit: int = fastapi.Query(default=50, ge=1, le=100),
|
||||
offset: int = fastapi.Query(default=0, ge=0),
|
||||
) -> ListFilesResponse:
|
||||
"""
|
||||
List files in the user's workspace.
|
||||
|
||||
If ``session_id`` is provided, only files in that session are returned.
|
||||
Otherwise, all workspace files are listed.
|
||||
"""
|
||||
manager = await _get_manager(user_id, session_id)
|
||||
files = await manager.list_files(
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
include_all_sessions=session_id is None,
|
||||
)
|
||||
total = await manager.get_file_count(include_all_sessions=session_id is None)
|
||||
|
||||
return ListFilesResponse(
|
||||
files=[
|
||||
FileInfoResponse(
|
||||
file_id=f.id,
|
||||
name=f.name,
|
||||
path=f.path,
|
||||
mime_type=f.mime_type,
|
||||
size_bytes=f.size_bytes,
|
||||
created_at=f.created_at.isoformat(),
|
||||
)
|
||||
for f in files
|
||||
],
|
||||
total_count=total,
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/usage",
|
||||
summary="Get workspace storage usage and quota",
|
||||
)
|
||||
async def get_usage(
|
||||
user_id: Annotated[str, fastapi.Security(get_user_id)],
|
||||
) -> StorageUsageResponse:
|
||||
"""
|
||||
Get the user's workspace storage usage vs their quota.
|
||||
|
||||
Useful for showing remaining capacity in the UI and triggering
|
||||
alerts when approaching the limit.
|
||||
"""
|
||||
manager = await _get_manager(user_id)
|
||||
current_usage = await manager.get_total_usage_bytes()
|
||||
config = Config()
|
||||
quota_bytes = config.user_storage_quota_mb * 1024 * 1024
|
||||
file_count = await manager.get_file_count(include_all_sessions=True)
|
||||
|
||||
pct = (current_usage / quota_bytes * 100) if quota_bytes > 0 else 0.0
|
||||
|
||||
if pct >= 90:
|
||||
logger.warning(
|
||||
f"User {user_id} storage usage at {pct:.1f}%: "
|
||||
f"{current_usage / 1024 / 1024:.1f} MB / {quota_bytes / 1024 / 1024:.1f} MB"
|
||||
)
|
||||
|
||||
return StorageUsageResponse(
|
||||
used_bytes=current_usage,
|
||||
quota_bytes=quota_bytes,
|
||||
used_pct=round(pct, 2),
|
||||
file_count=file_count,
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/files/{file_id}/download",
|
||||
summary="Download file by ID",
|
||||
@@ -120,3 +382,26 @@ async def download_file(
|
||||
raise fastapi.HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
return await _create_file_download_response(file)
|
||||
|
||||
|
||||
@router.delete(
|
||||
"/files/{file_id}",
|
||||
summary="Delete a file from workspace",
|
||||
status_code=204,
|
||||
)
|
||||
async def delete_file(
|
||||
user_id: Annotated[str, fastapi.Security(get_user_id)],
|
||||
file_id: str,
|
||||
) -> Response:
|
||||
"""
|
||||
Soft-delete a file from the user's workspace.
|
||||
|
||||
The storage is reclaimed and the file is no longer accessible.
|
||||
"""
|
||||
manager = await _get_manager(user_id)
|
||||
deleted = await manager.delete_file(file_id)
|
||||
if not deleted:
|
||||
raise fastapi.HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
logger.info(f"User {user_id} deleted file {file_id}")
|
||||
return Response(status_code=204)
|
||||
|
||||
@@ -23,9 +23,46 @@ from backend.util.retry import func_retry
|
||||
|
||||
from .utils import CoPilotExecutionEntry, CoPilotLogMetadata
|
||||
|
||||
logger_mod = logging.getLogger(__name__)
|
||||
|
||||
logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]")
|
||||
|
||||
|
||||
async def _build_file_attachment_context(
|
||||
user_id: str,
|
||||
session_id: str,
|
||||
file_ids: list[str],
|
||||
) -> str:
|
||||
"""Build a context string describing workspace files attached by the user.
|
||||
|
||||
Fetches metadata for each file_id and returns a structured text block
|
||||
that the agent can use to understand what files are available and how to
|
||||
access them via the ``read_workspace_file`` tool.
|
||||
"""
|
||||
from backend.data.db_accessors import workspace_db
|
||||
|
||||
db = workspace_db()
|
||||
workspace = await db.get_or_create_workspace(user_id)
|
||||
|
||||
lines: list[str] = ["[Attached files]"]
|
||||
for fid in file_ids:
|
||||
info = await db.get_workspace_file(fid, workspace.id)
|
||||
if info is None:
|
||||
lines.append(f"- {fid}: (not found)")
|
||||
continue
|
||||
lines.append(
|
||||
f"- {info.name} (id={info.id}, path={info.path}, "
|
||||
f"type={info.mime_type}, size={info.size_bytes:,} bytes)"
|
||||
)
|
||||
|
||||
lines.append(
|
||||
"\nUse the read_workspace_file tool with the file_id above to read "
|
||||
"file contents. For large or binary files, use save_to_path to copy "
|
||||
"them to the working directory first."
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ============ Module Entry Points ============ #
|
||||
|
||||
# Thread-local storage for processor instances
|
||||
@@ -235,10 +272,24 @@ class CoPilotProcessor:
|
||||
)
|
||||
log.info(f"Using {'SDK' if use_sdk else 'standard'} service")
|
||||
|
||||
# If the user attached files, build context and append to message
|
||||
message = entry.message if entry.message else None
|
||||
if message and entry.file_ids and entry.user_id:
|
||||
try:
|
||||
file_context = await _build_file_attachment_context(
|
||||
entry.user_id, entry.session_id, entry.file_ids
|
||||
)
|
||||
message = f"{message}\n\n{file_context}"
|
||||
log.info(
|
||||
f"Injected {len(entry.file_ids)} file attachment(s) into message"
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(f"Failed to build file attachment context: {e}")
|
||||
|
||||
# Stream chat completion and publish chunks to Redis.
|
||||
async for chunk in stream_fn(
|
||||
session_id=entry.session_id,
|
||||
message=entry.message if entry.message else None,
|
||||
message=message,
|
||||
is_user_message=entry.is_user_message,
|
||||
user_id=entry.user_id,
|
||||
context=entry.context,
|
||||
|
||||
@@ -153,6 +153,9 @@ class CoPilotExecutionEntry(BaseModel):
|
||||
context: dict[str, str] | None = None
|
||||
"""Optional context for the message (e.g., {url: str, content: str})"""
|
||||
|
||||
file_ids: list[str] | None = None
|
||||
"""Workspace file IDs attached to this message by the user"""
|
||||
|
||||
|
||||
class CancelCoPilotEvent(BaseModel):
|
||||
"""Event to cancel a CoPilot operation."""
|
||||
@@ -171,6 +174,7 @@ async def enqueue_copilot_turn(
|
||||
turn_id: str,
|
||||
is_user_message: bool = True,
|
||||
context: dict[str, str] | None = None,
|
||||
file_ids: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Enqueue a CoPilot task for processing by the executor service.
|
||||
|
||||
@@ -181,6 +185,7 @@ async def enqueue_copilot_turn(
|
||||
turn_id: Per-turn UUID for Redis stream isolation
|
||||
is_user_message: Whether the message is from the user (vs system/assistant)
|
||||
context: Optional context for the message (e.g., {url: str, content: str})
|
||||
file_ids: Optional list of workspace file IDs attached by the user
|
||||
"""
|
||||
from backend.util.clients import get_async_copilot_queue
|
||||
|
||||
@@ -191,6 +196,7 @@ async def enqueue_copilot_turn(
|
||||
message=message,
|
||||
is_user_message=is_user_message,
|
||||
context=context,
|
||||
file_ids=file_ids,
|
||||
)
|
||||
|
||||
queue_client = await get_async_copilot_queue()
|
||||
|
||||
@@ -413,6 +413,15 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="Maximum file size in MB for workspace files (1-1024 MB)",
|
||||
)
|
||||
|
||||
user_storage_quota_mb: int = Field(
|
||||
default=1024,
|
||||
ge=1,
|
||||
le=102400,
|
||||
description="Per-user workspace storage quota in MB (default 1 GB). "
|
||||
"Users are blocked from uploading once their total workspace usage "
|
||||
"exceeds this limit.",
|
||||
)
|
||||
|
||||
# AutoMod configuration
|
||||
automod_enabled: bool = Field(
|
||||
default=False,
|
||||
|
||||
@@ -424,3 +424,27 @@ class WorkspaceManager:
|
||||
return await db.count_workspace_files(
|
||||
self.workspace_id, path_prefix=effective_path
|
||||
)
|
||||
|
||||
async def get_total_usage_bytes(self) -> int:
|
||||
"""
|
||||
Get total storage usage for this workspace across all sessions.
|
||||
|
||||
Returns:
|
||||
Total size in bytes
|
||||
"""
|
||||
db = workspace_db()
|
||||
return await db.get_workspace_total_size(self.workspace_id)
|
||||
|
||||
async def check_quota(self, additional_bytes: int = 0) -> tuple[bool, int, int]:
|
||||
"""
|
||||
Check if the workspace is within storage quota.
|
||||
|
||||
Args:
|
||||
additional_bytes: Size of a pending upload to include in the check
|
||||
|
||||
Returns:
|
||||
Tuple of (within_quota, current_usage_bytes, quota_bytes)
|
||||
"""
|
||||
current_usage = await self.get_total_usage_bytes()
|
||||
quota_bytes = Config().user_storage_quota_mb * 1024 * 1024
|
||||
return (current_usage + additional_bytes) <= quota_bytes, current_usage, quota_bytes
|
||||
|
||||
Reference in New Issue
Block a user