mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend/copilot): make TranscriptBuilder compaction-aware and compact oversized transcripts
TranscriptBuilder accumulated all messages including pre-compaction content, causing uploaded transcripts to grow unbounded. When the CLI compacted mid-stream, TranscriptBuilder kept the full uncompacted history. On the next turn, --resume would fail with "Prompt is too long". Fix 1 (prevent): After the CLI's PreCompact hook fires and compaction ends, read the CLI's session file (which reflects compaction) and replace TranscriptBuilder's entries via new replace_entries() method. Fix 2 (mitigate): At download time, if transcript exceeds 400KB threshold, compact it using compress_context (LLM summarization + truncation fallback) before passing to --resume. Upload the compacted version for future turns.
This commit is contained in:
@@ -78,8 +78,11 @@ from .tool_adapter import (
|
||||
wait_for_stash,
|
||||
)
|
||||
from .transcript import (
|
||||
COMPACT_THRESHOLD_BYTES,
|
||||
cleanup_cli_project_dir,
|
||||
compact_transcript,
|
||||
download_transcript,
|
||||
read_cli_session_file,
|
||||
upload_transcript,
|
||||
validate_transcript,
|
||||
write_transcript_to_tempfile,
|
||||
@@ -837,16 +840,41 @@ async def stream_chat_completion_sdk(
|
||||
is_valid,
|
||||
)
|
||||
if is_valid:
|
||||
transcript_content = dl.content
|
||||
# Compact oversized transcripts to prevent "Prompt is too long"
|
||||
if len(transcript_content) > COMPACT_THRESHOLD_BYTES:
|
||||
logger.warning(
|
||||
"%s Transcript oversized (%dB > %dB), compacting",
|
||||
log_prefix,
|
||||
len(transcript_content),
|
||||
COMPACT_THRESHOLD_BYTES,
|
||||
)
|
||||
compacted = await compact_transcript(
|
||||
transcript_content, log_prefix=log_prefix
|
||||
)
|
||||
if compacted:
|
||||
transcript_content = compacted
|
||||
# Upload the compacted version for future turns
|
||||
await upload_transcript(
|
||||
user_id=user_id or "",
|
||||
session_id=session_id,
|
||||
content=transcript_content,
|
||||
message_count=dl.message_count,
|
||||
log_prefix=log_prefix,
|
||||
)
|
||||
|
||||
# Load previous FULL context into builder
|
||||
transcript_builder.load_previous(dl.content, log_prefix=log_prefix)
|
||||
transcript_builder.load_previous(
|
||||
transcript_content, log_prefix=log_prefix
|
||||
)
|
||||
resume_file = write_transcript_to_tempfile(
|
||||
dl.content, session_id, sdk_cwd
|
||||
transcript_content, session_id, sdk_cwd
|
||||
)
|
||||
if resume_file:
|
||||
use_resume = True
|
||||
transcript_msg_count = dl.message_count
|
||||
logger.debug(
|
||||
f"{log_prefix} Using --resume ({len(dl.content)}B, "
|
||||
f"{log_prefix} Using --resume ({len(transcript_content)}B, "
|
||||
f"msg_count={transcript_msg_count})"
|
||||
)
|
||||
else:
|
||||
@@ -1166,9 +1194,19 @@ async def stream_chat_completion_sdk(
|
||||
if sdk_msg.total_cost_usd is not None:
|
||||
turn_cost_usd = sdk_msg.total_cost_usd
|
||||
|
||||
# Emit compaction end if SDK finished compacting
|
||||
for ev in await compaction.emit_end_if_ready(session):
|
||||
# Emit compaction end if SDK finished compacting.
|
||||
# When compaction ends, sync TranscriptBuilder with
|
||||
# the CLI's compacted session file so the uploaded
|
||||
# transcript reflects compaction.
|
||||
compaction_events = await compaction.emit_end_if_ready(session)
|
||||
for ev in compaction_events:
|
||||
yield ev
|
||||
if compaction_events and sdk_cwd:
|
||||
cli_content = read_cli_session_file(sdk_cwd)
|
||||
if cli_content:
|
||||
transcript_builder.replace_entries(
|
||||
cli_content, log_prefix=log_prefix
|
||||
)
|
||||
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
|
||||
@@ -137,32 +137,61 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
|
||||
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
|
||||
|
||||
|
||||
def _cli_project_dir(sdk_cwd: str) -> str | None:
    """Return the CLI's project directory for a given working directory.

    The CLI stores session data under ``~/.claude/projects/<encoded_cwd>/``
    (or under ``$CLAUDE_CONFIG_DIR/projects/`` when that env var is set).

    Returns ``None`` when the resolved path escapes the projects base
    directory — a defensive check against path traversal via ``sdk_cwd``.
    """
    # Encode cwd the same way the CLI does (replaces non-alphanumeric with -).
    cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
    config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
    projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
    project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))

    # realpath() may resolve symlinks outside the base; refuse such paths.
    if not project_dir.startswith(projects_base + os.sep):
        logger.warning("[Transcript] Project dir escaped base: %s", project_dir)
        return None
    return project_dir
def read_cli_session_file(sdk_cwd: str) -> str | None:
    """Read the CLI's own session file, which reflects any mid-stream compaction.

    After the CLI compacts context, its session file contains the compacted
    conversation. Reading this file lets ``TranscriptBuilder`` replace its
    uncompacted entries with the CLI's compacted version.

    Returns the file content as a string, or ``None`` when no session file
    exists or it cannot be read.
    """
    import glob as glob_mod

    project_dir = _cli_project_dir(sdk_cwd)
    if not project_dir or not os.path.isdir(project_dir):
        return None
    jsonl_files = glob_mod.glob(os.path.join(project_dir, "*.jsonl"))
    if not jsonl_files:
        logger.debug("[Transcript] No CLI session file in %s", project_dir)
        return None
    # Multiple session files can exist; the most recently modified one is
    # assumed to be the active session — TODO confirm the CLI never writes
    # two files within the same turn.
    session_file = max(jsonl_files, key=os.path.getmtime)
    try:
        with open(session_file) as f:
            content = f.read()
        logger.info(
            "[Transcript] Read CLI session file: %s (%d bytes)",
            session_file,
            len(content),
        )
        return content
    except OSError as e:
        # Best-effort read: a missing/unreadable file is not fatal; the
        # caller simply keeps its uncompacted entries.
        logger.warning("[Transcript] Failed to read CLI session file: %s", e)
        return None
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
    """Remove the CLI's project directory for a specific working directory.

    Each SDK turn uses a unique ``sdk_cwd``, so the project directory is
    safe to remove entirely once the transcript has been uploaded.
    """
    import shutil

    project_dir = _cli_project_dir(sdk_cwd)
    if not project_dir:
        # Path escaped the projects base; _cli_project_dir already logged it.
        return
    if os.path.isdir(project_dir):
        # ignore_errors: cleanup is best-effort and must never raise.
        shutil.rmtree(project_dir, ignore_errors=True)
        logger.debug("[Transcript] Cleaned up CLI project dir: %s", project_dir)
    else:
        logger.debug("[Transcript] Project dir not found: %s", project_dir)
