Compare commits

..

31 Commits

Author SHA1 Message Date
Zamil Majdy
0202c15460 test: add E2E screenshots for PR #12804 2026-04-16 14:45:00 +07:00
Zamil Majdy
33a7b83125 fix(backend/copilot): fix len(source) log metric, add retry comment, complete sdk re-exports
- Use len(source) instead of len(prior) in _build_query_message fallback
  warning so the logged count reflects the actual source being compressed
- Add comment explaining retry path intentionally omits prior_messages
  and falls back to full DB context (authoritative, overhead acceptable)
- Add missing cli_session_path, extract_context_messages, projects_base
  to sdk/transcript.py re-export for complete public API surface
2026-04-16 14:39:16 +07:00
Zamil Majdy
b05846d515 fix(backend/copilot): narrow broad except, fix len unit, add invariant comments
- _read_cli_session_from_disk: narrow `except Exception` to `except (OSError, ValueError)`
  to avoid silently masking unexpected programming errors in strip_for_upload
- _process_cli_restore: use `len(raw_str)` instead of `len(cli_restore.content)` in log
  so the reported size is always character count regardless of bytes|str input type
- detect_gap: add comment explaining that message_count is always written after a
  complete user→assistant exchange so the assistant-role invariant always holds
- _append_gap_to_builder: add pre-condition comment explaining why gap always starts
  at a turn boundary (detect_gap enforces session_messages[wm-1].role == 'assistant')
- _jsonl_covered: expand comment to explain why +2 undercount on tool-use turns is
  safe (gap-fill corrects it) and preferable to over-estimating (inflated-watermark bug)
2026-04-16 14:33:08 +07:00
Zamil Majdy
3fb63d7eb0 remove: test screenshots accidentally cherry-picked 2026-04-16 14:19:57 +07:00
Zamil Majdy
2f3003f059 fix(backend/copilot): make upload_transcript atomic — sequential writes with bidirectional rollback
Previously JSONL and meta were uploaded in parallel; if meta failed the JSONL
was left orphaned (no rollback), causing the next restore to use wrong
mode/message_count defaults.

Now writes are sequential (JSONL first, meta second):
- JSONL failure: returns early, meta never written → neither file exists
- Meta failure: deletes JSONL (rollback) → neither file exists
- Process crash between writes: orphaned JSONL with no meta → download falls
  back to mode="sdk" / message_count=0 defaults (safe for SDK content; a
  baseline JSONL would fail --resume gracefully and fall back to DB context)

Also logs mode in both upload and download info lines, and updates tests:
- test_skips_upload_on_storage_failure: asserts meta store never called on JSONL failure
- test_rolls_back_session_when_meta_upload_fails: replaces old meta-rollback test
2026-04-16 14:19:29 +07:00
Zamil Majdy
7aef023f28 fix(backend/copilot): encode content to bytes in cmd_load upload_transcript call
upload_transcript now requires bytes for the content param but cmd_load was
passing a str read from the transcript file. Encode to UTF-8 before the call.
2026-04-16 14:14:22 +07:00
Zamil Majdy
c263fbca5c docs(backend/copilot): document tool_calls flattening in extract_context_messages
Add a note to the extract_context_messages docstring explaining that assistant
messages derived from JSONL entries have tool_use blocks flattened to text
(same behaviour as the old _compress_session_messages path — no regression).
Gap messages from DB preserve their structured tool_calls field.
2026-04-16 14:10:15 +07:00
Zamil Majdy
0c3a15832b fix(backend/copilot): set transcript_content on baseline restore, fix relative import in transcript.py
- _restore_cli_session_for_turn now sets result.transcript_content when loading
  baseline content into the TranscriptBuilder, preventing the _seed_transcript
  guard in stream_chat_completion_sdk from overwriting the builder with a full
  DB reconstruction (which would duplicate entries since load_previous appends).
- Change transcript.py TYPE_CHECKING and runtime ChatMessage import from
  absolute (backend.copilot.model) to relative (.model) to match service.py's
  import style and eliminate Pyright type-identity collisions.
- Unpack _load_prior_transcript tuple return in mode_switch_context_test.py
  and assert dl is not None.
- Add assert result.transcript_content != "" in service_helpers_test.py.
2026-04-16 14:05:28 +07:00
Zamil Majdy
d91cfb5d84 Merge branch 'master' into fix/copilot-single-session-store 2026-04-16 13:52:50 +07:00
Zamil Majdy
dfa07d88b8 refactor(backend/copilot): unified transcript context — extract_context_messages
Introduces extract_context_messages() as a shared primitive used by both the
SDK (use_resume=False fallback) and baseline (openai_messages array). Both
modes now read the GCS transcript content + gap from DB instead of doing a
full session history scan on every turn.

**SDK path (mode="baseline" or missing transcript):**
Previously _restore_cli_session_for_turn discarded baseline transcripts and
fell through to _session_messages_to_transcript — a full DB reconstruction
that ignored the compaction summaries stored in the baseline JSONL.

Now: saves the baseline TranscriptDownload in result.baseline_download, calls
extract_context_messages to get transcript content + gap as list[ChatMessage],
stores in result.context_messages. _build_query_message receives prior_messages
and uses it instead of session.messages[:-1] when building <conversation_history>.

**Baseline path:**
Previously _compress_session_messages(session.messages) re-read all DB messages
every turn. Now _load_prior_transcript returns (bool, TranscriptDownload | None)
so the download is available to the LLM call; extract_context_messages builds
prior context from transcript + gap, appending the current user turn before
passing to _compress_session_messages.

**Shared primitive — extract_context_messages:**
- TranscriptBuilder.load_previous preserves isCompactSummary=True entries,
  so the GCS JSONL mirrors the CLI's compacted context (not raw messages).
- Gap is always small in normal operation; bounded by turns since last write.
- Falls back to session_messages[:-1] when no transcript exists (first turn).
- TranscriptDownload.content widened to bytes | str for pre-decoded callers.

