fix(backend): security fixes and dead code removal

- routes.py: Sanitize filename in Content-Disposition header to prevent
  header injection (RFC5987 encoding for non-ASCII)
- http.py: Replace assert with explicit ValueError for graph_exec_id check
  (asserts can be stripped with -O)
- workspace.py: Remove unused functions (get_workspace_by_id,
  hard_delete_workspace_file, update_workspace_file)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-01-28 00:14:22 -06:00
parent 83f93d00f4
commit d51d811497
3 changed files with 29 additions and 87 deletions

View File

@@ -3,7 +3,9 @@ Workspace API routes for managing user file storage.
"""
import logging
import re
from typing import Annotated
from urllib.parse import quote
import fastapi
from autogpt_libs.auth.dependencies import get_user_id, requires_user
@@ -12,6 +14,28 @@ from fastapi.responses import Response
from backend.data.workspace import get_workspace, get_workspace_file
from backend.util.workspace_storage import get_workspace_storage
def _sanitize_filename_for_header(filename: str) -> str:
"""
Sanitize filename for Content-Disposition header to prevent header injection.
Removes/replaces characters that could break the header or inject new headers.
Uses RFC5987 encoding for non-ASCII characters.
"""
# Remove CR, LF, and null bytes (header injection prevention)
sanitized = re.sub(r'[\r\n\x00]', '', filename)
# Escape quotes
sanitized = sanitized.replace('"', '\\"')
# For non-ASCII, use RFC5987 filename* parameter
# Check if filename has non-ASCII characters
try:
sanitized.encode('ascii')
return f'attachment; filename="{sanitized}"'
except UnicodeEncodeError:
# Use RFC5987 encoding for UTF-8 filenames
encoded = quote(filename, safe='')
return f"attachment; filename*=UTF-8''{encoded}"
logger = logging.getLogger(__name__)
router = fastapi.APIRouter(
@@ -35,7 +59,7 @@ async def _create_file_download_response(file) -> Response:
content=content,
media_type=file.mimeType,
headers={
"Content-Disposition": f'attachment; filename="{file.name}"',
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)
@@ -50,7 +74,7 @@ async def _create_file_download_response(file) -> Response:
content=content,
media_type=file.mimeType,
headers={
"Content-Disposition": f'attachment; filename="{file.name}"',
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)
@@ -62,7 +86,7 @@ async def _create_file_download_response(file) -> Response:
content=content,
media_type=file.mimeType,
headers={
"Content-Disposition": f'attachment; filename="{file.name}"',
"Content-Disposition": _sanitize_filename_for_header(file.name),
"Content-Length": str(len(content)),
},
)

View File

@@ -128,7 +128,8 @@ class SendWebRequestBlock(Block):
"""
files_payload: list[tuple[str, tuple[str, BytesIO, str]]] = []
graph_exec_id = execution_context.graph_exec_id
assert graph_exec_id is not None
if graph_exec_id is None:
raise ValueError("graph_exec_id is required for file operations")
for media in files:
# Normalise to a list so we can repeat the same key

View File

@@ -52,19 +52,6 @@ async def get_workspace(user_id: str) -> Optional[UserWorkspace]:
return await UserWorkspace.prisma().find_unique(where={"userId": user_id})
async def get_workspace_by_id(workspace_id: str) -> Optional[UserWorkspace]:
    """
    Look up a workspace record by its primary key.

    Args:
        workspace_id: The workspace ID.

    Returns:
        The matching UserWorkspace, or None when no record exists.
    """
    workspace = await UserWorkspace.prisma().find_unique(
        where={"id": workspace_id}
    )
    return workspace
async def create_workspace_file(
workspace_id: str,
name: str,
@@ -271,76 +258,6 @@ async def soft_delete_workspace_file(
return updated
async def hard_delete_workspace_file(file_id: str) -> bool:
    """
    Permanently delete a workspace file record.

    Note: this removes only the database record; the underlying file must
    be deleted from the storage backend separately.

    Args:
        file_id: The file ID.

    Returns:
        True when the record was deleted, False when the delete failed
        (e.g. no record with that ID exists).
    """
    deleted = False
    try:
        await UserWorkspaceFile.prisma().delete(where={"id": file_id})
        logger.info(f"Hard-deleted workspace file {file_id}")
        deleted = True
    except Exception:
        # Best-effort contract: any failure is reported as False.
        pass
    return deleted
async def update_workspace_file(
    file_id: str,
    name: Optional[str] = None,
    path: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> Optional[UserWorkspaceFile]:
    """
    Update workspace file metadata.

    Args:
        file_id: The file ID.
        name: New filename (left unchanged when None).
        path: New virtual path; a leading "/" is added when missing.
        metadata: New metadata keys, merged over the existing metadata.

    Returns:
        The updated UserWorkspaceFile, the unchanged record when no fields
        were supplied, or None when the record does not exist or the
        update fails.
    """
    changes: dict = {}
    if name is not None:
        changes["name"] = name
    if path is not None:
        # Normalize to an absolute-style virtual path.
        changes["path"] = path if path.startswith("/") else f"/{path}"
    if metadata is not None:
        # Merge the new keys over the file's current metadata.
        current = await get_workspace_file(file_id)
        if current is None:
            return None
        merged = {**(current.metadata or {}), **metadata}
        changes["metadata"] = SafeJson(merged)
    if not changes:
        # Nothing to update; hand back the record as-is.
        return await get_workspace_file(file_id)
    try:
        return await UserWorkspaceFile.prisma().update(
            where={"id": file_id},
            data=changes,
        )
    except Exception:
        # Update failed (e.g. record not found); signal with None.
        return None
async def workspace_file_exists(
workspace_id: str,
path: str,