Merge origin/dev into fix/copilot-transcript-compaction

Resolve merge conflicts in service.py, transcript.py, and transcript_test.py.
Incorporates dev's compaction tracking (emit_end_if_ready, entries_replaced,
read_compacted_entries) into the retry loop structure.
This commit is contained in:
Zamil Majdy
2026-03-14 06:08:07 +07:00
3 changed files with 175 additions and 83 deletions

View File

@@ -85,8 +85,9 @@ from .transcript import (
)
from .transcript_builder import TranscriptBuilder
# Patterns that indicate a "prompt too long" / context-length error from the
# Claude API. Matched case-insensitively against the exception message.
logger = logging.getLogger(__name__)
config = ChatConfig()
_PROMPT_TOO_LONG_PATTERNS = (
"prompt is too long",
"prompt_too_long",
@@ -95,15 +96,11 @@ _PROMPT_TOO_LONG_PATTERNS = (
def _is_prompt_too_long(err: BaseException) -> bool:
    """Return True if *err* indicates the prompt exceeds the model's limit.

    The exception message is matched case-insensitively against the known
    "prompt too long" substrings emitted by the Claude API
    (``_PROMPT_TOO_LONG_PATTERNS``).
    """
    # Merge artifact removed: the old docstring had been left behind as a
    # dead bare-string statement directly under the new one.
    msg = str(err).lower()
    return any(pattern in msg for pattern in _PROMPT_TOO_LONG_PATTERNS)
logger = logging.getLogger(__name__)
config = ChatConfig()
def _setup_langfuse_otel() -> None:
"""Configure OTEL tracing for the Claude Agent SDK → Langfuse.
@@ -721,11 +718,9 @@ async def stream_chat_completion_sdk(
stream_id = str(uuid.uuid4())
stream_completed = False
ended_with_stream_error = False
transcript_caused_error = False
e2b_sandbox = None
use_resume = False
resume_file: str | None = None
transcript_content: str = ""
transcript_builder = TranscriptBuilder()
sdk_cwd = ""
@@ -752,6 +747,8 @@ async def stream_chat_completion_sdk(
# OTEL context manager — initialized inside the try and cleaned up in finally.
_otel_ctx: Any = None
transcript_caused_error = False
transcript_content: str = ""
# Make sure there is no more code between the lock acquisition and try-block.
try:
@@ -815,7 +812,7 @@ async def stream_chat_completion_sdk(
)
except Exception as transcript_err:
logger.warning(
"%s Transcript download failed, continuing without " "--resume: %s",
"%s Transcript download failed, continuing without --resume: %s",
log_prefix,
transcript_err,
)
@@ -838,7 +835,7 @@ async def stream_chat_completion_sdk(
is_valid = validate_transcript(dl.content)
dl_lines = dl.content.strip().split("\n") if dl.content else []
logger.info(
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
"%s Downloaded transcript: %dB, %d lines, msg_count=%d, valid=%s",
log_prefix,
len(dl.content),
len(dl_lines),
@@ -846,13 +843,11 @@ async def stream_chat_completion_sdk(
is_valid,
)
if is_valid:
transcript_content = dl.content
# Load previous FULL context into builder
transcript_builder.load_previous(
transcript_content, log_prefix=log_prefix
)
transcript_content = dl.content
transcript_builder.load_previous(dl.content, log_prefix=log_prefix)
resume_file = write_transcript_to_tempfile(
transcript_content, session_id, sdk_cwd
dl.content, session_id, sdk_cwd
)
if resume_file:
use_resume = True
@@ -918,6 +913,13 @@ async def stream_chat_completion_sdk(
sdk_options_kwargs["model"] = sdk_model
if sdk_env:
sdk_options_kwargs["env"] = sdk_env
if use_resume and resume_file:
sdk_options_kwargs["resume"] = resume_file
options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type] # dynamic kwargs
adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
# Propagate user_id/session_id as OTEL context attributes so the
# langsmith tracing integration attaches them to every span. This
# is what Langfuse (or any OTEL backend) maps to its native
@@ -934,7 +936,6 @@ async def stream_chat_completion_sdk(
)
_otel_ctx.__enter__()
# Pre-compute message and attachments (reused across retry attempts).
current_message = message or ""
if not current_message and session.messages:
last_user = [m for m in session.messages if m.role == "user"]
@@ -948,36 +949,42 @@ async def stream_chat_completion_sdk(
)
return
query_message, was_compacted = await _build_query_message(
current_message,
session,
use_resume,
transcript_msg_count,
session_id,
)
# If files are attached, prepare them: images become vision
# content blocks in the user message, other files go to sdk_cwd.
attachments = await _prepare_file_attachments(
file_ids or [], user_id or "", session_id, sdk_cwd
)
if attachments.hint:
query_message = f"{query_message}\n\n{attachments.hint}"
# --- Retry loop for prompt-too-long errors ---
# Attempt 1: use transcript as-is with --resume
# Attempt 2: compact transcript, retry with --resume
# Attempt 3: drop transcript, use _build_query_message DB fallback
_MAX_QUERY_ATTEMPTS = 3
_prompt_too_long = False
for _query_attempt in range(_MAX_QUERY_ATTEMPTS):
if _query_attempt > 0:
logger.warning(
"%s Prompt too long (attempt %d/%d), %s",
_prompt_too_long = False
logger.info(
"%s Prompt-too-long retry attempt %d/%d",
log_prefix,
_query_attempt + 1,
_MAX_QUERY_ATTEMPTS,
(
"compacting transcript"
if transcript_content
else "falling back to DB context"
),
)
if transcript_content:
if _query_attempt == 1 and transcript_content:
compacted = await compact_transcript(
transcript_content, log_prefix=log_prefix
)
if compacted and compacted != transcript_content:
transcript_content = compacted
if compacted and validate_transcript(compacted):
logger.info(
"%s Using compacted transcript for retry",
log_prefix,
)
transcript_builder = TranscriptBuilder()
transcript_builder.load_previous(
compacted, log_prefix=log_prefix
@@ -985,40 +992,36 @@ async def stream_chat_completion_sdk(
resume_file = write_transcript_to_tempfile(
compacted, session_id, sdk_cwd
)
if not resume_file:
use_resume = False
use_resume = bool(resume_file)
transcript_msg_count = 0
else:
# Compaction failed or didn't reduce — drop transcript
logger.warning(
"%s Compaction failed, dropping transcript",
log_prefix,
)
transcript_builder = TranscriptBuilder()
use_resume = False
resume_file = None
transcript_content = ""
transcript_msg_count = 0
transcript_builder = TranscriptBuilder()
else:
# No transcript to compact — use DB fallback
logger.warning(
"%s No transcript to compact, using DB fallback",
log_prefix,
)
transcript_builder = TranscriptBuilder()
use_resume = False
resume_file = None
transcript_msg_count = 0
transcript_builder = TranscriptBuilder()
# Build SDK options for this attempt
attempt_kwargs = dict(sdk_options_kwargs)
if use_resume and resume_file:
attempt_kwargs["resume"] = resume_file
options = ClaudeAgentOptions(**attempt_kwargs) # type: ignore[arg-type]
# Rebuild SDK options with updated resume state
sdk_options_kwargs_retry = dict(sdk_options_kwargs)
if use_resume and resume_file:
sdk_options_kwargs_retry["resume"] = resume_file
elif "resume" in sdk_options_kwargs_retry:
del sdk_options_kwargs_retry["resume"]
options = ClaudeAgentOptions(**sdk_options_kwargs_retry) # type: ignore[arg-type]
adapter = SDKResponseAdapter(message_id=message_id, session_id=session_id)
# Reset per-attempt state
_prompt_too_long = False
stream_completed = False
ended_with_stream_error = False
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False
async with ClaudeSDKClient(options=options) as client:
# Rebuild query with updated resume/transcript state
query_message, was_compacted = await _build_query_message(
current_message,
session,
@@ -1029,6 +1032,11 @@ async def stream_chat_completion_sdk(
if attachments.hint:
query_message = f"{query_message}\n\n{attachments.hint}"
adapter = SDKResponseAdapter(
message_id=message_id, session_id=session_id
)
async with ClaudeSDKClient(options=options) as client:
logger.info(
"%s Sending query — resume=%s, total_msgs=%d, "
"query_len=%d, attached_files=%d, image_blocks=%d",
@@ -1115,15 +1123,12 @@ async def stream_chat_completion_sdk(
)
break
except Exception as stream_err:
# SDK sends {"type": "error"} which raises
# Exception in receive_response() — capture it
# so the session can still be saved and the
# frontend gets a clean finish.
if _is_prompt_too_long(stream_err):
logger.warning(
"%s Prompt too long on attempt %d: %s",
"%s Prompt too long (attempt %d/%d): %s",
log_prefix,
_query_attempt + 1,
_MAX_QUERY_ATTEMPTS,
stream_err,
)
_prompt_too_long = True
@@ -1135,6 +1140,7 @@ async def stream_chat_completion_sdk(
exc_info=True,
)
ended_with_stream_error = True
yield StreamError(
errorText=f"SDK stream error: {stream_err}",
code="sdk_stream_error",
@@ -1448,19 +1454,21 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# --- End of async with ClaudeSDKClient ---
if not _prompt_too_long:
break
if not _prompt_too_long:
break # Success or non-retriable error — exit retry loop
# All retry attempts exhausted with prompt-too-long
if _prompt_too_long:
transcript_caused_error = True
ended_with_stream_error = True
yield StreamError(
errorText="Prompt too long after compaction and fallback attempts",
code="prompt_too_long",
)
if _prompt_too_long:
transcript_caused_error = True
logger.error(
"%s All %d query attempts exhausted prompt too long",
log_prefix,
_MAX_QUERY_ATTEMPTS,
)
yield StreamError(
errorText="The conversation is too long for the model. "
"Please start a new session.",
code="prompt_too_long",
)
# Transcript upload is handled exclusively in the finally block
# to avoid double-uploads (the success path used to upload the
@@ -1562,9 +1570,6 @@ async def stream_chat_completion_sdk(
# TranscriptBuilder is the single source of truth. It mirrors the
# CLI's active context: on compaction, replace_entries() syncs it
# with the compacted session file. No CLI file read needed here.
# Skip upload if the transcript caused a prompt-too-long error —
# re-uploading a problematic transcript would cause the next turn
# to fail identically.
if transcript_caused_error:
logger.warning(
"%s Skipping transcript upload — transcript caused "
@@ -1573,14 +1578,14 @@ async def stream_chat_completion_sdk(
)
elif config.claude_agent_use_resume and user_id and session is not None:
try:
transcript_content = transcript_builder.to_jsonl()
transcript_upload_content = transcript_builder.to_jsonl()
entry_count = transcript_builder.entry_count
if not transcript_content:
if not transcript_upload_content:
logger.warning(
"%s No transcript to upload (builder empty)", log_prefix
)
elif not validate_transcript(transcript_content):
elif not validate_transcript(transcript_upload_content):
logger.warning(
"%s Transcript invalid, skipping upload (entries=%d)",
log_prefix,
@@ -1591,13 +1596,13 @@ async def stream_chat_completion_sdk(
"%s Uploading transcript (entries=%d, bytes=%d)",
log_prefix,
entry_count,
len(transcript_content),
len(transcript_upload_content),
)
await asyncio.shield(
upload_transcript(
user_id=user_id,
session_id=session_id,
content=transcript_content,
content=transcript_upload_content,
message_count=len(session.messages),
log_prefix=log_prefix,
)

View File

@@ -555,8 +555,34 @@ async def download_transcript(
)
async def delete_transcript(user_id: str, session_id: str) -> None:
    """Delete transcript and its metadata from bucket storage.

    Best-effort removal of both the ``.jsonl`` transcript and the companion
    ``.meta.json`` file, so stale ``message_count`` watermarks cannot corrupt
    gap-fill logic. Each delete failure is logged and swallowed; a failure on
    the transcript never prevents the metadata delete attempt.
    """
    # Imported lazily to avoid a module-level dependency cycle.
    from backend.util.workspace_storage import get_workspace_storage

    storage = await get_workspace_storage()
    transcript_path = _build_storage_path(user_id, session_id, storage)
    try:
        await storage.delete(transcript_path)
    except Exception as err:
        logger.warning("[Transcript] Failed to delete transcript: %s", err)
    else:
        logger.info("[Transcript] Deleted transcript for session %s", session_id)

    # Also delete the companion .meta.json to avoid orphaned metadata.
    try:
        await storage.delete(_build_meta_storage_path(user_id, session_id, storage))
    except Exception as err:
        logger.warning("[Transcript] Failed to delete metadata: %s", err)
    else:
        logger.info("[Transcript] Deleted metadata for session %s", session_id)
# ---------------------------------------------------------------------------
# Transcript compaction — LLM summarization for prompt-too-long recovery
# ---------------------------------------------------------------------------
# JSONL protocol values used in transcript serialization.
@@ -672,9 +698,9 @@ def _messages_to_transcript(messages: list[dict]) -> str:
async def _run_compression(
messages: list[dict],
model: str,
cfg: "ChatConfig",
cfg: ChatConfig,
log_prefix: str,
) -> "CompressResult":
) -> CompressResult:
"""Run LLM-based compression with truncation fallback."""
import openai

View File

@@ -1,12 +1,16 @@
"""Unit tests for JSONL transcript management utilities."""
import os
from unittest.mock import AsyncMock, patch
import pytest
from backend.util import json
from .transcript import (
STRIPPABLE_TYPES,
_cli_project_dir,
delete_transcript,
read_cli_session_file,
read_compacted_entries,
strip_progress_entries,
@@ -367,6 +371,63 @@ class TestCliProjectDir:
assert result is None
# --- delete_transcript ---
class TestDeleteTranscript:
    """delete_transcript always attempts both the .jsonl and .meta.json deletes."""

    @staticmethod
    def _patched_storage(mock_storage):
        """Patch the workspace-storage factory so it resolves to *mock_storage*."""
        return patch(
            "backend.util.workspace_storage.get_workspace_storage",
            new_callable=AsyncMock,
            return_value=mock_storage,
        )

    @pytest.mark.asyncio
    async def test_deletes_both_jsonl_and_meta(self):
        """delete_transcript removes both the .jsonl and .meta.json files."""
        storage = AsyncMock()
        storage.delete = AsyncMock()

        with self._patched_storage(storage):
            await delete_transcript("user-123", "session-456")

        assert storage.delete.call_count == 2
        deleted_paths = [call.args[0] for call in storage.delete.call_args_list]
        assert any(p.endswith(".jsonl") for p in deleted_paths)
        assert any(p.endswith(".meta.json") for p in deleted_paths)

    @pytest.mark.asyncio
    async def test_continues_on_jsonl_delete_failure(self):
        """If .jsonl delete fails, .meta.json delete is still attempted."""
        storage = AsyncMock()
        storage.delete = AsyncMock(
            side_effect=[Exception("jsonl delete failed"), None]
        )

        with self._patched_storage(storage):
            # Should not raise
            await delete_transcript("user-123", "session-456")

        assert storage.delete.call_count == 2

    @pytest.mark.asyncio
    async def test_handles_meta_delete_failure(self):
        """If .meta.json delete fails, no exception propagates."""
        storage = AsyncMock()
        storage.delete = AsyncMock(
            side_effect=[None, Exception("meta delete failed")]
        )

        with self._patched_storage(storage):
            # Should not raise
            await delete_transcript("user-123", "session-456")
# --- read_compacted_entries ---