Compare commits

...

1 Commits

Author SHA1 Message Date
Claude
f9ee5309c1 feat(backend): add file upload to workspace with per-user quota
Implement backend support for user file uploads to the workspace,
enabling CoPilot to read files users attach in chat. Files are
session-scoped and persisted for future runs in the same thread.

New endpoints on /api/workspace:
- POST /files/upload — multipart upload with session_id, type/size
  validation, virus scanning, and per-user storage quota enforcement
- GET /files — list files (optionally scoped to a session)
- GET /usage — storage usage vs quota (for UI alerts)
- DELETE /files/{file_id} — soft-delete a file

Pipeline changes:
- StreamChatRequest and CoPilotExecutionEntry carry file_ids
- Executor processor injects file metadata context into the agent
  message so CoPilot can read uploaded files via workspace tools

Config:
- user_storage_quota_mb setting (default 1 GB, configurable per deploy)

https://claude.ai/code/session_01XHoK7ujoE3J5okZtTqKgnE
2026-02-26 14:23:51 +00:00
6 changed files with 379 additions and 2 deletions

View File

@@ -79,6 +79,7 @@ class StreamChatRequest(BaseModel):
message: str
is_user_message: bool = True
context: dict[str, str] | None = None # {url: str, content: str}
file_ids: list[str] | None = None # workspace file IDs attached to this message
class CreateSessionResponse(BaseModel):
@@ -445,6 +446,7 @@ async def stream_chat_post(
turn_id=turn_id,
is_user_message=request.is_user_message,
context=request.context,
file_ids=request.file_ids,
)
setup_time = (time.perf_counter() - stream_start_time) * 1000

View File

