Compare commits

..

9 Commits

Author SHA1 Message Date
Bently
383898a2da Merge branch 'dev' into fix/claude-code-binary-files-v2 2026-03-05 19:38:25 +00:00
Bently
3665015647 Merge branch 'dev' into fix/claude-code-binary-files-v2 2026-02-25 08:31:48 +00:00
Bently
7bc08672fa Merge branch 'dev' into fix/claude-code-binary-files-v2 2026-02-19 17:48:58 +00:00
Bentlybro
e8b8cad97a fix: apply size check to text files too (OOM protection) 2026-02-17 14:11:44 +00:00
Bentlybro
be35c626ad fix: address review comments
- Remove redundant inline comment on text_only param
- Simplify file filtering logic per review suggestion
2026-02-17 14:03:55 +00:00
Bentlybro
719c4ee1d1 fix: add explicit ValueError guard for stat output parsing 2026-02-16 14:46:06 +00:00
Bentlybro
411c399e03 style: fix formatting and sync docs
- Fix Black formatting for is_text/is_binary checks
- Update llm.md to reflect binary file support in Claude Code block
2026-02-16 14:40:53 +00:00
Bentlybro
6ac011e36c fix: normalize extension case in sandbox file extraction
Fixes bug where 'Dockerfile' in TEXT_EXTENSIONS wouldn't match after
lowercasing file_path because the extension itself wasn't lowercased.
2026-02-16 14:18:25 +00:00
Bentlybro
5e554526e2 fix(backend): Extract binary files from ClaudeCodeBlock sandbox
Enables binary file extraction (images, PDFs, etc.) for the Claude Code block
by setting text_only=False in extract_and_store_sandbox_files.

Changes:
- sandbox_files.py: Add BINARY_EXTENSIONS set with supported formats
- sandbox_files.py: Add MAX_BINARY_FILE_SIZE (50MB) limit to prevent OOM
- sandbox_files.py: Add size check before reading binary files
- sandbox_files.py: Add .svg to TEXT_EXTENSIONS (XML-based)
- sandbox_files.py: Make extension matching case-insensitive
- claude_code.py: Enable binary file extraction (text_only=False)
- claude_code.py: Update output description to mention binary support
- claude_code.md: Update docs to reflect binary file support

