Compare commits

...

9 Commits

Author SHA1 Message Date
Zamil Majdy
3d880cd591 refactor(backend/copilot): move imports to module level
- Move KEY_WORKFLOWS and TOOL_REGISTRY imports to top of file
- Better code organization following Python conventions
2026-03-06 23:15:39 +07:00
Zamil Majdy
73f5ff9983 test(backend/copilot): add tests for auto-generated tool documentation
- Test tool documentation structure (sections, format)
- Test that all TOOL_REGISTRY tools are included
- Test workflow sections are present
- Test no duplicate tools
- Verify markdown formatting compliance
- All 6 tests passing
2026-03-06 23:15:39 +07:00
Zamil Majdy
6d9faf5f91 refactor(backend/copilot): auto-generate tool docs in supplement, simplify default prompt
- Add _generate_tool_documentation() to auto-generate tool list from TOOL_REGISTRY
- Extract KEY_WORKFLOWS constant to prompt_constants.py for maintainability
- Append auto-generated tool docs + workflow guidance to system prompt supplement
- Simplify DEFAULT_SYSTEM_PROMPT to minimal tone/style baseline (Langfuse handles details)
- Add KEY WORKFLOWS section covering MCP integration, agent creation, folder management
- Ensures tool documentation stays in sync with actual implementations
- Fix Pyright error by safely accessing description field with .get()
2026-03-06 23:10:42 +07:00
Zamil Majdy
7774717104 docs(backend/copilot): document web_search and web_fetch in tool supplement
Add clear documentation for web_search and web_fetch to the shared tool notes
that get appended to all system prompts (Langfuse or default). This ensures
the copilot knows to use web_search for general web queries instead of
incorrectly using find_block to search for web search blocks.

- web_search: For current information beyond knowledge cutoff
- web_fetch: For retrieving content from specific URLs
2026-03-06 23:10:42 +07:00
Zamil Majdy
89ed628609 fix(backend/copilot): capture tool results in transcript
Tool results (StreamToolOutputAvailable) were being added to session.messages
but NOT to transcript_builder, causing the transcript to miss tool executions.
This made the copilot claim '(no tool used)' when tools were actually called.

Now tool results are captured as user messages with tool_result content blocks,
matching the Claude API transcript format and ensuring --resume has complete
conversation history including all tool interactions.
2026-03-06 23:10:42 +07:00
Zamil Majdy
d56452898a hotfix(backend/copilot): refactor transcript to SDK-based atomic full-context model (#12318)
## Summary

Major refactor to eliminate CLI transcript race conditions and simplify
the codebase by building transcripts directly from SDK messages instead
of reading CLI files.

## Problem

The previous approach had race conditions:
- SDK reads CLI transcript file during stop hook
- CLI may not have finished writing → incomplete transcript
- Complex merge logic to detect and fix incomplete writes
- ~200 lines of synthetic entry detection and merge code

## Solution

**Atomic Full-Context Transcript Model:**
- Build transcript from SDK messages during streaming
(`TranscriptBuilder`)
- Each upload REPLACES the previous transcript entirely (atomic)
- No CLI file reading → no race conditions
- Eliminates all merge complexity

## Key Changes

### Core Refactor
- **NEW**: `transcript_builder.py` - Build JSONL from SDK messages
during streaming
- **SIMPLIFIED**: `transcript.py` - Removed merge logic, simplified
upload/download
- **SIMPLIFIED**: `service.py` - Use TranscriptBuilder, removed stop
hook callback
- **CLEANED**: `security_hooks.py` - Removed `on_stop` parameter

### Performance & Code Quality
- **orjson migration**: Use `backend.util.json` (2-3x faster than
stdlib)
- Added `fallback` parameter to `json.loads()` for cleaner error
handling
- Moved SDK imports to top-level per code style guidelines

### Bug Fixes
- Fixed garbage collection bug in background task handling
- Fixed double upload bug in timeout handling  
- Downgraded PII-risk logging from WARNING to DEBUG
- Added 30s timeout to prevent session lock hang

## Code Removed (~200 lines)

- `merge_with_previous_transcript()` - No longer needed
- `read_transcript_file()` - No longer needed
- `CapturedTranscript` dataclass - No longer needed
- `_on_stop()` callback - No longer needed
- Synthetic entry detection logic - No longer needed
- Manual append/merge logic in finally block - No longer needed

## Testing

-  All transcript tests passing (24/24)
-  Verified with real session logs showing proper transcript growth
-  Verified with Langfuse traces showing proper turn tracking (1-8)

## Transcript Growth Pattern

From session logs:
- **Turn 1**: 2 entries (initial)
- **Turn 2**: 5 entries (+3), 2257B uploaded
- **Turn N**: ~2N entries (linear growth)

Each upload is the **complete atomic state** - always REPLACES, never
incremental.

## Files Changed

```
backend/copilot/sdk/transcript_builder.py (NEW)   | +140 lines
backend/copilot/sdk/transcript.py                  | -198, +125 lines  
backend/copilot/sdk/service.py                     | -214, +160 lines
backend/copilot/sdk/security_hooks.py              | -33, +10 lines
backend/copilot/sdk/transcript_test.py             | -85, +36 lines
backend/util/json.py                               | +45 lines
```

**Net result**: -200 lines, more reliable, faster JSON operations.

## Migration Notes

This is a **breaking change** for any code that:
- Directly calls `merge_with_previous_transcript()` or
`read_transcript_file()`
- Relies on incremental transcript uploads
- Expects stop hook callbacks

All internal usage has been updated.

---

@ntindle - Tagging for autogpt-reviewer
2026-03-06 21:03:49 +07:00
Otto
3e108a813a fix(backend): Use db_manager for workspace in add_graph_execution (#12312)
When `add_graph_execution` is called from a context where the global
Prisma client isn't connected (e.g. CoPilot tools, external API), the
call to `get_or_create_workspace(user_id)` crashes with
`ClientNotConnectedError` because it directly accesses
`UserWorkspace.prisma()`.

The fix adds `workspace_db` to the existing `if prisma.is_connected()`
fallback pattern, consistent with how all other DB calls in the function
already work.

**Sentry:** AUTOGPT-SERVER-83T (and ~15 related issues going back to Jan
2026)

---
Co-authored-by: Reinier van der Leer (@Pwuts) <pwuts@agpt.co>

Co-authored-by: Reinier van der Leer (@Pwuts) <pwuts@agpt.co>
2026-03-06 08:48:15 +01:00
Zamil Majdy
0b9e0665dd Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT 2026-03-06 02:32:36 +07:00
Zamil Majdy
f6f268a1f0 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into HEAD 2026-03-06 02:29:56 +07:00
11 changed files with 729 additions and 512 deletions

View File

@@ -0,0 +1,29 @@
"""Prompt constants for CoPilot - workflow guidance and supplementary documentation.
This module contains workflow patterns and guidance that supplement the main system prompt.
These are appended dynamically to the prompt along with auto-generated tool documentation.
"""
# Workflow guidance for key tool patterns
# This is appended after the auto-generated tool list to provide usage patterns
KEY_WORKFLOWS = """
## KEY WORKFLOWS
### MCP Integration Workflow
When using `run_mcp_tool`:
1. **Known servers** (use directly): Notion (https://mcp.notion.com/mcp), Linear (https://mcp.linear.app/mcp), Stripe (https://mcp.stripe.com), Intercom (https://mcp.intercom.com/mcp), Cloudflare (https://mcp.cloudflare.com/mcp), Atlassian (https://mcp.atlassian.com/mcp)
2. **Unknown servers**: Use `web_search("{{service}} MCP server URL")` to find the endpoint
3. **Discovery**: Call `run_mcp_tool(server_url)` to see available tools
4. **Execution**: Call `run_mcp_tool(server_url, tool_name, tool_arguments)`
5. **Authentication**: If credentials needed, user will be prompted. When they confirm, retry immediately with same arguments.
### Agent Creation Workflow
When using `create_agent`:
1. Always check `find_library_agent` first for existing solutions
2. Call `create_agent` with description
3. **If `suggested_goal` returned**: Present to user, ask for confirmation, call again with suggested goal if accepted
4. **If `clarifying_questions` returned**: After user answers, call again with original description AND answers in `context` parameter
### Folder Management
Use folder tools (`create_folder`, `list_folders`, `move_agents_to_folder`) to organize agents in the user's library for better discoverability."""

View File

@@ -127,7 +127,6 @@ def create_security_hooks(
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_compact: Callable[[], None] | None = None,
on_stop: Callable[[str, str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -136,15 +135,12 @@ def create_security_hooks(
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
the SDK finishes processing — used to read the JSONL transcript
before the CLI process exits.
on_compact: Callback invoked when SDK starts compacting context.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -311,30 +307,6 @@ def create_security_hooks(
on_compact()
return cast(SyncHookJSONOutput, {})
# --- Stop hook: capture transcript path for stateless resume ---
async def stop_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Capture transcript path when SDK finishes processing.
The Stop hook fires while the CLI process is still alive, giving us
a reliable window to read the JSONL transcript before SIGTERM.
"""
_ = context, tool_use_id
transcript_path = cast(str, input_data.get("transcript_path", ""))
sdk_session_id = cast(str, input_data.get("session_id", ""))
if transcript_path and on_stop:
logger.info(
f"[SDK] Stop hook: transcript_path={transcript_path}, "
f"sdk_session_id={sdk_session_id[:12]}..."
)
on_stop(transcript_path, sdk_session_id)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
@@ -344,9 +316,6 @@ def create_security_hooks(
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
if on_stop is not None:
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
return hooks
except ImportError:
# Fallback for when SDK isn't available - return empty hooks

View File

@@ -12,7 +12,6 @@ import subprocess
import sys
import uuid
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any, cast
import openai
@@ -21,6 +20,9 @@ from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
)
from langfuse import propagate_attributes
@@ -42,6 +44,7 @@ from ..model import (
update_session_title,
upsert_chat_session,
)
from ..prompt_constants import KEY_WORKFLOWS
from ..response_model import (
StreamBaseResponse,
StreamError,
@@ -57,6 +60,7 @@ from ..service import (
_generate_session_title,
_is_langfuse_configured,
)
from ..tools import TOOL_REGISTRY
from ..tools.e2b_sandbox import get_or_create_sandbox
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
from ..tools.workspace_files import get_manager
@@ -74,11 +78,11 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
)
from .transcript_builder import TranscriptBuilder
logger = logging.getLogger(__name__)
config = ChatConfig()
@@ -137,19 +141,6 @@ _setup_langfuse_otel()
_background_tasks: set[asyncio.Task[Any]] = set()
@dataclass
class CapturedTranscript:
"""Info captured by the SDK Stop hook for stateless --resume."""
path: str = ""
sdk_session_id: str = ""
raw_content: str = ""
@property
def available(self) -> bool:
return bool(self.path)
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
@@ -160,8 +151,37 @@ _HEARTBEAT_INTERVAL = 10.0 # seconds
# Appended to the system prompt to inform the agent about available tools.
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (unshare --net).
def _generate_tool_documentation() -> str:
"""Auto-generate tool documentation from TOOL_REGISTRY.
This generates a complete list of available tools with their descriptions,
ensuring the documentation stays in sync with the actual tool implementations.
"""
docs = "\n## AVAILABLE TOOLS\n\n"
# Sort tools alphabetically for consistent output
for name in sorted(TOOL_REGISTRY.keys()):
tool = TOOL_REGISTRY[name]
schema = tool.as_openai_tool()
desc = schema["function"].get("description", "No description available")
# Format as bullet list with tool name in code style
docs += f"- **`{name}`**: {desc}\n"
# Add workflow guidance for key tools
docs += KEY_WORKFLOWS
return docs
_SHARED_TOOL_NOTES = """\
### Web search and research
- **`web_search(query)`** — Search the web for current information (uses Claude's
native web search). Use this when you need up-to-date information, facts,
statistics, or current events that are beyond your knowledge cutoff.
- **`web_fetch(url)`** — Retrieve and analyze content from a specific URL.
Use this when you have a specific URL to read (documentation, articles, etc.).
### Sharing files with the user
After saving a file to the persistent workspace with `write_workspace_file`,
share it with the user by embedding the `download_url` from the response in
@@ -451,6 +471,49 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
pass
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
"""Convert SDK content blocks to transcript format.
Handles TextBlock, ToolUseBlock, ToolResultBlock, and ThinkingBlock.
Unknown block types are logged and skipped.
"""
result: list[dict[str, Any]] = []
for block in blocks or []:
if isinstance(block, TextBlock):
result.append({"type": "text", "text": block.text})
elif isinstance(block, ToolUseBlock):
result.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
elif isinstance(block, ToolResultBlock):
result.append(
{
"type": "tool_result",
"tool_use_id": block.tool_use_id,
"content": block.content,
}
)
elif isinstance(block, ThinkingBlock):
result.append(
{
"type": "thinking",
"thinking": block.thinking,
"signature": block.signature,
}
)
else:
logger.warning(
f"[SDK] Unknown content block type: {type(block).__name__}. "
f"This may indicate a new SDK version with additional block types."
)
return result
async def _compress_messages(
messages: list[ChatMessage],
) -> tuple[list[ChatMessage], bool]:
@@ -806,6 +869,11 @@ async def stream_chat_completion_sdk(
user_id=user_id, session_id=session_id, message_length=len(message)
)
# Structured log prefix: [SDK][<session>][T<turn>]
# Turn = number of user messages (1-based), computed AFTER appending the new message.
turn = sum(1 for m in session.messages if m.role == "user")
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
@@ -823,10 +891,11 @@ async def stream_chat_completion_sdk(
message_id = str(uuid.uuid4())
stream_id = str(uuid.uuid4())
stream_completed = False
ended_with_stream_error = False
e2b_sandbox = None
use_resume = False
resume_file: str | None = None
captured_transcript = CapturedTranscript()
transcript_builder = TranscriptBuilder()
sdk_cwd = ""
# Acquire stream lock to prevent concurrent streams to the same session
@@ -841,7 +910,7 @@ async def stream_chat_completion_sdk(
if lock_owner != stream_id:
# Another stream is active
logger.warning(
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
f"{log_prefix} Session already has an active stream: {lock_owner}"
)
yield StreamError(
errorText="Another stream is already active for this session. "
@@ -865,7 +934,7 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
except (ValueError, OSError) as e:
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
yield StreamError(
errorText="Unable to initialize working directory.",
code="sdk_cwd_error",
@@ -909,12 +978,13 @@ async def stream_chat_completion_sdk(
):
return None
try:
return await download_transcript(user_id, session_id)
return await download_transcript(
user_id, session_id, log_prefix=log_prefix
)
except Exception as transcript_err:
logger.warning(
"[SDK] [%s] Transcript download failed, continuing without "
"--resume: %s",
session_id[:12],
"%s Transcript download failed, continuing without " "--resume: %s",
log_prefix,
transcript_err,
)
return None
@@ -926,21 +996,34 @@ async def stream_chat_completion_sdk(
)
use_e2b = e2b_sandbox is not None
system_prompt = base_system_prompt + (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
# Generate tool documentation and append appropriate supplement
tool_docs = _generate_tool_documentation()
system_prompt = (
base_system_prompt
+ tool_docs
+ (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
)
)
# Process transcript download result
transcript_msg_count = 0
if dl:
is_valid = validate_transcript(dl.content)
dl_lines = dl.content.strip().split("\n") if dl.content else []
logger.info(
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
log_prefix,
len(dl.content),
len(dl_lines),
dl.message_count,
is_valid,
)
if is_valid:
logger.info(
f"[SDK] Transcript available for session {session_id}: "
f"{len(dl.content)}B, msg_count={dl.message_count}"
)
# Load previous FULL context into builder
transcript_builder.load_previous(dl.content, log_prefix=log_prefix)
resume_file = write_transcript_to_tempfile(
dl.content, session_id, sdk_cwd
)
@@ -948,16 +1031,14 @@ async def stream_chat_completion_sdk(
use_resume = True
transcript_msg_count = dl.message_count
logger.debug(
f"[SDK] Using --resume ({len(dl.content)}B, "
f"{log_prefix} Using --resume ({len(dl.content)}B, "
f"msg_count={transcript_msg_count})"
)
else:
logger.warning(
f"[SDK] Transcript downloaded but invalid for {session_id}"
)
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
logger.warning(
f"[SDK] No transcript available for {session_id} "
f"{log_prefix} No transcript available "
f"({len(session.messages)} messages in session)"
)
@@ -979,25 +1060,6 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
# Read the file content immediately — the SDK may clean up
# the file before our finally block runs.
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.path = transcript_path
captured_transcript.sdk_session_id = sdk_session_id
content = read_transcript_file(transcript_path)
if content:
captured_transcript.raw_content = content
logger.info(
f"[SDK] Stop hook: captured {len(content)}B from "
f"{transcript_path}"
)
else:
logger.warning(
f"[SDK] Stop hook: transcript file empty/missing at "
f"{transcript_path}"
)
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
compaction = CompactionTracker()
@@ -1005,7 +1067,6 @@ async def stream_chat_completion_sdk(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
on_stop=_on_stop if config.claude_agent_use_resume else None,
on_compact=compaction.on_compact,
)
@@ -1040,7 +1101,10 @@ async def stream_chat_completion_sdk(
session_id=session_id,
trace_name="copilot-sdk",
tags=["sdk"],
metadata={"resume": str(use_resume)},
metadata={
"resume": str(use_resume),
"conversation_turn": str(turn),
},
)
_otel_ctx.__enter__()
@@ -1074,9 +1138,9 @@ async def stream_chat_completion_sdk(
query_message = f"{query_message}\n\n{attachments.hint}"
logger.info(
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
"%s Sending query — resume=%s, total_msgs=%d, "
"query_len=%d, attached_files=%d, image_blocks=%d",
session_id[:12],
log_prefix,
use_resume,
len(session.messages),
len(query_message),
@@ -1105,8 +1169,13 @@ async def stream_chat_completion_sdk(
await client._transport.write( # noqa: SLF001
json.dumps(user_msg) + "\n"
)
# Capture user message in transcript (multimodal)
transcript_builder.add_user_message(content=content_blocks)
else:
await client.query(query_message, session_id=session_id)
# Capture actual user message in transcript (not the engineered query)
# query_message may include context wrappers, but transcript needs raw input
transcript_builder.add_user_message(content=current_message)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
@@ -1150,8 +1219,8 @@ async def stream_chat_completion_sdk(
sdk_msg = done.pop().result()
except StopAsyncIteration:
logger.info(
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
session_id[:12],
"%s Stream ended normally (StopAsyncIteration)",
log_prefix,
)
break
except Exception as stream_err:
@@ -1160,8 +1229,8 @@ async def stream_chat_completion_sdk(
# so the session can still be saved and the
# frontend gets a clean finish.
logger.error(
"[SDK] [%s] Stream error from SDK: %s",
session_id[:12],
"%s Stream error from SDK: %s",
log_prefix,
stream_err,
exc_info=True,
)
@@ -1173,9 +1242,9 @@ async def stream_chat_completion_sdk(
break
logger.info(
"[SDK] [%s] Received: %s %s "
"%s Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
@@ -1184,6 +1253,15 @@ async def stream_chat_completion_sdk(
len(adapter.resolved_tool_calls),
)
# Capture SDK messages in transcript
if isinstance(sdk_msg, AssistantMessage):
content_blocks = _format_sdk_content_blocks(sdk_msg.content)
model_name = getattr(sdk_msg, "model", "")
transcript_builder.add_assistant_message(
content_blocks=content_blocks,
model=model_name,
)
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
# message can arrive before the hook stashes output.
@@ -1210,10 +1288,10 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0)
else:
logger.warning(
"[SDK] [%s] Timed out waiting for "
"%s Timed out waiting for "
"PostToolUse hook stash "
"(%d unresolved tool calls)",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
@@ -1221,9 +1299,9 @@ async def stream_chat_completion_sdk(
# Log ResultMessage details for debugging
if isinstance(sdk_msg, ResultMessage):
logger.info(
"[SDK] [%s] Received: ResultMessage %s "
"%s Received: ResultMessage %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
sdk_msg.subtype,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
@@ -1232,8 +1310,8 @@ async def stream_chat_completion_sdk(
)
if sdk_msg.subtype in ("error", "error_during_execution"):
logger.error(
"[SDK] [%s] SDK execution failed with error: %s",
session_id[:12],
"%s SDK execution failed with error: %s",
log_prefix,
sdk_msg.result or "(no error message provided)",
)
@@ -1258,8 +1336,8 @@ async def stream_chat_completion_sdk(
out_len = len(str(response.output))
extra = f", output_len={out_len}"
logger.info(
"[SDK] [%s] Tool event: %s, tool=%s%s",
session_id[:12],
"%s Tool event: %s, tool=%s%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
@@ -1268,8 +1346,8 @@ async def stream_chat_completion_sdk(
# Log errors being sent to frontend
if isinstance(response, StreamError):
logger.error(
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
session_id[:12],
"%s Sending error to frontend: %s (code=%s)",
log_prefix,
response.errorText,
response.code,
)
@@ -1314,17 +1392,28 @@ async def stream_chat_completion_sdk(
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
tool_result_content = (
response.output
if isinstance(response.output, str)
else str(response.output)
)
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
content=tool_result_content,
tool_call_id=response.toolCallId,
)
)
# Capture tool result in transcript as user message with tool_result content
transcript_builder.add_user_message(
content=[
{
"type": "tool_result",
"tool_use_id": response.toolCallId,
"content": tool_result_content,
}
]
)
has_tool_results = True
elif isinstance(response, StreamFinish):
@@ -1335,8 +1424,8 @@ async def stream_chat_completion_sdk(
# server shutdown). Log and let the safety-net / finally
# blocks handle cleanup.
logger.warning(
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
session_id[:12],
"%s Streaming loop cancelled (asyncio.CancelledError)",
log_prefix,
)
raise
finally:
@@ -1350,7 +1439,8 @@ async def stream_chat_completion_sdk(
except (asyncio.CancelledError, StopAsyncIteration):
# Expected: task was cancelled or exhausted during cleanup
logger.info(
"[SDK] Pending __anext__ task completed during cleanup"
"%s Pending __anext__ task completed during cleanup",
log_prefix,
)
# Safety net: if tools are still unresolved after the
@@ -1359,9 +1449,9 @@ async def stream_chat_completion_sdk(
# them now so the frontend stops showing spinners.
if adapter.has_unresolved_tool_calls:
logger.warning(
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
"%s %d unresolved tool(s) after stream loop — "
"flushing as safety net",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
)
safety_responses: list[StreamBaseResponse] = []
@@ -1372,8 +1462,8 @@ async def stream_chat_completion_sdk(
(StreamToolInputAvailable, StreamToolOutputAvailable),
):
logger.info(
"[SDK] [%s] Safety flush: %s, tool=%s",
session_id[:12],
"%s Safety flush: %s, tool=%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
)
@@ -1386,8 +1476,8 @@ async def stream_chat_completion_sdk(
# StreamFinish is published by mark_session_completed in the processor.
if not stream_completed and not ended_with_stream_error:
logger.info(
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
session_id[:12],
"%s Stream ended without ResultMessage (stopped by user)",
log_prefix,
)
closing_responses: list[StreamBaseResponse] = []
adapter._end_text_if_open(closing_responses)
@@ -1408,69 +1498,36 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# --- Upload transcript for next-turn --resume ---
# After async with the SDK task group has exited, so the Stop
# hook has already fired and the CLI has been SIGTERMed. The
# CLI uses appendFileSync, so all writes are safely on disk.
if config.claude_agent_use_resume and user_id:
# With --resume the CLI appends to the resume file (most
# complete). Otherwise use the Stop hook path.
if use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
logger.debug("[SDK] Transcript source: resume file")
elif captured_transcript.path:
raw_transcript = read_transcript_file(captured_transcript.path)
logger.debug(
"[SDK] Transcript source: stop hook (%s), read result: %s",
captured_transcript.path,
f"{len(raw_transcript)}B" if raw_transcript else "None",
)
else:
raw_transcript = None
# Transcript upload is handled exclusively in the finally block
# to avoid double-uploads (the success path used to upload the
# old resume file, then the finally block overwrote it with the
# stop hook content — which could be smaller after compaction).
if not raw_transcript:
logger.debug(
"[SDK] No usable transcript — CLI file had no "
"conversation entries (expected for first turn "
"without --resume)"
)
if raw_transcript:
# Shield the upload from generator cancellation so a
# client disconnect / page refresh doesn't lose the
# transcript. The upload must finish even if the SSE
# connection is torn down.
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
)
logger.info(
"[SDK] [%s] Stream completed successfully with %d messages",
session_id[:12],
len(session.messages),
)
if ended_with_stream_error:
logger.warning(
"%s Stream ended with SDK error after %d messages",
log_prefix,
len(session.messages),
)
else:
logger.info(
"%s Stream completed successfully with %d messages",
log_prefix,
len(session.messages),
)
except BaseException as e:
# Catch BaseException to handle both Exception and CancelledError
# (CancelledError inherits from BaseException in Python 3.8+)
if isinstance(e, asyncio.CancelledError):
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
logger.warning("%s Session cancelled", log_prefix)
error_msg = "Operation cancelled"
else:
error_msg = str(e) or type(e).__name__
# SDK cleanup RuntimeError is expected during cancellation, log as warning
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
logger.warning(
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
)
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
else:
logger.error(
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
)
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
# Append error marker to session (non-invasive text parsing approach)
# The finally block will persist the session with this error marker
@@ -1481,8 +1538,8 @@ async def stream_chat_completion_sdk(
)
)
logger.debug(
"[SDK] [%s] Appended error marker, will be persisted in finally",
session_id[:12],
"%s Appended error marker, will be persisted in finally",
log_prefix,
)
# Yield StreamError for immediate feedback (only for non-cancellation errors)
@@ -1514,47 +1571,61 @@ async def stream_chat_completion_sdk(
try:
await asyncio.shield(upsert_chat_session(session))
logger.info(
"[SDK] [%s] Session persisted in finally with %d messages",
session_id[:12],
"%s Session persisted in finally with %d messages",
log_prefix,
len(session.messages),
)
except Exception as persist_err:
logger.error(
"[SDK] [%s] Failed to persist session in finally: %s",
session_id[:12],
"%s Failed to persist session in finally: %s",
log_prefix,
persist_err,
exc_info=True,
)
# --- Upload transcript for next-turn --resume ---
# This MUST run in finally so the transcript is uploaded even when
# the streaming loop raises an exception. The CLI uses
# appendFileSync, so whatever was written before the error/SIGTERM
# is safely on disk and still useful for the next turn.
if config.claude_agent_use_resume and user_id:
# the streaming loop raises an exception.
# The transcript represents the COMPLETE active context (atomic).
if config.claude_agent_use_resume and user_id and session is not None:
try:
# Prefer content captured in the Stop hook (read before
# cleanup removes the file). Fall back to the resume
# file when the stop hook didn't fire (e.g. error before
# completion) so we don't lose the prior transcript.
raw_transcript = captured_transcript.raw_content or None
if not raw_transcript and use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
# Build complete transcript from captured SDK messages
transcript_content = transcript_builder.to_jsonl()
if raw_transcript and session is not None:
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
if not transcript_content:
logger.warning(
"%s No transcript to upload (builder empty)", log_prefix
)
elif not validate_transcript(transcript_content):
logger.warning(
"%s Transcript invalid, skipping upload (entries=%d)",
log_prefix,
transcript_builder.entry_count,
)
else:
logger.warning(f"[SDK] No transcript to upload for {session_id}")
logger.info(
"%s Uploading complete transcript (entries=%d, bytes=%d)",
log_prefix,
transcript_builder.entry_count,
len(transcript_content),
)
# Shield upload from cancellation - let it complete even if
# the finally block is interrupted. No timeout to avoid race
# conditions where backgrounded uploads overwrite newer transcripts.
await asyncio.shield(
upload_transcript(
user_id=user_id,
session_id=session_id,
content=transcript_content,
message_count=len(session.messages),
log_prefix=log_prefix,
)
)
except Exception as upload_err:
logger.error(
f"[SDK] Transcript upload failed in finally: {upload_err}",
"%s Transcript upload failed in finally: %s",
log_prefix,
upload_err,
exc_info=True,
)
@@ -1565,33 +1636,6 @@ async def stream_chat_completion_sdk(
await lock.release()
async def _try_upload_transcript(
user_id: str,
session_id: str,
raw_content: str,
message_count: int = 0,
) -> bool:
"""Strip progress entries and upload transcript (with timeout).
Returns True if the upload completed without error.
"""
try:
async with asyncio.timeout(30):
await upload_transcript(
user_id, session_id, raw_content, message_count=message_count
)
return True
except asyncio.TimeoutError:
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
return False
except Exception as e:
logger.error(
f"[SDK] Failed to upload transcript for {session_id}: {e}",
exc_info=True,
)
return False
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
@@ -1600,7 +1644,7 @@ async def _update_title_async(
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
if title and user_id:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch
import pytest
from .service import _prepare_file_attachments
from .service import _generate_tool_documentation, _prepare_file_attachments
@dataclass
@@ -145,3 +145,94 @@ class TestPrepareFileAttachments:
assert "Read tool" not in result.hint
assert len(result.image_blocks) == 1
class TestGenerateToolDocumentation:
"""Tests for auto-generated tool documentation from TOOL_REGISTRY."""
def test_generate_tool_documentation_structure(self):
"""Test that tool documentation has expected structure."""
docs = _generate_tool_documentation()
# Check main sections exist
assert "## AVAILABLE TOOLS" in docs
assert "## KEY WORKFLOWS" in docs
# Verify no duplicate sections
assert docs.count("## AVAILABLE TOOLS") == 1
assert docs.count("## KEY WORKFLOWS") == 1
def test_tool_documentation_includes_key_tools(self):
"""Test that documentation includes essential copilot tools."""
docs = _generate_tool_documentation()
# Core agent workflow tools
assert "`create_agent`" in docs
assert "`run_agent`" in docs
assert "`find_library_agent`" in docs
assert "`edit_agent`" in docs
# MCP integration
assert "`run_mcp_tool`" in docs
# Browser automation
assert "`browser_navigate`" in docs
# Folder management
assert "`create_folder`" in docs
def test_tool_documentation_format(self):
"""Test that each tool follows bullet list format."""
docs = _generate_tool_documentation()
lines = docs.split("\n")
tool_lines = [line for line in lines if line.strip().startswith("- **`")]
# Should have multiple tools (at least 20 from TOOL_REGISTRY)
assert len(tool_lines) >= 20
# Each tool line should have proper markdown format
for line in tool_lines:
assert line.startswith("- **`"), f"Bad format: {line}"
assert "`**:" in line, f"Missing description separator: {line}"
def test_tool_documentation_includes_workflows(self):
"""Test that key workflow patterns are documented."""
docs = _generate_tool_documentation()
# Check workflow sections
assert "MCP Integration Workflow" in docs
assert "Agent Creation Workflow" in docs
assert "Folder Management" in docs
# Check workflow details
assert "suggested_goal" in docs # Agent creation feedback loop
assert "clarifying_questions" in docs # Agent creation feedback loop
assert "run_mcp_tool(server_url)" in docs # MCP discovery pattern
def test_tool_documentation_completeness(self):
"""Test that all tools from TOOL_REGISTRY appear in documentation."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Verify each registered tool is documented
for tool_name in TOOL_REGISTRY.keys():
assert (
f"`{tool_name}`" in docs
), f"Tool '{tool_name}' missing from auto-generated documentation"
def test_tool_documentation_no_duplicate_tools(self):
"""Test that no tool appears multiple times in the list."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Extract the tools section (before KEY WORKFLOWS)
tools_section = docs.split("## KEY WORKFLOWS")[0]
# Count occurrences of each tool
for tool_name in TOOL_REGISTRY.keys():
# Count how many times this tool appears as a bullet point
count = tools_section.count(f"- **`{tool_name}`**")
assert count == 1, f"Tool '{tool_name}' appears {count} times (should be 1)"

View File

@@ -10,13 +10,14 @@ Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from backend.util import json
logger = logging.getLogger(__name__)
# UUIDs are hex + hyphens; strip everything else to prevent path injection.
@@ -58,41 +59,37 @@ def strip_progress_entries(content: str) -> str:
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
any remaining child entries so the ``parentUuid`` chain stays intact.
Typically reduces transcript size by ~30%.
Entries that are not stripped or reparented are kept as their original
raw JSON line to avoid unnecessary re-serialization that changes
whitespace or key ordering.
"""
lines = content.strip().split("\n")
entries: list[dict] = []
# Parse entries, keeping the original line alongside the parsed dict.
parsed: list[tuple[str, dict | None]] = []
for line in lines:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
# Keep unparseable lines as-is (safety)
entries.append({"_raw": line})
parsed.append((line, json.loads(line, fallback=None)))
# First pass: identify stripped UUIDs and build parent map.
stripped_uuids: set[str] = set()
uuid_to_parent: dict[str, str] = {}
kept: list[dict] = []
for entry in entries:
if "_raw" in entry:
kept.append(entry)
for _line, entry in parsed:
if not isinstance(entry, dict):
continue
uid = entry.get("uuid", "")
parent = entry.get("parentUuid", "")
entry_type = entry.get("type", "")
if uid:
uuid_to_parent[uid] = parent
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
stripped_uuids.add(uid)
if entry_type in STRIPPABLE_TYPES:
if uid:
stripped_uuids.add(uid)
else:
kept.append(entry)
# Reparent: walk up chain through stripped entries to find surviving ancestor
for entry in kept:
if "_raw" in entry:
# Second pass: keep non-stripped entries, reparenting where needed.
# Preserve original line when no reparenting is required.
reparented: set[str] = set()
for _line, entry in parsed:
if not isinstance(entry, dict):
continue
parent = entry.get("parentUuid", "")
original_parent = parent
@@ -100,63 +97,32 @@ def strip_progress_entries(content: str) -> str:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
uid = entry.get("uuid", "")
if uid:
reparented.add(uid)
result_lines: list[str] = []
for entry in kept:
if "_raw" in entry:
result_lines.append(entry["_raw"])
else:
for line, entry in parsed:
if not isinstance(entry, dict):
result_lines.append(line)
continue
if entry.get("type", "") in STRIPPABLE_TYPES:
continue
uid = entry.get("uuid", "")
if uid in reparented:
# Re-serialize only entries whose parentUuid was changed.
result_lines.append(json.dumps(entry, separators=(",", ":")))
else:
result_lines.append(line)
return "\n".join(result_lines) + "\n"
# ---------------------------------------------------------------------------
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
# Local file I/O (write temp file for --resume)
# ---------------------------------------------------------------------------
def read_transcript_file(transcript_path: str) -> str | None:
"""Read a JSONL transcript file from disk.
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
or only contains metadata (≤2 lines with no conversation messages).
"""
if not transcript_path or not os.path.isfile(transcript_path):
logger.debug(f"[Transcript] File not found: {transcript_path}")
return None
try:
with open(transcript_path) as f:
content = f.read()
if not content.strip():
logger.debug("[Transcript] File is empty: %s", transcript_path)
return None
lines = content.strip().split("\n")
# Validate that the transcript has real conversation content
# (not just metadata like queue-operation entries).
if not validate_transcript(content):
logger.debug(
"[Transcript] No conversation content (%d lines) in %s",
len(lines),
transcript_path,
)
return None
logger.info(
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
return None
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
"""Sanitize an ID for safe use in file paths.
@@ -171,14 +137,6 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
def _encode_cwd_for_cli(cwd: str) -> str:
"""Encode a working directory path the same way the Claude CLI does.
The CLI replaces all non-alphanumeric characters with ``-``.
"""
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""Remove the CLI's project directory for a specific working directory.
@@ -188,7 +146,8 @@ def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""
import shutil
cwd_encoded = _encode_cwd_for_cli(sdk_cwd)
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
@@ -248,32 +207,29 @@ def write_transcript_to_tempfile(
def validate_transcript(content: str | None) -> bool:
"""Check that a transcript has actual conversation messages.
A valid transcript for resume needs at least one user message and one
assistant message (not just queue-operation / file-history-snapshot
metadata).
A valid transcript needs at least one assistant message (not just
queue-operation / file-history-snapshot metadata). We do NOT require
a ``type: "user"`` entry because with ``--resume`` the user's message
is passed as a CLI query parameter and does not appear in the
transcript file.
"""
if not content or not content.strip():
return False
lines = content.strip().split("\n")
if len(lines) < 2:
return False
has_user = False
has_assistant = False
for line in lines:
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
has_user = True
elif msg_type == "assistant":
has_assistant = True
except json.JSONDecodeError:
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
return False
if entry.get("type") == "assistant":
has_assistant = True
return has_user and has_assistant
return has_assistant
# ---------------------------------------------------------------------------
@@ -328,26 +284,41 @@ async def upload_transcript(
session_id: str,
content: str,
message_count: int = 0,
log_prefix: str = "[Transcript]",
) -> None:
"""Strip progress entries and upload transcript to bucket storage.
"""Strip progress entries and upload complete transcript.
The transcript represents the FULL active context (atomic).
Each upload REPLACES the previous transcript entirely.
The executor holds a cluster lock per session, so concurrent uploads for
the same session cannot happen. We always overwrite — with ``--resume``
the CLI may compact old tool results, so neither byte size nor line count
is a reliable proxy for "newer".
the same session cannot happen.
Args:
message_count: ``len(session.messages)`` at upload time — used by
the next turn to detect staleness and compress only the gap.
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
entry = json.loads(line, fallback={"type": "INVALID_JSON"})
entry_types.append(entry.get("type", "?"))
logger.warning(
f"[Transcript] Skipping upload — stripped content not valid "
f"for session {session_id}"
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
log_prefix,
entry_types,
len(stripped),
len(content),
)
logger.debug("%s Raw content preview: %s", log_prefix, content[:500])
logger.debug("%s Stripped content: %s", log_prefix, stripped[:500])
return
storage = await get_workspace_storage()
@@ -373,17 +344,18 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.info(
f"[Transcript] Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count}) "
f"for session {session_id}"
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
)
async def download_transcript(
user_id: str, session_id: str
user_id: str,
session_id: str,
log_prefix: str = "[Transcript]",
) -> TranscriptDownload | None:
"""Download transcript and metadata from bucket storage.
@@ -399,10 +371,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
logger.debug(f"{log_prefix} No transcript in storage")
return None
except Exception as e:
logger.warning(f"[Transcript] Failed to download transcript: {e}")
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -419,16 +391,13 @@ async def download_transcript(
meta_path = f"local://{mwid}/{mfid}/{mfname}"
meta_data = await storage.retrieve(meta_path)
meta = json.loads(meta_data.decode("utf-8"))
meta = json.loads(meta_data.decode("utf-8"), fallback={})
message_count = meta.get("message_count", 0)
uploaded_at = meta.get("uploaded_at", 0.0)
except (FileNotFoundError, json.JSONDecodeError, Exception):
except (FileNotFoundError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
logger.info(
f"[Transcript] Downloaded {len(content)}B "
f"(msg_count={message_count}) for session {session_id}"
)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
return TranscriptDownload(
content=content,
message_count=message_count,

View File

@@ -0,0 +1,150 @@
"""Build complete JSONL transcript from SDK messages.
The transcript represents the FULL active context at any point in time.
Each upload REPLACES the previous transcript atomically.
Flow:
Turn 1: Upload [msg1, msg2]
Turn 2: Download [msg1, msg2] → Upload [msg1, msg2, msg3, msg4] (REPLACE)
Turn 3: Download [msg1, msg2, msg3, msg4] → Upload [all messages] (REPLACE)
The transcript is never incremental - always the complete atomic state.
"""
import logging
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
from backend.util import json
from .transcript import STRIPPABLE_TYPES
logger = logging.getLogger(__name__)
class TranscriptEntry(BaseModel):
"""Single transcript entry (user or assistant turn)."""
type: str
uuid: str
parentUuid: str | None
message: dict[str, Any]
class TranscriptBuilder:
"""Build complete JSONL transcript from SDK messages.
This builder maintains the FULL conversation state, not incremental changes.
The output is always the complete active context.
"""
def __init__(self) -> None:
self._entries: list[TranscriptEntry] = []
self._last_uuid: str | None = None
def load_previous(self, content: str, log_prefix: str = "[Transcript]") -> None:
"""Load complete previous transcript.
This loads the FULL previous context. As new messages come in,
we append to this state. The final output is the complete context
(previous + new), not just the delta.
"""
if not content or not content.strip():
return
lines = content.strip().split("\n")
for line_num, line in enumerate(lines, 1):
if not line.strip():
continue
data = json.loads(line, fallback=None)
if data is None:
logger.warning(
"%s Failed to parse transcript line %d/%d",
log_prefix,
line_num,
len(lines),
)
continue
# Load all non-strippable entries (user/assistant/system/etc.)
# Skip only STRIPPABLE_TYPES to match strip_progress_entries() behavior
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
continue
entry = TranscriptEntry(
type=data["type"],
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
message=data.get("message", {}),
)
self._entries.append(entry)
self._last_uuid = entry.uuid
logger.info(
"%s Loaded %d entries from previous transcript (last_uuid=%s)",
log_prefix,
len(self._entries),
self._last_uuid[:12] if self._last_uuid else None,
)
def add_user_message(
self, content: str | list[dict], uuid: str | None = None
) -> None:
"""Add user message to the complete context."""
msg_uuid = uuid or str(uuid4())
self._entries.append(
TranscriptEntry(
type="user",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={"role": "user", "content": content},
)
)
self._last_uuid = msg_uuid
def add_assistant_message(
self, content_blocks: list[dict], model: str = ""
) -> None:
"""Add assistant message to the complete context."""
msg_uuid = str(uuid4())
self._entries.append(
TranscriptEntry(
type="assistant",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={
"role": "assistant",
"model": model,
"content": content_blocks,
},
)
)
self._last_uuid = msg_uuid
def to_jsonl(self) -> str:
"""Export complete context as JSONL.
Returns the FULL conversation state (all entries), not incremental.
This output REPLACES any previous transcript.
"""
if not self._entries:
return ""
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
return "\n".join(lines) + "\n"
@property
def entry_count(self) -> int:
"""Total number of entries in the complete context."""
return len(self._entries)
@property
def is_empty(self) -> bool:
"""Whether this builder has any entries."""
return len(self._entries) == 0

View File

@@ -1,11 +1,11 @@
"""Unit tests for JSONL transcript management utilities."""
import json
import os
from backend.util import json
from .transcript import (
STRIPPABLE_TYPES,
read_transcript_file,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
@@ -38,49 +38,6 @@ PROGRESS_ENTRY = {
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
# --- read_transcript_file ---
class TestReadTranscriptFile:
def test_returns_content_for_valid_file(self, tmp_path):
path = tmp_path / "session.jsonl"
path.write_text(VALID_TRANSCRIPT)
result = read_transcript_file(str(path))
assert result is not None
assert "user" in result
def test_returns_none_for_missing_file(self):
assert read_transcript_file("/nonexistent/path.jsonl") is None
def test_returns_none_for_empty_path(self):
assert read_transcript_file("") is None
def test_returns_none_for_empty_file(self, tmp_path):
path = tmp_path / "empty.jsonl"
path.write_text("")
assert read_transcript_file(str(path)) is None
def test_returns_none_for_metadata_only(self, tmp_path):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
path = tmp_path / "meta.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
def test_no_size_limit(self, tmp_path):
"""Large files are accepted — bucket storage has no size limit."""
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
result = read_transcript_file(str(path))
assert result is not None
# --- write_transcript_to_tempfile ---
@@ -155,12 +112,56 @@ class TestValidateTranscript:
assert validate_transcript(content) is False
def test_assistant_only_no_user(self):
"""With --resume the user message is a CLI query param, not a transcript entry.
A transcript with only assistant entries is valid."""
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
assert validate_transcript(content) is False
assert validate_transcript(content) is True
def test_resume_transcript_without_user_entry(self):
"""Simulates a real --resume stop hook transcript: the CLI session file
has summary + assistant entries but no user entry."""
summary = {"type": "summary", "uuid": "s1", "text": "context..."}
asst1 = {
"type": "assistant",
"uuid": "a1",
"message": {"role": "assistant", "content": "Hello!"},
}
asst2 = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "a1",
"message": {"role": "assistant", "content": "Sure, let me help."},
}
content = _make_jsonl(summary, asst1, asst2)
assert validate_transcript(content) is True
def test_single_assistant_entry(self):
"""A transcript with just one assistant line is valid — the CLI may
produce short transcripts for simple responses with no tool use."""
content = json.dumps(ASST_MSG) + "\n"
assert validate_transcript(content) is True
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False
def test_malformed_json_after_valid_assistant_returns_false(self):
"""Validation must scan all lines - malformed JSON anywhere should fail."""
valid_asst = json.dumps(ASST_MSG)
malformed = "not valid json"
content = valid_asst + "\n" + malformed + "\n"
assert validate_transcript(content) is False
def test_blank_lines_are_skipped(self):
"""Transcripts with blank lines should be valid if they contain assistant entries."""
content = (
json.dumps(USER_MSG)
+ "\n\n" # blank line
+ json.dumps(ASST_MSG)
+ "\n"
+ "\n" # another blank line
)
assert validate_transcript(content) is True
# --- strip_progress_entries ---
@@ -253,3 +254,31 @@ class TestStripProgressEntries:
assert "queue-operation" not in result_types
assert "user" in result_types
assert "assistant" in result_types
def test_preserves_original_line_formatting(self):
"""Non-reparented entries keep their original JSON formatting."""
# orjson produces compact JSON - test that we preserve the exact input
# when no reparenting is needed (no re-serialization)
original_line = json.dumps(USER_MSG)
content = original_line + "\n" + json.dumps(ASST_MSG) + "\n"
result = strip_progress_entries(content)
result_lines = result.strip().split("\n")
# Original line should be byte-identical (not re-serialized)
assert result_lines[0] == original_line
def test_reparented_entries_are_reserialized(self):
"""Entries whose parentUuid changes must be re-serialized."""
progress = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p1",
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, progress, asst)
result = strip_progress_entries(content)
lines = result.strip().split("\n")
asst_entry = json.loads(lines[-1])
assert asst_entry["parentUuid"] == "u1" # reparented

View File

@@ -34,8 +34,9 @@ client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# Default system prompt used when Langfuse is not configured
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
# Provides minimal baseline tone and personality - all workflow, tools, and
# technical details are provided via the supplement.
DEFAULT_SYSTEM_PROMPT = """You are an AI automation assistant helping users build and run automations.
Here is everything you know about the current user from previous interactions:
@@ -43,113 +44,12 @@ Here is everything you know about the current user from previous interactions:
{users_information}
</users_information>
## YOUR CORE MANDATE
Your goal is to help users automate tasks by:
- Understanding their needs and business context
- Building and running working automations
- Delivering tangible value through action, not just explanation
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- **For live integrations** (read a GitHub repo, query a database, post to Slack, etc.) consider `run_mcp_tool` — it connects directly to external services without building a full agent
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
**MCP (Model Context Protocol) Servers:**
- `run_mcp_tool`: Connect to any MCP server to discover and run its tools
**Two-step flow:**
1. `run_mcp_tool(server_url)` → returns a list of available tools. Each tool has `name`, `description`, and `input_schema` (JSON Schema). Read `input_schema.properties` to understand what arguments are needed.
2. `run_mcp_tool(server_url, tool_name, tool_arguments)` → executes the tool. Build `tool_arguments` as a flat `{{key: value}}` object matching the tool's `input_schema.properties`.
**Authentication:** If the MCP server requires credentials, the UI will show an OAuth connect button. Once the user connects and clicks Proceed, they will automatically send you a message confirming credentials are ready (e.g. "I've connected the MCP server credentials. Please retry run_mcp_tool..."). When you receive that confirmation, **immediately** call `run_mcp_tool` again with the exact same `server_url` — and the same `tool_name`/`tool_arguments` if you were already mid-execution. Do not ask the user what to do next; just retry.
**Finding server URLs (fastest → slowest):**
1. **Known hosted servers** — use directly, no lookup:
- Notion: `https://mcp.notion.com/mcp`
- Linear: `https://mcp.linear.app/mcp`
- Stripe: `https://mcp.stripe.com`
- Intercom: `https://mcp.intercom.com/mcp`
- Cloudflare: `https://mcp.cloudflare.com/mcp`
- Atlassian (Jira/Confluence): `https://mcp.atlassian.com/mcp`
2. **`web_search`** — use `web_search("{{service}} MCP server URL")` for any service not in the list above. This is the fastest way to find unlisted servers.
3. **Registry API** — `web_fetch("https://registry.modelcontextprotocol.io/v0.1/servers?search={{query}}&limit=10")` to browse what's available. Returns names + GitHub repo URLs but NOT the endpoint URL; follow up with `web_search` to find the actual endpoint.
- **Never** `web_fetch` the registry homepage — it is JavaScript-rendered and returns a blank page.
**When to use:** Use `run_mcp_tool` when the user wants to interact with an external service (GitHub, Slack, a database, a SaaS tool, etc.) via its MCP integration. Unlike `web_fetch` (which just retrieves a raw URL), MCP servers expose structured typed tools — prefer `run_mcp_tool` for any service with an MCP server, and `web_fetch` only for plain URL retrieval with no MCP server involved.
**CRITICAL**: `run_mcp_tool` is **always available** in your tool list. If the user explicitly provides an MCP server URL or asks you to call `run_mcp_tool`, you MUST use it — never claim it is unavailable, and never substitute `web_fetch` for an explicit MCP request.
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
- **For MCP integrations**: Known URL (see list) or `web_search("{{service}} MCP server URL")` → `run_mcp_tool(server_url)` → `run_mcp_tool(server_url, tool_name, tool_arguments)`. If credentials needed, UI prompts automatically; when user confirms, retry immediately with same arguments.
**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
Be concise, proactive, and action-oriented. Bias toward showing working solutions over lengthy explanations."""
# ---------------------------------------------------------------------------

View File

@@ -15,6 +15,7 @@ from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data import workspace as workspace_db
# Import dynamic field utilities from centralized location
from backend.data.block import BlockInput, BlockOutputEntry
@@ -32,7 +33,6 @@ from backend.data.execution import (
from backend.data.graph import GraphModel, Node
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.workspace import get_or_create_workspace
from backend.util.clients import (
get_async_execution_event_bus,
get_async_execution_queue,
@@ -831,8 +831,9 @@ async def add_graph_execution(
udb = user_db
gdb = graph_db
odb = onboarding_db
wdb = workspace_db
else:
edb = udb = gdb = odb = get_database_manager_async_client()
edb = udb = gdb = odb = wdb = get_database_manager_async_client()
# Get or create the graph execution
if graph_exec_id:
@@ -892,7 +893,7 @@ async def add_graph_execution(
if execution_context is None:
user = await udb.get_user_by_id(user_id)
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
workspace = await get_or_create_workspace(user_id)
workspace = await wdb.get_or_create_workspace(user_id)
execution_context = ExecutionContext(
# Execution identity

View File

@@ -368,12 +368,10 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
# Setup mock returns
# The function returns (graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip)
@@ -649,12 +647,10 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
# Setup returns - include nodes_to_skip in the tuple
mock_validate.return_value = (

View File

@@ -72,19 +72,58 @@ def dumps(
T = TypeVar("T")
@overload
def loads(data: str | bytes, *args, target_type: Type[T], **kwargs) -> T: ...
# Sentinel value to detect when fallback is not provided
_NO_FALLBACK = object()
@overload
def loads(data: str | bytes, *args, **kwargs) -> Any: ...
def loads(
data: str | bytes, *args, target_type: Type[T], fallback: T | None = None, **kwargs
) -> T:
pass
@overload
def loads(data: str | bytes, *args, fallback: Any = None, **kwargs) -> Any:
pass
def loads(
data: str | bytes, *args, target_type: Type[T] | None = None, **kwargs
data: str | bytes,
*args,
target_type: Type[T] | None = None,
fallback: Any = _NO_FALLBACK,
**kwargs,
) -> Any:
parsed = orjson.loads(data)
"""Parse JSON with optional fallback on decode errors.
Args:
data: JSON string or bytes to parse
target_type: Optional type to validate/cast result to
fallback: Value to return on JSONDecodeError. If not provided, raises.
**kwargs: Additional arguments (unused, for compatibility)
Returns:
Parsed JSON data, or fallback value if parsing fails
Raises:
orjson.JSONDecodeError: Only if fallback is not provided
Examples:
>>> loads('{"valid": "json"}')
{'valid': 'json'}
>>> loads('invalid json', fallback=None)
None
>>> loads('invalid json', fallback={})
{}
>>> loads('invalid json') # raises orjson.JSONDecodeError
"""
try:
parsed = orjson.loads(data)
except orjson.JSONDecodeError:
if fallback is not _NO_FALLBACK:
return fallback
raise
if target_type:
return type_match(parsed, target_type)