def write_transcript_to_tempfile(
|
||||
@@ -417,3 +446,147 @@ async def delete_transcript(user_id: str, session_id: str) -> None:
|
||||
logger.info(f"[Transcript] Deleted transcript for session {session_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[Transcript] Failed to delete transcript: {e}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Transcript compaction
# ---------------------------------------------------------------------------

# Transcripts above this byte threshold are compacted at download time.
# NOTE(review): 400 KB is presumably sized so the resumed prompt stays
# under the model's context window — TODO confirm against model limits.
COMPACT_THRESHOLD_BYTES = 400_000
def _transcript_to_messages(content: str) -> list[dict]:
    """Convert JSONL transcript entries to message dicts for compress_context.

    Each transcript line is a JSON object with a ``type`` and a nested
    ``message`` (``role`` + ``content``). Entries whose type is in
    ``STRIPPABLE_TYPES``, or that carry no role, are skipped. Assistant
    content blocks are flattened to plain text (tool_use blocks become a
    ``[tool_use: name]`` placeholder); user-side block lists keep
    tool_result payloads stringified.
    """
    messages: list[dict] = []
    for line in content.strip().split("\n"):
        if not line.strip():
            continue
        # NOTE(review): `json` here must be the project json wrapper —
        # stdlib json.loads has no `fallback` parameter. Malformed lines
        # presumably become None and are dropped by the isinstance check.
        entry = json.loads(line, fallback=None)
        if not isinstance(entry, dict):
            continue
        if entry.get("type", "") in STRIPPABLE_TYPES:
            continue
        msg = entry.get("message", {})
        role = msg.get("role", "")
        if not role:
            continue
        msg_dict: dict = {"role": role}
        raw_content = msg.get("content")
        if role == "assistant" and isinstance(raw_content, list):
            parts: list[str] = []
            for block in raw_content:
                if isinstance(block, dict):
                    if block.get("type") == "text":
                        parts.append(block.get("text", ""))
                    elif block.get("type") == "tool_use":
                        # Keep a lightweight marker so the summarizer knows
                        # a tool was invoked, without the full payload.
                        parts.append(f"[tool_use: {block.get('name', '?')}]")
                elif isinstance(block, str):
                    parts.append(block)
            msg_dict["content"] = "\n".join(parts) if parts else ""
        elif isinstance(raw_content, list):
            parts = []
            for block in raw_content:
                if isinstance(block, dict) and block.get("type") == "tool_result":
                    parts.append(str(block.get("content", "")))
                elif isinstance(block, str):
                    parts.append(block)
            msg_dict["content"] = "\n".join(parts) if parts else ""
        else:
            # Plain-string content (or None, normalized to "").
            msg_dict["content"] = raw_content or ""
        messages.append(msg_dict)
    return messages
def _messages_to_transcript(messages: list[dict]) -> str:
|
||||
"""Convert compressed message dicts back to JSONL transcript format."""
|
||||
from uuid import uuid4
|
||||
|
||||
lines: list[str] = []
|
||||
last_uuid: str | None = None
|
||||
for msg in messages:
|
||||
role = msg.get("role", "user")
|
||||
entry_type = "assistant" if role == "assistant" else "user"
|
||||
uid = str(uuid4())
|
||||
content = msg.get("content", "")
|
||||
if role == "assistant":
|
||||
message: dict = {
|
||||
"role": "assistant",
|
||||
"model": "",
|
||||
"id": f"msg_compact_{uuid4().hex[:24]}",
|
||||
"type": "message",
|
||||
"content": [{"type": "text", "text": content}] if content else [],
|
||||
"stop_reason": "end_turn",
|
||||
"stop_sequence": None,
|
||||
}
|
||||
else:
|
||||
message = {"role": role, "content": content}
|
||||
entry = {
|
||||
"type": entry_type,
|
||||
"uuid": uid,
|
||||
"parentUuid": last_uuid,
|
||||
"message": message,
|
||||
}
|
||||
lines.append(json.dumps(entry, separators=(",", ":")))
|
||||
last_uuid = uid
|
||||
return "\n".join(lines) + "\n" if lines else ""
|
||||
|
||||
|
||||
async def compact_transcript(
    content: str,
    log_prefix: str = "[Transcript]",
) -> str | None:
    """Compact an oversized JSONL transcript using LLM summarization.

    Converts transcript entries to plain messages, runs ``compress_context``
    (the same compressor used for pre-query history), and rebuilds JSONL.
    When the LLM call fails, falls back to ``compress_context`` with
    ``client=None`` (truncation-only mode).

    Returns the compacted JSONL string, the original ``content`` when it is
    already within the token budget, or ``None`` on failure.
    """
    from backend.copilot.config import ChatConfig

    cfg = ChatConfig()
    messages = _transcript_to_messages(content)
    if len(messages) < 2:
        # Nothing meaningful to summarize; signal failure so the caller
        # keeps using the original transcript.
        logger.warning("%s Too few messages to compact (%d)", log_prefix, len(messages))
        return None
    try:
        import openai

        from backend.util.prompt import compress_context

        try:
            async with openai.AsyncOpenAI(
                api_key=cfg.api_key, base_url=cfg.base_url, timeout=30.0
            ) as client:
                result = await compress_context(
                    messages=messages, model=cfg.model, client=client
                )
        except Exception as e:
            # Best-effort: degrade to truncation rather than failing the
            # whole download path.
            logger.warning(
                "%s LLM compaction failed, using truncation: %s", log_prefix, e
            )
            result = await compress_context(
                messages=messages, model=cfg.model, client=None
            )
        if not result.was_compacted:
            logger.info("%s Transcript already within token budget", log_prefix)
            return content
        logger.info(
            "%s Compacted transcript: %d->%d tokens (%d summarized, %d dropped)",
            log_prefix,
            result.original_token_count,
            result.token_count,
            result.messages_summarized,
            result.messages_dropped,
        )
        compacted = _messages_to_transcript(result.messages)
        # Guard against handing the CLI a transcript it cannot --resume from.
        if not validate_transcript(compacted):
            logger.warning("%s Compacted transcript failed validation", log_prefix)
            return None
        return compacted
    except Exception as e:
        # Outer catch-all: compaction is an optimization; any failure is
        # logged and reported as None instead of propagating.
        logger.error(
            "%s Transcript compaction failed: %s", log_prefix, e, exc_info=True
        )
        return None
@@ -177,6 +177,24 @@ class TranscriptBuilder:
|
||||
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
def replace_entries(self, content: str, log_prefix: str = "[Transcript]") -> None:
    """Replace all entries with compacted JSONL content.

    Called after the CLI performs mid-stream compaction so the builder's
    state reflects the compacted conversation instead of the full
    pre-compaction history.
    """
    prev_count = len(self._entries)
    # Reset builder state so load_previous() starts from an empty entry
    # list and a fresh uuid chain.
    self._entries.clear()
    self._last_uuid = None
    self.load_previous(content, log_prefix=log_prefix)
    logger.info(
        "%s Replaced %d entries with %d compacted entries",
        log_prefix,
        prev_count,
        len(self._entries),
    )
|
||||
@property
|
||||
def entry_count(self) -> int:
|
||||
"""Total number of entries in the complete context."""
|
||||
|
||||
Reference in New Issue
Block a user