**Watermark fix tests:**
The inflated-watermark bug fix (transcript_msg_count + 2 when use_resume=True)
was already in service.py; added 4 unit tests covering: gap-fill triggers with
corrected watermark, no false-positive when current, fresh-session fallback,
old-format meta fallback.
2026-04-16 13:50:40 +07:00
Zamil Majdy
c305ce5bac fix(backend/copilot): use JSONL coverage count as transcript watermark
The meta.json message_count was set to len(session.messages) (current DB
count). When prior turns' GCS uploads failed silently, the JSONL was stale
(e.g. only T1-T12) but the watermark appeared current (e.g. 46). The next
turn's gap-fill check (transcript_msg_count < msg_count-1) never triggered,
so the model silently lost the skipped turns.

Fix: set message_count = transcript_msg_count + 2 (previous JSONL coverage
+ current user+asst pair) when use_resume=True and transcript_msg_count > 0.
This ensures the watermark reflects the actual JSONL content. Stale uploads
now produce a low watermark, triggering gap-fill on the next turn to inject
the missing context.

Adds unit tests verifying gap-fill triggers with the corrected watermark and
documenting the original inflated-watermark suppression behavior.
2026-04-16 12:52:40 +07:00
Zamil Majdy
c3aaa1d48e remove useless env 2026-04-16 12:15:33 +07:00
Zamil Majdy
9415166ee0 fix(backend/copilot): split broad except in _read_cli_session_from_disk, clean up dataclass field comment
- Split `except Exception` into `UnicodeDecodeError` (returns raw) + nested
  `OSError` on write-back (returns stripped for GCS despite local failure) +
  fallback `Exception` — eliminates path leak via OSError str() and clarifies
  each fallback path's intent
- Move write-back into its own nested try/except so pyright can verify
  `stripped_bytes` is always bound before use
- Simplify `TranscriptDownload.mode` default: use conventional comment on
  separate line instead of parenthesized expression
- Add `TestReadCliSessionFromDisk` tests covering both new exception branches
2026-04-16 06:58:04 +07:00
Zamil Majdy
cbf71fddb2 fix(backend/copilot): skip --resume for DB-reconstructed transcripts
When the SDK encounters a baseline-written GCS transcript and correctly
discards it (mode != 'sdk'), it falls back to rebuilding context from
DB session messages via _session_messages_to_transcript. The previous
code then wrote this reconstructed transcript to disk and set
use_resume=True, but the Claude CLI rejected it with exit code 1 because
the TranscriptBuilder format uses synthetic IDs (msg_sdk_...) and lacks
required fields (sessionId, cwd, version) that the CLI needs for --resume.

Fix: reconstruction loads context into transcript_builder for state
tracking and uploads, but never writes to disk or sets use_resume=True.
Context is injected via the use_resume=False path in _build_query_message.

Add test assertion: result.use_resume is False after baseline reconstruction.
2026-04-16 06:42:53 +07:00
Zamil Majdy
0732fb695a fix(backend/copilot): update stale upload_cli_session/restore_cli_session comments to new API names 2026-04-16 05:47:20 +07:00
Zamil Majdy
2c7ba36804 fix(backend/copilot): use e.strerror in _read_cli_session_from_disk OSError log to avoid path disclosure 2026-04-16 05:42:34 +07:00
Zamil Majdy
e11e3841b4 fix(backend/copilot): sanitize OSError path in write log, patch config in mode-check tests
- _write_cli_session_to_disk: log basename + e.strerror instead of raw OSError
  to avoid exposing host directory paths in warning logs
- TestRestoreCliSessionModeCheck: patch config.claude_agent_use_resume=True so
  tests are not silently skipped when CLAUDE_AGENT_USE_RESUME=false in env
- baseline/transcript_integration_test.py: fix stale docstring (restore_cli_session
  → download_transcript)
2026-04-16 05:31:40 +07:00
Zamil Majdy
5cdc7d1e80 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fix/copilot-single-session-store 2026-04-16 05:24:59 +07:00
Zamil Majdy
f6a2a118a6 fix(backend/copilot): fix import sort order and black formatting in test files 2026-04-16 04:31:15 +07:00
Zamil Majdy
d30ff9e73f test(backend/copilot): add coverage for mode field, gap-fill, and SDK mode skip
- transcript_test: cover mode='baseline' round-trip in upload/download,
  invalid mode fallback, UTF-8 decode error in meta, meta fetch exception
- transcript_integration_test: TestAppendGapToBuilder covering user/assistant/
  tool messages, tool_calls, empty fallback, missing function key
- service_helpers_test: TestRestoreCliSessionModeCheck verifying baseline-mode
  transcripts are discarded and DB reconstruction runs instead
2026-04-16 04:09:22 +07:00
Zamil Majdy
2e92efa29d test(backend/copilot): add TestDetectGap unit tests for detect_gap boundary cases
Cover all boundary conditions directly: zero watermark, watermark at prefix,
watermark exceeds session, misaligned watermark (user at wm-1 position),
gap fill returning correct slice, current-turn exclusion, and single-gap-message.
2026-04-16 03:45:33 +07:00
Zamil Majdy
7737b7c21f fix(backend/copilot): fallback empty text block for empty-content gap assistant messages; fix stale test docstring
- _append_gap_to_builder: add fallback {"type":"text","text":""} block so
  assistant messages with neither content nor tool_calls still produce an
  entry in the builder (preserves entry-count == gap-count invariant)
- mode_switch_context_test.py: update module docstring to reference the new
  upload_transcript / download_transcript API (was upload_cli_session / restore_cli_session)
2026-04-16 03:41:34 +07:00
Zamil Majdy
f3bf44ce25 refactor(backend/copilot): extract _restore_cli_session_for_turn to fix Pyright complexity error
Extract the 125-line CLI session restore block from stream_chat_completion_sdk
into a dedicated _restore_cli_session_for_turn helper (returns _RestoreResult
dataclass) to reduce the function's code path complexity below Pyright's limit.
2026-04-16 03:37:59 +07:00
Zamil Majdy
5b487829f7 fix(backend/copilot): address review cycle 2 — write stripped bytes, harden meta parsing, rollback on partial upload
- _process_cli_restore: write stripped_bytes (not raw cli_restore.content) to disk so
  the CLI --resumes from the clean version without bloated progress/thinking entries
- transcript.upload_transcript: use BaseException (not Exception) for gather result
  checks; add meta rollback when session upload fails to prevent stale watermark
