fix(backend/copilot): make upload_transcript atomic — sequential writes with bidirectional rollback

Previously JSONL and meta were uploaded in parallel; if meta failed the JSONL
was left orphaned (no rollback), causing the next restore to use wrong
mode/message_count defaults.

Now writes are sequential (JSONL first, meta second):
- JSONL failure: returns early, meta never written → neither file exists
- Meta failure: deletes JSONL (rollback) → neither file exists
- Process crash between writes: orphaned JSONL with no meta → download falls
  back to mode="sdk" / message_count=0 defaults (safe for SDK content; a
  baseline JSONL would fail --resume gracefully and fall back to DB context)

Also logs mode in both upload and download info lines, and updates tests:
- test_skips_upload_on_storage_failure: asserts meta store never called on JSONL failure
- test_rolls_back_session_when_meta_upload_fails: replaces old meta-rollback test
This commit is contained in:
Zamil Majdy
2026-04-16 14:18:36 +07:00
parent 7aef023f28
commit 2f3003f059
6 changed files with 58 additions and 36 deletions

View File

@@ -693,41 +693,48 @@ async def upload_transcript(
meta = {"message_count": message_count, "mode": mode, "uploaded_at": time.time()}
meta_encoded = json.dumps(meta).encode("utf-8")
session_result, meta_result = await asyncio.gather(
storage.store(workspace_id=wid, file_id=fid, filename=fname, content=content),
storage.store(
workspace_id=mwid, file_id=mfid, filename=mfname, content=meta_encoded
),
return_exceptions=True,
)
if isinstance(session_result, BaseException):
logger.warning(
"%s Failed to upload CLI session file: %s", log_prefix, session_result
# Write JSONL first, meta second — sequential so a crash between the two
# leaves an orphaned JSONL (no meta) rather than an orphaned meta (wrong
# watermark / mode paired with stale or absent content).
# On any failure we roll back the other file so the pair is always absent
# together; download_transcript returns None when either file is missing.
try:
await storage.store(
workspace_id=wid, file_id=fid, filename=fname, content=content
)
except Exception as session_err:
logger.warning(
"%s Failed to upload CLI session file: %s", log_prefix, session_err
)
# Roll back the companion meta to avoid pairing a stale session with a
# fresh watermark on the next restore. Best-effort — if this also fails
# the next restore will find a meta with no matching session (FileNotFoundError
# on the session bytes) and return None, so no data corruption can occur.
if not isinstance(meta_result, BaseException):
try:
meta_path = _build_path_from_parts(
_cli_session_meta_path_parts(user_id, session_id), storage
)
await storage.delete(meta_path)
except Exception as rollback_err:
logger.debug(
"%s Meta rollback failed (harmless): %s", log_prefix, rollback_err
)
return
if isinstance(meta_result, BaseException):
logger.warning(
"%s Failed to upload CLI session meta: %s", log_prefix, meta_result
try:
await storage.store(
workspace_id=mwid, file_id=mfid, filename=mfname, content=meta_encoded
)
except Exception as meta_err:
logger.warning("%s Failed to upload CLI session meta: %s", log_prefix, meta_err)
# Roll back the JSONL so neither file exists — avoids orphaned JSONL being
# used with wrong mode/watermark defaults on the next restore.
try:
session_path = _build_path_from_parts(
_cli_session_storage_path_parts(user_id, session_id), storage
)
await storage.delete(session_path)
except Exception as rollback_err:
logger.debug(
"%s Session rollback failed (harmless — download will return None): %s",
log_prefix,
rollback_err,
)
return
logger.info(
"%s Uploaded CLI session file (%dB, msg_count=%d) for cross-pod --resume",
"%s Uploaded CLI session (%dB, msg_count=%d, mode=%s)",
log_prefix,
len(content),
message_count,
mode,
)
@@ -793,10 +800,11 @@ async def download_transcript(
mode = raw_mode if raw_mode in ("sdk", "baseline") else "sdk"
logger.info(
"%s Downloaded CLI session (%dB, msg_count=%d)",
"%s Downloaded CLI session (%dB, msg_count=%d, mode=%s)",
log_prefix,
len(content),
message_count,
mode,
)
return TranscriptDownload(content=content, message_count=message_count, mode=mode)

View File

@@ -779,14 +779,18 @@ class TestUploadCliSession:
assert meta_content["message_count"] == 5
def test_skips_upload_on_storage_failure(self):
"""Storage exception on jsonl write is logged and does not propagate."""
"""Storage exception on jsonl write is logged and does not propagate.
With sequential writes, JSONL failure returns early — meta store is
never called, so no rollback is needed.
"""
import asyncio
from unittest.mock import AsyncMock, patch
from .transcript import upload_transcript
mock_storage = AsyncMock()
mock_storage.store.side_effect = [RuntimeError("gcs unavailable"), None]
mock_storage.store.side_effect = RuntimeError("gcs unavailable")
content = b'{"type":"assistant"}\n'
with patch(
@@ -803,16 +807,24 @@ class TestUploadCliSession:
)
)
def test_rolls_back_meta_when_session_upload_fails(self):
"""When session upload fails but meta succeeded, meta is rolled back."""
# Only one store call attempted (the JSONL); meta never reached
mock_storage.store.assert_called_once()
mock_storage.delete.assert_not_called()
def test_rolls_back_session_when_meta_upload_fails(self):
"""When meta upload fails after JSONL succeeds, JSONL is rolled back.
Guarantees the pair is either both present or both absent — avoids an
orphaned JSONL being used with wrong mode/watermark defaults.
"""
import asyncio
from unittest.mock import AsyncMock, patch
from .transcript import upload_transcript
mock_storage = AsyncMock()
# session store fails, meta store succeeds → rollback delete should be called
mock_storage.store.side_effect = [RuntimeError("gcs unavailable"), None]
# First store (JSONL) succeeds; second store (meta) fails
mock_storage.store.side_effect = [None, RuntimeError("meta write failed")]
content = b'{"type":"assistant"}\n'
with patch(
@@ -828,7 +840,9 @@ class TestUploadCliSession:
)
)
# delete should be called once for the meta rollback
# Both store calls were attempted (JSONL then meta)
assert mock_storage.store.call_count == 2
# JSONL should be rolled back via delete
mock_storage.delete.assert_called_once()
def test_baseline_mode_stored_in_meta(self):

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 129 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB