fix(copilot): review round 1 — hoist subagent constant, strip C1 chars, guard tmpdir

- Move _SUBAGENT_TOOLS frozenset to module level to avoid per-session allocation
- Extend _sanitize to strip C1 control characters (U+0080-U+009F) for
  defense against log injection via non-ASCII control sequences
- Guard CLAUDE_CODE_TMPDIR assignment with `if sdk_cwd:` for defensive
  consistency (matches PR #12636 approach)
This commit is contained in:
Zamil Majdy
2026-04-01 17:11:53 +02:00
parent ce1555c07a
commit c7e0f8169a
2 changed files with 16 additions and 8 deletions

View File

@@ -22,6 +22,10 @@ from .tool_adapter import (
logger = logging.getLogger(__name__)
# The SDK CLI uses "Task" in older versions and "Agent" in v2.x+.
# Shared across all sessions — used by security hooks for sub-agent detection.
_SUBAGENT_TOOLS: frozenset[str] = frozenset({"Task", "Agent"})
def _deny(reason: str) -> dict[str, Any]:
"""Return a hook denial response."""
@@ -155,14 +159,17 @@ def create_security_hooks(
def _sanitize(value: str, max_len: int = 200) -> str:
"""Strip control characters and truncate for safe logging."""
# Remove all C0 control chars (U+0000U+001F) and DEL (U+007F).
cleaned = "".join(c for c in value if c >= " " and c != "\x7f")
# Remove C0 (U+0000-U+001F), DEL (U+007F), and C1 (U+0080-U+009F)
# control characters to prevent log injection.
cleaned = "".join(
c
for c in value
if c >= " " and c != "\x7f" and not ("\x80" <= c <= "\x9f")
)
return cleaned[:max_len]
# Per-session tracking for sub-agent concurrency.
# Set of tool_use_ids that consumed a slot — len() is the active count.
# The SDK CLI uses "Task" in older versions and "Agent" in v2.x+.
_subagent_tools: frozenset[str] = frozenset({"Task", "Agent"})
task_tool_use_ids: set[str] = set()
async def pre_tool_use_hook(
@@ -177,7 +184,7 @@ def create_security_hooks(
# Rate-limit sub-agent spawns per session.
# The SDK CLI renamed "Task" → "Agent" in v2.x; handle both.
if tool_name in _subagent_tools:
if tool_name in _SUBAGENT_TOOLS:
# Block background task execution first — denied calls
# should not consume a subtask slot.
if tool_input.get("run_in_background"):
@@ -221,7 +228,7 @@ def create_security_hooks(
return cast(SyncHookJSONOutput, result)
# Reserve the sub-agent slot only after all validations pass
if tool_name in _subagent_tools and tool_use_id is not None:
if tool_name in _SUBAGENT_TOOLS and tool_use_id is not None:
task_tool_use_ids.add(tool_use_id)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
@@ -229,7 +236,7 @@ def create_security_hooks(
def _release_task_slot(tool_name: str, tool_use_id: str | None) -> None:
"""Release a sub-agent concurrency slot if one was reserved."""
if tool_name in _subagent_tools and tool_use_id in task_tool_use_ids:
if tool_name in _SUBAGENT_TOOLS and tool_use_id in task_tool_use_ids:
task_tool_use_ids.discard(tool_use_id)
logger.info(
"[SDK] Sub-agent slot released, active=%d/%d, user=%s",

View File

@@ -1872,7 +1872,8 @@ async def stream_chat_completion_sdk(
# NOTE: We intentionally do NOT override HOME here. The CLI
# stores auth credentials in $HOME/.claude/ — overriding HOME
# would break subscription mode (claude login) authentication.
sdk_env["CLAUDE_CODE_TMPDIR"] = sdk_cwd
if sdk_cwd:
sdk_env["CLAUDE_CODE_TMPDIR"] = sdk_cwd
if not config.api_key and not config.use_claude_code_subscription:
raise RuntimeError(