- transcript.download_transcript: harden .meta.json parsing — guard UnicodeDecodeError
  and non-dict JSON; validate message_count type/range
- sdk/service.py: log OSError instead of silently passing on stale-file unlink;
  sanitize session filename in FileNotFoundError/OSError log messages
- sdk/service.py: write reconstructed transcript to disk for --resume when GCS
  session is absent, seeding the native session for the current turn
- service_test.py: wait for message_count > 0 (watermark) not just non-None bytes
- Add TestProcessCliRestore unit tests verifying stripped bytes written (not raw)
- Add test_rolls_back_meta_when_session_upload_fails for rollback behavior
2026-04-16 03:26:48 +07:00
Zamil Majdy
9118d61a76 fix(backend/copilot): use backend.util.json in _append_gap_to_builder, drop inline import 2026-04-16 03:20:03 +07:00
Zamil Majdy
d6d4fd5cba refactor(backend/copilot): unify transcript API — TranscriptDownload, TranscriptMode, detect_gap, baseline gap-fill
- Rename CliSessionRestore → TranscriptDownload; add mode: TranscriptMode field
- Add TranscriptMode = Literal["sdk", "baseline"] — persisted in .meta.json
- Rename upload_cli_session → upload_transcript (mode param)
- Rename restore_cli_session → download_transcript (reads mode from meta)
- Add detect_gap(download, session_messages) shared helper
- SDK: skip --resume when transcript mode != "sdk" (baseline-written JSONL)
- Baseline: fill gap via _append_gap_to_builder instead of discarding stale transcript
- Remove all backward-compat aliases; update all test files
2026-04-16 03:11:24 +07:00
Zamil Majdy
95a90b92df chore: merge dev into fix/copilot-single-session-store
Resolve conflicts by keeping pure-GCS upload_cli_session API.
Move stripping logic into _read_cli_session_from_disk in sdk/service.py
so same-pod turns also benefit from stripped sessions, matching the
behavior added in df205b5444 (strip CLI session to prevent auto-compaction).
2026-04-16 02:21:47 +07:00
Zamil Majdy
3e137eb91b refactor(backend/copilot): pure-GCS restore/upload, disk I/O moves to callers
restore_cli_session and upload_cli_session are now pure GCS operations.
Removed sdk_cwd parameter from both; callers own all disk I/O.

- sdk/service.py: _write_cli_session_to_disk / _read_cli_session_from_disk
  helpers handle path-traversal guard + read/write at call sites
- baseline/service.py: restore → decode → validate in-memory, upload encoded
  bytes directly; no disk access
- transcript.py: removed TranscriptDownload, TRANSCRIPT_STORAGE_PREFIX,
  _storage_path_parts, _meta_storage_path_parts, upload_transcript,
  download_transcript; renamed _projects_base → projects_base,
  _cli_session_path → cli_session_path (public exports)
- delete_transcript now deletes only the CLI session (jsonl + meta.json),
  2 deletes instead of 3
- All tests updated to match new signatures; 1416 tests pass
2026-04-16 02:16:22 +07:00
Zamil Majdy
6023d3ea91 fix(backend/copilot): use explicit side_effect list in download exception test
Make two-call contract of asyncio.gather explicit: RuntimeError for session
retrieve and FileNotFoundError for meta retrieve, matching the pattern
already applied to test_returns_none_when_file_not_found_in_storage.
2026-04-16 01:49:44 +07:00
Zamil Majdy
2ec20e76bd fix(backend/copilot): address review comments cycle 1
- Replace redundant elif guard with plain else in service.py restore path
- Use isinstance(…, Exception) instead of BaseException in gather error
  checks for upload_cli_session (BaseException swallows KeyboardInterrupt)
- Use explicit list side_effect in test_returns_none_when_file_not_found
  to document the two-call contract of the concurrent retrieve gather
2026-04-16 01:47:28 +07:00
Zamil Majdy
af8a86e6b6 refactor(backend/copilot): consolidate session storage to single GCS location
Before this change the SDK turn cycle made two separate GCS downloads and two
uploads per turn: chat-transcripts/ (our stripped JSONL + message_count meta)
and cli-sessions/ (raw CLI session for --resume).  The chat-transcripts/ path
was introduced before --resume existed; cli-sessions/ was added in PR #12777
to enable cross-pod resume, but chat-transcripts/ was never removed.

This refactoring eliminates chat-transcripts/ from the SDK path entirely:

- message_count watermark moves to a companion cli-sessions/.meta.json,
  uploaded and downloaded in the same asyncio.gather as the session file —
  no window for divergence between them.
- TranscriptBuilder is now seeded from the restored CLI session content
  (strip_for_upload applied in-memory), replacing the separate transcript
  download.
- restore_cli_session returns CliSessionRestore | None (content + message_count)
  instead of bool, combining the two previous download operations into one.
- upload_cli_session accepts message_count and writes the companion meta.
- The same-pod early-return optimisation is removed (cross-pod fix): the
  local file may be stale from an older turn that ran on this pod while a
  newer turn ran on a different pod and uploaded to GCS.

upload_transcript / download_transcript are kept for the baseline service
which has its own separate context management path.
2026-04-16 01:41:36 +07:00
12 changed files with 137 additions and 492 deletions

View File

