Compare commits

...

3 Commits

Author SHA1 Message Date
Otto (AGPT)
83422d9dc8 fix: address review feedback - Redis TTL sync, cleanup loop, path handling, edit uniqueness 2026-02-19 15:07:23 +00:00
Zamil Majdy
4e84be021a Merge branch 'dev' into feat/copilot-e2b-sandbox 2026-02-19 20:34:19 +05:30
Otto (AGPT)
81b20ff9d8 feat(copilot): Replace bubblewrap with e2b sandbox for unified filesystem
Add e2b sandbox support behind COPILOT_E2B feature flag:

- New CoPilotSandboxManager: manages sandbox lifecycle per session with
  Redis-based cross-pod reconnection and idle timeout cleanup
- New MCP file tools (read_file, write_file, edit_file, glob_files,
  grep_files) that operate on the e2b sandbox filesystem
- New save_to_workspace/load_from_workspace tools for explicit GCS sync
- bash_exec uses e2b when available, falls back to bubblewrap
- SDK built-in file tools disabled when e2b is active (replaced by MCP tools)
- Security hooks updated to skip workspace path validation when e2b handles it
- Settings: copilot_sandbox_timeout, copilot_sandbox_template, copilot_use_e2b
- Feature flag: COPILOT_E2B in Flag enum for LaunchDarkly rollout

All three previously disconnected file contexts (SDK ephemeral disk,
bubblewrap sandbox, GCS workspace) collapse into a single e2b sandbox
per session, with explicit workspace sync for persistence.

Bubblewrap code kept as fallback when e2b is disabled.
2026-02-19 14:01:26 +00:00
9 changed files with 1130 additions and 19 deletions

View File

@@ -16,6 +16,7 @@ from .tool_adapter import (
DANGEROUS_PATTERNS,
MCP_TOOL_PREFIX,
WORKSPACE_SCOPED_TOOLS,
get_sandbox_manager,
stash_pending_tool_output,
)
@@ -97,8 +98,10 @@ def _validate_tool_access(
"Use the CoPilot-specific MCP tools instead."
)
# Workspace-scoped tools: allowed only within the SDK workspace directory
if tool_name in WORKSPACE_SCOPED_TOOLS:
# Workspace-scoped tools: allowed only within the SDK workspace directory.
# When e2b is enabled, these SDK built-in tools are disabled (replaced by
# MCP e2b file tools), so skip workspace path validation.
if tool_name in WORKSPACE_SCOPED_TOOLS and get_sandbox_manager() is None:
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
# Check for dangerous patterns in tool input

View File

