mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend/copilot): skip enqueue_copilot_turn when append_and_save_message detects duplicate
This commit is contained in:
@@ -875,8 +875,9 @@ async def stream_chat_post(
|
||||
|
||||
# Atomically append user message to session BEFORE creating task to avoid
|
||||
# race condition where GET_SESSION sees task as "running" but message isn't
|
||||
# saved yet. append_and_save_message is idempotent — duplicate POSTs with
|
||||
# the same content are silently skipped at the DB layer.
|
||||
# saved yet. append_and_save_message returns None when a duplicate is
|
||||
# detected — in that case skip enqueue to avoid processing the message twice.
|
||||
is_duplicate_message = False
|
||||
try:
|
||||
if request.message:
|
||||
message = ChatMessage(
|
||||
@@ -890,7 +891,8 @@ async def stream_chat_post(
|
||||
message_length=len(request.message),
|
||||
)
|
||||
logger.info(f"[STREAM] Saving user message to session {session_id}")
|
||||
await append_and_save_message(session_id, message)
|
||||
saved = await append_and_save_message(session_id, message)
|
||||
is_duplicate_message = saved is None
|
||||
logger.info(f"[STREAM] User message saved for session {session_id}")
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
@@ -915,17 +917,22 @@ async def stream_chat_post(
|
||||
},
|
||||
)
|
||||
|
||||
await enqueue_copilot_turn(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
message=request.message,
|
||||
turn_id=turn_id,
|
||||
is_user_message=request.is_user_message,
|
||||
context=request.context,
|
||||
file_ids=sanitized_file_ids,
|
||||
mode=request.mode,
|
||||
model=request.model,
|
||||
)
|
||||
if not is_duplicate_message:
|
||||
await enqueue_copilot_turn(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
message=request.message,
|
||||
turn_id=turn_id,
|
||||
is_user_message=request.is_user_message,
|
||||
context=request.context,
|
||||
file_ids=sanitized_file_ids,
|
||||
mode=request.mode,
|
||||
model=request.model,
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"[STREAM] Duplicate message detected for session {session_id}, skipping enqueue"
|
||||
)
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
||||
@@ -149,7 +149,7 @@ def _mock_stream_internals(mocker: pytest_mock.MockerFixture):
|
||||
)
|
||||
mock_save = mocker.patch(
|
||||
"backend.api.features.chat.routes.append_and_save_message",
|
||||
return_value=None,
|
||||
return_value=MagicMock(), # non-None = message was saved (not a duplicate)
|
||||
)
|
||||
mock_registry = mocker.MagicMock()
|
||||
mock_registry.create_session = mocker.AsyncMock(return_value=None)
|
||||
@@ -194,6 +194,26 @@ def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockerFixture):
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
# ─── Duplicate message dedup ──────────────────────────────────────────
|
||||
|
||||
|
||||
def test_stream_chat_skips_enqueue_for_duplicate_message(
|
||||
mocker: pytest_mock.MockerFixture,
|
||||
):
|
||||
"""When append_and_save_message returns None (duplicate detected),
|
||||
enqueue_copilot_turn must NOT be called to avoid double-processing."""
|
||||
mocks = _mock_stream_internals(mocker)
|
||||
# Override save to return None — signalling a duplicate
|
||||
mocks.save.return_value = None
|
||||
|
||||
response = client.post(
|
||||
"/sessions/sess-1/stream",
|
||||
json={"message": "hello"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
mocks.enqueue.assert_not_called()
|
||||
|
||||
|
||||
# ─── UUID format filtering ─────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -647,9 +647,15 @@ async def _save_session_to_db(
|
||||
msg.sequence = existing_message_count + i
|
||||
|
||||
|
||||
async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
|
||||
async def append_and_save_message(
|
||||
session_id: str, message: ChatMessage
|
||||
) -> ChatSession | None:
|
||||
"""Atomically append a message to a session and persist it.
|
||||
|
||||
Returns the updated session, or None if the message was detected as a
|
||||
duplicate (idempotency guard). Callers must check for None and skip any
|
||||
downstream work (e.g. enqueuing a new LLM turn) when a duplicate is detected.
|
||||
|
||||
Uses _get_session_lock (Redis NX) to serialise concurrent writers across replicas.
|
||||
The idempotency check below provides a last-resort guard when the lock degrades.
|
||||
"""
|
||||
@@ -676,7 +682,7 @@ async def append_and_save_message(session_id: str, message: ChatMessage) -> Chat
|
||||
and session.messages[-1].role == message.role
|
||||
and session.messages[-1].content == message.content
|
||||
):
|
||||
return session
|
||||
return None # duplicate — caller should skip enqueue
|
||||
|
||||
session.messages.append(message)
|
||||
existing_message_count = await chat_db().get_next_sequence(session_id)
|
||||
|
||||
Reference in New Issue
Block a user