mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-03 03:14:57 -05:00
Compare commits
4 Commits
ntindle/fi
...
refactor/u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
72ae2e9504 | ||
|
|
d81d1ce024 | ||
|
|
2dd341c369 | ||
|
|
1081590384 |
@@ -3,9 +3,13 @@ import logging
|
||||
import time
|
||||
from asyncio import CancelledError
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import openai
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.util.prompt import CompressResult
|
||||
|
||||
import orjson
|
||||
from langfuse import get_client
|
||||
from openai import (
|
||||
@@ -15,7 +19,13 @@ from openai import (
|
||||
PermissionDeniedError,
|
||||
RateLimitError,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
|
||||
from openai.types.chat import (
|
||||
ChatCompletionChunk,
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionStreamOptionsParam,
|
||||
ChatCompletionSystemMessageParam,
|
||||
ChatCompletionToolParam,
|
||||
)
|
||||
|
||||
from backend.data.redis_client import get_redis_async
|
||||
from backend.data.understanding import (
|
||||
@@ -794,207 +804,58 @@ def _is_region_blocked_error(error: Exception) -> bool:
|
||||
return "not available in your region" in str(error).lower()
|
||||
|
||||
|
||||
async def _summarize_messages(
|
||||
async def _manage_context_window(
|
||||
messages: list,
|
||||
model: str,
|
||||
api_key: str | None = None,
|
||||
base_url: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""Summarize a list of messages into concise context.
|
||||
) -> "CompressResult":
|
||||
"""
|
||||
Manage context window using the unified compress_context function.
|
||||
|
||||
Uses the same model as the chat for higher quality summaries.
|
||||
This is a thin wrapper that creates an OpenAI client for summarization
|
||||
and delegates to the shared compression logic in prompt.py.
|
||||
|
||||
Args:
|
||||
messages: List of message dicts to summarize
|
||||
model: Model to use for summarization (same as chat model)
|
||||
api_key: API key for OpenAI client
|
||||
base_url: Base URL for OpenAI client
|
||||
timeout: Request timeout in seconds (default: 30.0)
|
||||
messages: List of messages in OpenAI format
|
||||
model: Model name for token counting and summarization
|
||||
api_key: API key for summarization calls
|
||||
base_url: Base URL for summarization calls
|
||||
|
||||
Returns:
|
||||
Summarized text
|
||||
CompressResult with compacted messages and metadata
|
||||
"""
|
||||
# Format messages for summarization
|
||||
conversation = []
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
# Include user, assistant, and tool messages (tool outputs are important context)
|
||||
if content and role in ("user", "assistant", "tool"):
|
||||
conversation.append(f"{role.upper()}: {content}")
|
||||
|
||||
conversation_text = "\n\n".join(conversation)
|
||||
|
||||
# Handle empty conversation
|
||||
if not conversation_text:
|
||||
return "No conversation history available."
|
||||
|
||||
# Truncate conversation to fit within summarization model's context
|
||||
# gpt-4o-mini has 128k context, but we limit to ~25k tokens (~100k chars) for safety
|
||||
MAX_CHARS = 100_000
|
||||
if len(conversation_text) > MAX_CHARS:
|
||||
conversation_text = conversation_text[:MAX_CHARS] + "\n\n[truncated]"
|
||||
|
||||
# Call LLM to summarize
|
||||
import openai
|
||||
|
||||
summarization_client = openai.AsyncOpenAI(
|
||||
api_key=api_key, base_url=base_url, timeout=timeout
|
||||
)
|
||||
from backend.util.prompt import compress_context
|
||||
|
||||
response = await summarization_client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"Create a detailed summary of the conversation so far. "
|
||||
"This summary will be used as context when continuing the conversation.\n\n"
|
||||
"Before writing the summary, analyze each message chronologically to identify:\n"
|
||||
"- User requests and their explicit goals\n"
|
||||
"- Your approach and key decisions made\n"
|
||||
"- Technical specifics (file names, tool outputs, function signatures)\n"
|
||||
"- Errors encountered and resolutions applied\n\n"
|
||||
"You MUST include ALL of the following sections:\n\n"
|
||||
"## 1. Primary Request and Intent\n"
|
||||
"The user's explicit goals and what they are trying to accomplish.\n\n"
|
||||
"## 2. Key Technical Concepts\n"
|
||||
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
|
||||
"## 3. Files and Resources Involved\n"
|
||||
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
|
||||
"## 4. Errors and Fixes\n"
|
||||
"Problems encountered, error messages, and their resolutions. "
|
||||
"Include any user feedback on fixes.\n\n"
|
||||
"## 5. Problem Solving\n"
|
||||
"Issues that have been resolved and how they were addressed.\n\n"
|
||||
"## 6. All User Messages\n"
|
||||
"A complete list of all user inputs (excluding tool outputs) to preserve their exact requests.\n\n"
|
||||
"## 7. Pending Tasks\n"
|
||||
"Work items the user explicitly requested that have not yet been completed.\n\n"
|
||||
"## 8. Current Work\n"
|
||||
"Precise description of what was being worked on most recently, including relevant context.\n\n"
|
||||
"## 9. Next Steps\n"
|
||||
"What should happen next, aligned with the user's most recent requests. "
|
||||
"Include verbatim quotes of recent instructions if relevant."
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
|
||||
],
|
||||
max_tokens=1500,
|
||||
temperature=0.3,
|
||||
)
|
||||
# Convert messages to dict format
|
||||
messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
summary = response.choices[0].message.content
|
||||
return summary or "No summary available."
|
||||
|
||||
|
||||
def _ensure_tool_pairs_intact(
|
||||
recent_messages: list[dict],
|
||||
all_messages: list[dict],
|
||||
start_index: int,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Ensure tool_call/tool_response pairs stay together after slicing.
|
||||
|
||||
When slicing messages for context compaction, a naive slice can separate
|
||||
an assistant message containing tool_calls from its corresponding tool
|
||||
response messages. This causes API validation errors (e.g., Anthropic's
|
||||
"unexpected tool_use_id found in tool_result blocks").
|
||||
|
||||
This function checks for orphan tool responses in the slice and extends
|
||||
backwards to include their corresponding assistant messages.
|
||||
|
||||
Args:
|
||||
recent_messages: The sliced messages to validate
|
||||
all_messages: The complete message list (for looking up missing assistants)
|
||||
start_index: The index in all_messages where recent_messages begins
|
||||
|
||||
Returns:
|
||||
A potentially extended list of messages with tool pairs intact
|
||||
"""
|
||||
if not recent_messages:
|
||||
return recent_messages
|
||||
|
||||
# Collect all tool_call_ids from assistant messages in the slice
|
||||
available_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
tc_id = tc.get("id")
|
||||
if tc_id:
|
||||
available_tool_call_ids.add(tc_id)
|
||||
|
||||
# Find orphan tool responses (tool messages whose tool_call_id is missing)
|
||||
orphan_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "tool":
|
||||
tc_id = msg.get("tool_call_id")
|
||||
if tc_id and tc_id not in available_tool_call_ids:
|
||||
orphan_tool_call_ids.add(tc_id)
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# No orphans, slice is valid
|
||||
return recent_messages
|
||||
|
||||
# Find the assistant messages that contain the orphan tool_call_ids
|
||||
# Search backwards from start_index in all_messages
|
||||
messages_to_prepend: list[dict] = []
|
||||
for i in range(start_index - 1, -1, -1):
|
||||
msg = all_messages[i]
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
msg_tool_ids = {tc.get("id") for tc in msg["tool_calls"] if tc.get("id")}
|
||||
if msg_tool_ids & orphan_tool_call_ids:
|
||||
# This assistant message has tool_calls we need
|
||||
# Also collect its contiguous tool responses that follow it
|
||||
assistant_and_responses: list[dict] = [msg]
|
||||
|
||||
# Scan forward from this assistant to collect tool responses
|
||||
for j in range(i + 1, start_index):
|
||||
following_msg = all_messages[j]
|
||||
if following_msg.get("role") == "tool":
|
||||
tool_id = following_msg.get("tool_call_id")
|
||||
if tool_id and tool_id in msg_tool_ids:
|
||||
assistant_and_responses.append(following_msg)
|
||||
else:
|
||||
# Stop at first non-tool message
|
||||
break
|
||||
|
||||
# Prepend the assistant and its tool responses (maintain order)
|
||||
messages_to_prepend = assistant_and_responses + messages_to_prepend
|
||||
# Mark these as found
|
||||
orphan_tool_call_ids -= msg_tool_ids
|
||||
# Also add this assistant's tool_call_ids to available set
|
||||
available_tool_call_ids |= msg_tool_ids
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# Found all missing assistants
|
||||
break
|
||||
|
||||
if orphan_tool_call_ids:
|
||||
# Some tool_call_ids couldn't be resolved - remove those tool responses
|
||||
# This shouldn't happen in normal operation but handles edge cases
|
||||
logger.warning(
|
||||
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
|
||||
"Removing orphan tool responses."
|
||||
)
|
||||
recent_messages = [
|
||||
msg
|
||||
for msg in recent_messages
|
||||
if not (
|
||||
msg.get("role") == "tool"
|
||||
and msg.get("tool_call_id") in orphan_tool_call_ids
|
||||
# Only create client if api_key is provided (enables summarization)
|
||||
# Use context manager to avoid socket leaks
|
||||
if api_key:
|
||||
async with openai.AsyncOpenAI(
|
||||
api_key=api_key, base_url=base_url, timeout=30.0
|
||||
) as client:
|
||||
return await compress_context(
|
||||
messages=messages_dict,
|
||||
model=model,
|
||||
client=client,
|
||||
)
|
||||
]
|
||||
|
||||
if messages_to_prepend:
|
||||
logger.info(
|
||||
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
|
||||
f"tool_call/tool_response pairs"
|
||||
else:
|
||||
# No API key - use truncation-only mode
|
||||
return await compress_context(
|
||||
messages=messages_dict,
|
||||
model=model,
|
||||
client=None,
|
||||
)
|
||||
return messages_to_prepend + recent_messages
|
||||
|
||||
return recent_messages
|
||||
|
||||
|
||||
async def _stream_chat_chunks(
|
||||
@@ -1022,11 +883,8 @@ async def _stream_chat_chunks(
|
||||
|
||||
logger.info("Starting pure chat stream")
|
||||
|
||||
# Build messages with system prompt prepended
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
@@ -1034,314 +892,38 @@ async def _stream_chat_chunks(
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management
|
||||
token_count = 0 # Initialize for exception handler
|
||||
try:
|
||||
from backend.util.prompt import estimate_token_count
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
# Convert to dict for token counting
|
||||
# OpenAI message types are TypedDicts, so they're already dict-like
|
||||
messages_dict = []
|
||||
for msg in messages:
|
||||
# TypedDict objects are already dicts, just filter None values
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
# Fallback for unexpected types
|
||||
msg_dict = dict(msg)
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
# Estimate tokens using appropriate tokenizer
|
||||
# Normalize model name for token counting (tiktoken only supports OpenAI models)
|
||||
token_count_model = model
|
||||
if "/" in model:
|
||||
# Strip provider prefix (e.g., "anthropic/claude-opus-4.5" -> "claude-opus-4.5")
|
||||
token_count_model = model.split("/")[-1]
|
||||
|
||||
# For Claude and other non-OpenAI models, approximate with gpt-4o tokenizer
|
||||
# Most modern LLMs have similar tokenization (~1 token per 4 chars)
|
||||
if "claude" in token_count_model.lower() or not any(
|
||||
known in token_count_model.lower()
|
||||
for known in ["gpt", "o1", "chatgpt", "text-"]
|
||||
):
|
||||
token_count_model = "gpt-4o"
|
||||
|
||||
# Attempt token counting with error handling
|
||||
try:
|
||||
token_count = estimate_token_count(messages_dict, model=token_count_model)
|
||||
except Exception as token_error:
|
||||
# If token counting fails, use gpt-4o as fallback approximation
|
||||
logger.warning(
|
||||
f"Token counting failed for model {token_count_model}: {token_error}. "
|
||||
"Using gpt-4o approximation."
|
||||
)
|
||||
token_count = estimate_token_count(messages_dict, model="gpt-4o")
|
||||
|
||||
# If over threshold, summarize old messages
|
||||
if token_count > 120_000:
|
||||
KEEP_RECENT = 15
|
||||
|
||||
# Check if we have a system prompt at the start
|
||||
has_system_prompt = (
|
||||
len(messages) > 0 and messages[0].get("role") == "system"
|
||||
)
|
||||
|
||||
# Always attempt mitigation when over limit, even with few messages
|
||||
if messages:
|
||||
# Split messages based on whether system prompt exists
|
||||
# Calculate start index for the slice
|
||||
slice_start = max(0, len(messages_dict) - KEEP_RECENT)
|
||||
recent_messages = messages_dict[-KEEP_RECENT:]
|
||||
|
||||
# Ensure tool_call/tool_response pairs stay together
|
||||
# This prevents API errors from orphan tool responses
|
||||
recent_messages = _ensure_tool_pairs_intact(
|
||||
recent_messages, messages_dict, slice_start
|
||||
)
|
||||
|
||||
if has_system_prompt:
|
||||
# Keep system prompt separate, summarize everything between system and recent
|
||||
system_msg = messages[0]
|
||||
old_messages_dict = messages_dict[1:-KEEP_RECENT]
|
||||
else:
|
||||
# No system prompt, summarize everything except recent
|
||||
system_msg = None
|
||||
old_messages_dict = messages_dict[:-KEEP_RECENT]
|
||||
|
||||
# Summarize any non-empty old messages (no minimum threshold)
|
||||
# If we're over the token limit, we need to compress whatever we can
|
||||
if old_messages_dict:
|
||||
# Summarize old messages using the same model as chat
|
||||
summary_text = await _summarize_messages(
|
||||
old_messages_dict,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
# Build new message list
|
||||
# Use assistant role (not system) to prevent privilege escalation
|
||||
# of user-influenced content to instruction-level authority
|
||||
from openai.types.chat import ChatCompletionAssistantMessageParam
|
||||
|
||||
summary_msg = ChatCompletionAssistantMessageParam(
|
||||
role="assistant",
|
||||
content=(
|
||||
"[Previous conversation summary — for context only]: "
|
||||
f"{summary_text}"
|
||||
),
|
||||
)
|
||||
|
||||
# Rebuild messages based on whether we have a system prompt
|
||||
if has_system_prompt:
|
||||
# system_prompt + summary + recent_messages
|
||||
messages = [system_msg, summary_msg] + recent_messages
|
||||
else:
|
||||
# summary + recent_messages (no original system prompt)
|
||||
messages = [summary_msg] + recent_messages
|
||||
|
||||
logger.info(
|
||||
f"Context summarized: {token_count} tokens, "
|
||||
f"summarized {len(old_messages_dict)} old messages, "
|
||||
f"kept last {KEEP_RECENT} messages"
|
||||
)
|
||||
|
||||
# Fallback: If still over limit after summarization, progressively drop recent messages
|
||||
# This handles edge cases where recent messages are extremely large
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count > 120_000:
|
||||
# Still over limit - progressively reduce KEEP_RECENT
|
||||
logger.warning(
|
||||
f"Still over limit after summarization: {new_token_count} tokens. "
|
||||
"Reducing number of recent messages kept."
|
||||
)
|
||||
|
||||
for keep_count in [12, 10, 8, 5, 3, 2, 1, 0]:
|
||||
if keep_count == 0:
|
||||
# Try with just system prompt + summary (no recent messages)
|
||||
if has_system_prompt:
|
||||
messages = [system_msg, summary_msg]
|
||||
else:
|
||||
messages = [summary_msg]
|
||||
logger.info(
|
||||
"Trying with 0 recent messages (system + summary only)"
|
||||
)
|
||||
else:
|
||||
# Slice from ORIGINAL recent_messages to avoid duplicating summary
|
||||
reduced_recent = (
|
||||
recent_messages[-keep_count:]
|
||||
if len(recent_messages) >= keep_count
|
||||
else recent_messages
|
||||
)
|
||||
# Ensure tool pairs stay intact in the reduced slice
|
||||
reduced_slice_start = max(
|
||||
0, len(recent_messages) - keep_count
|
||||
)
|
||||
reduced_recent = _ensure_tool_pairs_intact(
|
||||
reduced_recent, recent_messages, reduced_slice_start
|
||||
)
|
||||
if has_system_prompt:
|
||||
messages = [
|
||||
system_msg,
|
||||
summary_msg,
|
||||
] + reduced_recent
|
||||
else:
|
||||
messages = [summary_msg] + reduced_recent
|
||||
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {
|
||||
k: v for k, v in msg.items() if v is not None
|
||||
}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count <= 120_000:
|
||||
logger.info(
|
||||
f"Reduced to {keep_count} recent messages, "
|
||||
f"now {new_token_count} tokens"
|
||||
)
|
||||
break
|
||||
else:
|
||||
logger.error(
|
||||
f"Unable to reduce token count below threshold even with 0 messages. "
|
||||
f"Final count: {new_token_count} tokens"
|
||||
)
|
||||
# ABSOLUTE LAST RESORT: Drop system prompt
|
||||
# This should only happen if summary itself is massive
|
||||
if has_system_prompt and len(messages) > 1:
|
||||
messages = messages[1:] # Drop system prompt
|
||||
logger.critical(
|
||||
"CRITICAL: Dropped system prompt as absolute last resort. "
|
||||
"Behavioral consistency may be affected."
|
||||
)
|
||||
# Yield error to user
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# No old messages to summarize - all messages are "recent"
|
||||
# Apply progressive truncation to reduce token count
|
||||
logger.warning(
|
||||
f"Token count {token_count} exceeds threshold but no old messages to summarize. "
|
||||
f"Applying progressive truncation to recent messages."
|
||||
)
|
||||
|
||||
# Create a base list excluding system prompt to avoid duplication
|
||||
# This is the pool of messages we'll slice from in the loop
|
||||
# Use messages_dict for type consistency with _ensure_tool_pairs_intact
|
||||
base_msgs = (
|
||||
messages_dict[1:] if has_system_prompt else messages_dict
|
||||
)
|
||||
|
||||
# Try progressively smaller keep counts
|
||||
new_token_count = token_count # Initialize with current count
|
||||
for keep_count in [12, 10, 8, 5, 3, 2, 1, 0]:
|
||||
if keep_count == 0:
|
||||
# Try with just system prompt (no recent messages)
|
||||
if has_system_prompt:
|
||||
messages = [system_msg]
|
||||
logger.info(
|
||||
"Trying with 0 recent messages (system prompt only)"
|
||||
)
|
||||
else:
|
||||
# No system prompt and no recent messages = empty messages list
|
||||
# This is invalid, skip this iteration
|
||||
continue
|
||||
else:
|
||||
if len(base_msgs) < keep_count:
|
||||
continue # Skip if we don't have enough messages
|
||||
|
||||
# Slice from base_msgs to get recent messages (without system prompt)
|
||||
recent_messages = base_msgs[-keep_count:]
|
||||
|
||||
# Ensure tool pairs stay intact in the reduced slice
|
||||
reduced_slice_start = max(0, len(base_msgs) - keep_count)
|
||||
recent_messages = _ensure_tool_pairs_intact(
|
||||
recent_messages, base_msgs, reduced_slice_start
|
||||
)
|
||||
|
||||
if has_system_prompt:
|
||||
messages = [system_msg] + recent_messages
|
||||
else:
|
||||
messages = recent_messages
|
||||
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if msg is None:
|
||||
continue # Skip None messages (type safety)
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {
|
||||
k: v for k, v in msg.items() if v is not None
|
||||
}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count <= 120_000:
|
||||
logger.info(
|
||||
f"Reduced to {keep_count} recent messages, "
|
||||
f"now {new_token_count} tokens"
|
||||
)
|
||||
break
|
||||
else:
|
||||
# Even with 0 messages still over limit
|
||||
logger.error(
|
||||
f"Unable to reduce token count below threshold even with 0 messages. "
|
||||
f"Final count: {new_token_count} tokens. Messages may be extremely large."
|
||||
)
|
||||
# ABSOLUTE LAST RESORT: Drop system prompt
|
||||
if has_system_prompt and len(messages) > 1:
|
||||
messages = messages[1:] # Drop system prompt
|
||||
logger.critical(
|
||||
"CRITICAL: Dropped system prompt as absolute last resort. "
|
||||
"Behavioral consistency may be affected."
|
||||
)
|
||||
# Yield error to user
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Context summarization failed: {e}", exc_info=True)
|
||||
# If we were over the token limit, yield error to user
|
||||
# Don't silently continue with oversized messages that will fail
|
||||
if token_count > 120_000:
|
||||
if context_result.error:
|
||||
if "System prompt dropped" in context_result.error:
|
||||
# Warning only - continue with reduced context
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
f"Unable to manage context window (token limit exceeded: {token_count} tokens). "
|
||||
"Context summarization failed. Please start a new conversation."
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Any other error - abort to prevent failed LLM calls
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
f"Context window management failed: {context_result.error}. "
|
||||
"Please start a new conversation."
|
||||
)
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
# Otherwise, continue with original messages (under limit)
|
||||
|
||||
messages = context_result.messages
|
||||
if context_result.was_compacted:
|
||||
logger.info(
|
||||
f"Context compacted for streaming: {context_result.token_count} tokens"
|
||||
)
|
||||
|
||||
# Loop to handle tool calls and continue conversation
|
||||
while True:
|
||||
@@ -1369,14 +951,6 @@ async def _stream_chat_chunks(
|
||||
:128
|
||||
] # OpenRouter limit
|
||||
|
||||
# Create the stream with proper types
|
||||
from typing import cast
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionStreamOptionsParam,
|
||||
)
|
||||
|
||||
stream = await client.chat.completions.create(
|
||||
model=model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
@@ -1900,17 +1474,36 @@ async def _generate_llm_continuation(
|
||||
# Build system prompt
|
||||
system_prompt, _ = await _build_system_prompt(user_id)
|
||||
|
||||
# Build messages in OpenAI format
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
)
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management to prevent oversized requests
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=config.model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
if context_result.error and "System prompt dropped" not in context_result.error:
|
||||
logger.error(
|
||||
f"Context window management failed for session {session_id}: "
|
||||
f"{context_result.error} (tokens={context_result.token_count})"
|
||||
)
|
||||
return
|
||||
|
||||
messages = context_result.messages
|
||||
if context_result.was_compacted:
|
||||
logger.info(
|
||||
f"Context compacted for LLM continuation: "
|
||||
f"{context_result.token_count} tokens"
|
||||
)
|
||||
|
||||
# Build extra_body for tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {
|
||||
@@ -1923,19 +1516,54 @@ async def _generate_llm_continuation(
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
# Make non-streaming LLM call (no tools - just text response)
|
||||
from typing import cast
|
||||
retry_count = 0
|
||||
last_error: Exception | None = None
|
||||
response = None
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
while retry_count <= MAX_RETRIES:
|
||||
try:
|
||||
logger.info(
|
||||
f"Generating LLM continuation for session {session_id}"
|
||||
f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
|
||||
)
|
||||
|
||||
# No tools parameter = text-only response (no tool calls)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
last_error = None # Clear any previous error on success
|
||||
break # Success, exit retry loop
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
if _is_retryable_error(e) and retry_count < MAX_RETRIES:
|
||||
retry_count += 1
|
||||
delay = min(
|
||||
BASE_DELAY_SECONDS * (2 ** (retry_count - 1)),
|
||||
MAX_DELAY_SECONDS,
|
||||
)
|
||||
logger.warning(
|
||||
f"Retryable error in LLM continuation: {e!s}. "
|
||||
f"Retrying in {delay:.1f}s (attempt {retry_count}/{MAX_RETRIES})"
|
||||
)
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
# Non-retryable error - log and exit gracefully
|
||||
logger.error(
|
||||
f"Non-retryable error in LLM continuation: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
if response.choices and response.choices[0].message.content:
|
||||
if last_error:
|
||||
logger.error(
|
||||
f"Max retries ({MAX_RETRIES}) exceeded for LLM continuation. "
|
||||
f"Last error: {last_error!s}"
|
||||
)
|
||||
return
|
||||
|
||||
if response and response.choices and response.choices[0].message.content:
|
||||
assistant_content = response.choices[0].message.content
|
||||
|
||||
# Reload session from DB to avoid race condition with user messages
|
||||
|
||||
@@ -139,11 +139,10 @@ async def decompose_goal_external(
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
# Build the request payload
|
||||
payload: dict[str, Any] = {"description": description}
|
||||
if context:
|
||||
# The external service uses user_instruction for additional context
|
||||
payload["user_instruction"] = context
|
||||
description = f"{description}\n\nAdditional context from user:\n{context}"
|
||||
|
||||
payload: dict[str, Any] = {"description": description}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ from backend.data.model import (
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util import json
|
||||
from backend.util.logging import TruncatedLogger
|
||||
from backend.util.prompt import compress_prompt, estimate_token_count
|
||||
from backend.util.prompt import compress_context, estimate_token_count
|
||||
from backend.util.text import TextFormatter
|
||||
|
||||
logger = TruncatedLogger(logging.getLogger(__name__), "[LLM-Block]")
|
||||
@@ -634,11 +634,12 @@ async def llm_call(
|
||||
context_window = llm_model.context_window
|
||||
|
||||
if compress_prompt_to_fit:
|
||||
prompt = compress_prompt(
|
||||
result = await compress_context(
|
||||
messages=prompt,
|
||||
target_tokens=llm_model.context_window // 2,
|
||||
lossy_ok=True,
|
||||
client=None, # Truncation-only, no LLM summarization
|
||||
)
|
||||
prompt = result.messages
|
||||
|
||||
# Calculate available tokens based on context window and input length
|
||||
estimated_input_tokens = estimate_token_count(prompt)
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import fastapi
|
||||
from fastapi.routing import APIRoute
|
||||
|
||||
from backend.api.features.integrations.router import router as integrations_router
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.integrations.webhooks import utils as webhooks_utils
|
||||
|
||||
|
||||
def test_webhook_ingress_url_matches_route(monkeypatch) -> None:
|
||||
app = fastapi.FastAPI()
|
||||
app.include_router(integrations_router, prefix="/api/integrations")
|
||||
|
||||
provider = ProviderName.GITHUB
|
||||
webhook_id = "webhook_123"
|
||||
base_url = "https://example.com"
|
||||
|
||||
monkeypatch.setattr(webhooks_utils.app_config, "platform_base_url", base_url)
|
||||
|
||||
route = next(
|
||||
route
|
||||
for route in integrations_router.routes
|
||||
if isinstance(route, APIRoute)
|
||||
and route.path == "/{provider}/webhooks/{webhook_id}/ingress"
|
||||
and "POST" in route.methods
|
||||
)
|
||||
expected_path = f"/api/integrations{route.path}".format(
|
||||
provider=provider.value,
|
||||
webhook_id=webhook_id,
|
||||
)
|
||||
actual_url = urlparse(webhooks_utils.webhook_ingress_url(provider, webhook_id))
|
||||
expected_base = urlparse(base_url)
|
||||
|
||||
assert (actual_url.scheme, actual_url.netloc) == (
|
||||
expected_base.scheme,
|
||||
expected_base.netloc,
|
||||
)
|
||||
assert actual_url.path == expected_path
|
||||
@@ -1,10 +1,19 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from typing import Any
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from tiktoken import encoding_for_model
|
||||
|
||||
from backend.util import json
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------#
|
||||
# CONSTANTS #
|
||||
# ---------------------------------------------------------------------------#
|
||||
@@ -100,9 +109,17 @@ def _is_objective_message(msg: dict) -> bool:
|
||||
def _truncate_tool_message_content(msg: dict, enc, max_tokens: int) -> None:
|
||||
"""
|
||||
Carefully truncate tool message content while preserving tool structure.
|
||||
Only truncates tool_result content, leaves tool_use intact.
|
||||
Handles both Anthropic-style (list content) and OpenAI-style (string content) tool messages.
|
||||
"""
|
||||
content = msg.get("content")
|
||||
|
||||
# OpenAI-style tool message: role="tool" with string content
|
||||
if msg.get("role") == "tool" and isinstance(content, str):
|
||||
if _tok_len(content, enc) > max_tokens:
|
||||
msg["content"] = _truncate_middle_tokens(content, enc, max_tokens)
|
||||
return
|
||||
|
||||
# Anthropic-style: list content with tool_result items
|
||||
if not isinstance(content, list):
|
||||
return
|
||||
|
||||
@@ -140,141 +157,6 @@ def _truncate_middle_tokens(text: str, enc, max_tok: int) -> str:
|
||||
# ---------------------------------------------------------------------------#
|
||||
|
||||
|
||||
def compress_prompt(
|
||||
messages: list[dict],
|
||||
target_tokens: int,
|
||||
*,
|
||||
model: str = "gpt-4o",
|
||||
reserve: int = 2_048,
|
||||
start_cap: int = 8_192,
|
||||
floor_cap: int = 128,
|
||||
lossy_ok: bool = True,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Shrink *messages* so that::
|
||||
|
||||
token_count(prompt) + reserve ≤ target_tokens
|
||||
|
||||
Strategy
|
||||
--------
|
||||
1. **Token-aware truncation** – progressively halve a per-message cap
|
||||
(`start_cap`, `start_cap/2`, … `floor_cap`) and apply it to the
|
||||
*content* of every message except the first and last. Tool shells
|
||||
are included: we keep the envelope but shorten huge payloads.
|
||||
2. **Middle-out deletion** – if still over the limit, delete whole
|
||||
messages working outward from the centre, **skipping** any message
|
||||
that contains ``tool_calls`` or has ``role == "tool"``.
|
||||
3. **Last-chance trim** – if still too big, truncate the *first* and
|
||||
*last* message bodies down to `floor_cap` tokens.
|
||||
4. If the prompt is *still* too large:
|
||||
• raise ``ValueError`` when ``lossy_ok == False`` (default)
|
||||
• return the partially-trimmed prompt when ``lossy_ok == True``
|
||||
|
||||
Parameters
|
||||
----------
|
||||
messages Complete chat history (will be deep-copied).
|
||||
model Model name; passed to tiktoken to pick the right
|
||||
tokenizer (gpt-4o → 'o200k_base', others fallback).
|
||||
target_tokens Hard ceiling for prompt size **excluding** the model's
|
||||
forthcoming answer.
|
||||
reserve How many tokens you want to leave available for that
|
||||
answer (`max_tokens` in your subsequent completion call).
|
||||
start_cap Initial per-message truncation ceiling (tokens).
|
||||
floor_cap Lowest cap we'll accept before moving to deletions.
|
||||
lossy_ok If *True* return best-effort prompt instead of raising
|
||||
after all trim passes have been exhausted.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[dict] – A *new* messages list that abides by the rules above.
|
||||
"""
|
||||
enc = encoding_for_model(model) # best-match tokenizer
|
||||
msgs = deepcopy(messages) # never mutate caller
|
||||
|
||||
def total_tokens() -> int:
|
||||
"""Current size of *msgs* in tokens."""
|
||||
return sum(_msg_tokens(m, enc) for m in msgs)
|
||||
|
||||
original_token_count = total_tokens()
|
||||
|
||||
if original_token_count + reserve <= target_tokens:
|
||||
return msgs
|
||||
|
||||
# ---- STEP 0 : normalise content --------------------------------------
|
||||
# Convert non-string payloads to strings so token counting is coherent.
|
||||
for i, m in enumerate(msgs):
|
||||
if not isinstance(m.get("content"), str) and m.get("content") is not None:
|
||||
if _is_tool_message(m):
|
||||
continue
|
||||
|
||||
# Keep first and last messages intact (unless they're tool messages)
|
||||
if i == 0 or i == len(msgs) - 1:
|
||||
continue
|
||||
|
||||
# Reasonable 20k-char ceiling prevents pathological blobs
|
||||
content_str = json.dumps(m["content"], separators=(",", ":"))
|
||||
if len(content_str) > 20_000:
|
||||
content_str = _truncate_middle_tokens(content_str, enc, 20_000)
|
||||
m["content"] = content_str
|
||||
|
||||
# ---- STEP 1 : token-aware truncation ---------------------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for m in msgs[1:-1]: # keep first & last intact
|
||||
if _is_tool_message(m):
|
||||
# For tool messages, only truncate tool result content, preserve structure
|
||||
_truncate_tool_message_content(m, enc, cap)
|
||||
continue
|
||||
|
||||
if _is_objective_message(m):
|
||||
# Never truncate objective messages - they contain the core task
|
||||
continue
|
||||
|
||||
content = m.get("content") or ""
|
||||
if _tok_len(content, enc) > cap:
|
||||
m["content"] = _truncate_middle_tokens(content, enc, cap)
|
||||
cap //= 2 # tighten the screw
|
||||
|
||||
# ---- STEP 2 : middle-out deletion -----------------------------------
|
||||
while total_tokens() + reserve > target_tokens and len(msgs) > 2:
|
||||
# Identify all deletable messages (not first/last, not tool messages, not objective messages)
|
||||
deletable_indices = []
|
||||
for i in range(1, len(msgs) - 1): # Skip first and last
|
||||
if not _is_tool_message(msgs[i]) and not _is_objective_message(msgs[i]):
|
||||
deletable_indices.append(i)
|
||||
|
||||
if not deletable_indices:
|
||||
break # nothing more we can drop
|
||||
|
||||
# Delete from center outward - find the index closest to center
|
||||
centre = len(msgs) // 2
|
||||
to_delete = min(deletable_indices, key=lambda i: abs(i - centre))
|
||||
del msgs[to_delete]
|
||||
|
||||
# ---- STEP 3 : final safety-net trim on first & last ------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for idx in (0, -1): # first and last
|
||||
if _is_tool_message(msgs[idx]):
|
||||
# For tool messages at first/last position, truncate tool result content only
|
||||
_truncate_tool_message_content(msgs[idx], enc, cap)
|
||||
continue
|
||||
|
||||
text = msgs[idx].get("content") or ""
|
||||
if _tok_len(text, enc) > cap:
|
||||
msgs[idx]["content"] = _truncate_middle_tokens(text, enc, cap)
|
||||
cap //= 2 # tighten the screw
|
||||
|
||||
# ---- STEP 4 : success or fail-gracefully -----------------------------
|
||||
if total_tokens() + reserve > target_tokens and not lossy_ok:
|
||||
raise ValueError(
|
||||
"compress_prompt: prompt still exceeds budget "
|
||||
f"({total_tokens() + reserve} > {target_tokens})."
|
||||
)
|
||||
|
||||
return msgs
|
||||
|
||||
|
||||
def estimate_token_count(
|
||||
messages: list[dict],
|
||||
*,
|
||||
@@ -318,3 +200,430 @@ def estimate_token_count_str(
|
||||
enc = encoding_for_model(model) # best-match tokenizer
|
||||
text = json.dumps(text) if not isinstance(text, str) else text
|
||||
return _tok_len(text, enc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------#
|
||||
# UNIFIED CONTEXT COMPRESSION #
|
||||
# ---------------------------------------------------------------------------#
|
||||
|
||||
# Default thresholds
|
||||
DEFAULT_TOKEN_THRESHOLD = 120_000
|
||||
DEFAULT_KEEP_RECENT = 15
|
||||
|
||||
|
||||
@dataclass
|
||||
class CompressResult:
|
||||
"""Result of context compression."""
|
||||
|
||||
messages: list[dict]
|
||||
token_count: int
|
||||
was_compacted: bool
|
||||
error: str | None = None
|
||||
original_token_count: int = 0
|
||||
messages_summarized: int = 0
|
||||
messages_dropped: int = 0
|
||||
|
||||
|
||||
def _normalize_model_for_tokenizer(model: str) -> str:
|
||||
"""Normalize model name for tiktoken tokenizer selection."""
|
||||
if "/" in model:
|
||||
model = model.split("/")[-1]
|
||||
if "claude" in model.lower() or not any(
|
||||
known in model.lower() for known in ["gpt", "o1", "chatgpt", "text-"]
|
||||
):
|
||||
return "gpt-4o"
|
||||
return model
|
||||
|
||||
|
||||
def _ensure_tool_pairs_intact(
|
||||
recent_messages: list[dict],
|
||||
all_messages: list[dict],
|
||||
start_index: int,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Ensure tool_call/tool_response pairs stay together after slicing.
|
||||
|
||||
When slicing messages for context compaction, a naive slice can separate
|
||||
an assistant message containing tool_calls from its corresponding tool
|
||||
response messages. This causes API validation errors (e.g., Anthropic's
|
||||
"unexpected tool_use_id found in tool_result blocks").
|
||||
|
||||
This function checks for orphan tool responses in the slice and extends
|
||||
backwards to include their corresponding assistant messages.
|
||||
|
||||
Args:
|
||||
recent_messages: The sliced messages to validate
|
||||
all_messages: The complete message list (for looking up missing assistants)
|
||||
start_index: The index in all_messages where recent_messages begins
|
||||
|
||||
Returns:
|
||||
A potentially extended list of messages with tool pairs intact
|
||||
"""
|
||||
if not recent_messages:
|
||||
return recent_messages
|
||||
|
||||
# Collect all tool_call_ids from assistant messages in the slice
|
||||
available_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
tc_id = tc.get("id")
|
||||
if tc_id:
|
||||
available_tool_call_ids.add(tc_id)
|
||||
|
||||
# Find orphan tool responses (tool messages whose tool_call_id is missing)
|
||||
orphan_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "tool":
|
||||
tc_id = msg.get("tool_call_id")
|
||||
if tc_id and tc_id not in available_tool_call_ids:
|
||||
orphan_tool_call_ids.add(tc_id)
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# No orphans, slice is valid
|
||||
return recent_messages
|
||||
|
||||
# Find the assistant messages that contain the orphan tool_call_ids
|
||||
# Search backwards from start_index in all_messages
|
||||
messages_to_prepend: list[dict] = []
|
||||
for i in range(start_index - 1, -1, -1):
|
||||
msg = all_messages[i]
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
msg_tool_ids = {tc.get("id") for tc in msg["tool_calls"] if tc.get("id")}
|
||||
if msg_tool_ids & orphan_tool_call_ids:
|
||||
# This assistant message has tool_calls we need
|
||||
# Also collect its contiguous tool responses that follow it
|
||||
assistant_and_responses: list[dict] = [msg]
|
||||
|
||||
# Scan forward from this assistant to collect tool responses
|
||||
for j in range(i + 1, start_index):
|
||||
following_msg = all_messages[j]
|
||||
if following_msg.get("role") == "tool":
|
||||
tool_id = following_msg.get("tool_call_id")
|
||||
if tool_id and tool_id in msg_tool_ids:
|
||||
assistant_and_responses.append(following_msg)
|
||||
else:
|
||||
# Stop at first non-tool message
|
||||
break
|
||||
|
||||
# Prepend the assistant and its tool responses (maintain order)
|
||||
messages_to_prepend = assistant_and_responses + messages_to_prepend
|
||||
# Mark these as found
|
||||
orphan_tool_call_ids -= msg_tool_ids
|
||||
# Also add this assistant's tool_call_ids to available set
|
||||
available_tool_call_ids |= msg_tool_ids
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# Found all missing assistants
|
||||
break
|
||||
|
||||
if orphan_tool_call_ids:
|
||||
# Some tool_call_ids couldn't be resolved - remove those tool responses
|
||||
# This shouldn't happen in normal operation but handles edge cases
|
||||
logger.warning(
|
||||
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
|
||||
"Removing orphan tool responses."
|
||||
)
|
||||
recent_messages = [
|
||||
msg
|
||||
for msg in recent_messages
|
||||
if not (
|
||||
msg.get("role") == "tool"
|
||||
and msg.get("tool_call_id") in orphan_tool_call_ids
|
||||
)
|
||||
]
|
||||
|
||||
if messages_to_prepend:
|
||||
logger.info(
|
||||
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
|
||||
f"tool_call/tool_response pairs"
|
||||
)
|
||||
return messages_to_prepend + recent_messages
|
||||
|
||||
return recent_messages
|
||||
|
||||
|
||||
async def _summarize_messages_llm(
|
||||
messages: list[dict],
|
||||
client: AsyncOpenAI,
|
||||
model: str,
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""Summarize messages using an LLM."""
|
||||
conversation = []
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
if content and role in ("user", "assistant", "tool"):
|
||||
conversation.append(f"{role.upper()}: {content}")
|
||||
|
||||
conversation_text = "\n\n".join(conversation)
|
||||
|
||||
if not conversation_text:
|
||||
return "No conversation history available."
|
||||
|
||||
# Limit to ~100k chars for safety
|
||||
MAX_CHARS = 100_000
|
||||
if len(conversation_text) > MAX_CHARS:
|
||||
conversation_text = conversation_text[:MAX_CHARS] + "\n\n[truncated]"
|
||||
|
||||
response = await client.with_options(timeout=timeout).chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"Create a detailed summary of the conversation so far. "
|
||||
"This summary will be used as context when continuing the conversation.\n\n"
|
||||
"Before writing the summary, analyze each message chronologically to identify:\n"
|
||||
"- User requests and their explicit goals\n"
|
||||
"- Your approach and key decisions made\n"
|
||||
"- Technical specifics (file names, tool outputs, function signatures)\n"
|
||||
"- Errors encountered and resolutions applied\n\n"
|
||||
"You MUST include ALL of the following sections:\n\n"
|
||||
"## 1. Primary Request and Intent\n"
|
||||
"The user's explicit goals and what they are trying to accomplish.\n\n"
|
||||
"## 2. Key Technical Concepts\n"
|
||||
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
|
||||
"## 3. Files and Resources Involved\n"
|
||||
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
|
||||
"## 4. Errors and Fixes\n"
|
||||
"Problems encountered, error messages, and their resolutions. "
|
||||
"Include any user feedback on fixes.\n\n"
|
||||
"## 5. Problem Solving\n"
|
||||
"Issues that have been resolved and how they were addressed.\n\n"
|
||||
"## 6. All User Messages\n"
|
||||
"A complete list of all user inputs (excluding tool outputs) to preserve their exact requests.\n\n"
|
||||
"## 7. Pending Tasks\n"
|
||||
"Work items the user explicitly requested that have not yet been completed.\n\n"
|
||||
"## 8. Current Work\n"
|
||||
"Precise description of what was being worked on most recently, including relevant context.\n\n"
|
||||
"## 9. Next Steps\n"
|
||||
"What should happen next, aligned with the user's most recent requests. "
|
||||
"Include verbatim quotes of recent instructions if relevant."
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
|
||||
],
|
||||
max_tokens=1500,
|
||||
temperature=0.3,
|
||||
)
|
||||
|
||||
return response.choices[0].message.content or "No summary available."
|
||||
|
||||
|
||||
async def compress_context(
|
||||
messages: list[dict],
|
||||
target_tokens: int = DEFAULT_TOKEN_THRESHOLD,
|
||||
*,
|
||||
model: str = "gpt-4o",
|
||||
client: AsyncOpenAI | None = None,
|
||||
keep_recent: int = DEFAULT_KEEP_RECENT,
|
||||
reserve: int = 2_048,
|
||||
start_cap: int = 8_192,
|
||||
floor_cap: int = 128,
|
||||
) -> CompressResult:
|
||||
"""
|
||||
Unified context compression that combines summarization and truncation strategies.
|
||||
|
||||
Strategy (in order):
|
||||
1. **Content truncation** – Progressively halve a per-message cap and truncate
|
||||
bloated message content (tool outputs, large pastes).
|
||||
2. **LLM summarization** – If client provided, summarize old messages into a
|
||||
single context message while keeping recent messages intact.
|
||||
3. **Message dropping** – If still over limit, progressively reduce the number
|
||||
of recent messages kept.
|
||||
4. **Middle-out deletion** – Delete whole messages from the center outward,
|
||||
skipping tool messages and objective messages.
|
||||
5. **First/last trim** – Truncate first and last message content as last resort.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
messages Complete chat history (will be deep-copied).
|
||||
target_tokens Hard ceiling for prompt size.
|
||||
model Model name for tokenization and summarization.
|
||||
client AsyncOpenAI client. If provided, enables LLM summarization.
|
||||
If None, only truncation/deletion strategies are used.
|
||||
keep_recent Number of recent messages to preserve during summarization.
|
||||
reserve Tokens to reserve for model response.
|
||||
start_cap Initial per-message truncation ceiling (tokens).
|
||||
floor_cap Lowest cap before moving to deletions.
|
||||
|
||||
Returns
|
||||
-------
|
||||
CompressResult with compressed messages and metadata.
|
||||
"""
|
||||
token_model = _normalize_model_for_tokenizer(model)
|
||||
enc = encoding_for_model(token_model)
|
||||
msgs = deepcopy(messages)
|
||||
|
||||
def total_tokens() -> int:
|
||||
return sum(_msg_tokens(m, enc) for m in msgs)
|
||||
|
||||
original_count = total_tokens()
|
||||
|
||||
# Already under limit
|
||||
if original_count + reserve <= target_tokens:
|
||||
return CompressResult(
|
||||
messages=msgs,
|
||||
token_count=original_count,
|
||||
was_compacted=False,
|
||||
original_token_count=original_count,
|
||||
)
|
||||
|
||||
messages_summarized = 0
|
||||
messages_dropped = 0
|
||||
|
||||
# ---- STEP 0: Normalize content ----------------------------------------
|
||||
for i, m in enumerate(msgs):
|
||||
if not isinstance(m.get("content"), str) and m.get("content") is not None:
|
||||
if _is_tool_message(m):
|
||||
continue
|
||||
if i == 0 or i == len(msgs) - 1:
|
||||
continue
|
||||
content_str = json.dumps(m["content"], separators=(",", ":"))
|
||||
if len(content_str) > 20_000:
|
||||
content_str = _truncate_middle_tokens(content_str, enc, 20_000)
|
||||
m["content"] = content_str
|
||||
|
||||
# ---- STEP 1: Token-aware content truncation ---------------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for m in msgs[1:-1]:
|
||||
if _is_tool_message(m):
|
||||
_truncate_tool_message_content(m, enc, cap)
|
||||
continue
|
||||
if _is_objective_message(m):
|
||||
continue
|
||||
content = m.get("content") or ""
|
||||
if _tok_len(content, enc) > cap:
|
||||
m["content"] = _truncate_middle_tokens(content, enc, cap)
|
||||
cap //= 2
|
||||
|
||||
# ---- STEP 2: LLM summarization (if client provided) -------------------
|
||||
if total_tokens() + reserve > target_tokens and client is not None:
|
||||
has_system = len(msgs) > 0 and msgs[0].get("role") == "system"
|
||||
system_msg = msgs[0] if has_system else None
|
||||
|
||||
# Calculate old vs recent messages
|
||||
if has_system:
|
||||
if len(msgs) > keep_recent + 1:
|
||||
old_msgs = msgs[1:-keep_recent]
|
||||
recent_msgs = msgs[-keep_recent:]
|
||||
else:
|
||||
old_msgs = []
|
||||
recent_msgs = msgs[1:] if len(msgs) > 1 else []
|
||||
else:
|
||||
if len(msgs) > keep_recent:
|
||||
old_msgs = msgs[:-keep_recent]
|
||||
recent_msgs = msgs[-keep_recent:]
|
||||
else:
|
||||
old_msgs = []
|
||||
recent_msgs = msgs
|
||||
|
||||
# Ensure tool pairs stay intact
|
||||
# slice_start is where recent_msgs begins in the original msgs list
|
||||
slice_start = max(0, len(msgs) - keep_recent)
|
||||
recent_msgs = _ensure_tool_pairs_intact(recent_msgs, msgs, slice_start)
|
||||
|
||||
if old_msgs:
|
||||
try:
|
||||
summary_text = await _summarize_messages_llm(old_msgs, client, model)
|
||||
summary_msg = {
|
||||
"role": "assistant",
|
||||
"content": f"[Previous conversation summary — for context only]: {summary_text}",
|
||||
}
|
||||
messages_summarized = len(old_msgs)
|
||||
|
||||
if has_system:
|
||||
msgs = [system_msg, summary_msg] + recent_msgs
|
||||
else:
|
||||
msgs = [summary_msg] + recent_msgs
|
||||
|
||||
logger.info(
|
||||
f"Context summarized: {original_count} -> {total_tokens()} tokens, "
|
||||
f"summarized {messages_summarized} messages"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Summarization failed, continuing with truncation: {e}")
|
||||
# Fall through to message dropping
|
||||
|
||||
# ---- STEP 3: Progressive message reduction ----------------------------
|
||||
if total_tokens() + reserve > target_tokens:
|
||||
has_system = len(msgs) > 0 and msgs[0].get("role") == "system"
|
||||
system_msg: dict | None = msgs[0] if has_system else None
|
||||
working_msgs = msgs[1:] if has_system else msgs
|
||||
|
||||
for keep_count in [12, 10, 8, 5, 3, 2, 1]:
|
||||
if len(working_msgs) <= keep_count:
|
||||
continue
|
||||
|
||||
dropped = len(working_msgs) - keep_count
|
||||
reduced = working_msgs[-keep_count:]
|
||||
slice_start = max(0, len(working_msgs) - keep_count)
|
||||
reduced = _ensure_tool_pairs_intact(reduced, working_msgs, slice_start)
|
||||
|
||||
if has_system and system_msg is not None:
|
||||
test_msgs: list[dict] = [system_msg] + reduced
|
||||
else:
|
||||
test_msgs = reduced
|
||||
test_count = sum(_msg_tokens(m, enc) for m in test_msgs)
|
||||
|
||||
if test_count + reserve <= target_tokens:
|
||||
msgs = test_msgs
|
||||
messages_dropped += dropped
|
||||
logger.info(
|
||||
f"Reduced to {keep_count} messages, now {test_count} tokens"
|
||||
)
|
||||
break
|
||||
|
||||
# ---- STEP 4: Middle-out deletion --------------------------------------
|
||||
while total_tokens() + reserve > target_tokens and len(msgs) > 2:
|
||||
deletable: list[int] = []
|
||||
for i in range(1, len(msgs) - 1):
|
||||
msg = msgs[i]
|
||||
if (
|
||||
msg is not None
|
||||
and not _is_tool_message(msg)
|
||||
and not _is_objective_message(msg)
|
||||
):
|
||||
deletable.append(i)
|
||||
if not deletable:
|
||||
break
|
||||
centre = len(msgs) // 2
|
||||
to_delete = min(deletable, key=lambda i: abs(i - centre))
|
||||
del msgs[to_delete]
|
||||
messages_dropped += 1
|
||||
|
||||
# ---- STEP 5: Final trim on first/last ---------------------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for idx in (0, -1):
|
||||
msg = msgs[idx]
|
||||
if msg is None:
|
||||
continue
|
||||
if _is_tool_message(msg):
|
||||
_truncate_tool_message_content(msg, enc, cap)
|
||||
continue
|
||||
text = msg.get("content") or ""
|
||||
if _tok_len(text, enc) > cap:
|
||||
msg["content"] = _truncate_middle_tokens(text, enc, cap)
|
||||
cap //= 2
|
||||
|
||||
# Filter out any None values that may have been introduced
|
||||
final_msgs: list[dict] = [m for m in msgs if m is not None]
|
||||
final_count = sum(_msg_tokens(m, enc) for m in final_msgs)
|
||||
error = None
|
||||
if final_count + reserve > target_tokens:
|
||||
error = f"Could not compress below target ({final_count + reserve} > {target_tokens})"
|
||||
logger.warning(error)
|
||||
|
||||
return CompressResult(
|
||||
messages=final_msgs,
|
||||
token_count=final_count,
|
||||
was_compacted=True,
|
||||
error=error,
|
||||
original_token_count=original_count,
|
||||
messages_summarized=messages_summarized,
|
||||
messages_dropped=messages_dropped,
|
||||
)
|
||||
|
||||
@@ -1,10 +1,21 @@
|
||||
"""Tests for prompt utility functions, especially tool call token counting."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from tiktoken import encoding_for_model
|
||||
|
||||
from backend.util import json
|
||||
from backend.util.prompt import _msg_tokens, estimate_token_count
|
||||
from backend.util.prompt import (
|
||||
CompressResult,
|
||||
_ensure_tool_pairs_intact,
|
||||
_msg_tokens,
|
||||
_normalize_model_for_tokenizer,
|
||||
_truncate_middle_tokens,
|
||||
_truncate_tool_message_content,
|
||||
compress_context,
|
||||
estimate_token_count,
|
||||
)
|
||||
|
||||
|
||||
class TestMsgTokens:
|
||||
@@ -276,3 +287,337 @@ class TestEstimateTokenCount:
|
||||
|
||||
assert total_tokens == expected_total
|
||||
assert total_tokens > 20 # Should be substantial
|
||||
|
||||
|
||||
class TestNormalizeModelForTokenizer:
|
||||
"""Test model name normalization for tiktoken."""
|
||||
|
||||
def test_openai_models_unchanged(self):
|
||||
"""Test that OpenAI models are returned as-is."""
|
||||
assert _normalize_model_for_tokenizer("gpt-4o") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("gpt-4") == "gpt-4"
|
||||
assert _normalize_model_for_tokenizer("gpt-3.5-turbo") == "gpt-3.5-turbo"
|
||||
|
||||
def test_claude_models_normalized(self):
|
||||
"""Test that Claude models are normalized to gpt-4o."""
|
||||
assert _normalize_model_for_tokenizer("claude-3-opus") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("claude-3-sonnet") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("anthropic/claude-3-haiku") == "gpt-4o"
|
||||
|
||||
def test_openrouter_paths_extracted(self):
|
||||
"""Test that OpenRouter model paths are handled."""
|
||||
assert _normalize_model_for_tokenizer("openai/gpt-4o") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("anthropic/claude-3-opus") == "gpt-4o"
|
||||
|
||||
def test_unknown_models_default_to_gpt4o(self):
|
||||
"""Test that unknown models default to gpt-4o."""
|
||||
assert _normalize_model_for_tokenizer("some-random-model") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("llama-3-70b") == "gpt-4o"
|
||||
|
||||
|
||||
class TestTruncateToolMessageContent:
|
||||
"""Test tool message content truncation."""
|
||||
|
||||
@pytest.fixture
|
||||
def enc(self):
|
||||
return encoding_for_model("gpt-4o")
|
||||
|
||||
def test_truncate_openai_tool_message(self, enc):
|
||||
"""Test truncation of OpenAI-style tool message with string content."""
|
||||
long_content = "x" * 10000
|
||||
msg = {"role": "tool", "tool_call_id": "call_123", "content": long_content}
|
||||
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=100)
|
||||
|
||||
# Content should be truncated
|
||||
assert len(msg["content"]) < len(long_content)
|
||||
assert "…" in msg["content"] # Has ellipsis marker
|
||||
|
||||
def test_truncate_anthropic_tool_result(self, enc):
|
||||
"""Test truncation of Anthropic-style tool_result."""
|
||||
long_content = "y" * 10000
|
||||
msg = {
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_123",
|
||||
"content": long_content,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=100)
|
||||
|
||||
# Content should be truncated
|
||||
result_content = msg["content"][0]["content"]
|
||||
assert len(result_content) < len(long_content)
|
||||
assert "…" in result_content
|
||||
|
||||
def test_preserve_tool_use_blocks(self, enc):
|
||||
"""Test that tool_use blocks are not truncated."""
|
||||
msg = {
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_123",
|
||||
"name": "some_function",
|
||||
"input": {"key": "value" * 1000}, # Large input
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
original = json.dumps(msg["content"][0]["input"])
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=10)
|
||||
|
||||
# tool_use should be unchanged
|
||||
assert json.dumps(msg["content"][0]["input"]) == original
|
||||
|
||||
def test_no_truncation_when_under_limit(self, enc):
|
||||
"""Test that short content is not modified."""
|
||||
msg = {"role": "tool", "tool_call_id": "call_123", "content": "Short content"}
|
||||
|
||||
original = msg["content"]
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=1000)
|
||||
|
||||
assert msg["content"] == original
|
||||
|
||||
|
||||
class TestTruncateMiddleTokens:
|
||||
"""Test middle truncation of text."""
|
||||
|
||||
@pytest.fixture
|
||||
def enc(self):
|
||||
return encoding_for_model("gpt-4o")
|
||||
|
||||
def test_truncates_long_text(self, enc):
|
||||
"""Test that long text is truncated with ellipsis in middle."""
|
||||
long_text = "word " * 1000
|
||||
result = _truncate_middle_tokens(long_text, enc, max_tok=50)
|
||||
|
||||
assert len(enc.encode(result)) <= 52 # Allow some slack for ellipsis
|
||||
assert "…" in result
|
||||
assert result.startswith("word") # Head preserved
|
||||
assert result.endswith("word ") # Tail preserved
|
||||
|
||||
def test_preserves_short_text(self, enc):
|
||||
"""Test that short text is not modified."""
|
||||
short_text = "Hello world"
|
||||
result = _truncate_middle_tokens(short_text, enc, max_tok=100)
|
||||
|
||||
assert result == short_text
|
||||
|
||||
|
||||
class TestEnsureToolPairsIntact:
|
||||
"""Test tool call/response pair preservation."""
|
||||
|
||||
def test_adds_missing_tool_call(self):
|
||||
"""Test that orphaned tool_response gets its tool_call prepended."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "f1"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "result"},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (the tool response)
|
||||
recent = [all_msgs[2], all_msgs[3]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the tool_call message
|
||||
assert len(result) == 3
|
||||
assert result[0]["role"] == "assistant"
|
||||
assert "tool_calls" in result[0]
|
||||
|
||||
def test_keeps_complete_pairs(self):
|
||||
"""Test that complete pairs are unchanged."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "System"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "f1"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "result"},
|
||||
]
|
||||
recent = all_msgs[1:] # Include both tool_call and response
|
||||
start_index = 1
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
assert len(result) == 2 # No messages added
|
||||
|
||||
def test_handles_no_tool_messages(self):
|
||||
"""Test messages without tool calls."""
|
||||
all_msgs = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi there!"},
|
||||
]
|
||||
recent = all_msgs
|
||||
start_index = 0
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
assert result == all_msgs
|
||||
|
||||
|
||||
class TestCompressContext:
|
||||
"""Test the async compress_context function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_compression_needed(self):
|
||||
"""Test messages under limit return without compression."""
|
||||
messages = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{"role": "user", "content": "Hello!"},
|
||||
]
|
||||
|
||||
result = await compress_context(messages, target_tokens=100000)
|
||||
|
||||
assert isinstance(result, CompressResult)
|
||||
assert result.was_compacted is False
|
||||
assert len(result.messages) == 2
|
||||
assert result.error is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_truncation_without_client(self):
|
||||
"""Test that truncation works without LLM client."""
|
||||
long_content = "x" * 50000
|
||||
messages = [
|
||||
{"role": "system", "content": "System"},
|
||||
{"role": "user", "content": long_content},
|
||||
{"role": "assistant", "content": "Response"},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=1000, client=None, reserve=100
|
||||
)
|
||||
|
||||
assert result.was_compacted is True
|
||||
# Should have truncated without summarization
|
||||
assert result.messages_summarized == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_with_mocked_llm_client(self):
|
||||
"""Test summarization with mocked LLM client."""
|
||||
# Create many messages to trigger summarization
|
||||
messages = [{"role": "system", "content": "System prompt"}]
|
||||
for i in range(30):
|
||||
messages.append({"role": "user", "content": f"User message {i} " * 100})
|
||||
messages.append(
|
||||
{"role": "assistant", "content": f"Assistant response {i} " * 100}
|
||||
)
|
||||
|
||||
# Mock the AsyncOpenAI client
|
||||
mock_client = AsyncMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices = [MagicMock()]
|
||||
mock_response.choices[0].message.content = "Summary of conversation"
|
||||
mock_client.with_options.return_value.chat.completions.create = AsyncMock(
|
||||
return_value=mock_response
|
||||
)
|
||||
|
||||
result = await compress_context(
|
||||
messages,
|
||||
target_tokens=5000,
|
||||
client=mock_client,
|
||||
keep_recent=5,
|
||||
reserve=500,
|
||||
)
|
||||
|
||||
assert result.was_compacted is True
|
||||
# Should have attempted summarization
|
||||
assert mock_client.with_options.called or result.messages_summarized > 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preserves_tool_pairs(self):
|
||||
"""Test that tool call/response pairs stay together."""
|
||||
messages = [
|
||||
{"role": "system", "content": "System"},
|
||||
{"role": "user", "content": "Do something"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "func"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "Result " * 1000},
|
||||
{"role": "assistant", "content": "Done!"},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=500, client=None, reserve=50
|
||||
)
|
||||
|
||||
# Check that if tool response exists, its call exists too
|
||||
tool_call_ids = set()
|
||||
tool_response_ids = set()
|
||||
for msg in result.messages:
|
||||
if "tool_calls" in msg:
|
||||
for tc in msg["tool_calls"]:
|
||||
tool_call_ids.add(tc["id"])
|
||||
if msg.get("role") == "tool":
|
||||
tool_response_ids.add(msg.get("tool_call_id"))
|
||||
|
||||
# All tool responses should have their calls
|
||||
assert tool_response_ids <= tool_call_ids
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_error_when_cannot_compress(self):
|
||||
"""Test that error is returned when compression fails."""
|
||||
# Single huge message that can't be compressed enough
|
||||
messages = [
|
||||
{"role": "user", "content": "x" * 100000},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=100, client=None, reserve=50
|
||||
)
|
||||
|
||||
# Should have an error since we can't get below 100 tokens
|
||||
assert result.error is not None
|
||||
assert result.was_compacted is True
|
||||
|
||||
|
||||
class TestCompressResultDataclass:
|
||||
"""Test CompressResult dataclass."""
|
||||
|
||||
def test_default_values(self):
|
||||
"""Test default values are set correctly."""
|
||||
result = CompressResult(
|
||||
messages=[{"role": "user", "content": "test"}],
|
||||
token_count=10,
|
||||
was_compacted=False,
|
||||
)
|
||||
|
||||
assert result.error is None
|
||||
assert result.original_token_count == 0 # Defaults to 0, not None
|
||||
assert result.messages_summarized == 0
|
||||
assert result.messages_dropped == 0
|
||||
|
||||
def test_all_fields(self):
|
||||
"""Test all fields can be set."""
|
||||
result = CompressResult(
|
||||
messages=[{"role": "user", "content": "test"}],
|
||||
token_count=100,
|
||||
was_compacted=True,
|
||||
error="Some error",
|
||||
original_token_count=500,
|
||||
messages_summarized=10,
|
||||
messages_dropped=5,
|
||||
)
|
||||
|
||||
assert result.token_count == 100
|
||||
assert result.was_compacted is True
|
||||
assert result.error == "Some error"
|
||||
assert result.original_token_count == 500
|
||||
assert result.messages_summarized == 10
|
||||
assert result.messages_dropped == 5
|
||||
|
||||
@@ -102,7 +102,7 @@ class TestDecomposeGoalExternal:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_with_context(self):
|
||||
"""Test decomposition with additional context."""
|
||||
"""Test decomposition with additional context enriched into description."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
@@ -119,9 +119,12 @@ class TestDecomposeGoalExternal:
|
||||
"Build a chatbot", context="Use Python"
|
||||
)
|
||||
|
||||
expected_description = (
|
||||
"Build a chatbot\n\nAdditional context from user:\nUse Python"
|
||||
)
|
||||
mock_client.post.assert_called_once_with(
|
||||
"/api/decompose-description",
|
||||
json={"description": "Build a chatbot", "user_instruction": "Use Python"},
|
||||
json={"description": expected_description},
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Reference in New Issue
Block a user