@@ -703,14 +703,17 @@ async def _compress_session_messages(
return messages
def should_upload_transcript(user_id: str | None, upload_safe: bool) -> bool:
def should_upload_transcript(
user_id: str | None, transcript_covers_prefix: bool
) -> bool:
"""Return ``True`` when the caller should upload the final transcript.
Uploads require a logged-in user (for the storage key) *and* a safe
upload signal from ``_load_prior_transcript`` — i.e. GCS does not hold a
newer version that we'd be overwriting.
Uploads require a logged-in user (for the storage key) *and* a
transcript that covered the session prefix when loaded — otherwise
we'd be overwriting a more complete version in storage with a
partial one built from just the current turn.
"""
return bool(user_id) and upload_safe
return bool(user_id) and transcript_covers_prefix
def _append_gap_to_builder(
@@ -752,19 +755,11 @@ def _append_gap_to_builder(
# so the builder's entry count matches the gap length.
content_blocks.append({"type": "text", "text": ""})
builder.append_assistant(content_blocks=content_blocks)
elif msg.role == "tool":
if msg.tool_call_id:
builder.append_tool_result(
tool_use_id=msg.tool_call_id,
content=msg.content or "",
)
else:
# Malformed tool message — no tool_call_id to link to an
# assistant tool_use block. Skip to avoid an unmatched
# tool_result entry in the builder (which would confuse --resume).
logger.warning(
"[Baseline] Skipping tool gap message with no tool_call_id"
)
elif msg.role == "tool" and msg.tool_call_id:
builder.append_tool_result(
tool_use_id=msg.tool_call_id,
content=msg.content or "",
)
async def _load_prior_transcript(
@@ -775,11 +770,11 @@ async def _load_prior_transcript(
) -> tuple[bool, "TranscriptDownload | None"]:
"""Download and load the prior CLI session into ``transcript_builder``.
Returns a tuple of (upload_safe, transcript_download):
- ``upload_safe`` is ``True`` when it is safe to upload at the end of this
turn. Upload is suppressed only for **download errors** (unknown GCS
state) — missing and invalid files return ``True`` because there is
nothing in GCS worth protecting against overwriting.
Returns a tuple of (covers_prefix, transcript_download):
- ``covers_prefix`` is ``True`` when the loaded session fully covers the
session prefix; ``False`` otherwise (missing, invalid, or download error).
Callers should suppress uploads when this is ``False`` to avoid overwriting
a more complete version in storage.
- ``transcript_download`` is a ``TranscriptDownload`` with str content
(pre-decoded and stripped) when available, or ``None`` when no valid
transcript could be loaded. Callers pass this to
@@ -791,14 +786,11 @@ async def _load_prior_transcript(
)
except Exception as e:
logger.warning("[Baseline] Session restore failed: %s", e)
# Unknown GCS state — be conservative, skip upload.
return False, None
if restore is None:
logger.debug("[Baseline] No CLI session available — will upload fresh")
# Nothing in GCS to protect; allow upload so the first baseline turn
# writes the initial transcript snapshot.
return True, None
logger.debug("[Baseline] No CLI session available")
return False, None
content_bytes = restore.content
try:
@@ -809,14 +801,12 @@ async def _load_prior_transcript(
)
except UnicodeDecodeError:
logger.warning("[Baseline] CLI session content is not valid UTF-8")
# Corrupt file in GCS; overwriting with a valid one is better.
return True, None
return False, None
stripped = strip_for_upload(raw_str)
if not validate_transcript(stripped):
logger.warning("[Baseline] CLI session content invalid after strip")
# Corrupt file in GCS; overwriting with a valid one is better.
return True, None
return False, None
transcript_builder.load_previous(stripped, log_prefix="[Baseline]")
logger.info(
@@ -967,7 +957,7 @@ async def stream_chat_completion_baseline(
# --- Transcript support (feature parity with SDK path) ---
transcript_builder = TranscriptBuilder()
transcript_upload_safe = True
transcript_covers_prefix = True
# Build system prompt only on the first turn to avoid mid-conversation
# changes from concurrent chats updating business understanding.
@@ -987,7 +977,7 @@ async def stream_chat_completion_baseline(
transcript_download: TranscriptDownload | None = None
if user_id and len(session.messages) > 1:
(
(transcript_upload_safe, transcript_download),
(transcript_covers_prefix, transcript_download),
(base_system_prompt, understanding),
) = await asyncio.gather(
_load_prior_transcript(
@@ -1039,7 +1029,7 @@ async def stream_chat_completion_baseline(
# See extract_context_messages() in transcript.py for the shared primitive.
prior_context = extract_context_messages(transcript_download, session.messages)
messages_for_context = await _compress_session_messages(
prior_context + ([session.messages[-1]] if session.messages else []),
prior_context + [session.messages[-1]],
model=active_model,
)
@@ -1384,7 +1374,7 @@ async def stream_chat_completion_baseline(
stop_reason=STOP_REASON_END_TURN,
)
if user_id and should_upload_transcript(user_id, transcript_upload_safe):
if user_id and should_upload_transcript(user_id, transcript_covers_prefix):
await _upload_final_transcript(
user_id=user_id,
session_id=session_id,

View File

@@ -137,27 +137,25 @@ class TestLoadPriorTranscript:
assert builder.entry_count == 4
@pytest.mark.asyncio
async def test_missing_transcript_allows_upload(self):
"""Nothing in GCS → upload is safe; the turn writes the first snapshot."""
async def test_missing_transcript_returns_false(self):
builder = TranscriptBuilder()
with patch(
"backend.copilot.baseline.service.download_transcript",
new=AsyncMock(return_value=None),
):
upload_safe, dl = await _load_prior_transcript(
covers, dl = await _load_prior_transcript(
user_id="user-1",
session_id="session-1",
session_messages=_make_session_messages("user", "assistant"),
transcript_builder=builder,
)
assert upload_safe is True
assert covers is False
assert dl is None
assert builder.is_empty
@pytest.mark.asyncio
async def test_invalid_transcript_allows_upload(self):
"""Corrupt file in GCS → overwriting with a valid one is better."""
async def test_invalid_transcript_returns_false(self):
builder = TranscriptBuilder()
restore = TranscriptDownload(
content=b'{"type":"progress","uuid":"a"}\n',
@@ -168,14 +166,14 @@ class TestLoadPriorTranscript:
"backend.copilot.baseline.service.download_transcript",
new=AsyncMock(return_value=restore),
):
upload_safe, dl = await _load_prior_transcript(
covers, dl = await _load_prior_transcript(
user_id="user-1",
session_id="session-1",
session_messages=_make_session_messages("user", "assistant"),
transcript_builder=builder,
)
assert upload_safe is True
assert covers is False
assert dl is None
assert builder.is_empty
@@ -559,7 +557,10 @@ class TestTranscriptLifecycle:
# --- 3. Gate + upload ---
assert (
should_upload_transcript(user_id="user-1", upload_safe=covers) is True
should_upload_transcript(
user_id="user-1", transcript_covers_prefix=covers
)
is True
)
await _upload_final_transcript(
user_id="user-1",
@@ -623,11 +624,15 @@ class TestTranscriptLifecycle:
stop_reason=STOP_REASON_END_TURN,
)
assert should_upload_transcript(user_id=None, upload_safe=True) is False
assert (
should_upload_transcript(user_id=None, transcript_covers_prefix=True)
is False
)
@pytest.mark.asyncio
async def test_lifecycle_missing_download_still_uploads_new_content(self):
"""No prior session → upload is safe; the turn writes the first snapshot."""
"""No prior session → covers defaults to True in the service,
new turn should upload cleanly."""
builder = TranscriptBuilder()
upload_mock = AsyncMock(return_value=None)
with (
@@ -640,20 +645,24 @@ class TestTranscriptLifecycle:
new=upload_mock,
),
):
upload_safe, dl = await _load_prior_transcript(
covers, dl = await _load_prior_transcript(
user_id="user-1",
session_id="session-1",
session_messages=_make_session_messages("user"),
transcript_builder=builder,
)
# Nothing in GCS → upload is safe so the first baseline turn
# can write the initial transcript snapshot.
assert upload_safe is True
# No restore: covers is False, so the production path would
# skip upload. This protects against overwriting a future
# more-complete session with a single-turn snapshot.
assert covers is False
assert dl is None
assert (
should_upload_transcript(user_id="user-1", upload_safe=upload_safe)
is True
should_upload_transcript(
user_id="user-1", transcript_covers_prefix=covers
)
is False
)
upload_mock.assert_not_awaited()
# ---------------------------------------------------------------------------

View File

@@ -1003,14 +1003,11 @@ def _process_cli_restore(
is_valid = validate_transcript(stripped)
# Use len(raw_str) rather than len(cli_restore.content) so the unit is always
# characters (raw_str is always str at this point regardless of input type).
# lines_stripped = original lines minus remaining lines after stripping.
_original_lines = len(raw_str.strip().split("\n")) if raw_str.strip() else 0
_remaining_lines = len(stripped.strip().split("\n")) if stripped.strip() else 0
logger.info(
"%s Restored CLI session: %dB raw, %d lines stripped, msg_count=%d, valid=%s",
log_prefix,
len(raw_str),
_original_lines - _remaining_lines,
len(stripped.strip().split("\n")) if stripped.strip() else 0,
cli_restore.message_count,
is_valid,
)
@@ -1101,9 +1098,8 @@ def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
result.append(block)
else:
logger.warning(
"[SDK] Unknown content block type: %s."
" This may indicate a new SDK version with additional block types.",
type(block).__name__,
f"[SDK] Unknown content block type: {type(block).__name__}. "
f"This may indicate a new SDK version with additional block types."
)
return result
@@ -1158,11 +1154,10 @@ async def _compress_messages(
if result.was_compacted:
logger.info(
"[SDK] Context compacted: %d -> %d tokens (%d summarized, %d dropped)",
result.original_token_count,
result.token_count,
result.messages_summarized,
result.messages_dropped,
f"[SDK] Context compacted: {result.original_token_count} -> "
f"{result.token_count} tokens "
f"({result.messages_summarized} summarized, "
f"{result.messages_dropped} dropped)"
)
# Convert compressed dicts back to ChatMessages
return [
@@ -1229,17 +1224,11 @@ def _session_messages_to_transcript(messages: list[ChatMessage]) -> str:
)
if blocks:
builder.append_assistant(blocks)
elif msg.role == "tool":
if msg.tool_call_id:
builder.append_tool_result(
tool_use_id=msg.tool_call_id,
content=msg.content or "",
)
else:
# Malformed tool message — no tool_call_id to link to an
# assistant tool_use block. Skip to avoid an unmatched
# tool_result entry in the builder (which would confuse --resume).
logger.warning("[SDK] Skipping tool gap message with no tool_call_id")
elif msg.role == "tool" and msg.tool_call_id:
builder.append_tool_result(
tool_use_id=msg.tool_call_id,
content=msg.content or "",
)
return builder.to_jsonl()
@@ -2566,10 +2555,7 @@ async def _restore_cli_session_for_turn(
if validate_transcript(stripped):
transcript_builder.load_previous(stripped, log_prefix=log_prefix)
result.transcript_content = stripped
except (UnicodeDecodeError, ValueError, OSError) as _load_err:
# UnicodeDecodeError: non-UTF-8 content; ValueError: malformed JSONL in
# strip_for_upload; OSError: encode/decode I/O failure. Unexpected
# exceptions propagate so programming errors are not silently masked.
except Exception as _load_err:
logger.debug(
"%s Could not load baseline transcript into builder: %s",
log_prefix,
@@ -3669,20 +3655,43 @@ async def stream_chat_completion_sdk(
sdk_cwd, session_id, log_prefix
)
if _cli_content:
# Watermark = number of DB messages this transcript covers.
# len(session.messages) is accurate: the CLI session file
# was just written after the turn completed, so it covers
# all messages through this turn. Any gap from a prior
# missed upload was already detected by detect_gap and
# injected as context, so the model has the full history.
#
# Previously this used _final_tmsg_count + 2, which
# under-counted for tool-use turns (delta = 2 + 2*N_tool_calls),
# causing persistent spurious gap-fills on every subsequent turn.
# That concern was addressed by the inflated-watermark fix
# (using the GCS watermark as the anchor for gap detection),
# which makes len(session.messages) safe to use here.
_jsonl_covered = len(session.messages)
# Compute JSONL coverage watermark.
# Using len(session.messages) (DB count) causes an
# "inflated watermark" bug: when prior turns fail to
# upload, the GCS JSONL is stale but its meta.json
# watermark matches the DB count, so the next turn's
# gap-fill check (transcript_msg_count < msg_count-1)
# never triggers and the model silently loses context.
# Fix: when we have a reliable watermark from the
# downloaded transcript (use_resume=True,
# transcript_msg_count>0), set watermark =
# previous_coverage + 2 (current user+asst pair).
# This accurately reflects what is actually in the
# JSONL so any stale gap is detected next turn.
# For all other cases (fresh session, restore failed,
# old-format meta with count=0, retry reset) fall back
# to len(session.messages) to preserve original
# behaviour and avoid triggering spurious gap-fills
# on sessions whose watermark we cannot determine.
_final_use_resume = (
state.use_resume if state is not None else use_resume
)
_final_tmsg_count = (
state.transcript_msg_count
if state is not None
else transcript_msg_count
)
if _final_use_resume and _final_tmsg_count > 0:
# +2 = current user turn + current assistant response.
# For tool-use turns with multiple assistant+tool pairs this
# under-counts (e.g. +4), but the resulting underestimate is
# safe: next turn's detect_gap returns the uncovered entries
# as a small gap that is filled from DB. Over-estimating
# (using len(session.messages)) would suppress gap detection
# when a prior upload was missed — the inflated-watermark bug.
_jsonl_covered = _final_tmsg_count + 2
else:
_jsonl_covered = len(session.messages)
await asyncio.shield(
upload_transcript(
user_id=user_id,

View File

@@ -880,116 +880,6 @@ class TestUploadCliSession:
assert meta_content["mode"] == "baseline"
assert meta_content["message_count"] == 4
def test_strips_session_before_upload_and_writes_back(self):
"""strip_for_upload removes progress entries and returns smaller content."""
import json
from .transcript import strip_for_upload
progress_entry = {
"type": "progress",
"uuid": "p1",
"parentUuid": "u1",
"data": {"type": "bash_progress", "stdout": "running..."},
}
user_entry = {
"type": "user",
"uuid": "u1",
"message": {"role": "user", "content": "hello"},
}
asst_entry = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "u1",
"message": {"role": "assistant", "content": "world"},
}
raw_content = (
json.dumps(progress_entry)
+ "\n"
+ json.dumps(user_entry)
+ "\n"
+ json.dumps(asst_entry)
+ "\n"
)
stripped = strip_for_upload(raw_content)
stored_lines = stripped.strip().split("\n")
stored_types = [json.loads(line).get("type") for line in stored_lines]
assert "progress" not in stored_types
assert "user" in stored_types
assert "assistant" in stored_types
assert len(stripped.encode()) < len(raw_content.encode())
def test_strips_stale_thinking_blocks_before_upload(self):
"""strip_for_upload removes thinking blocks from non-last assistant turns."""
import json
from .transcript import strip_for_upload
u1 = {
"type": "user",
"uuid": "u1",
"message": {"role": "user", "content": "q1"},
}
a1_with_thinking = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "u1",
"message": {
"id": "msg_a1",
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "A" * 5000},
{"type": "text", "text": "answer1"},
],
},
}
u2 = {
"type": "user",
"uuid": "u2",
"parentUuid": "a1",
"message": {"role": "user", "content": "q2"},
}
a2_no_thinking = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "u2",
"message": {
"id": "msg_a2",
"role": "assistant",
"content": [{"type": "text", "text": "answer2"}],
},
}
raw_content = (
json.dumps(u1)
+ "\n"
+ json.dumps(a1_with_thinking)
+ "\n"
+ json.dumps(u2)
+ "\n"
+ json.dumps(a2_no_thinking)
+ "\n"
)
stripped = strip_for_upload(raw_content)
stored_lines = stripped.strip().split("\n")
# a1 should have its thinking block stripped (it's not the last assistant turn).
a1_stored = json.loads(stored_lines[1])
a1_content = a1_stored["message"]["content"]
assert all(
b["type"] != "thinking" for b in a1_content
), "stale thinking block should be stripped from a1"
assert any(
b["type"] == "text" for b in a1_content
), "text block should be kept in a1"
# a2 (last turn) should be unchanged.
a2_stored = json.loads(stored_lines[3])
assert a2_stored["message"]["content"] == [{"type": "text", "text": "answer2"}]
class TestRestoreCliSession:
def test_returns_none_when_file_not_found_in_storage(self):

View File

@@ -110,7 +110,7 @@ export const Flow = () => {
event.preventDefault();
}}
maxZoom={2}
minZoom={0.05}
minZoom={0.1}
onDragOver={onDragOver}
onDrop={onDrop}
nodesDraggable={!isLocked}

View File

@@ -1,29 +1,5 @@
import { describe, expect, it } from "vitest";
import {
extractWorkspaceArtifacts,
filePartToArtifactRef,
isReasoningToolPart,
splitReasoningAndResponse,
} from "./helpers";
import type { MessagePart } from "./helpers";
function textPart(text: string): MessagePart {
return { type: "text", text } as MessagePart;
}
function toolPart(
toolName: string,
state: string = "output-available",
): MessagePart {
return {
type: `tool-${toolName}`,
state,
toolCallId: `call-${toolName}`,
toolName,
args: {},
output: "{}",
} as unknown as MessagePart;
}
import { extractWorkspaceArtifacts, filePartToArtifactRef } from "./helpers";
describe("extractWorkspaceArtifacts", () => {
it("extracts a single workspace:// link with its markdown title", () => {
@@ -125,130 +101,3 @@ describe("filePartToArtifactRef", () => {
expect(overridden?.origin).toBe("agent");
});
});
describe("isReasoningToolPart", () => {
it("returns true for reasoning/search tools", () => {
const reasoningTools = [
"find_block",
"find_agent",
"find_library_agent",
"search_docs",
"get_doc_page",
"search_feature_requests",
"ask_question",
];
for (const name of reasoningTools) {
expect(isReasoningToolPart(toolPart(name))).toBe(true);
}
});
it("returns false for action tools", () => {
const actionTools = [
"run_block",
"run_agent",
"create_agent",
"edit_agent",
"run_mcp_tool",
"schedule_agent",
"continue_run_block",
];
for (const name of actionTools) {
expect(isReasoningToolPart(toolPart(name))).toBe(false);
}
});
it("returns false for text parts", () => {
expect(isReasoningToolPart(textPart("hello"))).toBe(false);
});
});
describe("splitReasoningAndResponse", () => {
it("returns all parts as response when there are no tools", () => {
const parts = [textPart("Hello"), textPart("World")];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("splits on reasoning tools — text before goes to reasoning", () => {
const parts = [
textPart("Let me search..."),
toolPart("find_block"),
textPart("Here is your answer"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(2);
expect(result.response).toHaveLength(1);
expect((result.response[0] as { text: string }).text).toBe(
"Here is your answer",
);
});
it("does NOT split on action tools — response before run_block stays visible", () => {
const parts = [
textPart("Here is my answer"),
toolPart("run_block"),
textPart("Block finished"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("splits only on reasoning tools when both reasoning and action tools are present", () => {
const parts = [
textPart("Planning..."),
toolPart("search_docs"),
textPart("Found it. Running now."),
toolPart("run_block"),
textPart("Done!"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(2);
expect(result.response).toHaveLength(3);
expect((result.response[0] as { text: string }).text).toBe(
"Found it. Running now.",
);
});
it("returns all as response when reasoning tools have no text after them", () => {
const parts = [
textPart("Hello"),
toolPart("find_agent"),
toolPart("run_block"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("handles multiple reasoning tools correctly", () => {
const parts = [
textPart("Searching..."),
toolPart("find_block"),
textPart("Found one, searching more..."),
toolPart("search_docs"),
textPart("Here are the results"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(4);
expect(result.response).toHaveLength(1);
expect((result.response[0] as { text: string }).text).toBe(
"Here are the results",
);
});
it("handles action tool after response text without hiding the response", () => {
const parts = [
toolPart("find_block"),
textPart("I found it! Let me run it."),
toolPart("run_agent"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(1);
expect(result.response).toHaveLength(2);
expect((result.response[0] as { text: string }).text).toBe(
"I found it! Let me run it.",
);
});
});

View File

@@ -33,20 +33,6 @@ const CUSTOM_TOOL_TYPES = new Set([
"tool-create_feature_request",
]);
const REASONING_TOOL_TYPES = new Set([
"tool-find_block",
"tool-find_agent",
"tool-find_library_agent",
"tool-search_docs",
"tool-get_doc_page",
"tool-search_feature_requests",
"tool-ask_question",
]);
export function isReasoningToolPart(part: MessagePart): boolean {
return REASONING_TOOL_TYPES.has(part.type);
}
const WORKSPACE_FILE_PATTERN =
/\/api\/proxy\/api\/workspace\/files\/([a-f0-9-]+)\/download/;
const WORKSPACE_URI_PATTERN = /workspace:\/\/([a-f0-9-]+)(?:#([^\s)\]]+))?/g;
@@ -140,22 +126,22 @@ export function splitReasoningAndResponse(parts: MessagePart[]): {
reasoning: MessagePart[];
response: MessagePart[];
} {
const lastReasoningIndex = parts.findLastIndex((p) => isReasoningToolPart(p));
const lastToolIndex = parts.findLastIndex((p) => p.type.startsWith("tool-"));
if (lastReasoningIndex === -1) {
if (lastToolIndex === -1) {
return { reasoning: [], response: parts };
}
const hasResponseAfterReasoning = parts
.slice(lastReasoningIndex + 1)
const hasResponseAfterTools = parts
.slice(lastToolIndex + 1)
.some((p) => p.type === "text");
if (!hasResponseAfterReasoning) {
if (!hasResponseAfterTools) {
return { reasoning: [], response: parts };
}
const rawReasoning = parts.slice(0, lastReasoningIndex + 1);
const rawResponse = parts.slice(lastReasoningIndex + 1);
const rawReasoning = parts.slice(0, lastToolIndex + 1);
const rawResponse = parts.slice(lastToolIndex + 1);
const reasoning: MessagePart[] = [];
const pinnedParts: MessagePart[] = [];

View File

@@ -27,9 +27,6 @@ const RECONNECT_MAX_ATTEMPTS = 3;
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
/** Max time (ms) the UI can stay in "reconnecting" state before forcing idle. */
const RECONNECT_MAX_DURATION_MS = 30_000;
interface UseCopilotStreamArgs {
sessionId: string | null;
hydratedMessages: UIMessage[] | undefined;
@@ -125,21 +122,12 @@ export function useCopilotStream({
const [isSyncing, setIsSyncing] = useState(false);
// Tracks the last time the page was hidden — used to detect sleep/wake gaps.
const lastHiddenAtRef = useRef(Date.now());
// Monotonic counter that increments on each session switch — async callbacks
// from old sessions compare their captured epoch to detect staleness.
const sessionEpochRef = useRef(0);
// Timestamp when reconnection first started — used to force timeout.
const reconnectStartedAtRef = useRef<number | null>(null);
// Timer for the forced reconnect timeout.
const reconnectTimeoutTimerRef = useRef<ReturnType<typeof setTimeout>>();
function handleReconnect(sid: string) {
if (isReconnectScheduledRef.current || !sid) return;
const nextAttempt = reconnectAttemptsRef.current + 1;
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
setReconnectExhausted(true);
toast({
title: "Connection lost",
@@ -149,26 +137,6 @@ export function useCopilotStream({
return;
}
// Track when reconnection first started for the forced timeout.
if (reconnectStartedAtRef.current === null) {
reconnectStartedAtRef.current = Date.now();
// Schedule a forced timeout — if reconnecting takes longer than
// RECONNECT_MAX_DURATION_MS, force the UI back to idle.
clearTimeout(reconnectTimeoutTimerRef.current);
const capturedEpoch = sessionEpochRef.current;
reconnectTimeoutTimerRef.current = setTimeout(() => {
if (sessionEpochRef.current !== capturedEpoch) return;
setReconnectExhausted(true);
reconnectStartedAtRef.current = null;
toast({
title: "Connection timed out",
description:
"AutoPilot may still be working. Refresh to check for updates.",
variant: "destructive",
});
}, RECONNECT_MAX_DURATION_MS);
}
isReconnectScheduledRef.current = true;
setIsReconnectScheduled(true);
reconnectAttemptsRef.current = nextAttempt;
@@ -182,12 +150,8 @@ export function useCopilotStream({
}
const delay = RECONNECT_BASE_DELAY_MS * 2 ** (nextAttempt - 1);
const capturedEpoch = sessionEpochRef.current;
reconnectTimerRef.current = setTimeout(() => {
// Bail if the session switched while the timer was pending.
if (sessionEpochRef.current !== capturedEpoch) return;
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
// Strip the stale in-progress assistant message before resuming —
@@ -220,11 +184,7 @@ export function useCopilotStream({
transport: transport ?? undefined,
onFinish: async ({ isDisconnect, isAbort }) => {
if (isAbort || !sessionId) return;
// User-initiated stops should not trigger reconnection.
if (isUserStoppingRef.current) return;
// The AI SDK rarely sets isDisconnect — treat ANY non-user-initiated
// finish as a potential disconnect when the backend stream is active.
if (isDisconnect) {
handleReconnect(sessionId);
return;
@@ -459,35 +419,20 @@ export function useCopilotStream({
};
}, [refetchSession, setMessages]);
// Hydrate messages from REST API when not actively streaming.
// Sets hydrateCompletedRef so the resume effect knows it's safe to proceed.
// Hydrate messages from REST API when not actively streaming
useEffect(() => {
if (!hydratedMessages) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (status === "streaming" || status === "submitted") return;
if (isReconnectScheduled) return;
if (hydratedMessages.length > 0) {
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return deduplicateMessages(hydratedMessages);
});
}
hydrateCompletedRef.current = true;
// Flush any resume that was waiting for hydration to finish.
if (pendingResumeRef.current) {
const pendingResume = pendingResumeRef.current;
pendingResumeRef.current = null;
pendingResume();
}
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return deduplicateMessages(hydratedMessages);
});
}, [hydratedMessages, setMessages, status, isReconnectScheduled]);
// Track resume state per session
const hasResumedRef = useRef<Map<string, boolean>>(new Map());
// Coordination: hydration must complete before resume fires.
// Prevents duplicate messages / missing content from the two effects racing.
const hydrateCompletedRef = useRef(false);
const pendingResumeRef = useRef<(() => void) | null>(null);
// Clean up reconnect state on session switch.
// Abort the old stream's in-flight fetch and tell the backend to release
// its XREAD listeners immediately (fire-and-forget).
@@ -496,24 +441,21 @@ export function useCopilotStream({
const prevSid = prevStreamSessionRef.current;
prevStreamSessionRef.current = sessionId;
// Increment epoch so stale async callbacks from the old session bail out.
sessionEpochRef.current += 1;
const currentEpoch = sessionEpochRef.current;
const isSwitching = Boolean(prevSid && prevSid !== sessionId);
if (isSwitching) {
// Mark BEFORE stopping so the old stream's async onError (which fires
// after the abort) sees the flag and short-circuits the reconnect path.
// Without this, the AbortError can queue a reconnect against the new
// session's `sessionId` (captured in the fresh onError closure).
isUserStoppingRef.current = true;
sdkStopRef.current();
disconnectSessionStream(prevSid!);
// Schedule the reset as a task (not a microtask) so it runs AFTER the
// aborted fetch's onError has fired — but verify the epoch hasn't
// changed again (rapid session switches).
// aborted fetch's onError has fired — otherwise the new session would
// be stuck with the "user stopping" flag set, preventing auto-resume
// when hydration detects an active backend stream.
setTimeout(() => {
if (sessionEpochRef.current === currentEpoch) {
isUserStoppingRef.current = false;
}
isUserStoppingRef.current = false;
}, 0);
} else {
isUserStoppingRef.current = false;
@@ -521,10 +463,7 @@ export function useCopilotStream({
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
reconnectAttemptsRef.current = 0;
reconnectStartedAtRef.current = null;
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
setRateLimitMessage(null);
@@ -533,13 +472,9 @@ export function useCopilotStream({
setReconnectExhausted(false);
setIsSyncing(false);
hasResumedRef.current.clear();
hydrateCompletedRef.current = false;
pendingResumeRef.current = null;
return () => {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
};
}, [sessionId]);
@@ -550,18 +485,8 @@ export function useCopilotStream({
prevStatusRef.current = status;
const wasActive = prev === "streaming" || prev === "submitted";
const isNowActive = status === "streaming" || status === "submitted";
const isIdle = status === "ready" || status === "error";
// Clear the forced reconnect timeout as soon as the stream resumes —
// otherwise the stale 30s timer can fire mid-stream and show a
// "timed out" toast even though reconnection succeeded.
if (isNowActive && reconnectStartedAtRef.current !== null) {
reconnectStartedAtRef.current = null;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
}
if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
@@ -572,9 +497,6 @@ export function useCopilotStream({
if (status === "ready") {
reconnectAttemptsRef.current = 0;
hasShownDisconnectToast.current = false;
reconnectStartedAtRef.current = null;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
// Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last
// submitted text prevents getSendSuppressionReason from allowing a
// duplicate POST of the same message immediately after a successful turn
@@ -589,11 +511,10 @@ export function useCopilotStream({
// Resume an active stream AFTER hydration completes.
// IMPORTANT: Only runs when page loads with existing active stream (reconnection).
// Does NOT run when new streams start during active conversation.
// Gated on hydrateCompletedRef to prevent racing with the hydration effect.
useEffect(() => {
if (!sessionId) return;
if (!hasActiveStream) return;
if (!hydratedMessages) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;
// Never resume if currently streaming
if (status === "streaming" || status === "submitted") return;
@@ -604,30 +525,21 @@ export function useCopilotStream({
// Don't resume a stream the user just cancelled
if (isUserStoppingRef.current) return;
function doResume() {
if (!sessionId || hasResumedRef.current.get(sessionId)) return;
if (isUserStoppingRef.current) return;
// Mark as resumed immediately to prevent race conditions
hasResumedRef.current.set(sessionId, true);
hasResumedRef.current.set(sessionId, true);
// Remove the in-progress assistant message before resuming.
// The backend replays the stream from "0-0", so keeping the hydrated
// version would cause the old parts to overlap with replayed parts.
// Previous turns are preserved; the stream recreates the current turn.
setMessages((prev) => {
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
return prev.slice(0, -1);
}
return prev;
});
setMessages((prev) => {
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
return prev.slice(0, -1);
}
return prev;
});
resumeStreamRef.current();
}
// Wait for hydration to complete before resuming to prevent
// the two effects from racing (duplicate messages / missing content).
if (!hydrateCompletedRef.current) {
pendingResumeRef.current = doResume;
return;
}
doResume();
resumeStreamRef.current();
}, [sessionId, hasActiveStream, hydratedMessages, status, setMessages]);
// Clear messages when session is null

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB