Compare commits
12 Commits
feat/copil
...
copilot/sd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f47cd573af | ||
|
|
670f812f0f | ||
|
|
ecfe4e6a7a | ||
|
|
efb4b3b518 | ||
|
|
ebeab7fbe6 | ||
|
|
98ef8a26ab | ||
|
|
ed02e6db9e | ||
|
|
6952334b85 | ||
|
|
0c586c2edf | ||
|
|
b6128dd75f | ||
|
|
c4f5f7c8b8 | ||
|
|
8af4e0bf7d |
@@ -16,7 +16,6 @@ from .tool_adapter import (
|
||||
DANGEROUS_PATTERNS,
|
||||
MCP_TOOL_PREFIX,
|
||||
WORKSPACE_SCOPED_TOOLS,
|
||||
get_sandbox_manager,
|
||||
stash_pending_tool_output,
|
||||
)
|
||||
|
||||
@@ -98,10 +97,8 @@ def _validate_tool_access(
|
||||
"Use the CoPilot-specific MCP tools instead."
|
||||
)
|
||||
|
||||
# Workspace-scoped tools: allowed only within the SDK workspace directory.
|
||||
# When e2b is enabled, these SDK built-in tools are disabled (replaced by
|
||||
# MCP e2b file tools), so skip workspace path validation.
|
||||
if tool_name in WORKSPACE_SCOPED_TOOLS and get_sandbox_manager() is None:
|
||||
# Workspace-scoped tools: allowed only within the SDK workspace directory
|
||||
if tool_name in WORKSPACE_SCOPED_TOOLS:
|
||||
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
|
||||
|
||||
# Check for dangerous patterns in tool input
|
||||
|
||||
@@ -58,9 +58,6 @@ from .transcript import (
|
||||
logger = logging.getLogger(__name__)
|
||||
config = ChatConfig()
|
||||
|
||||
# SDK built-in file tools to disable when e2b is active (replaced by MCP tools)
|
||||
_E2B_DISALLOWED_SDK_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep"]
|
||||
|
||||
# Set to hold background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
@@ -86,36 +83,42 @@ _SDK_TOOL_SUPPLEMENT = """
|
||||
|
||||
## Tool notes
|
||||
|
||||
### Shell commands
|
||||
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
|
||||
for shell commands — it runs in a network-isolated sandbox.
|
||||
- **Shared workspace**: The SDK Read/Write tools and `bash_exec` share the
|
||||
same working directory. Files created by one are readable by the other.
|
||||
- **IMPORTANT — File persistence**: Your working directory is **ephemeral** —
|
||||
files are lost between turns. When you create or modify important files
|
||||
(code, configs, outputs), you MUST save them using `write_workspace_file`
|
||||
so they persist. Use `read_workspace_file` and `list_workspace_files` to
|
||||
access files saved in previous turns. If a "Files from previous turns"
|
||||
section is present above, those files are available via `read_workspace_file`.
|
||||
- Long-running tools (create_agent, edit_agent, etc.) are handled
|
||||
asynchronously. You will receive an immediate response; the actual result
|
||||
is delivered to the user via a background stream.
|
||||
"""
|
||||
|
||||
_SDK_TOOL_SUPPLEMENT_E2B = """
|
||||
### Two storage systems — CRITICAL to understand
|
||||
|
||||
## Tool notes
|
||||
1. **Ephemeral working directory** (`/tmp/copilot-<session>/`):
|
||||
- Shared by SDK Read/Write/Edit/Glob/Grep tools AND `bash_exec`
|
||||
- Files here are **lost between turns** — do NOT rely on them persisting
|
||||
- Use for temporary work: running scripts, processing data, etc.
|
||||
|
||||
- The SDK built-in Bash, Read, Write, Edit, Glob, and Grep tools are NOT available.
|
||||
Use the MCP tools instead: `bash_exec`, `read_file`, `write_file`, `edit_file`,
|
||||
`glob_files`, `grep_files`.
|
||||
- **All tools share a single sandbox**: The sandbox is a microVM with a shared
|
||||
filesystem at /home/user/. Files created by any tool are accessible to all others.
|
||||
Network access IS available (pip install, curl, etc.).
|
||||
- **Persistent storage**: Use `save_to_workspace` to persist sandbox files to cloud
|
||||
storage, and `load_from_workspace` to bring workspace files into the sandbox.
|
||||
- Long-running tools (create_agent, edit_agent, etc.) are handled
|
||||
asynchronously. You will receive an immediate response; the actual result
|
||||
is delivered to the user via a background stream.
|
||||
2. **Persistent workspace** (cloud storage):
|
||||
- Files here **survive across turns and sessions**
|
||||
- Use `write_workspace_file` to save important files (code, outputs, configs)
|
||||
- Use `read_workspace_file` to retrieve previously saved files
|
||||
- Use `list_workspace_files` to see what files you've saved before
|
||||
- Call `list_workspace_files(include_all_sessions=True)` to see files from
|
||||
all sessions
|
||||
|
||||
### Moving files between ephemeral and persistent storage
|
||||
- **Ephemeral → Persistent**: Use `write_workspace_file` with either:
|
||||
- `content` param (plain text) — for text files
|
||||
- `source_path` param — to copy any file directly from the ephemeral dir
|
||||
- **Persistent → Ephemeral**: Use `read_workspace_file` with `save_to_path`
|
||||
param to download a workspace file to the ephemeral dir for processing
|
||||
|
||||
### File persistence workflow
|
||||
When you create or modify important files (code, configs, outputs), you MUST:
|
||||
1. Save them using `write_workspace_file` so they persist
|
||||
2. At the start of a new turn, call `list_workspace_files` to see what files
|
||||
are available from previous turns
|
||||
|
||||
### Long-running tools
|
||||
Long-running tools (create_agent, edit_agent, etc.) are handled
|
||||
asynchronously. You will receive an immediate response; the actual result
|
||||
is delivered to the user via a background stream.
|
||||
"""
|
||||
|
||||
|
||||
@@ -394,11 +397,9 @@ async def _compress_conversation_history(
|
||||
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
|
||||
"""Format conversation messages into a context prefix for the user message.
|
||||
|
||||
Returns a string like:
|
||||
<conversation_history>
|
||||
User: hello
|
||||
You responded: Hi! How can I help?
|
||||
</conversation_history>
|
||||
Includes user messages, assistant text, tool call summaries, and
|
||||
tool result summaries so the agent retains full context about what
|
||||
tools were invoked and their outcomes.
|
||||
|
||||
Returns None if there are no messages to format.
|
||||
"""
|
||||
@@ -407,13 +408,28 @@ def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
|
||||
|
||||
lines: list[str] = []
|
||||
for msg in messages:
|
||||
if not msg.content:
|
||||
continue
|
||||
if msg.role == "user":
|
||||
lines.append(f"User: {msg.content}")
|
||||
if msg.content:
|
||||
lines.append(f"User: {msg.content}")
|
||||
elif msg.role == "assistant":
|
||||
lines.append(f"You responded: {msg.content}")
|
||||
# Skip tool messages — they're internal details
|
||||
if msg.content:
|
||||
lines.append(f"You responded: {msg.content}")
|
||||
# Include tool call summaries
|
||||
if msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
tool_name = func.get("name", "unknown")
|
||||
tool_args = func.get("arguments", "")
|
||||
# Truncate long arguments
|
||||
if len(tool_args) > 200:
|
||||
tool_args = tool_args[:200] + "..."
|
||||
lines.append(f"You called tool: {tool_name}({tool_args})")
|
||||
elif msg.role == "tool":
|
||||
# Include tool results (truncated to avoid context bloat)
|
||||
content = msg.content or ""
|
||||
if len(content) > 300:
|
||||
content = content[:300] + "..."
|
||||
lines.append(f"Tool result: {content}")
|
||||
|
||||
if not lines:
|
||||
return None
|
||||
@@ -473,33 +489,12 @@ async def stream_chat_completion_sdk(
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Check if e2b sandbox is enabled for this user
|
||||
sandbox_mgr = None
|
||||
use_e2b = False
|
||||
try:
|
||||
from backend.util.feature_flag import Flag
|
||||
from backend.util.feature_flag import is_feature_enabled as _is_flag_enabled
|
||||
from backend.util.settings import Config as AppConfig
|
||||
|
||||
app_config = AppConfig()
|
||||
use_e2b = await _is_flag_enabled(
|
||||
Flag.COPILOT_E2B,
|
||||
user_id or "anonymous",
|
||||
default=app_config.copilot_use_e2b,
|
||||
)
|
||||
if use_e2b:
|
||||
from backend.copilot.tools.e2b_sandbox import CoPilotSandboxManager
|
||||
|
||||
sandbox_mgr = CoPilotSandboxManager()
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Failed to initialize e2b sandbox: {e}")
|
||||
|
||||
# Build system prompt (reuses non-SDK path with Langfuse support)
|
||||
has_history = len(session.messages) > 1
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
)
|
||||
system_prompt += _SDK_TOOL_SUPPLEMENT_E2B if use_e2b else _SDK_TOOL_SUPPLEMENT
|
||||
system_prompt += _SDK_TOOL_SUPPLEMENT
|
||||
message_id = str(uuid.uuid4())
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
@@ -521,7 +516,6 @@ async def stream_chat_completion_sdk(
|
||||
user_id,
|
||||
session,
|
||||
long_running_callback=_build_long_running_callback(user_id),
|
||||
sandbox_manager=sandbox_mgr,
|
||||
)
|
||||
try:
|
||||
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
|
||||
@@ -561,6 +555,20 @@ async def stream_chat_completion_sdk(
|
||||
|
||||
if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
||||
dl = await download_transcript(user_id, session_id)
|
||||
if dl and validate_transcript(dl.content):
|
||||
logger.info(
|
||||
f"[SDK] Transcript available for session {session_id}: "
|
||||
f"{len(dl.content)}B, msg_count={dl.message_count}"
|
||||
)
|
||||
elif dl:
|
||||
logger.warning(
|
||||
f"[SDK] Transcript downloaded but invalid for {session_id}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[SDK] No transcript available for {session_id} "
|
||||
f"({len(session.messages)} messages in session)"
|
||||
)
|
||||
if dl and validate_transcript(dl.content):
|
||||
resume_file = write_transcript_to_tempfile(
|
||||
dl.content, session_id, sdk_cwd
|
||||
@@ -573,21 +581,11 @@ async def stream_chat_completion_sdk(
|
||||
f"msg_count={transcript_msg_count})"
|
||||
)
|
||||
|
||||
# When e2b is active, disable SDK built-in file tools
|
||||
# (replaced by MCP e2b tools) and remove them from allowed list
|
||||
effective_disallowed = list(SDK_DISALLOWED_TOOLS)
|
||||
effective_allowed = list(COPILOT_TOOL_NAMES)
|
||||
if use_e2b:
|
||||
effective_disallowed.extend(_E2B_DISALLOWED_SDK_TOOLS)
|
||||
effective_allowed = [
|
||||
t for t in effective_allowed if t not in _E2B_DISALLOWED_SDK_TOOLS
|
||||
]
|
||||
|
||||
sdk_options_kwargs: dict[str, Any] = {
|
||||
"system_prompt": system_prompt,
|
||||
"mcp_servers": {"copilot": mcp_server},
|
||||
"allowed_tools": effective_allowed,
|
||||
"disallowed_tools": effective_disallowed,
|
||||
"allowed_tools": COPILOT_TOOL_NAMES,
|
||||
"disallowed_tools": SDK_DISALLOWED_TOOLS,
|
||||
"hooks": security_hooks,
|
||||
"cwd": sdk_cwd,
|
||||
"max_buffer_size": config.claude_agent_max_buffer_size,
|
||||
@@ -663,7 +661,7 @@ async def stream_chat_completion_sdk(
|
||||
f"Now, the user says:\n{current_message}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[SDK] Sending query ({len(session.messages)} msgs, "
|
||||
f"resume={use_resume})"
|
||||
)
|
||||
@@ -801,11 +799,6 @@ async def stream_chat_completion_sdk(
|
||||
)
|
||||
yield StreamFinish()
|
||||
finally:
|
||||
if sandbox_mgr:
|
||||
try:
|
||||
await sandbox_mgr.dispose_all()
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Failed to dispose e2b sandboxes: {e}")
|
||||
if sdk_cwd:
|
||||
_cleanup_sdk_tool_results(sdk_cwd)
|
||||
|
||||
|
||||
@@ -42,8 +42,7 @@ _current_session: ContextVar[ChatSession | None] = ContextVar(
|
||||
# Keyed by tool_name → full output string. Consumed (popped) by the
|
||||
# response adapter when it builds StreamToolOutputAvailable.
|
||||
_pending_tool_outputs: ContextVar[dict[str, list[str]]] = ContextVar(
|
||||
"pending_tool_outputs",
|
||||
default=None, # type: ignore[arg-type]
|
||||
"pending_tool_outputs", default=None # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
# Callback type for delegating long-running tools to the non-SDK infrastructure.
|
||||
@@ -57,15 +56,11 @@ _long_running_callback: ContextVar[LongRunningCallback | None] = ContextVar(
|
||||
"long_running_callback", default=None
|
||||
)
|
||||
|
||||
# ContextVar for the e2b sandbox manager (set when e2b is enabled).
|
||||
_sandbox_manager: ContextVar[Any | None] = ContextVar("sandbox_manager", default=None)
|
||||
|
||||
|
||||
def set_execution_context(
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
long_running_callback: LongRunningCallback | None = None,
|
||||
sandbox_manager: Any | None = None,
|
||||
) -> None:
|
||||
"""Set the execution context for tool calls.
|
||||
|
||||
@@ -77,13 +72,11 @@ def set_execution_context(
|
||||
session: Current chat session.
|
||||
long_running_callback: Optional callback to delegate long-running tools
|
||||
to the non-SDK background infrastructure (stream_registry + Redis).
|
||||
sandbox_manager: Optional CoPilotSandboxManager for e2b sandbox access.
|
||||
"""
|
||||
_current_user_id.set(user_id)
|
||||
_current_session.set(session)
|
||||
_pending_tool_outputs.set({})
|
||||
_long_running_callback.set(long_running_callback)
|
||||
_sandbox_manager.set(sandbox_manager)
|
||||
|
||||
|
||||
def get_execution_context() -> tuple[str | None, ChatSession | None]:
|
||||
@@ -94,11 +87,6 @@ def get_execution_context() -> tuple[str | None, ChatSession | None]:
|
||||
)
|
||||
|
||||
|
||||
def get_sandbox_manager() -> Any | None:
|
||||
"""Get the current e2b sandbox manager from execution context."""
|
||||
return _sandbox_manager.get(None)
|
||||
|
||||
|
||||
def pop_pending_tool_output(tool_name: str) -> str | None:
|
||||
"""Pop and return the oldest stashed output for *tool_name*.
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ def read_transcript_file(transcript_path: str) -> str | None:
|
||||
json.loads(lines[0])
|
||||
json.loads(lines[-1])
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[Transcript] Read {len(lines)} lines, "
|
||||
f"{len(content)} bytes from {transcript_path}"
|
||||
)
|
||||
@@ -234,7 +234,7 @@ def write_transcript_to_tempfile(
|
||||
with open(jsonl_path, "w") as f:
|
||||
f.write(transcript_content)
|
||||
|
||||
logger.info(f"[Transcript] Wrote resume file: {jsonl_path}")
|
||||
logger.debug(f"[Transcript] Wrote resume file: {jsonl_path}")
|
||||
return jsonl_path
|
||||
|
||||
except OSError as e:
|
||||
@@ -357,7 +357,7 @@ async def upload_transcript(
|
||||
try:
|
||||
existing = await storage.retrieve(path)
|
||||
if len(existing) >= new_size:
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[Transcript] Skipping upload — existing ({len(existing)}B) "
|
||||
f">= new ({new_size}B) for session {session_id}"
|
||||
)
|
||||
@@ -439,7 +439,7 @@ async def download_transcript(
|
||||
except (FileNotFoundError, json.JSONDecodeError, Exception):
|
||||
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[Transcript] Downloaded {len(content)}B "
|
||||
f"(msg_count={message_count}) for session {session_id}"
|
||||
)
|
||||
|
||||
@@ -387,7 +387,7 @@ async def stream_chat_completion(
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, "
|
||||
f"message_len={len(message) if message else 0}, is_user={is_user_message}",
|
||||
extra={
|
||||
@@ -404,7 +404,7 @@ async def stream_chat_completion(
|
||||
fetch_start = time.monotonic()
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
fetch_time = (time.monotonic() - fetch_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] get_chat_session took {fetch_time:.1f}ms, "
|
||||
f"n_messages={len(session.messages) if session else 0}",
|
||||
extra={
|
||||
@@ -416,7 +416,7 @@ async def stream_chat_completion(
|
||||
},
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Using provided session, messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
)
|
||||
@@ -450,7 +450,7 @@ async def stream_chat_completion(
|
||||
message_length=len(message),
|
||||
)
|
||||
posthog_time = (time.monotonic() - posthog_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] track_user_message took {posthog_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": posthog_time}},
|
||||
)
|
||||
@@ -458,7 +458,7 @@ async def stream_chat_completion(
|
||||
upsert_start = time.monotonic()
|
||||
session = await upsert_chat_session(session)
|
||||
upsert_time = (time.monotonic() - upsert_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": upsert_time}},
|
||||
)
|
||||
@@ -503,7 +503,7 @@ async def stream_chat_completion(
|
||||
prompt_start = time.monotonic()
|
||||
system_prompt, understanding = await _build_system_prompt(user_id)
|
||||
prompt_time = (time.monotonic() - prompt_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": prompt_time}},
|
||||
)
|
||||
@@ -537,7 +537,7 @@ async def stream_chat_completion(
|
||||
|
||||
# Only yield message start for the initial call, not for continuations.
|
||||
setup_time = (time.monotonic() - completion_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
|
||||
)
|
||||
@@ -548,7 +548,7 @@ async def stream_chat_completion(
|
||||
yield StreamStartStep()
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
logger.debug(
|
||||
"[TIMING] Calling _stream_chat_chunks",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
@@ -988,7 +988,7 @@ async def _stream_chat_chunks(
|
||||
if session.user_id:
|
||||
log_meta["user_id"] = session.user_id
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, "
|
||||
f"user={session.user_id}, n_messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
@@ -1011,7 +1011,7 @@ async def _stream_chat_chunks(
|
||||
base_url=config.base_url,
|
||||
)
|
||||
context_time = (time_module.perf_counter() - context_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _manage_context_window took {context_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": context_time}},
|
||||
)
|
||||
@@ -1053,7 +1053,7 @@ async def _stream_chat_chunks(
|
||||
retry_info = (
|
||||
f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else ""
|
||||
)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -1093,7 +1093,7 @@ async def _stream_chat_chunks(
|
||||
extra_body=extra_body,
|
||||
)
|
||||
api_init_time = (time_module.perf_counter() - api_call_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": api_init_time}},
|
||||
)
|
||||
@@ -1142,7 +1142,7 @@ async def _stream_chat_chunks(
|
||||
ttfc = (
|
||||
time_module.perf_counter() - api_call_start
|
||||
) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms "
|
||||
f"(since API call), n_chunks={chunk_count}",
|
||||
extra={
|
||||
@@ -1210,7 +1210,7 @@ async def _stream_chat_chunks(
|
||||
)
|
||||
emitted_start_for_idx.add(idx)
|
||||
stream_duration = time_module.perf_counter() - api_call_start
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, "
|
||||
f"duration={stream_duration:.2f}s, "
|
||||
f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}",
|
||||
@@ -1244,7 +1244,7 @@ async def _stream_chat_chunks(
|
||||
raise
|
||||
|
||||
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
|
||||
f"session={session.session_id}, user={session.user_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
|
||||
@@ -117,7 +117,7 @@ async def create_task(
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
@@ -135,7 +135,7 @@ async def create_task(
|
||||
redis_start = time.perf_counter()
|
||||
redis = await get_redis_async()
|
||||
redis_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] get_redis_async took {redis_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": redis_time}},
|
||||
)
|
||||
@@ -158,7 +158,7 @@ async def create_task(
|
||||
},
|
||||
)
|
||||
hset_time = (time.perf_counter() - hset_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] redis.hset took {hset_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hset_time}},
|
||||
)
|
||||
@@ -169,7 +169,7 @@ async def create_task(
|
||||
await redis.set(op_key, task_id, ex=config.stream_ttl)
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
@@ -230,7 +230,7 @@ async def publish_chunk(
|
||||
in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd")
|
||||
or total_time > 50
|
||||
):
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -279,7 +279,7 @@ async def subscribe_to_task(
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}",
|
||||
extra={"json_fields": {**log_meta, "last_message_id": last_message_id}},
|
||||
)
|
||||
@@ -289,14 +289,14 @@ async def subscribe_to_task(
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
hgetall_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}},
|
||||
)
|
||||
|
||||
if not meta:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Task not found in Redis after {elapsed:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -335,7 +335,7 @@ async def subscribe_to_task(
|
||||
xread_start = time.perf_counter()
|
||||
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
|
||||
xread_time = (time.perf_counter() - xread_start) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -363,7 +363,7 @@ async def subscribe_to_task(
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay message: {e}")
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -376,7 +376,7 @@ async def subscribe_to_task(
|
||||
|
||||
# Step 2: If task is still running, start stream listener for live updates
|
||||
if task_status == "running":
|
||||
logger.info(
|
||||
logger.debug(
|
||||
"[TIMING] Task still running, starting _stream_listener",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
@@ -387,14 +387,14 @@ async def subscribe_to_task(
|
||||
_listener_tasks[id(subscriber_queue)] = (task_id, listener_task)
|
||||
else:
|
||||
# Task is completed/failed - add finish marker
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] Task already {task_status}, adding StreamFinish",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
await subscriber_queue.put(StreamFinish())
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, "
|
||||
f"n_messages_replayed={replayed_count}",
|
||||
extra={
|
||||
@@ -433,7 +433,7 @@ async def _stream_listener(
|
||||
if log_meta is None:
|
||||
log_meta = {"component": "StreamRegistry", "task_id": task_id}
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}",
|
||||
extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}},
|
||||
)
|
||||
@@ -462,7 +462,7 @@ async def _stream_listener(
|
||||
|
||||
if messages:
|
||||
msg_count = sum(len(msgs) for _, msgs in messages)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -475,7 +475,7 @@ async def _stream_listener(
|
||||
)
|
||||
elif xread_time > 1000:
|
||||
# Only log timeouts (30s blocking)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -526,7 +526,7 @@ async def _stream_listener(
|
||||
if first_message_time is None:
|
||||
first_message_time = time.perf_counter()
|
||||
elapsed = (first_message_time - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -568,7 +568,7 @@ async def _stream_listener(
|
||||
# Stop listening on finish
|
||||
if isinstance(chunk, StreamFinish):
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -587,7 +587,7 @@ async def _stream_listener(
|
||||
|
||||
except asyncio.CancelledError:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
@@ -619,7 +619,7 @@ async def _stream_listener(
|
||||
finally:
|
||||
# Clean up listener task mapping on exit
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, "
|
||||
f"delivered={messages_delivered}, xread_count={xread_count}",
|
||||
extra={
|
||||
@@ -835,7 +835,7 @@ async def get_active_task_for_session(
|
||||
f"for task {task_id[:8]}...: {exc}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
|
||||
)
|
||||
|
||||
|
||||
@@ -13,15 +13,6 @@ from .bash_exec import BashExecTool
|
||||
from .check_operation_status import CheckOperationStatusTool
|
||||
from .create_agent import CreateAgentTool
|
||||
from .customize_agent import CustomizeAgentTool
|
||||
from .e2b_file_tools import (
|
||||
E2BEditTool,
|
||||
E2BGlobTool,
|
||||
E2BGrepTool,
|
||||
E2BReadTool,
|
||||
E2BWriteTool,
|
||||
LoadFromWorkspaceTool,
|
||||
SaveToWorkspaceTool,
|
||||
)
|
||||
from .edit_agent import EditAgentTool
|
||||
from .feature_requests import CreateFeatureRequestTool, SearchFeatureRequestsTool
|
||||
from .find_agent import FindAgentTool
|
||||
@@ -72,14 +63,6 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
|
||||
"read_workspace_file": ReadWorkspaceFileTool(),
|
||||
"write_workspace_file": WriteWorkspaceFileTool(),
|
||||
"delete_workspace_file": DeleteWorkspaceFileTool(),
|
||||
# E2B sandbox file tools (active when COPILOT_E2B feature flag is enabled)
|
||||
"read_file": E2BReadTool(),
|
||||
"write_file": E2BWriteTool(),
|
||||
"edit_file": E2BEditTool(),
|
||||
"glob_files": E2BGlobTool(),
|
||||
"grep_files": E2BGrepTool(),
|
||||
"save_to_workspace": SaveToWorkspaceTool(),
|
||||
"load_from_workspace": LoadFromWorkspaceTool(),
|
||||
}
|
||||
|
||||
# Export individual tool instances for backwards compatibility
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
"""Bash execution tool — run shell commands in a sandbox.
|
||||
|
||||
Supports two backends:
|
||||
- **e2b** (preferred): VM-level isolation with network access, enabled via
|
||||
the COPILOT_E2B feature flag.
|
||||
- **bubblewrap** (fallback): kernel-level isolation, no network, Linux-only.
|
||||
"""Bash execution tool — run shell commands in a bubblewrap sandbox.
|
||||
|
||||
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
|
||||
Safety comes from OS-level isolation (bubblewrap): only system dirs visible
|
||||
read-only, writable workspace only, clean env, no network.
|
||||
|
||||
Requires bubblewrap (``bwrap``) — the tool is disabled when bwrap is not
|
||||
available (e.g. macOS development).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import shlex
|
||||
from typing import Any
|
||||
|
||||
from backend.copilot.model import ChatSession
|
||||
@@ -20,8 +19,6 @@ from .sandbox import get_workspace_dir, has_full_sandbox, run_sandboxed
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SANDBOX_HOME = "/home/user"
|
||||
|
||||
|
||||
class BashExecTool(BaseTool):
|
||||
"""Execute Bash commands in a bubblewrap sandbox."""
|
||||
@@ -32,18 +29,6 @@ class BashExecTool(BaseTool):
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
if _is_e2b_available():
|
||||
return (
|
||||
"Execute a Bash command or script in an e2b sandbox (microVM). "
|
||||
"Full Bash scripting is supported (loops, conditionals, pipes, "
|
||||
"functions, etc.). "
|
||||
"The sandbox shares the same filesystem as the read_file/write_file "
|
||||
"tools — files created by any tool are accessible to all others. "
|
||||
"Network access IS available (pip install, curl, etc.). "
|
||||
"Working directory is /home/user/. "
|
||||
"Execution is killed after the timeout (default 30s, max 120s). "
|
||||
"Returns stdout and stderr."
|
||||
)
|
||||
if not has_full_sandbox():
|
||||
return (
|
||||
"Bash execution is DISABLED — bubblewrap sandbox is not "
|
||||
@@ -100,6 +85,13 @@ class BashExecTool(BaseTool):
|
||||
) -> ToolResponseBase:
|
||||
session_id = session.session_id if session else None
|
||||
|
||||
if not has_full_sandbox():
|
||||
return ErrorResponse(
|
||||
message="bash_exec requires bubblewrap sandbox (Linux only).",
|
||||
error="sandbox_unavailable",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
command: str = (kwargs.get("command") or "").strip()
|
||||
timeout: int = kwargs.get("timeout", 30)
|
||||
|
||||
@@ -110,20 +102,6 @@ class BashExecTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# --- E2B path ---
|
||||
if _is_e2b_available():
|
||||
return await self._execute_e2b(
|
||||
command, timeout, session, user_id, session_id
|
||||
)
|
||||
|
||||
# --- Bubblewrap fallback ---
|
||||
if not has_full_sandbox():
|
||||
return ErrorResponse(
|
||||
message="bash_exec requires bubblewrap sandbox (Linux only).",
|
||||
error="sandbox_unavailable",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
workspace = get_workspace_dir(session_id or "default")
|
||||
|
||||
stdout, stderr, exit_code, timed_out = await run_sandboxed(
|
||||
@@ -144,72 +122,3 @@ class BashExecTool(BaseTool):
|
||||
timed_out=timed_out,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
async def _execute_e2b(
    self,
    command: str,
    timeout: int,
    session: ChatSession,
    user_id: str | None,
    session_id: str | None,
) -> ToolResponseBase:
    """Run *command* in the session's e2b sandbox.

    Timeouts are reported as a BashExecResponse with timed_out=True; any
    other failure becomes an ErrorResponse.
    """
    try:
        from backend.copilot.sdk.tool_adapter import get_sandbox_manager

        mgr = get_sandbox_manager()
        if mgr is None:
            return ErrorResponse(
                message="E2B sandbox manager not available.",
                error="sandbox_unavailable",
                session_id=session_id,
            )

        box = await mgr.get_or_create(
            session_id or "default", user_id or "anonymous"
        )
        # Wrap in `bash -c` so full shell syntax (pipes, loops, ...) works;
        # shlex.quote keeps the command a single safe argument.
        outcome = await box.commands.run(
            f"bash -c {shlex.quote(command)}",
            cwd=_SANDBOX_HOME,
            timeout=timeout,
        )
        return BashExecResponse(
            message=f"Command executed (exit {outcome.exit_code})",
            stdout=outcome.stdout,
            stderr=outcome.stderr,
            exit_code=outcome.exit_code,
            timed_out=False,
            session_id=session_id,
        )
    except Exception as exc:
        detail = str(exc)
        # e2b surfaces timeouts as a generic error; detect by message text.
        if "timeout" in detail.lower():
            return BashExecResponse(
                message="Execution timed out",
                stdout="",
                stderr=f"Execution timed out after {timeout}s",
                exit_code=-1,
                timed_out=True,
                session_id=session_id,
            )
        return ErrorResponse(
            message=f"E2B execution failed: {exc}",
            error=detail,
            session_id=session_id,
        )
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Module-level helpers (placed after classes that call them)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
|
||||
def _is_e2b_available() -> bool:
|
||||
"""Check if e2b sandbox is available via execution context."""
|
||||
try:
|
||||
from backend.copilot.sdk.tool_adapter import get_sandbox_manager
|
||||
|
||||
return get_sandbox_manager() is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@@ -1,703 +0,0 @@
|
||||
"""E2B file tools — MCP tools that proxy filesystem operations to the e2b sandbox.
|
||||
|
||||
These replace the SDK built-in Read/Write/Edit/Glob/Grep tools when e2b is
|
||||
enabled, ensuring all file operations go through the sandbox VM.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import posixpath
|
||||
import shlex
|
||||
from typing import Any
|
||||
|
||||
from backend.copilot.model import ChatSession
|
||||
|
||||
from .base import BaseTool
|
||||
from .models import BashExecResponse, ErrorResponse, ToolResponseBase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SANDBOX_HOME = "/home/user"
|
||||
|
||||
|
||||
class E2BReadTool(BaseTool):
    """MCP tool: read a text file out of the session's e2b sandbox."""

    @property
    def name(self) -> str:
        return "read_file"

    @property
    def description(self) -> str:
        return (
            "Read a file from the sandbox filesystem. "
            "The sandbox is the shared working environment — files created by "
            "any tool (bash_exec, write_file, etc.) are accessible here. "
            "Returns the file content as text. "
            "Use offset and limit for large files."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        # JSON-schema for the tool call arguments.
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path to the file to read (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "offset": {
                    "type": "integer",
                    "description": (
                        "Line number to start reading from (0-indexed). Default: 0"
                    ),
                },
                "limit": {
                    "type": "integer",
                    "description": "Number of lines to read. Default: 2000",
                },
            },
            "required": ["path"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        raw_path = kwargs.get("path", "")
        start = kwargs.get("offset", 0)
        count = kwargs.get("limit", 2000)

        # Guard clauses: sandbox must exist, path must stay inside home.
        box = await _get_sandbox(session)
        if box is None:
            return _sandbox_unavailable(session)

        abs_path = _resolve_path(raw_path)
        if abs_path is None:
            return _path_error(raw_path, session)

        try:
            body = await box.files.read(abs_path)
            window = body.splitlines(keepends=True)[start : start + count]
            return BashExecResponse(
                message=f"Read {len(window)} lines from {abs_path}",
                stdout="".join(window),
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to read {abs_path}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class E2BWriteTool(BaseTool):
    """MCP tool: write/create a file in the e2b sandbox filesystem.

    Parent directories are created on demand before the write.
    """

    @property
    def name(self) -> str:
        return "write_file"

    @property
    def description(self) -> str:
        return (
            "Write or create a file in the sandbox filesystem. "
            "This is the shared working environment — files are accessible "
            "to bash_exec and other tools. "
            "Creates parent directories automatically."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path for the file (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file.",
                },
            },
            "required": ["path", "content"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        path = kwargs.get("path", "")
        content = kwargs.get("content", "")

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        try:
            # Ensure parent directory exists.
            parent = posixpath.dirname(resolved)
            if parent and parent != _SANDBOX_HOME:
                # Fix: quote the path — unquoted interpolation broke on
                # paths containing spaces or shell metacharacters.
                await sandbox.commands.run(
                    f"mkdir -p {shlex.quote(parent)}", timeout=5
                )
            await sandbox.files.write(resolved, content)
            return BashExecResponse(
                message=f"Wrote {len(content)} bytes to {resolved}",
                stdout=f"Successfully wrote to {resolved}",
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to write {resolved}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class E2BEditTool(BaseTool):
    """MCP tool: exact search/replace edit on a file in the e2b sandbox.

    Refuses ambiguous edits: the old_text must occur exactly once.
    """

    @property
    def name(self) -> str:
        return "edit_file"

    @property
    def description(self) -> str:
        return (
            "Edit a file in the sandbox by replacing exact text. "
            "Provide old_text (the exact text to find) and new_text "
            "(what to replace it with). The old_text must match exactly."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        "Path to the file (relative to /home/user/ "
                        "or absolute within /home/user/)."
                    ),
                },
                "old_text": {
                    "type": "string",
                    "description": "Exact text to find in the file.",
                },
                "new_text": {
                    "type": "string",
                    "description": "Text to replace old_text with.",
                },
            },
            "required": ["path", "old_text", "new_text"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        raw_path = kwargs.get("path", "")
        old_text = kwargs.get("old_text", "")
        new_text = kwargs.get("new_text", "")

        box = await _get_sandbox(session)
        if box is None:
            return _sandbox_unavailable(session)

        abs_path = _resolve_path(raw_path)
        if abs_path is None:
            return _path_error(raw_path, session)

        try:
            current = await box.files.read(abs_path)
            hits = current.count(old_text)
            # Exactly one match required: zero is a miss, more is ambiguous.
            if hits == 0:
                return ErrorResponse(
                    message=f"old_text not found in {abs_path}",
                    error="text_not_found",
                    session_id=session.session_id,
                )
            if hits > 1:
                return ErrorResponse(
                    message=(
                        f"old_text found {hits} times in {abs_path}. "
                        "Please provide more context to make the match unique."
                    ),
                    error="ambiguous_match",
                    session_id=session.session_id,
                )
            updated = current.replace(old_text, new_text, 1)
            await box.files.write(abs_path, updated)
            return BashExecResponse(
                message=f"Edited {abs_path}",
                stdout=f"Successfully edited {abs_path}",
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to edit {abs_path}: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class E2BGlobTool(BaseTool):
    """MCP tool: list files in the e2b sandbox matching a glob pattern.

    Implemented via `find <dir> -name <pattern> -type f` inside the sandbox.
    """

    @property
    def name(self) -> str:
        return "glob_files"

    @property
    def description(self) -> str:
        return (
            "List files in the sandbox matching a glob pattern. "
            "Uses find under the hood. Default directory is /home/user/."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": (
                        "Glob pattern to match (e.g., '*.py', '**/*.json')."
                    ),
                },
                "path": {
                    "type": "string",
                    "description": ("Directory to search in (default: /home/user/)."),
                },
            },
            "required": ["pattern"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        pattern = kwargs.get("pattern", "*")
        path = kwargs.get("path", _SANDBOX_HOME)

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        try:
            # Fix: quote the search root too — previously only the pattern
            # was quoted, so directories with spaces broke the command.
            result = await sandbox.commands.run(
                f"find {shlex.quote(resolved)} -name {shlex.quote(pattern)} "
                f"-type f 2>/dev/null",
                timeout=15,
            )
            return BashExecResponse(
                message="Glob results",
                stdout=result.stdout,
                stderr=result.stderr,
                exit_code=result.exit_code,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to glob: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class E2BGrepTool(BaseTool):
    """MCP tool: search file contents in the e2b sandbox via `grep -rn`."""

    @property
    def name(self) -> str:
        return "grep_files"

    @property
    def description(self) -> str:
        return (
            "Search for a pattern in files within the sandbox. "
            "Uses grep -rn under the hood. Returns matching lines with "
            "file paths and line numbers."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Search pattern (regex supported).",
                },
                "path": {
                    "type": "string",
                    "description": ("Directory to search in (default: /home/user/)."),
                },
                "include": {
                    "type": "string",
                    "description": "File glob to include (e.g., '*.py').",
                },
            },
            "required": ["pattern"],
        }

    @property
    def requires_auth(self) -> bool:
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        pattern = kwargs.get("pattern", "")
        path = kwargs.get("path", _SANDBOX_HOME)
        include = kwargs.get("include", "")

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(path)
        if resolved is None:
            return _path_error(path, session)

        include_flag = f" --include={shlex.quote(include)}" if include else ""
        try:
            # Fix: quote the search root — previously interpolated raw, so
            # directories with spaces/metacharacters broke the command.
            result = await sandbox.commands.run(
                f"grep -rn{include_flag} {shlex.quote(pattern)} "
                f"{shlex.quote(resolved)} 2>/dev/null",
                timeout=15,
            )
            return BashExecResponse(
                message="Grep results",
                stdout=result.stdout,
                stderr=result.stderr,
                exit_code=result.exit_code,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to grep: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class SaveToWorkspaceTool(BaseTool):
    """Copy a file from the e2b sandbox to the persistent GCS workspace.

    Requires an authenticated user (workspace storage is per-user).
    """

    @property
    def name(self) -> str:
        return "save_to_workspace"

    @property
    def description(self) -> str:
        return (
            "Save a file from the sandbox to the persistent workspace "
            "(cloud storage). Files saved to workspace survive across sessions. "
            "Provide the sandbox file path and optional workspace path."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "sandbox_path": {
                    "type": "string",
                    "description": "Path of the file in the sandbox to save.",
                },
                "workspace_path": {
                    "type": "string",
                    "description": (
                        "Path in the workspace to save to "
                        "(defaults to the sandbox filename)."
                    ),
                },
            },
            "required": ["sandbox_path"],
        }

    @property
    def requires_auth(self) -> bool:
        return True

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        sandbox_path = kwargs.get("sandbox_path", "")
        workspace_path = kwargs.get("workspace_path", "")

        if not user_id:
            return ErrorResponse(
                message="Authentication required",
                session_id=session.session_id,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        resolved = _resolve_path(sandbox_path)
        if resolved is None:
            return _path_error(sandbox_path, session)

        try:
            content_bytes = await sandbox.files.read(resolved, format="bytes")

            # Determine workspace path.
            filename = resolved.rsplit("/", 1)[-1]
            # Fix: default to the sandbox filename, as documented in the
            # parameter schema (the default was previously a broken literal
            # and `filename` went unused).
            wp = workspace_path or f"/{filename}"

            from backend.data.db_accessors import workspace_db
            from backend.util.workspace import WorkspaceManager

            workspace = await workspace_db().get_or_create_workspace(user_id)
            manager = WorkspaceManager(user_id, workspace.id, session.session_id)
            file_record = await manager.write_file(
                content=content_bytes,
                filename=filename,
                path=wp,
                overwrite=True,
            )

            return BashExecResponse(
                message=f"Saved {resolved} to workspace at {file_record.path}",
                stdout=(
                    f"Saved to workspace: {file_record.path} "
                    f"({file_record.size_bytes} bytes)"
                ),
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to save to workspace: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
class LoadFromWorkspaceTool(BaseTool):
    """Copy a file from the persistent GCS workspace into the e2b sandbox.

    Requires an authenticated user (workspace storage is per-user).
    """

    @property
    def name(self) -> str:
        return "load_from_workspace"

    @property
    def description(self) -> str:
        return (
            "Load a file from the persistent workspace (cloud storage) into "
            "the sandbox. Use this to bring workspace files into the sandbox "
            "for editing or processing."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "workspace_path": {
                    "type": "string",
                    "description": ("Path of the file in the workspace to load."),
                },
                "sandbox_path": {
                    "type": "string",
                    "description": (
                        "Path in the sandbox to write to "
                        "(defaults to /home/user/<filename>)."
                    ),
                },
            },
            "required": ["workspace_path"],
        }

    @property
    def requires_auth(self) -> bool:
        return True

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        workspace_path = kwargs.get("workspace_path", "")
        sandbox_path = kwargs.get("sandbox_path", "")

        if not user_id:
            return ErrorResponse(
                message="Authentication required",
                session_id=session.session_id,
            )

        sandbox = await _get_sandbox(session)
        if sandbox is None:
            return _sandbox_unavailable(session)

        try:
            from backend.data.db_accessors import workspace_db
            from backend.util.workspace import WorkspaceManager

            workspace = await workspace_db().get_or_create_workspace(user_id)
            manager = WorkspaceManager(user_id, workspace.id, session.session_id)
            file_info = await manager.get_file_info_by_path(workspace_path)
            if file_info is None:
                return ErrorResponse(
                    message=f"File not found in workspace: {workspace_path}",
                    session_id=session.session_id,
                )
            content = await manager.read_file_by_id(file_info.id)

            # Determine sandbox path.
            filename = workspace_path.rsplit("/", 1)[-1]
            # Fix: default to /home/user/<filename> as documented in the
            # parameter schema (the default was previously a broken literal
            # and `filename` went unused).
            target = sandbox_path or f"{_SANDBOX_HOME}/{filename}"
            resolved = _resolve_path(target)
            if resolved is None:
                return _path_error(target, session)

            # Ensure parent directory exists (quoted: paths may contain
            # spaces or shell metacharacters).
            parent = posixpath.dirname(resolved)
            if parent and parent != _SANDBOX_HOME:
                await sandbox.commands.run(
                    f"mkdir -p {shlex.quote(parent)}", timeout=5
                )
            await sandbox.files.write(resolved, content)

            return BashExecResponse(
                message=f"Loaded {workspace_path} into sandbox at {resolved}",
                stdout=(f"Loaded from workspace: {resolved} ({len(content)} bytes)"),
                stderr="",
                exit_code=0,
                timed_out=False,
                session_id=session.session_id,
            )
        except Exception as e:
            return ErrorResponse(
                message=f"Failed to load from workspace: {e}",
                error=str(e),
                session_id=session.session_id,
            )
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Module-level helpers (placed after functions that call them)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
|
||||
def _resolve_path(path: str) -> str | None:
|
||||
"""Resolve a path to an absolute path within /home/user/.
|
||||
|
||||
Returns None if the path escapes the sandbox home.
|
||||
"""
|
||||
if not path:
|
||||
return None
|
||||
|
||||
# Handle relative paths
|
||||
if not path.startswith("/"):
|
||||
path = f"{_SANDBOX_HOME}/{path}"
|
||||
|
||||
# Normalize to prevent traversal
|
||||
resolved = posixpath.normpath(path)
|
||||
|
||||
if not resolved.startswith(_SANDBOX_HOME):
|
||||
return None
|
||||
|
||||
return resolved
|
||||
|
||||
|
||||
async def _get_sandbox(session: ChatSession) -> Any | None:
    """Resolve the e2b sandbox for *session*, or None when unavailable.

    Best-effort: any failure (no manager configured, create/connect error)
    is logged and reported as None so callers can return a friendly error.
    """
    try:
        from backend.copilot.sdk.tool_adapter import get_sandbox_manager

        mgr = get_sandbox_manager()
        if mgr is None:
            return None
        uid, _ = _get_user_from_context()
        return await mgr.get_or_create(session.session_id, uid or "anonymous")
    except Exception as e:
        logger.error(f"[E2B] Failed to get sandbox: {e}")
        return None
|
||||
|
||||
|
||||
def _get_user_from_context() -> tuple[str | None, Any]:
    """Fetch (user_id, context) from the SDK execution context."""
    # Imported lazily to avoid a module-level import cycle with tool_adapter.
    from backend.copilot.sdk.tool_adapter import get_execution_context

    ctx = get_execution_context()
    return ctx
|
||||
|
||||
|
||||
def _sandbox_unavailable(session: ChatSession) -> ErrorResponse:
    """Build the standard error payload for a missing/broken sandbox."""
    return ErrorResponse(
        message="E2B sandbox is not available. Try again or contact support.",
        error="sandbox_unavailable",
        session_id=session.session_id,
    )
|
||||
|
||||
|
||||
def _path_error(path: str, session: ChatSession) -> ErrorResponse:
    """Build the standard error payload for a path outside /home/user/."""
    return ErrorResponse(
        message=f"Invalid path: {path}. Paths must be within /home/user/.",
        error="invalid_path",
        session_id=session.session_id,
    )
|
||||
@@ -1,215 +0,0 @@
|
||||
"""E2B sandbox manager for CoPilot sessions.
|
||||
|
||||
Manages e2b sandbox lifecycle: create, reuse via Redis, dispose with GCS sync.
|
||||
One sandbox per session, cached in-memory on the worker thread and stored in
|
||||
Redis for cross-pod reconnection.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_REDIS_KEY_PREFIX = "copilot:sandbox:"
|
||||
_SANDBOX_HOME = "/home/user"
|
||||
|
||||
|
||||
class CoPilotSandboxManager:
    """Manages e2b sandbox lifecycle for CoPilot sessions.

    Exactly one sandbox per session. The sandbox_id is mirrored to Redis so
    a different pod can reattach to a still-running sandbox if this worker
    dies. A background reaper disposes sandboxes idle past the configured
    timeout.
    """

    def __init__(self) -> None:
        # session_id -> live AsyncSandbox handle (this process only)
        self._sandboxes: dict[str, Any] = {}
        # session_id -> monotonic timestamp of last use (drives idle reaping)
        self._last_activity: dict[str, float] = {}
        self._cleanup_task: asyncio.Task[None] | None = None
        cfg = Config()
        self._timeout: int = cfg.copilot_sandbox_timeout
        self._template: str = cfg.copilot_sandbox_template
        self._api_key: str = cfg.e2b_api_key

    async def get_or_create(self, session_id: str, user_id: str) -> Any:
        """Return the session's sandbox, creating or reconnecting as needed.

        Args:
            session_id: CoPilot chat session ID.
            user_id: User ID for workspace sync.

        Returns:
            An e2b AsyncSandbox instance.
        """
        self._last_activity[session_id] = time.monotonic()

        # Fast path: a cached handle that still responds.
        cached = self._sandboxes.get(session_id)
        if cached is not None:
            if await _is_sandbox_alive(cached):
                return cached
            # Sandbox died — drop the stale reference and fall through.
            del self._sandboxes[session_id]

        # Cross-pod path: another worker may have registered one in Redis.
        reconnected = await self._try_reconnect_from_redis(session_id)
        if reconnected is not None:
            self._sandboxes[session_id] = reconnected
            return reconnected

        # Cold path: brand-new sandbox, advertised via Redis for other pods.
        fresh = await self._create_sandbox(session_id, user_id)
        self._sandboxes[session_id] = fresh
        await _store_sandbox_id_in_redis(session_id, fresh.sandbox_id)

        self._ensure_cleanup_task()
        return fresh

    async def dispose(self, session_id: str) -> None:
        """Kill the session's sandbox and forget it (cache + Redis).

        Args:
            session_id: CoPilot chat session ID.
        """
        box = self._sandboxes.pop(session_id, None)
        self._last_activity.pop(session_id, None)

        if box is None:
            return

        try:
            await box.kill()
        except Exception as e:
            logger.warning(f"[E2B] Failed to kill sandbox for {session_id}: {e}")

        await _remove_sandbox_id_from_redis(session_id)
        logger.info(f"[E2B] Disposed sandbox for session {session_id}")

    async def dispose_all(self) -> None:
        """Dispose every sandbox (processor shutdown) and stop the reaper."""
        for sid in list(self._sandboxes.keys()):
            await self.dispose(sid)
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _create_sandbox(self, session_id: str, user_id: str) -> Any:
        """Spin up a new e2b sandbox using the configured template/timeout."""
        from e2b import AsyncSandbox

        options: dict[str, Any] = {"api_key": self._api_key}
        if self._template:
            options["template"] = self._template
        if self._timeout:
            options["timeout"] = self._timeout

        box = await AsyncSandbox.create(**options)
        logger.info(
            f"[E2B] Created sandbox {box.sandbox_id} for session={session_id}, "
            f"user={user_id}"
        )
        return box

    async def _try_reconnect_from_redis(self, session_id: str) -> Any | None:
        """Reattach to a sandbox whose id another pod stored in Redis."""
        from e2b import AsyncSandbox

        sandbox_id = await _load_sandbox_id_from_redis(session_id)
        if not sandbox_id:
            return None

        try:
            box = await AsyncSandbox.connect(
                sandbox_id=sandbox_id, api_key=self._api_key
            )
            logger.info(
                f"[E2B] Reconnected to sandbox {sandbox_id} for session={session_id}"
            )
            return box
        except Exception as e:
            # Stale record — the sandbox is gone; clean up so the caller
            # falls through to creating a fresh one.
            logger.warning(f"[E2B] Failed to reconnect to sandbox {sandbox_id}: {e}")
            await _remove_sandbox_id_from_redis(session_id)
            return None

    def _ensure_cleanup_task(self) -> None:
        """Lazily (re)start the idle-reaper background task."""
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.ensure_future(self._idle_cleanup_loop())

    async def _idle_cleanup_loop(self) -> None:
        """Every 60s, dispose sandboxes idle longer than the timeout."""
        while True:
            await asyncio.sleep(60)
            if not self._sandboxes:
                continue
            now = time.monotonic()
            expired = [
                sid
                for sid, last in list(self._last_activity.items())
                if now - last > self._timeout
            ]
            for sid in expired:
                logger.info(f"[E2B] Disposing idle sandbox for session {sid}")
                await self.dispose(sid)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Module-level helpers (placed after classes that call them)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _is_sandbox_alive(sandbox: Any) -> bool:
|
||||
"""Check if an e2b sandbox is still running."""
|
||||
try:
|
||||
result = await sandbox.commands.run("echo ok", timeout=5)
|
||||
return result.exit_code == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
async def _store_sandbox_id_in_redis(session_id: str, sandbox_id: str) -> None:
    """Record session_id -> sandbox_id in Redis (best-effort, TTL-bounded)."""
    try:
        from backend.data import redis as redis_client

        conn = redis_client.get_redis()
        cfg = Config()
        # Keep the mapping at least 1h, and twice the sandbox timeout so it
        # outlives the sandbox itself.
        ttl = max(cfg.copilot_sandbox_timeout * 2, 3600)
        await conn.set(f"{_REDIS_KEY_PREFIX}{session_id}", sandbox_id, ex=ttl)
    except Exception as e:
        logger.warning(f"[E2B] Failed to store sandbox_id in Redis: {e}")
|
||||
|
||||
|
||||
async def _load_sandbox_id_from_redis(session_id: str) -> str | None:
    """Fetch the stored sandbox_id for a session, or None (best-effort)."""
    try:
        from backend.data import redis as redis_client

        conn = redis_client.get_redis()
        raw = await conn.get(f"{_REDIS_KEY_PREFIX}{session_id}")
        # Redis clients may hand back bytes; normalize to str.
        return raw.decode() if isinstance(raw, bytes) else raw
    except Exception as e:
        logger.warning(f"[E2B] Failed to load sandbox_id from Redis: {e}")
        return None
|
||||
|
||||
|
||||
async def _remove_sandbox_id_from_redis(session_id: str) -> None:
    """Delete the session's sandbox_id mapping from Redis (best-effort)."""
    try:
        from backend.data import redis as redis_client

        conn = redis_client.get_redis()
        await conn.delete(f"{_REDIS_KEY_PREFIX}{session_id}")
    except Exception as e:
        logger.warning(f"[E2B] Failed to remove sandbox_id from Redis: {e}")
|
||||
@@ -212,6 +212,8 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
"Specify either file_id or path to identify the file. "
|
||||
"For small text files, returns content directly. "
|
||||
"For large or binary files, returns metadata and a download URL. "
|
||||
"Optionally use 'save_to_path' to copy the file to the ephemeral "
|
||||
"working directory for processing with bash_exec or SDK tools. "
|
||||
"Paths are scoped to the current session by default. "
|
||||
"Use /sessions/<session_id>/... for cross-session access."
|
||||
)
|
||||
@@ -232,6 +234,15 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
"Scoped to current session by default."
|
||||
),
|
||||
},
|
||||
"save_to_path": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"If provided, save the file to this path in the ephemeral "
|
||||
"working directory (e.g., '/tmp/copilot-.../data.csv') "
|
||||
"so it can be processed with bash_exec or SDK tools. "
|
||||
"The file content is still returned in the response."
|
||||
),
|
||||
},
|
||||
"force_download_url": {
|
||||
"type": "boolean",
|
||||
"description": (
|
||||
@@ -275,6 +286,7 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
|
||||
file_id: Optional[str] = kwargs.get("file_id")
|
||||
path: Optional[str] = kwargs.get("path")
|
||||
save_to_path: Optional[str] = kwargs.get("save_to_path")
|
||||
force_download_url: bool = kwargs.get("force_download_url", False)
|
||||
|
||||
if not file_id and not path:
|
||||
@@ -283,6 +295,22 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Validate save_to_path is within ephemeral workspace
|
||||
if save_to_path:
|
||||
import os
|
||||
|
||||
from backend.copilot.tools.sandbox import WORKSPACE_PREFIX
|
||||
|
||||
real_save = os.path.realpath(save_to_path)
|
||||
if not real_save.startswith(WORKSPACE_PREFIX):
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
f"save_to_path must be within the ephemeral working "
|
||||
f"directory ({WORKSPACE_PREFIX})"
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
try:
|
||||
workspace = await workspace_db().get_or_create_workspace(user_id)
|
||||
# Pass session_id for session-scoped file access
|
||||
@@ -308,6 +336,15 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
)
|
||||
target_file_id = file_info.id
|
||||
|
||||
# If save_to_path requested, always read and save the file
|
||||
if save_to_path:
|
||||
import os
|
||||
|
||||
content = await manager.read_file_by_id(target_file_id)
|
||||
os.makedirs(os.path.dirname(save_to_path), exist_ok=True)
|
||||
with open(save_to_path, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
# Decide whether to return inline content or metadata+URL
|
||||
is_small_file = file_info.size_bytes <= self.MAX_INLINE_SIZE_BYTES
|
||||
is_text_file = self._is_text_mime_type(file_info.mime_type)
|
||||
@@ -327,13 +364,16 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
content = await manager.read_file_by_id(target_file_id)
|
||||
content_b64 = base64.b64encode(content).decode("utf-8")
|
||||
|
||||
msg = f"Successfully read file: {file_info.name}"
|
||||
if save_to_path:
|
||||
msg += f" (also saved to {save_to_path})"
|
||||
return WorkspaceFileContentResponse(
|
||||
file_id=file_info.id,
|
||||
name=file_info.name,
|
||||
path=file_info.path,
|
||||
mime_type=file_info.mime_type,
|
||||
content_base64=content_b64,
|
||||
message=f"Successfully read file: {file_info.name}",
|
||||
message=msg,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -356,6 +396,11 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
except Exception:
|
||||
pass # Preview is optional
|
||||
|
||||
msg = f"File: {file_info.name} ({file_info.size_bytes} bytes)."
|
||||
if save_to_path:
|
||||
msg += f" Saved to {save_to_path}."
|
||||
else:
|
||||
msg += " Use download_url to retrieve content."
|
||||
return WorkspaceFileMetadataResponse(
|
||||
file_id=file_info.id,
|
||||
name=file_info.name,
|
||||
@@ -364,7 +409,7 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
size_bytes=file_info.size_bytes,
|
||||
download_url=download_url,
|
||||
preview=preview,
|
||||
message=f"File: {file_info.name} ({file_info.size_bytes} bytes). Use download_url to retrieve content.",
|
||||
message=msg,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -395,7 +440,9 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
"Write or create a file in the user's persistent workspace (cloud storage). "
|
||||
"These files survive across sessions. "
|
||||
"For ephemeral session files, use the SDK Write tool instead. "
|
||||
"Provide the content as a base64-encoded string. "
|
||||
"Provide content as plain text via 'content', OR base64-encoded via "
|
||||
"'content_base64', OR copy a file from the ephemeral working directory "
|
||||
"via 'source_path'. Exactly one of these three is required. "
|
||||
f"Maximum file size is {Config().max_file_size_mb}MB. "
|
||||
"Files are saved to the current session's folder by default. "
|
||||
"Use /sessions/<session_id>/... for cross-session access."
|
||||
@@ -410,9 +457,30 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
"type": "string",
|
||||
"description": "Name for the file (e.g., 'report.pdf')",
|
||||
},
|
||||
"content": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"Plain text content to write. Use this for text files "
|
||||
"(code, configs, documents, etc.). "
|
||||
"Mutually exclusive with content_base64 and source_path."
|
||||
),
|
||||
},
|
||||
"content_base64": {
|
||||
"type": "string",
|
||||
"description": "Base64-encoded file content",
|
||||
"description": (
|
||||
"Base64-encoded file content. Use this for binary files "
|
||||
"(images, PDFs, etc.). "
|
||||
"Mutually exclusive with content and source_path."
|
||||
),
|
||||
},
|
||||
"source_path": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"Path to a file in the ephemeral working directory to "
|
||||
"copy to workspace (e.g., '/tmp/copilot-.../output.csv'). "
|
||||
"Use this to persist files created by bash_exec or SDK Write. "
|
||||
"Mutually exclusive with content and content_base64."
|
||||
),
|
||||
},
|
||||
"path": {
|
||||
"type": "string",
|
||||
@@ -434,7 +502,7 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
"description": "Whether to overwrite if file exists at path (default: false)",
|
||||
},
|
||||
},
|
||||
"required": ["filename", "content_base64"],
|
||||
"required": ["filename"],
|
||||
}
|
||||
|
||||
@property
|
||||
@@ -456,7 +524,9 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
)
|
||||
|
||||
filename: str = kwargs.get("filename", "")
|
||||
content_b64: str = kwargs.get("content_base64", "")
|
||||
content_text: Optional[str] = kwargs.get("content")
|
||||
content_b64: Optional[str] = kwargs.get("content_base64")
|
||||
source_path: Optional[str] = kwargs.get("source_path")
|
||||
path: Optional[str] = kwargs.get("path")
|
||||
mime_type: Optional[str] = kwargs.get("mime_type")
|
||||
overwrite: bool = kwargs.get("overwrite", False)
|
||||
@@ -467,20 +537,66 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
if not content_b64:
|
||||
# Resolve content from one of three sources
|
||||
sources_provided = sum(
|
||||
bool(x) for x in [content_text, content_b64, source_path]
|
||||
)
|
||||
if sources_provided == 0:
|
||||
return ErrorResponse(
|
||||
message="Please provide content_base64",
|
||||
message="Please provide one of: content, content_base64, or source_path",
|
||||
session_id=session_id,
|
||||
)
|
||||
if sources_provided > 1:
|
||||
return ErrorResponse(
|
||||
message="Provide only one of: content, content_base64, or source_path",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Decode content
|
||||
try:
|
||||
content = base64.b64decode(content_b64)
|
||||
except Exception:
|
||||
return ErrorResponse(
|
||||
message="Invalid base64-encoded content",
|
||||
session_id=session_id,
|
||||
)
|
||||
content: bytes
|
||||
if source_path:
|
||||
# Read from ephemeral working directory
|
||||
import os
|
||||
|
||||
from backend.copilot.tools.sandbox import WORKSPACE_PREFIX
|
||||
|
||||
real_path = os.path.realpath(source_path)
|
||||
if not real_path.startswith(WORKSPACE_PREFIX):
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
f"source_path must be within the ephemeral working "
|
||||
f"directory ({WORKSPACE_PREFIX})"
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
try:
|
||||
with open(real_path, "rb") as f:
|
||||
content = f.read()
|
||||
except FileNotFoundError:
|
||||
return ErrorResponse(
|
||||
message=f"Source file not found: {source_path}",
|
||||
session_id=session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
return ErrorResponse(
|
||||
message=f"Failed to read source file: {e}",
|
||||
session_id=session_id,
|
||||
)
|
||||
elif content_b64:
|
||||
# Decode base64 content
|
||||
try:
|
||||
content = base64.b64decode(content_b64)
|
||||
except Exception:
|
||||
# Fallback: treat as plain text if base64 decode fails
|
||||
# (LLMs sometimes send plain text in the content_base64 field)
|
||||
logger.warning(
|
||||
"[workspace] content_base64 is not valid base64, "
|
||||
"treating as plain text"
|
||||
)
|
||||
content = content_b64.encode("utf-8")
|
||||
else:
|
||||
# Plain text content
|
||||
assert content_text is not None
|
||||
content = content_text.encode("utf-8")
|
||||
|
||||
# Check size
|
||||
max_file_size = Config().max_file_size_mb * 1024 * 1024
|
||||
|
||||
@@ -39,7 +39,6 @@ class Flag(str, Enum):
|
||||
ENABLE_PLATFORM_PAYMENT = "enable-platform-payment"
|
||||
CHAT = "chat"
|
||||
COPILOT_SDK = "copilot-sdk"
|
||||
COPILOT_E2B = "copilot-e2b"
|
||||
|
||||
|
||||
def is_configured() -> bool:
|
||||
|
||||
@@ -665,18 +665,6 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
|
||||
fal_api_key: str = Field(default="", description="FAL API key")
|
||||
exa_api_key: str = Field(default="", description="Exa API key")
|
||||
e2b_api_key: str = Field(default="", description="E2B API key")
|
||||
copilot_sandbox_timeout: int = Field(
|
||||
default=900,
|
||||
description="E2B sandbox idle timeout in seconds (default 15 min).",
|
||||
)
|
||||
copilot_sandbox_template: str = Field(
|
||||
default="",
|
||||
description="E2B sandbox template ID (empty = default template).",
|
||||
)
|
||||
copilot_use_e2b: bool = Field(
|
||||
default=False,
|
||||
description="Enable e2b sandbox for CoPilot (feature flag default).",
|
||||
)
|
||||
nvidia_api_key: str = Field(default="", description="Nvidia API key")
|
||||
mem0_api_key: str = Field(default="", description="Mem0 API key")
|
||||
elevenlabs_api_key: str = Field(default="", description="ElevenLabs API key")
|
||||
|
||||
@@ -169,10 +169,7 @@ export const ChatMessagesContainer = ({
|
||||
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
|
||||
{headerSlot}
|
||||
{isLoading && messages.length === 0 && (
|
||||
<div
|
||||
className="flex flex-1 items-center justify-center"
|
||||
style={{ minHeight: "calc(100vh - 12rem)" }}
|
||||
>
|
||||
<div className="flex min-h-full flex-1 items-center justify-center">
|
||||
<LoadingSpinner className="text-neutral-600" />
|
||||
</div>
|
||||
)}
|
||||
|
||||
@@ -13,7 +13,7 @@ export function MorphingTextAnimation({ text, className }: Props) {
|
||||
<div className={cn(className)}>
|
||||
<AnimatePresence mode="popLayout" initial={false}>
|
||||
<motion.div key={text} className="whitespace-nowrap">
|
||||
<motion.span className="inline-flex overflow-hidden">
|
||||
<motion.span className="inline-flex gap-0 overflow-hidden">
|
||||
{letters.map((char, index) => (
|
||||
<motion.span
|
||||
key={`${text}-${index}`}
|
||||
|
||||
@@ -10,10 +10,17 @@ export function MiniGame() {
|
||||
const { canvasRef, activeMode, showOverlay, score, highScore, onContinue } =
|
||||
useMiniGame();
|
||||
|
||||
const isRunActive =
|
||||
activeMode === "run" || activeMode === "idle" || activeMode === "over";
|
||||
|
||||
let overlayText: string | undefined;
|
||||
let buttonLabel = "Continue";
|
||||
if (activeMode === "idle") {
|
||||
buttonLabel = "Start";
|
||||
} else if (activeMode === "boss-intro") {
|
||||
overlayText = "Face the bandit!";
|
||||
} else if (activeMode === "boss-defeated") {
|
||||
overlayText = "Great job, keep on going";
|
||||
} else if (activeMode === "over") {
|
||||
overlayText = `Score: ${score} / Record: ${highScore}`;
|
||||
buttonLabel = "Retry";
|
||||
@@ -22,7 +29,16 @@ export function MiniGame() {
|
||||
return (
|
||||
<div className="flex flex-col gap-2">
|
||||
<p className="text-sm font-medium text-purple-500">
|
||||
<Key>WASD</Key> to move
|
||||
{isRunActive ? (
|
||||
<>
|
||||
Run mode: <Key>Space</Key> to jump
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
Duel mode: <Key>←→</Key> to move · <Key>Z</Key> to attack ·{" "}
|
||||
<Key>X</Key> to block · <Key>Space</Key> to jump
|
||||
</>
|
||||
)}
|
||||
</p>
|
||||
<div className="relative w-full overflow-hidden rounded-md border border-accent bg-background text-foreground">
|
||||
<canvas
|
||||
|
||||
|
After Width: | Height: | Size: 5.2 KiB |
|
After Width: | Height: | Size: 4.9 KiB |
|
After Width: | Height: | Size: 12 KiB |
|
After Width: | Height: | Size: 8.0 KiB |
|
After Width: | Height: | Size: 7.3 KiB |
|
After Width: | Height: | Size: 9.6 KiB |
|
After Width: | Height: | Size: 9.5 KiB |
|
After Width: | Height: | Size: 16 KiB |
|
After Width: | Height: | Size: 14 KiB |
|
After Width: | Height: | Size: 10 KiB |
@@ -69,11 +69,12 @@ test.describe("Marketplace Creator Page – Basic Functionality", () => {
|
||||
await marketplacePage.getFirstCreatorProfile(page);
|
||||
await firstCreatorProfile.click();
|
||||
await page.waitForURL("**/marketplace/creator/**");
|
||||
await page.waitForLoadState("networkidle").catch(() => {});
|
||||
|
||||
const firstAgent = page
|
||||
.locator('[data-testid="store-card"]:visible')
|
||||
.first();
|
||||
await firstAgent.waitFor({ state: "visible", timeout: 15000 });
|
||||
await firstAgent.waitFor({ state: "visible", timeout: 30000 });
|
||||
|
||||
await firstAgent.click();
|
||||
await page.waitForURL("**/marketplace/agent/**");
|
||||
|
||||
@@ -115,11 +115,18 @@ test.describe("Marketplace – Basic Functionality", () => {
|
||||
const searchTerm = page.getByText("DummyInput").first();
|
||||
await isVisible(searchTerm);
|
||||
|
||||
await expect
|
||||
.poll(() => marketplacePage.getSearchResultsCount(page), {
|
||||
timeout: 15000,
|
||||
})
|
||||
.toBeGreaterThan(0);
|
||||
await page.waitForLoadState("networkidle").catch(() => {});
|
||||
|
||||
await page
|
||||
.waitForFunction(
|
||||
() =>
|
||||
document.querySelectorAll('[data-testid="store-card"]').length > 0,
|
||||
{ timeout: 15000 },
|
||||
)
|
||||
.catch(() => console.log("No search results appeared within timeout"));
|
||||
|
||||
const results = await marketplacePage.getSearchResultsCount(page);
|
||||
expect(results).toBeGreaterThan(0);
|
||||
|
||||
console.log("Complete search flow works correctly test passed ✅");
|
||||
});
|
||||
@@ -128,9 +135,7 @@ test.describe("Marketplace – Basic Functionality", () => {
|
||||
});
|
||||
|
||||
test.describe("Marketplace – Edge Cases", () => {
|
||||
test("Search for non-existent item renders search page correctly", async ({
|
||||
page,
|
||||
}) => {
|
||||
test("Search for non-existent item shows no results", async ({ page }) => {
|
||||
const marketplacePage = new MarketplacePage(page);
|
||||
await marketplacePage.goto(page);
|
||||
|
||||
@@ -146,23 +151,9 @@ test.describe("Marketplace – Edge Cases", () => {
|
||||
const searchTerm = page.getByText("xyznonexistentitemxyz123");
|
||||
await isVisible(searchTerm);
|
||||
|
||||
// The search page should render either results or a "No results found" message
|
||||
await expect
|
||||
.poll(
|
||||
async () => {
|
||||
const hasResults =
|
||||
(await page.locator('[data-testid="store-card"]').count()) > 0;
|
||||
const hasNoResultsMsg = await page
|
||||
.getByText("No results found")
|
||||
.isVisible();
|
||||
return hasResults || hasNoResultsMsg;
|
||||
},
|
||||
{ timeout: 15000 },
|
||||
)
|
||||
.toBe(true);
|
||||
const results = await marketplacePage.getSearchResultsCount(page);
|
||||
expect(results).toBe(0);
|
||||
|
||||
console.log(
|
||||
"Search for non-existent item renders search page correctly test passed ✅",
|
||||
);
|
||||
console.log("Search for non-existent item shows no results test passed ✅");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -125,8 +125,16 @@ export class BuildPage extends BasePage {
|
||||
`[data-id="block-card-${blockCardId}"]`,
|
||||
);
|
||||
|
||||
await blockCard.waitFor({ state: "visible", timeout: 10000 });
|
||||
await blockCard.click();
|
||||
try {
|
||||
// Wait for the block card to be visible with a reasonable timeout
|
||||
await blockCard.waitFor({ state: "visible", timeout: 10000 });
|
||||
await blockCard.click();
|
||||
} catch (error) {
|
||||
console.log(
|
||||
`Block ${block.name} (display: ${displayName}) returned from the API but not found in block list`,
|
||||
);
|
||||
console.log(`Error: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
async hasBlock(_block: Block) {
|
||||
|
||||
@@ -65,7 +65,7 @@ export class LoginPage {
|
||||
await this.page.waitForLoadState("load", { timeout: 10_000 });
|
||||
|
||||
console.log("➡️ Navigating to /marketplace ...");
|
||||
await this.page.goto("/marketplace", { timeout: 20_000 });
|
||||
await this.page.goto("/marketplace", { timeout: 10_000 });
|
||||
console.log("✅ Login process complete");
|
||||
|
||||
// If Wallet popover auto-opens, close it to avoid blocking account menu interactions
|
||||
|
||||
@@ -9,12 +9,7 @@ export class MarketplacePage extends BasePage {
|
||||
|
||||
async goto(page: Page) {
|
||||
await page.goto("/marketplace");
|
||||
await page
|
||||
.locator(
|
||||
'[data-testid="store-card"], [data-testid="featured-store-card"]',
|
||||
)
|
||||
.first()
|
||||
.waitFor({ state: "visible", timeout: 20000 });
|
||||
await page.waitForLoadState("networkidle").catch(() => {});
|
||||
}
|
||||
|
||||
async getMarketplaceTitle(page: Page) {
|
||||
@@ -116,7 +111,7 @@ export class MarketplacePage extends BasePage {
|
||||
async getFirstFeaturedAgent(page: Page) {
|
||||
const { getId } = getSelectors(page);
|
||||
const card = getId("featured-store-card").first();
|
||||
await card.waitFor({ state: "visible", timeout: 15000 });
|
||||
await card.waitFor({ state: "visible", timeout: 30000 });
|
||||
return card;
|
||||
}
|
||||
|
||||
@@ -124,14 +119,14 @@ export class MarketplacePage extends BasePage {
|
||||
const card = this.page
|
||||
.locator('[data-testid="store-card"]:visible')
|
||||
.first();
|
||||
await card.waitFor({ state: "visible", timeout: 15000 });
|
||||
await card.waitFor({ state: "visible", timeout: 30000 });
|
||||
return card;
|
||||
}
|
||||
|
||||
async getFirstCreatorProfile(page: Page) {
|
||||
const { getId } = getSelectors(page);
|
||||
const card = getId("creator-card").first();
|
||||
await card.waitFor({ state: "visible", timeout: 15000 });
|
||||
await card.waitFor({ state: "visible", timeout: 30000 });
|
||||
return card;
|
||||
}
|
||||
|
||||
|
||||
@@ -45,9 +45,8 @@ export async function isEnabled(el: Locator) {
|
||||
}
|
||||
|
||||
export async function hasMinCount(el: Locator, minCount: number) {
|
||||
await expect
|
||||
.poll(async () => await el.count(), { timeout: 10000 })
|
||||
.toBeGreaterThanOrEqual(minCount);
|
||||
const count = await el.count();
|
||||
expect(count).toBeGreaterThanOrEqual(minCount);
|
||||
}
|
||||
|
||||
export async function matchesUrl(page: Page, pattern: RegExp) {
|
||||
|
||||