mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend/copilot): backend idempotency guard + frontend dedup fix for duplicate messages
Backend: add Redis NX key (30 s TTL) keyed on session_id + message hash before saving user messages. Duplicate POSTs within the window (k8s rolling-deploy retries, nginx upstream retries, rapid double-clicks) return an empty SSE stream (StreamFinish + [DONE]) so the frontend marks the turn done without creating a ghost response. Frontend: stop clearing lastSubmittedMsgRef on stream success. The ref previously went to null after every completed turn, destroying the getSendSuppressionReason dedup guard and allowing the same text to be re-submitted if the UI appeared stalled. Keeping the ref populated means any rapid re-submit of identical text while the last user message is still in chat is caught and suppressed. Root cause of the observed 3× duplicate messages: k8s rolling deploy (triggered by the #12782 merge) caused infrastructure-level POST retries to new pods while the original stream was still running on the old pod. Both fixes together prevent recurrence. Tests: 3 new backend route tests, 5 new frontend helper unit tests.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
"""Chat API routes for chat session management and streaming via SSE."""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import AsyncGenerator
|
||||
@@ -838,6 +839,37 @@ async def stream_chat_post(
|
||||
)
|
||||
request.message += files_block
|
||||
|
||||
# ── Idempotency guard ────────────────────────────────────────────────────
|
||||
# Prevent duplicate executor tasks from concurrent/retry POSTs (e.g. k8s
|
||||
# rolling-deploy retries, nginx upstream retries, rapid double-clicks).
|
||||
# We set a Redis NX key keyed on session_id + message hash with a 30 s TTL.
|
||||
# The first POST wins; any subsequent identical POST within the window gets
|
||||
# an empty SSE stream back so the frontend marks the turn done without
|
||||
# creating a ghost response.
|
||||
if request.message and request.is_user_message:
|
||||
_content_hash = hashlib.sha256(
|
||||
f"{session_id}:{request.message}".encode()
|
||||
).hexdigest()[:16]
|
||||
_dedup_key = f"chat:msg_dedup:{session_id}:{_content_hash}"
|
||||
_redis = await get_redis_async()
|
||||
_is_new_msg = await _redis.set(_dedup_key, "1", ex=30, nx=True)
|
||||
if not _is_new_msg:
|
||||
logger.warning(
|
||||
f"[STREAM] Duplicate user message blocked for session {session_id}, "
|
||||
f"hash={_content_hash} — returning empty SSE",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
async def _empty_sse() -> AsyncGenerator[str, None]:
|
||||
yield StreamFinish().to_sse()
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
_empty_sse(),
|
||||
media_type="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
|
||||
)
|
||||
|
||||
# Atomically append user message to session BEFORE creating task to avoid
|
||||
# race condition where GET_SESSION sees task as "running" but message isn't
|
||||
# saved yet. append_and_save_message re-fetches inside a lock to prevent
|
||||
|
||||
@@ -133,9 +133,19 @@ def test_stream_chat_rejects_too_many_file_ids():
|
||||
assert response.status_code == 422
|
||||
|
||||
|
||||
def _mock_stream_internals(mocker: pytest_mock.MockFixture):
|
||||
def _mock_stream_internals(
|
||||
mocker: pytest_mock.MockFixture,
|
||||
*,
|
||||
redis_set_returns: object = True,
|
||||
):
|
||||
"""Mock the async internals of stream_chat_post so tests can exercise
|
||||
validation and enrichment logic without needing Redis/RabbitMQ."""
|
||||
validation and enrichment logic without needing Redis/RabbitMQ.
|
||||
|
||||
Args:
|
||||
redis_set_returns: Value returned by the mocked Redis ``set`` call.
|
||||
``True`` (default) simulates a fresh key (new message);
|
||||
``None`` simulates a collision (duplicate blocked).
|
||||
"""
|
||||
mocker.patch(
|
||||
"backend.api.features.chat.routes._validate_and_get_session",
|
||||
return_value=None,
|
||||
@@ -158,6 +168,14 @@ def _mock_stream_internals(mocker: pytest_mock.MockFixture):
|
||||
"backend.api.features.chat.routes.track_user_message",
|
||||
return_value=None,
|
||||
)
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.set = AsyncMock(return_value=redis_set_returns)
|
||||
mocker.patch(
|
||||
"backend.api.features.chat.routes.get_redis_async",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_redis,
|
||||
)
|
||||
return mock_redis
|
||||
|
||||
|
||||
def test_stream_chat_accepts_20_file_ids(mocker: pytest_mock.MockFixture):
|
||||
@@ -677,3 +695,66 @@ class TestStripInjectedContext:
|
||||
result = _strip_injected_context(msg)
|
||||
# Without a role, the helper short-circuits without touching content.
|
||||
assert result["content"] == "hello"
|
||||
|
||||
|
||||
# ─── Idempotency / duplicate-POST guard ──────────────────────────────
|
||||
|
||||
|
||||
def test_stream_chat_blocks_duplicate_post_returns_empty_sse(
    mocker: pytest_mock.MockFixture,
) -> None:
    """Verify that a duplicate user POST is answered with an empty SSE stream.

    The Redis ``set`` mock returns ``None``, which is how an already-existing
    NX key is reported, so the route should take the idempotency-guard branch
    and reply with only a finish event followed by the ``[DONE]`` terminator.
    """
    # None from the mocked Redis ``set`` == the dedup key was already present.
    _mock_stream_internals(mocker, redis_set_returns=None)

    payload = {"message": "duplicate message", "is_user_message": True}
    resp = client.post("/sessions/sess-dup/stream", json=payload)

    assert resp.status_code == 200
    sse_text = resp.text
    # Both the StreamFinish event (type=finish) and the SSE terminator must
    # be present in the streamed body.
    for marker in ('"finish"', "[DONE]"):
        assert marker in sse_text
|
||||
|
||||
|
||||
def test_stream_chat_first_post_proceeds_normally(
    mocker: pytest_mock.MockFixture,
) -> None:
    """The first POST (Redis NX key set successfully) must proceed through the
    normal streaming path — no early return.

    Also checks that the dedup key was written exactly once and that the
    write was made with the NX ("only set if absent") flag enabled.
    """
    mock_redis = _mock_stream_internals(mocker, redis_set_returns=True)

    response = client.post(
        "/sessions/sess-new/stream",
        json={"message": "first message", "is_user_message": True},
    )

    assert response.status_code == 200
    # Redis set must have been called once with the NX flag.
    mock_redis.set.assert_called_once()
    set_call = mock_redis.set.call_args

    # redis-py's signature is ``set(name, value, ex, px, nx, ...)``, so a
    # positionally supplied NX flag lives at index 4 of the recorded args
    # (index 3 is ``px``; the mock does not record ``self``).
    nx_positional_index = 4
    nx_passed_positionally = (
        len(set_call.args) > nx_positional_index
        and set_call.args[nx_positional_index] is True
    )
    assert set_call.kwargs.get("nx") is True or nx_passed_positionally
|
||||
|
||||
|
||||
def test_stream_chat_dedup_skipped_for_non_user_messages(
    mocker: pytest_mock.MockFixture,
) -> None:
    """Verify that requests with ``is_user_message=False`` skip the dedup guard.

    The Redis ``set`` mock is configured to report a collision (``None``),
    which would block a user message; a non-user message must still get a
    200 response and must never touch the Redis ``set`` call at all.
    """
    redis_stub = _mock_stream_internals(mocker, redis_set_returns=None)

    body = {"message": "system context", "is_user_message": False}
    resp = client.post("/sessions/sess-sys/stream", json=body)

    # The simulated collision is irrelevant here: the guard only applies to
    # user messages, so the request goes through and Redis is left alone.
    assert resp.status_code == 200
    redis_stub.set.assert_not_called()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { IMPERSONATION_HEADER_NAME } from "@/lib/constants";
|
||||
import { getCopilotAuthHeaders } from "../helpers";
|
||||
import { getCopilotAuthHeaders, getSendSuppressionReason } from "../helpers";
|
||||
import type { UIMessage } from "ai";
|
||||
|
||||
vi.mock("@/lib/supabase/actions", () => ({
|
||||
getWebSocketToken: vi.fn(),
|
||||
@@ -72,3 +73,71 @@ describe("getCopilotAuthHeaders", () => {
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── getSendSuppressionReason ─────────────────────────────────────────────────
|
||||
|
||||
/**
 * Build a minimal user-role message fixture whose `content` and single
 * text part both carry the given text.
 */
function makeUserMsg(text: string): UIMessage {
  const message = {
    id: "msg-1",
    role: "user",
    content: text,
    parts: [{ type: "text", text }],
  } as UIMessage;
  return message;
}
|
||||
|
||||
describe("getSendSuppressionReason", () => {
  type SuppressionArgs = Parameters<typeof getSendSuppressionReason>[0];

  // Calls the helper with a baseline of "hello" / no reconnect / nothing
  // submitted yet / empty chat, letting each test override only the fields
  // it cares about. A fresh argument object is built on every call.
  const reasonFor = (overrides: Partial<SuppressionArgs>) =>
    getSendSuppressionReason({
      text: "hello",
      isReconnectScheduled: false,
      lastSubmittedText: null,
      messages: [],
      ...overrides,
    });

  it("returns null when no dedup context exists (fresh ref)", () => {
    expect(reasonFor({})).toBeNull();
  });

  it("returns 'reconnecting' when reconnect is scheduled regardless of text", () => {
    expect(reasonFor({ isReconnectScheduled: true })).toBe("reconnecting");
  });

  it("returns 'duplicate' when same text was submitted and is the last user message", () => {
    // Core regression case: the last-submitted ref is deliberately kept
    // after a successful turn, so re-sending identical text while it is
    // still the last user message in chat must be suppressed.
    const reason = reasonFor({
      lastSubmittedText: "hello",
      messages: [makeUserMsg("hello")],
    });
    expect(reason).toBe("duplicate");
  });

  it("returns null when same ref text but different last user message (different question)", () => {
    // "hello" was asked earlier, but a different question has been sent
    // since — the last user message in chat no longer matches, so the
    // send is allowed.
    const reason = reasonFor({
      lastSubmittedText: "hello",
      messages: [makeUserMsg("hello"), makeUserMsg("something else")],
    });
    expect(reason).toBeNull();
  });

  it("returns null when text differs from lastSubmittedText", () => {
    const reason = reasonFor({
      text: "new question",
      lastSubmittedText: "old question",
      messages: [makeUserMsg("old question")],
    });
    expect(reason).toBeNull();
  });
});
|
||||
|
||||
@@ -458,7 +458,12 @@ export function useCopilotStream({
|
||||
if (status === "ready") {
|
||||
reconnectAttemptsRef.current = 0;
|
||||
hasShownDisconnectToast.current = false;
|
||||
lastSubmittedMsgRef.current = null;
|
||||
// Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last
|
||||
// submitted text prevents getSendSuppressionReason from allowing a
|
||||
// duplicate POST of the same message immediately after a successful turn
|
||||
// (the "duplicate" branch checks both the ref and the visible last user
|
||||
// message, so legitimate re-sends after a different reply are still
|
||||
// allowed).
|
||||
setReconnectExhausted(false);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user