Binary files are stored via store_media_file which handles:
- Virus scanning via scan_content_safe()
- Workspace storage (returns workspace:// URI in CoPilot)
- Data URI fallback for graph execution

Closes SECRT-1897
2026-02-16 14:10:05 +00:00
15 changed files with 603 additions and 740 deletions

View File

@@ -187,9 +187,11 @@ class ClaudeCodeBlock(Block):
)
files: list[SandboxFileOutput] = SchemaField(
description=(
"List of text files created/modified by Claude Code during this execution. "
"List of files created/modified by Claude Code during this execution. "
"Includes text files and binary files (images, PDFs, etc.). "
"Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. "
"workspace_ref contains a workspace:// URI if the file was stored to workspace."
"workspace_ref contains a workspace:// URI for workspace storage. "
"For binary files, content contains a placeholder; use workspace_ref to access the file."
)
)
conversation_history: str = SchemaField(
@@ -452,13 +454,15 @@ class ClaudeCodeBlock(Block):
else:
new_conversation_history = turn_entry
# Extract files created/modified during this run and store to workspace
# Extract files created/modified during this run and store to workspace.
# Binary files (images, PDFs, etc.) are stored via store_media_file
# which handles virus scanning and workspace storage.
sandbox_files = await extract_and_store_sandbox_files(
sandbox=sandbox,
working_directory=working_directory,
execution_context=execution_context,
since_timestamp=start_timestamp,
text_only=True,
text_only=False,
)
return (

View File

@@ -1,29 +0,0 @@
"""Prompt constants for CoPilot - workflow guidance and supplementary documentation.
This module contains workflow patterns and guidance that supplement the main system prompt.
These are appended dynamically to the prompt along with auto-generated tool documentation.
"""
# Workflow guidance for key tool patterns
# This is appended after the auto-generated tool list to provide usage patterns
KEY_WORKFLOWS = """
## KEY WORKFLOWS
### MCP Integration Workflow
When using `run_mcp_tool`:
1. **Known servers** (use directly): Notion (https://mcp.notion.com/mcp), Linear (https://mcp.linear.app/mcp), Stripe (https://mcp.stripe.com), Intercom (https://mcp.intercom.com/mcp), Cloudflare (https://mcp.cloudflare.com/mcp), Atlassian (https://mcp.atlassian.com/mcp)
2. **Unknown servers**: Use `web_search("{{service}} MCP server URL")` to find the endpoint
3. **Discovery**: Call `run_mcp_tool(server_url)` to see available tools
4. **Execution**: Call `run_mcp_tool(server_url, tool_name, tool_arguments)`
5. **Authentication**: If credentials needed, user will be prompted. When they confirm, retry immediately with same arguments.
### Agent Creation Workflow
When using `create_agent`:
1. Always check `find_library_agent` first for existing solutions
2. Call `create_agent` with description
3. **If `suggested_goal` returned**: Present to user, ask for confirmation, call again with suggested goal if accepted
4. **If `clarifying_questions` returned**: After user answers, call again with original description AND answers in `context` parameter
### Folder Management
Use folder tools (`create_folder`, `list_folders`, `move_agents_to_folder`) to organize agents in the user's library for better discoverability."""

View File

@@ -127,6 +127,7 @@ def create_security_hooks(
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_compact: Callable[[], None] | None = None,
on_stop: Callable[[str, str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -135,12 +136,15 @@ def create_security_hooks(
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
on_compact: Callback invoked when SDK starts compacting context.
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
the SDK finishes processing — used to read the JSONL transcript
before the CLI process exits.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -307,6 +311,30 @@ def create_security_hooks(
on_compact()
return cast(SyncHookJSONOutput, {})
# --- Stop hook: capture transcript path for stateless resume ---
async def stop_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Capture transcript path when SDK finishes processing.
The Stop hook fires while the CLI process is still alive, giving us
a reliable window to read the JSONL transcript before SIGTERM.
"""
_ = context, tool_use_id
transcript_path = cast(str, input_data.get("transcript_path", ""))
sdk_session_id = cast(str, input_data.get("session_id", ""))
if transcript_path and on_stop:
logger.info(
f"[SDK] Stop hook: transcript_path={transcript_path}, "
f"sdk_session_id={sdk_session_id[:12]}..."
)
on_stop(transcript_path, sdk_session_id)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
@@ -316,6 +344,9 @@ def create_security_hooks(
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
if on_stop is not None:
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
return hooks
except ImportError:
# Fallback for when SDK isn't available - return empty hooks

View File

@@ -12,6 +12,7 @@ import subprocess
import sys
import uuid
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any, cast
import openai
@@ -20,9 +21,6 @@ from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
)
from langfuse import propagate_attributes
@@ -44,7 +42,6 @@ from ..model import (
update_session_title,
upsert_chat_session,
)
from ..prompt_constants import KEY_WORKFLOWS
from ..response_model import (
StreamBaseResponse,
StreamError,
@@ -60,7 +57,6 @@ from ..service import (
_generate_session_title,
_is_langfuse_configured,
)
from ..tools import TOOL_REGISTRY
from ..tools.e2b_sandbox import get_or_create_sandbox
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
from ..tools.workspace_files import get_manager
@@ -78,11 +74,11 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
)
from .transcript_builder import TranscriptBuilder
logger = logging.getLogger(__name__)
config = ChatConfig()
@@ -141,6 +137,19 @@ _setup_langfuse_otel()
_background_tasks: set[asyncio.Task[Any]] = set()
@dataclass
class CapturedTranscript:
"""Info captured by the SDK Stop hook for stateless --resume."""
path: str = ""
sdk_session_id: str = ""
raw_content: str = ""
@property
def available(self) -> bool:
return bool(self.path)
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
@@ -151,37 +160,8 @@ _HEARTBEAT_INTERVAL = 10.0 # seconds
# Appended to the system prompt to inform the agent about available tools.
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (unshare --net).
def _generate_tool_documentation() -> str:
"""Auto-generate tool documentation from TOOL_REGISTRY.
This generates a complete list of available tools with their descriptions,
ensuring the documentation stays in sync with the actual tool implementations.
"""
docs = "\n## AVAILABLE TOOLS\n\n"
# Sort tools alphabetically for consistent output
for name in sorted(TOOL_REGISTRY.keys()):
tool = TOOL_REGISTRY[name]
schema = tool.as_openai_tool()
desc = schema["function"].get("description", "No description available")
# Format as bullet list with tool name in code style
docs += f"- **`{name}`**: {desc}\n"
# Add workflow guidance for key tools
docs += KEY_WORKFLOWS
return docs
_SHARED_TOOL_NOTES = """\
### Web search and research
- **`web_search(query)`** — Search the web for current information (uses Claude's
native web search). Use this when you need up-to-date information, facts,
statistics, or current events that are beyond your knowledge cutoff.
- **`web_fetch(url)`** — Retrieve and analyze content from a specific URL.
Use this when you have a specific URL to read (documentation, articles, etc.).
### Sharing files with the user
After saving a file to the persistent workspace with `write_workspace_file`,
share it with the user by embedding the `download_url` from the response in
@@ -471,49 +451,6 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
pass
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
"""Convert SDK content blocks to transcript format.
Handles TextBlock, ToolUseBlock, ToolResultBlock, and ThinkingBlock.
Unknown block types are logged and skipped.
"""
result: list[dict[str, Any]] = []
for block in blocks or []:
if isinstance(block, TextBlock):
result.append({"type": "text", "text": block.text})
elif isinstance(block, ToolUseBlock):
result.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
elif isinstance(block, ToolResultBlock):
result.append(
{
"type": "tool_result",
"tool_use_id": block.tool_use_id,
"content": block.content,
}
)
elif isinstance(block, ThinkingBlock):
result.append(
{
"type": "thinking",
"thinking": block.thinking,
"signature": block.signature,
}
)
else:
logger.warning(
f"[SDK] Unknown content block type: {type(block).__name__}. "
f"This may indicate a new SDK version with additional block types."
)
return result
async def _compress_messages(
messages: list[ChatMessage],
) -> tuple[list[ChatMessage], bool]:
@@ -869,11 +806,6 @@ async def stream_chat_completion_sdk(
user_id=user_id, session_id=session_id, message_length=len(message)
)
# Structured log prefix: [SDK][<session>][T<turn>]
# Turn = number of user messages (1-based), computed AFTER appending the new message.
turn = sum(1 for m in session.messages if m.role == "user")
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
@@ -891,11 +823,10 @@ async def stream_chat_completion_sdk(
message_id = str(uuid.uuid4())
stream_id = str(uuid.uuid4())
stream_completed = False
ended_with_stream_error = False
e2b_sandbox = None
use_resume = False
resume_file: str | None = None
transcript_builder = TranscriptBuilder()
captured_transcript = CapturedTranscript()
sdk_cwd = ""
# Acquire stream lock to prevent concurrent streams to the same session
@@ -910,7 +841,7 @@ async def stream_chat_completion_sdk(
if lock_owner != stream_id:
# Another stream is active
logger.warning(
f"{log_prefix} Session already has an active stream: {lock_owner}"
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
)
yield StreamError(
errorText="Another stream is already active for this session. "
@@ -934,7 +865,7 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
except (ValueError, OSError) as e:
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
yield StreamError(
errorText="Unable to initialize working directory.",
code="sdk_cwd_error",
@@ -978,13 +909,12 @@ async def stream_chat_completion_sdk(
):
return None
try:
return await download_transcript(
user_id, session_id, log_prefix=log_prefix
)
return await download_transcript(user_id, session_id)
except Exception as transcript_err:
logger.warning(
"%s Transcript download failed, continuing without " "--resume: %s",
log_prefix,
"[SDK] [%s] Transcript download failed, continuing without "
"--resume: %s",
session_id[:12],
transcript_err,
)
return None
@@ -996,34 +926,21 @@ async def stream_chat_completion_sdk(
)
use_e2b = e2b_sandbox is not None
# Generate tool documentation and append appropriate supplement
tool_docs = _generate_tool_documentation()
system_prompt = (
base_system_prompt
+ tool_docs
+ (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
)
system_prompt = base_system_prompt + (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
)
# Process transcript download result
transcript_msg_count = 0
if dl:
is_valid = validate_transcript(dl.content)
dl_lines = dl.content.strip().split("\n") if dl.content else []
logger.info(
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
log_prefix,
len(dl.content),
len(dl_lines),
dl.message_count,
is_valid,
)
if is_valid:
# Load previous FULL context into builder
transcript_builder.load_previous(dl.content, log_prefix=log_prefix)
logger.info(
f"[SDK] Transcript available for session {session_id}: "
f"{len(dl.content)}B, msg_count={dl.message_count}"
)
resume_file = write_transcript_to_tempfile(
dl.content, session_id, sdk_cwd
)
@@ -1031,14 +948,16 @@ async def stream_chat_completion_sdk(
use_resume = True
transcript_msg_count = dl.message_count
logger.debug(
f"{log_prefix} Using --resume ({len(dl.content)}B, "
f"[SDK] Using --resume ({len(dl.content)}B, "
f"msg_count={transcript_msg_count})"
)
else:
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
logger.warning(
f"[SDK] Transcript downloaded but invalid for {session_id}"
)
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
logger.warning(
f"{log_prefix} No transcript available "
f"[SDK] No transcript available for {session_id} "
f"({len(session.messages)} messages in session)"
)
@@ -1060,6 +979,25 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
# Read the file content immediately — the SDK may clean up
# the file before our finally block runs.
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.path = transcript_path
captured_transcript.sdk_session_id = sdk_session_id
content = read_transcript_file(transcript_path)
if content:
captured_transcript.raw_content = content
logger.info(
f"[SDK] Stop hook: captured {len(content)}B from "
f"{transcript_path}"
)
else:
logger.warning(
f"[SDK] Stop hook: transcript file empty/missing at "
f"{transcript_path}"
)
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
compaction = CompactionTracker()
@@ -1067,6 +1005,7 @@ async def stream_chat_completion_sdk(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
on_stop=_on_stop if config.claude_agent_use_resume else None,
on_compact=compaction.on_compact,
)
@@ -1101,10 +1040,7 @@ async def stream_chat_completion_sdk(
session_id=session_id,
trace_name="copilot-sdk",
tags=["sdk"],
metadata={
"resume": str(use_resume),
"conversation_turn": str(turn),
},
metadata={"resume": str(use_resume)},
)
_otel_ctx.__enter__()
@@ -1138,9 +1074,9 @@ async def stream_chat_completion_sdk(
query_message = f"{query_message}\n\n{attachments.hint}"
logger.info(
"%s Sending query — resume=%s, total_msgs=%d, "
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
"query_len=%d, attached_files=%d, image_blocks=%d",
log_prefix,
session_id[:12],
use_resume,
len(session.messages),
len(query_message),
@@ -1169,13 +1105,8 @@ async def stream_chat_completion_sdk(
await client._transport.write( # noqa: SLF001
json.dumps(user_msg) + "\n"
)
# Capture user message in transcript (multimodal)
transcript_builder.add_user_message(content=content_blocks)
else:
await client.query(query_message, session_id=session_id)
# Capture actual user message in transcript (not the engineered query)
# query_message may include context wrappers, but transcript needs raw input
transcript_builder.add_user_message(content=current_message)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
@@ -1219,8 +1150,8 @@ async def stream_chat_completion_sdk(
sdk_msg = done.pop().result()
except StopAsyncIteration:
logger.info(
"%s Stream ended normally (StopAsyncIteration)",
log_prefix,
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
session_id[:12],
)
break
except Exception as stream_err:
@@ -1229,8 +1160,8 @@ async def stream_chat_completion_sdk(
# so the session can still be saved and the
# frontend gets a clean finish.
logger.error(
"%s Stream error from SDK: %s",
log_prefix,
"[SDK] [%s] Stream error from SDK: %s",
session_id[:12],
stream_err,
exc_info=True,
)
@@ -1242,9 +1173,9 @@ async def stream_chat_completion_sdk(
break
logger.info(
"%s Received: %s %s "
"[SDK] [%s] Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
log_prefix,
session_id[:12],
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
@@ -1253,15 +1184,6 @@ async def stream_chat_completion_sdk(
len(adapter.resolved_tool_calls),
)
# Capture SDK messages in transcript
if isinstance(sdk_msg, AssistantMessage):
content_blocks = _format_sdk_content_blocks(sdk_msg.content)
model_name = getattr(sdk_msg, "model", "")
transcript_builder.add_assistant_message(
content_blocks=content_blocks,
model=model_name,
)
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
# message can arrive before the hook stashes output.
@@ -1288,10 +1210,10 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0)
else:
logger.warning(
"%s Timed out waiting for "
"[SDK] [%s] Timed out waiting for "
"PostToolUse hook stash "
"(%d unresolved tool calls)",
log_prefix,
session_id[:12],
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
@@ -1299,9 +1221,9 @@ async def stream_chat_completion_sdk(
# Log ResultMessage details for debugging
if isinstance(sdk_msg, ResultMessage):
logger.info(
"%s Received: ResultMessage %s "
"[SDK] [%s] Received: ResultMessage %s "
"(unresolved=%d, current=%d, resolved=%d)",
log_prefix,
session_id[:12],
sdk_msg.subtype,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
@@ -1310,8 +1232,8 @@ async def stream_chat_completion_sdk(
)
if sdk_msg.subtype in ("error", "error_during_execution"):
logger.error(
"%s SDK execution failed with error: %s",
log_prefix,
"[SDK] [%s] SDK execution failed with error: %s",
session_id[:12],
sdk_msg.result or "(no error message provided)",
)
@@ -1336,8 +1258,8 @@ async def stream_chat_completion_sdk(
out_len = len(str(response.output))
extra = f", output_len={out_len}"
logger.info(
"%s Tool event: %s, tool=%s%s",
log_prefix,
"[SDK] [%s] Tool event: %s, tool=%s%s",
session_id[:12],
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
@@ -1346,8 +1268,8 @@ async def stream_chat_completion_sdk(
# Log errors being sent to frontend
if isinstance(response, StreamError):
logger.error(
"%s Sending error to frontend: %s (code=%s)",
log_prefix,
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
session_id[:12],
response.errorText,
response.code,
)
@@ -1392,28 +1314,17 @@ async def stream_chat_completion_sdk(
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
tool_result_content = (
response.output
if isinstance(response.output, str)
else str(response.output)
)
session.messages.append(
ChatMessage(
role="tool",
content=tool_result_content,
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
# Capture tool result in transcript as user message with tool_result content
transcript_builder.add_user_message(
content=[
{
"type": "tool_result",
"tool_use_id": response.toolCallId,
"content": tool_result_content,
}
]
)
has_tool_results = True
elif isinstance(response, StreamFinish):
@@ -1424,8 +1335,8 @@ async def stream_chat_completion_sdk(
# server shutdown). Log and let the safety-net / finally
# blocks handle cleanup.
logger.warning(
"%s Streaming loop cancelled (asyncio.CancelledError)",
log_prefix,
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
session_id[:12],
)
raise
finally:
@@ -1439,8 +1350,7 @@ async def stream_chat_completion_sdk(
except (asyncio.CancelledError, StopAsyncIteration):
# Expected: task was cancelled or exhausted during cleanup
logger.info(
"%s Pending __anext__ task completed during cleanup",
log_prefix,
"[SDK] Pending __anext__ task completed during cleanup"
)
# Safety net: if tools are still unresolved after the
@@ -1449,9 +1359,9 @@ async def stream_chat_completion_sdk(
# them now so the frontend stops showing spinners.
if adapter.has_unresolved_tool_calls:
logger.warning(
"%s %d unresolved tool(s) after stream loop — "
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
"flushing as safety net",
log_prefix,
session_id[:12],
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
)
safety_responses: list[StreamBaseResponse] = []
@@ -1462,8 +1372,8 @@ async def stream_chat_completion_sdk(
(StreamToolInputAvailable, StreamToolOutputAvailable),
):
logger.info(
"%s Safety flush: %s, tool=%s",
log_prefix,
"[SDK] [%s] Safety flush: %s, tool=%s",
session_id[:12],
type(response).__name__,
getattr(response, "toolName", "N/A"),
)
@@ -1476,8 +1386,8 @@ async def stream_chat_completion_sdk(
# StreamFinish is published by mark_session_completed in the processor.
if not stream_completed and not ended_with_stream_error:
logger.info(
"%s Stream ended without ResultMessage (stopped by user)",
log_prefix,
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
session_id[:12],
)
closing_responses: list[StreamBaseResponse] = []
adapter._end_text_if_open(closing_responses)
@@ -1498,36 +1408,69 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# Transcript upload is handled exclusively in the finally block
# to avoid double-uploads (the success path used to upload the
# old resume file, then the finally block overwrote it with the
# stop hook content — which could be smaller after compaction).
# --- Upload transcript for next-turn --resume ---
# After async with the SDK task group has exited, so the Stop
# hook has already fired and the CLI has been SIGTERMed. The
# CLI uses appendFileSync, so all writes are safely on disk.
if config.claude_agent_use_resume and user_id:
# With --resume the CLI appends to the resume file (most
# complete). Otherwise use the Stop hook path.
if use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
logger.debug("[SDK] Transcript source: resume file")
elif captured_transcript.path:
raw_transcript = read_transcript_file(captured_transcript.path)
logger.debug(
"[SDK] Transcript source: stop hook (%s), read result: %s",
captured_transcript.path,
f"{len(raw_transcript)}B" if raw_transcript else "None",
)
else:
raw_transcript = None
if ended_with_stream_error:
logger.warning(
"%s Stream ended with SDK error after %d messages",
log_prefix,
len(session.messages),
)
else:
logger.info(
"%s Stream completed successfully with %d messages",
log_prefix,
len(session.messages),
)
if not raw_transcript:
logger.debug(
"[SDK] No usable transcript — CLI file had no "
"conversation entries (expected for first turn "
"without --resume)"
)
if raw_transcript:
# Shield the upload from generator cancellation so a
# client disconnect / page refresh doesn't lose the
# transcript. The upload must finish even if the SSE
# connection is torn down.
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
)
logger.info(
"[SDK] [%s] Stream completed successfully with %d messages",
session_id[:12],
len(session.messages),
)
except BaseException as e:
# Catch BaseException to handle both Exception and CancelledError
# (CancelledError inherits from BaseException in Python 3.8+)
if isinstance(e, asyncio.CancelledError):
logger.warning("%s Session cancelled", log_prefix)
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
error_msg = "Operation cancelled"
else:
error_msg = str(e) or type(e).__name__
# SDK cleanup RuntimeError is expected during cancellation, log as warning
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
logger.warning(
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
)
else:
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
logger.error(
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
)
# Append error marker to session (non-invasive text parsing approach)
# The finally block will persist the session with this error marker
@@ -1538,8 +1481,8 @@ async def stream_chat_completion_sdk(
)
)
logger.debug(
"%s Appended error marker, will be persisted in finally",
log_prefix,
"[SDK] [%s] Appended error marker, will be persisted in finally",
session_id[:12],
)
# Yield StreamError for immediate feedback (only for non-cancellation errors)
@@ -1571,61 +1514,47 @@ async def stream_chat_completion_sdk(
try:
await asyncio.shield(upsert_chat_session(session))
logger.info(
"%s Session persisted in finally with %d messages",
log_prefix,
"[SDK] [%s] Session persisted in finally with %d messages",
session_id[:12],
len(session.messages),
)
except Exception as persist_err:
logger.error(
"%s Failed to persist session in finally: %s",
log_prefix,
"[SDK] [%s] Failed to persist session in finally: %s",
session_id[:12],
persist_err,
exc_info=True,
)
# --- Upload transcript for next-turn --resume ---
# This MUST run in finally so the transcript is uploaded even when
# the streaming loop raises an exception.
# The transcript represents the COMPLETE active context (atomic).
if config.claude_agent_use_resume and user_id and session is not None:
# the streaming loop raises an exception. The CLI uses
# appendFileSync, so whatever was written before the error/SIGTERM
# is safely on disk and still useful for the next turn.
if config.claude_agent_use_resume and user_id:
try:
# Build complete transcript from captured SDK messages
transcript_content = transcript_builder.to_jsonl()
# Prefer content captured in the Stop hook (read before
# cleanup removes the file). Fall back to the resume
# file when the stop hook didn't fire (e.g. error before
# completion) so we don't lose the prior transcript.
raw_transcript = captured_transcript.raw_content or None
if not raw_transcript and use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
if not transcript_content:
logger.warning(
"%s No transcript to upload (builder empty)", log_prefix
)
elif not validate_transcript(transcript_content):
logger.warning(
"%s Transcript invalid, skipping upload (entries=%d)",
log_prefix,
transcript_builder.entry_count,
)
else:
logger.info(
"%s Uploading complete transcript (entries=%d, bytes=%d)",
log_prefix,
transcript_builder.entry_count,
len(transcript_content),
)
# Shield upload from cancellation - let it complete even if
# the finally block is interrupted. No timeout to avoid race
# conditions where backgrounded uploads overwrite newer transcripts.
if raw_transcript and session is not None:
await asyncio.shield(
upload_transcript(
user_id=user_id,
session_id=session_id,
content=transcript_content,
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
log_prefix=log_prefix,
)
)
else:
logger.warning(f"[SDK] No transcript to upload for {session_id}")
except Exception as upload_err:
logger.error(
"%s Transcript upload failed in finally: %s",
log_prefix,
upload_err,
f"[SDK] Transcript upload failed in finally: {upload_err}",
exc_info=True,
)
@@ -1636,6 +1565,33 @@ async def stream_chat_completion_sdk(
await lock.release()
async def _try_upload_transcript(
user_id: str,
session_id: str,
raw_content: str,
message_count: int = 0,
) -> bool:
"""Strip progress entries and upload transcript (with timeout).
Returns True if the upload completed without error.
"""
try:
async with asyncio.timeout(30):
await upload_transcript(
user_id, session_id, raw_content, message_count=message_count
)
return True
except asyncio.TimeoutError:
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
return False
except Exception as e:
logger.error(
f"[SDK] Failed to upload transcript for {session_id}: {e}",
exc_info=True,
)
return False
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
@@ -1644,7 +1600,7 @@ async def _update_title_async(
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title and user_id:
if title:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch
import pytest
from .service import _generate_tool_documentation, _prepare_file_attachments
from .service import _prepare_file_attachments
@dataclass
@@ -145,94 +145,3 @@ class TestPrepareFileAttachments:
assert "Read tool" not in result.hint
assert len(result.image_blocks) == 1
class TestGenerateToolDocumentation:
"""Tests for auto-generated tool documentation from TOOL_REGISTRY."""
def test_generate_tool_documentation_structure(self):
"""Test that tool documentation has expected structure."""
docs = _generate_tool_documentation()
# Check main sections exist
assert "## AVAILABLE TOOLS" in docs
assert "## KEY WORKFLOWS" in docs
# Verify no duplicate sections
assert docs.count("## AVAILABLE TOOLS") == 1
assert docs.count("## KEY WORKFLOWS") == 1
def test_tool_documentation_includes_key_tools(self):
"""Test that documentation includes essential copilot tools."""
docs = _generate_tool_documentation()
# Core agent workflow tools
assert "`create_agent`" in docs
assert "`run_agent`" in docs
assert "`find_library_agent`" in docs
assert "`edit_agent`" in docs
# MCP integration
assert "`run_mcp_tool`" in docs
# Browser automation
assert "`browser_navigate`" in docs
# Folder management
assert "`create_folder`" in docs
def test_tool_documentation_format(self):
"""Test that each tool follows bullet list format."""
docs = _generate_tool_documentation()
lines = docs.split("\n")
tool_lines = [line for line in lines if line.strip().startswith("- **`")]
# Should have multiple tools (at least 20 from TOOL_REGISTRY)
assert len(tool_lines) >= 20
# Each tool line should have proper markdown format
for line in tool_lines:
assert line.startswith("- **`"), f"Bad format: {line}"
assert "`**:" in line, f"Missing description separator: {line}"
def test_tool_documentation_includes_workflows(self):
"""Test that key workflow patterns are documented."""
docs = _generate_tool_documentation()
# Check workflow sections
assert "MCP Integration Workflow" in docs
assert "Agent Creation Workflow" in docs
assert "Folder Management" in docs
# Check workflow details
assert "suggested_goal" in docs # Agent creation feedback loop
assert "clarifying_questions" in docs # Agent creation feedback loop
assert "run_mcp_tool(server_url)" in docs # MCP discovery pattern
def test_tool_documentation_completeness(self):
"""Test that all tools from TOOL_REGISTRY appear in documentation."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Verify each registered tool is documented
for tool_name in TOOL_REGISTRY.keys():
assert (
f"`{tool_name}`" in docs
), f"Tool '{tool_name}' missing from auto-generated documentation"
def test_tool_documentation_no_duplicate_tools(self):
"""Test that no tool appears multiple times in the list."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Extract the tools section (before KEY WORKFLOWS)
tools_section = docs.split("## KEY WORKFLOWS")[0]
# Count occurrences of each tool
for tool_name in TOOL_REGISTRY.keys():
# Count how many times this tool appears as a bullet point
count = tools_section.count(f"- **`{tool_name}`**")
assert count == 1, f"Tool '{tool_name}' appears {count} times (should be 1)"

View File

@@ -10,14 +10,13 @@ Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from backend.util import json
logger = logging.getLogger(__name__)
# UUIDs are hex + hyphens; strip everything else to prevent path injection.
@@ -59,37 +58,41 @@ def strip_progress_entries(content: str) -> str:
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
any remaining child entries so the ``parentUuid`` chain stays intact.
Typically reduces transcript size by ~30%.
Entries that are not stripped or reparented are kept as their original
raw JSON line to avoid unnecessary re-serialization that changes
whitespace or key ordering.
"""
lines = content.strip().split("\n")
# Parse entries, keeping the original line alongside the parsed dict.
parsed: list[tuple[str, dict | None]] = []
entries: list[dict] = []
for line in lines:
parsed.append((line, json.loads(line, fallback=None)))
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
# Keep unparseable lines as-is (safety)
entries.append({"_raw": line})
# First pass: identify stripped UUIDs and build parent map.
stripped_uuids: set[str] = set()
uuid_to_parent: dict[str, str] = {}
kept: list[dict] = []
for _line, entry in parsed:
if not isinstance(entry, dict):
for entry in entries:
if "_raw" in entry:
kept.append(entry)
continue
uid = entry.get("uuid", "")
parent = entry.get("parentUuid", "")
entry_type = entry.get("type", "")
if uid:
uuid_to_parent[uid] = parent
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
stripped_uuids.add(uid)
# Second pass: keep non-stripped entries, reparenting where needed.
# Preserve original line when no reparenting is required.
reparented: set[str] = set()
for _line, entry in parsed:
if not isinstance(entry, dict):
if entry_type in STRIPPABLE_TYPES:
if uid:
stripped_uuids.add(uid)
else:
kept.append(entry)
# Reparent: walk up chain through stripped entries to find surviving ancestor
for entry in kept:
if "_raw" in entry:
continue
parent = entry.get("parentUuid", "")
original_parent = parent
@@ -97,32 +100,63 @@ def strip_progress_entries(content: str) -> str:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
uid = entry.get("uuid", "")
if uid:
reparented.add(uid)
result_lines: list[str] = []
for line, entry in parsed:
if not isinstance(entry, dict):
result_lines.append(line)
continue
if entry.get("type", "") in STRIPPABLE_TYPES:
continue
uid = entry.get("uuid", "")
if uid in reparented:
# Re-serialize only entries whose parentUuid was changed.
result_lines.append(json.dumps(entry, separators=(",", ":")))
for entry in kept:
if "_raw" in entry:
result_lines.append(entry["_raw"])
else:
result_lines.append(line)
result_lines.append(json.dumps(entry, separators=(",", ":")))
return "\n".join(result_lines) + "\n"
# ---------------------------------------------------------------------------
# Local file I/O (write temp file for --resume)
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
# ---------------------------------------------------------------------------
def read_transcript_file(transcript_path: str) -> str | None:
"""Read a JSONL transcript file from disk.
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
or only contains metadata (≤2 lines with no conversation messages).
"""
if not transcript_path or not os.path.isfile(transcript_path):
logger.debug(f"[Transcript] File not found: {transcript_path}")
return None
try:
with open(transcript_path) as f:
content = f.read()
if not content.strip():
logger.debug("[Transcript] File is empty: %s", transcript_path)
return None
lines = content.strip().split("\n")
# Validate that the transcript has real conversation content
# (not just metadata like queue-operation entries).
if not validate_transcript(content):
logger.debug(
"[Transcript] No conversation content (%d lines) in %s",
len(lines),
transcript_path,
)
return None
logger.info(
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
return None
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
"""Sanitize an ID for safe use in file paths.
@@ -137,6 +171,14 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
def _encode_cwd_for_cli(cwd: str) -> str:
"""Encode a working directory path the same way the Claude CLI does.
The CLI replaces all non-alphanumeric characters with ``-``.
"""
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""Remove the CLI's project directory for a specific working directory.
@@ -146,8 +188,7 @@ def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""
import shutil
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
cwd_encoded = _encode_cwd_for_cli(sdk_cwd)
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
@@ -207,29 +248,32 @@ def write_transcript_to_tempfile(
def validate_transcript(content: str | None) -> bool:
"""Check that a transcript has actual conversation messages.
A valid transcript needs at least one assistant message (not just
queue-operation / file-history-snapshot metadata). We do NOT require
a ``type: "user"`` entry because with ``--resume`` the user's message
is passed as a CLI query parameter and does not appear in the
transcript file.
A valid transcript for resume needs at least one user message and one
assistant message (not just queue-operation / file-history-snapshot
metadata).
"""
if not content or not content.strip():
return False
lines = content.strip().split("\n")
if len(lines) < 2:
return False
has_user = False
has_assistant = False
for line in lines:
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
has_user = True
elif msg_type == "assistant":
has_assistant = True
except json.JSONDecodeError:
return False
if entry.get("type") == "assistant":
has_assistant = True
return has_assistant
return has_user and has_assistant
# ---------------------------------------------------------------------------
@@ -284,41 +328,26 @@ async def upload_transcript(
session_id: str,
content: str,
message_count: int = 0,
log_prefix: str = "[Transcript]",
) -> None:
"""Strip progress entries and upload complete transcript.
The transcript represents the FULL active context (atomic).
Each upload REPLACES the previous transcript entirely.
"""Strip progress entries and upload transcript to bucket storage.
The executor holds a cluster lock per session, so concurrent uploads for
the same session cannot happen.
the same session cannot happen. We always overwrite — with ``--resume``
the CLI may compact old tool results, so neither byte size nor line count
is a reliable proxy for "newer".
Args:
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
message_count: ``len(session.messages)`` at upload time — used by
the next turn to detect staleness and compress only the gap.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
entry = json.loads(line, fallback={"type": "INVALID_JSON"})
entry_types.append(entry.get("type", "?"))
logger.warning(
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
log_prefix,
entry_types,
len(stripped),
len(content),
f"[Transcript] Skipping upload — stripped content not valid "
f"for session {session_id}"
)
logger.debug("%s Raw content preview: %s", log_prefix, content[:500])
logger.debug("%s Stripped content: %s", log_prefix, stripped[:500])
return
storage = await get_workspace_storage()
@@ -344,18 +373,17 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
logger.info(
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
f"[Transcript] Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count}) "
f"for session {session_id}"
)
async def download_transcript(
user_id: str,
session_id: str,
log_prefix: str = "[Transcript]",
user_id: str, session_id: str
) -> TranscriptDownload | None:
"""Download transcript and metadata from bucket storage.
@@ -371,10 +399,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug(f"{log_prefix} No transcript in storage")
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
return None
except Exception as e:
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
logger.warning(f"[Transcript] Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -391,13 +419,16 @@ async def download_transcript(
meta_path = f"local://{mwid}/{mfid}/{mfname}"
meta_data = await storage.retrieve(meta_path)
meta = json.loads(meta_data.decode("utf-8"), fallback={})
meta = json.loads(meta_data.decode("utf-8"))
message_count = meta.get("message_count", 0)
uploaded_at = meta.get("uploaded_at", 0.0)
except (FileNotFoundError, Exception):
except (FileNotFoundError, json.JSONDecodeError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
logger.info(
f"[Transcript] Downloaded {len(content)}B "
f"(msg_count={message_count}) for session {session_id}"
)
return TranscriptDownload(
content=content,
message_count=message_count,

View File

@@ -1,150 +0,0 @@
"""Build complete JSONL transcript from SDK messages.
The transcript represents the FULL active context at any point in time.
Each upload REPLACES the previous transcript atomically.
Flow:
Turn 1: Upload [msg1, msg2]
Turn 2: Download [msg1, msg2] → Upload [msg1, msg2, msg3, msg4] (REPLACE)
Turn 3: Download [msg1, msg2, msg3, msg4] → Upload [all messages] (REPLACE)
The transcript is never incremental - always the complete atomic state.
"""
import logging
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
from backend.util import json
from .transcript import STRIPPABLE_TYPES
logger = logging.getLogger(__name__)
class TranscriptEntry(BaseModel):
"""Single transcript entry (user or assistant turn)."""
type: str
uuid: str
parentUuid: str | None
message: dict[str, Any]
class TranscriptBuilder:
"""Build complete JSONL transcript from SDK messages.
This builder maintains the FULL conversation state, not incremental changes.
The output is always the complete active context.
"""
def __init__(self) -> None:
self._entries: list[TranscriptEntry] = []
self._last_uuid: str | None = None
def load_previous(self, content: str, log_prefix: str = "[Transcript]") -> None:
"""Load complete previous transcript.
This loads the FULL previous context. As new messages come in,
we append to this state. The final output is the complete context
(previous + new), not just the delta.
"""
if not content or not content.strip():
return
lines = content.strip().split("\n")
for line_num, line in enumerate(lines, 1):
if not line.strip():
continue
data = json.loads(line, fallback=None)
if data is None:
logger.warning(
"%s Failed to parse transcript line %d/%d",
log_prefix,
line_num,
len(lines),
)
continue
# Load all non-strippable entries (user/assistant/system/etc.)
# Skip only STRIPPABLE_TYPES to match strip_progress_entries() behavior
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
continue
entry = TranscriptEntry(
type=data["type"],
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
message=data.get("message", {}),
)
self._entries.append(entry)
self._last_uuid = entry.uuid
logger.info(
"%s Loaded %d entries from previous transcript (last_uuid=%s)",
log_prefix,
len(self._entries),
self._last_uuid[:12] if self._last_uuid else None,
)
def add_user_message(
self, content: str | list[dict], uuid: str | None = None
) -> None:
"""Add user message to the complete context."""
msg_uuid = uuid or str(uuid4())
self._entries.append(
TranscriptEntry(
type="user",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={"role": "user", "content": content},
)
)
self._last_uuid = msg_uuid
def add_assistant_message(
self, content_blocks: list[dict], model: str = ""
) -> None:
"""Add assistant message to the complete context."""
msg_uuid = str(uuid4())
self._entries.append(
TranscriptEntry(
type="assistant",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={
"role": "assistant",
"model": model,
"content": content_blocks,
},
)
)
self._last_uuid = msg_uuid
def to_jsonl(self) -> str:
"""Export complete context as JSONL.
Returns the FULL conversation state (all entries), not incremental.
This output REPLACES any previous transcript.
"""
if not self._entries:
return ""
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
return "\n".join(lines) + "\n"
@property
def entry_count(self) -> int:
"""Total number of entries in the complete context."""
return len(self._entries)
@property
def is_empty(self) -> bool:
"""Whether this builder has any entries."""
return len(self._entries) == 0

View File

@@ -1,11 +1,11 @@
"""Unit tests for JSONL transcript management utilities."""
import json
import os
from backend.util import json
from .transcript import (
STRIPPABLE_TYPES,
read_transcript_file,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
@@ -38,6 +38,49 @@ PROGRESS_ENTRY = {
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
# --- read_transcript_file ---
class TestReadTranscriptFile:
def test_returns_content_for_valid_file(self, tmp_path):
path = tmp_path / "session.jsonl"
path.write_text(VALID_TRANSCRIPT)
result = read_transcript_file(str(path))
assert result is not None
assert "user" in result
def test_returns_none_for_missing_file(self):
assert read_transcript_file("/nonexistent/path.jsonl") is None
def test_returns_none_for_empty_path(self):
assert read_transcript_file("") is None
def test_returns_none_for_empty_file(self, tmp_path):
path = tmp_path / "empty.jsonl"
path.write_text("")
assert read_transcript_file(str(path)) is None
def test_returns_none_for_metadata_only(self, tmp_path):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
path = tmp_path / "meta.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
def test_no_size_limit(self, tmp_path):
"""Large files are accepted — bucket storage has no size limit."""
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
result = read_transcript_file(str(path))
assert result is not None
# --- write_transcript_to_tempfile ---
@@ -112,56 +155,12 @@ class TestValidateTranscript:
assert validate_transcript(content) is False
def test_assistant_only_no_user(self):
"""With --resume the user message is a CLI query param, not a transcript entry.
A transcript with only assistant entries is valid."""
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
assert validate_transcript(content) is True
def test_resume_transcript_without_user_entry(self):
"""Simulates a real --resume stop hook transcript: the CLI session file
has summary + assistant entries but no user entry."""
summary = {"type": "summary", "uuid": "s1", "text": "context..."}
asst1 = {
"type": "assistant",
"uuid": "a1",
"message": {"role": "assistant", "content": "Hello!"},
}
asst2 = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "a1",
"message": {"role": "assistant", "content": "Sure, let me help."},
}
content = _make_jsonl(summary, asst1, asst2)
assert validate_transcript(content) is True
def test_single_assistant_entry(self):
"""A transcript with just one assistant line is valid — the CLI may
produce short transcripts for simple responses with no tool use."""
content = json.dumps(ASST_MSG) + "\n"
assert validate_transcript(content) is True
assert validate_transcript(content) is False
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False
def test_malformed_json_after_valid_assistant_returns_false(self):
"""Validation must scan all lines - malformed JSON anywhere should fail."""
valid_asst = json.dumps(ASST_MSG)
malformed = "not valid json"
content = valid_asst + "\n" + malformed + "\n"
assert validate_transcript(content) is False
def test_blank_lines_are_skipped(self):
"""Transcripts with blank lines should be valid if they contain assistant entries."""
content = (
json.dumps(USER_MSG)
+ "\n\n" # blank line
+ json.dumps(ASST_MSG)
+ "\n"
+ "\n" # another blank line
)
assert validate_transcript(content) is True
# --- strip_progress_entries ---
@@ -254,31 +253,3 @@ class TestStripProgressEntries:
assert "queue-operation" not in result_types
assert "user" in result_types
assert "assistant" in result_types
def test_preserves_original_line_formatting(self):
"""Non-reparented entries keep their original JSON formatting."""
# orjson produces compact JSON - test that we preserve the exact input
# when no reparenting is needed (no re-serialization)
original_line = json.dumps(USER_MSG)
content = original_line + "\n" + json.dumps(ASST_MSG) + "\n"
result = strip_progress_entries(content)
result_lines = result.strip().split("\n")
# Original line should be byte-identical (not re-serialized)
assert result_lines[0] == original_line
def test_reparented_entries_are_reserialized(self):
"""Entries whose parentUuid changes must be re-serialized."""
progress = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p1",
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, progress, asst)
result = strip_progress_entries(content)
lines = result.strip().split("\n")
asst_entry = json.loads(lines[-1])
assert asst_entry["parentUuid"] == "u1" # reparented

View File

@@ -34,9 +34,8 @@ client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# Default system prompt used when Langfuse is not configured
# Provides minimal baseline tone and personality - all workflow, tools, and
# technical details are provided via the supplement.
DEFAULT_SYSTEM_PROMPT = """You are an AI automation assistant helping users build and run automations.
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
Here is everything you know about the current user from previous interactions:
@@ -44,12 +43,113 @@ Here is everything you know about the current user from previous interactions:
{users_information}
</users_information>
Your goal is to help users automate tasks by:
- Understanding their needs and business context
- Building and running working automations
- Delivering tangible value through action, not just explanation
## YOUR CORE MANDATE
Be concise, proactive, and action-oriented. Bias toward showing working solutions over lengthy explanations."""
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- **For live integrations** (read a GitHub repo, query a database, post to Slack, etc.) consider `run_mcp_tool` — it connects directly to external services without building a full agent
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
**MCP (Model Context Protocol) Servers:**
- `run_mcp_tool`: Connect to any MCP server to discover and run its tools
**Two-step flow:**
1. `run_mcp_tool(server_url)` → returns a list of available tools. Each tool has `name`, `description`, and `input_schema` (JSON Schema). Read `input_schema.properties` to understand what arguments are needed.
2. `run_mcp_tool(server_url, tool_name, tool_arguments)` → executes the tool. Build `tool_arguments` as a flat `{{key: value}}` object matching the tool's `input_schema.properties`.
**Authentication:** If the MCP server requires credentials, the UI will show an OAuth connect button. Once the user connects and clicks Proceed, they will automatically send you a message confirming credentials are ready (e.g. "I've connected the MCP server credentials. Please retry run_mcp_tool..."). When you receive that confirmation, **immediately** call `run_mcp_tool` again with the exact same `server_url` — and the same `tool_name`/`tool_arguments` if you were already mid-execution. Do not ask the user what to do next; just retry.
**Finding server URLs (fastest → slowest):**
1. **Known hosted servers** — use directly, no lookup:
- Notion: `https://mcp.notion.com/mcp`
- Linear: `https://mcp.linear.app/mcp`
- Stripe: `https://mcp.stripe.com`
- Intercom: `https://mcp.intercom.com/mcp`
- Cloudflare: `https://mcp.cloudflare.com/mcp`
- Atlassian (Jira/Confluence): `https://mcp.atlassian.com/mcp`
2. **`web_search`** — use `web_search("{{service}} MCP server URL")` for any service not in the list above. This is the fastest way to find unlisted servers.
3. **Registry API** — `web_fetch("https://registry.modelcontextprotocol.io/v0.1/servers?search={{query}}&limit=10")` to browse what's available. Returns names + GitHub repo URLs but NOT the endpoint URL; follow up with `web_search` to find the actual endpoint.
- **Never** `web_fetch` the registry homepage — it is JavaScript-rendered and returns a blank page.
**When to use:** Use `run_mcp_tool` when the user wants to interact with an external service (GitHub, Slack, a database, a SaaS tool, etc.) via its MCP integration. Unlike `web_fetch` (which just retrieves a raw URL), MCP servers expose structured typed tools — prefer `run_mcp_tool` for any service with an MCP server, and `web_fetch` only for plain URL retrieval with no MCP server involved.
**CRITICAL**: `run_mcp_tool` is **always available** in your tool list. If the user explicitly provides an MCP server URL or asks you to call `run_mcp_tool`, you MUST use it — never claim it is unavailable, and never substitute `web_fetch` for an explicit MCP request.
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
- **For MCP integrations**: Known URL (see list) or `web_search("{{service}} MCP server URL")` → `run_mcp_tool(server_url)` → `run_mcp_tool(server_url, tool_name, tool_arguments)`. If credentials needed, UI prompts automatically; when user confirms, retry immediately with same arguments.
**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
# ---------------------------------------------------------------------------

View File

@@ -15,7 +15,6 @@ from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data import workspace as workspace_db
# Import dynamic field utilities from centralized location
from backend.data.block import BlockInput, BlockOutputEntry
@@ -33,6 +32,7 @@ from backend.data.execution import (
from backend.data.graph import GraphModel, Node
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.workspace import get_or_create_workspace
from backend.util.clients import (
get_async_execution_event_bus,
get_async_execution_queue,
@@ -831,9 +831,8 @@ async def add_graph_execution(
udb = user_db
gdb = graph_db
odb = onboarding_db
wdb = workspace_db
else:
edb = udb = gdb = odb = wdb = get_database_manager_async_client()
edb = udb = gdb = odb = get_database_manager_async_client()
# Get or create the graph execution
if graph_exec_id:
@@ -893,7 +892,7 @@ async def add_graph_execution(
if execution_context is None:
user = await udb.get_user_by_id(user_id)
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
workspace = await wdb.get_or_create_workspace(user_id)
workspace = await get_or_create_workspace(user_id)
execution_context = ExecutionContext(
# Execution identity

View File

@@ -368,10 +368,12 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
# Setup mock returns
# The function returns (graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip)
@@ -647,10 +649,12 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
# Setup returns - include nodes_to_skip in the tuple
mock_validate.return_value = (

View File

@@ -72,58 +72,19 @@ def dumps(
T = TypeVar("T")
# Sentinel value to detect when fallback is not provided
_NO_FALLBACK = object()
@overload
def loads(data: str | bytes, *args, target_type: Type[T], **kwargs) -> T: ...
@overload
def loads(
data: str | bytes, *args, target_type: Type[T], fallback: T | None = None, **kwargs
) -> T:
pass
@overload
def loads(data: str | bytes, *args, fallback: Any = None, **kwargs) -> Any:
pass
def loads(data: str | bytes, *args, **kwargs) -> Any: ...
def loads(
data: str | bytes,
*args,
target_type: Type[T] | None = None,
fallback: Any = _NO_FALLBACK,
**kwargs,
data: str | bytes, *args, target_type: Type[T] | None = None, **kwargs
) -> Any:
"""Parse JSON with optional fallback on decode errors.
Args:
data: JSON string or bytes to parse
target_type: Optional type to validate/cast result to
fallback: Value to return on JSONDecodeError. If not provided, raises.
**kwargs: Additional arguments (unused, for compatibility)
Returns:
Parsed JSON data, or fallback value if parsing fails
Raises:
orjson.JSONDecodeError: Only if fallback is not provided
Examples:
>>> loads('{"valid": "json"}')
{'valid': 'json'}
>>> loads('invalid json', fallback=None)
None
>>> loads('invalid json', fallback={})
{}
>>> loads('invalid json') # raises orjson.JSONDecodeError
"""
try:
parsed = orjson.loads(data)
except orjson.JSONDecodeError:
if fallback is not _NO_FALLBACK:
return fallback
raise
parsed = orjson.loads(data)
if target_type:
return type_match(parsed, target_type)

View File

@@ -74,8 +74,50 @@ TEXT_EXTENSIONS = {
".tex",
".csv",
".log",
".svg", # SVG is XML-based text
}
# Binary file extensions we explicitly support extracting
BINARY_EXTENSIONS = {
# Images
".png",
".jpg",
".jpeg",
".gif",
".webp",
".ico",
".bmp",
".tiff",
".tif",
# Documents
".pdf",
# Archives
".zip",
".tar",
".gz",
".7z",
# Audio
".mp3",
".wav",
".ogg",
".flac",
# Video
".mp4",
".webm",
".mov",
".avi",
# Fonts
".woff",
".woff2",
".ttf",
".otf",
".eot",
}
# Maximum file size for binary extraction (50MB)
# Prevents OOM from accidentally extracting huge files
MAX_BINARY_FILE_SIZE = 50 * 1024 * 1024
class SandboxFileOutput(BaseModel):
"""A file extracted from a sandbox and optionally stored in workspace."""
@@ -120,7 +162,8 @@ async def extract_sandbox_files(
sandbox: The E2B sandbox instance
working_directory: Directory to search for files
since_timestamp: ISO timestamp - only return files modified after this time
text_only: If True, only extract text files (default). If False, extract all files.
text_only: If True, only extract text files. If False, also extract
supported binary files (images, PDFs, etc.).
Returns:
List of ExtractedFile objects with path, content, and metadata
@@ -149,15 +192,48 @@ async def extract_sandbox_files(
if not file_path:
continue
# Check if it's a text file
is_text = any(file_path.endswith(ext) for ext in TEXT_EXTENSIONS)
# Check file type (case-insensitive for extensions)
file_path_lower = file_path.lower()
is_text = any(
file_path_lower.endswith(ext.lower()) for ext in TEXT_EXTENSIONS
)
is_binary = any(
file_path_lower.endswith(ext.lower()) for ext in BINARY_EXTENSIONS
)
# Skip non-text files if text_only mode
# Skip files with unrecognized extensions
if not is_text and not is_binary:
continue
# In text_only mode, skip binary files
if text_only and not is_text:
continue
try:
# Read file content as bytes
# Check file size before reading to prevent OOM
stat_result = await sandbox.commands.run(
f"stat -c %s {shlex.quote(file_path)} 2>/dev/null"
)
if stat_result.exit_code != 0 or not stat_result.stdout:
logger.debug(f"Skipping {file_path}: could not determine file size")
continue
try:
file_size = int(stat_result.stdout.strip())
except ValueError:
logger.debug(
f"Skipping {file_path}: unexpected stat output "
f"{stat_result.stdout.strip()!r}"
)
continue
if file_size > MAX_BINARY_FILE_SIZE:
logger.info(
f"Skipping {file_path}: size {file_size} bytes "
f"exceeds limit {MAX_BINARY_FILE_SIZE}"
)
continue
content = await sandbox.files.read(file_path, format="bytes")
if isinstance(content, str):
content = content.encode("utf-8")

View File

@@ -16,7 +16,7 @@ When activated, the block:
- Install dependencies (npm, pip, etc.)
- Run terminal commands
- Build and test applications
5. Extracts all text files created/modified during execution
5. Extracts all files created/modified during execution (text files and binary files like images, PDFs, etc.)
6. Returns the response and files, optionally keeping the sandbox alive for follow-up tasks
The block supports conversation continuation through three mechanisms:
@@ -42,7 +42,7 @@ The block supports conversation continuation through three mechanisms:
| Output | Description |
|--------|-------------|
| Response | The output/response from Claude Code execution |
| Files | List of text files created/modified during execution. Each file includes path, relative_path, name, and content fields |
| Files | List of files (text and binary) created/modified during execution. Includes images, PDFs, and other supported formats. Each file has path, relative_path, name, content, and workspace_ref fields. Binary files are stored in workspace and accessible via workspace_ref |
| Conversation History | Full conversation history including this turn. Use to restore context on a fresh sandbox |
| Session ID | Session ID for this conversation. Pass back with sandbox_id to continue the conversation |
| Sandbox ID | ID of the sandbox instance (null if disposed). Pass back with session_id to continue the conversation |

View File

@@ -535,7 +535,7 @@ When activated, the block:
2. Installs the latest version of Claude Code in the sandbox
3. Optionally runs setup commands to prepare the environment
4. Executes your prompt using Claude Code, which can create/edit files, install dependencies, run terminal commands, and build applications
5. Extracts all text files created/modified during execution
5. Extracts all files created/modified during execution (text files and binary files like images, PDFs, etc.)
6. Returns the response and files, optionally keeping the sandbox alive for follow-up tasks
The block supports conversation continuation through three mechanisms:
@@ -563,7 +563,7 @@ The block supports conversation continuation through three mechanisms:
|--------|-------------|------|
| error | Error message if execution failed | str |
| response | The output/response from Claude Code execution | str |
| files | List of text files created/modified by Claude Code during this execution. Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. workspace_ref contains a workspace:// URI if the file was stored to workspace. | List[SandboxFileOutput] |
| files | List of files created/modified by Claude Code during this execution. Includes text files and binary files (images, PDFs, etc.). Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. workspace_ref contains a workspace:// URI for workspace storage. For binary files, content contains a placeholder; use workspace_ref to access the file. | List[SandboxFileOutput] |
| conversation_history | Full conversation history including this turn. Pass this to conversation_history input to continue on a fresh sandbox if the previous sandbox timed out. | str |
| session_id | Session ID for this conversation. Pass this back along with sandbox_id to continue the conversation. | str |
| sandbox_id | ID of the sandbox instance. Pass this back along with session_id to continue the conversation. This is None if dispose_sandbox was True (sandbox was disposed). | str |