Compare commits

...

12 Commits

Author SHA1 Message Date
Zamil Majdy
3d880cd591 refactor(backend/copilot): move imports to module level
- Move KEY_WORKFLOWS and TOOL_REGISTRY imports to top of file
- Better code organization following Python conventions
2026-03-06 23:15:39 +07:00
Zamil Majdy
73f5ff9983 test(backend/copilot): add tests for auto-generated tool documentation
- Test tool documentation structure (sections, format)
- Test that all TOOL_REGISTRY tools are included
- Test workflow sections are present
- Test no duplicate tools
- Verify markdown formatting compliance
- All 6 tests passing
2026-03-06 23:15:39 +07:00
Zamil Majdy
6d9faf5f91 refactor(backend/copilot): auto-generate tool docs in supplement, simplify default prompt
- Add _generate_tool_documentation() to auto-generate tool list from TOOL_REGISTRY
- Extract KEY_WORKFLOWS constant to prompt_constants.py for maintainability
- Append auto-generated tool docs + workflow guidance to system prompt supplement
- Simplify DEFAULT_SYSTEM_PROMPT to minimal tone/style baseline (Langfuse handles details)
- Add KEY WORKFLOWS section covering MCP integration, agent creation, folder management
- Ensures tool documentation stays in sync with actual implementations
- Fix Pyright error by safely accessing description field with .get()
2026-03-06 23:10:42 +07:00
Zamil Majdy
7774717104 docs(backend/copilot): document web_search and web_fetch in tool supplement
Add clear documentation for web_search and web_fetch to the shared tool notes
that get appended to all system prompts (Langfuse or default). This ensures
the copilot knows to use web_search for general web queries instead of
incorrectly using find_block to search for web search blocks.

- web_search: For current information beyond knowledge cutoff
- web_fetch: For retrieving content from specific URLs
2026-03-06 23:10:42 +07:00
Zamil Majdy
89ed628609 fix(backend/copilot): capture tool results in transcript
Tool results (StreamToolOutputAvailable) were being added to session.messages
but NOT to transcript_builder, causing the transcript to miss tool executions.
This made the copilot claim '(no tool used)' when tools were actually called.

