mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-03-17 03:00:27 -04:00
Compare commits
25 Commits
dev
...
hotfix/tra
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9123178556 | ||
|
|
b8c65e3d2b | ||
|
|
0eddb6f1bb | ||
|
|
042ed42c0b | ||
|
|
eadd67c70c | ||
|
|
ad7044995e | ||
|
|
203bf2ca32 | ||
|
|
a1e0caa983 | ||
|
|
440a06ad9e | ||
|
|
8ec706c125 | ||
|
|
6c83a91ae3 | ||
|
|
f19a423cae | ||
|
|
87258441f2 | ||
|
|
494978319e | ||
|
|
9135969c34 | ||
|
|
8625a82495 | ||
|
|
c17f19317b | ||
|
|
fc48944b56 | ||
|
|
2f57c1499c | ||
|
|
7042fcecdf | ||
|
|
3e45a28307 | ||
|
|
81aea5dc52 | ||
|
|
60f950c719 | ||
|
|
0b9e0665dd | ||
|
|
f6f268a1f0 |
@@ -127,7 +127,6 @@ def create_security_hooks(
|
|||||||
sdk_cwd: str | None = None,
|
sdk_cwd: str | None = None,
|
||||||
max_subtasks: int = 3,
|
max_subtasks: int = 3,
|
||||||
on_compact: Callable[[], None] | None = None,
|
on_compact: Callable[[], None] | None = None,
|
||||||
on_stop: Callable[[str, str], None] | None = None,
|
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Create the security hooks configuration for Claude Agent SDK.
|
"""Create the security hooks configuration for Claude Agent SDK.
|
||||||
|
|
||||||
@@ -136,15 +135,12 @@ def create_security_hooks(
|
|||||||
- PostToolUse: Log successful tool executions
|
- PostToolUse: Log successful tool executions
|
||||||
- PostToolUseFailure: Log and handle failed tool executions
|
- PostToolUseFailure: Log and handle failed tool executions
|
||||||
- PreCompact: Log context compaction events (SDK handles compaction automatically)
|
- PreCompact: Log context compaction events (SDK handles compaction automatically)
|
||||||
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
user_id: Current user ID for isolation validation
|
user_id: Current user ID for isolation validation
|
||||||
sdk_cwd: SDK working directory for workspace-scoped tool validation
|
sdk_cwd: SDK working directory for workspace-scoped tool validation
|
||||||
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
|
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
|
||||||
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
|
on_compact: Callback invoked when SDK starts compacting context.
|
||||||
the SDK finishes processing — used to read the JSONL transcript
|
|
||||||
before the CLI process exits.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Hooks configuration dict for ClaudeAgentOptions
|
Hooks configuration dict for ClaudeAgentOptions
|
||||||
@@ -311,30 +307,6 @@ def create_security_hooks(
|
|||||||
on_compact()
|
on_compact()
|
||||||
return cast(SyncHookJSONOutput, {})
|
return cast(SyncHookJSONOutput, {})
|
||||||
|
|
||||||
# --- Stop hook: capture transcript path for stateless resume ---
|
|
||||||
async def stop_hook(
|
|
||||||
input_data: HookInput,
|
|
||||||
tool_use_id: str | None,
|
|
||||||
context: HookContext,
|
|
||||||
) -> SyncHookJSONOutput:
|
|
||||||
"""Capture transcript path when SDK finishes processing.
|
|
||||||
|
|
||||||
The Stop hook fires while the CLI process is still alive, giving us
|
|
||||||
a reliable window to read the JSONL transcript before SIGTERM.
|
|
||||||
"""
|
|
||||||
_ = context, tool_use_id
|
|
||||||
transcript_path = cast(str, input_data.get("transcript_path", ""))
|
|
||||||
sdk_session_id = cast(str, input_data.get("session_id", ""))
|
|
||||||
|
|
||||||
if transcript_path and on_stop:
|
|
||||||
logger.info(
|
|
||||||
f"[SDK] Stop hook: transcript_path={transcript_path}, "
|
|
||||||
f"sdk_session_id={sdk_session_id[:12]}..."
|
|
||||||
)
|
|
||||||
on_stop(transcript_path, sdk_session_id)
|
|
||||||
|
|
||||||
return cast(SyncHookJSONOutput, {})
|
|
||||||
|
|
||||||
hooks: dict[str, Any] = {
|
hooks: dict[str, Any] = {
|
||||||
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
|
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
|
||||||
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
|
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
|
||||||
@@ -344,9 +316,6 @@ def create_security_hooks(
|
|||||||
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
|
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
|
||||||
}
|
}
|
||||||
|
|
||||||
if on_stop is not None:
|
|
||||||
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
|
|
||||||
|
|
||||||
return hooks
|
return hooks
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Fallback for when SDK isn't available - return empty hooks
|
# Fallback for when SDK isn't available - return empty hooks
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import subprocess
|
|||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import AsyncGenerator
|
from collections.abc import AsyncGenerator
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
import openai
|
import openai
|
||||||
@@ -21,6 +20,9 @@ from claude_agent_sdk import (
|
|||||||
ClaudeAgentOptions,
|
ClaudeAgentOptions,
|
||||||
ClaudeSDKClient,
|
ClaudeSDKClient,
|
||||||
ResultMessage,
|
ResultMessage,
|
||||||
|
TextBlock,
|
||||||
|
ThinkingBlock,
|
||||||
|
ToolResultBlock,
|
||||||
ToolUseBlock,
|
ToolUseBlock,
|
||||||
)
|
)
|
||||||
from langfuse import propagate_attributes
|
from langfuse import propagate_attributes
|
||||||
@@ -74,11 +76,11 @@ from .tool_adapter import (
|
|||||||
from .transcript import (
|
from .transcript import (
|
||||||
cleanup_cli_project_dir,
|
cleanup_cli_project_dir,
|
||||||
download_transcript,
|
download_transcript,
|
||||||
read_transcript_file,
|
|
||||||
upload_transcript,
|
upload_transcript,
|
||||||
validate_transcript,
|
validate_transcript,
|
||||||
write_transcript_to_tempfile,
|
write_transcript_to_tempfile,
|
||||||
)
|
)
|
||||||
|
from .transcript_builder import TranscriptBuilder
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
config = ChatConfig()
|
config = ChatConfig()
|
||||||
@@ -137,19 +139,6 @@ _setup_langfuse_otel()
|
|||||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class CapturedTranscript:
|
|
||||||
"""Info captured by the SDK Stop hook for stateless --resume."""
|
|
||||||
|
|
||||||
path: str = ""
|
|
||||||
sdk_session_id: str = ""
|
|
||||||
raw_content: str = ""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def available(self) -> bool:
|
|
||||||
return bool(self.path)
|
|
||||||
|
|
||||||
|
|
||||||
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
|
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
|
||||||
|
|
||||||
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
|
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
|
||||||
@@ -451,6 +440,43 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
|
||||||
|
"""Convert SDK content blocks to transcript format.
|
||||||
|
|
||||||
|
Handles TextBlock, ToolUseBlock, ToolResultBlock, and ThinkingBlock.
|
||||||
|
"""
|
||||||
|
result: list[dict[str, Any]] = []
|
||||||
|
for block in blocks or []:
|
||||||
|
if isinstance(block, TextBlock):
|
||||||
|
result.append({"type": "text", "text": block.text})
|
||||||
|
elif isinstance(block, ToolUseBlock):
|
||||||
|
result.append(
|
||||||
|
{
|
||||||
|
"type": "tool_use",
|
||||||
|
"id": block.id,
|
||||||
|
"name": block.name,
|
||||||
|
"input": block.input,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
elif isinstance(block, ToolResultBlock):
|
||||||
|
result.append(
|
||||||
|
{
|
||||||
|
"type": "tool_result",
|
||||||
|
"tool_use_id": block.tool_use_id,
|
||||||
|
"content": block.content,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
elif isinstance(block, ThinkingBlock):
|
||||||
|
result.append(
|
||||||
|
{
|
||||||
|
"type": "thinking",
|
||||||
|
"thinking": block.thinking,
|
||||||
|
"signature": block.signature,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def _compress_messages(
|
async def _compress_messages(
|
||||||
messages: list[ChatMessage],
|
messages: list[ChatMessage],
|
||||||
) -> tuple[list[ChatMessage], bool]:
|
) -> tuple[list[ChatMessage], bool]:
|
||||||
@@ -806,6 +832,11 @@ async def stream_chat_completion_sdk(
|
|||||||
user_id=user_id, session_id=session_id, message_length=len(message)
|
user_id=user_id, session_id=session_id, message_length=len(message)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Structured log prefix: [SDK][<session>][T<turn>]
|
||||||
|
# Turn = number of user messages (1-based), computed AFTER appending the new message.
|
||||||
|
turn = sum(1 for m in session.messages if m.role == "user")
|
||||||
|
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
|
||||||
|
|
||||||
session = await upsert_chat_session(session)
|
session = await upsert_chat_session(session)
|
||||||
|
|
||||||
# Generate title for new sessions (first user message)
|
# Generate title for new sessions (first user message)
|
||||||
@@ -823,10 +854,11 @@ async def stream_chat_completion_sdk(
|
|||||||
message_id = str(uuid.uuid4())
|
message_id = str(uuid.uuid4())
|
||||||
stream_id = str(uuid.uuid4())
|
stream_id = str(uuid.uuid4())
|
||||||
stream_completed = False
|
stream_completed = False
|
||||||
|
ended_with_stream_error = False
|
||||||
e2b_sandbox = None
|
e2b_sandbox = None
|
||||||
use_resume = False
|
use_resume = False
|
||||||
resume_file: str | None = None
|
resume_file: str | None = None
|
||||||
captured_transcript = CapturedTranscript()
|
transcript_builder = TranscriptBuilder()
|
||||||
sdk_cwd = ""
|
sdk_cwd = ""
|
||||||
|
|
||||||
# Acquire stream lock to prevent concurrent streams to the same session
|
# Acquire stream lock to prevent concurrent streams to the same session
|
||||||
@@ -841,7 +873,7 @@ async def stream_chat_completion_sdk(
|
|||||||
if lock_owner != stream_id:
|
if lock_owner != stream_id:
|
||||||
# Another stream is active
|
# Another stream is active
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
|
f"{log_prefix} Session already has an active stream: {lock_owner}"
|
||||||
)
|
)
|
||||||
yield StreamError(
|
yield StreamError(
|
||||||
errorText="Another stream is already active for this session. "
|
errorText="Another stream is already active for this session. "
|
||||||
@@ -865,7 +897,7 @@ async def stream_chat_completion_sdk(
|
|||||||
sdk_cwd = _make_sdk_cwd(session_id)
|
sdk_cwd = _make_sdk_cwd(session_id)
|
||||||
os.makedirs(sdk_cwd, exist_ok=True)
|
os.makedirs(sdk_cwd, exist_ok=True)
|
||||||
except (ValueError, OSError) as e:
|
except (ValueError, OSError) as e:
|
||||||
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
|
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
|
||||||
yield StreamError(
|
yield StreamError(
|
||||||
errorText="Unable to initialize working directory.",
|
errorText="Unable to initialize working directory.",
|
||||||
code="sdk_cwd_error",
|
code="sdk_cwd_error",
|
||||||
@@ -909,12 +941,13 @@ async def stream_chat_completion_sdk(
|
|||||||
):
|
):
|
||||||
return None
|
return None
|
||||||
try:
|
try:
|
||||||
return await download_transcript(user_id, session_id)
|
return await download_transcript(
|
||||||
|
user_id, session_id, log_prefix=log_prefix
|
||||||
|
)
|
||||||
except Exception as transcript_err:
|
except Exception as transcript_err:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[SDK] [%s] Transcript download failed, continuing without "
|
"%s Transcript download failed, continuing without " "--resume: %s",
|
||||||
"--resume: %s",
|
log_prefix,
|
||||||
session_id[:12],
|
|
||||||
transcript_err,
|
transcript_err,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
@@ -936,11 +969,18 @@ async def stream_chat_completion_sdk(
|
|||||||
transcript_msg_count = 0
|
transcript_msg_count = 0
|
||||||
if dl:
|
if dl:
|
||||||
is_valid = validate_transcript(dl.content)
|
is_valid = validate_transcript(dl.content)
|
||||||
if is_valid:
|
dl_lines = dl.content.strip().split("\n") if dl.content else []
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[SDK] Transcript available for session {session_id}: "
|
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
|
||||||
f"{len(dl.content)}B, msg_count={dl.message_count}"
|
log_prefix,
|
||||||
|
len(dl.content),
|
||||||
|
len(dl_lines),
|
||||||
|
dl.message_count,
|
||||||
|
is_valid,
|
||||||
)
|
)
|
||||||
|
if is_valid:
|
||||||
|
# Load previous FULL context into builder
|
||||||
|
transcript_builder.load_previous(dl.content)
|
||||||
resume_file = write_transcript_to_tempfile(
|
resume_file = write_transcript_to_tempfile(
|
||||||
dl.content, session_id, sdk_cwd
|
dl.content, session_id, sdk_cwd
|
||||||
)
|
)
|
||||||
@@ -948,16 +988,14 @@ async def stream_chat_completion_sdk(
|
|||||||
use_resume = True
|
use_resume = True
|
||||||
transcript_msg_count = dl.message_count
|
transcript_msg_count = dl.message_count
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"[SDK] Using --resume ({len(dl.content)}B, "
|
f"{log_prefix} Using --resume ({len(dl.content)}B, "
|
||||||
f"msg_count={transcript_msg_count})"
|
f"msg_count={transcript_msg_count})"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
|
||||||
f"[SDK] Transcript downloaded but invalid for {session_id}"
|
|
||||||
)
|
|
||||||
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[SDK] No transcript available for {session_id} "
|
f"{log_prefix} No transcript available "
|
||||||
f"({len(session.messages)} messages in session)"
|
f"({len(session.messages)} messages in session)"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -979,25 +1017,6 @@ async def stream_chat_completion_sdk(
|
|||||||
|
|
||||||
sdk_model = _resolve_sdk_model()
|
sdk_model = _resolve_sdk_model()
|
||||||
|
|
||||||
# --- Transcript capture via Stop hook ---
|
|
||||||
# Read the file content immediately — the SDK may clean up
|
|
||||||
# the file before our finally block runs.
|
|
||||||
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
|
|
||||||
captured_transcript.path = transcript_path
|
|
||||||
captured_transcript.sdk_session_id = sdk_session_id
|
|
||||||
content = read_transcript_file(transcript_path)
|
|
||||||
if content:
|
|
||||||
captured_transcript.raw_content = content
|
|
||||||
logger.info(
|
|
||||||
f"[SDK] Stop hook: captured {len(content)}B from "
|
|
||||||
f"{transcript_path}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
f"[SDK] Stop hook: transcript file empty/missing at "
|
|
||||||
f"{transcript_path}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
|
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
|
||||||
compaction = CompactionTracker()
|
compaction = CompactionTracker()
|
||||||
|
|
||||||
@@ -1005,7 +1024,6 @@ async def stream_chat_completion_sdk(
|
|||||||
user_id,
|
user_id,
|
||||||
sdk_cwd=sdk_cwd,
|
sdk_cwd=sdk_cwd,
|
||||||
max_subtasks=config.claude_agent_max_subtasks,
|
max_subtasks=config.claude_agent_max_subtasks,
|
||||||
on_stop=_on_stop if config.claude_agent_use_resume else None,
|
|
||||||
on_compact=compaction.on_compact,
|
on_compact=compaction.on_compact,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1040,7 +1058,10 @@ async def stream_chat_completion_sdk(
|
|||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
trace_name="copilot-sdk",
|
trace_name="copilot-sdk",
|
||||||
tags=["sdk"],
|
tags=["sdk"],
|
||||||
metadata={"resume": str(use_resume)},
|
metadata={
|
||||||
|
"resume": str(use_resume),
|
||||||
|
"conversation_turn": str(turn),
|
||||||
|
},
|
||||||
)
|
)
|
||||||
_otel_ctx.__enter__()
|
_otel_ctx.__enter__()
|
||||||
|
|
||||||
@@ -1074,9 +1095,9 @@ async def stream_chat_completion_sdk(
|
|||||||
query_message = f"{query_message}\n\n{attachments.hint}"
|
query_message = f"{query_message}\n\n{attachments.hint}"
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
|
"%s Sending query — resume=%s, total_msgs=%d, "
|
||||||
"query_len=%d, attached_files=%d, image_blocks=%d",
|
"query_len=%d, attached_files=%d, image_blocks=%d",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
use_resume,
|
use_resume,
|
||||||
len(session.messages),
|
len(session.messages),
|
||||||
len(query_message),
|
len(query_message),
|
||||||
@@ -1105,8 +1126,12 @@ async def stream_chat_completion_sdk(
|
|||||||
await client._transport.write( # noqa: SLF001
|
await client._transport.write( # noqa: SLF001
|
||||||
json.dumps(user_msg) + "\n"
|
json.dumps(user_msg) + "\n"
|
||||||
)
|
)
|
||||||
|
# Capture user message in transcript (multimodal)
|
||||||
|
transcript_builder.add_user_message(content=content_blocks)
|
||||||
else:
|
else:
|
||||||
await client.query(query_message, session_id=session_id)
|
await client.query(query_message, session_id=session_id)
|
||||||
|
# Capture user message in transcript (text only)
|
||||||
|
transcript_builder.add_user_message(content=query_message)
|
||||||
|
|
||||||
assistant_response = ChatMessage(role="assistant", content="")
|
assistant_response = ChatMessage(role="assistant", content="")
|
||||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||||
@@ -1150,8 +1175,8 @@ async def stream_chat_completion_sdk(
|
|||||||
sdk_msg = done.pop().result()
|
sdk_msg = done.pop().result()
|
||||||
except StopAsyncIteration:
|
except StopAsyncIteration:
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
|
"%s Stream ended normally (StopAsyncIteration)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
except Exception as stream_err:
|
except Exception as stream_err:
|
||||||
@@ -1160,8 +1185,8 @@ async def stream_chat_completion_sdk(
|
|||||||
# so the session can still be saved and the
|
# so the session can still be saved and the
|
||||||
# frontend gets a clean finish.
|
# frontend gets a clean finish.
|
||||||
logger.error(
|
logger.error(
|
||||||
"[SDK] [%s] Stream error from SDK: %s",
|
"%s Stream error from SDK: %s",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
stream_err,
|
stream_err,
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
@@ -1173,9 +1198,9 @@ async def stream_chat_completion_sdk(
|
|||||||
break
|
break
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Received: %s %s "
|
"%s Received: %s %s "
|
||||||
"(unresolved=%d, current=%d, resolved=%d)",
|
"(unresolved=%d, current=%d, resolved=%d)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
type(sdk_msg).__name__,
|
type(sdk_msg).__name__,
|
||||||
getattr(sdk_msg, "subtype", ""),
|
getattr(sdk_msg, "subtype", ""),
|
||||||
len(adapter.current_tool_calls)
|
len(adapter.current_tool_calls)
|
||||||
@@ -1184,6 +1209,15 @@ async def stream_chat_completion_sdk(
|
|||||||
len(adapter.resolved_tool_calls),
|
len(adapter.resolved_tool_calls),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Capture AssistantMessage in transcript
|
||||||
|
if isinstance(sdk_msg, AssistantMessage):
|
||||||
|
content_blocks = _format_sdk_content_blocks(sdk_msg.content)
|
||||||
|
model_name = getattr(sdk_msg, "model", "")
|
||||||
|
transcript_builder.add_assistant_message(
|
||||||
|
content_blocks=content_blocks,
|
||||||
|
model=model_name,
|
||||||
|
)
|
||||||
|
|
||||||
# Race-condition fix: SDK hooks (PostToolUse) are
|
# Race-condition fix: SDK hooks (PostToolUse) are
|
||||||
# executed asynchronously via start_soon() — the next
|
# executed asynchronously via start_soon() — the next
|
||||||
# message can arrive before the hook stashes output.
|
# message can arrive before the hook stashes output.
|
||||||
@@ -1210,10 +1244,10 @@ async def stream_chat_completion_sdk(
|
|||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[SDK] [%s] Timed out waiting for "
|
"%s Timed out waiting for "
|
||||||
"PostToolUse hook stash "
|
"PostToolUse hook stash "
|
||||||
"(%d unresolved tool calls)",
|
"(%d unresolved tool calls)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
len(adapter.current_tool_calls)
|
len(adapter.current_tool_calls)
|
||||||
- len(adapter.resolved_tool_calls),
|
- len(adapter.resolved_tool_calls),
|
||||||
)
|
)
|
||||||
@@ -1221,9 +1255,9 @@ async def stream_chat_completion_sdk(
|
|||||||
# Log ResultMessage details for debugging
|
# Log ResultMessage details for debugging
|
||||||
if isinstance(sdk_msg, ResultMessage):
|
if isinstance(sdk_msg, ResultMessage):
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Received: ResultMessage %s "
|
"%s Received: ResultMessage %s "
|
||||||
"(unresolved=%d, current=%d, resolved=%d)",
|
"(unresolved=%d, current=%d, resolved=%d)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
sdk_msg.subtype,
|
sdk_msg.subtype,
|
||||||
len(adapter.current_tool_calls)
|
len(adapter.current_tool_calls)
|
||||||
- len(adapter.resolved_tool_calls),
|
- len(adapter.resolved_tool_calls),
|
||||||
@@ -1232,8 +1266,8 @@ async def stream_chat_completion_sdk(
|
|||||||
)
|
)
|
||||||
if sdk_msg.subtype in ("error", "error_during_execution"):
|
if sdk_msg.subtype in ("error", "error_during_execution"):
|
||||||
logger.error(
|
logger.error(
|
||||||
"[SDK] [%s] SDK execution failed with error: %s",
|
"%s SDK execution failed with error: %s",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
sdk_msg.result or "(no error message provided)",
|
sdk_msg.result or "(no error message provided)",
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1258,8 +1292,8 @@ async def stream_chat_completion_sdk(
|
|||||||
out_len = len(str(response.output))
|
out_len = len(str(response.output))
|
||||||
extra = f", output_len={out_len}"
|
extra = f", output_len={out_len}"
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Tool event: %s, tool=%s%s",
|
"%s Tool event: %s, tool=%s%s",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
type(response).__name__,
|
type(response).__name__,
|
||||||
getattr(response, "toolName", "N/A"),
|
getattr(response, "toolName", "N/A"),
|
||||||
extra,
|
extra,
|
||||||
@@ -1268,8 +1302,8 @@ async def stream_chat_completion_sdk(
|
|||||||
# Log errors being sent to frontend
|
# Log errors being sent to frontend
|
||||||
if isinstance(response, StreamError):
|
if isinstance(response, StreamError):
|
||||||
logger.error(
|
logger.error(
|
||||||
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
|
"%s Sending error to frontend: %s (code=%s)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
response.errorText,
|
response.errorText,
|
||||||
response.code,
|
response.code,
|
||||||
)
|
)
|
||||||
@@ -1335,8 +1369,8 @@ async def stream_chat_completion_sdk(
|
|||||||
# server shutdown). Log and let the safety-net / finally
|
# server shutdown). Log and let the safety-net / finally
|
||||||
# blocks handle cleanup.
|
# blocks handle cleanup.
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
|
"%s Streaming loop cancelled (asyncio.CancelledError)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
@@ -1350,7 +1384,8 @@ async def stream_chat_completion_sdk(
|
|||||||
except (asyncio.CancelledError, StopAsyncIteration):
|
except (asyncio.CancelledError, StopAsyncIteration):
|
||||||
# Expected: task was cancelled or exhausted during cleanup
|
# Expected: task was cancelled or exhausted during cleanup
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] Pending __anext__ task completed during cleanup"
|
"%s Pending __anext__ task completed during cleanup",
|
||||||
|
log_prefix,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Safety net: if tools are still unresolved after the
|
# Safety net: if tools are still unresolved after the
|
||||||
@@ -1359,9 +1394,9 @@ async def stream_chat_completion_sdk(
|
|||||||
# them now so the frontend stops showing spinners.
|
# them now so the frontend stops showing spinners.
|
||||||
if adapter.has_unresolved_tool_calls:
|
if adapter.has_unresolved_tool_calls:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
|
"%s %d unresolved tool(s) after stream loop — "
|
||||||
"flushing as safety net",
|
"flushing as safety net",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
|
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
|
||||||
)
|
)
|
||||||
safety_responses: list[StreamBaseResponse] = []
|
safety_responses: list[StreamBaseResponse] = []
|
||||||
@@ -1372,8 +1407,8 @@ async def stream_chat_completion_sdk(
|
|||||||
(StreamToolInputAvailable, StreamToolOutputAvailable),
|
(StreamToolInputAvailable, StreamToolOutputAvailable),
|
||||||
):
|
):
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Safety flush: %s, tool=%s",
|
"%s Safety flush: %s, tool=%s",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
type(response).__name__,
|
type(response).__name__,
|
||||||
getattr(response, "toolName", "N/A"),
|
getattr(response, "toolName", "N/A"),
|
||||||
)
|
)
|
||||||
@@ -1386,8 +1421,8 @@ async def stream_chat_completion_sdk(
|
|||||||
# StreamFinish is published by mark_session_completed in the processor.
|
# StreamFinish is published by mark_session_completed in the processor.
|
||||||
if not stream_completed and not ended_with_stream_error:
|
if not stream_completed and not ended_with_stream_error:
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
|
"%s Stream ended without ResultMessage (stopped by user)",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
)
|
)
|
||||||
closing_responses: list[StreamBaseResponse] = []
|
closing_responses: list[StreamBaseResponse] = []
|
||||||
adapter._end_text_if_open(closing_responses)
|
adapter._end_text_if_open(closing_responses)
|
||||||
@@ -1408,69 +1443,36 @@ async def stream_chat_completion_sdk(
|
|||||||
) and not has_appended_assistant:
|
) and not has_appended_assistant:
|
||||||
session.messages.append(assistant_response)
|
session.messages.append(assistant_response)
|
||||||
|
|
||||||
# --- Upload transcript for next-turn --resume ---
|
# Transcript upload is handled exclusively in the finally block
|
||||||
# After async with the SDK task group has exited, so the Stop
|
# to avoid double-uploads (the success path used to upload the
|
||||||
# hook has already fired and the CLI has been SIGTERMed. The
|
# old resume file, then the finally block overwrote it with the
|
||||||
# CLI uses appendFileSync, so all writes are safely on disk.
|
# stop hook content — which could be smaller after compaction).
|
||||||
if config.claude_agent_use_resume and user_id:
|
|
||||||
# With --resume the CLI appends to the resume file (most
|
if ended_with_stream_error:
|
||||||
# complete). Otherwise use the Stop hook path.
|
logger.warning(
|
||||||
if use_resume and resume_file:
|
"%s Stream ended with SDK error after %d messages",
|
||||||
raw_transcript = read_transcript_file(resume_file)
|
log_prefix,
|
||||||
logger.debug("[SDK] Transcript source: resume file")
|
len(session.messages),
|
||||||
elif captured_transcript.path:
|
|
||||||
raw_transcript = read_transcript_file(captured_transcript.path)
|
|
||||||
logger.debug(
|
|
||||||
"[SDK] Transcript source: stop hook (%s), read result: %s",
|
|
||||||
captured_transcript.path,
|
|
||||||
f"{len(raw_transcript)}B" if raw_transcript else "None",
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raw_transcript = None
|
|
||||||
|
|
||||||
if not raw_transcript:
|
|
||||||
logger.debug(
|
|
||||||
"[SDK] No usable transcript — CLI file had no "
|
|
||||||
"conversation entries (expected for first turn "
|
|
||||||
"without --resume)"
|
|
||||||
)
|
|
||||||
|
|
||||||
if raw_transcript:
|
|
||||||
# Shield the upload from generator cancellation so a
|
|
||||||
# client disconnect / page refresh doesn't lose the
|
|
||||||
# transcript. The upload must finish even if the SSE
|
|
||||||
# connection is torn down.
|
|
||||||
await asyncio.shield(
|
|
||||||
_try_upload_transcript(
|
|
||||||
user_id,
|
|
||||||
session_id,
|
|
||||||
raw_transcript,
|
|
||||||
message_count=len(session.messages),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Stream completed successfully with %d messages",
|
"%s Stream completed successfully with %d messages",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
len(session.messages),
|
len(session.messages),
|
||||||
)
|
)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
# Catch BaseException to handle both Exception and CancelledError
|
# Catch BaseException to handle both Exception and CancelledError
|
||||||
# (CancelledError inherits from BaseException in Python 3.8+)
|
# (CancelledError inherits from BaseException in Python 3.8+)
|
||||||
if isinstance(e, asyncio.CancelledError):
|
if isinstance(e, asyncio.CancelledError):
|
||||||
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
|
logger.warning("%s Session cancelled", log_prefix)
|
||||||
error_msg = "Operation cancelled"
|
error_msg = "Operation cancelled"
|
||||||
else:
|
else:
|
||||||
error_msg = str(e) or type(e).__name__
|
error_msg = str(e) or type(e).__name__
|
||||||
# SDK cleanup RuntimeError is expected during cancellation, log as warning
|
# SDK cleanup RuntimeError is expected during cancellation, log as warning
|
||||||
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
|
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
|
||||||
logger.warning(
|
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
|
||||||
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
logger.error(
|
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
|
||||||
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# Append error marker to session (non-invasive text parsing approach)
|
# Append error marker to session (non-invasive text parsing approach)
|
||||||
# The finally block will persist the session with this error marker
|
# The finally block will persist the session with this error marker
|
||||||
@@ -1481,8 +1483,8 @@ async def stream_chat_completion_sdk(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[SDK] [%s] Appended error marker, will be persisted in finally",
|
"%s Appended error marker, will be persisted in finally",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Yield StreamError for immediate feedback (only for non-cancellation errors)
|
# Yield StreamError for immediate feedback (only for non-cancellation errors)
|
||||||
@@ -1514,47 +1516,72 @@ async def stream_chat_completion_sdk(
|
|||||||
try:
|
try:
|
||||||
await asyncio.shield(upsert_chat_session(session))
|
await asyncio.shield(upsert_chat_session(session))
|
||||||
logger.info(
|
logger.info(
|
||||||
"[SDK] [%s] Session persisted in finally with %d messages",
|
"%s Session persisted in finally with %d messages",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
len(session.messages),
|
len(session.messages),
|
||||||
)
|
)
|
||||||
except Exception as persist_err:
|
except Exception as persist_err:
|
||||||
logger.error(
|
logger.error(
|
||||||
"[SDK] [%s] Failed to persist session in finally: %s",
|
"%s Failed to persist session in finally: %s",
|
||||||
session_id[:12],
|
log_prefix,
|
||||||
persist_err,
|
persist_err,
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# --- Upload transcript for next-turn --resume ---
|
# --- Upload transcript for next-turn --resume ---
|
||||||
# This MUST run in finally so the transcript is uploaded even when
|
# This MUST run in finally so the transcript is uploaded even when
|
||||||
# the streaming loop raises an exception. The CLI uses
|
# the streaming loop raises an exception.
|
||||||
# appendFileSync, so whatever was written before the error/SIGTERM
|
# The transcript represents the COMPLETE active context (atomic).
|
||||||
# is safely on disk and still useful for the next turn.
|
if config.claude_agent_use_resume and user_id and session is not None:
|
||||||
if config.claude_agent_use_resume and user_id:
|
|
||||||
try:
|
try:
|
||||||
# Prefer content captured in the Stop hook (read before
|
# Build complete transcript from captured SDK messages
|
||||||
# cleanup removes the file). Fall back to the resume
|
transcript_content = transcript_builder.to_jsonl()
|
||||||
# file when the stop hook didn't fire (e.g. error before
|
|
||||||
# completion) so we don't lose the prior transcript.
|
|
||||||
raw_transcript = captured_transcript.raw_content or None
|
|
||||||
if not raw_transcript and use_resume and resume_file:
|
|
||||||
raw_transcript = read_transcript_file(resume_file)
|
|
||||||
|
|
||||||
if raw_transcript and session is not None:
|
if not transcript_content:
|
||||||
await asyncio.shield(
|
logger.warning(
|
||||||
_try_upload_transcript(
|
"%s No transcript to upload (builder empty)", log_prefix
|
||||||
user_id,
|
|
||||||
session_id,
|
|
||||||
raw_transcript,
|
|
||||||
message_count=len(session.messages),
|
|
||||||
)
|
)
|
||||||
|
elif not validate_transcript(transcript_content):
|
||||||
|
logger.warning(
|
||||||
|
"%s Transcript invalid, skipping upload (entries=%d)",
|
||||||
|
log_prefix,
|
||||||
|
transcript_builder.entry_count,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"[SDK] No transcript to upload for {session_id}")
|
logger.info(
|
||||||
|
"%s Uploading complete transcript (entries=%d, bytes=%d)",
|
||||||
|
log_prefix,
|
||||||
|
transcript_builder.entry_count,
|
||||||
|
len(transcript_content),
|
||||||
|
)
|
||||||
|
# Create task first so we have a reference if timeout occurs
|
||||||
|
upload_task = asyncio.create_task(
|
||||||
|
upload_transcript(
|
||||||
|
user_id=user_id,
|
||||||
|
session_id=session_id,
|
||||||
|
content=transcript_content,
|
||||||
|
message_count=len(session.messages),
|
||||||
|
log_prefix=log_prefix,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
async with asyncio.timeout(30):
|
||||||
|
await asyncio.shield(upload_task)
|
||||||
|
except TimeoutError:
|
||||||
|
# Timeout fired but shield keeps upload running - track the
|
||||||
|
# SAME task to prevent garbage collection (no double upload)
|
||||||
|
logger.warning(
|
||||||
|
"%s Transcript upload exceeded 30s timeout, "
|
||||||
|
"continuing in background",
|
||||||
|
log_prefix,
|
||||||
|
)
|
||||||
|
_background_tasks.add(upload_task)
|
||||||
|
upload_task.add_done_callback(_background_tasks.discard)
|
||||||
except Exception as upload_err:
|
except Exception as upload_err:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"[SDK] Transcript upload failed in finally: {upload_err}",
|
"%s Transcript upload failed in finally: %s",
|
||||||
|
log_prefix,
|
||||||
|
upload_err,
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1565,33 +1592,6 @@ async def stream_chat_completion_sdk(
|
|||||||
await lock.release()
|
await lock.release()
|
||||||
|
|
||||||
|
|
||||||
async def _try_upload_transcript(
|
|
||||||
user_id: str,
|
|
||||||
session_id: str,
|
|
||||||
raw_content: str,
|
|
||||||
message_count: int = 0,
|
|
||||||
) -> bool:
|
|
||||||
"""Strip progress entries and upload transcript (with timeout).
|
|
||||||
|
|
||||||
Returns True if the upload completed without error.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
async with asyncio.timeout(30):
|
|
||||||
await upload_transcript(
|
|
||||||
user_id, session_id, raw_content, message_count=message_count
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(
|
|
||||||
f"[SDK] Failed to upload transcript for {session_id}: {e}",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
async def _update_title_async(
|
async def _update_title_async(
|
||||||
session_id: str, message: str, user_id: str | None = None
|
session_id: str, message: str, user_id: str | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|||||||
@@ -58,41 +58,40 @@ def strip_progress_entries(content: str) -> str:
|
|||||||
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
|
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
|
||||||
any remaining child entries so the ``parentUuid`` chain stays intact.
|
any remaining child entries so the ``parentUuid`` chain stays intact.
|
||||||
Typically reduces transcript size by ~30%.
|
Typically reduces transcript size by ~30%.
|
||||||
|
|
||||||
|
Entries that are not stripped or reparented are kept as their original
|
||||||
|
raw JSON line to avoid unnecessary re-serialization that changes
|
||||||
|
whitespace or key ordering.
|
||||||
"""
|
"""
|
||||||
lines = content.strip().split("\n")
|
lines = content.strip().split("\n")
|
||||||
|
|
||||||
entries: list[dict] = []
|
# Parse entries, keeping the original line alongside the parsed dict.
|
||||||
|
parsed: list[tuple[str, dict | None]] = []
|
||||||
for line in lines:
|
for line in lines:
|
||||||
try:
|
try:
|
||||||
entries.append(json.loads(line))
|
parsed.append((line, json.loads(line)))
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
# Keep unparseable lines as-is (safety)
|
parsed.append((line, None))
|
||||||
entries.append({"_raw": line})
|
|
||||||
|
|
||||||
|
# First pass: identify stripped UUIDs and build parent map.
|
||||||
stripped_uuids: set[str] = set()
|
stripped_uuids: set[str] = set()
|
||||||
uuid_to_parent: dict[str, str] = {}
|
uuid_to_parent: dict[str, str] = {}
|
||||||
kept: list[dict] = []
|
|
||||||
|
|
||||||
for entry in entries:
|
for _line, entry in parsed:
|
||||||
if "_raw" in entry:
|
if entry is None:
|
||||||
kept.append(entry)
|
|
||||||
continue
|
continue
|
||||||
uid = entry.get("uuid", "")
|
uid = entry.get("uuid", "")
|
||||||
parent = entry.get("parentUuid", "")
|
parent = entry.get("parentUuid", "")
|
||||||
entry_type = entry.get("type", "")
|
|
||||||
|
|
||||||
if uid:
|
if uid:
|
||||||
uuid_to_parent[uid] = parent
|
uuid_to_parent[uid] = parent
|
||||||
|
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
|
||||||
if entry_type in STRIPPABLE_TYPES:
|
|
||||||
if uid:
|
|
||||||
stripped_uuids.add(uid)
|
stripped_uuids.add(uid)
|
||||||
else:
|
|
||||||
kept.append(entry)
|
|
||||||
|
|
||||||
# Reparent: walk up chain through stripped entries to find surviving ancestor
|
# Second pass: keep non-stripped entries, reparenting where needed.
|
||||||
for entry in kept:
|
# Preserve original line when no reparenting is required.
|
||||||
if "_raw" in entry:
|
reparented: set[str] = set()
|
||||||
|
for _line, entry in parsed:
|
||||||
|
if entry is None:
|
||||||
continue
|
continue
|
||||||
parent = entry.get("parentUuid", "")
|
parent = entry.get("parentUuid", "")
|
||||||
original_parent = parent
|
original_parent = parent
|
||||||
@@ -100,63 +99,32 @@ def strip_progress_entries(content: str) -> str:
|
|||||||
parent = uuid_to_parent.get(parent, "")
|
parent = uuid_to_parent.get(parent, "")
|
||||||
if parent != original_parent:
|
if parent != original_parent:
|
||||||
entry["parentUuid"] = parent
|
entry["parentUuid"] = parent
|
||||||
|
uid = entry.get("uuid", "")
|
||||||
|
if uid:
|
||||||
|
reparented.add(uid)
|
||||||
|
|
||||||
result_lines: list[str] = []
|
result_lines: list[str] = []
|
||||||
for entry in kept:
|
for line, entry in parsed:
|
||||||
if "_raw" in entry:
|
if entry is None:
|
||||||
result_lines.append(entry["_raw"])
|
result_lines.append(line)
|
||||||
else:
|
continue
|
||||||
|
if entry.get("type", "") in STRIPPABLE_TYPES:
|
||||||
|
continue
|
||||||
|
uid = entry.get("uuid", "")
|
||||||
|
if uid in reparented:
|
||||||
|
# Re-serialize only entries whose parentUuid was changed.
|
||||||
result_lines.append(json.dumps(entry, separators=(",", ":")))
|
result_lines.append(json.dumps(entry, separators=(",", ":")))
|
||||||
|
else:
|
||||||
|
result_lines.append(line)
|
||||||
|
|
||||||
return "\n".join(result_lines) + "\n"
|
return "\n".join(result_lines) + "\n"
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
|
# Local file I/O (write temp file for --resume)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def read_transcript_file(transcript_path: str) -> str | None:
|
|
||||||
"""Read a JSONL transcript file from disk.
|
|
||||||
|
|
||||||
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
|
|
||||||
or only contains metadata (≤2 lines with no conversation messages).
|
|
||||||
"""
|
|
||||||
if not transcript_path or not os.path.isfile(transcript_path):
|
|
||||||
logger.debug(f"[Transcript] File not found: {transcript_path}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(transcript_path) as f:
|
|
||||||
content = f.read()
|
|
||||||
|
|
||||||
if not content.strip():
|
|
||||||
logger.debug("[Transcript] File is empty: %s", transcript_path)
|
|
||||||
return None
|
|
||||||
|
|
||||||
lines = content.strip().split("\n")
|
|
||||||
|
|
||||||
# Validate that the transcript has real conversation content
|
|
||||||
# (not just metadata like queue-operation entries).
|
|
||||||
if not validate_transcript(content):
|
|
||||||
logger.debug(
|
|
||||||
"[Transcript] No conversation content (%d lines) in %s",
|
|
||||||
len(lines),
|
|
||||||
transcript_path,
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"[Transcript] Read {len(lines)} lines, "
|
|
||||||
f"{len(content)} bytes from {transcript_path}"
|
|
||||||
)
|
|
||||||
return content
|
|
||||||
|
|
||||||
except (json.JSONDecodeError, OSError) as e:
|
|
||||||
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
|
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
|
||||||
"""Sanitize an ID for safe use in file paths.
|
"""Sanitize an ID for safe use in file paths.
|
||||||
|
|
||||||
@@ -171,14 +139,6 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
|
|||||||
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
|
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
|
||||||
|
|
||||||
|
|
||||||
def _encode_cwd_for_cli(cwd: str) -> str:
|
|
||||||
"""Encode a working directory path the same way the Claude CLI does.
|
|
||||||
|
|
||||||
The CLI replaces all non-alphanumeric characters with ``-``.
|
|
||||||
"""
|
|
||||||
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
|
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
|
||||||
"""Remove the CLI's project directory for a specific working directory.
|
"""Remove the CLI's project directory for a specific working directory.
|
||||||
|
|
||||||
@@ -188,7 +148,8 @@ def cleanup_cli_project_dir(sdk_cwd: str) -> None:
|
|||||||
"""
|
"""
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
cwd_encoded = _encode_cwd_for_cli(sdk_cwd)
|
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
|
||||||
|
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
|
||||||
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
|
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
|
||||||
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
|
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
|
||||||
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
|
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
|
||||||
@@ -248,32 +209,30 @@ def write_transcript_to_tempfile(
|
|||||||
def validate_transcript(content: str | None) -> bool:
|
def validate_transcript(content: str | None) -> bool:
|
||||||
"""Check that a transcript has actual conversation messages.
|
"""Check that a transcript has actual conversation messages.
|
||||||
|
|
||||||
A valid transcript for resume needs at least one user message and one
|
A valid transcript needs at least one assistant message (not just
|
||||||
assistant message (not just queue-operation / file-history-snapshot
|
queue-operation / file-history-snapshot metadata). We do NOT require
|
||||||
metadata).
|
a ``type: "user"`` entry because with ``--resume`` the user's message
|
||||||
|
is passed as a CLI query parameter and does not appear in the
|
||||||
|
transcript file.
|
||||||
"""
|
"""
|
||||||
if not content or not content.strip():
|
if not content or not content.strip():
|
||||||
return False
|
return False
|
||||||
|
|
||||||
lines = content.strip().split("\n")
|
lines = content.strip().split("\n")
|
||||||
if len(lines) < 2:
|
|
||||||
return False
|
|
||||||
|
|
||||||
has_user = False
|
|
||||||
has_assistant = False
|
has_assistant = False
|
||||||
|
|
||||||
for line in lines:
|
for line in lines:
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
entry = json.loads(line)
|
entry = json.loads(line)
|
||||||
msg_type = entry.get("type")
|
if entry.get("type") == "assistant":
|
||||||
if msg_type == "user":
|
|
||||||
has_user = True
|
|
||||||
elif msg_type == "assistant":
|
|
||||||
has_assistant = True
|
has_assistant = True
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return has_user and has_assistant
|
return has_assistant
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -328,26 +287,43 @@ async def upload_transcript(
|
|||||||
session_id: str,
|
session_id: str,
|
||||||
content: str,
|
content: str,
|
||||||
message_count: int = 0,
|
message_count: int = 0,
|
||||||
|
log_prefix: str = "[Transcript]",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Strip progress entries and upload transcript to bucket storage.
|
"""Strip progress entries and upload complete transcript.
|
||||||
|
|
||||||
|
The transcript represents the FULL active context (atomic).
|
||||||
|
Each upload REPLACES the previous transcript entirely.
|
||||||
|
|
||||||
The executor holds a cluster lock per session, so concurrent uploads for
|
The executor holds a cluster lock per session, so concurrent uploads for
|
||||||
the same session cannot happen. We always overwrite — with ``--resume``
|
the same session cannot happen.
|
||||||
the CLI may compact old tool results, so neither byte size nor line count
|
|
||||||
is a reliable proxy for "newer".
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
message_count: ``len(session.messages)`` at upload time — used by
|
content: Complete JSONL transcript (from TranscriptBuilder).
|
||||||
the next turn to detect staleness and compress only the gap.
|
message_count: ``len(session.messages)`` at upload time.
|
||||||
"""
|
"""
|
||||||
from backend.util.workspace_storage import get_workspace_storage
|
from backend.util.workspace_storage import get_workspace_storage
|
||||||
|
|
||||||
|
# Strip metadata entries (progress, file-history-snapshot, etc.)
|
||||||
|
# Note: SDK-built transcripts shouldn't have these, but strip for safety
|
||||||
stripped = strip_progress_entries(content)
|
stripped = strip_progress_entries(content)
|
||||||
if not validate_transcript(stripped):
|
if not validate_transcript(stripped):
|
||||||
|
# Log entry types for debugging — helps identify why validation failed
|
||||||
|
entry_types: list[str] = []
|
||||||
|
for line in stripped.strip().split("\n"):
|
||||||
|
try:
|
||||||
|
entry_types.append(json.loads(line).get("type", "?"))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
entry_types.append("INVALID_JSON")
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[Transcript] Skipping upload — stripped content not valid "
|
"%s Skipping upload — stripped content not valid "
|
||||||
f"for session {session_id}"
|
"(types=%s, stripped_len=%d, raw_len=%d)",
|
||||||
|
log_prefix,
|
||||||
|
entry_types,
|
||||||
|
len(stripped),
|
||||||
|
len(content),
|
||||||
)
|
)
|
||||||
|
logger.debug("%s Raw content preview: %s", log_prefix, content[:500])
|
||||||
|
logger.debug("%s Stripped content: %s", log_prefix, stripped[:500])
|
||||||
return
|
return
|
||||||
|
|
||||||
storage = await get_workspace_storage()
|
storage = await get_workspace_storage()
|
||||||
@@ -373,17 +349,18 @@ async def upload_transcript(
|
|||||||
content=json.dumps(meta).encode("utf-8"),
|
content=json.dumps(meta).encode("utf-8"),
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
|
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[Transcript] Uploaded {len(encoded)}B "
|
f"{log_prefix} Uploaded {len(encoded)}B "
|
||||||
f"(stripped from {len(content)}B, msg_count={message_count}) "
|
f"(stripped from {len(content)}B, msg_count={message_count})"
|
||||||
f"for session {session_id}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def download_transcript(
|
async def download_transcript(
|
||||||
user_id: str, session_id: str
|
user_id: str,
|
||||||
|
session_id: str,
|
||||||
|
log_prefix: str = "[Transcript]",
|
||||||
) -> TranscriptDownload | None:
|
) -> TranscriptDownload | None:
|
||||||
"""Download transcript and metadata from bucket storage.
|
"""Download transcript and metadata from bucket storage.
|
||||||
|
|
||||||
@@ -399,10 +376,10 @@ async def download_transcript(
|
|||||||
data = await storage.retrieve(path)
|
data = await storage.retrieve(path)
|
||||||
content = data.decode("utf-8")
|
content = data.decode("utf-8")
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
|
logger.debug(f"{log_prefix} No transcript in storage")
|
||||||
return None
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[Transcript] Failed to download transcript: {e}")
|
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Try to load metadata (best-effort — old transcripts won't have it)
|
# Try to load metadata (best-effort — old transcripts won't have it)
|
||||||
@@ -425,10 +402,7 @@ async def download_transcript(
|
|||||||
except (FileNotFoundError, json.JSONDecodeError, Exception):
|
except (FileNotFoundError, json.JSONDecodeError, Exception):
|
||||||
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
|
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
|
||||||
|
|
||||||
logger.info(
|
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
|
||||||
f"[Transcript] Downloaded {len(content)}B "
|
|
||||||
f"(msg_count={message_count}) for session {session_id}"
|
|
||||||
)
|
|
||||||
return TranscriptDownload(
|
return TranscriptDownload(
|
||||||
content=content,
|
content=content,
|
||||||
message_count=message_count,
|
message_count=message_count,
|
||||||
|
|||||||
@@ -0,0 +1,140 @@
|
|||||||
|
"""Build complete JSONL transcript from SDK messages.
|
||||||
|
|
||||||
|
The transcript represents the FULL active context at any point in time.
|
||||||
|
Each upload REPLACES the previous transcript atomically.
|
||||||
|
|
||||||
|
Flow:
|
||||||
|
Turn 1: Upload [msg1, msg2]
|
||||||
|
Turn 2: Download [msg1, msg2] → Upload [msg1, msg2, msg3, msg4] (REPLACE)
|
||||||
|
Turn 3: Download [msg1, msg2, msg3, msg4] → Upload [all messages] (REPLACE)
|
||||||
|
|
||||||
|
The transcript is never incremental - always the complete atomic state.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class TranscriptEntry(BaseModel):
|
||||||
|
"""Single transcript entry (user or assistant turn)."""
|
||||||
|
|
||||||
|
type: str
|
||||||
|
uuid: str
|
||||||
|
parentUuid: str | None
|
||||||
|
message: dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
class TranscriptBuilder:
|
||||||
|
"""Build complete JSONL transcript from SDK messages.
|
||||||
|
|
||||||
|
This builder maintains the FULL conversation state, not incremental changes.
|
||||||
|
The output is always the complete active context.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._entries: list[TranscriptEntry] = []
|
||||||
|
self._last_uuid: str | None = None
|
||||||
|
|
||||||
|
def load_previous(self, content: str) -> None:
|
||||||
|
"""Load complete previous transcript.
|
||||||
|
|
||||||
|
This loads the FULL previous context. As new messages come in,
|
||||||
|
we append to this state. The final output is the complete context
|
||||||
|
(previous + new), not just the delta.
|
||||||
|
"""
|
||||||
|
if not content or not content.strip():
|
||||||
|
return
|
||||||
|
|
||||||
|
for line in content.strip().split("\n"):
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = json.loads(line)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.warning("Failed to parse transcript line: %s", line[:100])
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Only load conversation messages (user/assistant)
|
||||||
|
# Skip metadata entries
|
||||||
|
if data.get("type") not in ("user", "assistant"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
entry = TranscriptEntry(
|
||||||
|
type=data["type"],
|
||||||
|
uuid=data.get("uuid") or str(uuid4()),
|
||||||
|
parentUuid=data.get("parentUuid"),
|
||||||
|
message=data.get("message", {}),
|
||||||
|
)
|
||||||
|
self._entries.append(entry)
|
||||||
|
self._last_uuid = entry.uuid
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Loaded %d entries from previous transcript (last_uuid=%s)",
|
||||||
|
len(self._entries),
|
||||||
|
self._last_uuid[:12] if self._last_uuid else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
def add_user_message(
|
||||||
|
self, content: str | list[dict], uuid: str | None = None
|
||||||
|
) -> None:
|
||||||
|
"""Add user message to the complete context."""
|
||||||
|
msg_uuid = uuid or str(uuid4())
|
||||||
|
|
||||||
|
self._entries.append(
|
||||||
|
TranscriptEntry(
|
||||||
|
type="user",
|
||||||
|
uuid=msg_uuid,
|
||||||
|
parentUuid=self._last_uuid,
|
||||||
|
message={"role": "user", "content": content},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self._last_uuid = msg_uuid
|
||||||
|
|
||||||
|
def add_assistant_message(
|
||||||
|
self, content_blocks: list[dict], model: str = ""
|
||||||
|
) -> None:
|
||||||
|
"""Add assistant message to the complete context."""
|
||||||
|
msg_uuid = str(uuid4())
|
||||||
|
|
||||||
|
self._entries.append(
|
||||||
|
TranscriptEntry(
|
||||||
|
type="assistant",
|
||||||
|
uuid=msg_uuid,
|
||||||
|
parentUuid=self._last_uuid,
|
||||||
|
message={
|
||||||
|
"role": "assistant",
|
||||||
|
"model": model,
|
||||||
|
"content": content_blocks,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self._last_uuid = msg_uuid
|
||||||
|
|
||||||
|
def to_jsonl(self) -> str:
|
||||||
|
"""Export complete context as JSONL.
|
||||||
|
|
||||||
|
Returns the FULL conversation state (all entries), not incremental.
|
||||||
|
This output REPLACES any previous transcript.
|
||||||
|
"""
|
||||||
|
if not self._entries:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
|
||||||
|
return "\n".join(lines) + "\n"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entry_count(self) -> int:
|
||||||
|
"""Total number of entries in the complete context."""
|
||||||
|
return len(self._entries)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_empty(self) -> bool:
|
||||||
|
"""Whether this builder has any entries."""
|
||||||
|
return len(self._entries) == 0
|
||||||
@@ -5,7 +5,6 @@ import os
|
|||||||
|
|
||||||
from .transcript import (
|
from .transcript import (
|
||||||
STRIPPABLE_TYPES,
|
STRIPPABLE_TYPES,
|
||||||
read_transcript_file,
|
|
||||||
strip_progress_entries,
|
strip_progress_entries,
|
||||||
validate_transcript,
|
validate_transcript,
|
||||||
write_transcript_to_tempfile,
|
write_transcript_to_tempfile,
|
||||||
@@ -38,49 +37,6 @@ PROGRESS_ENTRY = {
|
|||||||
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
|
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
|
||||||
|
|
||||||
|
|
||||||
# --- read_transcript_file ---
|
|
||||||
|
|
||||||
|
|
||||||
class TestReadTranscriptFile:
|
|
||||||
def test_returns_content_for_valid_file(self, tmp_path):
|
|
||||||
path = tmp_path / "session.jsonl"
|
|
||||||
path.write_text(VALID_TRANSCRIPT)
|
|
||||||
result = read_transcript_file(str(path))
|
|
||||||
assert result is not None
|
|
||||||
assert "user" in result
|
|
||||||
|
|
||||||
def test_returns_none_for_missing_file(self):
|
|
||||||
assert read_transcript_file("/nonexistent/path.jsonl") is None
|
|
||||||
|
|
||||||
def test_returns_none_for_empty_path(self):
|
|
||||||
assert read_transcript_file("") is None
|
|
||||||
|
|
||||||
def test_returns_none_for_empty_file(self, tmp_path):
|
|
||||||
path = tmp_path / "empty.jsonl"
|
|
||||||
path.write_text("")
|
|
||||||
assert read_transcript_file(str(path)) is None
|
|
||||||
|
|
||||||
def test_returns_none_for_metadata_only(self, tmp_path):
|
|
||||||
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
|
|
||||||
path = tmp_path / "meta.jsonl"
|
|
||||||
path.write_text(content)
|
|
||||||
assert read_transcript_file(str(path)) is None
|
|
||||||
|
|
||||||
def test_returns_none_for_invalid_json(self, tmp_path):
|
|
||||||
path = tmp_path / "bad.jsonl"
|
|
||||||
path.write_text("not json\n{}\n{}\n")
|
|
||||||
assert read_transcript_file(str(path)) is None
|
|
||||||
|
|
||||||
def test_no_size_limit(self, tmp_path):
|
|
||||||
"""Large files are accepted — bucket storage has no size limit."""
|
|
||||||
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
|
|
||||||
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
|
|
||||||
path = tmp_path / "big.jsonl"
|
|
||||||
path.write_text(content)
|
|
||||||
result = read_transcript_file(str(path))
|
|
||||||
assert result is not None
|
|
||||||
|
|
||||||
|
|
||||||
# --- write_transcript_to_tempfile ---
|
# --- write_transcript_to_tempfile ---
|
||||||
|
|
||||||
|
|
||||||
@@ -155,12 +111,56 @@ class TestValidateTranscript:
|
|||||||
assert validate_transcript(content) is False
|
assert validate_transcript(content) is False
|
||||||
|
|
||||||
def test_assistant_only_no_user(self):
|
def test_assistant_only_no_user(self):
|
||||||
|
"""With --resume the user message is a CLI query param, not a transcript entry.
|
||||||
|
A transcript with only assistant entries is valid."""
|
||||||
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
|
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
|
||||||
assert validate_transcript(content) is False
|
assert validate_transcript(content) is True
|
||||||
|
|
||||||
|
def test_resume_transcript_without_user_entry(self):
|
||||||
|
"""Simulates a real --resume stop hook transcript: the CLI session file
|
||||||
|
has summary + assistant entries but no user entry."""
|
||||||
|
summary = {"type": "summary", "uuid": "s1", "text": "context..."}
|
||||||
|
asst1 = {
|
||||||
|
"type": "assistant",
|
||||||
|
"uuid": "a1",
|
||||||
|
"message": {"role": "assistant", "content": "Hello!"},
|
||||||
|
}
|
||||||
|
asst2 = {
|
||||||
|
"type": "assistant",
|
||||||
|
"uuid": "a2",
|
||||||
|
"parentUuid": "a1",
|
||||||
|
"message": {"role": "assistant", "content": "Sure, let me help."},
|
||||||
|
}
|
||||||
|
content = _make_jsonl(summary, asst1, asst2)
|
||||||
|
assert validate_transcript(content) is True
|
||||||
|
|
||||||
|
def test_single_assistant_entry(self):
|
||||||
|
"""A transcript with just one assistant line is valid — the CLI may
|
||||||
|
produce short transcripts for simple responses with no tool use."""
|
||||||
|
content = json.dumps(ASST_MSG) + "\n"
|
||||||
|
assert validate_transcript(content) is True
|
||||||
|
|
||||||
def test_invalid_json_returns_false(self):
|
def test_invalid_json_returns_false(self):
|
||||||
assert validate_transcript("not json\n{}\n{}\n") is False
|
assert validate_transcript("not json\n{}\n{}\n") is False
|
||||||
|
|
||||||
|
def test_malformed_json_after_valid_assistant_returns_false(self):
|
||||||
|
"""Validation must scan all lines - malformed JSON anywhere should fail."""
|
||||||
|
valid_asst = json.dumps(ASST_MSG)
|
||||||
|
malformed = "not valid json"
|
||||||
|
content = valid_asst + "\n" + malformed + "\n"
|
||||||
|
assert validate_transcript(content) is False
|
||||||
|
|
||||||
|
def test_blank_lines_are_skipped(self):
|
||||||
|
"""Transcripts with blank lines should be valid if they contain assistant entries."""
|
||||||
|
content = (
|
||||||
|
json.dumps(USER_MSG)
|
||||||
|
+ "\n\n" # blank line
|
||||||
|
+ json.dumps(ASST_MSG)
|
||||||
|
+ "\n"
|
||||||
|
+ "\n" # another blank line
|
||||||
|
)
|
||||||
|
assert validate_transcript(content) is True
|
||||||
|
|
||||||
|
|
||||||
# --- strip_progress_entries ---
|
# --- strip_progress_entries ---
|
||||||
|
|
||||||
@@ -253,3 +253,32 @@ class TestStripProgressEntries:
|
|||||||
assert "queue-operation" not in result_types
|
assert "queue-operation" not in result_types
|
||||||
assert "user" in result_types
|
assert "user" in result_types
|
||||||
assert "assistant" in result_types
|
assert "assistant" in result_types
|
||||||
|
|
||||||
|
def test_preserves_original_line_formatting(self):
|
||||||
|
"""Non-reparented entries keep their original JSON formatting."""
|
||||||
|
# Use pretty-printed JSON with spaces (as the CLI produces)
|
||||||
|
original_line = json.dumps(USER_MSG) # default formatting with spaces
|
||||||
|
compact_line = json.dumps(USER_MSG, separators=(",", ":"))
|
||||||
|
assert original_line != compact_line # precondition
|
||||||
|
|
||||||
|
content = original_line + "\n" + json.dumps(ASST_MSG) + "\n"
|
||||||
|
result = strip_progress_entries(content)
|
||||||
|
result_lines = result.strip().split("\n")
|
||||||
|
|
||||||
|
# Original line should be byte-identical (not re-serialized)
|
||||||
|
assert result_lines[0] == original_line
|
||||||
|
|
||||||
|
def test_reparented_entries_are_reserialized(self):
|
||||||
|
"""Entries whose parentUuid changes must be re-serialized."""
|
||||||
|
progress = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
|
||||||
|
asst = {
|
||||||
|
"type": "assistant",
|
||||||
|
"uuid": "a1",
|
||||||
|
"parentUuid": "p1",
|
||||||
|
"message": {"role": "assistant", "content": "done"},
|
||||||
|
}
|
||||||
|
content = _make_jsonl(USER_MSG, progress, asst)
|
||||||
|
result = strip_progress_entries(content)
|
||||||
|
lines = result.strip().split("\n")
|
||||||
|
asst_entry = json.loads(lines[-1])
|
||||||
|
assert asst_entry["parentUuid"] == "u1" # reparented
|
||||||
|
|||||||
Reference in New Issue
Block a user