@@ -58,6 +58,9 @@ from .transcript import (
logger = logging.getLogger(__name__)
config = ChatConfig()
# SDK built-in file tools to disable when e2b is active (replaced by MCP tools)
_E2B_DISALLOWED_SDK_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep"]
# Set to hold background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[Any]] = set()
@@ -98,6 +101,23 @@ _SDK_TOOL_SUPPLEMENT = """
is delivered to the user via a background stream.
"""
_SDK_TOOL_SUPPLEMENT_E2B = """
## Tool notes
- The SDK built-in Bash, Read, Write, Edit, Glob, and Grep tools are NOT available.
Use the MCP tools instead: `bash_exec`, `read_file`, `write_file`, `edit_file`,
`glob_files`, `grep_files`.
- **All tools share a single sandbox**: The sandbox is a microVM with a shared
filesystem at /home/user/. Files created by any tool are accessible to all others.
Network access IS available (pip install, curl, etc.).
- **Persistent storage**: Use `save_to_workspace` to persist sandbox files to cloud
storage, and `load_from_workspace` to bring workspace files into the sandbox.
- Long-running tools (create_agent, edit_agent, etc.) are handled
asynchronously. You will receive an immediate response; the actual result
is delivered to the user via a background stream.
"""
def _build_long_running_callback(user_id: str | None) -> LongRunningCallback:
"""Build a callback that delegates long-running tools to the non-SDK infrastructure.
@@ -453,12 +473,33 @@ async def stream_chat_completion_sdk(
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# Check if e2b sandbox is enabled for this user
sandbox_mgr = None
use_e2b = False
try:
from backend.util.feature_flag import Flag
from backend.util.feature_flag import is_feature_enabled as _is_flag_enabled
from backend.util.settings import Config as AppConfig
app_config = AppConfig()
use_e2b = await _is_flag_enabled(
Flag.COPILOT_E2B,
user_id or "anonymous",
default=app_config.copilot_use_e2b,
)
if use_e2b:
from backend.copilot.tools.e2b_sandbox import CoPilotSandboxManager
sandbox_mgr = CoPilotSandboxManager()
except Exception as e:
logger.warning(f"[SDK] Failed to initialize e2b sandbox: {e}")
# Build system prompt (reuses non-SDK path with Langfuse support)
has_history = len(session.messages) > 1
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
system_prompt += _SDK_TOOL_SUPPLEMENT
system_prompt += _SDK_TOOL_SUPPLEMENT_E2B if use_e2b else _SDK_TOOL_SUPPLEMENT
message_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
@@ -480,6 +521,7 @@ async def stream_chat_completion_sdk(
user_id,
session,
long_running_callback=_build_long_running_callback(user_id),
sandbox_manager=sandbox_mgr,
)
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
@@ -531,11 +573,21 @@ async def stream_chat_completion_sdk(
f"msg_count={transcript_msg_count})"
)
# When e2b is active, disable SDK built-in file tools
# (replaced by MCP e2b tools) and remove them from allowed list
effective_disallowed = list(SDK_DISALLOWED_TOOLS)
effective_allowed = list(COPILOT_TOOL_NAMES)
if use_e2b:
effective_disallowed.extend(_E2B_DISALLOWED_SDK_TOOLS)
effective_allowed = [
t for t in effective_allowed if t not in _E2B_DISALLOWED_SDK_TOOLS
]
sdk_options_kwargs: dict[str, Any] = {
"system_prompt": system_prompt,
"mcp_servers": {"copilot": mcp_server},
"allowed_tools": COPILOT_TOOL_NAMES,
"disallowed_tools": SDK_DISALLOWED_TOOLS,
"allowed_tools": effective_allowed,
"disallowed_tools": effective_disallowed,
"hooks": security_hooks,
"cwd": sdk_cwd,
"max_buffer_size": config.claude_agent_max_buffer_size,
@@ -749,6 +801,11 @@ async def stream_chat_completion_sdk(
)
yield StreamFinish()
finally:
if sandbox_mgr:
try:
await sandbox_mgr.dispose_all()
except Exception as e:
logger.warning(f"[SDK] Failed to dispose e2b sandboxes: {e}")
if sdk_cwd:
_cleanup_sdk_tool_results(sdk_cwd)

View File

@@ -42,7 +42,8 @@ _current_session: ContextVar[ChatSession | None] = ContextVar(
# Keyed by tool_name → full output string. Consumed (popped) by the
# response adapter when it builds StreamToolOutputAvailable.
_pending_tool_outputs: ContextVar[dict[str, list[str]]] = ContextVar(
"pending_tool_outputs", default=None # type: ignore[arg-type]
"pending_tool_outputs",
default=None, # type: ignore[arg-type]
)
# Callback type for delegating long-running tools to the non-SDK infrastructure.
@@ -56,11 +57,15 @@ _long_running_callback: ContextVar[LongRunningCallback | None] = ContextVar(
"long_running_callback", default=None
)
# ContextVar for the e2b sandbox manager (set when e2b is enabled).
_sandbox_manager: ContextVar[Any | None] = ContextVar("sandbox_manager", default=None)
def set_execution_context(
user_id: str | None,
session: ChatSession,
long_running_callback: LongRunningCallback | None = None,
sandbox_manager: Any | None = None,
) -> None:
"""Set the execution context for tool calls.
@@ -72,11 +77,13 @@ def set_execution_context(
session: Current chat session.
long_running_callback: Optional callback to delegate long-running tools
to the non-SDK background infrastructure (stream_registry + Redis).
sandbox_manager: Optional CoPilotSandboxManager for e2b sandbox access.
"""
_current_user_id.set(user_id)
_current_session.set(session)
_pending_tool_outputs.set({})
_long_running_callback.set(long_running_callback)
_sandbox_manager.set(sandbox_manager)
def get_execution_context() -> tuple[str | None, ChatSession | None]:
@@ -87,6 +94,11 @@ def get_execution_context() -> tuple[str | None, ChatSession | None]:
)
def get_sandbox_manager() -> Any | None:
    """Return the e2b sandbox manager bound to the current execution context.

    Returns:
        The value most recently stored in the ``_sandbox_manager`` context
        variable, or ``None`` when nothing has been stored.
    """
    manager = _sandbox_manager.get(None)
    return manager
def pop_pending_tool_output(tool_name: str) -> str | None:
"""Pop and return the oldest stashed output for *tool_name*.

View File

@@ -13,6 +13,15 @@ from .bash_exec import BashExecTool
from .check_operation_status import CheckOperationStatusTool
from .create_agent import CreateAgentTool
from .customize_agent import CustomizeAgentTool
from .e2b_file_tools import (
E2BEditTool,
E2BGlobTool,
E2BGrepTool,
E2BReadTool,
E2BWriteTool,
LoadFromWorkspaceTool,
SaveToWorkspaceTool,
)
from .edit_agent import EditAgentTool
from .feature_requests import CreateFeatureRequestTool, SearchFeatureRequestsTool
from .find_agent import FindAgentTool
@@ -63,6 +72,14 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"read_workspace_file": ReadWorkspaceFileTool(),
"write_workspace_file": WriteWorkspaceFileTool(),
"delete_workspace_file": DeleteWorkspaceFileTool(),
# E2B sandbox file tools (active when COPILOT_E2B feature flag is enabled)
"read_file": E2BReadTool(),
"write_file": E2BWriteTool(),
"edit_file": E2BEditTool(),
"glob_files": E2BGlobTool(),
"grep_files": E2BGrepTool(),
"save_to_workspace": SaveToWorkspaceTool(),
"load_from_workspace": LoadFromWorkspaceTool(),
}
# Export individual tool instances for backwards compatibility

View File

@@ -1,14 +1,15 @@
"""Bash execution tool — run shell commands in a bubblewrap sandbox.
"""Bash execution tool — run shell commands in a sandbox.
Supports two backends:
- **e2b** (preferred): VM-level isolation with network access, enabled via
the COPILOT_E2B feature flag.
- **bubblewrap** (fallback): kernel-level isolation, no network, Linux-only.
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
Safety comes from OS-level isolation (bubblewrap): only system dirs visible
read-only, writable workspace only, clean env, no network.
Requires bubblewrap (``bwrap``) — the tool is disabled when bwrap is not
available (e.g. macOS development).
"""
import logging
import shlex
from typing import Any
from backend.copilot.model import ChatSession
@@ -19,6 +20,8 @@ from .sandbox import get_workspace_dir, has_full_sandbox, run_sandboxed
logger = logging.getLogger(__name__)
_SANDBOX_HOME = "/home/user"
class BashExecTool(BaseTool):
"""Execute Bash commands in a bubblewrap sandbox."""
@@ -29,6 +32,18 @@ class BashExecTool(BaseTool):
@property
def description(self) -> str:
if _is_e2b_available():
return (
"Execute a Bash command or script in an e2b sandbox (microVM). "
"Full Bash scripting is supported (loops, conditionals, pipes, "
"functions, etc.). "
"The sandbox shares the same filesystem as the read_file/write_file "
"tools — files created by any tool are accessible to all others. "
"Network access IS available (pip install, curl, etc.). "
"Working directory is /home/user/. "
"Execution is killed after the timeout (default 30s, max 120s). "
"Returns stdout and stderr."
)
if not has_full_sandbox():
return (
"Bash execution is DISABLED — bubblewrap sandbox is not "
@@ -85,13 +100,6 @@ class BashExecTool(BaseTool):
) -> ToolResponseBase:
session_id = session.session_id if session else None
if not has_full_sandbox():
return ErrorResponse(
message="bash_exec requires bubblewrap sandbox (Linux only).",
error="sandbox_unavailable",
session_id=session_id,
)
command: str = (kwargs.get("command") or "").strip()
timeout: int = kwargs.get("timeout", 30)
@@ -102,6 +110,20 @@ class BashExecTool(BaseTool):
session_id=session_id,
)
# --- E2B path ---
if _is_e2b_available():
return await self._execute_e2b(
command, timeout, session, user_id, session_id
)
# --- Bubblewrap fallback ---
if not has_full_sandbox():
return ErrorResponse(
message="bash_exec requires bubblewrap sandbox (Linux only).",
error="sandbox_unavailable",
session_id=session_id,
)
workspace = get_workspace_dir(session_id or "default")
stdout, stderr, exit_code, timed_out = await run_sandboxed(
@@ -122,3 +144,72 @@ class BashExecTool(BaseTool):
timed_out=timed_out,
session_id=session_id,
)
async def _execute_e2b(
    self,
    command: str,
    timeout: int,
    session: ChatSession,
    user_id: str | None,
    session_id: str | None,
) -> ToolResponseBase:
    """Run *command* with ``bash -c`` inside the session's e2b sandbox.

    Args:
        command: Shell script text; it is shell-quoted before being handed
            to the sandbox.
        timeout: Requested limit in seconds. Clamped to 1..120 to match the
            limit advertised in the tool description.
        session: Current chat session (not read here).
        user_id: Owner passed to the sandbox manager; "anonymous" if falsy.
        session_id: Key used to look up the sandbox; "default" if falsy.

    Returns:
        ``BashExecResponse`` with the command's output, a ``BashExecResponse``
        flagged ``timed_out=True`` when the failure looks like a timeout, or
        an ``ErrorResponse`` for any other failure.
    """
    # The tool description promises "default 30s, max 120s"; enforce that on
    # this path too so a caller cannot request an arbitrarily long execution.
    try:
        timeout = int(timeout)
    except (TypeError, ValueError):
        timeout = 30
    timeout = max(1, min(timeout, 120))

    try:
        from backend.copilot.sdk.tool_adapter import get_sandbox_manager

        manager = get_sandbox_manager()
        if manager is None:
            return ErrorResponse(
                message="E2B sandbox manager not available.",
                error="sandbox_unavailable",
                session_id=session_id,
            )

        sandbox = await manager.get_or_create(
            session_id or "default", user_id or "anonymous"
        )
        result = await sandbox.commands.run(
            f"bash -c {shlex.quote(command)}",
            cwd=_SANDBOX_HOME,
            timeout=timeout,
        )
        return BashExecResponse(
            message=f"Command executed (exit {result.exit_code})",
            stdout=result.stdout,
            stderr=result.stderr,
            exit_code=result.exit_code,
            timed_out=False,
            session_id=session_id,
        )
    except Exception as e:
        error_str = str(e)
        # Some timeout exceptions (e.g. asyncio.TimeoutError) carry an empty
        # message, so inspect the exception type name as well as its text.
        looks_like_timeout = (
            "timeout" in type(e).__name__.lower() or "timeout" in error_str.lower()
        )
        if looks_like_timeout:
            return BashExecResponse(
                message="Execution timed out",
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=-1,
                timed_out=True,
                session_id=session_id,
            )
        return ErrorResponse(
            message=f"E2B execution failed: {e}",
            error=error_str or type(e).__name__,
            session_id=session_id,
        )
# ------------------------------------------------------------------
# Module-level helpers (placed after classes that call them)
# ------------------------------------------------------------------
def _is_e2b_available() -> bool:
    """Report whether an e2b sandbox manager is set in the execution context.

    The adapter module is imported lazily inside the function (presumably to
    avoid an import cycle — confirm). Any failure while importing or querying
    is treated as "not available".
    """
    try:
        from backend.copilot.sdk.tool_adapter import get_sandbox_manager
    except Exception:
        return False

    try:
        manager = get_sandbox_manager()
    except Exception:
        return False
    return manager is not None

View File

@@ -0,0 +1,703 @@
"""E2B file tools — MCP tools that proxy filesystem operations to the e2b sandbox.
These replace the SDK built-in Read/Write/Edit/Glob/Grep tools when e2b is
enabled, ensuring all file operations go through the sandbox VM.
"""
import logging
import posixpath
import shlex
from typing import Any
from backend.copilot.model import ChatSession
from .base import BaseTool
from .models import BashExecResponse, ErrorResponse, ToolResponseBase
logger = logging.getLogger(__name__)
_SANDBOX_HOME = "/home/user"
class E2BReadTool(BaseTool):
    """Read a file from the e2b sandbox filesystem."""

    @property
    def name(self) -> str:
        return "read_file"

    @property
    def description(self) -> str:
        return (
            "Read a file from the sandbox filesystem. "
            "The sandbox is the shared working environment — files created by "
            "any tool (bash_exec, write_file, etc.) are accessible here. "
            "Returns the file content as text. "
            "Use offset and limit for large files."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path to the file to read (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "offset": {
                    "type": "integer",
                    "description": (
                        "Line number to start reading from (0-indexed). Default: 0"
                    ),
                },
                "limit": {
                    "type": "integer",
                    "description": "Number of lines to read. Default: 2000",
                },
            },
            "required": ["path"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Return a window of lines from a sandbox file.

        Keyword Args:
            path: File path, relative to or inside the sandbox home.
            offset: 0-indexed first line to return (default 0).
            limit: Maximum number of lines to return (default 2000).

        Returns:
            ``BashExecResponse`` whose ``stdout`` holds the selected lines, or
            an ``ErrorResponse`` for bad arguments, an invalid path, an
            unavailable sandbox, or a failed read.
        """
        path = kwargs.get("path", "")

        # An explicit ``null`` arrives as None and a negative offset would
        # silently slice from the end of the file, so normalise both values
        # before touching the sandbox.
        raw_offset = kwargs.get("offset")
        raw_limit = kwargs.get("limit")
        try:
            offset = 0 if raw_offset is None else max(0, int(raw_offset))
            limit = 2000 if raw_limit is None else max(0, int(raw_limit))
        except (TypeError, ValueError):
            return ErrorResponse(
                message="offset and limit must be integers.",
                error="invalid_argument",
                session_id=session.session_id,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        try:
            content = await sandbox.files.read(resolved)
            lines = content.splitlines(keepends=True)
            selected = lines[offset : offset + limit]
            text = "".join(selected)
            return BashExecResponse(
                message=f"Read {len(selected)} lines from {resolved}",
                stdout=text,
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to read {resolved}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
class E2BWriteTool(BaseTool):
    """Write a file to the e2b sandbox filesystem."""

    @property
    def name(self) -> str:
        return "write_file"

    @property
    def description(self) -> str:
        return (
            "Write or create a file in the sandbox filesystem. "
            "This is the shared working environment — files are accessible "
            "to bash_exec and other tools. "
            "Creates parent directories automatically."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path for the file (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file.",
                },
            },
            "required": ["path", "content"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Create or overwrite a sandbox file, creating parent directories.

        Keyword Args:
            path: Destination path, relative to or inside the sandbox home.
            content: Text to write.

        Returns:
            ``BashExecResponse`` on success, otherwise an ``ErrorResponse``
            (sandbox unavailable, invalid path, or write failure).
        """
        path = kwargs.get("path", "")
        content = kwargs.get("content", "")

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        try:
            # Ensure parent directory exists. The path is caller-controlled,
            # so it must be shell-quoted: otherwise spaces break the command
            # and metacharacters (";", "$()", ...) would be executed.
            parent = posixpath.dirname(resolved)
            if parent and parent != _SANDBOX_HOME:
                await sandbox.commands.run(
                    f"mkdir -p {shlex.quote(parent)}", timeout=5
                )
            await sandbox.files.write(resolved, content)
            return BashExecResponse(
                message=f"Wrote {len(content)} bytes to {resolved}",
                stdout=f"Successfully wrote to {resolved}",
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to write {resolved}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
class E2BEditTool(BaseTool):
    """Edit a file in the e2b sandbox using search/replace."""

    @property
    def name(self) -> str:
        return "edit_file"

    @property
    def description(self) -> str:
        return (
            "Edit a file in the sandbox by replacing exact text. "
            "Provide old_text (the exact text to find) and new_text "
            "(what to replace it with). The old_text must match exactly."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path to the file (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "old_text": {
                    "type": "string",
                    "description": "Exact text to find in the file.",
                },
                "new_text": {
                    "type": "string",
                    "description": "Text to replace old_text with.",
                },
            },
            "required": ["path", "old_text", "new_text"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Replace the single occurrence of ``old_text`` with ``new_text``.

        Keyword Args:
            path: File path, relative to or inside the sandbox home.
            old_text: Exact text to find; must be non-empty and occur once.
            new_text: Replacement text.

        Returns:
            ``BashExecResponse`` on success. ``ErrorResponse`` when
            ``old_text`` is empty, missing from the file, or ambiguous, or
            when the sandbox, path, read or write fails.
        """
        path = kwargs.get("path", "")
        old_text = kwargs.get("old_text", "")
        new_text = kwargs.get("new_text", "")

        # ``str.count("")`` returns len(content) + 1, so an empty needle would
        # otherwise be reported as a confusing "found N times" ambiguity.
        if not old_text:
            return ErrorResponse(
                message="old_text must not be empty.",
                error="invalid_argument",
                session_id=session.session_id,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        try:
            content = await sandbox.files.read(resolved)
            occurrences = content.count(old_text)
            if occurrences == 0:
                return ErrorResponse(
                    message=f"old_text not found in {resolved}",
                    error="text_not_found",
                    session_id=session.session_id,
                )
            if occurrences > 1:
                return ErrorResponse(
                    message=(
                        f"old_text found {occurrences} times in {resolved}. "
                        "Please provide more context to make the match unique."
                    ),
                    error="ambiguous_match",
                    session_id=session.session_id,
                )
            new_content = content.replace(old_text, new_text, 1)
            await sandbox.files.write(resolved, new_content)
            return BashExecResponse(
                message=f"Edited {resolved}",
                stdout=f"Successfully edited {resolved}",
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to edit {resolved}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
class E2BGlobTool(BaseTool):
    """List files matching a pattern in the e2b sandbox."""

    @property
    def name(self) -> str:
        return "glob_files"

    @property
    def description(self) -> str:
        return (
            "List files in the sandbox matching a glob pattern. "
            "Uses find under the hood. Default directory is /home/user/."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": (
                        "Glob pattern to match (e.g., '*.py', '**/*.json')."
                    ),
                },
                "path": {
                    "type": "string",
                    "description": ("Directory to search in (default: /home/user/)."),
                },
            },
            "required": ["pattern"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    @staticmethod
    def _find_predicate(root: str, pattern: str) -> str:
        """Translate a glob *pattern* into a shell-quoted ``find`` test.

        ``find -name`` only compares basenames, so a pattern containing "/"
        (such as the documented ``**/*.json``) can never match with it.
        """
        # A leading "**/" means "at any depth", which find does by default.
        while pattern.startswith("**/"):
            pattern = pattern[3:]
        if "/" not in pattern:
            return f"-name {shlex.quote(pattern or '*')}"
        # Match against the whole path instead. Glob metacharacters in the
        # root are escaped so only the caller's pattern is interpreted.
        escaped_root = root.translate(
            str.maketrans({"\\": "\\\\", "*": "\\*", "?": "\\?", "[": "\\["})
        )
        full_pattern = f"{escaped_root}/{pattern}"
        return f"-path {shlex.quote(full_pattern)}"

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """List regular files under a directory that match a glob pattern.

        Keyword Args:
            pattern: Glob pattern (default ``*``).
            path: Directory to search; falls back to the sandbox home when
                missing, ``None`` or empty.

        Returns:
            ``BashExecResponse`` carrying the output of ``find``, or an
            ``ErrorResponse`` when the sandbox, path or command fails.
        """
        pattern = kwargs.get("pattern") or "*"
        path = kwargs.get("path") or _SANDBOX_HOME

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        # Both the root and the pattern are caller-controlled: quote them so
        # spaces and shell metacharacters cannot alter the command.
        predicate = self._find_predicate(resolved, pattern)
        try:
            result = await sandbox.commands.run(
                f"find {shlex.quote(resolved)} {predicate} -type f 2>/dev/null",
                timeout=15,
            )
            return BashExecResponse(
                message="Glob results",
                stdout=result.stdout,
                stderr=result.stderr,
                exit_code=result.exit_code,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to glob: {e}",
                error=str(e),
                session_id=session.session_id,
            )
class E2BGrepTool(BaseTool):
    """Search file contents in the e2b sandbox."""

    @property
    def name(self) -> str:
        return "grep_files"

    @property
    def description(self) -> str:
        return (
            "Search for a pattern in files within the sandbox. "
            "Uses grep -rn under the hood. Returns matching lines with "
            "file paths and line numbers."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Search pattern (regex supported).",
                },
                "path": {
                    "type": "string",
                    "description": ("Directory to search in (default: /home/user/)."),
                },
                "include": {
                    "type": "string",
                    "description": "File glob to include (e.g., '*.py').",
                },
            },
            "required": ["pattern"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Recursively grep a sandbox directory.

        Keyword Args:
            pattern: Regular expression to search for.
            path: Directory to search; falls back to the sandbox home when
                missing, ``None`` or empty.
            include: Optional file glob passed to ``grep --include``.

        Returns:
            ``BashExecResponse`` carrying grep's output, or an
            ``ErrorResponse`` when the sandbox, path or command fails.
        """
        pattern = kwargs.get("pattern") or ""
        path = kwargs.get("path") or _SANDBOX_HOME
        include = kwargs.get("include") or ""

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        include_flag = f" --include={shlex.quote(include)}" if include else ""
        # ``-e`` keeps a pattern that starts with "-" from being parsed as an
        # option; ``--`` ends option parsing before the (quoted) search root.
        command = (
            f"grep -rn{include_flag} -e {shlex.quote(pattern)} "
            f"-- {shlex.quote(resolved)} 2>/dev/null"
        )
        try:
            # NOTE(review): grep exits with status 1 when nothing matches; if
            # the sandbox SDK raises on non-zero exit codes this surfaces as
            # "Failed to grep" rather than an empty result — confirm.
            result = await sandbox.commands.run(command, timeout=15)
            return BashExecResponse(
                message="Grep results",
                stdout=result.stdout,
                stderr=result.stderr,
                exit_code=result.exit_code,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to grep: {e}",
                error=str(e),
                session_id=session.session_id,
            )
class SaveToWorkspaceTool(BaseTool):
    """Copy a file out of the e2b sandbox into the user's persistent workspace."""

    @property
    def name(self) -> str:
        return "save_to_workspace"

    @property
    def description(self) -> str:
        return (
            "Save a file from the sandbox to the persistent workspace "
            "(cloud storage). Files saved to workspace survive across sessions. "
            "Provide the sandbox file path and optional workspace path."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        sandbox_path_schema = {
            "type": "string",
            "description": "Path of the file in the sandbox to save.",
        }
        workspace_path_schema = {
            "type": "string",
            "description": (
                "Path in the workspace to save to "
                "(defaults to the sandbox filename)."
            ),
        }
        return {
            "type": "object",
            "properties": {
                "sandbox_path": sandbox_path_schema,
                "workspace_path": workspace_path_schema,
            },
            "required": ["sandbox_path"],
        }

    @property
    def requires_auth(self) -> bool:
        return True

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Read a sandbox file as bytes and write it to the workspace.

        Keyword Args:
            sandbox_path: Source file inside the sandbox.
            workspace_path: Destination in the workspace; when empty the file
                is stored as ``/<filename>``.

        Returns:
            ``BashExecResponse`` describing the stored file, or an
            ``ErrorResponse`` when the user is not authenticated, the sandbox
            is unavailable, the path is invalid, or the copy fails.
        """
        source_arg = kwargs.get("sandbox_path", "")
        destination_arg = kwargs.get("workspace_path", "")
        sid = session.session_id

        if not user_id:
            return ErrorResponse(
                message="Authentication required",
                session_id=sid,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(source_arg)
        if resolved is None:
            return _path_error(source_arg, session)

        try:
            payload = await sandbox.files.read(resolved, format="bytes")

            # The workspace entry defaults to the file's own name at the root.
            filename = posixpath.basename(resolved)
            destination = destination_arg if destination_arg else f"/{filename}"

            from backend.data.db_accessors import workspace_db
            from backend.util.workspace import WorkspaceManager

            workspace = await workspace_db().get_or_create_workspace(user_id)
            manager = WorkspaceManager(user_id, workspace.id, sid)
            file_record = await manager.write_file(
                content=payload,
                filename=filename,
                path=destination,
                overwrite=True,
            )
            summary = (
                f"Saved to workspace: {file_record.path} "
                f"({file_record.size_bytes} bytes)"
            )
            return BashExecResponse(
                message=f"Saved {resolved} to workspace at {file_record.path}",
                stdout=summary,
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=sid,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to save to workspace: {e}",
                error=str(e),
                session_id=sid,
            )
class LoadFromWorkspaceTool(BaseTool):
    """Copy a file from the persistent GCS workspace into the e2b sandbox."""

    @property
    def name(self) -> str:
        return "load_from_workspace"

    @property
    def description(self) -> str:
        return (
            "Load a file from the persistent workspace (cloud storage) into "
            "the sandbox. Use this to bring workspace files into the sandbox "
            "for editing or processing."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "workspace_path": {
                    "type": "string",
                    "description": ("Path of the file in the workspace to load."),
                },
                "sandbox_path": {
                    "type": "string",
                    "description": (
                        "Path in the sandbox to write to "
                        "(defaults to /home/user/<filename>)."
                    ),
                },
            },
            "required": ["workspace_path"],
        }

    @property
    def requires_auth(self) -> bool:
        return True

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Fetch a workspace file and write it into the sandbox.

        Keyword Args:
            workspace_path: Path of the file in the workspace.
            sandbox_path: Destination in the sandbox; when empty the file is
                written to the sandbox home under its own name.

        Returns:
            ``BashExecResponse`` on success, or an ``ErrorResponse`` when the
            user is not authenticated, the sandbox is unavailable, the file is
            missing, the target path is invalid, or the copy fails.
        """
        workspace_path = kwargs.get("workspace_path", "")
        sandbox_path = kwargs.get("sandbox_path", "")

        if not user_id:
            return ErrorResponse(
                message="Authentication required",
                session_id=session.session_id,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        try:
            from backend.data.db_accessors import workspace_db
            from backend.util.workspace import WorkspaceManager

            workspace = await workspace_db().get_or_create_workspace(user_id)
            manager = WorkspaceManager(user_id, workspace.id, session.session_id)
            file_info = await manager.get_file_info_by_path(workspace_path)
            if file_info is None:
                return ErrorResponse(
                    message=f"File not found in workspace: {workspace_path}",
                    session_id=session.session_id,
                )
            content = await manager.read_file_by_id(file_info.id)

            # Determine sandbox path
            filename = workspace_path.rsplit("/", 1)[-1]
            target = sandbox_path or f"{_SANDBOX_HOME}/{filename}"
            resolved = _resolve_path(target)
            if resolved is None:
                return _path_error(target, session)

            # Ensure parent directory exists. The path comes from the caller,
            # so shell-quote it: unquoted, spaces break the command and shell
            # metacharacters would be executed inside the sandbox.
            parent = posixpath.dirname(resolved)
            if parent and parent != _SANDBOX_HOME:
                await sandbox.commands.run(
                    f"mkdir -p {shlex.quote(parent)}", timeout=5
                )
            await sandbox.files.write(resolved, content)
            return BashExecResponse(
                message=f"Loaded {workspace_path} into sandbox at {resolved}",
                stdout=(f"Loaded from workspace: {resolved} ({len(content)} bytes)"),
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to load from workspace: {e}",
                error=str(e),
                session_id=session.session_id,
            )
# ------------------------------------------------------------------
# Module-level helpers (placed after functions that call them)
# ------------------------------------------------------------------
def _resolve_path(path: str) -> str | None:
"""Resolve a path to an absolute path within /home/user/.
Returns None if the path escapes the sandbox home.
"""
if not path:
return None
# Handle relative paths
if not path.startswith("/"):
path = f"{_SANDBOX_HOME}/{path}"
# Normalize to prevent traversal
resolved = posixpath.normpath(path)
if not resolved.startswith(_SANDBOX_HOME):
return None
return resolved
async def _get_sandbox(session: ChatSession) -> Any | None:
    """Fetch (or lazily create) the sandbox belonging to *session*.

    The sandbox manager and the user id are both taken from the execution
    context. Returns ``None`` when no manager is set or when anything goes
    wrong; failures are logged rather than raised.
    """
    try:
        from backend.copilot.sdk.tool_adapter import get_sandbox_manager

        manager = get_sandbox_manager()
        if manager is not None:
            current_user, _session = _get_user_from_context()
            owner = current_user or "anonymous"
            return await manager.get_or_create(session.session_id, owner)
        return None
    except Exception as e:
        logger.error(f"[E2B] Failed to get sandbox: {e}")
        return None
def _get_user_from_context() -> tuple[str | None, Any]:
    """Return whatever ``get_execution_context()`` reports for this call.

    The first element of the returned tuple is the user id; the adapter is
    imported inside the function rather than at module load time.
    """
    from backend.copilot.sdk.tool_adapter import get_execution_context

    context = get_execution_context()
    return context
def _sandbox_unavailable(session: ChatSession) -> ErrorResponse:
    """Build the standard error returned when no sandbox could be obtained."""
    response = ErrorResponse(
        message="E2B sandbox is not available. Try again or contact support.",
        error="sandbox_unavailable",
        session_id=session.session_id,
    )
    return response
def _path_error(path: str, session: ChatSession) -> ErrorResponse:
    """Build the standard error returned when *path* fails validation."""
    detail = f"Invalid path: {path}. Paths must be within /home/user/."
    return ErrorResponse(
        message=detail,
        error="invalid_path",
        session_id=session.session_id,
    )

View File

@@ -0,0 +1,215 @@
"""E2B sandbox manager for CoPilot sessions.
Manages e2b sandbox lifecycle: create, reuse via Redis, dispose with GCS sync.
One sandbox per session, cached in-memory on the worker thread and stored in
Redis for cross-pod reconnection.
"""
import asyncio
import logging
import time
from typing import Any
from backend.util.settings import Config
logger = logging.getLogger(__name__)
_REDIS_KEY_PREFIX = "copilot:sandbox:"
_SANDBOX_HOME = "/home/user"
class CoPilotSandboxManager:
"""Manages e2b sandbox lifecycle for CoPilot sessions.
Each session gets a single sandbox. The sandbox_id is stored in Redis
so another pod can reconnect to it if the original pod dies.
"""
def __init__(self) -> None:
    """Set up empty per-session bookkeeping and read sandbox settings."""
    settings = Config()
    # Settings that control how sandboxes are created.
    self._timeout: int = settings.copilot_sandbox_timeout
    self._template: str = settings.copilot_sandbox_template
    self._api_key: str = settings.e2b_api_key
    # Per-session state, keyed by session_id.
    self._sandboxes: dict[str, Any] = {}  # session_id -> AsyncSandbox
    self._last_activity: dict[str, float] = {}  # session_id -> timestamp
    # Background idle-cleanup task; created lazily.
    self._cleanup_task: asyncio.Task[None] | None = None
async def get_or_create(self, session_id: str, user_id: str) -> Any:
    """Get existing sandbox or create a new one for this session.

    Lookup order: the in-memory cache, then the sandbox id stored in Redis,
    and finally a brand-new sandbox.

    Args:
        session_id: CoPilot chat session ID.
        user_id: User ID, passed through to sandbox creation.

    Returns:
        An e2b AsyncSandbox instance.
    """
    self._last_activity[session_id] = time.monotonic()

    # 1. Check in-memory cache
    if session_id in self._sandboxes:
        sandbox = self._sandboxes[session_id]
        if await _is_sandbox_alive(sandbox):
            return sandbox
        # Sandbox died — clean up stale reference
        del self._sandboxes[session_id]

    # 2. Check Redis for sandbox_id (cross-pod reconnection)
    sandbox = await self._try_reconnect_from_redis(session_id)
    if sandbox is not None:
        self._sandboxes[session_id] = sandbox
        # A reconnected sandbox is now tracked by this manager as well, so
        # the idle-cleanup loop has to be running for it too.
        self._ensure_cleanup_task()
        return sandbox

    # 3. Create new sandbox
    sandbox = await self._create_sandbox(session_id, user_id)
    self._sandboxes[session_id] = sandbox
    await _store_sandbox_id_in_redis(session_id, sandbox.sandbox_id)

    # 4. Start cleanup task if not running
    self._ensure_cleanup_task()

    return sandbox
async def dispose(self, session_id: str) -> None:
    """Kill the sandbox tracked for *session_id* and forget it.

    The session's activity timestamp and cached sandbox are dropped first.
    If a sandbox was cached it is killed (a failure is logged, not raised)
    and its id is removed from Redis. Nothing is done to Redis when no
    sandbox was cached.

    Args:
        session_id: CoPilot chat session ID.
    """
    self._last_activity.pop(session_id, None)
    tracked = self._sandboxes.pop(session_id, None)

    if tracked is not None:
        try:
            await tracked.kill()
        except Exception as e:
            logger.warning(f"[E2B] Failed to kill sandbox for {session_id}: {e}")

        await _remove_sandbox_id_from_redis(session_id)
        logger.info(f"[E2B] Disposed sandbox for session {session_id}")
async def dispose_all(self) -> None:
    """Dispose every cached sandbox, then cancel the idle-cleanup task.

    Called on processor shutdown. Sandboxes are disposed one at a time, in
    cache order; the cleanup task is cancelled only if it exists and has not
    already finished.
    """
    # Iterate over a snapshot: dispose() removes entries from the cache.
    for session_id in tuple(self._sandboxes):
        await self.dispose(session_id)

    task = self._cleanup_task
    if task is None or task.done():
        return
    task.cancel()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
async def _create_sandbox(self, session_id: str, user_id: str) -> Any:
    """Start a brand-new e2b sandbox using the manager's settings.

    The API key is always passed. The template and timeout are passed only
    when set to a truthy value; otherwise they are omitted from the call.

    Args:
        session_id: CoPilot chat session ID (used for logging only).
        user_id: User ID (used for logging only).

    Returns:
        The AsyncSandbox returned by ``AsyncSandbox.create``.
    """
    from e2b import AsyncSandbox

    create_args: dict[str, Any] = {"api_key": self._api_key}
    optional_settings = (
        ("template", self._template),
        ("timeout", self._timeout),
    )
    # Empty template / zero timeout are dropped rather than sent as-is.
    create_args.update({name: value for name, value in optional_settings if value})

    sandbox = await AsyncSandbox.create(**create_args)
    logger.info(
        f"[E2B] Created sandbox {sandbox.sandbox_id} for session={session_id}, "
        f"user={user_id}"
    )
    return sandbox
async def _try_reconnect_from_redis(self, session_id: str) -> Any | None:
    """Reconnect to the sandbox whose id is stored in Redis, if any.

    Args:
        session_id: CoPilot chat session ID used to look up the sandbox_id.

    Returns:
        The connected AsyncSandbox, or ``None`` when Redis holds no id for
        the session or the connection attempt fails. On a failed attempt the
        Redis entry is removed so it is not retried.
    """
    from e2b import AsyncSandbox

    sandbox_id = await _load_sandbox_id_from_redis(session_id)
    if sandbox_id:
        try:
            handle = await AsyncSandbox.connect(
                sandbox_id=sandbox_id, api_key=self._api_key
            )
            logger.info(
                f"[E2B] Reconnected to sandbox {sandbox_id} for session={session_id}"
            )
            return handle
        except Exception as e:
            logger.warning(f"[E2B] Failed to reconnect to sandbox {sandbox_id}: {e}")
            # Drop the unusable id so later lookups skip straight to creation.
            await _remove_sandbox_id_from_redis(session_id)
    return None
def _ensure_cleanup_task(self) -> None:
    """Schedule the idle-cleanup loop unless one is already pending.

    A new task is scheduled when no task has been started yet or when the
    previous one has finished; otherwise the existing task is left alone.
    """
    current = self._cleanup_task
    if current is not None and not current.done():
        return
    self._cleanup_task = asyncio.ensure_future(self._idle_cleanup_loop())
async def _idle_cleanup_loop(self) -> None:
    """Run forever, disposing sessions idle for longer than the timeout.

    Wakes up every 60 seconds. When at least one sandbox is cached, every
    session whose last recorded activity is more than ``self._timeout``
    seconds old is disposed. The loop never returns on its own; it ends only
    when the task is cancelled or an exception escapes.
    """
    while True:
        await asyncio.sleep(60)
        if self._sandboxes:
            now = time.monotonic()
            # Candidates come from the activity map, which can also contain
            # sessions with no cached sandbox; dispose() tolerates those.
            expired = [
                sid
                for sid, last_seen in self._last_activity.items()
                if now - last_seen > self._timeout
            ]
            for sid in expired:
                logger.info(f"[E2B] Disposing idle sandbox for session {sid}")
                await self.dispose(sid)
# ------------------------------------------------------------------
# Module-level helpers (placed after classes that call them)
# ------------------------------------------------------------------
async def _is_sandbox_alive(sandbox: Any) -> bool:
    """Probe a sandbox by running ``echo ok`` inside it.

    Args:
        sandbox: Object exposing ``commands.run(cmd, timeout=...)``.

    Returns:
        ``True`` when the command finishes with exit code 0; ``False`` when
        the exit code is non-zero or anything raises during the probe.
    """
    alive = False
    try:
        probe = await sandbox.commands.run("echo ok", timeout=5)
        alive = probe.exit_code == 0
    except Exception:
        # Any failure to run the probe is treated as "not alive".
        alive = False
    return alive
async def _store_sandbox_id_in_redis(session_id: str, sandbox_id: str) -> None:
    """Record the session's sandbox_id in Redis with an expiry (best-effort).

    The key is ``_REDIS_KEY_PREFIX`` followed by the session id. Its TTL is
    twice the configured sandbox timeout, but never less than one hour. Any
    failure is logged as a warning and otherwise ignored.

    Args:
        session_id: CoPilot chat session ID.
        sandbox_id: ID of the sandbox to remember for that session.
    """
    try:
        from backend.data import redis as redis_client

        # NOTE(review): the call on this client is awaited below; confirm
        # get_redis() returns an async client, otherwise the await fails and
        # is swallowed by the except clause.
        client = redis_client.get_redis()
        ttl_seconds = max(Config().copilot_sandbox_timeout * 2, 3600)
        await client.set(f"{_REDIS_KEY_PREFIX}{session_id}", sandbox_id, ex=ttl_seconds)
    except Exception as e:
        logger.warning(f"[E2B] Failed to store sandbox_id in Redis: {e}")
async def _load_sandbox_id_from_redis(session_id: str) -> str | None:
    """Fetch the sandbox_id stored for a session (best-effort).

    Args:
        session_id: CoPilot chat session ID.

    Returns:
        The stored value, decoded to ``str`` when Redis returns ``bytes``;
        whatever Redis returned otherwise; ``None`` if the lookup raises
        (the failure is logged as a warning).
    """
    try:
        from backend.data import redis as redis_client

        # NOTE(review): the call on this client is awaited; confirm
        # get_redis() returns an async client.
        raw = await redis_client.get_redis().get(f"{_REDIS_KEY_PREFIX}{session_id}")
        if isinstance(raw, bytes):
            return raw.decode()
        return raw
    except Exception as e:
        logger.warning(f"[E2B] Failed to load sandbox_id from Redis: {e}")
        return None
async def _remove_sandbox_id_from_redis(session_id: str) -> None:
    """Delete the session's sandbox_id entry from Redis (best-effort).

    Any failure is logged as a warning and otherwise ignored.

    Args:
        session_id: CoPilot chat session ID.
    """
    try:
        from backend.data import redis as redis_client

        # NOTE(review): the call on this client is awaited; confirm
        # get_redis() returns an async client.
        await redis_client.get_redis().delete(f"{_REDIS_KEY_PREFIX}{session_id}")
    except Exception as e:
        logger.warning(f"[E2B] Failed to remove sandbox_id from Redis: {e}")

View File

@@ -39,6 +39,7 @@ class Flag(str, Enum):
ENABLE_PLATFORM_PAYMENT = "enable-platform-payment"
CHAT = "chat"
COPILOT_SDK = "copilot-sdk"
COPILOT_E2B = "copilot-e2b"
def is_configured() -> bool:

View File

@@ -665,6 +665,18 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
fal_api_key: str = Field(default="", description="FAL API key")
exa_api_key: str = Field(default="", description="Exa API key")
e2b_api_key: str = Field(default="", description="E2B API key")
copilot_sandbox_timeout: int = Field(
default=900,
description="E2B sandbox idle timeout in seconds (default 15 min).",
)
copilot_sandbox_template: str = Field(
default="",
description="E2B sandbox template ID (empty = default template).",
)
copilot_use_e2b: bool = Field(
default=False,
description="Enable e2b sandbox for CoPilot (feature flag default).",
)
nvidia_api_key: str = Field(default="", description="Nvidia API key")
mem0_api_key: str = Field(default="", description="Mem0 API key")
elevenlabs_api_key: str = Field(default="", description="ElevenLabs API key")