fix: add truncation timeout, closure contract, shared test fixtures

- Document outer-scope variable contract in _run_stream_attempt closure
- Add 30s timeout to truncation fallback in _run_compression
- Fix _truncate_middle_tokens edge case for max_tok < 3
- Pass model parameter to compact_transcript to avoid ChatConfig() per call
- Extract _build_transcript to shared conftest.py fixture
- Fix scenario numbering gap in retry_scenarios_test.py
- Add comment explaining E2B sandbox timeout increase
This commit is contained in:
Zamil Majdy
2026-03-14 22:28:41 +07:00
parent 9642332332
commit fc844fde1f
8 changed files with 90 additions and 105 deletions

View File

@@ -115,7 +115,7 @@ class ChatConfig(BaseSettings):
description="E2B sandbox template to use for copilot sessions.",
)
e2b_sandbox_timeout: int = Field(
default=900, # 15 min safety net — explicit per-turn pause is the primary mechanism
default=900, # 15 min safety net — raised from 5 min to accommodate compaction retries
description="E2B sandbox running-time timeout (seconds). "
"E2B timeout is wall-clock (not idle). Explicit per-turn pause is the primary "
"mechanism; this is the safety net.",

View File

@@ -0,0 +1,49 @@
"""Shared test fixtures for copilot SDK tests."""
from __future__ import annotations
from uuid import uuid4
import pytest
from backend.util import json
def build_test_transcript(pairs: list[tuple[str, str]]) -> str:
    """Build a minimal valid JSONL transcript from (role, content) pairs.

    Use this helper in any copilot SDK test that needs a well-formed
    transcript without hitting the real storage layer.

    Entries are chained via ``parentUuid`` so the result forms one linear
    thread; assistant turns carry the extra SDK message fields.
    """
    rendered: list[str] = []
    parent: str | None = None  # root entry has no parent
    for role, content in pairs:
        uid = str(uuid4())
        msg: dict = {"role": role, "content": content}
        if role == "assistant":
            # Assistant messages use the SDK block format: content becomes
            # a list of typed blocks plus message-level metadata.
            msg["model"] = ""
            msg["id"] = f"msg_{uid[:8]}"
            msg["type"] = "message"
            msg["content"] = [{"type": "text", "text": content}]
            msg["stop_reason"] = "end_turn"
            msg["stop_sequence"] = None
        record = {
            "type": "assistant" if role == "assistant" else "user",
            "uuid": uid,
            "parentUuid": parent,
            "message": msg,
        }
        # Compact separators match the storage layer's wire format.
        rendered.append(json.dumps(record, separators=(",", ":")))
        parent = uid
    return "\n".join(rendered) + "\n"
@pytest.fixture()
def transcript_factory():
    """Pytest fixture that returns the ``build_test_transcript`` helper.

    Exposing the helper as a fixture lets tests request it by argument
    name instead of importing it from ``conftest`` directly.
    """
    return build_test_transcript

View File

@@ -9,6 +9,7 @@ import pytest
from backend.util import json
from .conftest import build_test_transcript as _build_transcript
from .transcript import (
_flatten_assistant_content,
_flatten_tool_result_content,
@@ -362,36 +363,6 @@ class TestMessagesToTranscript:
# ---------------------------------------------------------------------------
def _build_transcript(pairs: list[tuple[str, str]]) -> str:
    """Build a minimal valid JSONL transcript from (role, content) pairs.

    Each pair becomes one JSONL entry; entries are chained via
    ``parentUuid`` so the transcript forms a single linear thread.
    Assistant entries carry the extra SDK message fields
    (``id``, ``stop_reason``, ...) that a real transcript would have.
    """
    lines: list[str] = []
    last_uuid: str | None = None  # root entry's parentUuid is None
    for role, content in pairs:
        uid = str(uuid4())
        # Anything that is not an assistant turn is stored as a "user" entry.
        entry_type = "assistant" if role == "assistant" else "user"
        msg: dict = {"role": role, "content": content}
        if role == "assistant":
            # Assistant messages use the SDK block format: content becomes
            # a list of typed blocks, plus message-level metadata fields.
            msg.update(
                {
                    "model": "",
                    "id": f"msg_{uid[:8]}",
                    "type": "message",
                    "content": [{"type": "text", "text": content}],
                    "stop_reason": "end_turn",
                    "stop_sequence": None,
                }
            )
        entry = {
            "type": entry_type,
            "uuid": uid,
            "parentUuid": last_uuid,  # links this entry to the previous one
            "message": msg,
        }
        # Compact separators keep the JSONL close to the stored wire format.
        lines.append(json.dumps(entry, separators=(",", ":")))
        last_uuid = uid
    # JSONL output always ends with a trailing newline.
    return "\n".join(lines) + "\n"
class TestCompactTranscript:
@pytest.mark.asyncio
async def test_too_few_messages_returns_none(self):

View File

@@ -23,12 +23,12 @@ Scenario matrix:
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import pytest
from backend.util import json
from .conftest import build_test_transcript as _build_transcript
from .service import _MAX_STREAM_ATTEMPTS
from .transcript import (
_flatten_assistant_content,
@@ -45,36 +45,6 @@ from .transcript_builder import TranscriptBuilder
# ---------------------------------------------------------------------------
def _build_transcript(pairs: list[tuple[str, str]]) -> str:
    """Build a minimal valid JSONL transcript from (role, content) pairs.

    Entries are chained via ``parentUuid`` so the transcript forms a
    single linear thread; assistant entries carry the extra SDK message
    fields (``id``, ``stop_reason``, ...) a real transcript would have.
    """
    lines: list[str] = []
    last_uuid: str | None = None  # root entry's parentUuid is None
    for role, content in pairs:
        uid = str(uuid4())
        # Anything that is not an assistant turn is stored as a "user" entry.
        entry_type = "assistant" if role == "assistant" else "user"
        msg: dict = {"role": role, "content": content}
        if role == "assistant":
            # Assistant messages use the SDK block format: content becomes
            # a list of typed blocks, plus message-level metadata fields.
            msg.update(
                {
                    "model": "",
                    "id": f"msg_{uid[:8]}",
                    "type": "message",
                    "content": [{"type": "text", "text": content}],
                    "stop_reason": "end_turn",
                    "stop_sequence": None,
                }
            )
        entry = {
            "type": entry_type,
            "uuid": uid,
            "parentUuid": last_uuid,  # links this entry to the previous one
            "message": msg,
        }
        # Compact separators keep the JSONL close to the stored wire format.
        lines.append(json.dumps(entry, separators=(",", ":")))
        last_uuid = uid
    # JSONL output always ends with a trailing newline.
    return "\n".join(lines) + "\n"
def _mock_compress_result(
was_compacted: bool,
messages: list[dict] | None = None,
@@ -332,7 +302,7 @@ class TestScenarioAllAttemptsExhausted:
# ---------------------------------------------------------------------------
# Scenario 8: Compaction returns identical content
# Scenario 7: Compaction returns identical content
# ---------------------------------------------------------------------------
@@ -382,7 +352,7 @@ class TestScenarioCompactionIdentical:
# ---------------------------------------------------------------------------
# Scenario 9: transcript_caused_error → finally skips upload
# Scenario 8: transcript_caused_error → finally skips upload
# ---------------------------------------------------------------------------

View File

@@ -151,7 +151,9 @@ async def _reduce_context(
"""
# First retry: try compacting
if transcript_content and not tried_compaction:
compacted = await compact_transcript(transcript_content, log_prefix=log_prefix)
compacted = await compact_transcript(
transcript_content, model=config.model, log_prefix=log_prefix
)
if (
compacted
and compacted != transcript_content
@@ -1099,6 +1101,16 @@ async def stream_chat_completion_sdk(
Yields stream events. On stream error the exception propagates
to the caller so the retry loop can rollback and retry.
Outer-scope variable contract (closure):
Reassigned between retries by the retry loop:
``options``, ``query_message``, ``was_compacted``,
``transcript_builder``, ``adapter``, ``use_resume``,
``resume_file``, ``transcript_msg_count``
Read-only (unchanged across retries):
``session``, ``session_id``, ``sdk_cwd``, ``log_prefix``,
``compaction``, ``attachments``, ``current_message``,
``file_ids``, ``lock``, ``message_id``, ``e2b_sandbox``
"""
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []

View File

@@ -9,10 +9,10 @@ from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, patch
from uuid import uuid4
import pytest
from .conftest import build_test_transcript as _build_transcript
from .service import (
ReducedContext,
_is_prompt_too_long,
@@ -101,37 +101,6 @@ class TestIsPromptTooLong:
# ---------------------------------------------------------------------------
def _build_transcript(pairs: list[tuple[str, str]]) -> str:
    """Build a minimal valid JSONL transcript from (role, content) pairs.

    NOTE(review): this variant differs from the other copies — the root
    entry's ``parentUuid`` is ``""`` rather than ``None``, and assistant
    message ids use ``uuid4().hex[:24]`` instead of the entry uuid's
    prefix. Preserved as-is; confirm whether the difference is intended.
    """
    # Local import mirrors the project's json wrapper used elsewhere.
    from backend.util import json
    lines: list[str] = []
    last_uuid: str = ""  # root entry's parentUuid is "" in this variant
    for role, content in pairs:
        uid = str(uuid4())
        # Anything that is not an assistant turn is stored as a "user" entry.
        entry_type = "assistant" if role == "assistant" else "user"
        msg: dict = {"role": role, "content": content}
        if role == "assistant":
            # Assistant messages use the SDK block format: content becomes
            # a list of typed blocks, plus message-level metadata fields.
            msg.update(
                {
                    "model": "",
                    "id": f"msg_{uuid4().hex[:24]}",
                    "type": "message",
                    "content": [{"type": "text", "text": content}],
                    "stop_reason": "end_turn",
                    "stop_sequence": None,
                }
            )
        entry = {
            "type": entry_type,
            "uuid": uid,
            "parentUuid": last_uuid,  # links this entry to the previous one
            "message": msg,
        }
        # Compact separators keep the JSONL close to the stored wire format.
        lines.append(json.dumps(entry, separators=(",", ":")))
        last_uuid = uid
    # JSONL output always ends with a trailing newline.
    return "\n".join(lines) + "\n"
class TestReduceContext:
@pytest.mark.asyncio
async def test_first_retry_compaction_success(self) -> None:
@@ -267,7 +236,7 @@ class TestIterSdkMessages:
async def _slow_receive() -> AsyncGenerator[str]:
await asyncio.sleep(100) # never completes
yield "never" # noqa: RUF027
yield "never" # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _slow_receive
@@ -287,7 +256,7 @@ class TestIterSdkMessages:
async def _error_receive() -> AsyncGenerator[str]:
raise RuntimeError("SDK crash")
yield # noqa: RUF027 — make it an async generator
yield # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _error_receive

View File

@@ -716,6 +716,7 @@ def _messages_to_transcript(messages: list[dict]) -> str:
_COMPACTION_TIMEOUT_SECONDS = 60
_TRUNCATION_TIMEOUT_SECONDS = 30
async def _run_compression(
@@ -731,12 +732,17 @@ async def _run_compression(
summarization.
A 60-second timeout prevents a hung LLM call from blocking the
retry path indefinitely.
retry path indefinitely. The truncation fallback also has a
30-second timeout to guard against slow tokenization on very large
transcripts.
"""
client = get_openai_client()
if client is None:
logger.warning("%s No OpenAI client configured, using truncation", log_prefix)
return await compress_context(messages=messages, model=model, client=None)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
try:
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=client),
@@ -744,11 +750,16 @@ async def _run_compression(
)
except Exception as e:
logger.warning("%s LLM compaction failed, using truncation: %s", log_prefix, e)
return await compress_context(messages=messages, model=model, client=None)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
async def compact_transcript(
content: str,
*,
model: str = "",
log_prefix: str = "[Transcript]",
) -> str | None:
"""Compact an oversized JSONL transcript using LLM summarization.
@@ -771,13 +782,14 @@ async def compact_transcript(
Returns the compacted JSONL string, or ``None`` on failure.
"""
cfg = ChatConfig()
if not model:
model = ChatConfig().model
messages = _transcript_to_messages(content)
if len(messages) < 2:
logger.warning("%s Too few messages to compact (%d)", log_prefix, len(messages))
return None
try:
result = await _run_compression(messages, cfg.model, log_prefix)
result = await _run_compression(messages, model, log_prefix)
if not result.was_compacted:
# Compressor says it's within budget, but the SDK rejected it.
# Return None so the caller falls through to DB fallback.

View File

@@ -150,9 +150,11 @@ def _truncate_middle_tokens(text: str, enc, max_tok: int) -> str:
return text # nothing to do
# Need at least 3 tokens (head + ellipsis + tail) for meaningful truncation
if max_tok < 1:
return ""
mid = enc.encode("")
if max_tok < 3:
return enc.decode(mid)
return enc.decode(ids[:max_tok])
# Split the allowance between the two ends:
head = max_tok // 2 - 1 # -1 for the ellipsis