AutoGPT/autogpt_platform/backend/backend/util/feature_flag.py
Zamil Majdy 52b3aebf71 feat(backend/sdk): Claude Agent SDK integration for CoPilot (#12103)
## Summary

Full integration of the **Claude Agent SDK** to replace the existing
one-turn OpenAI-compatible CoPilot implementation with a multi-turn,
tool-using AI agent.

### What changed

**Core SDK Integration** (`chat/sdk/` — new module)
- **`service.py`**: Main orchestrator — spawns Claude Code CLI as a
subprocess per user message, streams responses back via SSE. Handles
conversation history compression, session lifecycle, and error recovery.
- **`response_adapter.py`**: Translates Claude Agent SDK events (text
deltas, tool use, errors, result messages) into the existing CoPilot
`StreamEvent` protocol so the frontend works unchanged.
- **`tool_adapter.py`**: Bridges CoPilot's MCP tools (find_block,
run_block, create_agent, etc.) into the SDK's tool format. Handles
schema conversion and result serialization.
- **`security_hooks.py`**: Pre/Post tool-use hooks that enforce a strict
allowlist of tools, block path traversal, sandbox file operations to
per-session workspace directories, cap sub-agent spawning, and prevent
the model from accessing unauthorized system resources.
- **`transcript.py`**: JSONL transcript I/O utilities for the stateless
`--resume` feature (see below).

**Stateless Multi-Turn Resume** (new)
- Instead of compressing conversation history via LLM on every turn
(lossy and expensive), we capture Claude Code's native JSONL session
transcript via a **Stop hook** callback, persist it in the DB
(`ChatSession.sdkTranscript`), and restore it on the next turn via
`--resume <file>`.
- This preserves full tool call/result context across turns with zero
token overhead for history.
- Feature-flagged via `CLAUDE_AGENT_USE_RESUME` (default: off).
- DB migration: `ALTER TABLE "ChatSession" ADD COLUMN "sdkTranscript"
TEXT`.
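
The resume flow above can be sketched as follows. This is a minimal illustration, not the PR's code: the names `validate_transcript` and `write_transcript_to_tempfile` come from the sequence diagram further down, but their signatures and the validation rule used here (every non-empty line must be a JSON object) are assumptions.

```python
import json
import os
import tempfile


def validate_transcript(content: str) -> bool:
    """Return True if the transcript has at least one line and every
    non-empty line parses as a JSON object (assumed validation rule)."""
    lines = [line for line in content.splitlines() if line.strip()]
    if not lines:
        return False
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            return False
        if not isinstance(entry, dict):
            return False
    return True


def write_transcript_to_tempfile(content: str) -> str:
    """Write a stored transcript to a private temp file and return its path.

    The returned path is what would be handed to `--resume <file>`.
    """
    fd, path = tempfile.mkstemp(prefix="transcript-", suffix=".jsonl")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(content)
    return path
```

A caller would validate the value loaded from `ChatSession.sdkTranscript` first and fall back to history compression when validation fails.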

**Sandboxed Tool Execution** (`chat/tools/`)
- **`bash_exec.py`**: Sandboxed bash execution using bubblewrap
(`bwrap`) with read-only root filesystem, per-session writable
workspace, resource limits (CPU, memory, file size), and network
isolation.
- **`sandbox.py`**: Shared bubblewrap sandbox infrastructure — generates
`bwrap` command lines with configurable mounts, environment, and
resource constraints.
- **`web_fetch.py`**: URL fetching tool with domain allowlist, size
limits, and content-type filtering.
- **`check_operation_status.py`**: Polling tool for long-running
operations (agent creation, block execution) so the SDK doesn't block
waiting.
- **`find_block.py`** / **`run_block.py`**: Enhanced with category
filtering, optimized response size (removed raw JSON schemas), and
better error handling.
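
As a rough sketch of what generating a `bwrap` command line can look like: the function name, parameters, and mount layout below are hypothetical and are not taken from `sandbox.py`. Only standard bubblewrap options are used, and resource limits (CPU, memory, file size) are deliberately left out because bubblewrap does not set them itself.

```python
from typing import List


def build_bwrap_command(
    workspace: str, command: str, allow_network: bool = False
) -> List[str]:
    """Build an argv list that runs `command` under bubblewrap (hypothetical helper)."""
    args = [
        "bwrap",
        "--ro-bind", "/", "/",            # read-only root filesystem
        "--dev", "/dev",                  # minimal /dev
        "--proc", "/proc",                # fresh /proc
        "--tmpfs", "/tmp",                # private, empty /tmp
        # Bind the workspace after the tmpfs so a workspace under /tmp stays visible.
        "--bind", workspace, workspace,   # per-session writable workspace
        "--chdir", workspace,
        "--die-with-parent",              # kill the sandbox if the parent exits
    ]
    if not allow_network:
        args.append("--unshare-net")      # network isolation
    args += ["bash", "-c", command]
    return args
```

Returning an argv list rather than a shell string keeps the untrusted `command` confined to the single `bash -c` argument when the list is passed to `subprocess.run`.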

**Security**
- Path traversal prevention: session IDs sanitized, all file ops
confined to workspace dirs, symlink resolution.
- Tool allowlist enforcement via SDK hooks — model cannot call arbitrary
tools.
- Built-in `Bash` tool blocked via `disallowed_tools` to prevent
bypassing sandboxed `bash_exec`.
- Sub-agent (`Task`) spawning capped at configurable limit (default:
10).
- CodeQL-clean path sanitization patterns.
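
The path rules above can be illustrated with a small sketch. The helper names are hypothetical and the checks are an assumption about one reasonable approach, not the code in `security_hooks.py`: expand `~` first, resolve symlinks and `..` with `os.path.realpath`, then compare against the workspace root.

```python
import os
import re

# Characters allowed in a session ID used as a directory name (assumed policy).
_UNSAFE_ID_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def sanitize_session_id(session_id: str) -> str:
    """Strip every character that could alter a path built from the ID."""
    cleaned = _UNSAFE_ID_CHARS.sub("", session_id)
    if not cleaned:
        raise ValueError("session ID is empty after sanitization")
    return cleaned


def resolve_in_workspace(workspace: str, user_path: str) -> str:
    """Resolve `user_path` and ensure the result stays inside `workspace`."""
    root = os.path.realpath(workspace)
    # Expand "~" before joining so a home-relative path becomes absolute
    # and is rejected by the containment check below.
    expanded = os.path.expanduser(user_path)
    # realpath collapses ".." segments and follows symlinks.
    candidate = os.path.realpath(os.path.join(root, expanded))
    if os.path.commonpath([root, candidate]) != root:
        raise PermissionError(f"path escapes workspace: {user_path!r}")
    return candidate
```

Doing the containment check on the fully resolved path, rather than searching the raw string for `..`, covers absolute paths, symlinks, and tilde expansion with a single comparison.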

**Streaming & Reconnection**
- SSE stream registry backed by Redis Streams for crash-resilient
reconnection.
- Long-running operation tracking with TTL-based cleanup.
- Atomic message append to prevent race conditions on concurrent writes.

**Configuration** (`config.py`)
- `use_claude_agent_sdk` — master toggle (default: on)
- `claude_agent_model` — model override for SDK path
- `claude_agent_max_buffer_size` — JSON parsing buffer (10MB)
- `claude_agent_max_subtasks` — sub-agent cap (10)
- `claude_agent_use_resume` — transcript-based resume (default: off)
- `thinking_enabled` — extended thinking for Claude models
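
For orientation, the settings above could be modelled roughly as below. This is a sketch under stated assumptions: the class, the `env_bool` helper, and the reading of 10MB as 10 * 1024 * 1024 bytes are hypothetical; only the field names, the stated defaults, and the two environment variable names come from this description. `thinking_enabled` is omitted because its default is not stated here.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def env_bool(env: Mapping[str, str], name: str, default: bool) -> bool:
    """Parse a boolean env var; unset or unrecognized values yield `default`."""
    raw = env.get(name)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in _TRUE:
        return True
    if value in _FALSE:
        return False
    return default


@dataclass(frozen=True)
class CopilotSdkConfig:
    use_claude_agent_sdk: bool = True                     # master toggle (default: on)
    claude_agent_model: Optional[str] = None              # model override for SDK path
    claude_agent_max_buffer_size: int = 10 * 1024 * 1024  # assumed meaning of "10MB"
    claude_agent_max_subtasks: int = 10                   # sub-agent cap
    claude_agent_use_resume: bool = False                 # transcript-based resume

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "CopilotSdkConfig":
        env = os.environ if env is None else env
        return cls(
            use_claude_agent_sdk=env_bool(env, "CHAT_USE_CLAUDE_AGENT_SDK", True),
            claude_agent_use_resume=env_bool(env, "CLAUDE_AGENT_USE_RESUME", False),
        )
```

Parsing an explicit `"false"` as `False`, instead of treating it like an unset variable, is the property a toggle like this needs so that it can actually be switched off.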

**Tests**
- `sdk/response_adapter_test.py` — 366 lines covering all event
translation paths
- `sdk/security_hooks_test.py` — 165 lines covering tool blocking, path
traversal, subtask limits
- `chat/model_test.py` — 214 lines covering session model serialization
- `chat/service_test.py` — Integration tests including multi-turn resume
keyword recall
- `tools/find_block_test.py` / `run_block_test.py` — Extended with new
tool behavior tests

## Test plan
- [x] Unit tests pass (`sdk/response_adapter_test.py`,
`security_hooks_test.py`, `model_test.py`)
- [x] Integration test: multi-turn keyword recall via `--resume`
(`service_test.py::test_sdk_resume_multi_turn`)
- [x] Manual E2E: CoPilot chat sessions with tool calls, bash execution,
and multi-turn context
- [x] Pre-commit hooks pass (ruff, isort, black, pyright, flake8)
- [ ] Staging deployment with `claude_agent_use_resume=false` initially
- [ ] Enable resume in staging, verify transcript capture and recall

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

This PR replaces the existing OpenAI-compatible CoPilot with a full
Claude Agent SDK integration, introducing multi-turn conversations,
stateless resume via JSONL transcripts, and sandboxed tool execution.

**Key changes:**
- **SDK integration** (`chat/sdk/`): spawns Claude Code CLI subprocess
per message, translates events to frontend protocol, bridges MCP tools
- **Stateless resume**: captures JSONL transcripts via Stop hook,
persists in `ChatSession.sdkTranscript`, restores with `--resume`
(feature-flagged, default off)
- **Sandboxed execution**: bubblewrap sandbox for bash commands with
filesystem whitelist, network isolation, resource limits
- **Security hooks**: tool allowlist enforcement, path traversal
prevention, workspace-scoped file operations, sub-agent spawn limits
- **Long-running operations**: delegates `create_agent`/`edit_agent` to
existing stream_registry infrastructure for SSE reconnection
- **Feature flag**: `CHAT_USE_CLAUDE_AGENT_SDK` with LaunchDarkly
support, defaults to enabled

**Security issues found:**
- Path traversal validation has logic errors in `security_hooks.py:82`
(tilde expansion order) and `service.py:266` (redundant `..` check)
- Config validator always prefers env var over explicit `False` value
(`config.py:162`)
- Race condition in `routes.py:323` — message persisted before task
registration, could duplicate on retry
- Resource limits in sandbox may fail silently (`sandbox.py:109`)

**Test coverage is strong** with 366 lines for response adapter, 165 for
security hooks, and integration tests for multi-turn resume.
</details>


<details><summary><h3>Confidence Score: 3/5</h3></summary>

- This PR is generally safe but has critical security issues in path
validation that must be fixed before merge
- Score reflects strong architecture and test coverage offset by real
security vulnerabilities: the tilde expansion bug in `security_hooks.py`
could allow sandbox escape, the race condition could cause message
duplication, and the silent ulimit failures could bypass resource
limits. The bubblewrap sandbox and allowlist enforcement are
well-designed, but the path validation bugs need fixing. The transcript
resume feature is properly feature-flagged. Overall the implementation
is solid but the security issues prevent a higher score.
- Pay close attention to
`backend/api/features/chat/sdk/security_hooks.py` (path traversal
vulnerability), `backend/api/features/chat/routes.py` (race condition),
`backend/api/features/chat/tools/sandbox.py` (silent resource limit
failures), and `backend/api/features/chat/sdk/service.py` (redundant
security check)
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant Frontend
    participant Routes as routes.py
    participant SDKService as sdk/service.py
    participant ClaudeSDK as Claude Agent SDK CLI
    participant SecurityHooks as security_hooks.py
    participant ToolAdapter as tool_adapter.py
    participant CoPilotTools as tools/*
    participant Sandbox as sandbox.py (bwrap)
    participant DB as Database
    participant Redis as stream_registry

    Frontend->>Routes: POST /chat (user message)
    Routes->>SDKService: stream_chat_completion_sdk()
    
    SDKService->>DB: get_chat_session()
    DB-->>SDKService: session + messages
    
    alt Resume enabled AND transcript exists
        SDKService->>SDKService: validate_transcript()
        SDKService->>SDKService: write_transcript_to_tempfile()
        Note over SDKService: Pass --resume to SDK
    else No resume
        SDKService->>SDKService: _compress_conversation_history()
        Note over SDKService: Inject history into user message
    end
    
    SDKService->>SecurityHooks: create_security_hooks()
    SDKService->>ToolAdapter: create_copilot_mcp_server()
    SDKService->>ClaudeSDK: spawn subprocess with MCP server
    
    loop Streaming Conversation
        ClaudeSDK->>SDKService: AssistantMessage (text/tool_use)
        SDKService->>Frontend: StreamTextDelta / StreamToolInputAvailable
        
        alt Tool Call
            ClaudeSDK->>SecurityHooks: PreToolUse hook
            SecurityHooks->>SecurityHooks: validate path, check allowlist
            alt Tool blocked
                SecurityHooks-->>ClaudeSDK: deny
            else Tool allowed
                SecurityHooks-->>ClaudeSDK: allow
                ClaudeSDK->>ToolAdapter: call MCP tool
                
                alt Long-running tool (create_agent, edit_agent)
                    ToolAdapter->>Redis: register task
                    ToolAdapter->>DB: save OperationPendingResponse
                    ToolAdapter->>ToolAdapter: spawn background task
                    ToolAdapter-->>ClaudeSDK: OperationStartedResponse
                else Regular tool (find_block, bash_exec)
                    ToolAdapter->>CoPilotTools: execute()
                    alt bash_exec
                        CoPilotTools->>Sandbox: run_sandboxed()
                        Sandbox->>Sandbox: build bwrap command
                        Note over Sandbox: Network isolation,<br/>filesystem whitelist,<br/>resource limits
                        Sandbox-->>CoPilotTools: stdout, stderr, exit_code
                    end
                    CoPilotTools-->>ToolAdapter: result
                    ToolAdapter->>ToolAdapter: stash full output
                    ToolAdapter-->>ClaudeSDK: MCP response
                end
                
                SecurityHooks->>SecurityHooks: PostToolUse hook (log)
            end
        end
        
        ClaudeSDK->>SDKService: UserMessage (ToolResultBlock)
        SDKService->>ToolAdapter: pop_pending_tool_output()
        SDKService->>Frontend: StreamToolOutputAvailable
    end
    
    ClaudeSDK->>SecurityHooks: Stop hook
    SecurityHooks->>SDKService: transcript_path callback
    SDKService->>SDKService: read_transcript_file()
    SDKService->>DB: save transcript to session.sdkTranscript
    
    ClaudeSDK->>SDKService: ResultMessage (success)
    SDKService->>Frontend: StreamFinish
    SDKService->>DB: upsert_chat_session()
```
</details>


<sub>Last reviewed commit: 28c1121</sub>

<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->

---------

Co-authored-by: Swifty <craigswift13@gmail.com>
2026-02-13 15:49:03 +00:00

335 lines
10 KiB
Python

import contextlib
import logging
from enum import Enum
from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar

import ldclient
from autogpt_libs.auth.dependencies import get_optional_user_id
from fastapi import HTTPException, Security
from ldclient import Context, LDClient
from ldclient.config import Config
from typing_extensions import ParamSpec

from backend.util.cache import cached
from backend.util.settings import Settings

logger = logging.getLogger(__name__)

# Load settings at module level
settings = Settings()

P = ParamSpec("P")
T = TypeVar("T")

_is_initialized = False


class Flag(str, Enum):
    """
    Centralized enum for all LaunchDarkly feature flags.

    Add new flags here to ensure consistency across the codebase.
    """

    AUTOMOD = "AutoMod"
    AI_ACTIVITY_STATUS = "ai-agent-execution-summary"
    BETA_BLOCKS = "beta-blocks"
    AGENT_ACTIVITY = "agent-activity"
    ENABLE_PLATFORM_PAYMENT = "enable-platform-payment"
    CHAT = "chat"
    COPILOT_SDK = "copilot-sdk"


def is_configured() -> bool:
    """Check if LaunchDarkly is configured with an SDK key."""
    return bool(settings.secrets.launch_darkly_sdk_key)


def get_client() -> LDClient:
    """Get the LaunchDarkly client singleton."""
    if not _is_initialized:
        initialize_launchdarkly()
    return ldclient.get()


def initialize_launchdarkly() -> None:
    sdk_key = settings.secrets.launch_darkly_sdk_key
    logger.debug(
        f"Initializing LaunchDarkly with SDK key: {'present' if sdk_key else 'missing'}"
    )

    if not sdk_key:
        logger.warning("LaunchDarkly SDK key not configured")
        return

    config = Config(sdk_key)
    ldclient.set_config(config)

    global _is_initialized
    _is_initialized = True
    if ldclient.get().is_initialized():
        logger.info("LaunchDarkly client initialized successfully")
    else:
        logger.error("LaunchDarkly client failed to initialize")


def shutdown_launchdarkly() -> None:
    """Shutdown the LaunchDarkly client."""
    if ldclient.get().is_initialized():
        ldclient.get().close()
        logger.info("LaunchDarkly client closed successfully")


@cached(maxsize=1000, ttl_seconds=86400)  # 1000 entries, 24 hours TTL
async def _fetch_user_context_data(user_id: str) -> Context:
    """
    Fetch user context for LaunchDarkly from Supabase.

    Args:
        user_id: The user ID to fetch data for

    Returns:
        LaunchDarkly Context object
    """
    builder = Context.builder(user_id).kind("user").anonymous(True)

    try:
        from backend.util.clients import get_supabase

        # If we have user data, update context
        response = get_supabase().auth.admin.get_user_by_id(user_id)
        if response and response.user:
            user = response.user
            builder.anonymous(False)
            if user.role:
                builder.set("role", user.role)
                # It's weird, I know, but it is what it is.
                builder.set("custom", {"role": user.role})
            if user.email:
                builder.set("email", user.email)
                builder.set("email_domain", user.email.split("@")[-1])
    except Exception as e:
        logger.warning(f"Failed to fetch user context for {user_id}: {e}")

    return builder.build()


async def get_feature_flag_value(
    flag_key: str,
    user_id: str,
    default: Any = None,
) -> Any:
    """
    Get the raw value of a feature flag for a user.

    This is the generic function that returns the actual flag value,
    which could be a boolean, string, number, or JSON object.

    Args:
        flag_key: The LaunchDarkly feature flag key
        user_id: The user ID to evaluate the flag for
        default: Default value if LaunchDarkly is unavailable or flag evaluation fails

    Returns:
        The flag value from LaunchDarkly
    """
    try:
        client = get_client()

        # Check if client is initialized
        if not client.is_initialized():
            logger.debug(
                f"LaunchDarkly not initialized, using default={default} for {flag_key}"
            )
            return default

        # Get user context from Supabase
        context = await _fetch_user_context_data(user_id)

        # Evaluate flag
        result = client.variation(flag_key, context, default)

        logger.debug(
            f"Feature flag {flag_key} for user {user_id}: {result} (type: {type(result).__name__})"
        )
        return result

    except Exception as e:
        logger.warning(
            f"LaunchDarkly flag evaluation failed for {flag_key}: {e}, using default={default}"
        )
        return default


async def is_feature_enabled(
    flag_key: Flag,
    user_id: str,
    default: bool = False,
) -> bool:
    """
    Check if a feature flag is enabled for a user.

    Args:
        flag_key: The Flag enum value
        user_id: The user ID to evaluate the flag for
        default: Default value if LaunchDarkly is unavailable or flag evaluation fails

    Returns:
        True if feature is enabled, False otherwise
    """
    result = await get_feature_flag_value(flag_key.value, user_id, default)

    # If the result is already a boolean, return it
    if isinstance(result, bool):
        return result

    # Log a warning if the flag is not returning a boolean
    logger.warning(
        f"Feature flag {flag_key} returned non-boolean value: {result} (type: {type(result).__name__}). "
        f"This flag should be configured as a boolean in LaunchDarkly. Using default={default}"
    )

    # Return the default if we get a non-boolean value
    # This prevents objects from being incorrectly treated as True
    return default


def feature_flag(
    flag_key: str,
    default: bool = False,
) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:
    """
    Decorator for async feature flag protected endpoints.

    Args:
        flag_key: The LaunchDarkly feature flag key
        default: Default value if flag evaluation fails

    Returns:
        Decorator that only works with async functions
    """

    def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        @wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            try:
                user_id = kwargs.get("user_id")
                if not user_id:
                    raise ValueError("user_id is required")

                if not get_client().is_initialized():
                    logger.warning(
                        "LaunchDarkly not initialized, "
                        f"using default {flag_key}={repr(default)}"
                    )
                    is_enabled = default
                else:
                    # Use the internal function directly since we have a raw string flag_key
                    flag_value = await get_feature_flag_value(
                        flag_key, str(user_id), default
                    )
                    # Ensure we treat flag value as boolean
                    if isinstance(flag_value, bool):
                        is_enabled = flag_value
                    else:
                        # Log warning and use default for non-boolean values
                        logger.warning(
                            f"Feature flag {flag_key} returned non-boolean value: "
                            f"{repr(flag_value)} (type: {type(flag_value).__name__}). "
                            f"Using default value {repr(default)}"
                        )
                        is_enabled = default

                if not is_enabled:
                    raise HTTPException(status_code=404, detail="Feature not available")

                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"Error evaluating feature flag {flag_key}: {e}")
                raise

        return async_wrapper

    return decorator


def create_feature_flag_dependency(
    flag_key: Flag,
    default: bool = False,
) -> Callable[[str | None], Awaitable[None]]:
    """
    Create a FastAPI dependency that checks a feature flag.

    This dependency automatically extracts the user_id from the JWT token
    (if present) for proper LaunchDarkly user targeting, while still
    supporting anonymous access.

    Args:
        flag_key: The Flag enum value to check
        default: Default value if flag evaluation fails

    Returns:
        An async dependency function that raises HTTPException if flag is disabled

    Example:
        router = APIRouter(
            dependencies=[Depends(create_feature_flag_dependency(Flag.CHAT))]
        )
    """

    async def check_feature_flag(
        user_id: str | None = Security(get_optional_user_id),
    ) -> None:
        """Check if feature flag is enabled for the user.

        The user_id is automatically injected from JWT authentication if present,
        or None for anonymous access.
        """
        # For routes that don't require authentication, use anonymous context
        check_user_id = user_id or "anonymous"

        if not is_configured():
            logger.debug(
                f"LaunchDarkly not configured, using default {flag_key.value}={default}"
            )
            if not default:
                raise HTTPException(status_code=404, detail="Feature not available")
            return

        try:
            client = get_client()
            if not client.is_initialized():
                logger.debug(
                    f"LaunchDarkly not initialized, using default {flag_key.value}={default}"
                )
                if not default:
                    raise HTTPException(status_code=404, detail="Feature not available")
                return

            is_enabled = await is_feature_enabled(flag_key, check_user_id, default)

            if not is_enabled:
                raise HTTPException(status_code=404, detail="Feature not available")
        except HTTPException:
            # Re-raise the intentional 404s above; without this clause the
            # generic handler below would turn them into 500 responses.
            raise
        except Exception as e:
            logger.warning(
                f"LaunchDarkly error for flag {flag_key.value}: {e}, using default={default}"
            )
            raise HTTPException(status_code=500, detail="Failed to check feature flag")

    return check_feature_flag


@contextlib.contextmanager
def mock_flag_variation(flag_key: str, return_value: Any):
    """Context manager for testing feature flags."""
    original_variation = get_client().variation
    get_client().variation = lambda key, context, default: (
        return_value if key == flag_key else original_variation(key, context, default)
    )
    try:
        yield
    finally:
        get_client().variation = original_variation
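
The strict boolean handling in `is_feature_enabled` can be shown in isolation. The sketch below is self-contained and does not import this module or LaunchDarkly: `StubFlagClient` and `is_enabled` are hypothetical stand-ins that only mirror the rule that a non-boolean flag value falls back to the default instead of being treated as truthy.

```python
from typing import Any, Dict


class StubFlagClient:
    """Hypothetical stand-in for an LDClient; only `variation` is modelled."""

    def __init__(self, values: Dict[str, Any]) -> None:
        self._values = values

    def variation(self, key: str, context: Any, default: Any) -> Any:
        # Return the configured value, or the caller's default if the flag is unknown.
        return self._values.get(key, default)


def is_enabled(client: StubFlagClient, flag_key: str, default: bool = False) -> bool:
    """Return the flag value only if it is a real boolean, otherwise `default`."""
    result = client.variation(flag_key, None, default)
    if isinstance(result, bool):
        return result
    # A JSON object or string must not be interpreted as "enabled".
    return default


client = StubFlagClient({"chat": True, "copilot-sdk": {"enabled": True}})
print(is_enabled(client, "chat"))
print(is_enabled(client, "copilot-sdk"))
```

Here the misconfigured `copilot-sdk` flag holds a JSON object, so the check falls back to the default of `False` even though the object itself is truthy.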