mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
Compare commits
31 Commits
claude/nic
...
test-scree
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0202c15460 | ||
|
|
33a7b83125 | ||
|
|
b05846d515 | ||
|
|
3fb63d7eb0 | ||
|
|
2f3003f059 | ||
|
|
7aef023f28 | ||
|
|
c263fbca5c | ||
|
|
0c3a15832b | ||
|
|
d91cfb5d84 | ||
|
|
dfa07d88b8 | ||
|
|
c305ce5bac | ||
|
|
c3aaa1d48e | ||
|
|
9415166ee0 | ||
|
|
cbf71fddb2 | ||
|
|
0732fb695a | ||
|
|
2c7ba36804 | ||
|
|
e11e3841b4 | ||
|
|
5cdc7d1e80 | ||
|
|
f6a2a118a6 | ||
|
|
d30ff9e73f | ||
|
|
2e92efa29d | ||
|
|
7737b7c21f | ||
|
|
f3bf44ce25 | ||
|
|
5b487829f7 | ||
|
|
9118d61a76 | ||
|
|
d6d4fd5cba | ||
|
|
95a90b92df | ||
|
|
3e137eb91b | ||
|
|
6023d3ea91 | ||
|
|
2ec20e76bd | ||
|
|
af8a86e6b6 |
@@ -703,14 +703,17 @@ async def _compress_session_messages(
|
||||
return messages
|
||||
|
||||
|
||||
def should_upload_transcript(user_id: str | None, upload_safe: bool) -> bool:
|
||||
def should_upload_transcript(
|
||||
user_id: str | None, transcript_covers_prefix: bool
|
||||
) -> bool:
|
||||
"""Return ``True`` when the caller should upload the final transcript.
|
||||
|
||||
Uploads require a logged-in user (for the storage key) *and* a safe
|
||||
upload signal from ``_load_prior_transcript`` — i.e. GCS does not hold a
|
||||
newer version that we'd be overwriting.
|
||||
Uploads require a logged-in user (for the storage key) *and* a
|
||||
transcript that covered the session prefix when loaded — otherwise
|
||||
we'd be overwriting a more complete version in storage with a
|
||||
partial one built from just the current turn.
|
||||
"""
|
||||
return bool(user_id) and upload_safe
|
||||
return bool(user_id) and transcript_covers_prefix
|
||||
|
||||
|
||||
def _append_gap_to_builder(
|
||||
@@ -752,19 +755,11 @@ def _append_gap_to_builder(
|
||||
# so the builder's entry count matches the gap length.
|
||||
content_blocks.append({"type": "text", "text": ""})
|
||||
builder.append_assistant(content_blocks=content_blocks)
|
||||
elif msg.role == "tool":
|
||||
if msg.tool_call_id:
|
||||
builder.append_tool_result(
|
||||
tool_use_id=msg.tool_call_id,
|
||||
content=msg.content or "",
|
||||
)
|
||||
else:
|
||||
# Malformed tool message — no tool_call_id to link to an
|
||||
# assistant tool_use block. Skip to avoid an unmatched
|
||||
# tool_result entry in the builder (which would confuse --resume).
|
||||
logger.warning(
|
||||
"[Baseline] Skipping tool gap message with no tool_call_id"
|
||||
)
|
||||
elif msg.role == "tool" and msg.tool_call_id:
|
||||
builder.append_tool_result(
|
||||
tool_use_id=msg.tool_call_id,
|
||||
content=msg.content or "",
|
||||
)
|
||||
|
||||
|
||||
async def _load_prior_transcript(
|
||||
@@ -775,11 +770,11 @@ async def _load_prior_transcript(
|
||||
) -> tuple[bool, "TranscriptDownload | None"]:
|
||||
"""Download and load the prior CLI session into ``transcript_builder``.
|
||||
|
||||
Returns a tuple of (upload_safe, transcript_download):
|
||||
- ``upload_safe`` is ``True`` when it is safe to upload at the end of this
|
||||
turn. Upload is suppressed only for **download errors** (unknown GCS
|
||||
state) — missing and invalid files return ``True`` because there is
|
||||
nothing in GCS worth protecting against overwriting.
|
||||
Returns a tuple of (covers_prefix, transcript_download):
|
||||
- ``covers_prefix`` is ``True`` when the loaded session fully covers the
|
||||
session prefix; ``False`` otherwise (missing, invalid, or download error).
|
||||
Callers should suppress uploads when this is ``False`` to avoid overwriting
|
||||
a more complete version in storage.
|
||||
- ``transcript_download`` is a ``TranscriptDownload`` with str content
|
||||
(pre-decoded and stripped) when available, or ``None`` when no valid
|
||||
transcript could be loaded. Callers pass this to
|
||||
@@ -791,14 +786,11 @@ async def _load_prior_transcript(
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("[Baseline] Session restore failed: %s", e)
|
||||
# Unknown GCS state — be conservative, skip upload.
|
||||
return False, None
|
||||
|
||||
if restore is None:
|
||||
logger.debug("[Baseline] No CLI session available — will upload fresh")
|
||||
# Nothing in GCS to protect; allow upload so the first baseline turn
|
||||
# writes the initial transcript snapshot.
|
||||
return True, None
|
||||
logger.debug("[Baseline] No CLI session available")
|
||||
return False, None
|
||||
|
||||
content_bytes = restore.content
|
||||
try:
|
||||
@@ -809,14 +801,12 @@ async def _load_prior_transcript(
|
||||
)
|
||||
except UnicodeDecodeError:
|
||||
logger.warning("[Baseline] CLI session content is not valid UTF-8")
|
||||
# Corrupt file in GCS; overwriting with a valid one is better.
|
||||
return True, None
|
||||
return False, None
|
||||
|
||||
stripped = strip_for_upload(raw_str)
|
||||
if not validate_transcript(stripped):
|
||||
logger.warning("[Baseline] CLI session content invalid after strip")
|
||||
# Corrupt file in GCS; overwriting with a valid one is better.
|
||||
return True, None
|
||||
return False, None
|
||||
|
||||
transcript_builder.load_previous(stripped, log_prefix="[Baseline]")
|
||||
logger.info(
|
||||
@@ -967,7 +957,7 @@ async def stream_chat_completion_baseline(
|
||||
|
||||
# --- Transcript support (feature parity with SDK path) ---
|
||||
transcript_builder = TranscriptBuilder()
|
||||
transcript_upload_safe = True
|
||||
transcript_covers_prefix = True
|
||||
|
||||
# Build system prompt only on the first turn to avoid mid-conversation
|
||||
# changes from concurrent chats updating business understanding.
|
||||
@@ -987,7 +977,7 @@ async def stream_chat_completion_baseline(
|
||||
transcript_download: TranscriptDownload | None = None
|
||||
if user_id and len(session.messages) > 1:
|
||||
(
|
||||
(transcript_upload_safe, transcript_download),
|
||||
(transcript_covers_prefix, transcript_download),
|
||||
(base_system_prompt, understanding),
|
||||
) = await asyncio.gather(
|
||||
_load_prior_transcript(
|
||||
@@ -1039,7 +1029,7 @@ async def stream_chat_completion_baseline(
|
||||
# See extract_context_messages() in transcript.py for the shared primitive.
|
||||
prior_context = extract_context_messages(transcript_download, session.messages)
|
||||
messages_for_context = await _compress_session_messages(
|
||||
prior_context + ([session.messages[-1]] if session.messages else []),
|
||||
prior_context + [session.messages[-1]],
|
||||
model=active_model,
|
||||
)
|
||||
|
||||
@@ -1384,7 +1374,7 @@ async def stream_chat_completion_baseline(
|
||||
stop_reason=STOP_REASON_END_TURN,
|
||||
)
|
||||
|
||||
if user_id and should_upload_transcript(user_id, transcript_upload_safe):
|
||||
if user_id and should_upload_transcript(user_id, transcript_covers_prefix):
|
||||
await _upload_final_transcript(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
|
||||
@@ -137,27 +137,25 @@ class TestLoadPriorTranscript:
|
||||
assert builder.entry_count == 4
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_missing_transcript_allows_upload(self):
|
||||
"""Nothing in GCS → upload is safe; the turn writes the first snapshot."""
|
||||
async def test_missing_transcript_returns_false(self):
|
||||
builder = TranscriptBuilder()
|
||||
with patch(
|
||||
"backend.copilot.baseline.service.download_transcript",
|
||||
new=AsyncMock(return_value=None),
|
||||
):
|
||||
upload_safe, dl = await _load_prior_transcript(
|
||||
covers, dl = await _load_prior_transcript(
|
||||
user_id="user-1",
|
||||
session_id="session-1",
|
||||
session_messages=_make_session_messages("user", "assistant"),
|
||||
transcript_builder=builder,
|
||||
)
|
||||
|
||||
assert upload_safe is True
|
||||
assert covers is False
|
||||
assert dl is None
|
||||
assert builder.is_empty
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_transcript_allows_upload(self):
|
||||
"""Corrupt file in GCS → overwriting with a valid one is better."""
|
||||
async def test_invalid_transcript_returns_false(self):
|
||||
builder = TranscriptBuilder()
|
||||
restore = TranscriptDownload(
|
||||
content=b'{"type":"progress","uuid":"a"}\n',
|
||||
@@ -168,14 +166,14 @@ class TestLoadPriorTranscript:
|
||||
"backend.copilot.baseline.service.download_transcript",
|
||||
new=AsyncMock(return_value=restore),
|
||||
):
|
||||
upload_safe, dl = await _load_prior_transcript(
|
||||
covers, dl = await _load_prior_transcript(
|
||||
user_id="user-1",
|
||||
session_id="session-1",
|
||||
session_messages=_make_session_messages("user", "assistant"),
|
||||
transcript_builder=builder,
|
||||
)
|
||||
|
||||
assert upload_safe is True
|
||||
assert covers is False
|
||||
assert dl is None
|
||||
assert builder.is_empty
|
||||
|
||||
@@ -559,7 +557,10 @@ class TestTranscriptLifecycle:
|
||||
|
||||
# --- 3. Gate + upload ---
|
||||
assert (
|
||||
should_upload_transcript(user_id="user-1", upload_safe=covers) is True
|
||||
should_upload_transcript(
|
||||
user_id="user-1", transcript_covers_prefix=covers
|
||||
)
|
||||
is True
|
||||
)
|
||||
await _upload_final_transcript(
|
||||
user_id="user-1",
|
||||
@@ -623,11 +624,15 @@ class TestTranscriptLifecycle:
|
||||
stop_reason=STOP_REASON_END_TURN,
|
||||
)
|
||||
|
||||
assert should_upload_transcript(user_id=None, upload_safe=True) is False
|
||||
assert (
|
||||
should_upload_transcript(user_id=None, transcript_covers_prefix=True)
|
||||
is False
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_missing_download_still_uploads_new_content(self):
|
||||
"""No prior session → upload is safe; the turn writes the first snapshot."""
|
||||
"""No prior session → covers defaults to True in the service,
|
||||
new turn should upload cleanly."""
|
||||
builder = TranscriptBuilder()
|
||||
upload_mock = AsyncMock(return_value=None)
|
||||
with (
|
||||
@@ -640,20 +645,24 @@ class TestTranscriptLifecycle:
|
||||
new=upload_mock,
|
||||
),
|
||||
):
|
||||
upload_safe, dl = await _load_prior_transcript(
|
||||
covers, dl = await _load_prior_transcript(
|
||||
user_id="user-1",
|
||||
session_id="session-1",
|
||||
session_messages=_make_session_messages("user"),
|
||||
transcript_builder=builder,
|
||||
)
|
||||
# Nothing in GCS → upload is safe so the first baseline turn
|
||||
# can write the initial transcript snapshot.
|
||||
assert upload_safe is True
|
||||
# No restore: covers is False, so the production path would
|
||||
# skip upload. This protects against overwriting a future
|
||||
# more-complete session with a single-turn snapshot.
|
||||
assert covers is False
|
||||
assert dl is None
|
||||
assert (
|
||||
should_upload_transcript(user_id="user-1", upload_safe=upload_safe)
|
||||
is True
|
||||
should_upload_transcript(
|
||||
user_id="user-1", transcript_covers_prefix=covers
|
||||
)
|
||||
is False
|
||||
)
|
||||
upload_mock.assert_not_awaited()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1003,14 +1003,11 @@ def _process_cli_restore(
|
||||
is_valid = validate_transcript(stripped)
|
||||
# Use len(raw_str) rather than len(cli_restore.content) so the unit is always
|
||||
# characters (raw_str is always str at this point regardless of input type).
|
||||
# lines_stripped = original lines minus remaining lines after stripping.
|
||||
_original_lines = len(raw_str.strip().split("\n")) if raw_str.strip() else 0
|
||||
_remaining_lines = len(stripped.strip().split("\n")) if stripped.strip() else 0
|
||||
logger.info(
|
||||
"%s Restored CLI session: %dB raw, %d lines stripped, msg_count=%d, valid=%s",
|
||||
log_prefix,
|
||||
len(raw_str),
|
||||
_original_lines - _remaining_lines,
|
||||
len(stripped.strip().split("\n")) if stripped.strip() else 0,
|
||||
cli_restore.message_count,
|
||||
is_valid,
|
||||
)
|
||||
@@ -1101,9 +1098,8 @@ def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
|
||||
result.append(block)
|
||||
else:
|
||||
logger.warning(
|
||||
"[SDK] Unknown content block type: %s."
|
||||
" This may indicate a new SDK version with additional block types.",
|
||||
type(block).__name__,
|
||||
f"[SDK] Unknown content block type: {type(block).__name__}. "
|
||||
f"This may indicate a new SDK version with additional block types."
|
||||
)
|
||||
return result
|
||||
|
||||
@@ -1158,11 +1154,10 @@ async def _compress_messages(
|
||||
|
||||
if result.was_compacted:
|
||||
logger.info(
|
||||
"[SDK] Context compacted: %d -> %d tokens (%d summarized, %d dropped)",
|
||||
result.original_token_count,
|
||||
result.token_count,
|
||||
result.messages_summarized,
|
||||
result.messages_dropped,
|
||||
f"[SDK] Context compacted: {result.original_token_count} -> "
|
||||
f"{result.token_count} tokens "
|
||||
f"({result.messages_summarized} summarized, "
|
||||
f"{result.messages_dropped} dropped)"
|
||||
)
|
||||
# Convert compressed dicts back to ChatMessages
|
||||
return [
|
||||
@@ -1229,17 +1224,11 @@ def _session_messages_to_transcript(messages: list[ChatMessage]) -> str:
|
||||
)
|
||||
if blocks:
|
||||
builder.append_assistant(blocks)
|
||||
elif msg.role == "tool":
|
||||
if msg.tool_call_id:
|
||||
builder.append_tool_result(
|
||||
tool_use_id=msg.tool_call_id,
|
||||
content=msg.content or "",
|
||||
)
|
||||
else:
|
||||
# Malformed tool message — no tool_call_id to link to an
|
||||
# assistant tool_use block. Skip to avoid an unmatched
|
||||
# tool_result entry in the builder (which would confuse --resume).
|
||||
logger.warning("[SDK] Skipping tool gap message with no tool_call_id")
|
||||
elif msg.role == "tool" and msg.tool_call_id:
|
||||
builder.append_tool_result(
|
||||
tool_use_id=msg.tool_call_id,
|
||||
content=msg.content or "",
|
||||
)
|
||||
return builder.to_jsonl()
|
||||
|
||||
|
||||
@@ -2566,10 +2555,7 @@ async def _restore_cli_session_for_turn(
|
||||
if validate_transcript(stripped):
|
||||
transcript_builder.load_previous(stripped, log_prefix=log_prefix)
|
||||
result.transcript_content = stripped
|
||||
except (UnicodeDecodeError, ValueError, OSError) as _load_err:
|
||||
# UnicodeDecodeError: non-UTF-8 content; ValueError: malformed JSONL in
|
||||
# strip_for_upload; OSError: encode/decode I/O failure. Unexpected
|
||||
# exceptions propagate so programming errors are not silently masked.
|
||||
except Exception as _load_err:
|
||||
logger.debug(
|
||||
"%s Could not load baseline transcript into builder: %s",
|
||||
log_prefix,
|
||||
@@ -3669,20 +3655,43 @@ async def stream_chat_completion_sdk(
|
||||
sdk_cwd, session_id, log_prefix
|
||||
)
|
||||
if _cli_content:
|
||||
# Watermark = number of DB messages this transcript covers.
|
||||
# len(session.messages) is accurate: the CLI session file
|
||||
# was just written after the turn completed, so it covers
|
||||
# all messages through this turn. Any gap from a prior
|
||||
# missed upload was already detected by detect_gap and
|
||||
# injected as context, so the model has the full history.
|
||||
#
|
||||
# Previously this used _final_tmsg_count + 2, which
|
||||
# under-counted for tool-use turns (delta = 2 + 2*N_tool_calls),
|
||||
# causing persistent spurious gap-fills on every subsequent turn.
|
||||
# That concern was addressed by the inflated-watermark fix
|
||||
# (using the GCS watermark as the anchor for gap detection),
|
||||
# which makes len(session.messages) safe to use here.
|
||||
_jsonl_covered = len(session.messages)
|
||||
# Compute JSONL coverage watermark.
|
||||
# Using len(session.messages) (DB count) causes an
|
||||
# "inflated watermark" bug: when prior turns fail to
|
||||
# upload, the GCS JSONL is stale but its meta.json
|
||||
# watermark matches the DB count, so the next turn's
|
||||
# gap-fill check (transcript_msg_count < msg_count-1)
|
||||
# never triggers and the model silently loses context.
|
||||
# Fix: when we have a reliable watermark from the
|
||||
# downloaded transcript (use_resume=True,
|
||||
# transcript_msg_count>0), set watermark =
|
||||
# previous_coverage + 2 (current user+asst pair).
|
||||
# This accurately reflects what is actually in the
|
||||
# JSONL so any stale gap is detected next turn.
|
||||
# For all other cases (fresh session, restore failed,
|
||||
# old-format meta with count=0, retry reset) fall back
|
||||
# to len(session.messages) to preserve original
|
||||
# behaviour and avoid triggering spurious gap-fills
|
||||
# on sessions whose watermark we cannot determine.
|
||||
_final_use_resume = (
|
||||
state.use_resume if state is not None else use_resume
|
||||
)
|
||||
_final_tmsg_count = (
|
||||
state.transcript_msg_count
|
||||
if state is not None
|
||||
else transcript_msg_count
|
||||
)
|
||||
if _final_use_resume and _final_tmsg_count > 0:
|
||||
# +2 = current user turn + current assistant response.
|
||||
# For tool-use turns with multiple assistant+tool pairs this
|
||||
# under-counts (e.g. +4), but the resulting underestimate is
|
||||
# safe: next turn's detect_gap returns the uncovered entries
|
||||
# as a small gap that is filled from DB. Over-estimating
|
||||
# (using len(session.messages)) would suppress gap detection
|
||||
# when a prior upload was missed — the inflated-watermark bug.
|
||||
_jsonl_covered = _final_tmsg_count + 2
|
||||
else:
|
||||
_jsonl_covered = len(session.messages)
|
||||
await asyncio.shield(
|
||||
upload_transcript(
|
||||
user_id=user_id,
|
||||
|
||||
@@ -880,116 +880,6 @@ class TestUploadCliSession:
|
||||
assert meta_content["mode"] == "baseline"
|
||||
assert meta_content["message_count"] == 4
|
||||
|
||||
def test_strips_session_before_upload_and_writes_back(self):
|
||||
"""strip_for_upload removes progress entries and returns smaller content."""
|
||||
import json
|
||||
|
||||
from .transcript import strip_for_upload
|
||||
|
||||
progress_entry = {
|
||||
"type": "progress",
|
||||
"uuid": "p1",
|
||||
"parentUuid": "u1",
|
||||
"data": {"type": "bash_progress", "stdout": "running..."},
|
||||
}
|
||||
user_entry = {
|
||||
"type": "user",
|
||||
"uuid": "u1",
|
||||
"message": {"role": "user", "content": "hello"},
|
||||
}
|
||||
asst_entry = {
|
||||
"type": "assistant",
|
||||
"uuid": "a1",
|
||||
"parentUuid": "u1",
|
||||
"message": {"role": "assistant", "content": "world"},
|
||||
}
|
||||
raw_content = (
|
||||
json.dumps(progress_entry)
|
||||
+ "\n"
|
||||
+ json.dumps(user_entry)
|
||||
+ "\n"
|
||||
+ json.dumps(asst_entry)
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
stripped = strip_for_upload(raw_content)
|
||||
|
||||
stored_lines = stripped.strip().split("\n")
|
||||
stored_types = [json.loads(line).get("type") for line in stored_lines]
|
||||
assert "progress" not in stored_types
|
||||
assert "user" in stored_types
|
||||
assert "assistant" in stored_types
|
||||
assert len(stripped.encode()) < len(raw_content.encode())
|
||||
|
||||
def test_strips_stale_thinking_blocks_before_upload(self):
|
||||
"""strip_for_upload removes thinking blocks from non-last assistant turns."""
|
||||
import json
|
||||
|
||||
from .transcript import strip_for_upload
|
||||
|
||||
u1 = {
|
||||
"type": "user",
|
||||
"uuid": "u1",
|
||||
"message": {"role": "user", "content": "q1"},
|
||||
}
|
||||
a1_with_thinking = {
|
||||
"type": "assistant",
|
||||
"uuid": "a1",
|
||||
"parentUuid": "u1",
|
||||
"message": {
|
||||
"id": "msg_a1",
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": "A" * 5000},
|
||||
{"type": "text", "text": "answer1"},
|
||||
],
|
||||
},
|
||||
}
|
||||
u2 = {
|
||||
"type": "user",
|
||||
"uuid": "u2",
|
||||
"parentUuid": "a1",
|
||||
"message": {"role": "user", "content": "q2"},
|
||||
}
|
||||
a2_no_thinking = {
|
||||
"type": "assistant",
|
||||
"uuid": "a2",
|
||||
"parentUuid": "u2",
|
||||
"message": {
|
||||
"id": "msg_a2",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "answer2"}],
|
||||
},
|
||||
}
|
||||
raw_content = (
|
||||
json.dumps(u1)
|
||||
+ "\n"
|
||||
+ json.dumps(a1_with_thinking)
|
||||
+ "\n"
|
||||
+ json.dumps(u2)
|
||||
+ "\n"
|
||||
+ json.dumps(a2_no_thinking)
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
stripped = strip_for_upload(raw_content)
|
||||
|
||||
stored_lines = stripped.strip().split("\n")
|
||||
|
||||
# a1 should have its thinking block stripped (it's not the last assistant turn).
|
||||
a1_stored = json.loads(stored_lines[1])
|
||||
a1_content = a1_stored["message"]["content"]
|
||||
assert all(
|
||||
b["type"] != "thinking" for b in a1_content
|
||||
), "stale thinking block should be stripped from a1"
|
||||
assert any(
|
||||
b["type"] == "text" for b in a1_content
|
||||
), "text block should be kept in a1"
|
||||
|
||||
# a2 (last turn) should be unchanged.
|
||||
a2_stored = json.loads(stored_lines[3])
|
||||
assert a2_stored["message"]["content"] == [{"type": "text", "text": "answer2"}]
|
||||
|
||||
|
||||
class TestRestoreCliSession:
|
||||
def test_returns_none_when_file_not_found_in_storage(self):
|
||||
|
||||
@@ -110,7 +110,7 @@ export const Flow = () => {
|
||||
event.preventDefault();
|
||||
}}
|
||||
maxZoom={2}
|
||||
minZoom={0.05}
|
||||
minZoom={0.1}
|
||||
onDragOver={onDragOver}
|
||||
onDrop={onDrop}
|
||||
nodesDraggable={!isLocked}
|
||||
|
||||
@@ -1,29 +1,5 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
extractWorkspaceArtifacts,
|
||||
filePartToArtifactRef,
|
||||
isReasoningToolPart,
|
||||
splitReasoningAndResponse,
|
||||
} from "./helpers";
|
||||
import type { MessagePart } from "./helpers";
|
||||
|
||||
function textPart(text: string): MessagePart {
|
||||
return { type: "text", text } as MessagePart;
|
||||
}
|
||||
|
||||
function toolPart(
|
||||
toolName: string,
|
||||
state: string = "output-available",
|
||||
): MessagePart {
|
||||
return {
|
||||
type: `tool-${toolName}`,
|
||||
state,
|
||||
toolCallId: `call-${toolName}`,
|
||||
toolName,
|
||||
args: {},
|
||||
output: "{}",
|
||||
} as unknown as MessagePart;
|
||||
}
|
||||
import { extractWorkspaceArtifacts, filePartToArtifactRef } from "./helpers";
|
||||
|
||||
describe("extractWorkspaceArtifacts", () => {
|
||||
it("extracts a single workspace:// link with its markdown title", () => {
|
||||
@@ -125,130 +101,3 @@ describe("filePartToArtifactRef", () => {
|
||||
expect(overridden?.origin).toBe("agent");
|
||||
});
|
||||
});
|
||||
|
||||
describe("isReasoningToolPart", () => {
|
||||
it("returns true for reasoning/search tools", () => {
|
||||
const reasoningTools = [
|
||||
"find_block",
|
||||
"find_agent",
|
||||
"find_library_agent",
|
||||
"search_docs",
|
||||
"get_doc_page",
|
||||
"search_feature_requests",
|
||||
"ask_question",
|
||||
];
|
||||
for (const name of reasoningTools) {
|
||||
expect(isReasoningToolPart(toolPart(name))).toBe(true);
|
||||
}
|
||||
});
|
||||
|
||||
it("returns false for action tools", () => {
|
||||
const actionTools = [
|
||||
"run_block",
|
||||
"run_agent",
|
||||
"create_agent",
|
||||
"edit_agent",
|
||||
"run_mcp_tool",
|
||||
"schedule_agent",
|
||||
"continue_run_block",
|
||||
];
|
||||
for (const name of actionTools) {
|
||||
expect(isReasoningToolPart(toolPart(name))).toBe(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("returns false for text parts", () => {
|
||||
expect(isReasoningToolPart(textPart("hello"))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("splitReasoningAndResponse", () => {
|
||||
it("returns all parts as response when there are no tools", () => {
|
||||
const parts = [textPart("Hello"), textPart("World")];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("splits on reasoning tools — text before goes to reasoning", () => {
|
||||
const parts = [
|
||||
textPart("Let me search..."),
|
||||
toolPart("find_block"),
|
||||
textPart("Here is your answer"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(2);
|
||||
expect(result.response).toHaveLength(1);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Here is your answer",
|
||||
);
|
||||
});
|
||||
|
||||
it("does NOT split on action tools — response before run_block stays visible", () => {
|
||||
const parts = [
|
||||
textPart("Here is my answer"),
|
||||
toolPart("run_block"),
|
||||
textPart("Block finished"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("splits only on reasoning tools when both reasoning and action tools are present", () => {
|
||||
const parts = [
|
||||
textPart("Planning..."),
|
||||
toolPart("search_docs"),
|
||||
textPart("Found it. Running now."),
|
||||
toolPart("run_block"),
|
||||
textPart("Done!"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(2);
|
||||
expect(result.response).toHaveLength(3);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Found it. Running now.",
|
||||
);
|
||||
});
|
||||
|
||||
it("returns all as response when reasoning tools have no text after them", () => {
|
||||
const parts = [
|
||||
textPart("Hello"),
|
||||
toolPart("find_agent"),
|
||||
toolPart("run_block"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("handles multiple reasoning tools correctly", () => {
|
||||
const parts = [
|
||||
textPart("Searching..."),
|
||||
toolPart("find_block"),
|
||||
textPart("Found one, searching more..."),
|
||||
toolPart("search_docs"),
|
||||
textPart("Here are the results"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(4);
|
||||
expect(result.response).toHaveLength(1);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Here are the results",
|
||||
);
|
||||
});
|
||||
|
||||
it("handles action tool after response text without hiding the response", () => {
|
||||
const parts = [
|
||||
toolPart("find_block"),
|
||||
textPart("I found it! Let me run it."),
|
||||
toolPart("run_agent"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(1);
|
||||
expect(result.response).toHaveLength(2);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"I found it! Let me run it.",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -33,20 +33,6 @@ const CUSTOM_TOOL_TYPES = new Set([
|
||||
"tool-create_feature_request",
|
||||
]);
|
||||
|
||||
const REASONING_TOOL_TYPES = new Set([
|
||||
"tool-find_block",
|
||||
"tool-find_agent",
|
||||
"tool-find_library_agent",
|
||||
"tool-search_docs",
|
||||
"tool-get_doc_page",
|
||||
"tool-search_feature_requests",
|
||||
"tool-ask_question",
|
||||
]);
|
||||
|
||||
export function isReasoningToolPart(part: MessagePart): boolean {
|
||||
return REASONING_TOOL_TYPES.has(part.type);
|
||||
}
|
||||
|
||||
const WORKSPACE_FILE_PATTERN =
|
||||
/\/api\/proxy\/api\/workspace\/files\/([a-f0-9-]+)\/download/;
|
||||
const WORKSPACE_URI_PATTERN = /workspace:\/\/([a-f0-9-]+)(?:#([^\s)\]]+))?/g;
|
||||
@@ -140,22 +126,22 @@ export function splitReasoningAndResponse(parts: MessagePart[]): {
|
||||
reasoning: MessagePart[];
|
||||
response: MessagePart[];
|
||||
} {
|
||||
const lastReasoningIndex = parts.findLastIndex((p) => isReasoningToolPart(p));
|
||||
const lastToolIndex = parts.findLastIndex((p) => p.type.startsWith("tool-"));
|
||||
|
||||
if (lastReasoningIndex === -1) {
|
||||
if (lastToolIndex === -1) {
|
||||
return { reasoning: [], response: parts };
|
||||
}
|
||||
|
||||
const hasResponseAfterReasoning = parts
|
||||
.slice(lastReasoningIndex + 1)
|
||||
const hasResponseAfterTools = parts
|
||||
.slice(lastToolIndex + 1)
|
||||
.some((p) => p.type === "text");
|
||||
|
||||
if (!hasResponseAfterReasoning) {
|
||||
if (!hasResponseAfterTools) {
|
||||
return { reasoning: [], response: parts };
|
||||
}
|
||||
|
||||
const rawReasoning = parts.slice(0, lastReasoningIndex + 1);
|
||||
const rawResponse = parts.slice(lastReasoningIndex + 1);
|
||||
const rawReasoning = parts.slice(0, lastToolIndex + 1);
|
||||
const rawResponse = parts.slice(lastToolIndex + 1);
|
||||
|
||||
const reasoning: MessagePart[] = [];
|
||||
const pinnedParts: MessagePart[] = [];
|
||||
|
||||
@@ -27,9 +27,6 @@ const RECONNECT_MAX_ATTEMPTS = 3;
|
||||
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
|
||||
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
|
||||
|
||||
/** Max time (ms) the UI can stay in "reconnecting" state before forcing idle. */
|
||||
const RECONNECT_MAX_DURATION_MS = 30_000;
|
||||
|
||||
interface UseCopilotStreamArgs {
|
||||
sessionId: string | null;
|
||||
hydratedMessages: UIMessage[] | undefined;
|
||||
@@ -125,21 +122,12 @@ export function useCopilotStream({
|
||||
const [isSyncing, setIsSyncing] = useState(false);
|
||||
// Tracks the last time the page was hidden — used to detect sleep/wake gaps.
|
||||
const lastHiddenAtRef = useRef(Date.now());
|
||||
// Monotonic counter that increments on each session switch — async callbacks
|
||||
// from old sessions compare their captured epoch to detect staleness.
|
||||
const sessionEpochRef = useRef(0);
|
||||
// Timestamp when reconnection first started — used to force timeout.
|
||||
const reconnectStartedAtRef = useRef<number | null>(null);
|
||||
// Timer for the forced reconnect timeout.
|
||||
const reconnectTimeoutTimerRef = useRef<ReturnType<typeof setTimeout>>();
|
||||
|
||||
function handleReconnect(sid: string) {
|
||||
if (isReconnectScheduledRef.current || !sid) return;
|
||||
|
||||
const nextAttempt = reconnectAttemptsRef.current + 1;
|
||||
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
setReconnectExhausted(true);
|
||||
toast({
|
||||
title: "Connection lost",
|
||||
@@ -149,26 +137,6 @@ export function useCopilotStream({
|
||||
return;
|
||||
}
|
||||
|
||||
// Track when reconnection first started for the forced timeout.
|
||||
if (reconnectStartedAtRef.current === null) {
|
||||
reconnectStartedAtRef.current = Date.now();
|
||||
// Schedule a forced timeout — if reconnecting takes longer than
|
||||
// RECONNECT_MAX_DURATION_MS, force the UI back to idle.
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
const capturedEpoch = sessionEpochRef.current;
|
||||
reconnectTimeoutTimerRef.current = setTimeout(() => {
|
||||
if (sessionEpochRef.current !== capturedEpoch) return;
|
||||
setReconnectExhausted(true);
|
||||
reconnectStartedAtRef.current = null;
|
||||
toast({
|
||||
title: "Connection timed out",
|
||||
description:
|
||||
"AutoPilot may still be working. Refresh to check for updates.",
|
||||
variant: "destructive",
|
||||
});
|
||||
}, RECONNECT_MAX_DURATION_MS);
|
||||
}
|
||||
|
||||
isReconnectScheduledRef.current = true;
|
||||
setIsReconnectScheduled(true);
|
||||
reconnectAttemptsRef.current = nextAttempt;
|
||||
@@ -182,12 +150,8 @@ export function useCopilotStream({
|
||||
}
|
||||
|
||||
const delay = RECONNECT_BASE_DELAY_MS * 2 ** (nextAttempt - 1);
|
||||
const capturedEpoch = sessionEpochRef.current;
|
||||
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
// Bail if the session switched while the timer was pending.
|
||||
if (sessionEpochRef.current !== capturedEpoch) return;
|
||||
|
||||
isReconnectScheduledRef.current = false;
|
||||
setIsReconnectScheduled(false);
|
||||
// Strip the stale in-progress assistant message before resuming —
|
||||
@@ -220,11 +184,7 @@ export function useCopilotStream({
|
||||
transport: transport ?? undefined,
|
||||
onFinish: async ({ isDisconnect, isAbort }) => {
|
||||
if (isAbort || !sessionId) return;
|
||||
// User-initiated stops should not trigger reconnection.
|
||||
if (isUserStoppingRef.current) return;
|
||||
|
||||
// The AI SDK rarely sets isDisconnect — treat ANY non-user-initiated
|
||||
// finish as a potential disconnect when the backend stream is active.
|
||||
if (isDisconnect) {
|
||||
handleReconnect(sessionId);
|
||||
return;
|
||||
@@ -459,35 +419,20 @@ export function useCopilotStream({
|
||||
};
|
||||
}, [refetchSession, setMessages]);
|
||||
|
||||
// Hydrate messages from REST API when not actively streaming.
|
||||
// Sets hydrateCompletedRef so the resume effect knows it's safe to proceed.
|
||||
// Hydrate messages from REST API when not actively streaming
|
||||
useEffect(() => {
|
||||
if (!hydratedMessages) return;
|
||||
if (!hydratedMessages || hydratedMessages.length === 0) return;
|
||||
if (status === "streaming" || status === "submitted") return;
|
||||
if (isReconnectScheduled) return;
|
||||
if (hydratedMessages.length > 0) {
|
||||
setMessages((prev) => {
|
||||
if (prev.length >= hydratedMessages.length) return prev;
|
||||
return deduplicateMessages(hydratedMessages);
|
||||
});
|
||||
}
|
||||
hydrateCompletedRef.current = true;
|
||||
// Flush any resume that was waiting for hydration to finish.
|
||||
if (pendingResumeRef.current) {
|
||||
const pendingResume = pendingResumeRef.current;
|
||||
pendingResumeRef.current = null;
|
||||
pendingResume();
|
||||
}
|
||||
setMessages((prev) => {
|
||||
if (prev.length >= hydratedMessages.length) return prev;
|
||||
return deduplicateMessages(hydratedMessages);
|
||||
});
|
||||
}, [hydratedMessages, setMessages, status, isReconnectScheduled]);
|
||||
|
||||
// Track resume state per session
|
||||
const hasResumedRef = useRef<Map<string, boolean>>(new Map());
|
||||
|
||||
// Coordination: hydration must complete before resume fires.
|
||||
// Prevents duplicate messages / missing content from the two effects racing.
|
||||
const hydrateCompletedRef = useRef(false);
|
||||
const pendingResumeRef = useRef<(() => void) | null>(null);
|
||||
|
||||
// Clean up reconnect state on session switch.
|
||||
// Abort the old stream's in-flight fetch and tell the backend to release
|
||||
// its XREAD listeners immediately (fire-and-forget).
|
||||
@@ -496,24 +441,21 @@ export function useCopilotStream({
|
||||
const prevSid = prevStreamSessionRef.current;
|
||||
prevStreamSessionRef.current = sessionId;
|
||||
|
||||
// Increment epoch so stale async callbacks from the old session bail out.
|
||||
sessionEpochRef.current += 1;
|
||||
const currentEpoch = sessionEpochRef.current;
|
||||
|
||||
const isSwitching = Boolean(prevSid && prevSid !== sessionId);
|
||||
if (isSwitching) {
|
||||
// Mark BEFORE stopping so the old stream's async onError (which fires
|
||||
// after the abort) sees the flag and short-circuits the reconnect path.
|
||||
// Without this, the AbortError can queue a reconnect against the new
|
||||
// session's `sessionId` (captured in the fresh onError closure).
|
||||
isUserStoppingRef.current = true;
|
||||
sdkStopRef.current();
|
||||
disconnectSessionStream(prevSid!);
|
||||
// Schedule the reset as a task (not a microtask) so it runs AFTER the
|
||||
// aborted fetch's onError has fired — but verify the epoch hasn't
|
||||
// changed again (rapid session switches).
|
||||
// aborted fetch's onError has fired — otherwise the new session would
|
||||
// be stuck with the "user stopping" flag set, preventing auto-resume
|
||||
// when hydration detects an active backend stream.
|
||||
setTimeout(() => {
|
||||
if (sessionEpochRef.current === currentEpoch) {
|
||||
isUserStoppingRef.current = false;
|
||||
}
|
||||
isUserStoppingRef.current = false;
|
||||
}, 0);
|
||||
} else {
|
||||
isUserStoppingRef.current = false;
|
||||
@@ -521,10 +463,7 @@ export function useCopilotStream({
|
||||
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = undefined;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
reconnectStartedAtRef.current = null;
|
||||
isReconnectScheduledRef.current = false;
|
||||
setIsReconnectScheduled(false);
|
||||
setRateLimitMessage(null);
|
||||
@@ -533,13 +472,9 @@ export function useCopilotStream({
|
||||
setReconnectExhausted(false);
|
||||
setIsSyncing(false);
|
||||
hasResumedRef.current.clear();
|
||||
hydrateCompletedRef.current = false;
|
||||
pendingResumeRef.current = null;
|
||||
return () => {
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = undefined;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
};
|
||||
}, [sessionId]);
|
||||
|
||||
@@ -550,18 +485,8 @@ export function useCopilotStream({
|
||||
prevStatusRef.current = status;
|
||||
|
||||
const wasActive = prev === "streaming" || prev === "submitted";
|
||||
const isNowActive = status === "streaming" || status === "submitted";
|
||||
const isIdle = status === "ready" || status === "error";
|
||||
|
||||
// Clear the forced reconnect timeout as soon as the stream resumes —
|
||||
// otherwise the stale 30s timer can fire mid-stream and show a
|
||||
// "timed out" toast even though reconnection succeeded.
|
||||
if (isNowActive && reconnectStartedAtRef.current !== null) {
|
||||
reconnectStartedAtRef.current = null;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
}
|
||||
|
||||
if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: getGetV2GetSessionQueryKey(sessionId),
|
||||
@@ -572,9 +497,6 @@ export function useCopilotStream({
|
||||
if (status === "ready") {
|
||||
reconnectAttemptsRef.current = 0;
|
||||
hasShownDisconnectToast.current = false;
|
||||
reconnectStartedAtRef.current = null;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
// Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last
|
||||
// submitted text prevents getSendSuppressionReason from allowing a
|
||||
// duplicate POST of the same message immediately after a successful turn
|
||||
@@ -589,11 +511,10 @@ export function useCopilotStream({
|
||||
// Resume an active stream AFTER hydration completes.
|
||||
// IMPORTANT: Only runs when page loads with existing active stream (reconnection).
|
||||
// Does NOT run when new streams start during active conversation.
|
||||
// Gated on hydrateCompletedRef to prevent racing with the hydration effect.
|
||||
useEffect(() => {
|
||||
if (!sessionId) return;
|
||||
if (!hasActiveStream) return;
|
||||
if (!hydratedMessages) return;
|
||||
if (!hydratedMessages || hydratedMessages.length === 0) return;
|
||||
|
||||
// Never resume if currently streaming
|
||||
if (status === "streaming" || status === "submitted") return;
|
||||
@@ -604,30 +525,21 @@ export function useCopilotStream({
|
||||
// Don't resume a stream the user just cancelled
|
||||
if (isUserStoppingRef.current) return;
|
||||
|
||||
function doResume() {
|
||||
if (!sessionId || hasResumedRef.current.get(sessionId)) return;
|
||||
if (isUserStoppingRef.current) return;
|
||||
// Mark as resumed immediately to prevent race conditions
|
||||
hasResumedRef.current.set(sessionId, true);
|
||||
|
||||
hasResumedRef.current.set(sessionId, true);
|
||||
// Remove the in-progress assistant message before resuming.
|
||||
// The backend replays the stream from "0-0", so keeping the hydrated
|
||||
// version would cause the old parts to overlap with replayed parts.
|
||||
// Previous turns are preserved; the stream recreates the current turn.
|
||||
setMessages((prev) => {
|
||||
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
|
||||
return prev.slice(0, -1);
|
||||
}
|
||||
return prev;
|
||||
});
|
||||
|
||||
setMessages((prev) => {
|
||||
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
|
||||
return prev.slice(0, -1);
|
||||
}
|
||||
return prev;
|
||||
});
|
||||
|
||||
resumeStreamRef.current();
|
||||
}
|
||||
|
||||
// Wait for hydration to complete before resuming to prevent
|
||||
// the two effects from racing (duplicate messages / missing content).
|
||||
if (!hydrateCompletedRef.current) {
|
||||
pendingResumeRef.current = doResume;
|
||||
return;
|
||||
}
|
||||
|
||||
doResume();
|
||||
resumeStreamRef.current();
|
||||
}, [sessionId, hasActiveStream, hydratedMessages, status, setMessages]);
|
||||
|
||||
// Clear messages when session is null
|
||||
|
||||
BIN
test-screenshots/PR-12804/00-login-already-in.png
Normal file
BIN
test-screenshots/PR-12804/00-login-already-in.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 92 KiB |
BIN
test-screenshots/PR-12804/01-copilot-page.png
Normal file
BIN
test-screenshots/PR-12804/01-copilot-page.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 92 KiB |
BIN
test-screenshots/PR-12804/02-copilot-sessions.png
Normal file
BIN
test-screenshots/PR-12804/02-copilot-sessions.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 92 KiB |
BIN
test-screenshots/PR-12804/03-s4-sdk-baseline-sdk.png
Normal file
BIN
test-screenshots/PR-12804/03-s4-sdk-baseline-sdk.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 92 KiB |
Reference in New Issue
Block a user