Now tool results are captured as user messages with tool_result content blocks,
matching the Claude API transcript format and ensuring --resume has complete
conversation history including all tool interactions.
2026-03-06 23:10:42 +07:00
Zamil Majdy
d56452898a hotfix(backend/copilot): refactor transcript to SDK-based atomic full-context model (#12318)
## Summary

Major refactor to eliminate CLI transcript race conditions and simplify
the codebase by building transcripts directly from SDK messages instead
of reading CLI files.

## Problem

The previous approach had race conditions:
- SDK reads CLI transcript file during stop hook
- CLI may not have finished writing → incomplete transcript
- Complex merge logic to detect and fix incomplete writes
- ~200 lines of synthetic entry detection and merge code

## Solution

**Atomic Full-Context Transcript Model:**
- Build transcript from SDK messages during streaming
(`TranscriptBuilder`)
- Each upload REPLACES the previous transcript entirely (atomic)
- No CLI file reading → no race conditions
- Eliminates all merge complexity

## Key Changes

### Core Refactor
- **NEW**: `transcript_builder.py` - Build JSONL from SDK messages
during streaming
- **SIMPLIFIED**: `transcript.py` - Removed merge logic, simplified
upload/download
- **SIMPLIFIED**: `service.py` - Use TranscriptBuilder, removed stop
hook callback
- **CLEANED**: `security_hooks.py` - Removed `on_stop` parameter

### Performance & Code Quality
- **orjson migration**: Use `backend.util.json` (2-3x faster than
stdlib)
- Added `fallback` parameter to `json.loads()` for cleaner error
handling
- Moved SDK imports to top-level per code style guidelines

### Bug Fixes
- Fixed garbage collection bug in background task handling
- Fixed double upload bug in timeout handling  
- Downgraded PII-risk logging from WARNING to DEBUG
- Added 30s timeout to prevent session lock hang

## Code Removed (~200 lines)

- `merge_with_previous_transcript()` - No longer needed
- `read_transcript_file()` - No longer needed
- `CapturedTranscript` dataclass - No longer needed
- `_on_stop()` callback - No longer needed
- Synthetic entry detection logic - No longer needed
- Manual append/merge logic in finally block - No longer needed

## Testing

-  All transcript tests passing (24/24)
-  Verified with real session logs showing proper transcript growth
-  Verified with Langfuse traces showing proper turn tracking (1-8)

## Transcript Growth Pattern

From session logs:
- **Turn 1**: 2 entries (initial)
- **Turn 2**: 5 entries (+3), 2257B uploaded
- **Turn N**: ~2N entries (linear growth)

Each upload is the **complete atomic state** - always REPLACES, never
incremental.

## Files Changed

```
backend/copilot/sdk/transcript_builder.py (NEW)   | +140 lines
backend/copilot/sdk/transcript.py                  | -198, +125 lines  
backend/copilot/sdk/service.py                     | -214, +160 lines
backend/copilot/sdk/security_hooks.py              | -33, +10 lines
backend/copilot/sdk/transcript_test.py             | -85, +36 lines
backend/util/json.py                               | +45 lines
```

**Net result**: -200 lines, more reliable, faster JSON operations.

## Migration Notes

This is a **breaking change** for any code that:
- Directly calls `merge_with_previous_transcript()` or
`read_transcript_file()`
- Relies on incremental transcript uploads
- Expects stop hook callbacks

All internal usage has been updated.

---

@ntindle - Tagging for autogpt-reviewer
2026-03-06 21:03:49 +07:00
Otto
3e108a813a fix(backend): Use db_manager for workspace in add_graph_execution (#12312)
When `add_graph_execution` is called from a context where the global
Prisma client isn't connected (e.g. CoPilot tools, external API), the
call to `get_or_create_workspace(user_id)` crashes with
`ClientNotConnectedError` because it directly accesses
`UserWorkspace.prisma()`.

The fix adds `workspace_db` to the existing `if prisma.is_connected()`
fallback pattern, consistent with how all other DB calls in the function
already work.

**Sentry:** AUTOGPT-SERVER-83T (and ~15 related issues going back to Jan
2026)

---
Co-authored-by: Reinier van der Leer (@Pwuts) <pwuts@agpt.co>

Co-authored-by: Reinier van der Leer (@Pwuts) <pwuts@agpt.co>
2026-03-06 08:48:15 +01:00
Zamil Majdy
0b9e0665dd Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT 2026-03-06 02:32:36 +07:00
Zamil Majdy
be18436e8f Merge branch 'master' of github.com:Significant-Gravitas/AutoGPT into dev 2026-03-06 02:31:40 +07:00
Zamil Majdy
f6f268a1f0 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into HEAD 2026-03-06 02:29:56 +07:00
Zamil Majdy
ea0333c1fc fix(copilot): always upload transcript instead of size-based skip (#12303)
## Summary

Fixes copilot sessions "forgetting" previous turns due to stale
transcript storage.

**Root cause:** The transcript upload logic used byte size comparison
(`existing >= new → skip`) to prevent overwriting newer transcripts with
older ones. However, with `--resume` the CLI compacts old tool results,
so newer transcripts can have **fewer bytes** despite containing **more
conversation events**. This caused the stored transcript to freeze at
whatever the largest historical upload was — every subsequent turn
downloaded the same stale transcript and the agent lost context of
recent turns.

**Evidence from prod session `41a3814c`:**
- Stored transcript: 764KB (frozen, never updated)
- Turn 1 output: 379KB (75 lines) → upload skipped (764KB >= 379KB)
- Turn 2 output: 422KB (71 lines) → upload skipped (764KB >= 422KB)
- Turn 3 output: **empty** → upload skipped
- Agent resumed from the same stale 764KB transcript every turn, losing
context of the PR it created

**Fix:** Remove the size comparison entirely. The executor holds a
cluster lock per session, so concurrent uploads cannot race. Just always
overwrite with the latest transcript.

## Test plan
- [x] `poetry run pytest backend/copilot/sdk/transcript_test.py` — 25/25
pass
- [x] All pre-commit hooks pass
- [ ] After deploy: verify multi-turn sessions retain context across
turns
2026-03-06 02:26:52 +07:00
Otto
aa7a2f0a48 hotfix(frontend/signup): Add missing createUser() call in password signup (#12287)
Requested by @0ubbe

Password signup was missing the backend `createUser()` call that the
OAuth callback flow already had. This caused `getOnboardingStatus()` to
fail/hang for new users whose backend record didn't exist yet, resulting
in an infinite spinner after account creation.

## Root Cause

| Flow | createUser() | getOnboardingStatus() | Result |
|------|-------------|----------------------|--------|
| OAuth signup |  Called |  Works | Redirects correctly |
| Password signup |  Missing |  Fails/hangs | Infinite spinner |

## Fix

Adds `createUser()` call in `signup/actions.ts` after session is set,
before onboarding status check — matching the OAuth callback pattern.
Includes error handling with Sentry reporting.

## Testing

- Create a new password account → should redirect without spinner
- OAuth signup unaffected (no changes to that flow)

Fixes OPEN-3023

---------

Co-authored-by: Lluis Agusti <hi@llu.lu>
2026-03-05 16:11:51 +08:00
12 changed files with 751 additions and 540 deletions

View File

@@ -0,0 +1,29 @@
"""Prompt constants for CoPilot - workflow guidance and supplementary documentation.
This module contains workflow patterns and guidance that supplement the main system prompt.
These are appended dynamically to the prompt along with auto-generated tool documentation.
"""
# Workflow guidance for key tool patterns
# This is appended after the auto-generated tool list to provide usage patterns
KEY_WORKFLOWS = """
## KEY WORKFLOWS
### MCP Integration Workflow
When using `run_mcp_tool`:
1. **Known servers** (use directly): Notion (https://mcp.notion.com/mcp), Linear (https://mcp.linear.app/mcp), Stripe (https://mcp.stripe.com), Intercom (https://mcp.intercom.com/mcp), Cloudflare (https://mcp.cloudflare.com/mcp), Atlassian (https://mcp.atlassian.com/mcp)
2. **Unknown servers**: Use `web_search("{{service}} MCP server URL")` to find the endpoint
3. **Discovery**: Call `run_mcp_tool(server_url)` to see available tools
4. **Execution**: Call `run_mcp_tool(server_url, tool_name, tool_arguments)`
5. **Authentication**: If credentials needed, user will be prompted. When they confirm, retry immediately with same arguments.
### Agent Creation Workflow
When using `create_agent`:
1. Always check `find_library_agent` first for existing solutions
2. Call `create_agent` with description
3. **If `suggested_goal` returned**: Present to user, ask for confirmation, call again with suggested goal if accepted
4. **If `clarifying_questions` returned**: After user answers, call again with original description AND answers in `context` parameter
### Folder Management
Use folder tools (`create_folder`, `list_folders`, `move_agents_to_folder`) to organize agents in the user's library for better discoverability."""

View File

@@ -127,7 +127,6 @@ def create_security_hooks(
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_compact: Callable[[], None] | None = None,
on_stop: Callable[[str, str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -136,15 +135,12 @@ def create_security_hooks(
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
the SDK finishes processing — used to read the JSONL transcript
before the CLI process exits.
on_compact: Callback invoked when SDK starts compacting context.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -311,30 +307,6 @@ def create_security_hooks(
on_compact()
return cast(SyncHookJSONOutput, {})
# --- Stop hook: capture transcript path for stateless resume ---
async def stop_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Capture transcript path when SDK finishes processing.
The Stop hook fires while the CLI process is still alive, giving us
a reliable window to read the JSONL transcript before SIGTERM.
"""
_ = context, tool_use_id
transcript_path = cast(str, input_data.get("transcript_path", ""))
sdk_session_id = cast(str, input_data.get("session_id", ""))
if transcript_path and on_stop:
logger.info(
f"[SDK] Stop hook: transcript_path={transcript_path}, "
f"sdk_session_id={sdk_session_id[:12]}..."
)
on_stop(transcript_path, sdk_session_id)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
@@ -344,9 +316,6 @@ def create_security_hooks(
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
if on_stop is not None:
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
return hooks
except ImportError:
# Fallback for when SDK isn't available - return empty hooks

View File

@@ -12,7 +12,6 @@ import subprocess
import sys
import uuid
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any, cast
import openai
@@ -21,6 +20,9 @@ from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
)
from langfuse import propagate_attributes
@@ -42,6 +44,7 @@ from ..model import (
update_session_title,
upsert_chat_session,
)
from ..prompt_constants import KEY_WORKFLOWS
from ..response_model import (
StreamBaseResponse,
StreamError,
@@ -57,6 +60,7 @@ from ..service import (
_generate_session_title,
_is_langfuse_configured,
)
from ..tools import TOOL_REGISTRY
from ..tools.e2b_sandbox import get_or_create_sandbox
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
from ..tools.workspace_files import get_manager
@@ -74,11 +78,11 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
)
from .transcript_builder import TranscriptBuilder
logger = logging.getLogger(__name__)
config = ChatConfig()
@@ -137,19 +141,6 @@ _setup_langfuse_otel()
_background_tasks: set[asyncio.Task[Any]] = set()
@dataclass
class CapturedTranscript:
"""Info captured by the SDK Stop hook for stateless --resume."""
path: str = ""
sdk_session_id: str = ""
raw_content: str = ""
@property
def available(self) -> bool:
return bool(self.path)
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
@@ -160,8 +151,37 @@ _HEARTBEAT_INTERVAL = 10.0 # seconds
# Appended to the system prompt to inform the agent about available tools.
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (unshare --net).
def _generate_tool_documentation() -> str:
"""Auto-generate tool documentation from TOOL_REGISTRY.
This generates a complete list of available tools with their descriptions,
ensuring the documentation stays in sync with the actual tool implementations.
"""
docs = "\n## AVAILABLE TOOLS\n\n"
# Sort tools alphabetically for consistent output
for name in sorted(TOOL_REGISTRY.keys()):
tool = TOOL_REGISTRY[name]
schema = tool.as_openai_tool()
desc = schema["function"].get("description", "No description available")
# Format as bullet list with tool name in code style
docs += f"- **`{name}`**: {desc}\n"
# Add workflow guidance for key tools
docs += KEY_WORKFLOWS
return docs
_SHARED_TOOL_NOTES = """\
### Web search and research
- **`web_search(query)`** — Search the web for current information (uses Claude's
native web search). Use this when you need up-to-date information, facts,
statistics, or current events that are beyond your knowledge cutoff.
- **`web_fetch(url)`** — Retrieve and analyze content from a specific URL.
Use this when you have a specific URL to read (documentation, articles, etc.).
### Sharing files with the user
After saving a file to the persistent workspace with `write_workspace_file`,
share it with the user by embedding the `download_url` from the response in
@@ -451,6 +471,49 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
pass
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
"""Convert SDK content blocks to transcript format.
Handles TextBlock, ToolUseBlock, ToolResultBlock, and ThinkingBlock.
Unknown block types are logged and skipped.
"""
result: list[dict[str, Any]] = []
for block in blocks or []:
if isinstance(block, TextBlock):
result.append({"type": "text", "text": block.text})
elif isinstance(block, ToolUseBlock):
result.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
elif isinstance(block, ToolResultBlock):
result.append(
{
"type": "tool_result",
"tool_use_id": block.tool_use_id,
"content": block.content,
}
)
elif isinstance(block, ThinkingBlock):
result.append(
{
"type": "thinking",
"thinking": block.thinking,
"signature": block.signature,
}
)
else:
logger.warning(
f"[SDK] Unknown content block type: {type(block).__name__}. "
f"This may indicate a new SDK version with additional block types."
)
return result
async def _compress_messages(
messages: list[ChatMessage],
) -> tuple[list[ChatMessage], bool]:
@@ -806,6 +869,11 @@ async def stream_chat_completion_sdk(
user_id=user_id, session_id=session_id, message_length=len(message)
)
# Structured log prefix: [SDK][<session>][T<turn>]
# Turn = number of user messages (1-based), computed AFTER appending the new message.
turn = sum(1 for m in session.messages if m.role == "user")
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
@@ -823,10 +891,11 @@ async def stream_chat_completion_sdk(
message_id = str(uuid.uuid4())
stream_id = str(uuid.uuid4())
stream_completed = False
ended_with_stream_error = False
e2b_sandbox = None
use_resume = False
resume_file: str | None = None
captured_transcript = CapturedTranscript()
transcript_builder = TranscriptBuilder()
sdk_cwd = ""
# Acquire stream lock to prevent concurrent streams to the same session
@@ -841,7 +910,7 @@ async def stream_chat_completion_sdk(
if lock_owner != stream_id:
# Another stream is active
logger.warning(
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
f"{log_prefix} Session already has an active stream: {lock_owner}"
)
yield StreamError(
errorText="Another stream is already active for this session. "
@@ -865,7 +934,7 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
except (ValueError, OSError) as e:
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
yield StreamError(
errorText="Unable to initialize working directory.",
code="sdk_cwd_error",
@@ -909,12 +978,13 @@ async def stream_chat_completion_sdk(
):
return None
try:
return await download_transcript(user_id, session_id)
return await download_transcript(
user_id, session_id, log_prefix=log_prefix
)
except Exception as transcript_err:
logger.warning(
"[SDK] [%s] Transcript download failed, continuing without "
"--resume: %s",
session_id[:12],
"%s Transcript download failed, continuing without " "--resume: %s",
log_prefix,
transcript_err,
)
return None
@@ -926,21 +996,34 @@ async def stream_chat_completion_sdk(
)
use_e2b = e2b_sandbox is not None
system_prompt = base_system_prompt + (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
# Generate tool documentation and append appropriate supplement
tool_docs = _generate_tool_documentation()
system_prompt = (
base_system_prompt
+ tool_docs
+ (
_E2B_TOOL_SUPPLEMENT
if use_e2b
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
)
)
# Process transcript download result
transcript_msg_count = 0
if dl:
is_valid = validate_transcript(dl.content)
dl_lines = dl.content.strip().split("\n") if dl.content else []
logger.info(
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
log_prefix,
len(dl.content),
len(dl_lines),
dl.message_count,
is_valid,
)
if is_valid:
logger.info(
f"[SDK] Transcript available for session {session_id}: "
f"{len(dl.content)}B, msg_count={dl.message_count}"
)
# Load previous FULL context into builder
transcript_builder.load_previous(dl.content, log_prefix=log_prefix)
resume_file = write_transcript_to_tempfile(
dl.content, session_id, sdk_cwd
)
@@ -948,16 +1031,14 @@ async def stream_chat_completion_sdk(
use_resume = True
transcript_msg_count = dl.message_count
logger.debug(
f"[SDK] Using --resume ({len(dl.content)}B, "
f"{log_prefix} Using --resume ({len(dl.content)}B, "
f"msg_count={transcript_msg_count})"
)
else:
logger.warning(
f"[SDK] Transcript downloaded but invalid for {session_id}"
)
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
logger.warning(
f"[SDK] No transcript available for {session_id} "
f"{log_prefix} No transcript available "
f"({len(session.messages)} messages in session)"
)
@@ -979,25 +1060,6 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
# Read the file content immediately — the SDK may clean up
# the file before our finally block runs.
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.path = transcript_path
captured_transcript.sdk_session_id = sdk_session_id
content = read_transcript_file(transcript_path)
if content:
captured_transcript.raw_content = content
logger.info(
f"[SDK] Stop hook: captured {len(content)}B from "
f"{transcript_path}"
)
else:
logger.warning(
f"[SDK] Stop hook: transcript file empty/missing at "
f"{transcript_path}"
)
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
compaction = CompactionTracker()
@@ -1005,7 +1067,6 @@ async def stream_chat_completion_sdk(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
on_stop=_on_stop if config.claude_agent_use_resume else None,
on_compact=compaction.on_compact,
)
@@ -1040,7 +1101,10 @@ async def stream_chat_completion_sdk(
session_id=session_id,
trace_name="copilot-sdk",
tags=["sdk"],
metadata={"resume": str(use_resume)},
metadata={
"resume": str(use_resume),
"conversation_turn": str(turn),
},
)
_otel_ctx.__enter__()
@@ -1074,9 +1138,9 @@ async def stream_chat_completion_sdk(
query_message = f"{query_message}\n\n{attachments.hint}"
logger.info(
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
"%s Sending query — resume=%s, total_msgs=%d, "
"query_len=%d, attached_files=%d, image_blocks=%d",
session_id[:12],
log_prefix,
use_resume,
len(session.messages),
len(query_message),
@@ -1105,8 +1169,13 @@ async def stream_chat_completion_sdk(
await client._transport.write( # noqa: SLF001
json.dumps(user_msg) + "\n"
)
# Capture user message in transcript (multimodal)
transcript_builder.add_user_message(content=content_blocks)
else:
await client.query(query_message, session_id=session_id)
# Capture actual user message in transcript (not the engineered query)
# query_message may include context wrappers, but transcript needs raw input
transcript_builder.add_user_message(content=current_message)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
@@ -1150,8 +1219,8 @@ async def stream_chat_completion_sdk(
sdk_msg = done.pop().result()
except StopAsyncIteration:
logger.info(
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
session_id[:12],
"%s Stream ended normally (StopAsyncIteration)",
log_prefix,
)
break
except Exception as stream_err:
@@ -1160,8 +1229,8 @@ async def stream_chat_completion_sdk(
# so the session can still be saved and the
# frontend gets a clean finish.
logger.error(
"[SDK] [%s] Stream error from SDK: %s",
session_id[:12],
"%s Stream error from SDK: %s",
log_prefix,
stream_err,
exc_info=True,
)
@@ -1173,9 +1242,9 @@ async def stream_chat_completion_sdk(
break
logger.info(
"[SDK] [%s] Received: %s %s "
"%s Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
@@ -1184,6 +1253,15 @@ async def stream_chat_completion_sdk(
len(adapter.resolved_tool_calls),
)
# Capture SDK messages in transcript
if isinstance(sdk_msg, AssistantMessage):
content_blocks = _format_sdk_content_blocks(sdk_msg.content)
model_name = getattr(sdk_msg, "model", "")
transcript_builder.add_assistant_message(
content_blocks=content_blocks,
model=model_name,
)
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
# message can arrive before the hook stashes output.
@@ -1210,10 +1288,10 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0)
else:
logger.warning(
"[SDK] [%s] Timed out waiting for "
"%s Timed out waiting for "
"PostToolUse hook stash "
"(%d unresolved tool calls)",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
@@ -1221,9 +1299,9 @@ async def stream_chat_completion_sdk(
# Log ResultMessage details for debugging
if isinstance(sdk_msg, ResultMessage):
logger.info(
"[SDK] [%s] Received: ResultMessage %s "
"%s Received: ResultMessage %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
sdk_msg.subtype,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
@@ -1232,8 +1310,8 @@ async def stream_chat_completion_sdk(
)
if sdk_msg.subtype in ("error", "error_during_execution"):
logger.error(
"[SDK] [%s] SDK execution failed with error: %s",
session_id[:12],
"%s SDK execution failed with error: %s",
log_prefix,
sdk_msg.result or "(no error message provided)",
)
@@ -1258,8 +1336,8 @@ async def stream_chat_completion_sdk(
out_len = len(str(response.output))
extra = f", output_len={out_len}"
logger.info(
"[SDK] [%s] Tool event: %s, tool=%s%s",
session_id[:12],
"%s Tool event: %s, tool=%s%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
@@ -1268,8 +1346,8 @@ async def stream_chat_completion_sdk(
# Log errors being sent to frontend
if isinstance(response, StreamError):
logger.error(
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
session_id[:12],
"%s Sending error to frontend: %s (code=%s)",
log_prefix,
response.errorText,
response.code,
)
@@ -1314,17 +1392,28 @@ async def stream_chat_completion_sdk(
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
tool_result_content = (
response.output
if isinstance(response.output, str)
else str(response.output)
)
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
content=tool_result_content,
tool_call_id=response.toolCallId,
)
)
# Capture tool result in transcript as user message with tool_result content
transcript_builder.add_user_message(
content=[
{
"type": "tool_result",
"tool_use_id": response.toolCallId,
"content": tool_result_content,
}
]
)
has_tool_results = True
elif isinstance(response, StreamFinish):
@@ -1335,8 +1424,8 @@ async def stream_chat_completion_sdk(
# server shutdown). Log and let the safety-net / finally
# blocks handle cleanup.
logger.warning(
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
session_id[:12],
"%s Streaming loop cancelled (asyncio.CancelledError)",
log_prefix,
)
raise
finally:
@@ -1350,7 +1439,8 @@ async def stream_chat_completion_sdk(
except (asyncio.CancelledError, StopAsyncIteration):
# Expected: task was cancelled or exhausted during cleanup
logger.info(
"[SDK] Pending __anext__ task completed during cleanup"
"%s Pending __anext__ task completed during cleanup",
log_prefix,
)
# Safety net: if tools are still unresolved after the
@@ -1359,9 +1449,9 @@ async def stream_chat_completion_sdk(
# them now so the frontend stops showing spinners.
if adapter.has_unresolved_tool_calls:
logger.warning(
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
"%s %d unresolved tool(s) after stream loop — "
"flushing as safety net",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
)
safety_responses: list[StreamBaseResponse] = []
@@ -1372,8 +1462,8 @@ async def stream_chat_completion_sdk(
(StreamToolInputAvailable, StreamToolOutputAvailable),
):
logger.info(
"[SDK] [%s] Safety flush: %s, tool=%s",
session_id[:12],
"%s Safety flush: %s, tool=%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
)
@@ -1386,8 +1476,8 @@ async def stream_chat_completion_sdk(
# StreamFinish is published by mark_session_completed in the processor.
if not stream_completed and not ended_with_stream_error:
logger.info(
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
session_id[:12],
"%s Stream ended without ResultMessage (stopped by user)",
log_prefix,
)
closing_responses: list[StreamBaseResponse] = []
adapter._end_text_if_open(closing_responses)
@@ -1408,69 +1498,36 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# --- Upload transcript for next-turn --resume ---
# After async with the SDK task group has exited, so the Stop
# hook has already fired and the CLI has been SIGTERMed. The
# CLI uses appendFileSync, so all writes are safely on disk.
if config.claude_agent_use_resume and user_id:
# With --resume the CLI appends to the resume file (most
# complete). Otherwise use the Stop hook path.
if use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
logger.debug("[SDK] Transcript source: resume file")
elif captured_transcript.path:
raw_transcript = read_transcript_file(captured_transcript.path)
logger.debug(
"[SDK] Transcript source: stop hook (%s), read result: %s",
captured_transcript.path,
f"{len(raw_transcript)}B" if raw_transcript else "None",
)
else:
raw_transcript = None
# Transcript upload is handled exclusively in the finally block
# to avoid double-uploads (the success path used to upload the
# old resume file, then the finally block overwrote it with the
# stop hook content — which could be smaller after compaction).
if not raw_transcript:
logger.debug(
"[SDK] No usable transcript — CLI file had no "
"conversation entries (expected for first turn "
"without --resume)"
)
if raw_transcript:
# Shield the upload from generator cancellation so a
# client disconnect / page refresh doesn't lose the
# transcript. The upload must finish even if the SSE
# connection is torn down.
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
)
logger.info(
"[SDK] [%s] Stream completed successfully with %d messages",
session_id[:12],
len(session.messages),
)
if ended_with_stream_error:
logger.warning(
"%s Stream ended with SDK error after %d messages",
log_prefix,
len(session.messages),
)
else:
logger.info(
"%s Stream completed successfully with %d messages",
log_prefix,
len(session.messages),
)
except BaseException as e:
# Catch BaseException to handle both Exception and CancelledError
# (CancelledError inherits from BaseException in Python 3.8+)
if isinstance(e, asyncio.CancelledError):
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
logger.warning("%s Session cancelled", log_prefix)
error_msg = "Operation cancelled"
else:
error_msg = str(e) or type(e).__name__
# SDK cleanup RuntimeError is expected during cancellation, log as warning
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
logger.warning(
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
)
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
else:
logger.error(
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
)
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
# Append error marker to session (non-invasive text parsing approach)
# The finally block will persist the session with this error marker
@@ -1481,8 +1538,8 @@ async def stream_chat_completion_sdk(
)
)
logger.debug(
"[SDK] [%s] Appended error marker, will be persisted in finally",
session_id[:12],
"%s Appended error marker, will be persisted in finally",
log_prefix,
)
# Yield StreamError for immediate feedback (only for non-cancellation errors)
@@ -1514,47 +1571,61 @@ async def stream_chat_completion_sdk(
try:
await asyncio.shield(upsert_chat_session(session))
logger.info(
"[SDK] [%s] Session persisted in finally with %d messages",
session_id[:12],
"%s Session persisted in finally with %d messages",
log_prefix,
len(session.messages),
)
except Exception as persist_err:
logger.error(
"[SDK] [%s] Failed to persist session in finally: %s",
session_id[:12],
"%s Failed to persist session in finally: %s",
log_prefix,
persist_err,
exc_info=True,
)
# --- Upload transcript for next-turn --resume ---
# This MUST run in finally so the transcript is uploaded even when
# the streaming loop raises an exception. The CLI uses
# appendFileSync, so whatever was written before the error/SIGTERM
# is safely on disk and still useful for the next turn.
if config.claude_agent_use_resume and user_id:
# the streaming loop raises an exception.
# The transcript represents the COMPLETE active context (atomic).
if config.claude_agent_use_resume and user_id and session is not None:
try:
# Prefer content captured in the Stop hook (read before
# cleanup removes the file). Fall back to the resume
# file when the stop hook didn't fire (e.g. error before
# completion) so we don't lose the prior transcript.
raw_transcript = captured_transcript.raw_content or None
if not raw_transcript and use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
# Build complete transcript from captured SDK messages
transcript_content = transcript_builder.to_jsonl()
if raw_transcript and session is not None:
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
if not transcript_content:
logger.warning(
"%s No transcript to upload (builder empty)", log_prefix
)
elif not validate_transcript(transcript_content):
logger.warning(
"%s Transcript invalid, skipping upload (entries=%d)",
log_prefix,
transcript_builder.entry_count,
)
else:
logger.warning(f"[SDK] No transcript to upload for {session_id}")
logger.info(
"%s Uploading complete transcript (entries=%d, bytes=%d)",
log_prefix,
transcript_builder.entry_count,
len(transcript_content),
)
# Shield upload from cancellation - let it complete even if
# the finally block is interrupted. No timeout to avoid race
# conditions where backgrounded uploads overwrite newer transcripts.
await asyncio.shield(
upload_transcript(
user_id=user_id,
session_id=session_id,
content=transcript_content,
message_count=len(session.messages),
log_prefix=log_prefix,
)
)
except Exception as upload_err:
logger.error(
f"[SDK] Transcript upload failed in finally: {upload_err}",
"%s Transcript upload failed in finally: %s",
log_prefix,
upload_err,
exc_info=True,
)
@@ -1565,33 +1636,6 @@ async def stream_chat_completion_sdk(
await lock.release()
async def _try_upload_transcript(
user_id: str,
session_id: str,
raw_content: str,
message_count: int = 0,
) -> bool:
"""Strip progress entries and upload transcript (with timeout).
Returns True if the upload completed without error.
"""
try:
async with asyncio.timeout(30):
await upload_transcript(
user_id, session_id, raw_content, message_count=message_count
)
return True
except asyncio.TimeoutError:
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
return False
except Exception as e:
logger.error(
f"[SDK] Failed to upload transcript for {session_id}: {e}",
exc_info=True,
)
return False
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
@@ -1600,7 +1644,7 @@ async def _update_title_async(
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
if title and user_id:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch
import pytest
from .service import _prepare_file_attachments
from .service import _generate_tool_documentation, _prepare_file_attachments
@dataclass
@@ -145,3 +145,94 @@ class TestPrepareFileAttachments:
assert "Read tool" not in result.hint
assert len(result.image_blocks) == 1
class TestGenerateToolDocumentation:
"""Tests for auto-generated tool documentation from TOOL_REGISTRY."""
def test_generate_tool_documentation_structure(self):
"""Test that tool documentation has expected structure."""
docs = _generate_tool_documentation()
# Check main sections exist
assert "## AVAILABLE TOOLS" in docs
assert "## KEY WORKFLOWS" in docs
# Verify no duplicate sections
assert docs.count("## AVAILABLE TOOLS") == 1
assert docs.count("## KEY WORKFLOWS") == 1
def test_tool_documentation_includes_key_tools(self):
"""Test that documentation includes essential copilot tools."""
docs = _generate_tool_documentation()
# Core agent workflow tools
assert "`create_agent`" in docs
assert "`run_agent`" in docs
assert "`find_library_agent`" in docs
assert "`edit_agent`" in docs
# MCP integration
assert "`run_mcp_tool`" in docs
# Browser automation
assert "`browser_navigate`" in docs
# Folder management
assert "`create_folder`" in docs
def test_tool_documentation_format(self):
"""Test that each tool follows bullet list format."""
docs = _generate_tool_documentation()
lines = docs.split("\n")
tool_lines = [line for line in lines if line.strip().startswith("- **`")]
# Should have multiple tools (at least 20 from TOOL_REGISTRY)
assert len(tool_lines) >= 20
# Each tool line should have proper markdown format
for line in tool_lines:
assert line.startswith("- **`"), f"Bad format: {line}"
assert "`**:" in line, f"Missing description separator: {line}"
def test_tool_documentation_includes_workflows(self):
"""Test that key workflow patterns are documented."""
docs = _generate_tool_documentation()
# Check workflow sections
assert "MCP Integration Workflow" in docs
assert "Agent Creation Workflow" in docs
assert "Folder Management" in docs
# Check workflow details
assert "suggested_goal" in docs # Agent creation feedback loop
assert "clarifying_questions" in docs # Agent creation feedback loop
assert "run_mcp_tool(server_url)" in docs # MCP discovery pattern
def test_tool_documentation_completeness(self):
"""Test that all tools from TOOL_REGISTRY appear in documentation."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Verify each registered tool is documented
for tool_name in TOOL_REGISTRY.keys():
assert (
f"`{tool_name}`" in docs
), f"Tool '{tool_name}' missing from auto-generated documentation"
def test_tool_documentation_no_duplicate_tools(self):
"""Test that no tool appears multiple times in the list."""
from backend.copilot.tools import TOOL_REGISTRY
docs = _generate_tool_documentation()
# Extract the tools section (before KEY WORKFLOWS)
tools_section = docs.split("## KEY WORKFLOWS")[0]
# Count occurrences of each tool
for tool_name in TOOL_REGISTRY.keys():
# Count how many times this tool appears as a bullet point
count = tools_section.count(f"- **`{tool_name}`**")
assert count == 1, f"Tool '{tool_name}' appears {count} times (should be 1)"

View File

@@ -10,13 +10,14 @@ Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from backend.util import json
logger = logging.getLogger(__name__)
# UUIDs are hex + hyphens; strip everything else to prevent path injection.
@@ -58,41 +59,37 @@ def strip_progress_entries(content: str) -> str:
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
any remaining child entries so the ``parentUuid`` chain stays intact.
Typically reduces transcript size by ~30%.
Entries that are not stripped or reparented are kept as their original
raw JSON line to avoid unnecessary re-serialization that changes
whitespace or key ordering.
"""
lines = content.strip().split("\n")
entries: list[dict] = []
# Parse entries, keeping the original line alongside the parsed dict.
parsed: list[tuple[str, dict | None]] = []
for line in lines:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
# Keep unparseable lines as-is (safety)
entries.append({"_raw": line})
parsed.append((line, json.loads(line, fallback=None)))
# First pass: identify stripped UUIDs and build parent map.
stripped_uuids: set[str] = set()
uuid_to_parent: dict[str, str] = {}
kept: list[dict] = []
for entry in entries:
if "_raw" in entry:
kept.append(entry)
for _line, entry in parsed:
if not isinstance(entry, dict):
continue
uid = entry.get("uuid", "")
parent = entry.get("parentUuid", "")
entry_type = entry.get("type", "")
if uid:
uuid_to_parent[uid] = parent
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
stripped_uuids.add(uid)
if entry_type in STRIPPABLE_TYPES:
if uid:
stripped_uuids.add(uid)
else:
kept.append(entry)
# Reparent: walk up chain through stripped entries to find surviving ancestor
for entry in kept:
if "_raw" in entry:
# Second pass: keep non-stripped entries, reparenting where needed.
# Preserve original line when no reparenting is required.
reparented: set[str] = set()
for _line, entry in parsed:
if not isinstance(entry, dict):
continue
parent = entry.get("parentUuid", "")
original_parent = parent
@@ -100,63 +97,32 @@ def strip_progress_entries(content: str) -> str:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
uid = entry.get("uuid", "")
if uid:
reparented.add(uid)
result_lines: list[str] = []
for entry in kept:
if "_raw" in entry:
result_lines.append(entry["_raw"])
else:
for line, entry in parsed:
if not isinstance(entry, dict):
result_lines.append(line)
continue
if entry.get("type", "") in STRIPPABLE_TYPES:
continue
uid = entry.get("uuid", "")
if uid in reparented:
# Re-serialize only entries whose parentUuid was changed.
result_lines.append(json.dumps(entry, separators=(",", ":")))
else:
result_lines.append(line)
return "\n".join(result_lines) + "\n"
# ---------------------------------------------------------------------------
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
# Local file I/O (write temp file for --resume)
# ---------------------------------------------------------------------------
def read_transcript_file(transcript_path: str) -> str | None:
"""Read a JSONL transcript file from disk.
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
or only contains metadata (≤2 lines with no conversation messages).
"""
if not transcript_path or not os.path.isfile(transcript_path):
logger.debug(f"[Transcript] File not found: {transcript_path}")
return None
try:
with open(transcript_path) as f:
content = f.read()
if not content.strip():
logger.debug("[Transcript] File is empty: %s", transcript_path)
return None
lines = content.strip().split("\n")
# Validate that the transcript has real conversation content
# (not just metadata like queue-operation entries).
if not validate_transcript(content):
logger.debug(
"[Transcript] No conversation content (%d lines) in %s",
len(lines),
transcript_path,
)
return None
logger.info(
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
return None
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
"""Sanitize an ID for safe use in file paths.
@@ -171,14 +137,6 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
def _encode_cwd_for_cli(cwd: str) -> str:
"""Encode a working directory path the same way the Claude CLI does.
The CLI replaces all non-alphanumeric characters with ``-``.
"""
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""Remove the CLI's project directory for a specific working directory.
@@ -188,7 +146,8 @@ def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""
import shutil
cwd_encoded = _encode_cwd_for_cli(sdk_cwd)
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
@@ -248,32 +207,29 @@ def write_transcript_to_tempfile(
def validate_transcript(content: str | None) -> bool:
"""Check that a transcript has actual conversation messages.
A valid transcript for resume needs at least one user message and one
assistant message (not just queue-operation / file-history-snapshot
metadata).
A valid transcript needs at least one assistant message (not just
queue-operation / file-history-snapshot metadata). We do NOT require
a ``type: "user"`` entry because with ``--resume`` the user's message
is passed as a CLI query parameter and does not appear in the
transcript file.
"""
if not content or not content.strip():
return False
lines = content.strip().split("\n")
if len(lines) < 2:
return False
has_user = False
has_assistant = False
for line in lines:
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
has_user = True
elif msg_type == "assistant":
has_assistant = True
except json.JSONDecodeError:
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
return False
if entry.get("type") == "assistant":
has_assistant = True
return has_user and has_assistant
return has_assistant
# ---------------------------------------------------------------------------
@@ -328,58 +284,56 @@ async def upload_transcript(
session_id: str,
content: str,
message_count: int = 0,
log_prefix: str = "[Transcript]",
) -> None:
"""Strip progress entries and upload transcript to bucket storage.
"""Strip progress entries and upload complete transcript.
Safety: only overwrites when the new (stripped) transcript is larger than
what is already stored. Since JSONL is append-only, the latest transcript
is always the longest. This prevents a slow/stale background task from
clobbering a newer upload from a concurrent turn.
The transcript represents the FULL active context (atomic).
Each upload REPLACES the previous transcript entirely.
The executor holds a cluster lock per session, so concurrent uploads for
the same session cannot happen.
Args:
message_count: ``len(session.messages)`` at upload time — used by
the next turn to detect staleness and compress only the gap.
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
entry = json.loads(line, fallback={"type": "INVALID_JSON"})
entry_types.append(entry.get("type", "?"))
logger.warning(
f"[Transcript] Skipping upload — stripped content not valid "
f"for session {session_id}"
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
log_prefix,
entry_types,
len(stripped),
len(content),
)
logger.debug("%s Raw content preview: %s", log_prefix, content[:500])
logger.debug("%s Stripped content: %s", log_prefix, stripped[:500])
return
storage = await get_workspace_storage()
wid, fid, fname = _storage_path_parts(user_id, session_id)
encoded = stripped.encode("utf-8")
new_size = len(encoded)
# Check existing transcript size to avoid overwriting newer with older
path = _build_storage_path(user_id, session_id, storage)
content_skipped = False
try:
existing = await storage.retrieve(path)
if len(existing) >= new_size:
logger.info(
f"[Transcript] Skipping content upload — existing ({len(existing)}B) "
f">= new ({new_size}B) for session {session_id}"
)
content_skipped = True
except (FileNotFoundError, Exception):
pass # No existing transcript or retrieval error — proceed with upload
await storage.store(
workspace_id=wid,
file_id=fid,
filename=fname,
content=encoded,
)
if not content_skipped:
await storage.store(
workspace_id=wid,
file_id=fid,
filename=fname,
content=encoded,
)
# Always update metadata (even when content is skipped) so message_count
# stays current. The gap-fill logic in _build_query_message relies on
# message_count to avoid re-compressing the same messages every turn.
# Update metadata so message_count stays current. The gap-fill logic
# in _build_query_message relies on it to avoid re-compressing messages.
try:
meta = {"message_count": message_count, "uploaded_at": time.time()}
mwid, mfid, mfname = _meta_storage_path_parts(user_id, session_id)
@@ -390,18 +344,18 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.info(
f"[Transcript] Uploaded {new_size}B "
f"(stripped from {len(content)}B, msg_count={message_count}, "
f"content_skipped={content_skipped}) "
f"for session {session_id}"
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
)
async def download_transcript(
user_id: str, session_id: str
user_id: str,
session_id: str,
log_prefix: str = "[Transcript]",
) -> TranscriptDownload | None:
"""Download transcript and metadata from bucket storage.
@@ -417,10 +371,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
logger.debug(f"{log_prefix} No transcript in storage")
return None
except Exception as e:
logger.warning(f"[Transcript] Failed to download transcript: {e}")
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -437,16 +391,13 @@ async def download_transcript(
meta_path = f"local://{mwid}/{mfid}/{mfname}"
meta_data = await storage.retrieve(meta_path)
meta = json.loads(meta_data.decode("utf-8"))
meta = json.loads(meta_data.decode("utf-8"), fallback={})
message_count = meta.get("message_count", 0)
uploaded_at = meta.get("uploaded_at", 0.0)
except (FileNotFoundError, json.JSONDecodeError, Exception):
except (FileNotFoundError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
logger.info(
f"[Transcript] Downloaded {len(content)}B "
f"(msg_count={message_count}) for session {session_id}"
)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
return TranscriptDownload(
content=content,
message_count=message_count,

View File

@@ -0,0 +1,150 @@
"""Build complete JSONL transcript from SDK messages.
The transcript represents the FULL active context at any point in time.
Each upload REPLACES the previous transcript atomically.
Flow:
Turn 1: Upload [msg1, msg2]
Turn 2: Download [msg1, msg2] → Upload [msg1, msg2, msg3, msg4] (REPLACE)
Turn 3: Download [msg1, msg2, msg3, msg4] → Upload [all messages] (REPLACE)
The transcript is never incremental - always the complete atomic state.
"""
import logging
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
from backend.util import json
from .transcript import STRIPPABLE_TYPES
logger = logging.getLogger(__name__)
class TranscriptEntry(BaseModel):
"""Single transcript entry (user or assistant turn)."""
type: str
uuid: str
parentUuid: str | None
message: dict[str, Any]
class TranscriptBuilder:
"""Build complete JSONL transcript from SDK messages.
This builder maintains the FULL conversation state, not incremental changes.
The output is always the complete active context.
"""
def __init__(self) -> None:
self._entries: list[TranscriptEntry] = []
self._last_uuid: str | None = None
def load_previous(self, content: str, log_prefix: str = "[Transcript]") -> None:
"""Load complete previous transcript.
This loads the FULL previous context. As new messages come in,
we append to this state. The final output is the complete context
(previous + new), not just the delta.
"""
if not content or not content.strip():
return
lines = content.strip().split("\n")
for line_num, line in enumerate(lines, 1):
if not line.strip():
continue
data = json.loads(line, fallback=None)
if data is None:
logger.warning(
"%s Failed to parse transcript line %d/%d",
log_prefix,
line_num,
len(lines),
)
continue
# Load all non-strippable entries (user/assistant/system/etc.)
# Skip only STRIPPABLE_TYPES to match strip_progress_entries() behavior
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
continue
entry = TranscriptEntry(
type=data["type"],
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
message=data.get("message", {}),
)
self._entries.append(entry)
self._last_uuid = entry.uuid
logger.info(
"%s Loaded %d entries from previous transcript (last_uuid=%s)",
log_prefix,
len(self._entries),
self._last_uuid[:12] if self._last_uuid else None,
)
def add_user_message(
self, content: str | list[dict], uuid: str | None = None
) -> None:
"""Add user message to the complete context."""
msg_uuid = uuid or str(uuid4())
self._entries.append(
TranscriptEntry(
type="user",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={"role": "user", "content": content},
)
)
self._last_uuid = msg_uuid
def add_assistant_message(
self, content_blocks: list[dict], model: str = ""
) -> None:
"""Add assistant message to the complete context."""
msg_uuid = str(uuid4())
self._entries.append(
TranscriptEntry(
type="assistant",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={
"role": "assistant",
"model": model,
"content": content_blocks,
},
)
)
self._last_uuid = msg_uuid
def to_jsonl(self) -> str:
"""Export complete context as JSONL.
Returns the FULL conversation state (all entries), not incremental.
This output REPLACES any previous transcript.
"""
if not self._entries:
return ""
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
return "\n".join(lines) + "\n"
@property
def entry_count(self) -> int:
"""Total number of entries in the complete context."""
return len(self._entries)
@property
def is_empty(self) -> bool:
"""Whether this builder has any entries."""
return len(self._entries) == 0

View File

@@ -1,11 +1,11 @@
"""Unit tests for JSONL transcript management utilities."""
import json
import os
from backend.util import json
from .transcript import (
STRIPPABLE_TYPES,
read_transcript_file,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
@@ -38,49 +38,6 @@ PROGRESS_ENTRY = {
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
# --- read_transcript_file ---
class TestReadTranscriptFile:
def test_returns_content_for_valid_file(self, tmp_path):
path = tmp_path / "session.jsonl"
path.write_text(VALID_TRANSCRIPT)
result = read_transcript_file(str(path))
assert result is not None
assert "user" in result
def test_returns_none_for_missing_file(self):
assert read_transcript_file("/nonexistent/path.jsonl") is None
def test_returns_none_for_empty_path(self):
assert read_transcript_file("") is None
def test_returns_none_for_empty_file(self, tmp_path):
path = tmp_path / "empty.jsonl"
path.write_text("")
assert read_transcript_file(str(path)) is None
def test_returns_none_for_metadata_only(self, tmp_path):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
path = tmp_path / "meta.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
def test_no_size_limit(self, tmp_path):
"""Large files are accepted — bucket storage has no size limit."""
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
result = read_transcript_file(str(path))
assert result is not None
# --- write_transcript_to_tempfile ---
@@ -155,12 +112,56 @@ class TestValidateTranscript:
assert validate_transcript(content) is False
def test_assistant_only_no_user(self):
"""With --resume the user message is a CLI query param, not a transcript entry.
A transcript with only assistant entries is valid."""
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
assert validate_transcript(content) is False
assert validate_transcript(content) is True
def test_resume_transcript_without_user_entry(self):
"""Simulates a real --resume stop hook transcript: the CLI session file
has summary + assistant entries but no user entry."""
summary = {"type": "summary", "uuid": "s1", "text": "context..."}
asst1 = {
"type": "assistant",
"uuid": "a1",
"message": {"role": "assistant", "content": "Hello!"},
}
asst2 = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "a1",
"message": {"role": "assistant", "content": "Sure, let me help."},
}
content = _make_jsonl(summary, asst1, asst2)
assert validate_transcript(content) is True
def test_single_assistant_entry(self):
"""A transcript with just one assistant line is valid — the CLI may
produce short transcripts for simple responses with no tool use."""
content = json.dumps(ASST_MSG) + "\n"
assert validate_transcript(content) is True
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False
def test_malformed_json_after_valid_assistant_returns_false(self):
"""Validation must scan all lines - malformed JSON anywhere should fail."""
valid_asst = json.dumps(ASST_MSG)
malformed = "not valid json"
content = valid_asst + "\n" + malformed + "\n"
assert validate_transcript(content) is False
def test_blank_lines_are_skipped(self):
"""Transcripts with blank lines should be valid if they contain assistant entries."""
content = (
json.dumps(USER_MSG)
+ "\n\n" # blank line
+ json.dumps(ASST_MSG)
+ "\n"
+ "\n" # another blank line
)
assert validate_transcript(content) is True
# --- strip_progress_entries ---
@@ -253,3 +254,31 @@ class TestStripProgressEntries:
assert "queue-operation" not in result_types
assert "user" in result_types
assert "assistant" in result_types
def test_preserves_original_line_formatting(self):
"""Non-reparented entries keep their original JSON formatting."""
# orjson produces compact JSON - test that we preserve the exact input
# when no reparenting is needed (no re-serialization)
original_line = json.dumps(USER_MSG)
content = original_line + "\n" + json.dumps(ASST_MSG) + "\n"
result = strip_progress_entries(content)
result_lines = result.strip().split("\n")
# Original line should be byte-identical (not re-serialized)
assert result_lines[0] == original_line
def test_reparented_entries_are_reserialized(self):
"""Entries whose parentUuid changes must be re-serialized."""
progress = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p1",
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, progress, asst)
result = strip_progress_entries(content)
lines = result.strip().split("\n")
asst_entry = json.loads(lines[-1])
assert asst_entry["parentUuid"] == "u1" # reparented

View File

@@ -34,8 +34,9 @@ client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# Default system prompt used when Langfuse is not configured
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
# Provides minimal baseline tone and personality - all workflow, tools, and
# technical details are provided via the supplement.
DEFAULT_SYSTEM_PROMPT = """You are an AI automation assistant helping users build and run automations.
Here is everything you know about the current user from previous interactions:
@@ -43,113 +44,12 @@ Here is everything you know about the current user from previous interactions:
{users_information}
</users_information>
## YOUR CORE MANDATE
Your goal is to help users automate tasks by:
- Understanding their needs and business context
- Building and running working automations
- Delivering tangible value through action, not just explanation
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- **For live integrations** (read a GitHub repo, query a database, post to Slack, etc.) consider `run_mcp_tool` — it connects directly to external services without building a full agent
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
**MCP (Model Context Protocol) Servers:**
- `run_mcp_tool`: Connect to any MCP server to discover and run its tools
**Two-step flow:**
1. `run_mcp_tool(server_url)` → returns a list of available tools. Each tool has `name`, `description`, and `input_schema` (JSON Schema). Read `input_schema.properties` to understand what arguments are needed.
2. `run_mcp_tool(server_url, tool_name, tool_arguments)` → executes the tool. Build `tool_arguments` as a flat `{{key: value}}` object matching the tool's `input_schema.properties`.
**Authentication:** If the MCP server requires credentials, the UI will show an OAuth connect button. Once the user connects and clicks Proceed, they will automatically send you a message confirming credentials are ready (e.g. "I've connected the MCP server credentials. Please retry run_mcp_tool..."). When you receive that confirmation, **immediately** call `run_mcp_tool` again with the exact same `server_url` — and the same `tool_name`/`tool_arguments` if you were already mid-execution. Do not ask the user what to do next; just retry.
**Finding server URLs (fastest → slowest):**
1. **Known hosted servers** — use directly, no lookup:
- Notion: `https://mcp.notion.com/mcp`
- Linear: `https://mcp.linear.app/mcp`
- Stripe: `https://mcp.stripe.com`
- Intercom: `https://mcp.intercom.com/mcp`
- Cloudflare: `https://mcp.cloudflare.com/mcp`
- Atlassian (Jira/Confluence): `https://mcp.atlassian.com/mcp`
2. **`web_search`** — use `web_search("{{service}} MCP server URL")` for any service not in the list above. This is the fastest way to find unlisted servers.
3. **Registry API** — `web_fetch("https://registry.modelcontextprotocol.io/v0.1/servers?search={{query}}&limit=10")` to browse what's available. Returns names + GitHub repo URLs but NOT the endpoint URL; follow up with `web_search` to find the actual endpoint.
- **Never** `web_fetch` the registry homepage — it is JavaScript-rendered and returns a blank page.
**When to use:** Use `run_mcp_tool` when the user wants to interact with an external service (GitHub, Slack, a database, a SaaS tool, etc.) via its MCP integration. Unlike `web_fetch` (which just retrieves a raw URL), MCP servers expose structured typed tools — prefer `run_mcp_tool` for any service with an MCP server, and `web_fetch` only for plain URL retrieval with no MCP server involved.
**CRITICAL**: `run_mcp_tool` is **always available** in your tool list. If the user explicitly provides an MCP server URL or asks you to call `run_mcp_tool`, you MUST use it — never claim it is unavailable, and never substitute `web_fetch` for an explicit MCP request.
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
- **For MCP integrations**: Known URL (see list) or `web_search("{{service}} MCP server URL")` → `run_mcp_tool(server_url)` → `run_mcp_tool(server_url, tool_name, tool_arguments)`. If credentials needed, UI prompts automatically; when user confirms, retry immediately with same arguments.
**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
Be concise, proactive, and action-oriented. Bias toward showing working solutions over lengthy explanations."""
# ---------------------------------------------------------------------------

View File

@@ -15,6 +15,7 @@ from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data import workspace as workspace_db
# Import dynamic field utilities from centralized location
from backend.data.block import BlockInput, BlockOutputEntry
@@ -32,7 +33,6 @@ from backend.data.execution import (
from backend.data.graph import GraphModel, Node
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.workspace import get_or_create_workspace
from backend.util.clients import (
get_async_execution_event_bus,
get_async_execution_queue,
@@ -831,8 +831,9 @@ async def add_graph_execution(
udb = user_db
gdb = graph_db
odb = onboarding_db
wdb = workspace_db
else:
edb = udb = gdb = odb = get_database_manager_async_client()
edb = udb = gdb = odb = wdb = get_database_manager_async_client()
# Get or create the graph execution
if graph_exec_id:
@@ -892,7 +893,7 @@ async def add_graph_execution(
if execution_context is None:
user = await udb.get_user_by_id(user_id)
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
workspace = await get_or_create_workspace(user_id)
workspace = await wdb.get_or_create_workspace(user_id)
execution_context = ExecutionContext(
# Execution identity

View File

@@ -368,12 +368,10 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
# Setup mock returns
# The function returns (graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip)
@@ -649,12 +647,10 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_wdb = mocker.patch("backend.executor.utils.workspace_db")
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
mock_wdb.get_or_create_workspace = mocker.AsyncMock(return_value=mock_workspace)
# Setup returns - include nodes_to_skip in the tuple
mock_validate.return_value = (

View File

@@ -72,19 +72,58 @@ def dumps(
T = TypeVar("T")
@overload
def loads(data: str | bytes, *args, target_type: Type[T], **kwargs) -> T: ...
# Sentinel value to detect when fallback is not provided
_NO_FALLBACK = object()
@overload
def loads(data: str | bytes, *args, **kwargs) -> Any: ...
def loads(
data: str | bytes, *args, target_type: Type[T], fallback: T | None = None, **kwargs
) -> T:
pass
@overload
def loads(data: str | bytes, *args, fallback: Any = None, **kwargs) -> Any:
pass
def loads(
data: str | bytes, *args, target_type: Type[T] | None = None, **kwargs
data: str | bytes,
*args,
target_type: Type[T] | None = None,
fallback: Any = _NO_FALLBACK,
**kwargs,
) -> Any:
parsed = orjson.loads(data)
"""Parse JSON with optional fallback on decode errors.
Args:
data: JSON string or bytes to parse
target_type: Optional type to validate/cast result to
fallback: Value to return on JSONDecodeError. If not provided, raises.
**kwargs: Additional arguments (unused, for compatibility)
Returns:
Parsed JSON data, or fallback value if parsing fails
Raises:
orjson.JSONDecodeError: Only if fallback is not provided
Examples:
>>> loads('{"valid": "json"}')
{'valid': 'json'}
>>> loads('invalid json', fallback=None)
None
>>> loads('invalid json', fallback={})
{}
>>> loads('invalid json') # raises orjson.JSONDecodeError
"""
try:
parsed = orjson.loads(data)
except orjson.JSONDecodeError:
if fallback is not _NO_FALLBACK:
return fallback
raise
if target_type:
return type_match(parsed, target_type)

View File

@@ -1,10 +1,11 @@
"use server";
import { postV1GetOrCreateUser } from "@/app/api/__generated__/endpoints/auth/auth";
import { getOnboardingStatus, resolveResponse } from "@/app/api/helpers";
import { getServerSupabase } from "@/lib/supabase/server/getServerSupabase";
import { signupFormSchema } from "@/types/auth";
import * as Sentry from "@sentry/nextjs";
import { isWaitlistError, logWaitlistError } from "../../api/auth/utils";
import { getOnboardingStatus } from "../../api/helpers";
export async function signup(
email: string,
@@ -57,6 +58,17 @@ export async function signup(
await supabase.auth.setSession(data.session);
}
try {
await resolveResponse(postV1GetOrCreateUser());
} catch (createUserError) {
console.error("Error creating user during signup:", createUserError);
Sentry.captureException(createUserError);
return {
success: false,
error: "Failed to complete account setup. Please try again.",
};
}
// Get onboarding status from backend (includes chat flag evaluated for this user)
const { shouldShowOnboarding } = await getOnboardingStatus();
const next = shouldShowOnboarding ? "/onboarding" : "/";