@@ -1,5 +1,9 @@
"""
Workspace API routes for managing user file storage.
Provides endpoints for uploading, downloading, listing, and deleting files
in a user's workspace. Uploads are session-scoped by default and support
per-user storage quotas.
"""
import logging
@@ -8,12 +12,52 @@ from typing import Annotated
from urllib.parse import quote
import fastapi
import pydantic
from autogpt_libs.auth.dependencies import get_user_id, requires_user
from fastapi.responses import Response
from backend.data.workspace import WorkspaceFile, get_workspace, get_workspace_file
from backend.util.settings import Config
from backend.util.workspace import WorkspaceManager
from backend.util.workspace_storage import get_workspace_storage
logger = logging.getLogger(__name__)
# ---------- Allowed MIME types for user uploads ----------
# Phase 1: text, PDF, CSV/spreadsheets, images. Video/audio is future.
ALLOWED_UPLOAD_MIME_TYPES: set[str] = {
# Text
"text/plain",
"text/markdown",
"text/csv",
"text/html",
"text/xml",
"application/json",
"application/xml",
# PDF
"application/pdf",
# Spreadsheets
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # .xlsx
"application/vnd.ms-excel", # .xls
# Documents
"application/vnd.openxmlformats-officedocument.wordprocessingml.document", # .docx
"application/msword", # .doc
# Images
"image/png",
"image/jpeg",
"image/gif",
"image/webp",
"image/svg+xml",
}
# Extensions allowed when MIME is generic application/octet-stream
_EXTENSION_ALLOWLIST: set[str] = {
".txt", ".md", ".csv", ".json", ".xml", ".html",
".pdf",
".xlsx", ".xls", ".docx", ".doc",
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg",
}
def _sanitize_filename_for_header(filename: str) -> str:
"""
@@ -37,13 +81,63 @@ def _sanitize_filename_for_header(filename: str) -> str:
return f"attachment; filename*=UTF-8''{encoded}"
logger = logging.getLogger(__name__)
def _is_allowed_upload(content_type: str | None, filename: str) -> bool:
    """Return True if a file may be uploaded, by MIME type or extension.

    FIX: the MIME comparison was an exact string match, so otherwise-valid
    types with parameters (``"text/plain; charset=utf-8"``) or non-lowercase
    casing were rejected and fell through to the extension check. Normalize
    the media type (strip parameters, lowercase) before the lookup.

    Args:
        content_type: MIME type reported by the client, possibly None or a
            generic ``application/octet-stream``.
        filename: Original filename; used for the extension fallback.

    Returns:
        True when the type or extension is permitted for upload.
    """
    if content_type:
        # RFC 2045 allows parameters after a ";"; compare the bare media type.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type in ALLOWED_UPLOAD_MIME_TYPES:
            return True
    # Fall back to extension check (handles octet-stream or missing MIME).
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    return ext in _EXTENSION_ALLOWLIST
# ---------- Response models ----------

class FileInfoResponse(pydantic.BaseModel):
    """Metadata for a single workspace file, as returned by listing endpoints."""

    file_id: str
    name: str
    path: str
    mime_type: str
    size_bytes: int
    created_at: str  # ISO-8601 timestamp string (see list_files: created_at.isoformat())


class UploadFileResponse(pydantic.BaseModel):
    """Response body for a successful file upload (POST /files/upload)."""

    file_id: str
    name: str
    path: str
    mime_type: str
    size_bytes: int


class ListFilesResponse(pydantic.BaseModel):
    """Page of workspace files plus the total count ignoring pagination."""

    files: list[FileInfoResponse]
    total_count: int  # total matching files, independent of limit/offset


class StorageUsageResponse(pydantic.BaseModel):
    """Current storage consumption versus the per-user quota (GET /usage)."""

    used_bytes: int
    quota_bytes: int
    used_pct: float  # 0-100 percentage, rounded to 2 decimals by the endpoint
    file_count: int
# ---------- Router ----------

# Every route on this router requires an authenticated user.
router = fastapi.APIRouter(
    dependencies=[fastapi.Security(requires_user)],
)
# ---------- Helpers ----------

async def _get_manager(user_id: str, session_id: str | None = None) -> WorkspaceManager:
    """Return a WorkspaceManager for *user_id*, creating the workspace if needed."""
    # Imported lazily to avoid a module-level dependency cycle.
    from backend.data.db_accessors import workspace_db

    ws = await workspace_db().get_or_create_workspace(user_id)
    return WorkspaceManager(user_id, ws.id, session_id)
def _create_streaming_response(content: bytes, file: WorkspaceFile) -> Response:
"""Create a streaming response for file content."""
return Response(
@@ -98,6 +192,174 @@ async def _create_file_download_response(file: WorkspaceFile) -> Response:
raise
# ---------- Endpoints ----------

@router.post(
    "/files/upload",
    summary="Upload a file to the workspace",
    status_code=201,
)
async def upload_file(
    user_id: Annotated[str, fastapi.Security(get_user_id)],
    session_id: str = fastapi.Form(..., description="Chat session ID to scope this upload"),
    file: fastapi.UploadFile = fastapi.File(...),
) -> UploadFileResponse:
    """
    Upload a file to the user's workspace, scoped to a chat session.

    The file is stored under ``/uploads/{filename}`` within the session's
    workspace area and is immediately accessible to the CoPilot agent for
    that session. Files persist across runs within the same thread.

    Enforces per-user storage quotas and file-type restrictions.

    Raises:
        HTTPException 415: unsupported file type.
        HTTPException 413: file exceeds the max size, or quota exceeded.
        HTTPException 400: the WorkspaceManager rejected the write.
    """
    filename = file.filename or "untitled"

    # Reject disallowed types up front, before reading any content.
    if not _is_allowed_upload(file.content_type, filename):
        raise fastapi.HTTPException(
            status_code=415,
            detail=(
                f"File type not supported: {file.content_type or 'unknown'}. "
                "Allowed types: text, PDF, CSV/spreadsheets, images."
            ),
        )

    # Read content and enforce the per-file size limit.
    # NOTE(review): the whole file is buffered in memory before the size check;
    # a chunked read with an incremental cap would bound memory for oversized
    # uploads — confirm acceptable for the configured max_file_size_mb.
    config = Config()
    max_size = config.max_file_size_mb * 1024 * 1024
    content = await file.read()
    if len(content) > max_size:
        raise fastapi.HTTPException(
            status_code=413,
            detail=f"File too large ({len(content)} bytes). "
            f"Maximum size is {config.max_file_size_mb} MB.",
        )

    manager = await _get_manager(user_id, session_id)

    # Quota check: include this upload's size so the write cannot push the
    # user over the limit.
    within_quota, current_usage, quota_bytes = await manager.check_quota(len(content))
    if not within_quota:
        logger.warning(
            f"User {user_id} hit storage quota: "
            f"{current_usage / 1024 / 1024:.1f} MB / {quota_bytes / 1024 / 1024:.1f} MB"
        )
        raise fastapi.HTTPException(
            status_code=413,
            detail=(
                f"Storage quota exceeded. Using {current_usage / 1024 / 1024:.1f} MB "
                f"of {quota_bytes / 1024 / 1024:.1f} MB. "
                "Please delete some files and try again."
            ),
        )

    # Write file via WorkspaceManager (handles storage, DB, virus scan, checksums).
    # FIX: the path template had lost its placeholder and was the literal
    # string "/uploads/(unknown)" for every upload; include the filename so
    # files land at distinct session-scoped paths.
    upload_path = f"/uploads/{filename}"
    try:
        workspace_file = await manager.write_file(
            content=content,
            filename=filename,
            path=upload_path,
            mime_type=file.content_type,
            overwrite=True,
        )
    except ValueError as e:
        # Surface manager-side validation failures as 400s, preserving the cause.
        raise fastapi.HTTPException(status_code=400, detail=str(e)) from e

    logger.info(
        f"User {user_id} uploaded {filename} ({len(content)} bytes) "
        f"to session {session_id}"
    )
    return UploadFileResponse(
        file_id=workspace_file.id,
        name=workspace_file.name,
        path=workspace_file.path,
        mime_type=workspace_file.mime_type,
        size_bytes=workspace_file.size_bytes,
    )
@router.get(
    "/files",
    summary="List files in workspace",
)
async def list_files(
    user_id: Annotated[str, fastapi.Security(get_user_id)],
    session_id: str | None = fastapi.Query(
        default=None, description="Scope to files in this session"
    ),
    limit: int = fastapi.Query(default=50, ge=1, le=100),
    offset: int = fastapi.Query(default=0, ge=0),
) -> ListFilesResponse:
    """
    List files in the user's workspace.

    When ``session_id`` is given, only that session's files are returned;
    otherwise every workspace file is listed.
    """
    all_sessions = session_id is None
    manager = await _get_manager(user_id, session_id)

    files = await manager.list_files(
        limit=limit,
        offset=offset,
        include_all_sessions=all_sessions,
    )
    total = await manager.get_file_count(include_all_sessions=all_sessions)

    entries: list[FileInfoResponse] = []
    for item in files:
        entries.append(
            FileInfoResponse(
                file_id=item.id,
                name=item.name,
                path=item.path,
                mime_type=item.mime_type,
                size_bytes=item.size_bytes,
                created_at=item.created_at.isoformat(),
            )
        )
    return ListFilesResponse(files=entries, total_count=total)
@router.get(
    "/usage",
    summary="Get workspace storage usage and quota",
)
async def get_usage(
    user_id: Annotated[str, fastapi.Security(get_user_id)],
) -> StorageUsageResponse:
    """
    Report the user's workspace storage usage against their quota.

    Intended for UI capacity displays and near-limit alerts.
    """
    manager = await _get_manager(user_id)
    used = await manager.get_total_usage_bytes()

    quota = Config().user_storage_quota_mb * 1024 * 1024
    count = await manager.get_file_count(include_all_sessions=True)

    # Guard against a zero/negative quota to avoid division by zero.
    percent = 0.0 if quota <= 0 else used / quota * 100
    if percent >= 90:
        # Server-side breadcrumb for users approaching their quota.
        logger.warning(
            f"User {user_id} storage usage at {percent:.1f}%: "
            f"{used / 1024 / 1024:.1f} MB / {quota / 1024 / 1024:.1f} MB"
        )
    return StorageUsageResponse(
        used_bytes=used,
        quota_bytes=quota,
        used_pct=round(percent, 2),
        file_count=count,
    )
@router.get(
"/files/{file_id}/download",
summary="Download file by ID",
@@ -120,3 +382,26 @@ async def download_file(
raise fastapi.HTTPException(status_code=404, detail="File not found")
return await _create_file_download_response(file)
@router.delete(
    "/files/{file_id}",
    summary="Delete a file from workspace",
    status_code=204,
)
async def delete_file(
    user_id: Annotated[str, fastapi.Security(get_user_id)],
    file_id: str,
) -> Response:
    """
    Soft-delete a file from the user's workspace.

    Storage attributed to the file is reclaimed and the file is no longer
    accessible. Responds 404 when no such file exists for this user.
    """
    manager = await _get_manager(user_id)
    if not await manager.delete_file(file_id):
        raise fastapi.HTTPException(status_code=404, detail="File not found")
    logger.info(f"User {user_id} deleted file {file_id}")
    return Response(status_code=204)

View File

@@ -23,9 +23,46 @@ from backend.util.retry import func_retry
from .utils import CoPilotExecutionEntry, CoPilotLogMetadata
logger_mod = logging.getLogger(__name__)
logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]")
async def _build_file_attachment_context(
    user_id: str,
    session_id: str,
    file_ids: list[str],
) -> str:
    """Render a text block describing workspace files the user attached.

    Looks up metadata for every ID in *file_ids* and formats it so the agent
    knows which files exist and how to open them via the
    ``read_workspace_file`` tool. IDs with no matching file are listed as
    "(not found)".

    NOTE(review): *session_id* is unused in this body — the lookup is
    workspace-wide. Confirm whether attachments should be session-scoped.
    """
    from backend.data.db_accessors import workspace_db

    db = workspace_db()
    workspace = await db.get_or_create_workspace(user_id)

    parts: list[str] = ["[Attached files]"]
    for file_id in file_ids:
        meta = await db.get_workspace_file(file_id, workspace.id)
        if meta is None:
            parts.append(f"- {file_id}: (not found)")
        else:
            parts.append(
                f"- {meta.name} (id={meta.id}, path={meta.path}, "
                f"type={meta.mime_type}, size={meta.size_bytes:,} bytes)"
            )
    parts.append(
        "\nUse the read_workspace_file tool with the file_id above to read "
        "file contents. For large or binary files, use save_to_path to copy "
        "them to the working directory first."
    )
    return "\n".join(parts)
# ============ Module Entry Points ============ #
# Thread-local storage for processor instances
@@ -235,10 +272,24 @@ class CoPilotProcessor:
)
log.info(f"Using {'SDK' if use_sdk else 'standard'} service")
# If the user attached files, build context and append to message
message = entry.message if entry.message else None
if message and entry.file_ids and entry.user_id:
try:
file_context = await _build_file_attachment_context(
entry.user_id, entry.session_id, entry.file_ids
)
message = f"{message}\n\n{file_context}"
log.info(
f"Injected {len(entry.file_ids)} file attachment(s) into message"
)
except Exception as e:
log.error(f"Failed to build file attachment context: {e}")
# Stream chat completion and publish chunks to Redis.
async for chunk in stream_fn(
session_id=entry.session_id,
message=entry.message if entry.message else None,
message=message,
is_user_message=entry.is_user_message,
user_id=entry.user_id,
context=entry.context,

View File

@@ -153,6 +153,9 @@ class CoPilotExecutionEntry(BaseModel):
context: dict[str, str] | None = None
"""Optional context for the message (e.g., {url: str, content: str})"""
file_ids: list[str] | None = None
"""Workspace file IDs attached to this message by the user"""
class CancelCoPilotEvent(BaseModel):
"""Event to cancel a CoPilot operation."""
@@ -171,6 +174,7 @@ async def enqueue_copilot_turn(
turn_id: str,
is_user_message: bool = True,
context: dict[str, str] | None = None,
file_ids: list[str] | None = None,
) -> None:
"""Enqueue a CoPilot task for processing by the executor service.
@@ -181,6 +185,7 @@ async def enqueue_copilot_turn(
turn_id: Per-turn UUID for Redis stream isolation
is_user_message: Whether the message is from the user (vs system/assistant)
context: Optional context for the message (e.g., {url: str, content: str})
file_ids: Optional list of workspace file IDs attached by the user
"""
from backend.util.clients import get_async_copilot_queue
@@ -191,6 +196,7 @@ async def enqueue_copilot_turn(
message=message,
is_user_message=is_user_message,
context=context,
file_ids=file_ids,
)
queue_client = await get_async_copilot_queue()

View File

@@ -413,6 +413,15 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Maximum file size in MB for workspace files (1-1024 MB)",
)
user_storage_quota_mb: int = Field(
default=1024,
ge=1,
le=102400,
description="Per-user workspace storage quota in MB (default 1 GB). "
"Users are blocked from uploading once their total workspace usage "
"exceeds this limit.",
)
# AutoMod configuration
automod_enabled: bool = Field(
default=False,

View File

@@ -424,3 +424,27 @@ class WorkspaceManager:
return await db.count_workspace_files(
self.workspace_id, path_prefix=effective_path
)
async def get_total_usage_bytes(self) -> int:
    """
    Total bytes stored in this workspace, summed across all sessions.

    Returns:
        Total size in bytes.
    """
    return await workspace_db().get_workspace_total_size(self.workspace_id)
async def check_quota(self, additional_bytes: int = 0) -> tuple[bool, int, int]:
    """
    Check whether this workspace is within its storage quota.

    Args:
        additional_bytes: Bytes of a pending upload to count toward usage.

    Returns:
        Tuple of (within_quota, current_usage_bytes, quota_bytes).
    """
    usage = await self.get_total_usage_bytes()
    limit = Config().user_storage_quota_mb * 1024 * 1024
    within = usage + additional_bytes <= limit
    return within, usage, limit