merge: resolve import conflict in copilot/db.py after merging dev

Keep both ChatSessionMetadata (from feature branch) and
invalidate_session_cache (from dev) imports.
This commit is contained in:
Zamil Majdy
2026-03-31 09:17:27 +02:00
16 changed files with 228 additions and 15 deletions

View File

@@ -18,7 +18,13 @@ from prisma.types import (
from backend.data import db
from backend.util.json import SafeJson, sanitize_string
from .model import ChatMessage, ChatSession, ChatSessionInfo, ChatSessionMetadata
from .model import (
ChatMessage,
ChatSession,
ChatSessionInfo,
ChatSessionMetadata,
invalidate_session_cache,
)
logger = logging.getLogger(__name__)
@@ -224,6 +230,9 @@ async def add_chat_messages_batch(
if msg.get("function_call") is not None:
data["functionCall"] = SafeJson(msg["function_call"])
if msg.get("duration_ms") is not None:
data["durationMs"] = msg["duration_ms"]
messages_data.append(data)
# Run create_many and session update in parallel within transaction
@@ -366,3 +375,22 @@ async def update_tool_message_content(
f"tool_call_id {tool_call_id}: {e}"
)
return False
async def set_turn_duration(session_id: str, duration_ms: int) -> None:
"""Set durationMs on the last assistant message in a session.
Also invalidates the Redis session cache so the next GET returns
the updated duration.
"""
last_msg = await PrismaChatMessage.prisma().find_first(
where={"sessionId": session_id, "role": "assistant"},
order={"sequence": "desc"},
)
if last_msg:
await PrismaChatMessage.prisma().update(
where={"id": last_msg.id},
data={"durationMs": duration_ms},
)
# Invalidate cache so the session is re-fetched from DB with durationMs
await invalidate_session_cache(session_id)

View File

@@ -64,6 +64,7 @@ class ChatMessage(BaseModel):
refusal: str | None = None
tool_calls: list[dict] | None = None
function_call: dict | None = None
duration_ms: int | None = None
@staticmethod
def from_db(prisma_message: PrismaChatMessage) -> "ChatMessage":
@@ -76,6 +77,7 @@ class ChatMessage(BaseModel):
refusal=prisma_message.refusal,
tool_calls=_parse_json_field(prisma_message.toolCalls),
function_call=_parse_json_field(prisma_message.functionCall),
duration_ms=prisma_message.durationMs,
)

View File

@@ -26,6 +26,7 @@ import orjson
from redis.exceptions import RedisError
from backend.api.model import CopilotCompletionPayload
from backend.data.db_accessors import chat_db
from backend.data.notification_bus import (
AsyncRedisNotificationEventBus,
NotificationEvent,
@@ -111,6 +112,14 @@ def _parse_session_meta(meta: dict[Any, Any], session_id: str = "") -> ActiveSes
``session_id`` is used as a fallback for ``turn_id`` when the meta hash
pre-dates the turn_id field (backward compat for in-flight sessions).
"""
created_at = datetime.now(timezone.utc)
created_at_raw = meta.get("created_at")
if created_at_raw:
try:
created_at = datetime.fromisoformat(str(created_at_raw))
except (ValueError, TypeError):
pass
return ActiveSession(
session_id=meta.get("session_id", "") or session_id,
user_id=meta.get("user_id", "") or None,
@@ -119,6 +128,7 @@ def _parse_session_meta(meta: dict[Any, Any], session_id: str = "") -> ActiveSes
turn_id=meta.get("turn_id", "") or session_id,
blocking=meta.get("blocking") == "1",
status=meta.get("status", "running"), # type: ignore[arg-type]
created_at=created_at,
)
@@ -802,6 +812,33 @@ async def mark_session_completed(
f"Failed to publish error event for session {session_id}: {e}"
)
# Compute wall-clock duration from session created_at.
# Only persist when (a) the session completed successfully and
# (b) created_at was actually present in Redis meta (not a fallback).
duration_ms: int | None = None
if meta and not error_message:
created_at_raw = meta.get("created_at")
if created_at_raw:
try:
created_at = datetime.fromisoformat(str(created_at_raw))
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
elapsed = datetime.now(timezone.utc) - created_at
duration_ms = max(0, int(elapsed.total_seconds() * 1000))
except (ValueError, TypeError):
logger.warning(
"Failed to compute session duration for %s (created_at=%r)",
session_id,
created_at_raw,
)
# Persist duration on the last assistant message
if duration_ms is not None:
try:
await chat_db().set_turn_duration(session_id, duration_ms)
except Exception as e:
logger.warning(f"Failed to save turn duration for {session_id}: {e}")
# Publish StreamFinish AFTER status is set to "completed"/"failed".
# This is the SINGLE place that publishes StreamFinish — services and
# the processor must NOT publish it themselves.

View File

@@ -344,6 +344,7 @@ class DatabaseManager(AppService):
get_next_sequence = _(chat_db.get_next_sequence)
update_tool_message_content = _(chat_db.update_tool_message_content)
update_chat_session_title = _(chat_db.update_chat_session_title)
set_turn_duration = _(chat_db.set_turn_duration)
class DatabaseManagerClient(AppServiceClient):
@@ -540,3 +541,4 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_next_sequence = d.get_next_sequence
update_tool_message_content = d.update_tool_message_content
update_chat_session_title = d.update_chat_session_title
set_turn_duration = d.set_turn_duration

View File

@@ -0,0 +1,2 @@
-- Add durationMs column to ChatMessage for persisting turn elapsed time.
ALTER TABLE "ChatMessage" ADD COLUMN "durationMs" INTEGER;

View File

@@ -250,7 +250,8 @@ model ChatMessage {
functionCall Json? // Deprecated but kept for compatibility
// Ordering within session
sequence Int
sequence Int
durationMs Int? // Wall-clock milliseconds for this assistant turn
@@unique([sessionId, sequence])
}

View File

@@ -95,6 +95,8 @@ export function CopilotPage() {
isDeleting,
handleConfirmDelete,
handleCancelDelete,
// Historical durations for persisted timer stats
historicalDurations,
// Rate limit reset
rateLimitMessage,
dismissRateLimit,
@@ -186,6 +188,7 @@ export function CopilotPage() {
isUploadingFiles={isUploadingFiles}
droppedFiles={droppedFiles}
onDroppedFilesConsumed={handleDroppedFilesConsumed}
historicalDurations={historicalDurations}
/>
</div>
</div>

View File

@@ -27,6 +27,8 @@ export interface ChatContainerProps {
droppedFiles?: File[];
/** Called after droppedFiles have been consumed by ChatInput. */
onDroppedFilesConsumed?: () => void;
/** Duration in ms for historical turns, keyed by message ID. */
historicalDurations?: Map<string, number>;
}
export const ChatContainer = ({
messages,
@@ -44,6 +46,7 @@ export const ChatContainer = ({
isUploadingFiles,
droppedFiles,
onDroppedFilesConsumed,
historicalDurations,
}: ChatContainerProps) => {
const isBusy =
status === "streaming" ||
@@ -81,6 +84,7 @@ export const ChatContainer = ({
isLoading={isLoadingSession}
sessionID={sessionId}
onRetry={handleRetry}
historicalDurations={historicalDurations}
/>
<motion.div
initial={{ opacity: 0 }}

View File

@@ -1,4 +1,4 @@
import { useMemo } from "react";
import { useEffect, useMemo, useRef } from "react";
import {
Conversation,
ConversationContent,
@@ -13,6 +13,7 @@ import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner
import { FileUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { TOOL_PART_PREFIX } from "../JobStatsBar/constants";
import { TurnStatsBar } from "../JobStatsBar/TurnStatsBar";
import { useElapsedTimer } from "../JobStatsBar/useElapsedTimer";
import { CopilotPendingReviews } from "../CopilotPendingReviews/CopilotPendingReviews";
import {
buildRenderSegments,
@@ -37,6 +38,7 @@ interface Props {
isLoading: boolean;
sessionID?: string | null;
onRetry?: () => void;
historicalDurations?: Map<string, number>;
}
function renderSegments(
@@ -111,6 +113,7 @@ export function ChatMessagesContainer({
isLoading,
sessionID,
onRetry,
historicalDurations,
}: Props) {
const lastMessage = messages[messages.length - 1];
const graphExecId = useMemo(() => extractGraphExecId(messages), [messages]);
@@ -139,6 +142,25 @@ export function ChatMessagesContainer({
const showThinking =
status === "submitted" || (status === "streaming" && !hasInflight);
const isActivelyStreaming = status === "streaming" || status === "submitted";
const { elapsedSeconds } = useElapsedTimer(isActivelyStreaming);
// Freeze elapsed time when streaming ends so TurnStatsBar shows the final value.
// Reset when a new streaming turn begins.
const frozenElapsedRef = useRef(0);
const wasStreamingRef = useRef(false);
useEffect(() => {
if (isActivelyStreaming) {
if (!wasStreamingRef.current) {
frozenElapsedRef.current = 0;
}
if (elapsedSeconds > 0) {
frozenElapsedRef.current = elapsedSeconds;
}
}
wasStreamingRef.current = isActivelyStreaming;
});
return (
<Conversation className="min-h-0 flex-1">
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
@@ -239,10 +261,19 @@ export function ChatMessagesContainer({
{isLastInTurn && !isCurrentlyStreaming && (
<TurnStatsBar
turnMessages={getTurnMessages(messages, messageIndex)}
elapsedSeconds={
messageIndex === messages.length - 1
? frozenElapsedRef.current
: undefined
}
durationMs={historicalDurations?.get(message.id)}
/>
)}
{isLastAssistant && showThinking && (
<ThinkingIndicator active={showThinking} />
<ThinkingIndicator
active={showThinking}
elapsedSeconds={elapsedSeconds}
/>
)}
</MessageContent>
{message.role === "user" && textParts.length > 0 && (
@@ -268,7 +299,10 @@ export function ChatMessagesContainer({
{showThinking && lastMessage?.role !== "assistant" && (
<Message from="assistant">
<MessageContent className="text-[1rem] leading-relaxed">
<ThinkingIndicator active={showThinking} />
<ThinkingIndicator
active={showThinking}
elapsedSeconds={elapsedSeconds}
/>
</MessageContent>
</Message>
)}

View File

@@ -1,4 +1,5 @@
import { useEffect, useRef, useState } from "react";
import { formatElapsed } from "../../JobStatsBar/formatElapsed";
import { ScaleLoader } from "../../ScaleLoader/ScaleLoader";
const THINKING_PHRASES = [
@@ -27,6 +28,9 @@ const THINKING_PHRASES = [
const PHRASE_CYCLE_MS = 6_000;
const FADE_DURATION_MS = 300;
/** Only show elapsed time after this many seconds. */
const SHOW_TIME_AFTER_SECONDS = 20;
/**
* Cycles through thinking phrases sequentially with a fade-out/in transition.
* Returns the current phrase and whether it's visible (for opacity).
@@ -72,10 +76,12 @@ function useCyclingPhrase(active: boolean) {
interface Props {
active: boolean;
elapsedSeconds: number;
}
export function ThinkingIndicator({ active }: Props) {
export function ThinkingIndicator({ active, elapsedSeconds }: Props) {
const { phrase, visible } = useCyclingPhrase(active);
const showTime = active && elapsedSeconds >= SHOW_TIME_AFTER_SECONDS;
return (
<span className="inline-flex items-center gap-1.5 text-neutral-500">
@@ -88,6 +94,11 @@ export function ThinkingIndicator({ active }: Props) {
{phrase}
</span>
</span>
{showTime && (
<span className="animate-pulse tabular-nums [animation-duration:1.5s]">
{formatElapsed(elapsedSeconds)}
</span>
)}
</span>
);
}

View File

@@ -1,21 +1,44 @@
import type { UIDataTypes, UIMessage, UITools } from "ai";
import { formatElapsed } from "./formatElapsed";
import { getWorkDoneCounters } from "./useWorkDoneCounters";
interface Props {
turnMessages: UIMessage<unknown, UIDataTypes, UITools>[];
elapsedSeconds?: number;
durationMs?: number;
}
export function TurnStatsBar({ turnMessages }: Props) {
export function TurnStatsBar({
turnMessages,
elapsedSeconds,
durationMs,
}: Props) {
const { counters } = getWorkDoneCounters(turnMessages);
if (counters.length === 0) return null;
// Prefer live elapsedSeconds, fall back to persisted durationMs
const displaySeconds =
elapsedSeconds !== undefined && elapsedSeconds > 0
? elapsedSeconds
: durationMs !== undefined
? Math.round(durationMs / 1000)
: undefined;
const hasTime = displaySeconds !== undefined && displaySeconds > 0;
if (counters.length === 0 && !hasTime) return null;
return (
<div className="mt-2 flex items-center gap-1.5">
{hasTime && (
<span className="text-[11px] tabular-nums text-neutral-500">
Thought for {formatElapsed(displaySeconds)}
</span>
)}
{counters.map(function renderCounter(counter, index) {
const needsDot = index > 0 || hasTime;
return (
<span key={counter.category} className="flex items-center gap-1">
{index > 0 && (
{needsDot && (
<span className="text-xs text-neutral-300">&middot;</span>
)}
<span className="text-[11px] tabular-nums text-neutral-500">

View File

@@ -0,0 +1,7 @@
export function formatElapsed(totalSeconds: number): string {
const minutes = Math.floor(totalSeconds / 60);
const seconds = totalSeconds % 60;
if (minutes === 0) return `${seconds}s`;
return `${minutes}m ${seconds}s`;
}

View File

@@ -0,0 +1,31 @@
import { useEffect, useRef, useState } from "react";
export function useElapsedTimer(isRunning: boolean) {
const [elapsedSeconds, setElapsedSeconds] = useState(0);
const startTimeRef = useRef<number | null>(null);
const intervalRef = useRef<ReturnType<typeof setInterval>>();
useEffect(() => {
if (isRunning) {
if (startTimeRef.current === null) {
startTimeRef.current = Date.now();
setElapsedSeconds(0);
}
intervalRef.current = setInterval(() => {
if (startTimeRef.current !== null) {
setElapsedSeconds(
Math.floor((Date.now() - startTimeRef.current) / 1000),
);
}
}, 1000);
return () => clearInterval(intervalRef.current);
}
clearInterval(intervalRef.current);
startTimeRef.current = null;
}, [isRunning]);
return { elapsedSeconds };
}

View File

@@ -6,6 +6,7 @@ interface SessionChatMessage {
content: string | null;
tool_call_id: string | null;
tool_calls: unknown[] | null;
duration_ms: number | null;
}
function coerceSessionChatMessages(
@@ -34,6 +35,8 @@ function coerceSessionChatMessages(
? null
: String(msg.tool_call_id),
tool_calls: Array.isArray(msg.tool_calls) ? msg.tool_calls : null,
duration_ms:
typeof msg.duration_ms === "number" ? msg.duration_ms : null,
};
})
.filter((m): m is SessionChatMessage => m !== null);
@@ -102,7 +105,10 @@ export function convertChatSessionMessagesToUiMessages(
sessionId: string,
rawMessages: unknown[],
options?: { isComplete?: boolean },
): UIMessage<unknown, UIDataTypes, UITools>[] {
): {
messages: UIMessage<unknown, UIDataTypes, UITools>[];
durations: Map<string, number>;
} {
const messages = coerceSessionChatMessages(rawMessages);
const toolOutputsByCallId = new Map<string, unknown>();
@@ -114,6 +120,7 @@ export function convertChatSessionMessagesToUiMessages(
}
const uiMessages: UIMessage<unknown, UIDataTypes, UITools>[] = [];
const durations = new Map<string, number>();
messages.forEach((msg, index) => {
if (msg.role === "tool") return;
@@ -186,15 +193,24 @@ export function convertChatSessionMessagesToUiMessages(
const prevUI = uiMessages[uiMessages.length - 1];
if (msg.role === "assistant" && prevUI && prevUI.role === "assistant") {
prevUI.parts.push(...parts);
// Capture duration on merged message (last assistant msg wins)
if (msg.duration_ms != null) {
durations.set(prevUI.id, msg.duration_ms);
}
return;
}
const msgId = `${sessionId}-${index}`;
uiMessages.push({
id: `${sessionId}-${index}`,
id: msgId,
role: msg.role,
parts,
});
if (msg.role === "assistant" && msg.duration_ms != null) {
durations.set(msgId, msg.duration_ms);
}
});
return uiMessages;
return { messages: uiMessages, durations };
}

View File

@@ -61,13 +61,21 @@ export function useChatSession() {
// array reference every render. Re-derives only when query data changes.
// When the session is complete (no active stream), mark dangling tool
// calls as completed so stale spinners don't persist after refresh.
const hydratedMessages = useMemo(() => {
if (sessionQuery.data?.status !== 200 || !sessionId) return undefined;
return convertChatSessionMessagesToUiMessages(
const { hydratedMessages, historicalDurations } = useMemo(() => {
if (sessionQuery.data?.status !== 200 || !sessionId)
return {
hydratedMessages: undefined,
historicalDurations: new Map<string, number>(),
};
const result = convertChatSessionMessagesToUiMessages(
sessionId,
sessionQuery.data.data.messages ?? [],
{ isComplete: !hasActiveStream },
);
return {
hydratedMessages: result.messages,
historicalDurations: result.durations,
};
}, [sessionQuery.data, sessionId, hasActiveStream]);
const { mutateAsync: createSessionMutation, isPending: isCreatingSession } =
@@ -122,6 +130,7 @@ export function useChatSession() {
sessionId,
setSessionId,
hydratedMessages,
historicalDurations,
hasActiveStream,
isLoadingSession: sessionQuery.isLoading,
isSessionError: sessionQuery.isError,

View File

@@ -39,6 +39,7 @@ export function useCopilotPage() {
sessionId,
setSessionId,
hydratedMessages,
historicalDurations,
hasActiveStream,
isLoadingSession,
isSessionError,
@@ -377,6 +378,8 @@ export function useCopilotPage() {
handleDeleteClick,
handleConfirmDelete,
handleCancelDelete,
// Historical durations for persisted timer stats
historicalDurations,
// Rate limit reset
rateLimitMessage,
dismissRateLimit,