fix(backend/chat): Use normpath+startswith pattern for CodeQL path sanitization

CodeQL doesn't recognize re.sub as a path sanitizer. Switch to the
os.path.normpath + startswith prefix check pattern that CodeQL's
taint model explicitly recognizes as breaking the taint chain.
This commit is contained in:
Zamil Majdy
2026-02-11 06:45:12 +04:00
parent 750a674c78
commit 9791bdd724

View File

@@ -46,12 +46,20 @@ config = ChatConfig()
_background_tasks: set[asyncio.Task[Any]] = set()
def _sanitize_session_id(session_id: str) -> str:
"""Sanitize session_id to prevent path traversal and injection.
_SDK_CWD_PREFIX = "/tmp/copilot-"
Only allows alphanumeric characters and hyphens, stripping everything else.
def _make_sdk_cwd(session_id: str) -> str:
"""Create a safe, session-specific working directory path.
Sanitizes session_id, then validates the resulting path stays under /tmp/
using normpath + startswith (the pattern CodeQL recognises as a sanitizer).
"""
return re.sub(r"[^A-Za-z0-9-]", "", session_id)
safe_id = re.sub(r"[^A-Za-z0-9-]", "", session_id)
cwd = os.path.normpath(f"{_SDK_CWD_PREFIX}{safe_id}")
if not cwd.startswith(_SDK_CWD_PREFIX):
raise ValueError(f"Session path escaped prefix: {cwd}")
return cwd
def _cleanup_sdk_tool_results(cwd: str) -> None:
@@ -64,8 +72,13 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
import glob as _glob
import shutil
# Validate cwd is under the expected prefix (CodeQL sanitizer pattern)
normalized = os.path.normpath(cwd)
if not normalized.startswith(_SDK_CWD_PREFIX):
return
# SDK encodes the cwd path by replacing '/' with '-'
encoded_cwd = cwd.replace("/", "-")
encoded_cwd = normalized.replace("/", "-")
project_dir = os.path.expanduser(f"~/.claude/projects/{encoded_cwd}")
results_glob = os.path.join(project_dir, "tool-results", "*")
@@ -76,11 +89,10 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
pass
# Also clean up the temp cwd directory itself
if cwd.startswith("/tmp/copilot-"):
try:
shutil.rmtree(cwd, ignore_errors=True)
except OSError:
pass
try:
shutil.rmtree(normalized, ignore_errors=True)
except OSError:
pass
async def _compress_conversation_history(
@@ -247,9 +259,8 @@ async def stream_chat_completion_sdk(
stream_completed = False
# Use a session-specific temp dir to avoid cleanup race conditions
# between concurrent sessions. Sanitize session_id to prevent path traversal.
safe_session_id = _sanitize_session_id(session_id)
sdk_cwd = f"/tmp/copilot-{safe_session_id}"
# between concurrent sessions.
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
try: